mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-05 02:07:34 +08:00
Compare commits
2 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7224469bb1 | ||
|
|
e1b8c7f658 |
@@ -158,6 +158,44 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# Media extension sets — keep in sync with gateway/platforms/base.py:_process_message_background
|
||||||
|
_AUDIO_EXTS = frozenset({'.ogg', '.opus', '.mp3', '.wav', '.m4a'})
|
||||||
|
_VIDEO_EXTS = frozenset({'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'})
|
||||||
|
_IMAGE_EXTS = frozenset({'.jpg', '.jpeg', '.png', '.webp', '.gif'})
|
||||||
|
|
||||||
|
|
||||||
|
def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata: dict | None, loop, job: dict) -> None:
|
||||||
|
"""Send extracted MEDIA files as native platform attachments via a live adapter.
|
||||||
|
|
||||||
|
Routes each file to the appropriate adapter method (send_voice, send_image_file,
|
||||||
|
send_video, send_document) based on file extension — mirroring the routing logic
|
||||||
|
in ``BasePlatformAdapter._process_message_background``.
|
||||||
|
"""
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
for media_path, _is_voice in media_files:
|
||||||
|
try:
|
||||||
|
ext = Path(media_path).suffix.lower()
|
||||||
|
if ext in _AUDIO_EXTS:
|
||||||
|
coro = adapter.send_voice(chat_id=chat_id, audio_path=media_path, metadata=metadata)
|
||||||
|
elif ext in _VIDEO_EXTS:
|
||||||
|
coro = adapter.send_video(chat_id=chat_id, video_path=media_path, metadata=metadata)
|
||||||
|
elif ext in _IMAGE_EXTS:
|
||||||
|
coro = adapter.send_image_file(chat_id=chat_id, image_path=media_path, metadata=metadata)
|
||||||
|
else:
|
||||||
|
coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata)
|
||||||
|
|
||||||
|
future = asyncio.run_coroutine_threadsafe(coro, loop)
|
||||||
|
result = future.result(timeout=30)
|
||||||
|
if result and not getattr(result, "success", True):
|
||||||
|
logger.warning(
|
||||||
|
"Job '%s': media send failed for %s: %s",
|
||||||
|
job.get("id", "?"), media_path, getattr(result, "error", "unknown"),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e)
|
||||||
|
|
||||||
|
|
||||||
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
|
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
|
||||||
"""
|
"""
|
||||||
Deliver job output to the configured target (origin chat, specific platform, etc.).
|
Deliver job output to the configured target (origin chat, specific platform, etc.).
|
||||||
@@ -246,8 +284,12 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
|
|||||||
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
|
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
|
||||||
send_metadata = {"thread_id": thread_id} if thread_id else None
|
send_metadata = {"thread_id": thread_id} if thread_id else None
|
||||||
try:
|
try:
|
||||||
|
# Send cleaned text (MEDIA tags stripped) — not the raw content
|
||||||
|
text_to_send = cleaned_delivery_content.strip()
|
||||||
|
adapter_ok = True
|
||||||
|
if text_to_send:
|
||||||
future = asyncio.run_coroutine_threadsafe(
|
future = asyncio.run_coroutine_threadsafe(
|
||||||
runtime_adapter.send(chat_id, delivery_content, metadata=send_metadata),
|
runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
|
||||||
loop,
|
loop,
|
||||||
)
|
)
|
||||||
send_result = future.result(timeout=60)
|
send_result = future.result(timeout=60)
|
||||||
@@ -257,7 +299,13 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
|
|||||||
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
|
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
|
||||||
job["id"], platform_name, chat_id, err,
|
job["id"], platform_name, chat_id, err,
|
||||||
)
|
)
|
||||||
else:
|
adapter_ok = False # fall through to standalone path
|
||||||
|
|
||||||
|
# Send extracted media files as native attachments via the live adapter
|
||||||
|
if adapter_ok and media_files:
|
||||||
|
_send_media_via_adapter(runtime_adapter, chat_id, media_files, send_metadata, loop, job)
|
||||||
|
|
||||||
|
if adapter_ok:
|
||||||
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
|
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
|
||||||
return
|
return
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch, MagicMock
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt
|
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, _send_media_via_adapter, run_job, SILENT_MARKER, _build_job_prompt
|
||||||
|
|
||||||
|
|
||||||
class TestResolveOrigin:
|
class TestResolveOrigin:
|
||||||
@@ -277,6 +277,188 @@ class TestDeliverResultWrapping:
|
|||||||
# Media files should be forwarded separately
|
# Media files should be forwarded separately
|
||||||
assert kwargs["media_files"] == [("/tmp/test-voice.ogg", False)]
|
assert kwargs["media_files"] == [("/tmp/test-voice.ogg", False)]
|
||||||
|
|
||||||
|
def test_live_adapter_sends_media_as_attachments(self):
|
||||||
|
"""When a live adapter is available, MEDIA files should be sent as native
|
||||||
|
platform attachments (e.g., Discord voice, Telegram audio) rather than
|
||||||
|
as literal 'MEDIA:/path' text."""
|
||||||
|
from gateway.config import Platform
|
||||||
|
from concurrent.futures import Future
|
||||||
|
|
||||||
|
adapter = AsyncMock()
|
||||||
|
adapter.send.return_value = MagicMock(success=True)
|
||||||
|
adapter.send_voice.return_value = MagicMock(success=True)
|
||||||
|
|
||||||
|
pconfig = MagicMock()
|
||||||
|
pconfig.enabled = True
|
||||||
|
mock_cfg = MagicMock()
|
||||||
|
mock_cfg.platforms = {Platform.DISCORD: pconfig}
|
||||||
|
|
||||||
|
loop = MagicMock()
|
||||||
|
loop.is_running.return_value = True
|
||||||
|
|
||||||
|
# run_coroutine_threadsafe returns concurrent.futures.Future (has timeout kwarg)
|
||||||
|
def fake_run_coro(coro, _loop):
|
||||||
|
future = Future()
|
||||||
|
future.set_result(MagicMock(success=True))
|
||||||
|
coro.close()
|
||||||
|
return future
|
||||||
|
|
||||||
|
job = {
|
||||||
|
"id": "tts-job",
|
||||||
|
"deliver": "origin",
|
||||||
|
"origin": {"platform": "discord", "chat_id": "9876"},
|
||||||
|
}
|
||||||
|
|
||||||
|
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||||
|
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||||
|
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||||
|
_deliver_result(
|
||||||
|
job,
|
||||||
|
"Here is TTS\nMEDIA:/tmp/cron-voice.mp3",
|
||||||
|
adapters={Platform.DISCORD: adapter},
|
||||||
|
loop=loop,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Text should be sent without the MEDIA tag
|
||||||
|
adapter.send.assert_called_once()
|
||||||
|
text_sent = adapter.send.call_args[0][1]
|
||||||
|
assert "MEDIA:" not in text_sent
|
||||||
|
assert "Here is TTS" in text_sent
|
||||||
|
|
||||||
|
# Audio file should be sent as a voice attachment
|
||||||
|
adapter.send_voice.assert_called_once()
|
||||||
|
voice_call = adapter.send_voice.call_args
|
||||||
|
assert voice_call[1]["audio_path"] == "/tmp/cron-voice.mp3"
|
||||||
|
|
||||||
|
def test_live_adapter_routes_image_to_send_image_file(self):
|
||||||
|
"""Image MEDIA files should be routed to send_image_file, not send_voice."""
|
||||||
|
from gateway.config import Platform
|
||||||
|
from concurrent.futures import Future
|
||||||
|
|
||||||
|
adapter = AsyncMock()
|
||||||
|
adapter.send.return_value = MagicMock(success=True)
|
||||||
|
adapter.send_image_file.return_value = MagicMock(success=True)
|
||||||
|
|
||||||
|
pconfig = MagicMock()
|
||||||
|
pconfig.enabled = True
|
||||||
|
mock_cfg = MagicMock()
|
||||||
|
mock_cfg.platforms = {Platform.DISCORD: pconfig}
|
||||||
|
|
||||||
|
loop = MagicMock()
|
||||||
|
loop.is_running.return_value = True
|
||||||
|
|
||||||
|
def fake_run_coro(coro, _loop):
|
||||||
|
future = Future()
|
||||||
|
future.set_result(MagicMock(success=True))
|
||||||
|
coro.close()
|
||||||
|
return future
|
||||||
|
|
||||||
|
job = {
|
||||||
|
"id": "img-job",
|
||||||
|
"deliver": "origin",
|
||||||
|
"origin": {"platform": "discord", "chat_id": "1234"},
|
||||||
|
}
|
||||||
|
|
||||||
|
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||||
|
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||||
|
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||||
|
_deliver_result(
|
||||||
|
job,
|
||||||
|
"Chart attached\nMEDIA:/tmp/chart.png",
|
||||||
|
adapters={Platform.DISCORD: adapter},
|
||||||
|
loop=loop,
|
||||||
|
)
|
||||||
|
|
||||||
|
adapter.send_image_file.assert_called_once()
|
||||||
|
assert adapter.send_image_file.call_args[1]["image_path"] == "/tmp/chart.png"
|
||||||
|
adapter.send_voice.assert_not_called()
|
||||||
|
|
||||||
|
def test_live_adapter_media_only_no_text(self):
|
||||||
|
"""When content is ONLY a MEDIA tag with no text, media should still be sent."""
|
||||||
|
from gateway.config import Platform
|
||||||
|
from concurrent.futures import Future
|
||||||
|
|
||||||
|
adapter = AsyncMock()
|
||||||
|
adapter.send_voice.return_value = MagicMock(success=True)
|
||||||
|
|
||||||
|
pconfig = MagicMock()
|
||||||
|
pconfig.enabled = True
|
||||||
|
mock_cfg = MagicMock()
|
||||||
|
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
|
||||||
|
|
||||||
|
loop = MagicMock()
|
||||||
|
loop.is_running.return_value = True
|
||||||
|
|
||||||
|
def fake_run_coro(coro, _loop):
|
||||||
|
future = Future()
|
||||||
|
future.set_result(MagicMock(success=True))
|
||||||
|
coro.close()
|
||||||
|
return future
|
||||||
|
|
||||||
|
job = {
|
||||||
|
"id": "voice-only",
|
||||||
|
"deliver": "origin",
|
||||||
|
"origin": {"platform": "telegram", "chat_id": "999"},
|
||||||
|
}
|
||||||
|
|
||||||
|
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||||
|
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||||
|
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||||
|
_deliver_result(
|
||||||
|
job,
|
||||||
|
"MEDIA:/tmp/voice.ogg",
|
||||||
|
adapters={Platform.TELEGRAM: adapter},
|
||||||
|
loop=loop,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Text send should NOT be called (no text after stripping MEDIA tag)
|
||||||
|
adapter.send.assert_not_called()
|
||||||
|
# Audio should still be delivered
|
||||||
|
adapter.send_voice.assert_called_once()
|
||||||
|
|
||||||
|
def test_live_adapter_sends_cleaned_text_not_raw(self):
|
||||||
|
"""The live adapter path must send cleaned text (MEDIA tags stripped),
|
||||||
|
not the raw delivery_content with embedded MEDIA: tags."""
|
||||||
|
from gateway.config import Platform
|
||||||
|
from concurrent.futures import Future
|
||||||
|
|
||||||
|
adapter = AsyncMock()
|
||||||
|
adapter.send.return_value = MagicMock(success=True)
|
||||||
|
|
||||||
|
pconfig = MagicMock()
|
||||||
|
pconfig.enabled = True
|
||||||
|
mock_cfg = MagicMock()
|
||||||
|
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
|
||||||
|
|
||||||
|
loop = MagicMock()
|
||||||
|
loop.is_running.return_value = True
|
||||||
|
|
||||||
|
def fake_run_coro(coro, _loop):
|
||||||
|
future = Future()
|
||||||
|
future.set_result(MagicMock(success=True))
|
||||||
|
coro.close()
|
||||||
|
return future
|
||||||
|
|
||||||
|
job = {
|
||||||
|
"id": "img-job",
|
||||||
|
"deliver": "origin",
|
||||||
|
"origin": {"platform": "telegram", "chat_id": "555"},
|
||||||
|
}
|
||||||
|
|
||||||
|
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||||
|
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||||
|
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||||
|
_deliver_result(
|
||||||
|
job,
|
||||||
|
"Report\nMEDIA:/tmp/chart.png",
|
||||||
|
adapters={Platform.TELEGRAM: adapter},
|
||||||
|
loop=loop,
|
||||||
|
)
|
||||||
|
|
||||||
|
text_sent = adapter.send.call_args[0][1]
|
||||||
|
assert "MEDIA:" not in text_sent
|
||||||
|
assert "Report" in text_sent
|
||||||
|
|
||||||
def test_no_mirror_to_session_call(self):
|
def test_no_mirror_to_session_call(self):
|
||||||
"""Cron deliveries should NOT mirror into the gateway session."""
|
"""Cron deliveries should NOT mirror into the gateway session."""
|
||||||
from gateway.config import Platform
|
from gateway.config import Platform
|
||||||
@@ -862,3 +1044,57 @@ class TestTickAdvanceBeforeRun:
|
|||||||
adv_mock.assert_called_once_with("test-advance")
|
adv_mock.assert_called_once_with("test-advance")
|
||||||
# advance must happen before run
|
# advance must happen before run
|
||||||
assert call_order == [("advance", "test-advance"), ("run", "test-advance")]
|
assert call_order == [("advance", "test-advance"), ("run", "test-advance")]
|
||||||
|
|
||||||
|
|
||||||
|
class TestSendMediaViaAdapter:
|
||||||
|
"""Unit tests for _send_media_via_adapter — routes files to typed adapter methods."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _run_with_loop(adapter, chat_id, media_files, metadata, job):
|
||||||
|
"""Helper: run _send_media_via_adapter with a real running event loop."""
|
||||||
|
import asyncio
|
||||||
|
import threading
|
||||||
|
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
t = threading.Thread(target=loop.run_forever, daemon=True)
|
||||||
|
t.start()
|
||||||
|
try:
|
||||||
|
_send_media_via_adapter(adapter, chat_id, media_files, metadata, loop, job)
|
||||||
|
finally:
|
||||||
|
loop.call_soon_threadsafe(loop.stop)
|
||||||
|
t.join(timeout=5)
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
def test_video_dispatched_to_send_video(self):
|
||||||
|
adapter = MagicMock()
|
||||||
|
adapter.send_video = AsyncMock()
|
||||||
|
media_files = [("/tmp/clip.mp4", False)]
|
||||||
|
self._run_with_loop(adapter, "123", media_files, None, {"id": "j1"})
|
||||||
|
adapter.send_video.assert_called_once()
|
||||||
|
assert adapter.send_video.call_args[1]["video_path"] == "/tmp/clip.mp4"
|
||||||
|
|
||||||
|
def test_unknown_ext_dispatched_to_send_document(self):
|
||||||
|
adapter = MagicMock()
|
||||||
|
adapter.send_document = AsyncMock()
|
||||||
|
media_files = [("/tmp/report.pdf", False)]
|
||||||
|
self._run_with_loop(adapter, "123", media_files, None, {"id": "j2"})
|
||||||
|
adapter.send_document.assert_called_once()
|
||||||
|
assert adapter.send_document.call_args[1]["file_path"] == "/tmp/report.pdf"
|
||||||
|
|
||||||
|
def test_multiple_media_files_all_delivered(self):
|
||||||
|
adapter = MagicMock()
|
||||||
|
adapter.send_voice = AsyncMock()
|
||||||
|
adapter.send_image_file = AsyncMock()
|
||||||
|
media_files = [("/tmp/voice.mp3", False), ("/tmp/photo.jpg", False)]
|
||||||
|
self._run_with_loop(adapter, "123", media_files, None, {"id": "j3"})
|
||||||
|
adapter.send_voice.assert_called_once()
|
||||||
|
adapter.send_image_file.assert_called_once()
|
||||||
|
|
||||||
|
def test_single_failure_does_not_block_others(self):
|
||||||
|
adapter = MagicMock()
|
||||||
|
adapter.send_voice = AsyncMock(side_effect=RuntimeError("network error"))
|
||||||
|
adapter.send_image_file = AsyncMock()
|
||||||
|
media_files = [("/tmp/voice.ogg", False), ("/tmp/photo.png", False)]
|
||||||
|
self._run_with_loop(adapter, "123", media_files, None, {"id": "j4"})
|
||||||
|
adapter.send_voice.assert_called_once()
|
||||||
|
adapter.send_image_file.assert_called_once()
|
||||||
|
|||||||
Reference in New Issue
Block a user