mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-06 18:57:21 +08:00
Compare commits
9 Commits
hermes/her
...
fix/cron-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b140b31e6 | ||
|
|
fa89b65230 | ||
|
|
ed0c7194ed | ||
|
|
dc44e183e6 | ||
|
|
79c81b2244 | ||
|
|
b2bdaecf9b | ||
|
|
3fab72f1e1 | ||
|
|
f3a38c90fc | ||
|
|
8fb618234f |
@@ -141,6 +141,13 @@ PLATFORM_HINTS = {
|
||||
"is preserved for threading. Do not include greetings or sign-offs unless "
|
||||
"contextually appropriate."
|
||||
),
|
||||
"cron": (
|
||||
"You are running as a scheduled cron job. Your final response is automatically "
|
||||
"delivered to the job's configured destination, so do not use send_message to "
|
||||
"send to that same target again. If you want the user to receive something in "
|
||||
"the scheduled destination, put it directly in your final response. Use "
|
||||
"send_message only for additional or different targets."
|
||||
),
|
||||
"cli": (
|
||||
"You are a CLI AI Agent. Try not to use markdown but simple text "
|
||||
"renderable inside a terminal."
|
||||
|
||||
@@ -56,6 +56,50 @@ def _resolve_origin(job: dict) -> Optional[dict]:
|
||||
return None
|
||||
|
||||
|
||||
def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
||||
"""Resolve the concrete auto-delivery target for a cron job, if any."""
|
||||
deliver = job.get("deliver", "local")
|
||||
origin = _resolve_origin(job)
|
||||
|
||||
if deliver == "local":
|
||||
return None
|
||||
|
||||
if deliver == "origin":
|
||||
if not origin:
|
||||
return None
|
||||
return {
|
||||
"platform": origin["platform"],
|
||||
"chat_id": str(origin["chat_id"]),
|
||||
"thread_id": origin.get("thread_id"),
|
||||
}
|
||||
|
||||
if ":" in deliver:
|
||||
platform_name, chat_id = deliver.split(":", 1)
|
||||
return {
|
||||
"platform": platform_name,
|
||||
"chat_id": chat_id,
|
||||
"thread_id": None,
|
||||
}
|
||||
|
||||
platform_name = deliver
|
||||
if origin and origin.get("platform") == platform_name:
|
||||
return {
|
||||
"platform": platform_name,
|
||||
"chat_id": str(origin["chat_id"]),
|
||||
"thread_id": origin.get("thread_id"),
|
||||
}
|
||||
|
||||
chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "")
|
||||
if not chat_id:
|
||||
return None
|
||||
|
||||
return {
|
||||
"platform": platform_name,
|
||||
"chat_id": chat_id,
|
||||
"thread_id": None,
|
||||
}
|
||||
|
||||
|
||||
def _deliver_result(job: dict, content: str) -> None:
|
||||
"""
|
||||
Deliver job output to the configured target (origin chat, specific platform, etc.).
|
||||
@@ -63,36 +107,19 @@ def _deliver_result(job: dict, content: str) -> None:
|
||||
Uses the standalone platform send functions from send_message_tool so delivery
|
||||
works whether or not the gateway is running.
|
||||
"""
|
||||
deliver = job.get("deliver", "local")
|
||||
origin = _resolve_origin(job)
|
||||
|
||||
if deliver == "local":
|
||||
target = _resolve_delivery_target(job)
|
||||
if not target:
|
||||
if job.get("deliver", "local") != "local":
|
||||
logger.warning(
|
||||
"Job '%s' deliver=%s but no concrete delivery target could be resolved",
|
||||
job["id"],
|
||||
job.get("deliver", "local"),
|
||||
)
|
||||
return
|
||||
|
||||
thread_id = None
|
||||
|
||||
# Resolve target platform + chat_id
|
||||
if deliver == "origin":
|
||||
if not origin:
|
||||
logger.warning("Job '%s' deliver=origin but no origin stored, skipping delivery", job["id"])
|
||||
return
|
||||
platform_name = origin["platform"]
|
||||
chat_id = origin["chat_id"]
|
||||
thread_id = origin.get("thread_id")
|
||||
elif ":" in deliver:
|
||||
platform_name, chat_id = deliver.split(":", 1)
|
||||
else:
|
||||
# Bare platform name like "telegram" — need to resolve to origin or home channel
|
||||
platform_name = deliver
|
||||
if origin and origin.get("platform") == platform_name:
|
||||
chat_id = origin["chat_id"]
|
||||
thread_id = origin.get("thread_id")
|
||||
else:
|
||||
# Fall back to home channel
|
||||
chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "")
|
||||
if not chat_id:
|
||||
logger.warning("Job '%s' deliver=%s but no chat_id or home channel. Set via: hermes config set %s_HOME_CHANNEL <channel_id>", job["id"], deliver, platform_name.upper())
|
||||
return
|
||||
platform_name = target["platform"]
|
||||
chat_id = target["chat_id"]
|
||||
thread_id = target.get("thread_id")
|
||||
|
||||
from tools.send_message_tool import _send_to_platform
|
||||
from gateway.config import load_gateway_config, Platform
|
||||
@@ -169,6 +196,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
job_name = job["name"]
|
||||
prompt = job["prompt"]
|
||||
origin = _resolve_origin(job)
|
||||
delivery_target = _resolve_delivery_target(job)
|
||||
|
||||
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
|
||||
logger.info("Prompt: %s", prompt[:100])
|
||||
@@ -179,6 +207,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"])
|
||||
if origin.get("chat_name"):
|
||||
os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"]
|
||||
if delivery_target:
|
||||
os.environ["HERMES_CRON_AUTO_DELIVER_PLATFORM"] = delivery_target["platform"]
|
||||
os.environ["HERMES_CRON_AUTO_DELIVER_CHAT_ID"] = str(delivery_target["chat_id"])
|
||||
if delivery_target.get("thread_id") is not None:
|
||||
os.environ["HERMES_CRON_AUTO_DELIVER_THREAD_ID"] = str(delivery_target["thread_id"])
|
||||
|
||||
try:
|
||||
# Re-read .env and config.yaml fresh every run so provider/key
|
||||
@@ -324,7 +357,14 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
|
||||
finally:
|
||||
# Clean up injected env vars so they don't leak to other jobs
|
||||
for key in ("HERMES_SESSION_PLATFORM", "HERMES_SESSION_CHAT_ID", "HERMES_SESSION_CHAT_NAME"):
|
||||
for key in (
|
||||
"HERMES_SESSION_PLATFORM",
|
||||
"HERMES_SESSION_CHAT_ID",
|
||||
"HERMES_SESSION_CHAT_NAME",
|
||||
"HERMES_CRON_AUTO_DELIVER_PLATFORM",
|
||||
"HERMES_CRON_AUTO_DELIVER_CHAT_ID",
|
||||
"HERMES_CRON_AUTO_DELIVER_THREAD_ID",
|
||||
):
|
||||
os.environ.pop(key, None)
|
||||
if _session_db:
|
||||
try:
|
||||
|
||||
@@ -105,11 +105,14 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
|
||||
# Telegram message limits
|
||||
MAX_MESSAGE_LENGTH = 4096
|
||||
MEDIA_GROUP_WAIT_SECONDS = 0.8
|
||||
|
||||
def __init__(self, config: PlatformConfig):
|
||||
super().__init__(config, Platform.TELEGRAM)
|
||||
self._app: Optional[Application] = None
|
||||
self._bot: Optional[Bot] = None
|
||||
self._media_group_events: Dict[str, MessageEvent] = {}
|
||||
self._media_group_tasks: Dict[str, asyncio.Task] = {}
|
||||
self._token_lock_identity: Optional[str] = None
|
||||
self._polling_error_task: Optional[asyncio.Task] = None
|
||||
|
||||
@@ -261,7 +264,15 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
return False
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
"""Stop polling and disconnect."""
|
||||
"""Stop polling, cancel pending album flushes, and disconnect."""
|
||||
pending_media_group_tasks = list(self._media_group_tasks.values())
|
||||
for task in pending_media_group_tasks:
|
||||
task.cancel()
|
||||
if pending_media_group_tasks:
|
||||
await asyncio.gather(*pending_media_group_tasks, return_exceptions=True)
|
||||
self._media_group_tasks.clear()
|
||||
self._media_group_events.clear()
|
||||
|
||||
if self._app:
|
||||
try:
|
||||
await self._app.updater.stop()
|
||||
@@ -943,8 +954,53 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
except Exception as e:
|
||||
logger.warning("[Telegram] Failed to cache document: %s", e, exc_info=True)
|
||||
|
||||
media_group_id = getattr(msg, "media_group_id", None)
|
||||
if media_group_id:
|
||||
await self._queue_media_group_event(str(media_group_id), event)
|
||||
return
|
||||
|
||||
await self.handle_message(event)
|
||||
|
||||
async def _queue_media_group_event(self, media_group_id: str, event: MessageEvent) -> None:
|
||||
"""Buffer Telegram media-group items so albums arrive as one logical event.
|
||||
|
||||
Telegram delivers albums as multiple updates with a shared media_group_id.
|
||||
If we forward each item immediately, the gateway thinks the second image is a
|
||||
new user message and interrupts the first. We debounce briefly and merge the
|
||||
attachments into a single MessageEvent.
|
||||
"""
|
||||
existing = self._media_group_events.get(media_group_id)
|
||||
if existing is None:
|
||||
self._media_group_events[media_group_id] = event
|
||||
else:
|
||||
existing.media_urls.extend(event.media_urls)
|
||||
existing.media_types.extend(event.media_types)
|
||||
if event.text:
|
||||
if existing.text:
|
||||
if event.text not in existing.text.split("\n\n"):
|
||||
existing.text = f"{existing.text}\n\n{event.text}"
|
||||
else:
|
||||
existing.text = event.text
|
||||
|
||||
prior_task = self._media_group_tasks.get(media_group_id)
|
||||
if prior_task:
|
||||
prior_task.cancel()
|
||||
|
||||
self._media_group_tasks[media_group_id] = asyncio.create_task(
|
||||
self._flush_media_group_event(media_group_id)
|
||||
)
|
||||
|
||||
async def _flush_media_group_event(self, media_group_id: str) -> None:
|
||||
try:
|
||||
await asyncio.sleep(self.MEDIA_GROUP_WAIT_SECONDS)
|
||||
event = self._media_group_events.pop(media_group_id, None)
|
||||
if event is not None:
|
||||
await self.handle_message(event)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
finally:
|
||||
self._media_group_tasks.pop(media_group_id, None)
|
||||
|
||||
async def _handle_sticker(self, msg: Message, event: "MessageEvent") -> None:
|
||||
"""
|
||||
Describe a Telegram sticker via vision analysis, with caching.
|
||||
|
||||
@@ -215,6 +215,33 @@ def _resolve_gateway_model() -> str:
|
||||
return model
|
||||
|
||||
|
||||
def _resolve_hermes_bin() -> Optional[list[str]]:
|
||||
"""Resolve the Hermes update command as argv parts.
|
||||
|
||||
Tries in order:
|
||||
1. ``shutil.which("hermes")`` — standard PATH lookup
|
||||
2. ``sys.executable -m hermes_cli.main`` — fallback when Hermes is running
|
||||
from a venv/module invocation and the ``hermes`` shim is not on PATH
|
||||
|
||||
Returns argv parts ready for quoting/joining, or ``None`` if neither works.
|
||||
"""
|
||||
import shutil
|
||||
|
||||
hermes_bin = shutil.which("hermes")
|
||||
if hermes_bin:
|
||||
return [hermes_bin]
|
||||
|
||||
try:
|
||||
import importlib.util
|
||||
|
||||
if importlib.util.find_spec("hermes_cli") is not None:
|
||||
return [sys.executable, "-m", "hermes_cli.main"]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class GatewayRunner:
|
||||
"""
|
||||
Main gateway controller.
|
||||
@@ -3222,9 +3249,14 @@ class GatewayRunner:
|
||||
if not git_dir.exists():
|
||||
return "✗ Not a git repository — cannot update."
|
||||
|
||||
hermes_bin = shutil.which("hermes")
|
||||
if not hermes_bin:
|
||||
return "✗ `hermes` command not found on PATH."
|
||||
hermes_cmd = _resolve_hermes_bin()
|
||||
if not hermes_cmd:
|
||||
return (
|
||||
"✗ Could not locate the `hermes` command. "
|
||||
"Hermes is running, but the update command could not find the "
|
||||
"executable on PATH or via the current Python interpreter. "
|
||||
"Try running `hermes update` manually in your terminal."
|
||||
)
|
||||
|
||||
pending_path = _hermes_home / ".update_pending.json"
|
||||
output_path = _hermes_home / ".update_output.txt"
|
||||
@@ -3240,8 +3272,9 @@ class GatewayRunner:
|
||||
|
||||
# Spawn `hermes update` in a separate cgroup so it survives gateway
|
||||
# restart. systemd-run --user --scope creates a transient scope unit.
|
||||
hermes_cmd_str = " ".join(shlex.quote(part) for part in hermes_cmd)
|
||||
update_cmd = (
|
||||
f"{shlex.quote(hermes_bin)} update > {shlex.quote(str(output_path))} 2>&1; "
|
||||
f"{hermes_cmd_str} update > {shlex.quote(str(output_path))} 2>&1; "
|
||||
f"status=$?; printf '%s' \"$status\" > {shlex.quote(str(exit_code_path))}"
|
||||
)
|
||||
try:
|
||||
|
||||
@@ -455,6 +455,7 @@ class TestPromptBuilderConstants:
|
||||
assert "whatsapp" in PLATFORM_HINTS
|
||||
assert "telegram" in PLATFORM_HINTS
|
||||
assert "discord" in PLATFORM_HINTS
|
||||
assert "cron" in PLATFORM_HINTS
|
||||
assert "cli" in PLATFORM_HINTS
|
||||
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from cron.scheduler import _resolve_origin, _deliver_result, run_job
|
||||
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job
|
||||
|
||||
|
||||
class TestResolveOrigin:
|
||||
@@ -44,6 +44,56 @@ class TestResolveOrigin:
|
||||
assert _resolve_origin(job) is None
|
||||
|
||||
|
||||
class TestResolveDeliveryTarget:
|
||||
def test_origin_delivery_preserves_thread_id(self):
|
||||
job = {
|
||||
"deliver": "origin",
|
||||
"origin": {
|
||||
"platform": "telegram",
|
||||
"chat_id": "-1001",
|
||||
"thread_id": "17585",
|
||||
},
|
||||
}
|
||||
|
||||
assert _resolve_delivery_target(job) == {
|
||||
"platform": "telegram",
|
||||
"chat_id": "-1001",
|
||||
"thread_id": "17585",
|
||||
}
|
||||
|
||||
def test_bare_platform_uses_matching_origin_chat(self):
|
||||
job = {
|
||||
"deliver": "telegram",
|
||||
"origin": {
|
||||
"platform": "telegram",
|
||||
"chat_id": "-1001",
|
||||
"thread_id": "17585",
|
||||
},
|
||||
}
|
||||
|
||||
assert _resolve_delivery_target(job) == {
|
||||
"platform": "telegram",
|
||||
"chat_id": "-1001",
|
||||
"thread_id": "17585",
|
||||
}
|
||||
|
||||
def test_bare_platform_falls_back_to_home_channel(self, monkeypatch):
|
||||
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "-2002")
|
||||
job = {
|
||||
"deliver": "telegram",
|
||||
"origin": {
|
||||
"platform": "discord",
|
||||
"chat_id": "abc",
|
||||
},
|
||||
}
|
||||
|
||||
assert _resolve_delivery_target(job) == {
|
||||
"platform": "telegram",
|
||||
"chat_id": "-2002",
|
||||
"thread_id": None,
|
||||
}
|
||||
|
||||
|
||||
class TestDeliverResultMirrorLogging:
|
||||
"""Verify that mirror_to_session failures are logged, not silently swallowed."""
|
||||
|
||||
|
||||
@@ -81,20 +81,21 @@ def _make_document(
|
||||
return doc
|
||||
|
||||
|
||||
def _make_message(document=None, caption=None):
|
||||
"""Build a mock Telegram Message with the given document."""
|
||||
def _make_message(document=None, caption=None, media_group_id=None, photo=None):
|
||||
"""Build a mock Telegram Message with the given document/photo."""
|
||||
msg = MagicMock()
|
||||
msg.message_id = 42
|
||||
msg.text = caption or ""
|
||||
msg.caption = caption
|
||||
msg.date = None
|
||||
# Media flags — all None except document
|
||||
msg.photo = None
|
||||
# Media flags — all None except explicit payload
|
||||
msg.photo = photo
|
||||
msg.video = None
|
||||
msg.audio = None
|
||||
msg.voice = None
|
||||
msg.sticker = None
|
||||
msg.document = document
|
||||
msg.media_group_id = media_group_id
|
||||
# Chat / user
|
||||
msg.chat = MagicMock()
|
||||
msg.chat.id = 100
|
||||
@@ -165,6 +166,12 @@ class TestDocumentTypeDetection:
|
||||
# TestDocumentDownloadBlock
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_photo(file_obj=None):
|
||||
photo = MagicMock()
|
||||
photo.get_file = AsyncMock(return_value=file_obj or _make_file_obj(b"photo-bytes"))
|
||||
return photo
|
||||
|
||||
|
||||
class TestDocumentDownloadBlock:
|
||||
@pytest.mark.asyncio
|
||||
async def test_supported_pdf_is_cached(self, adapter):
|
||||
@@ -339,6 +346,50 @@ class TestDocumentDownloadBlock:
|
||||
adapter.handle_message.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestMediaGroups — media group (album) buffering
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMediaGroups:
|
||||
@pytest.mark.asyncio
|
||||
async def test_photo_album_is_buffered_and_combined(self, adapter):
|
||||
first_photo = _make_photo(_make_file_obj(b"first"))
|
||||
second_photo = _make_photo(_make_file_obj(b"second"))
|
||||
|
||||
msg1 = _make_message(caption="two images", media_group_id="album-1", photo=[first_photo])
|
||||
msg2 = _make_message(media_group_id="album-1", photo=[second_photo])
|
||||
|
||||
with patch("gateway.platforms.telegram.cache_image_from_bytes", side_effect=["/tmp/one.jpg", "/tmp/two.jpg"]):
|
||||
await adapter._handle_media_message(_make_update(msg1), MagicMock())
|
||||
await adapter._handle_media_message(_make_update(msg2), MagicMock())
|
||||
assert adapter.handle_message.await_count == 0
|
||||
await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
|
||||
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
event = adapter.handle_message.call_args[0][0]
|
||||
assert event.text == "two images"
|
||||
assert event.media_urls == ["/tmp/one.jpg", "/tmp/two.jpg"]
|
||||
assert len(event.media_types) == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disconnect_cancels_pending_media_group_flush(self, adapter):
|
||||
first_photo = _make_photo(_make_file_obj(b"first"))
|
||||
msg = _make_message(caption="two images", media_group_id="album-2", photo=[first_photo])
|
||||
|
||||
with patch("gateway.platforms.telegram.cache_image_from_bytes", return_value="/tmp/one.jpg"):
|
||||
await adapter._handle_media_message(_make_update(msg), MagicMock())
|
||||
|
||||
assert "album-2" in adapter._media_group_events
|
||||
assert "album-2" in adapter._media_group_tasks
|
||||
|
||||
await adapter.disconnect()
|
||||
await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
|
||||
|
||||
assert adapter._media_group_events == {}
|
||||
assert adapter._media_group_tasks == {}
|
||||
adapter.handle_message.assert_not_awaited()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestSendDocument — outbound file attachment delivery
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -88,7 +88,7 @@ class TestHandleUpdateCommand:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_hermes_binary(self, tmp_path):
|
||||
"""Returns error when hermes is not on PATH."""
|
||||
"""Returns error when hermes is not on PATH and hermes_cli is not importable."""
|
||||
runner = _make_runner()
|
||||
event = _make_event()
|
||||
|
||||
@@ -102,10 +102,77 @@ class TestHandleUpdateCommand:
|
||||
|
||||
with patch("gateway.run._hermes_home", tmp_path), \
|
||||
patch("gateway.run.__file__", fake_file), \
|
||||
patch("shutil.which", return_value=None):
|
||||
patch("shutil.which", return_value=None), \
|
||||
patch("importlib.util.find_spec", return_value=None):
|
||||
result = await runner._handle_update_command(event)
|
||||
|
||||
assert "not found on PATH" in result
|
||||
assert "Could not locate" in result
|
||||
assert "hermes update" in result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_to_sys_executable(self, tmp_path):
|
||||
"""Falls back to sys.executable -m hermes_cli.main when hermes not on PATH."""
|
||||
runner = _make_runner()
|
||||
event = _make_event()
|
||||
|
||||
fake_root = tmp_path / "project"
|
||||
fake_root.mkdir()
|
||||
(fake_root / ".git").mkdir()
|
||||
(fake_root / "gateway").mkdir()
|
||||
(fake_root / "gateway" / "run.py").touch()
|
||||
fake_file = str(fake_root / "gateway" / "run.py")
|
||||
hermes_home = tmp_path / "hermes"
|
||||
hermes_home.mkdir()
|
||||
|
||||
mock_popen = MagicMock()
|
||||
fake_spec = MagicMock()
|
||||
|
||||
with patch("gateway.run._hermes_home", hermes_home), \
|
||||
patch("gateway.run.__file__", fake_file), \
|
||||
patch("shutil.which", return_value=None), \
|
||||
patch("importlib.util.find_spec", return_value=fake_spec), \
|
||||
patch("subprocess.Popen", mock_popen):
|
||||
result = await runner._handle_update_command(event)
|
||||
|
||||
assert "Starting Hermes update" in result
|
||||
call_args = mock_popen.call_args[0][0]
|
||||
# The update_cmd uses sys.executable -m hermes_cli.main
|
||||
joined = " ".join(call_args) if isinstance(call_args, list) else call_args
|
||||
assert "hermes_cli.main" in joined or "bash" in call_args[0]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_resolve_hermes_bin_prefers_which(self, tmp_path):
|
||||
"""_resolve_hermes_bin returns argv parts from shutil.which when available."""
|
||||
from gateway.run import _resolve_hermes_bin
|
||||
|
||||
with patch("shutil.which", return_value="/custom/path/hermes"):
|
||||
result = _resolve_hermes_bin()
|
||||
|
||||
assert result == ["/custom/path/hermes"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_resolve_hermes_bin_fallback(self):
|
||||
"""_resolve_hermes_bin falls back to sys.executable argv when which fails."""
|
||||
import sys
|
||||
from gateway.run import _resolve_hermes_bin
|
||||
|
||||
fake_spec = MagicMock()
|
||||
with patch("shutil.which", return_value=None), \
|
||||
patch("importlib.util.find_spec", return_value=fake_spec):
|
||||
result = _resolve_hermes_bin()
|
||||
|
||||
assert result == [sys.executable, "-m", "hermes_cli.main"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_resolve_hermes_bin_returns_none_when_both_fail(self):
|
||||
"""_resolve_hermes_bin returns None when both strategies fail."""
|
||||
from gateway.run import _resolve_hermes_bin
|
||||
|
||||
with patch("shutil.which", return_value=None), \
|
||||
patch("importlib.util.find_spec", return_value=None):
|
||||
result = _resolve_hermes_bin()
|
||||
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_writes_pending_marker(self, tmp_path):
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
@@ -29,6 +30,118 @@ def _install_telegram_mock(monkeypatch, bot):
|
||||
|
||||
|
||||
class TestSendMessageTool:
|
||||
def test_cron_duplicate_target_is_skipped_and_explained(self):
|
||||
home = SimpleNamespace(chat_id="-1001")
|
||||
config, _telegram_cfg = _make_config()
|
||||
config.get_home_channel = lambda _platform: home
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"HERMES_CRON_AUTO_DELIVER_PLATFORM": "telegram",
|
||||
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": "-1001",
|
||||
},
|
||||
clear=False,
|
||||
), \
|
||||
patch("gateway.config.load_gateway_config", return_value=config), \
|
||||
patch("tools.interrupt.is_interrupted", return_value=False), \
|
||||
patch("model_tools._run_async", side_effect=_run_async_immediately), \
|
||||
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
|
||||
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
|
||||
result = json.loads(
|
||||
send_message_tool(
|
||||
{
|
||||
"action": "send",
|
||||
"target": "telegram",
|
||||
"message": "hello",
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["skipped"] is True
|
||||
assert result["reason"] == "cron_auto_delivery_duplicate_target"
|
||||
assert "final response" in result["note"]
|
||||
send_mock.assert_not_awaited()
|
||||
mirror_mock.assert_not_called()
|
||||
|
||||
def test_cron_different_target_still_sends(self):
|
||||
config, telegram_cfg = _make_config()
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"HERMES_CRON_AUTO_DELIVER_PLATFORM": "telegram",
|
||||
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": "-1001",
|
||||
},
|
||||
clear=False,
|
||||
), \
|
||||
patch("gateway.config.load_gateway_config", return_value=config), \
|
||||
patch("tools.interrupt.is_interrupted", return_value=False), \
|
||||
patch("model_tools._run_async", side_effect=_run_async_immediately), \
|
||||
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
|
||||
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
|
||||
result = json.loads(
|
||||
send_message_tool(
|
||||
{
|
||||
"action": "send",
|
||||
"target": "telegram:-1002",
|
||||
"message": "hello",
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
assert result["success"] is True
|
||||
assert result.get("skipped") is not True
|
||||
send_mock.assert_awaited_once_with(
|
||||
Platform.TELEGRAM,
|
||||
telegram_cfg,
|
||||
"-1002",
|
||||
"hello",
|
||||
thread_id=None,
|
||||
media_files=[],
|
||||
)
|
||||
mirror_mock.assert_called_once_with("telegram", "-1002", "hello", source_label="cli", thread_id=None)
|
||||
|
||||
def test_cron_same_chat_different_thread_still_sends(self):
|
||||
config, telegram_cfg = _make_config()
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"HERMES_CRON_AUTO_DELIVER_PLATFORM": "telegram",
|
||||
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": "-1001",
|
||||
"HERMES_CRON_AUTO_DELIVER_THREAD_ID": "17585",
|
||||
},
|
||||
clear=False,
|
||||
), \
|
||||
patch("gateway.config.load_gateway_config", return_value=config), \
|
||||
patch("tools.interrupt.is_interrupted", return_value=False), \
|
||||
patch("model_tools._run_async", side_effect=_run_async_immediately), \
|
||||
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
|
||||
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
|
||||
result = json.loads(
|
||||
send_message_tool(
|
||||
{
|
||||
"action": "send",
|
||||
"target": "telegram:-1001:99999",
|
||||
"message": "hello",
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
assert result["success"] is True
|
||||
assert result.get("skipped") is not True
|
||||
send_mock.assert_awaited_once_with(
|
||||
Platform.TELEGRAM,
|
||||
telegram_cfg,
|
||||
"-1001",
|
||||
"hello",
|
||||
thread_id="99999",
|
||||
media_files=[],
|
||||
)
|
||||
mirror_mock.assert_called_once_with("telegram", "-1001", "hello", source_label="cli", thread_id="99999")
|
||||
|
||||
def test_sends_to_explicit_telegram_topic_target(self):
|
||||
config, telegram_cfg = _make_config()
|
||||
|
||||
|
||||
@@ -194,7 +194,10 @@ DELIVERY OPTIONS (where output goes):
|
||||
- "telegram:123456": Send to specific chat (if user provides ID)
|
||||
|
||||
NOTE: The agent's final response is auto-delivered to the target — do NOT use
|
||||
send_message in the prompt. Just have the agent compose its response normally.
|
||||
send_message in the prompt for that same destination. Same-target send_message
|
||||
calls are skipped so the cron doesn't double-message the user. Put the main
|
||||
user-facing content in the final response, and use send_message only for
|
||||
additional or different targets.
|
||||
|
||||
Use for: reminders, periodic checks, scheduled reports, automated maintenance.""",
|
||||
"parameters": {
|
||||
|
||||
@@ -153,6 +153,10 @@ def _handle_send(args):
|
||||
f"or set a home channel via: hermes config set {platform_name.upper()}_HOME_CHANNEL <channel_id>"
|
||||
})
|
||||
|
||||
duplicate_skip = _maybe_skip_cron_duplicate_send(platform_name, chat_id, thread_id)
|
||||
if duplicate_skip:
|
||||
return json.dumps(duplicate_skip)
|
||||
|
||||
try:
|
||||
from model_tools import _run_async
|
||||
result = _run_async(
|
||||
@@ -213,6 +217,51 @@ def _describe_media_for_mirror(media_files):
|
||||
return f"[Sent {len(media_files)} media attachments]"
|
||||
|
||||
|
||||
def _get_cron_auto_delivery_target():
|
||||
"""Return the cron scheduler's auto-delivery target for the current run, if any."""
|
||||
platform = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower()
|
||||
chat_id = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip()
|
||||
if not platform or not chat_id:
|
||||
return None
|
||||
thread_id = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None
|
||||
return {
|
||||
"platform": platform,
|
||||
"chat_id": chat_id,
|
||||
"thread_id": thread_id,
|
||||
}
|
||||
|
||||
|
||||
def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id: str | None):
|
||||
"""Skip redundant cron send_message calls when the scheduler will auto-deliver there."""
|
||||
auto_target = _get_cron_auto_delivery_target()
|
||||
if not auto_target:
|
||||
return None
|
||||
|
||||
same_target = (
|
||||
auto_target["platform"] == platform_name
|
||||
and str(auto_target["chat_id"]) == str(chat_id)
|
||||
and auto_target.get("thread_id") == thread_id
|
||||
)
|
||||
if not same_target:
|
||||
return None
|
||||
|
||||
target_label = f"{platform_name}:{chat_id}"
|
||||
if thread_id is not None:
|
||||
target_label += f":{thread_id}"
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"skipped": True,
|
||||
"reason": "cron_auto_delivery_duplicate_target",
|
||||
"target": target_label,
|
||||
"note": (
|
||||
f"Skipped send_message to {target_label}. This cron job will already auto-deliver "
|
||||
"its final response to that same target. Put the intended user-facing content in "
|
||||
"your final response instead, or use a different target if you want an additional message."
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None):
|
||||
"""Route a message to the appropriate platform sender."""
|
||||
from gateway.config import Platform
|
||||
|
||||
@@ -79,7 +79,7 @@ When scheduling jobs, you specify where the output goes:
|
||||
|
||||
**How platform names work:** When you specify a bare platform name like `"telegram"`, Hermes first checks if the job's origin matches that platform and uses the origin chat ID. Otherwise, it falls back to the platform's home channel configured via environment variable (e.g., `TELEGRAM_HOME_CHANNEL`).
|
||||
|
||||
The agent's final response is automatically delivered — you do **not** need to include `send_message` in the cron prompt.
|
||||
The agent's final response is automatically delivered — you do **not** need to include `send_message` in the cron prompt for that same destination. If a cron run calls `send_message` to the exact target the scheduler will already deliver to, Hermes skips that duplicate send and tells the model to put the user-facing content in the final response instead. Use `send_message` only for additional or different targets.
|
||||
|
||||
The agent knows your connected platforms and home channels — it'll choose sensible defaults.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user