mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-05 02:07:34 +08:00
Adds a 'notify' parameter to the cronjob tool that controls when delivery happens: - 'always' (default): deliver every run (current behavior) - 'changes_only': the cron agent can respond with [SILENT] to suppress delivery when nothing new to report. The scheduler injects guidance into the prompt so the agent knows about this. - 'never': skip delivery entirely, only save output locally The notify field is stored in the job JSON, passed through create and update, and shown in job listings. Output is always saved to disk for audit regardless of notify mode. Failed jobs always deliver regardless of the setting. Changes: - cron/jobs.py: accept and store notify parameter in create_job - tools/cronjob_tools.py: add notify to function, schema, handler lambda, format_job, and update path - cron/scheduler.py: SILENT_MARKER constant, notify-aware delivery logic in tick(), prompt injection for changes_only jobs - tests: 14 new tests covering all three notify modes, prompt injection, case insensitivity, failure delivery, output saving
609 lines
24 KiB
Python
609 lines
24 KiB
Python
"""Tests for cron/scheduler.py — origin resolution, delivery routing, and error logging."""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from unittest.mock import AsyncMock, patch, MagicMock
|
|
|
|
import pytest
|
|
|
|
from cron.scheduler import (
|
|
_resolve_origin,
|
|
_resolve_delivery_target,
|
|
_deliver_result,
|
|
run_job,
|
|
SILENT_MARKER,
|
|
)
|
|
|
|
|
|
class TestResolveOrigin:
|
|
def test_full_origin(self):
|
|
job = {
|
|
"origin": {
|
|
"platform": "telegram",
|
|
"chat_id": "123456",
|
|
"chat_name": "Test Chat",
|
|
"thread_id": "42",
|
|
}
|
|
}
|
|
result = _resolve_origin(job)
|
|
assert isinstance(result, dict)
|
|
assert result == job["origin"]
|
|
assert result["platform"] == "telegram"
|
|
assert result["chat_id"] == "123456"
|
|
assert result["chat_name"] == "Test Chat"
|
|
assert result["thread_id"] == "42"
|
|
|
|
def test_no_origin(self):
|
|
assert _resolve_origin({}) is None
|
|
assert _resolve_origin({"origin": None}) is None
|
|
|
|
def test_missing_platform(self):
|
|
job = {"origin": {"chat_id": "123"}}
|
|
assert _resolve_origin(job) is None
|
|
|
|
def test_missing_chat_id(self):
|
|
job = {"origin": {"platform": "telegram"}}
|
|
assert _resolve_origin(job) is None
|
|
|
|
def test_empty_origin(self):
|
|
job = {"origin": {}}
|
|
assert _resolve_origin(job) is None
|
|
|
|
|
|
class TestResolveDeliveryTarget:
|
|
def test_origin_delivery_preserves_thread_id(self):
|
|
job = {
|
|
"deliver": "origin",
|
|
"origin": {
|
|
"platform": "telegram",
|
|
"chat_id": "-1001",
|
|
"thread_id": "17585",
|
|
},
|
|
}
|
|
|
|
assert _resolve_delivery_target(job) == {
|
|
"platform": "telegram",
|
|
"chat_id": "-1001",
|
|
"thread_id": "17585",
|
|
}
|
|
|
|
def test_bare_platform_uses_matching_origin_chat(self):
|
|
job = {
|
|
"deliver": "telegram",
|
|
"origin": {
|
|
"platform": "telegram",
|
|
"chat_id": "-1001",
|
|
"thread_id": "17585",
|
|
},
|
|
}
|
|
|
|
assert _resolve_delivery_target(job) == {
|
|
"platform": "telegram",
|
|
"chat_id": "-1001",
|
|
"thread_id": "17585",
|
|
}
|
|
|
|
def test_bare_platform_falls_back_to_home_channel(self, monkeypatch):
|
|
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "-2002")
|
|
job = {
|
|
"deliver": "telegram",
|
|
"origin": {
|
|
"platform": "discord",
|
|
"chat_id": "abc",
|
|
},
|
|
}
|
|
|
|
assert _resolve_delivery_target(job) == {
|
|
"platform": "telegram",
|
|
"chat_id": "-2002",
|
|
"thread_id": None,
|
|
}
|
|
|
|
|
|
class TestDeliverResultMirrorLogging:
|
|
"""Verify that mirror_to_session failures are logged, not silently swallowed."""
|
|
|
|
def test_mirror_failure_is_logged(self, caplog):
|
|
"""When mirror_to_session raises, a warning should be logged."""
|
|
from gateway.config import Platform
|
|
|
|
pconfig = MagicMock()
|
|
pconfig.enabled = True
|
|
mock_cfg = MagicMock()
|
|
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
|
|
|
|
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
|
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \
|
|
patch("gateway.mirror.mirror_to_session", side_effect=ConnectionError("network down")):
|
|
job = {
|
|
"id": "test-job",
|
|
"deliver": "origin",
|
|
"origin": {"platform": "telegram", "chat_id": "123"},
|
|
}
|
|
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
|
|
_deliver_result(job, "Hello!")
|
|
|
|
assert any("mirror_to_session failed" in r.message for r in caplog.records), \
|
|
f"Expected 'mirror_to_session failed' warning in logs, got: {[r.message for r in caplog.records]}"
|
|
|
|
def test_origin_delivery_preserves_thread_id(self):
|
|
"""Origin delivery should forward thread_id to send/mirror helpers."""
|
|
from gateway.config import Platform
|
|
|
|
pconfig = MagicMock()
|
|
pconfig.enabled = True
|
|
mock_cfg = MagicMock()
|
|
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
|
|
|
|
job = {
|
|
"id": "test-job",
|
|
"deliver": "origin",
|
|
"origin": {
|
|
"platform": "telegram",
|
|
"chat_id": "-1001",
|
|
"thread_id": "17585",
|
|
},
|
|
}
|
|
|
|
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
|
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
|
|
patch("gateway.mirror.mirror_to_session") as mirror_mock:
|
|
_deliver_result(job, "hello")
|
|
|
|
send_mock.assert_called_once()
|
|
assert send_mock.call_args.kwargs["thread_id"] == "17585"
|
|
mirror_mock.assert_called_once_with(
|
|
"telegram",
|
|
"-1001",
|
|
"hello",
|
|
source_label="cron",
|
|
thread_id="17585",
|
|
)
|
|
|
|
|
|
class TestRunJobSessionPersistence:
|
|
def test_run_job_passes_session_db_and_cron_platform(self, tmp_path):
|
|
job = {
|
|
"id": "test-job",
|
|
"name": "test",
|
|
"prompt": "hello",
|
|
}
|
|
fake_db = MagicMock()
|
|
|
|
with patch("cron.scheduler._hermes_home", tmp_path), \
|
|
patch("cron.scheduler._resolve_origin", return_value=None), \
|
|
patch("dotenv.load_dotenv"), \
|
|
patch("hermes_state.SessionDB", return_value=fake_db), \
|
|
patch(
|
|
"hermes_cli.runtime_provider.resolve_runtime_provider",
|
|
return_value={
|
|
"api_key": "test-key",
|
|
"base_url": "https://example.invalid/v1",
|
|
"provider": "openrouter",
|
|
"api_mode": "chat_completions",
|
|
},
|
|
), \
|
|
patch("run_agent.AIAgent") as mock_agent_cls:
|
|
mock_agent = MagicMock()
|
|
mock_agent.run_conversation.return_value = {"final_response": "ok"}
|
|
mock_agent_cls.return_value = mock_agent
|
|
|
|
success, output, final_response, error = run_job(job)
|
|
|
|
assert success is True
|
|
assert error is None
|
|
assert final_response == "ok"
|
|
assert "ok" in output
|
|
|
|
kwargs = mock_agent_cls.call_args.kwargs
|
|
assert kwargs["session_db"] is fake_db
|
|
assert kwargs["platform"] == "cron"
|
|
assert kwargs["session_id"].startswith("cron_test-job_")
|
|
fake_db.close.assert_called_once()
|
|
|
|
def test_run_job_sets_auto_delivery_env_from_dotenv_home_channel(self, tmp_path, monkeypatch):
|
|
job = {
|
|
"id": "test-job",
|
|
"name": "test",
|
|
"prompt": "hello",
|
|
"deliver": "telegram",
|
|
}
|
|
fake_db = MagicMock()
|
|
seen = {}
|
|
|
|
(tmp_path / ".env").write_text("TELEGRAM_HOME_CHANNEL=-2002\n")
|
|
monkeypatch.delenv("TELEGRAM_HOME_CHANNEL", raising=False)
|
|
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_PLATFORM", raising=False)
|
|
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID", raising=False)
|
|
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID", raising=False)
|
|
|
|
class FakeAgent:
|
|
def __init__(self, *args, **kwargs):
|
|
pass
|
|
|
|
def run_conversation(self, *args, **kwargs):
|
|
seen["platform"] = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM")
|
|
seen["chat_id"] = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID")
|
|
seen["thread_id"] = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID")
|
|
return {"final_response": "ok"}
|
|
|
|
with patch("cron.scheduler._hermes_home", tmp_path), \
|
|
patch("hermes_state.SessionDB", return_value=fake_db), \
|
|
patch(
|
|
"hermes_cli.runtime_provider.resolve_runtime_provider",
|
|
return_value={
|
|
"api_key": "***",
|
|
"base_url": "https://example.invalid/v1",
|
|
"provider": "openrouter",
|
|
"api_mode": "chat_completions",
|
|
},
|
|
), \
|
|
patch("run_agent.AIAgent", FakeAgent):
|
|
success, output, final_response, error = run_job(job)
|
|
|
|
assert success is True
|
|
assert error is None
|
|
assert final_response == "ok"
|
|
assert "ok" in output
|
|
assert seen == {
|
|
"platform": "telegram",
|
|
"chat_id": "-2002",
|
|
"thread_id": None,
|
|
}
|
|
assert os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM") is None
|
|
assert os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID") is None
|
|
assert os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID") is None
|
|
fake_db.close.assert_called_once()
|
|
|
|
|
|
class TestRunJobConfigLogging:
|
|
"""Verify that config.yaml parse failures are logged, not silently swallowed."""
|
|
|
|
def test_bad_config_yaml_is_logged(self, caplog, tmp_path):
|
|
"""When config.yaml is malformed, a warning should be logged."""
|
|
bad_yaml = tmp_path / "config.yaml"
|
|
bad_yaml.write_text("invalid: yaml: [[[bad")
|
|
|
|
job = {
|
|
"id": "test-job",
|
|
"name": "test",
|
|
"prompt": "hello",
|
|
}
|
|
|
|
with patch("cron.scheduler._hermes_home", tmp_path), \
|
|
patch("cron.scheduler._resolve_origin", return_value=None), \
|
|
patch("dotenv.load_dotenv"), \
|
|
patch("run_agent.AIAgent") as mock_agent_cls:
|
|
mock_agent = MagicMock()
|
|
mock_agent.run_conversation.return_value = {"final_response": "ok"}
|
|
mock_agent_cls.return_value = mock_agent
|
|
|
|
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
|
|
run_job(job)
|
|
|
|
assert any("failed to load config.yaml" in r.message for r in caplog.records), \
|
|
f"Expected 'failed to load config.yaml' warning in logs, got: {[r.message for r in caplog.records]}"
|
|
|
|
def test_bad_prefill_messages_is_logged(self, caplog, tmp_path):
|
|
"""When the prefill messages file contains invalid JSON, a warning should be logged."""
|
|
# Valid config.yaml that points to a bad prefill file
|
|
config_yaml = tmp_path / "config.yaml"
|
|
config_yaml.write_text("prefill_messages_file: prefill.json\n")
|
|
|
|
bad_prefill = tmp_path / "prefill.json"
|
|
bad_prefill.write_text("{not valid json!!!")
|
|
|
|
job = {
|
|
"id": "test-job",
|
|
"name": "test",
|
|
"prompt": "hello",
|
|
}
|
|
|
|
with patch("cron.scheduler._hermes_home", tmp_path), \
|
|
patch("cron.scheduler._resolve_origin", return_value=None), \
|
|
patch("dotenv.load_dotenv"), \
|
|
patch("run_agent.AIAgent") as mock_agent_cls:
|
|
mock_agent = MagicMock()
|
|
mock_agent.run_conversation.return_value = {"final_response": "ok"}
|
|
mock_agent_cls.return_value = mock_agent
|
|
|
|
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
|
|
run_job(job)
|
|
|
|
assert any("failed to parse prefill messages" in r.message for r in caplog.records), \
|
|
f"Expected 'failed to parse prefill messages' warning in logs, got: {[r.message for r in caplog.records]}"
|
|
|
|
|
|
class TestRunJobPerJobOverrides:
|
|
def test_job_level_model_provider_and_base_url_overrides_are_used(self, tmp_path):
|
|
config_yaml = tmp_path / "config.yaml"
|
|
config_yaml.write_text(
|
|
"model:\n"
|
|
" default: gpt-5.4\n"
|
|
" provider: openai-codex\n"
|
|
" base_url: https://chatgpt.com/backend-api/codex\n"
|
|
)
|
|
|
|
job = {
|
|
"id": "briefing-job",
|
|
"name": "briefing",
|
|
"prompt": "hello",
|
|
"model": "perplexity/sonar-pro",
|
|
"provider": "custom",
|
|
"base_url": "http://127.0.0.1:4000/v1",
|
|
}
|
|
|
|
fake_db = MagicMock()
|
|
fake_runtime = {
|
|
"provider": "openrouter",
|
|
"api_mode": "chat_completions",
|
|
"base_url": "http://127.0.0.1:4000/v1",
|
|
"api_key": "***",
|
|
}
|
|
|
|
with patch("cron.scheduler._hermes_home", tmp_path), \
|
|
patch("cron.scheduler._resolve_origin", return_value=None), \
|
|
patch("dotenv.load_dotenv"), \
|
|
patch("hermes_state.SessionDB", return_value=fake_db), \
|
|
patch("hermes_cli.runtime_provider.resolve_runtime_provider", return_value=fake_runtime) as runtime_mock, \
|
|
patch("run_agent.AIAgent") as mock_agent_cls:
|
|
mock_agent = MagicMock()
|
|
mock_agent.run_conversation.return_value = {"final_response": "ok"}
|
|
mock_agent_cls.return_value = mock_agent
|
|
|
|
success, output, final_response, error = run_job(job)
|
|
|
|
assert success is True
|
|
assert error is None
|
|
assert final_response == "ok"
|
|
assert "ok" in output
|
|
runtime_mock.assert_called_once_with(
|
|
requested="custom",
|
|
explicit_base_url="http://127.0.0.1:4000/v1",
|
|
)
|
|
assert mock_agent_cls.call_args.kwargs["model"] == "perplexity/sonar-pro"
|
|
fake_db.close.assert_called_once()
|
|
|
|
|
|
class TestRunJobSkillBacked:
|
|
def test_run_job_loads_skill_and_disables_recursive_cron_tools(self, tmp_path):
|
|
job = {
|
|
"id": "skill-job",
|
|
"name": "skill test",
|
|
"prompt": "Check the feeds and summarize anything new.",
|
|
"skill": "blogwatcher",
|
|
}
|
|
|
|
fake_db = MagicMock()
|
|
|
|
with patch("cron.scheduler._hermes_home", tmp_path), \
|
|
patch("cron.scheduler._resolve_origin", return_value=None), \
|
|
patch("dotenv.load_dotenv"), \
|
|
patch("hermes_state.SessionDB", return_value=fake_db), \
|
|
patch(
|
|
"hermes_cli.runtime_provider.resolve_runtime_provider",
|
|
return_value={
|
|
"api_key": "***",
|
|
"base_url": "https://example.invalid/v1",
|
|
"provider": "openrouter",
|
|
"api_mode": "chat_completions",
|
|
},
|
|
), \
|
|
patch("tools.skills_tool.skill_view", return_value=json.dumps({"success": True, "content": "# Blogwatcher\nFollow this skill."})), \
|
|
patch("run_agent.AIAgent") as mock_agent_cls:
|
|
mock_agent = MagicMock()
|
|
mock_agent.run_conversation.return_value = {"final_response": "ok"}
|
|
mock_agent_cls.return_value = mock_agent
|
|
|
|
success, output, final_response, error = run_job(job)
|
|
|
|
assert success is True
|
|
assert error is None
|
|
assert final_response == "ok"
|
|
|
|
kwargs = mock_agent_cls.call_args.kwargs
|
|
assert "cronjob" in (kwargs["disabled_toolsets"] or [])
|
|
|
|
prompt_arg = mock_agent.run_conversation.call_args.args[0]
|
|
assert "blogwatcher" in prompt_arg
|
|
assert "Follow this skill" in prompt_arg
|
|
assert "Check the feeds and summarize anything new." in prompt_arg
|
|
|
|
def test_run_job_loads_multiple_skills_in_order(self, tmp_path):
|
|
job = {
|
|
"id": "multi-skill-job",
|
|
"name": "multi skill test",
|
|
"prompt": "Combine the results.",
|
|
"skills": ["blogwatcher", "find-nearby"],
|
|
}
|
|
|
|
fake_db = MagicMock()
|
|
|
|
def _skill_view(name):
|
|
return json.dumps({"success": True, "content": f"# {name}\nInstructions for {name}."})
|
|
|
|
with patch("cron.scheduler._hermes_home", tmp_path), \
|
|
patch("cron.scheduler._resolve_origin", return_value=None), \
|
|
patch("dotenv.load_dotenv"), \
|
|
patch("hermes_state.SessionDB", return_value=fake_db), \
|
|
patch(
|
|
"hermes_cli.runtime_provider.resolve_runtime_provider",
|
|
return_value={
|
|
"api_key": "***",
|
|
"base_url": "https://example.invalid/v1",
|
|
"provider": "openrouter",
|
|
"api_mode": "chat_completions",
|
|
},
|
|
), \
|
|
patch("tools.skills_tool.skill_view", side_effect=_skill_view) as skill_view_mock, \
|
|
patch("run_agent.AIAgent") as mock_agent_cls:
|
|
mock_agent = MagicMock()
|
|
mock_agent.run_conversation.return_value = {"final_response": "ok"}
|
|
mock_agent_cls.return_value = mock_agent
|
|
|
|
success, output, final_response, error = run_job(job)
|
|
|
|
assert success is True
|
|
assert error is None
|
|
assert final_response == "ok"
|
|
assert skill_view_mock.call_count == 2
|
|
assert [call.args[0] for call in skill_view_mock.call_args_list] == ["blogwatcher", "find-nearby"]
|
|
|
|
prompt_arg = mock_agent.run_conversation.call_args.args[0]
|
|
assert prompt_arg.index("blogwatcher") < prompt_arg.index("find-nearby")
|
|
assert "Instructions for blogwatcher." in prompt_arg
|
|
assert "Instructions for find-nearby." in prompt_arg
|
|
assert "Combine the results." in prompt_arg
|
|
|
|
|
|
class TestNotifyMode:
|
|
"""Verify the notify parameter controls delivery behavior."""
|
|
|
|
def _make_job(self, notify="always"):
|
|
return {
|
|
"id": "monitor-job",
|
|
"name": "monitor",
|
|
"deliver": "origin",
|
|
"notify": notify,
|
|
"origin": {"platform": "telegram", "chat_id": "123"},
|
|
}
|
|
|
|
# -- notify=always (default) --
|
|
|
|
def test_always_delivers_normally(self):
|
|
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job("always")]), \
|
|
patch("cron.scheduler.run_job", return_value=(True, "# output", "Results here", None)), \
|
|
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
|
|
patch("cron.scheduler._deliver_result") as deliver_mock, \
|
|
patch("cron.scheduler.mark_job_run"):
|
|
from cron.scheduler import tick
|
|
tick(verbose=False)
|
|
deliver_mock.assert_called_once()
|
|
|
|
def test_always_delivers_even_silent_response(self):
|
|
"""With notify=always, [SILENT] in the response is just normal text."""
|
|
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job("always")]), \
|
|
patch("cron.scheduler.run_job", return_value=(True, "# output", "[SILENT] nothing", None)), \
|
|
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
|
|
patch("cron.scheduler._deliver_result") as deliver_mock, \
|
|
patch("cron.scheduler.mark_job_run"):
|
|
from cron.scheduler import tick
|
|
tick(verbose=False)
|
|
deliver_mock.assert_called_once()
|
|
|
|
# -- notify=never --
|
|
|
|
def test_never_skips_delivery(self, caplog):
|
|
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job("never")]), \
|
|
patch("cron.scheduler.run_job", return_value=(True, "# output", "Important results!", None)), \
|
|
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
|
|
patch("cron.scheduler._deliver_result") as deliver_mock, \
|
|
patch("cron.scheduler.mark_job_run"):
|
|
from cron.scheduler import tick
|
|
with caplog.at_level(logging.INFO, logger="cron.scheduler"):
|
|
tick(verbose=False)
|
|
deliver_mock.assert_not_called()
|
|
assert any("notify=never" in r.message for r in caplog.records)
|
|
|
|
def test_never_still_delivers_on_failure(self):
|
|
"""Failed jobs always get delivered regardless of notify mode."""
|
|
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job("never")]), \
|
|
patch("cron.scheduler.run_job", return_value=(False, "# output", "", "some error")), \
|
|
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
|
|
patch("cron.scheduler._deliver_result") as deliver_mock, \
|
|
patch("cron.scheduler.mark_job_run"):
|
|
from cron.scheduler import tick
|
|
tick(verbose=False)
|
|
deliver_mock.assert_called_once()
|
|
|
|
# -- notify=changes_only --
|
|
|
|
def test_changes_only_delivers_normal_response(self):
|
|
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job("changes_only")]), \
|
|
patch("cron.scheduler.run_job", return_value=(True, "# output", "New data found!", None)), \
|
|
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
|
|
patch("cron.scheduler._deliver_result") as deliver_mock, \
|
|
patch("cron.scheduler.mark_job_run"):
|
|
from cron.scheduler import tick
|
|
tick(verbose=False)
|
|
deliver_mock.assert_called_once()
|
|
|
|
def test_changes_only_suppresses_silent(self, caplog):
|
|
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job("changes_only")]), \
|
|
patch("cron.scheduler.run_job", return_value=(True, "# output", "[SILENT]", None)), \
|
|
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
|
|
patch("cron.scheduler._deliver_result") as deliver_mock, \
|
|
patch("cron.scheduler.mark_job_run"):
|
|
from cron.scheduler import tick
|
|
with caplog.at_level(logging.INFO, logger="cron.scheduler"):
|
|
tick(verbose=False)
|
|
deliver_mock.assert_not_called()
|
|
assert any(SILENT_MARKER in r.message for r in caplog.records)
|
|
|
|
def test_changes_only_suppresses_silent_with_note(self):
|
|
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job("changes_only")]), \
|
|
patch("cron.scheduler.run_job", return_value=(True, "# output", "[SILENT] No changes detected", None)), \
|
|
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
|
|
patch("cron.scheduler._deliver_result") as deliver_mock, \
|
|
patch("cron.scheduler.mark_job_run"):
|
|
from cron.scheduler import tick
|
|
tick(verbose=False)
|
|
deliver_mock.assert_not_called()
|
|
|
|
def test_changes_only_case_insensitive(self):
|
|
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job("changes_only")]), \
|
|
patch("cron.scheduler.run_job", return_value=(True, "# output", "[silent] nothing new", None)), \
|
|
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
|
|
patch("cron.scheduler._deliver_result") as deliver_mock, \
|
|
patch("cron.scheduler.mark_job_run"):
|
|
from cron.scheduler import tick
|
|
tick(verbose=False)
|
|
deliver_mock.assert_not_called()
|
|
|
|
def test_changes_only_still_delivers_on_failure(self):
|
|
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job("changes_only")]), \
|
|
patch("cron.scheduler.run_job", return_value=(False, "# output", "", "error msg")), \
|
|
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
|
|
patch("cron.scheduler._deliver_result") as deliver_mock, \
|
|
patch("cron.scheduler.mark_job_run"):
|
|
from cron.scheduler import tick
|
|
tick(verbose=False)
|
|
deliver_mock.assert_called_once()
|
|
|
|
def test_output_saved_even_when_suppressed(self):
|
|
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job("changes_only")]), \
|
|
patch("cron.scheduler.run_job", return_value=(True, "# full output", "[SILENT]", None)), \
|
|
patch("cron.scheduler.save_job_output") as save_mock, \
|
|
patch("cron.scheduler._deliver_result") as deliver_mock, \
|
|
patch("cron.scheduler.mark_job_run"):
|
|
save_mock.return_value = "/tmp/out.md"
|
|
from cron.scheduler import tick
|
|
tick(verbose=False)
|
|
save_mock.assert_called_once_with("monitor-job", "# full output")
|
|
deliver_mock.assert_not_called()
|
|
|
|
|
|
class TestBuildJobPromptNotify:
|
|
"""Verify _build_job_prompt injects [SILENT] guidance for changes_only jobs."""
|
|
|
|
def test_changes_only_injects_silent_hint(self):
|
|
from cron.scheduler import _build_job_prompt
|
|
job = {"prompt": "Check for updates", "notify": "changes_only"}
|
|
result = _build_job_prompt(job)
|
|
assert "[SILENT]" in result
|
|
assert "Check for updates" in result
|
|
|
|
def test_always_does_not_inject_hint(self):
|
|
from cron.scheduler import _build_job_prompt
|
|
job = {"prompt": "Check for updates", "notify": "always"}
|
|
result = _build_job_prompt(job)
|
|
assert "[SILENT]" not in result
|
|
assert result == "Check for updates"
|
|
|
|
def test_missing_notify_defaults_to_no_hint(self):
|
|
from cron.scheduler import _build_job_prompt
|
|
job = {"prompt": "Check for updates"}
|
|
result = _build_job_prompt(job)
|
|
assert "[SILENT]" not in result
|