mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-16 15:11:18 +08:00
Compare commits
29 Commits
fix/1412-s
...
sid/persis
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4511322f56 | ||
|
|
934fc9df22 | ||
|
|
5847c180c6 | ||
|
|
93a0c0cddd | ||
|
|
23e8fdd167 | ||
|
|
3268b98779 | ||
|
|
20f381cfb6 | ||
|
|
77bfa252b9 | ||
|
|
f24c00a5bf | ||
|
|
463239ed85 | ||
|
|
60cce9ca6d | ||
|
|
2d57946ee9 | ||
|
|
5f32fd8b6d | ||
|
|
3ea039684e | ||
|
|
63f0ec96ec | ||
|
|
1cacaccca6 | ||
|
|
773f3c1137 | ||
|
|
0cc784068d | ||
|
|
f1b4d0b280 | ||
|
|
5254d0bba1 | ||
|
|
21c20aeaa5 | ||
|
|
dc095f8491 | ||
|
|
e266530c7d | ||
|
|
879b7d3fbf | ||
|
|
9f36483bf4 | ||
|
|
7be314c456 | ||
|
|
9001b34146 | ||
|
|
861202b56c | ||
|
|
9d63dcc3f9 |
@@ -42,19 +42,16 @@ def _setup_logging() -> None:
|
||||
|
||||
def _load_env() -> None:
|
||||
"""Load .env from HERMES_HOME (default ``~/.hermes``)."""
|
||||
from dotenv import load_dotenv
|
||||
from hermes_cli.env_loader import load_hermes_dotenv
|
||||
|
||||
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
env_file = hermes_home / ".env"
|
||||
if env_file.exists():
|
||||
try:
|
||||
load_dotenv(dotenv_path=env_file, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(dotenv_path=env_file, encoding="latin-1")
|
||||
logging.getLogger(__name__).info("Loaded env from %s", env_file)
|
||||
loaded = load_hermes_dotenv(hermes_home=hermes_home)
|
||||
if loaded:
|
||||
for env_file in loaded:
|
||||
logging.getLogger(__name__).info("Loaded env from %s", env_file)
|
||||
else:
|
||||
logging.getLogger(__name__).info(
|
||||
"No .env found at %s, using system env", env_file
|
||||
"No .env found at %s, using system env", hermes_home / ".env"
|
||||
)
|
||||
|
||||
|
||||
|
||||
17
cli.py
17
cli.py
@@ -61,23 +61,14 @@ import queue
|
||||
_COMMAND_SPINNER_FRAMES = ("⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏")
|
||||
|
||||
|
||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback
|
||||
from dotenv import load_dotenv
|
||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
|
||||
# User-managed env files should override stale shell exports on restart.
|
||||
from hermes_constants import OPENROUTER_BASE_URL
|
||||
from hermes_cli.env_loader import load_hermes_dotenv
|
||||
|
||||
_hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
_user_env = _hermes_home / ".env"
|
||||
_project_env = Path(__file__).parent / '.env'
|
||||
if _user_env.exists():
|
||||
try:
|
||||
load_dotenv(dotenv_path=_user_env, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(dotenv_path=_user_env, encoding="latin-1")
|
||||
elif _project_env.exists():
|
||||
try:
|
||||
load_dotenv(dotenv_path=_project_env, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(dotenv_path=_project_env, encoding="latin-1")
|
||||
load_hermes_dotenv(hermes_home=_hermes_home, project_env=_project_env)
|
||||
|
||||
# Point mini-swe-agent at ~/.hermes/ so it shares our config
|
||||
os.environ.setdefault("MSWEA_GLOBAL_CONFIG_DIR", str(_hermes_home))
|
||||
|
||||
@@ -356,6 +356,10 @@ class BasePlatformAdapter(ABC):
|
||||
# Key: session_key (e.g., chat_id), Value: (event, asyncio.Event for interrupt)
|
||||
self._active_sessions: Dict[str, asyncio.Event] = {}
|
||||
self._pending_messages: Dict[str, MessageEvent] = {}
|
||||
# Background message-processing tasks spawned by handle_message().
|
||||
# Gateway shutdown cancels these so an old gateway instance doesn't keep
|
||||
# working on a task after --replace or manual restarts.
|
||||
self._background_tasks: set[asyncio.Task] = set()
|
||||
# Chats where auto-TTS on voice input is disabled (set by /voice off)
|
||||
self._auto_tts_disabled_chats: set = set()
|
||||
|
||||
@@ -778,7 +782,15 @@ class BasePlatformAdapter(ABC):
|
||||
return # Don't process now - will be handled after current task finishes
|
||||
|
||||
# Spawn background task to process this message
|
||||
asyncio.create_task(self._process_message_background(event, session_key))
|
||||
task = asyncio.create_task(self._process_message_background(event, session_key))
|
||||
try:
|
||||
self._background_tasks.add(task)
|
||||
except TypeError:
|
||||
# Some tests stub create_task() with lightweight sentinels that are not
|
||||
# hashable and do not support lifecycle callbacks.
|
||||
return
|
||||
if hasattr(task, "add_done_callback"):
|
||||
task.add_done_callback(self._background_tasks.discard)
|
||||
|
||||
@staticmethod
|
||||
def _get_human_delay() -> float:
|
||||
@@ -988,6 +1000,21 @@ class BasePlatformAdapter(ABC):
|
||||
if session_key in self._active_sessions:
|
||||
del self._active_sessions[session_key]
|
||||
|
||||
async def cancel_background_tasks(self) -> None:
|
||||
"""Cancel any in-flight background message-processing tasks.
|
||||
|
||||
Used during gateway shutdown/replacement so active sessions from the old
|
||||
process do not keep running after adapters are being torn down.
|
||||
"""
|
||||
tasks = [task for task in self._background_tasks if not task.done()]
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
if tasks:
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
self._background_tasks.clear()
|
||||
self._pending_messages.clear()
|
||||
self._active_sessions.clear()
|
||||
|
||||
def has_pending_interrupt(self, session_key: str) -> bool:
|
||||
"""Check if there's a pending interrupt for a session."""
|
||||
return session_key in self._active_sessions and self._active_sessions[session_key].is_set()
|
||||
|
||||
@@ -87,8 +87,9 @@ class VoiceReceiver:
|
||||
SAMPLE_RATE = 48000 # Discord native rate
|
||||
CHANNELS = 2 # Discord sends stereo
|
||||
|
||||
def __init__(self, voice_client):
|
||||
def __init__(self, voice_client, allowed_user_ids: set = None):
|
||||
self._vc = voice_client
|
||||
self._allowed_user_ids = allowed_user_ids or set()
|
||||
self._running = False
|
||||
|
||||
# Decryption
|
||||
@@ -274,19 +275,21 @@ class VoiceReceiver:
|
||||
if self._dave_session:
|
||||
with self._lock:
|
||||
user_id = self._ssrc_to_user.get(ssrc, 0)
|
||||
if user_id == 0:
|
||||
if self._packet_debug_count <= 10:
|
||||
logger.warning("DAVE skip: unknown user for ssrc=%d", ssrc)
|
||||
return # unknown user, can't DAVE-decrypt
|
||||
try:
|
||||
import davey
|
||||
decrypted = self._dave_session.decrypt(
|
||||
user_id, davey.MediaType.audio, decrypted
|
||||
)
|
||||
except Exception as e:
|
||||
if self._packet_debug_count <= 10:
|
||||
logger.warning("DAVE decrypt failed for ssrc=%d: %s", ssrc, e)
|
||||
return
|
||||
if user_id:
|
||||
try:
|
||||
import davey
|
||||
decrypted = self._dave_session.decrypt(
|
||||
user_id, davey.MediaType.audio, decrypted
|
||||
)
|
||||
except Exception as e:
|
||||
# Unencrypted passthrough — use NaCl-decrypted data as-is
|
||||
if "Unencrypted" not in str(e):
|
||||
if self._packet_debug_count <= 10:
|
||||
logger.warning("DAVE decrypt failed for ssrc=%d: %s", ssrc, e)
|
||||
return
|
||||
# If SSRC unknown (no SPEAKING event yet), skip DAVE and try
|
||||
# Opus decode directly — audio may be in passthrough mode.
|
||||
# Buffer will get a user_id when SPEAKING event arrives later.
|
||||
|
||||
# --- Opus decode -> PCM ---
|
||||
try:
|
||||
@@ -304,6 +307,32 @@ class VoiceReceiver:
|
||||
# Silence detection
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _infer_user_for_ssrc(self, ssrc: int) -> int:
|
||||
"""Try to infer user_id for an unmapped SSRC.
|
||||
|
||||
When the bot rejoins a voice channel, Discord may not resend
|
||||
SPEAKING events for users already speaking. If exactly one
|
||||
allowed user is in the channel, map the SSRC to them.
|
||||
"""
|
||||
try:
|
||||
channel = self._vc.channel
|
||||
if not channel:
|
||||
return 0
|
||||
bot_id = self._vc.user.id if self._vc.user else 0
|
||||
allowed = self._allowed_user_ids
|
||||
candidates = [
|
||||
m.id for m in channel.members
|
||||
if m.id != bot_id and (not allowed or str(m.id) in allowed)
|
||||
]
|
||||
if len(candidates) == 1:
|
||||
uid = candidates[0]
|
||||
self._ssrc_to_user[ssrc] = uid
|
||||
logger.info("Auto-mapped ssrc=%d -> user=%d (sole allowed member)", ssrc, uid)
|
||||
return uid
|
||||
except Exception:
|
||||
pass
|
||||
return 0
|
||||
|
||||
def check_silence(self) -> list:
|
||||
"""Return list of (user_id, pcm_bytes) for completed utterances."""
|
||||
now = time.monotonic()
|
||||
@@ -322,6 +351,10 @@ class VoiceReceiver:
|
||||
|
||||
if silence_duration >= self.SILENCE_THRESHOLD and buf_duration >= self.MIN_SPEECH_DURATION:
|
||||
user_id = ssrc_user_map.get(ssrc, 0)
|
||||
if not user_id:
|
||||
# SSRC not mapped (SPEAKING event missing after bot rejoin).
|
||||
# Infer from allowed users in the voice channel.
|
||||
user_id = self._infer_user_for_ssrc(ssrc)
|
||||
if user_id:
|
||||
completed.append((user_id, bytes(buf)))
|
||||
self._buffers[ssrc] = bytearray()
|
||||
@@ -400,6 +433,9 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
self._voice_listen_tasks: Dict[int, asyncio.Task] = {} # guild_id -> listen loop
|
||||
self._voice_input_callback: Optional[Callable] = None # set by run.py
|
||||
self._on_voice_disconnect: Optional[Callable] = None # set by run.py
|
||||
# Track threads where the bot has participated so follow-up messages
|
||||
# in those threads don't require @mention.
|
||||
self._bot_participated_threads: set = set()
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Discord and start receiving events."""
|
||||
@@ -580,7 +616,7 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
"""Send a message to a Discord channel."""
|
||||
if not self._client:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
|
||||
|
||||
try:
|
||||
# Get the channel
|
||||
channel = self._client.get_channel(int(chat_id))
|
||||
@@ -695,13 +731,14 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
) -> SendResult:
|
||||
"""Play auto-TTS audio.
|
||||
|
||||
When the bot is in a voice channel for this chat's guild, skip the
|
||||
file attachment — the gateway runner plays audio in the VC instead.
|
||||
When the bot is in a voice channel for this chat's guild, play
|
||||
directly in the VC instead of sending as a file attachment.
|
||||
"""
|
||||
for gid, text_ch_id in self._voice_text_channels.items():
|
||||
if str(text_ch_id) == str(chat_id) and self.is_in_voice_channel(gid):
|
||||
logger.debug("[%s] Skipping play_tts for %s — VC playback handled by runner", self.name, chat_id)
|
||||
return SendResult(success=True)
|
||||
logger.info("[%s] Playing TTS in voice channel (guild=%d)", self.name, gid)
|
||||
success = await self.play_in_voice_channel(gid, audio_path)
|
||||
return SendResult(success=success)
|
||||
return await self.send_voice(chat_id=chat_id, audio_path=audio_path, **kwargs)
|
||||
|
||||
async def send_voice(
|
||||
@@ -805,7 +842,7 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
|
||||
# Start voice receiver (Phase 2: listen to users)
|
||||
try:
|
||||
receiver = VoiceReceiver(vc)
|
||||
receiver = VoiceReceiver(vc, allowed_user_ids=self._allowed_user_ids)
|
||||
receiver.start()
|
||||
self._voice_receivers[guild_id] = receiver
|
||||
self._voice_listen_tasks[guild_id] = asyncio.ensure_future(
|
||||
@@ -1001,14 +1038,32 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
# Voice listening (Phase 2)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
# UDP keepalive interval in seconds — prevents Discord from dropping
|
||||
# the UDP route after ~60s of silence.
|
||||
_KEEPALIVE_INTERVAL = 15
|
||||
|
||||
async def _voice_listen_loop(self, guild_id: int):
|
||||
"""Periodically check for completed utterances and process them."""
|
||||
receiver = self._voice_receivers.get(guild_id)
|
||||
if not receiver:
|
||||
return
|
||||
last_keepalive = time.monotonic()
|
||||
try:
|
||||
while receiver._running:
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
# Send periodic UDP keepalive to prevent Discord from
|
||||
# dropping the UDP session after ~60s of silence.
|
||||
now = time.monotonic()
|
||||
if now - last_keepalive >= self._KEEPALIVE_INTERVAL:
|
||||
last_keepalive = now
|
||||
try:
|
||||
vc = self._voice_clients.get(guild_id)
|
||||
if vc and vc.is_connected():
|
||||
vc._connection.send_packet(b'\xf8\xff\xfe')
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
completed = receiver.check_silence()
|
||||
for user_id, pcm_data in completed:
|
||||
if not self._is_allowed_user(str(user_id)):
|
||||
@@ -1746,14 +1801,13 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
async def _handle_message(self, message: DiscordMessage) -> None:
|
||||
"""Handle incoming Discord messages."""
|
||||
# In server channels (not DMs), require the bot to be @mentioned
|
||||
# UNLESS the channel is in the free-response list.
|
||||
# UNLESS the channel is in the free-response list or the message is
|
||||
# in a thread where the bot has already participated.
|
||||
#
|
||||
# Config:
|
||||
# DISCORD_FREE_RESPONSE_CHANNELS: Comma-separated channel IDs where the
|
||||
# bot responds to every message without needing a mention.
|
||||
# DISCORD_REQUIRE_MENTION: Set to "false" to disable mention requirement
|
||||
# globally (all channels become free-response). Default: "true".
|
||||
# Can also be set via discord.require_mention in config.yaml.
|
||||
# Config (all settable via discord.* in config.yaml):
|
||||
# discord.require_mention: Require @mention in server channels (default: true)
|
||||
# discord.free_response_channels: Channel IDs where bot responds without mention
|
||||
# discord.auto_thread: Auto-create thread on @mention in channels (default: true)
|
||||
|
||||
thread_id = None
|
||||
parent_channel_id = None
|
||||
@@ -1772,7 +1826,11 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
require_mention = os.getenv("DISCORD_REQUIRE_MENTION", "true").lower() not in ("false", "0", "no")
|
||||
is_free_channel = bool(channel_ids & free_channels)
|
||||
|
||||
if require_mention and not is_free_channel:
|
||||
# Skip the mention check if the message is in a thread where
|
||||
# the bot has previously participated (auto-created or replied in).
|
||||
in_bot_thread = is_thread and thread_id in self._bot_participated_threads
|
||||
|
||||
if require_mention and not is_free_channel and not in_bot_thread:
|
||||
if self._client.user not in message.mentions:
|
||||
return
|
||||
|
||||
@@ -1781,17 +1839,18 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
message.content = message.content.replace(f"<@!{self._client.user.id}>", "").strip()
|
||||
|
||||
# Auto-thread: when enabled, automatically create a thread for every
|
||||
# new message in a text channel so each conversation is isolated.
|
||||
# @mention in a text channel so each conversation is isolated (like Slack).
|
||||
# Messages already inside threads or DMs are unaffected.
|
||||
auto_threaded_channel = None
|
||||
if not is_thread and not isinstance(message.channel, discord.DMChannel):
|
||||
auto_thread = os.getenv("DISCORD_AUTO_THREAD", "").lower() in ("true", "1", "yes")
|
||||
auto_thread = os.getenv("DISCORD_AUTO_THREAD", "true").lower() in ("true", "1", "yes")
|
||||
if auto_thread:
|
||||
thread = await self._auto_create_thread(message)
|
||||
if thread:
|
||||
is_thread = True
|
||||
thread_id = str(thread.id)
|
||||
auto_threaded_channel = thread
|
||||
self._bot_participated_threads.add(thread_id)
|
||||
|
||||
# Determine message type
|
||||
msg_type = MessageType.TEXT
|
||||
@@ -1891,7 +1950,12 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
reply_to_message_id=str(message.reference.message_id) if message.reference else None,
|
||||
timestamp=message.created_at,
|
||||
)
|
||||
|
||||
|
||||
# Track thread participation so the bot won't require @mention for
|
||||
# follow-up messages in threads it has already engaged in.
|
||||
if thread_id:
|
||||
self._bot_participated_threads.add(thread_id)
|
||||
|
||||
await self.handle_message(event)
|
||||
|
||||
|
||||
|
||||
@@ -35,16 +35,12 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
# Resolve Hermes home directory (respects HERMES_HOME override)
|
||||
_hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
|
||||
# Load environment variables from ~/.hermes/.env first
|
||||
from dotenv import load_dotenv
|
||||
# Load environment variables from ~/.hermes/.env first.
|
||||
# User-managed env files should override stale shell exports on restart.
|
||||
from dotenv import load_dotenv # backward-compat for tests that monkeypatch this symbol
|
||||
from hermes_cli.env_loader import load_hermes_dotenv
|
||||
_env_path = _hermes_home / '.env'
|
||||
if _env_path.exists():
|
||||
try:
|
||||
load_dotenv(_env_path, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(_env_path, encoding="latin-1")
|
||||
# Also try project .env as fallback
|
||||
load_dotenv()
|
||||
load_hermes_dotenv(hermes_home=_hermes_home, project_env=Path(__file__).resolve().parents[1] / '.env')
|
||||
|
||||
# Bridge config.yaml values into the environment so os.getenv() picks them up.
|
||||
# config.yaml is authoritative for terminal settings — overrides .env.
|
||||
@@ -900,8 +896,19 @@ class GatewayRunner:
|
||||
"""Stop the gateway and disconnect all adapters."""
|
||||
logger.info("Stopping gateway...")
|
||||
self._running = False
|
||||
|
||||
|
||||
for session_key, agent in list(self._running_agents.items()):
|
||||
try:
|
||||
agent.interrupt("Gateway shutting down")
|
||||
logger.debug("Interrupted running agent for session %s during shutdown", session_key[:20])
|
||||
except Exception as e:
|
||||
logger.debug("Failed interrupting agent during shutdown: %s", e)
|
||||
|
||||
for platform, adapter in list(self.adapters.items()):
|
||||
try:
|
||||
await adapter.cancel_background_tasks()
|
||||
except Exception as e:
|
||||
logger.debug("✗ %s background-task cancel error: %s", platform.value, e)
|
||||
try:
|
||||
await adapter.disconnect()
|
||||
logger.info("✓ %s disconnected", platform.value)
|
||||
@@ -909,6 +916,9 @@ class GatewayRunner:
|
||||
logger.error("✗ %s disconnect error: %s", platform.value, e)
|
||||
|
||||
self.adapters.clear()
|
||||
self._running_agents.clear()
|
||||
self._pending_messages.clear()
|
||||
self._pending_approvals.clear()
|
||||
self._shutdown_all_gateway_honcho()
|
||||
self._shutdown_event.set()
|
||||
|
||||
@@ -2421,6 +2431,13 @@ class GatewayRunner:
|
||||
except Exception as e:
|
||||
logger.warning("Failed to join voice channel: %s", e)
|
||||
adapter._voice_input_callback = None
|
||||
err_lower = str(e).lower()
|
||||
if "pynacl" in err_lower or "nacl" in err_lower or "davey" in err_lower:
|
||||
return (
|
||||
"Voice dependencies are missing (PyNaCl / davey). "
|
||||
"Install or reinstall Hermes with the messaging extra, e.g. "
|
||||
"`pip install hermes-agent[messaging]`."
|
||||
)
|
||||
return f"Failed to join voice channel: {e}"
|
||||
|
||||
if success:
|
||||
@@ -2561,18 +2578,9 @@ class GatewayRunner:
|
||||
if has_agent_tts:
|
||||
return False
|
||||
|
||||
# Dedup: base adapter auto-TTS already handles voice input.
|
||||
# Exception: Discord voice channel — play_tts override is a no-op,
|
||||
# so the runner must handle VC playback.
|
||||
skip_double = is_voice_input
|
||||
if skip_double:
|
||||
adapter = self.adapters.get(event.source.platform)
|
||||
guild_id = self._get_guild_id(event)
|
||||
if (guild_id and adapter
|
||||
and hasattr(adapter, "is_in_voice_channel")
|
||||
and adapter.is_in_voice_channel(guild_id)):
|
||||
skip_double = False
|
||||
if skip_double:
|
||||
# Dedup: base adapter auto-TTS already handles voice input
|
||||
# (play_tts plays in VC when connected, so runner can skip).
|
||||
if is_voice_input:
|
||||
return False
|
||||
|
||||
return True
|
||||
@@ -3494,10 +3502,12 @@ class GatewayRunner:
|
||||
os.environ["HERMES_SESSION_CHAT_ID"] = context.source.chat_id
|
||||
if context.source.chat_name:
|
||||
os.environ["HERMES_SESSION_CHAT_NAME"] = context.source.chat_name
|
||||
if context.source.thread_id:
|
||||
os.environ["HERMES_SESSION_THREAD_ID"] = str(context.source.thread_id)
|
||||
|
||||
def _clear_session_env(self) -> None:
|
||||
"""Clear session environment variables."""
|
||||
for var in ["HERMES_SESSION_PLATFORM", "HERMES_SESSION_CHAT_ID", "HERMES_SESSION_CHAT_NAME"]:
|
||||
for var in ["HERMES_SESSION_PLATFORM", "HERMES_SESSION_CHAT_ID", "HERMES_SESSION_CHAT_NAME", "HERMES_SESSION_THREAD_ID"]:
|
||||
if var in os.environ:
|
||||
del os.environ[var]
|
||||
|
||||
|
||||
@@ -280,6 +280,7 @@ DEFAULT_CONFIG = {
|
||||
"discord": {
|
||||
"require_mention": True, # Require @mention to respond in server channels
|
||||
"free_response_channels": "", # Comma-separated channel IDs where bot responds without mention
|
||||
"auto_thread": True, # Auto-create threads on @mention in channels (like Slack)
|
||||
},
|
||||
|
||||
# Permanently allowed dangerous command patterns (added via "always" approval)
|
||||
|
||||
46
hermes_cli/env_loader.py
Normal file
46
hermes_cli/env_loader.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""Helpers for loading Hermes .env files consistently across entrypoints."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
def _load_dotenv_with_fallback(path: Path, *, override: bool) -> None:
|
||||
try:
|
||||
load_dotenv(dotenv_path=path, override=override, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(dotenv_path=path, override=override, encoding="latin-1")
|
||||
|
||||
|
||||
def load_hermes_dotenv(
|
||||
*,
|
||||
hermes_home: str | os.PathLike | None = None,
|
||||
project_env: str | os.PathLike | None = None,
|
||||
) -> list[Path]:
|
||||
"""Load Hermes environment files with user config taking precedence.
|
||||
|
||||
Behavior:
|
||||
- `~/.hermes/.env` overrides stale shell-exported values when present.
|
||||
- project `.env` acts as a dev fallback and only fills missing values when
|
||||
the user env exists.
|
||||
- if no user env exists, the project `.env` also overrides stale shell vars.
|
||||
"""
|
||||
loaded: list[Path] = []
|
||||
|
||||
home_path = Path(hermes_home or os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
user_env = home_path / ".env"
|
||||
project_env_path = Path(project_env) if project_env else None
|
||||
|
||||
if user_env.exists():
|
||||
_load_dotenv_with_fallback(user_env, override=True)
|
||||
loaded.append(user_env)
|
||||
|
||||
if project_env_path and project_env_path.exists():
|
||||
_load_dotenv_with_fallback(project_env_path, override=not loaded)
|
||||
loaded.append(project_env_path)
|
||||
|
||||
return loaded
|
||||
@@ -54,16 +54,11 @@ from typing import Optional
|
||||
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
|
||||
sys.path.insert(0, str(PROJECT_ROOT))
|
||||
|
||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback
|
||||
from dotenv import load_dotenv
|
||||
from hermes_cli.config import get_env_path, get_hermes_home
|
||||
_user_env = get_env_path()
|
||||
if _user_env.exists():
|
||||
try:
|
||||
load_dotenv(dotenv_path=_user_env, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(dotenv_path=_user_env, encoding="latin-1")
|
||||
load_dotenv(dotenv_path=PROJECT_ROOT / '.env', override=False)
|
||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
|
||||
# User-managed env files should override stale shell exports on restart.
|
||||
from hermes_cli.config import get_hermes_home
|
||||
from hermes_cli.env_loader import load_hermes_dotenv
|
||||
load_hermes_dotenv(project_env=PROJECT_ROOT / '.env')
|
||||
|
||||
# Point mini-swe-agent at ~/.hermes/ so it shares our config
|
||||
os.environ.setdefault("MSWEA_GLOBAL_CONFIG_DIR", str(get_hermes_home()))
|
||||
|
||||
23
rl_cli.py
23
rl_cli.py
@@ -27,25 +27,16 @@ from pathlib import Path
|
||||
import fire
|
||||
import yaml
|
||||
|
||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
|
||||
# User-managed env files should override stale shell exports on restart.
|
||||
_hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
_user_env = _hermes_home / ".env"
|
||||
_project_env = Path(__file__).parent / '.env'
|
||||
|
||||
if _user_env.exists():
|
||||
try:
|
||||
load_dotenv(dotenv_path=_user_env, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(dotenv_path=_user_env, encoding="latin-1")
|
||||
print(f"✅ Loaded environment variables from {_user_env}")
|
||||
elif _project_env.exists():
|
||||
try:
|
||||
load_dotenv(dotenv_path=_project_env, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(dotenv_path=_project_env, encoding="latin-1")
|
||||
print(f"✅ Loaded environment variables from {_project_env}")
|
||||
from hermes_cli.env_loader import load_hermes_dotenv
|
||||
|
||||
_loaded_env_paths = load_hermes_dotenv(hermes_home=_hermes_home, project_env=_project_env)
|
||||
for _env_path in _loaded_env_paths:
|
||||
print(f"✅ Loaded environment variables from {_env_path}")
|
||||
|
||||
# Set terminal working directory to tinker-atropos submodule
|
||||
# This ensures terminal commands run in the right context for RL work
|
||||
|
||||
28
run_agent.py
28
run_agent.py
@@ -45,24 +45,16 @@ import fire
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback
|
||||
from dotenv import load_dotenv
|
||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
|
||||
# User-managed env files should override stale shell exports on restart.
|
||||
from hermes_cli.env_loader import load_hermes_dotenv
|
||||
|
||||
_hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
_user_env = _hermes_home / ".env"
|
||||
_project_env = Path(__file__).parent / '.env'
|
||||
if _user_env.exists():
|
||||
try:
|
||||
load_dotenv(dotenv_path=_user_env, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(dotenv_path=_user_env, encoding="latin-1")
|
||||
logger.info("Loaded environment variables from %s", _user_env)
|
||||
elif _project_env.exists():
|
||||
try:
|
||||
load_dotenv(dotenv_path=_project_env, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(dotenv_path=_project_env, encoding="latin-1")
|
||||
logger.info("Loaded environment variables from %s", _project_env)
|
||||
_loaded_env_paths = load_hermes_dotenv(hermes_home=_hermes_home, project_env=_project_env)
|
||||
if _loaded_env_paths:
|
||||
for _env_path in _loaded_env_paths:
|
||||
logger.info("Loaded environment variables from %s", _env_path)
|
||||
else:
|
||||
logger.info("No .env file found. Using system environment variables.")
|
||||
|
||||
@@ -5590,6 +5582,12 @@ class AIAgent:
|
||||
invalid_json_args = []
|
||||
for tc in assistant_message.tool_calls:
|
||||
args = tc.function.arguments
|
||||
if isinstance(args, (dict, list)):
|
||||
tc.function.arguments = json.dumps(args)
|
||||
continue
|
||||
if args is not None and not isinstance(args, str):
|
||||
tc.function.arguments = str(args)
|
||||
args = tc.function.arguments
|
||||
# Treat empty/whitespace strings as empty object
|
||||
if not args or not args.strip():
|
||||
tc.function.arguments = "{}"
|
||||
|
||||
389
scripts/discord-voice-doctor.py
Executable file
389
scripts/discord-voice-doctor.py
Executable file
@@ -0,0 +1,389 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Discord Voice Doctor — diagnostic tool for voice channel support.
|
||||
|
||||
Checks all dependencies, configuration, and bot permissions needed
|
||||
for Discord voice mode to work correctly.
|
||||
|
||||
Usage:
|
||||
python scripts/discord-voice-doctor.py
|
||||
.venv/bin/python scripts/discord-voice-doctor.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
# Resolve project root
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
PROJECT_ROOT = SCRIPT_DIR.parent
|
||||
sys.path.insert(0, str(PROJECT_ROOT))
|
||||
|
||||
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
ENV_FILE = HERMES_HOME / ".env"
|
||||
|
||||
OK = "\033[92m\u2713\033[0m"
|
||||
FAIL = "\033[91m\u2717\033[0m"
|
||||
WARN = "\033[93m!\033[0m"
|
||||
|
||||
# Track whether discord.py is available for later sections
|
||||
_discord_available = False
|
||||
|
||||
|
||||
def mask(value):
|
||||
"""Mask sensitive value: show only first 4 chars."""
|
||||
if not value or len(value) < 8:
|
||||
return "****"
|
||||
return f"{value[:4]}{'*' * (len(value) - 4)}"
|
||||
|
||||
|
||||
def check(label, ok, detail=""):
|
||||
symbol = OK if ok else FAIL
|
||||
msg = f" {symbol} {label}"
|
||||
if detail:
|
||||
msg += f" ({detail})"
|
||||
print(msg)
|
||||
return ok
|
||||
|
||||
|
||||
def warn(label, detail=""):
|
||||
msg = f" {WARN} {label}"
|
||||
if detail:
|
||||
msg += f" ({detail})"
|
||||
print(msg)
|
||||
|
||||
|
||||
def section(title):
|
||||
print(f"\n\033[1m{title}\033[0m")
|
||||
|
||||
|
||||
def check_packages():
|
||||
"""Check Python package dependencies. Returns True if all critical deps OK."""
|
||||
global _discord_available
|
||||
section("Python Packages")
|
||||
ok = True
|
||||
|
||||
# discord.py
|
||||
try:
|
||||
import discord
|
||||
_discord_available = True
|
||||
check("discord.py", True, f"v{discord.__version__}")
|
||||
except ImportError:
|
||||
check("discord.py", False, "pip install discord.py[voice]")
|
||||
ok = False
|
||||
|
||||
# PyNaCl
|
||||
try:
|
||||
import nacl
|
||||
ver = getattr(nacl, "__version__", "unknown")
|
||||
try:
|
||||
import nacl.secret
|
||||
nacl.secret.Aead(bytes(32))
|
||||
check("PyNaCl", True, f"v{ver}")
|
||||
except (AttributeError, Exception):
|
||||
check("PyNaCl (Aead)", False, f"v{ver} — need >=1.5.0")
|
||||
ok = False
|
||||
except ImportError:
|
||||
check("PyNaCl", False, "pip install PyNaCl>=1.5.0")
|
||||
ok = False
|
||||
|
||||
# davey (DAVE E2EE)
|
||||
try:
|
||||
import davey
|
||||
check("davey (DAVE E2EE)", True, f"v{getattr(davey, '__version__', '?')}")
|
||||
except ImportError:
|
||||
check("davey (DAVE E2EE)", False, "pip install davey")
|
||||
ok = False
|
||||
|
||||
# Optional: local STT
|
||||
try:
|
||||
import faster_whisper
|
||||
check("faster-whisper (local STT)", True)
|
||||
except ImportError:
|
||||
warn("faster-whisper (local STT)", "not installed — local STT unavailable")
|
||||
|
||||
# Optional: TTS providers
|
||||
try:
|
||||
import edge_tts
|
||||
check("edge-tts", True)
|
||||
except ImportError:
|
||||
warn("edge-tts", "not installed — edge TTS unavailable")
|
||||
|
||||
try:
|
||||
import elevenlabs
|
||||
check("elevenlabs SDK", True)
|
||||
except ImportError:
|
||||
warn("elevenlabs SDK", "not installed — premium TTS unavailable")
|
||||
|
||||
return ok
|
||||
|
||||
|
||||
def check_system_tools():
|
||||
"""Check system-level tools (opus, ffmpeg). Returns True if all OK."""
|
||||
section("System Tools")
|
||||
ok = True
|
||||
|
||||
# Opus codec
|
||||
if _discord_available:
|
||||
try:
|
||||
import discord
|
||||
opus_loaded = discord.opus.is_loaded()
|
||||
if not opus_loaded:
|
||||
import ctypes.util
|
||||
opus_path = ctypes.util.find_library("opus")
|
||||
if not opus_path:
|
||||
# Platform-specific fallback paths
|
||||
candidates = [
|
||||
"/opt/homebrew/lib/libopus.dylib", # macOS Apple Silicon
|
||||
"/usr/local/lib/libopus.dylib", # macOS Intel
|
||||
"/usr/lib/x86_64-linux-gnu/libopus.so.0", # Debian/Ubuntu x86
|
||||
"/usr/lib/aarch64-linux-gnu/libopus.so.0", # Debian/Ubuntu ARM
|
||||
"/usr/lib/libopus.so", # Arch Linux
|
||||
"/usr/lib64/libopus.so", # RHEL/Fedora
|
||||
]
|
||||
for p in candidates:
|
||||
if os.path.isfile(p):
|
||||
opus_path = p
|
||||
break
|
||||
if opus_path:
|
||||
discord.opus.load_opus(opus_path)
|
||||
opus_loaded = discord.opus.is_loaded()
|
||||
if opus_loaded:
|
||||
check("Opus codec", True)
|
||||
else:
|
||||
check("Opus codec", False, "brew install opus / apt install libopus0")
|
||||
ok = False
|
||||
except Exception as e:
|
||||
check("Opus codec", False, str(e))
|
||||
ok = False
|
||||
else:
|
||||
warn("Opus codec", "skipped — discord.py not installed")
|
||||
|
||||
# ffmpeg
|
||||
ffmpeg_path = shutil.which("ffmpeg")
|
||||
if ffmpeg_path:
|
||||
check("ffmpeg", True, ffmpeg_path)
|
||||
else:
|
||||
check("ffmpeg", False, "brew install ffmpeg / apt install ffmpeg")
|
||||
ok = False
|
||||
|
||||
return ok
|
||||
|
||||
|
||||
def check_env_vars():
|
||||
"""Check environment variables. Returns (ok, token, groq_key, eleven_key)."""
|
||||
section("Environment Variables")
|
||||
|
||||
# Load .env
|
||||
try:
|
||||
from dotenv import load_dotenv
|
||||
if ENV_FILE.exists():
|
||||
load_dotenv(ENV_FILE)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
ok = True
|
||||
|
||||
token = os.getenv("DISCORD_BOT_TOKEN", "")
|
||||
if token:
|
||||
check("DISCORD_BOT_TOKEN", True, mask(token))
|
||||
else:
|
||||
check("DISCORD_BOT_TOKEN", False, "not set")
|
||||
ok = False
|
||||
|
||||
# Allowed users — resolve usernames if possible
|
||||
allowed = os.getenv("DISCORD_ALLOWED_USERS", "")
|
||||
if allowed:
|
||||
users = [u.strip() for u in allowed.split(",") if u.strip()]
|
||||
user_labels = []
|
||||
for uid in users:
|
||||
label = mask(uid)
|
||||
if token and uid.isdigit():
|
||||
try:
|
||||
import requests
|
||||
r = requests.get(
|
||||
f"https://discord.com/api/v10/users/{uid}",
|
||||
headers={"Authorization": f"Bot {token}"},
|
||||
timeout=3,
|
||||
)
|
||||
if r.status_code == 200:
|
||||
label = f"{r.json().get('username', '?')} ({mask(uid)})"
|
||||
except Exception:
|
||||
pass
|
||||
user_labels.append(label)
|
||||
check("DISCORD_ALLOWED_USERS", True, f"{len(users)} user(s): {', '.join(user_labels)}")
|
||||
else:
|
||||
warn("DISCORD_ALLOWED_USERS", "not set — all users can use voice")
|
||||
|
||||
groq_key = os.getenv("GROQ_API_KEY", "")
|
||||
eleven_key = os.getenv("ELEVENLABS_API_KEY", "")
|
||||
|
||||
if groq_key:
|
||||
check("GROQ_API_KEY (STT)", True, mask(groq_key))
|
||||
else:
|
||||
warn("GROQ_API_KEY", "not set — Groq STT unavailable")
|
||||
|
||||
if eleven_key:
|
||||
check("ELEVENLABS_API_KEY (TTS)", True, mask(eleven_key))
|
||||
else:
|
||||
warn("ELEVENLABS_API_KEY", "not set — ElevenLabs TTS unavailable")
|
||||
|
||||
return ok, token, groq_key, eleven_key
|
||||
|
||||
|
||||
def check_config(groq_key, eleven_key):
|
||||
"""Check hermes config.yaml."""
|
||||
section("Configuration")
|
||||
|
||||
config_path = HERMES_HOME / "config.yaml"
|
||||
if config_path.exists():
|
||||
try:
|
||||
import yaml
|
||||
with open(config_path) as f:
|
||||
cfg = yaml.safe_load(f) or {}
|
||||
|
||||
stt_provider = cfg.get("stt", {}).get("provider", "local")
|
||||
tts_provider = cfg.get("tts", {}).get("provider", "edge")
|
||||
check("STT provider", True, stt_provider)
|
||||
check("TTS provider", True, tts_provider)
|
||||
|
||||
if stt_provider == "groq" and not groq_key:
|
||||
warn("STT config says groq but GROQ_API_KEY is missing")
|
||||
if tts_provider == "elevenlabs" and not eleven_key:
|
||||
warn("TTS config says elevenlabs but ELEVENLABS_API_KEY is missing")
|
||||
except Exception as e:
|
||||
warn("config.yaml", f"parse error: {e}")
|
||||
else:
|
||||
warn("config.yaml", "not found — using defaults")
|
||||
|
||||
# Voice mode state
|
||||
voice_mode_path = HERMES_HOME / "gateway_voice_mode.json"
|
||||
if voice_mode_path.exists():
|
||||
try:
|
||||
import json
|
||||
modes = json.loads(voice_mode_path.read_text())
|
||||
off_count = sum(1 for v in modes.values() if v == "off")
|
||||
all_count = sum(1 for v in modes.values() if v == "all")
|
||||
check("Voice mode state", True, f"{all_count} on, {off_count} off, {len(modes)} total")
|
||||
except Exception:
|
||||
warn("Voice mode state", "parse error")
|
||||
else:
|
||||
check("Voice mode state", True, "no saved state (fresh)")
|
||||
|
||||
|
||||
def check_bot_permissions(token):
|
||||
"""Check bot permissions via Discord API. Returns True if all OK."""
|
||||
section("Bot Permissions")
|
||||
|
||||
if not token:
|
||||
warn("Bot permissions", "no token — skipping")
|
||||
return True
|
||||
|
||||
try:
|
||||
import requests
|
||||
except ImportError:
|
||||
warn("Bot permissions", "requests not installed — skipping")
|
||||
return True
|
||||
|
||||
VOICE_PERMS = {
|
||||
"Priority Speaker": 8,
|
||||
"Stream": 9,
|
||||
"View Channel": 10,
|
||||
"Send Messages": 11,
|
||||
"Embed Links": 14,
|
||||
"Attach Files": 15,
|
||||
"Read Message History": 16,
|
||||
"Connect": 20,
|
||||
"Speak": 21,
|
||||
"Mute Members": 22,
|
||||
"Deafen Members": 23,
|
||||
"Move Members": 24,
|
||||
"Use VAD": 25,
|
||||
"Send Voice Messages": 46,
|
||||
}
|
||||
REQUIRED_PERMS = {"Connect", "Speak", "View Channel", "Send Messages"}
|
||||
ok = True
|
||||
|
||||
try:
|
||||
headers = {"Authorization": f"Bot {token}"}
|
||||
r = requests.get("https://discord.com/api/v10/users/@me", headers=headers, timeout=5)
|
||||
|
||||
if r.status_code == 401:
|
||||
check("Bot login", False, "invalid token (401)")
|
||||
return False
|
||||
if r.status_code != 200:
|
||||
check("Bot login", False, f"HTTP {r.status_code}")
|
||||
return False
|
||||
|
||||
bot = r.json()
|
||||
bot_name = bot.get("username", "?")
|
||||
check("Bot login", True, f"{bot_name[:3]}{'*' * (len(bot_name) - 3)}")
|
||||
|
||||
# Check guilds
|
||||
r2 = requests.get("https://discord.com/api/v10/users/@me/guilds", headers=headers, timeout=5)
|
||||
if r2.status_code != 200:
|
||||
warn("Guilds", f"HTTP {r2.status_code}")
|
||||
return ok
|
||||
|
||||
guilds = r2.json()
|
||||
check("Guilds", True, f"{len(guilds)} guild(s)")
|
||||
|
||||
for g in guilds[:5]:
|
||||
perms = int(g.get("permissions", 0))
|
||||
is_admin = bool(perms & (1 << 3))
|
||||
|
||||
if is_admin:
|
||||
print(f" {OK} {g['name']}: Administrator (all permissions)")
|
||||
continue
|
||||
|
||||
has = []
|
||||
missing = []
|
||||
for name, bit in sorted(VOICE_PERMS.items(), key=lambda x: x[1]):
|
||||
if perms & (1 << bit):
|
||||
has.append(name)
|
||||
elif name in REQUIRED_PERMS:
|
||||
missing.append(name)
|
||||
|
||||
if missing:
|
||||
print(f" {FAIL} {g['name']}: missing {', '.join(missing)}")
|
||||
ok = False
|
||||
else:
|
||||
print(f" {OK} {g['name']}: {', '.join(has)}")
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
warn("Bot permissions", "Discord API timeout")
|
||||
except requests.exceptions.ConnectionError:
|
||||
warn("Bot permissions", "cannot reach Discord API")
|
||||
except Exception as e:
|
||||
warn("Bot permissions", f"check failed: {e}")
|
||||
|
||||
return ok
|
||||
|
||||
|
||||
def main():
|
||||
print()
|
||||
print("\033[1m" + "=" * 50 + "\033[0m")
|
||||
print("\033[1m Discord Voice Doctor\033[0m")
|
||||
print("\033[1m" + "=" * 50 + "\033[0m")
|
||||
|
||||
all_ok = True
|
||||
|
||||
all_ok &= check_packages()
|
||||
all_ok &= check_system_tools()
|
||||
env_ok, token, groq_key, eleven_key = check_env_vars()
|
||||
all_ok &= env_ok
|
||||
check_config(groq_key, eleven_key)
|
||||
all_ok &= check_bot_permissions(token)
|
||||
|
||||
# Summary
|
||||
print()
|
||||
print("\033[1m" + "-" * 50 + "\033[0m")
|
||||
if all_ok:
|
||||
print(f" {OK} \033[92mAll checks passed — voice mode ready!\033[0m")
|
||||
else:
|
||||
print(f" {FAIL} \033[91mSome checks failed — fix issues above.\033[0m")
|
||||
print()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -252,3 +252,109 @@ async def test_discord_dms_ignore_mention_requirement(adapter, monkeypatch):
|
||||
event = adapter.handle_message.await_args.args[0]
|
||||
assert event.text == "dm without mention"
|
||||
assert event.source.chat_type == "dm"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discord_auto_thread_enabled_by_default(adapter, monkeypatch):
|
||||
"""Auto-threading should be enabled by default (DISCORD_AUTO_THREAD defaults to 'true')."""
|
||||
monkeypatch.delenv("DISCORD_AUTO_THREAD", raising=False)
|
||||
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "false")
|
||||
|
||||
# Patch _auto_create_thread to return a fake thread
|
||||
fake_thread = FakeThread(channel_id=999, name="auto-thread")
|
||||
adapter._auto_create_thread = AsyncMock(return_value=fake_thread)
|
||||
|
||||
message = make_message(channel=FakeTextChannel(channel_id=123), content="hello")
|
||||
|
||||
await adapter._handle_message(message)
|
||||
|
||||
adapter._auto_create_thread.assert_awaited_once()
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
event = adapter.handle_message.await_args.args[0]
|
||||
assert event.source.chat_type == "thread"
|
||||
assert event.source.thread_id == "999"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discord_auto_thread_can_be_disabled(adapter, monkeypatch):
|
||||
"""Setting auto_thread to false skips thread creation."""
|
||||
monkeypatch.setenv("DISCORD_AUTO_THREAD", "false")
|
||||
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "false")
|
||||
|
||||
adapter._auto_create_thread = AsyncMock()
|
||||
|
||||
message = make_message(channel=FakeTextChannel(channel_id=123), content="hello")
|
||||
|
||||
await adapter._handle_message(message)
|
||||
|
||||
adapter._auto_create_thread.assert_not_awaited()
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
event = adapter.handle_message.await_args.args[0]
|
||||
assert event.source.chat_type == "group"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discord_bot_thread_skips_mention_requirement(adapter, monkeypatch):
|
||||
"""Messages in a thread the bot has participated in should not require @mention."""
|
||||
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "true")
|
||||
monkeypatch.delenv("DISCORD_FREE_RESPONSE_CHANNELS", raising=False)
|
||||
monkeypatch.setenv("DISCORD_AUTO_THREAD", "false")
|
||||
|
||||
# Simulate bot having previously participated in thread 456
|
||||
adapter._bot_participated_threads.add("456")
|
||||
|
||||
thread = FakeThread(channel_id=456, name="existing thread")
|
||||
message = make_message(channel=thread, content="follow-up without mention")
|
||||
|
||||
await adapter._handle_message(message)
|
||||
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
event = adapter.handle_message.await_args.args[0]
|
||||
assert event.text == "follow-up without mention"
|
||||
assert event.source.chat_type == "thread"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discord_unknown_thread_still_requires_mention(adapter, monkeypatch):
|
||||
"""Messages in a thread the bot hasn't participated in should still require @mention."""
|
||||
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "true")
|
||||
monkeypatch.delenv("DISCORD_FREE_RESPONSE_CHANNELS", raising=False)
|
||||
monkeypatch.setenv("DISCORD_AUTO_THREAD", "false")
|
||||
|
||||
# Bot has NOT participated in thread 789
|
||||
thread = FakeThread(channel_id=789, name="some thread")
|
||||
message = make_message(channel=thread, content="hello from unknown thread")
|
||||
|
||||
await adapter._handle_message(message)
|
||||
|
||||
adapter.handle_message.assert_not_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discord_auto_thread_tracks_participation(adapter, monkeypatch):
|
||||
"""Auto-created threads should be tracked for future mention-free replies."""
|
||||
monkeypatch.delenv("DISCORD_AUTO_THREAD", raising=False)
|
||||
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "false")
|
||||
|
||||
fake_thread = FakeThread(channel_id=555, name="auto-thread")
|
||||
adapter._auto_create_thread = AsyncMock(return_value=fake_thread)
|
||||
|
||||
message = make_message(channel=FakeTextChannel(channel_id=123), content="start a thread")
|
||||
|
||||
await adapter._handle_message(message)
|
||||
|
||||
assert "555" in adapter._bot_participated_threads
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discord_thread_participation_tracked_on_dispatch(adapter, monkeypatch):
|
||||
"""When the bot processes a message in a thread, it tracks participation."""
|
||||
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "false")
|
||||
monkeypatch.setenv("DISCORD_AUTO_THREAD", "false")
|
||||
|
||||
thread = FakeThread(channel_id=777, name="manually created thread")
|
||||
message = make_message(channel=thread, content="hello in thread")
|
||||
|
||||
await adapter._handle_message(message)
|
||||
|
||||
assert "777" in adapter._bot_participated_threads
|
||||
|
||||
@@ -363,11 +363,37 @@ async def test_auto_thread_creates_thread_and_redirects(adapter, monkeypatch):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_thread_disabled_by_default(adapter, monkeypatch):
|
||||
"""Without DISCORD_AUTO_THREAD, messages stay in the channel."""
|
||||
async def test_auto_thread_enabled_by_default_slash_commands(adapter, monkeypatch):
|
||||
"""Without DISCORD_AUTO_THREAD env var, auto-threading is enabled (default: true)."""
|
||||
monkeypatch.delenv("DISCORD_AUTO_THREAD", raising=False)
|
||||
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "false")
|
||||
|
||||
fake_thread = _FakeThreadChannel(channel_id=999, name="auto-thread")
|
||||
adapter._auto_create_thread = AsyncMock(return_value=fake_thread)
|
||||
|
||||
captured_events = []
|
||||
|
||||
async def capture_handle(event):
|
||||
captured_events.append(event)
|
||||
|
||||
adapter.handle_message = capture_handle
|
||||
|
||||
msg = _fake_message(_FakeTextChannel())
|
||||
|
||||
await adapter._handle_message(msg)
|
||||
|
||||
adapter._auto_create_thread.assert_awaited_once()
|
||||
assert len(captured_events) == 1
|
||||
assert captured_events[0].source.chat_id == "999" # redirected to thread
|
||||
assert captured_events[0].source.chat_type == "thread"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_thread_can_be_disabled(adapter, monkeypatch):
|
||||
"""Setting DISCORD_AUTO_THREAD=false keeps messages in the channel."""
|
||||
monkeypatch.setenv("DISCORD_AUTO_THREAD", "false")
|
||||
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "false")
|
||||
|
||||
adapter._auto_create_thread = AsyncMock()
|
||||
|
||||
captured_events = []
|
||||
|
||||
106
tests/gateway/test_gateway_shutdown.py
Normal file
106
tests/gateway/test_gateway_shutdown.py
Normal file
@@ -0,0 +1,106 @@
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
||||
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
|
||||
from gateway.run import GatewayRunner
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
|
||||
|
||||
class StubAdapter(BasePlatformAdapter):
|
||||
def __init__(self):
|
||||
super().__init__(PlatformConfig(enabled=True, token="***"), Platform.TELEGRAM)
|
||||
|
||||
async def connect(self):
|
||||
return True
|
||||
|
||||
async def disconnect(self):
|
||||
return None
|
||||
|
||||
async def send(self, chat_id, content, reply_to=None, metadata=None):
|
||||
return SendResult(success=True, message_id="1")
|
||||
|
||||
async def send_typing(self, chat_id, metadata=None):
|
||||
return None
|
||||
|
||||
async def get_chat_info(self, chat_id):
|
||||
return {"id": chat_id}
|
||||
|
||||
|
||||
def _source(chat_id="123456", chat_type="dm"):
|
||||
return SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id=chat_id,
|
||||
chat_type=chat_type,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_background_tasks_cancels_inflight_message_processing():
|
||||
adapter = StubAdapter()
|
||||
release = asyncio.Event()
|
||||
|
||||
async def block_forever(_event):
|
||||
await release.wait()
|
||||
return None
|
||||
|
||||
adapter.set_message_handler(block_forever)
|
||||
event = MessageEvent(text="work", source=_source(), message_id="1")
|
||||
|
||||
await adapter.handle_message(event)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
session_key = build_session_key(event.source)
|
||||
assert session_key in adapter._active_sessions
|
||||
assert adapter._background_tasks
|
||||
|
||||
await adapter.cancel_background_tasks()
|
||||
|
||||
assert adapter._background_tasks == set()
|
||||
assert adapter._active_sessions == {}
|
||||
assert adapter._pending_messages == {}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gateway_stop_interrupts_running_agents_and_cancels_adapter_tasks():
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner.config = GatewayConfig(platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")})
|
||||
runner._running = True
|
||||
runner._shutdown_event = asyncio.Event()
|
||||
runner._exit_reason = None
|
||||
runner._pending_messages = {"session": "pending text"}
|
||||
runner._pending_approvals = {"session": {"command": "rm -rf /tmp/x"}}
|
||||
runner._shutdown_all_gateway_honcho = lambda: None
|
||||
|
||||
adapter = StubAdapter()
|
||||
release = asyncio.Event()
|
||||
|
||||
async def block_forever(_event):
|
||||
await release.wait()
|
||||
return None
|
||||
|
||||
adapter.set_message_handler(block_forever)
|
||||
event = MessageEvent(text="work", source=_source(), message_id="1")
|
||||
await adapter.handle_message(event)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
disconnect_mock = AsyncMock()
|
||||
adapter.disconnect = disconnect_mock
|
||||
|
||||
session_key = build_session_key(event.source)
|
||||
running_agent = MagicMock()
|
||||
runner._running_agents = {session_key: running_agent}
|
||||
runner.adapters = {Platform.TELEGRAM: adapter}
|
||||
|
||||
with patch("gateway.status.remove_pid_file"), patch("gateway.status.write_runtime_status"):
|
||||
await runner.stop()
|
||||
|
||||
running_agent.interrupt.assert_called_once_with("Gateway shutting down")
|
||||
disconnect_mock.assert_awaited_once()
|
||||
assert runner.adapters == {}
|
||||
assert runner._running_agents == {}
|
||||
assert runner._pending_messages == {}
|
||||
assert runner._pending_approvals == {}
|
||||
assert runner._shutdown_event.is_set() is True
|
||||
45
tests/gateway/test_session_env.py
Normal file
45
tests/gateway/test_session_env.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import os
|
||||
|
||||
from gateway.config import Platform
|
||||
from gateway.run import GatewayRunner
|
||||
from gateway.session import SessionContext, SessionSource
|
||||
|
||||
|
||||
def test_set_session_env_includes_thread_id(monkeypatch):
|
||||
runner = object.__new__(GatewayRunner)
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="-1001",
|
||||
chat_name="Group",
|
||||
chat_type="group",
|
||||
thread_id="17585",
|
||||
)
|
||||
context = SessionContext(source=source, connected_platforms=[], home_channels={})
|
||||
|
||||
monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False)
|
||||
monkeypatch.delenv("HERMES_SESSION_CHAT_ID", raising=False)
|
||||
monkeypatch.delenv("HERMES_SESSION_CHAT_NAME", raising=False)
|
||||
monkeypatch.delenv("HERMES_SESSION_THREAD_ID", raising=False)
|
||||
|
||||
runner._set_session_env(context)
|
||||
|
||||
assert os.getenv("HERMES_SESSION_PLATFORM") == "telegram"
|
||||
assert os.getenv("HERMES_SESSION_CHAT_ID") == "-1001"
|
||||
assert os.getenv("HERMES_SESSION_CHAT_NAME") == "Group"
|
||||
assert os.getenv("HERMES_SESSION_THREAD_ID") == "17585"
|
||||
|
||||
|
||||
def test_clear_session_env_removes_thread_id(monkeypatch):
|
||||
runner = object.__new__(GatewayRunner)
|
||||
|
||||
monkeypatch.setenv("HERMES_SESSION_PLATFORM", "telegram")
|
||||
monkeypatch.setenv("HERMES_SESSION_CHAT_ID", "-1001")
|
||||
monkeypatch.setenv("HERMES_SESSION_CHAT_NAME", "Group")
|
||||
monkeypatch.setenv("HERMES_SESSION_THREAD_ID", "17585")
|
||||
|
||||
runner._clear_session_env()
|
||||
|
||||
assert os.getenv("HERMES_SESSION_PLATFORM") is None
|
||||
assert os.getenv("HERMES_SESSION_CHAT_ID") is None
|
||||
assert os.getenv("HERMES_SESSION_CHAT_NAME") is None
|
||||
assert os.getenv("HERMES_SESSION_THREAD_ID") is None
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Tests for the /voice command and auto voice reply in the gateway."""
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import queue
|
||||
@@ -206,9 +207,11 @@ class TestAutoVoiceReply:
|
||||
2. gateway _send_voice_reply: fires based on voice_mode setting
|
||||
|
||||
To prevent double audio, _send_voice_reply is skipped when voice input
|
||||
already triggered base adapter auto-TTS (skip_double = is_voice_input).
|
||||
Exception: Discord voice channel — both auto-TTS and Discord play_tts
|
||||
override skip, so the runner must handle it via play_in_voice_channel.
|
||||
already triggered base adapter auto-TTS.
|
||||
|
||||
For Discord voice channels, the base adapter now routes play_tts directly
|
||||
into VC playback, so the runner should still skip voice-input follow-ups to
|
||||
avoid double playback.
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
@@ -292,14 +295,14 @@ class TestAutoVoiceReply:
|
||||
|
||||
# -- Discord VC exception: runner must handle --------------------------
|
||||
|
||||
def test_discord_vc_voice_input_runner_fires(self, runner):
|
||||
"""Discord VC + voice input: base play_tts skips (VC override),
|
||||
so runner must handle via play_in_voice_channel."""
|
||||
assert self._call(runner, "all", MessageType.VOICE, in_voice_channel=True) is True
|
||||
def test_discord_vc_voice_input_base_handles(self, runner):
|
||||
"""Discord VC + voice input: base adapter play_tts plays in VC,
|
||||
so runner skips to avoid double playback."""
|
||||
assert self._call(runner, "all", MessageType.VOICE, in_voice_channel=True) is False
|
||||
|
||||
def test_discord_vc_voice_only_runner_fires(self, runner):
|
||||
"""Discord VC + voice_only + voice: runner must handle."""
|
||||
assert self._call(runner, "voice_only", MessageType.VOICE, in_voice_channel=True) is True
|
||||
def test_discord_vc_voice_only_base_handles(self, runner):
|
||||
"""Discord VC + voice_only + voice: base adapter handles."""
|
||||
assert self._call(runner, "voice_only", MessageType.VOICE, in_voice_channel=True) is False
|
||||
|
||||
# -- Edge cases --------------------------------------------------------
|
||||
|
||||
@@ -422,17 +425,23 @@ class TestDiscordPlayTtsSkip:
|
||||
return adapter
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_play_tts_skipped_when_in_vc(self):
|
||||
async def test_play_tts_plays_in_vc_when_connected(self):
|
||||
adapter = self._make_discord_adapter()
|
||||
# Simulate bot in voice channel for guild 111, text channel 123
|
||||
mock_vc = MagicMock()
|
||||
mock_vc.is_connected.return_value = True
|
||||
mock_vc.is_playing.return_value = False
|
||||
adapter._voice_clients[111] = mock_vc
|
||||
adapter._voice_text_channels[111] = 123
|
||||
|
||||
# Mock play_in_voice_channel to avoid actual ffmpeg call
|
||||
async def fake_play(gid, path):
|
||||
return True
|
||||
adapter.play_in_voice_channel = fake_play
|
||||
|
||||
result = await adapter.play_tts(chat_id="123", audio_path="/tmp/test.ogg")
|
||||
# play_tts now plays in VC instead of being a no-op
|
||||
assert result.success is True
|
||||
# send_voice should NOT have been called (no client, would fail)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_play_tts_not_skipped_when_not_in_vc(self):
|
||||
@@ -728,6 +737,24 @@ class TestVoiceChannelCommands:
|
||||
result = await runner._handle_voice_channel_join(event)
|
||||
assert "failed" in result.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_join_missing_voice_dependencies(self, runner):
|
||||
"""Missing PyNaCl/davey should return a user-actionable install hint."""
|
||||
mock_channel = MagicMock()
|
||||
mock_channel.name = "General"
|
||||
mock_adapter = AsyncMock()
|
||||
mock_adapter.join_voice_channel = AsyncMock(
|
||||
side_effect=RuntimeError("PyNaCl library needed in order to use voice")
|
||||
)
|
||||
mock_adapter.get_user_voice_channel = AsyncMock(return_value=mock_channel)
|
||||
event = self._make_discord_event()
|
||||
runner.adapters[event.source.platform] = mock_adapter
|
||||
|
||||
result = await runner._handle_voice_channel_join(event)
|
||||
|
||||
assert "voice dependencies are missing" in result.lower()
|
||||
assert "hermes-agent[messaging]" in result
|
||||
|
||||
# -- _handle_voice_channel_leave --
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -2031,3 +2058,534 @@ class TestDisconnectVoiceCleanup:
|
||||
assert len(adapter._voice_receivers) == 0
|
||||
assert len(adapter._voice_listen_tasks) == 0
|
||||
assert len(adapter._voice_timeout_tasks) == 0
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Discord Voice Channel Flow Tests
|
||||
# =====================================================================
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
importlib.util.find_spec("nacl") is None,
|
||||
reason="PyNaCl not installed",
|
||||
)
|
||||
class TestVoiceReception:
|
||||
"""Audio reception: SSRC mapping, DAVE passthrough, buffer lifecycle."""
|
||||
|
||||
@staticmethod
|
||||
def _make_receiver(allowed_ids=None, members=None, dave=False, bot_id=9999):
|
||||
from gateway.platforms.discord import VoiceReceiver
|
||||
vc = MagicMock()
|
||||
vc._connection.secret_key = [0] * 32
|
||||
vc._connection.dave_session = MagicMock() if dave else None
|
||||
vc._connection.ssrc = bot_id
|
||||
vc._connection.add_socket_listener = MagicMock()
|
||||
vc._connection.remove_socket_listener = MagicMock()
|
||||
vc._connection.hook = None
|
||||
vc.user = SimpleNamespace(id=bot_id)
|
||||
vc.channel = MagicMock()
|
||||
vc.channel.members = members or []
|
||||
receiver = VoiceReceiver(vc, allowed_user_ids=allowed_ids)
|
||||
return receiver
|
||||
|
||||
@staticmethod
|
||||
def _fill_buffer(receiver, ssrc, duration_s=1.0, age_s=3.0):
|
||||
"""Add PCM data to buffer. 48kHz stereo 16-bit = 192000 bytes/sec."""
|
||||
size = int(192000 * duration_s)
|
||||
receiver._buffers[ssrc] = bytearray(b"\x00" * size)
|
||||
receiver._last_packet_time[ssrc] = time.monotonic() - age_s
|
||||
|
||||
# -- Known SSRC (normal flow) --
|
||||
|
||||
def test_known_ssrc_returns_completed(self):
|
||||
receiver = self._make_receiver()
|
||||
receiver.start()
|
||||
receiver.map_ssrc(100, 42)
|
||||
self._fill_buffer(receiver, 100)
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
assert len(receiver._buffers[100]) == 0 # cleared
|
||||
|
||||
def test_known_ssrc_short_buffer_ignored(self):
|
||||
receiver = self._make_receiver()
|
||||
receiver.start()
|
||||
receiver.map_ssrc(100, 42)
|
||||
self._fill_buffer(receiver, 100, duration_s=0.1) # too short
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 0
|
||||
|
||||
def test_known_ssrc_recent_audio_waits(self):
|
||||
receiver = self._make_receiver()
|
||||
receiver.start()
|
||||
receiver.map_ssrc(100, 42)
|
||||
self._fill_buffer(receiver, 100, age_s=0.0) # just arrived
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 0
|
||||
|
||||
# -- Unknown SSRC + DAVE passthrough --
|
||||
|
||||
def test_unknown_ssrc_no_automap_no_completed(self):
|
||||
"""Unknown SSRC, no members to infer — buffer cleared, not returned."""
|
||||
receiver = self._make_receiver(dave=True, members=[])
|
||||
receiver.start()
|
||||
self._fill_buffer(receiver, 100)
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 0
|
||||
assert len(receiver._buffers[100]) == 0
|
||||
|
||||
def test_unknown_ssrc_late_speaking_event(self):
|
||||
"""Audio buffered before SPEAKING → SPEAKING maps → next check returns it."""
|
||||
receiver = self._make_receiver(dave=True)
|
||||
receiver.start()
|
||||
self._fill_buffer(receiver, 100, age_s=0.0) # still receiving
|
||||
# No user yet
|
||||
assert receiver.check_silence() == []
|
||||
# SPEAKING event arrives
|
||||
receiver.map_ssrc(100, 42)
|
||||
# Silence kicks in
|
||||
receiver._last_packet_time[100] = time.monotonic() - 3.0
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
|
||||
# -- SSRC auto-mapping --
|
||||
|
||||
def test_automap_single_allowed_user(self):
|
||||
members = [
|
||||
SimpleNamespace(id=9999, name="Bot"),
|
||||
SimpleNamespace(id=42, name="Alice"),
|
||||
]
|
||||
receiver = self._make_receiver(allowed_ids={"42"}, members=members)
|
||||
receiver.start()
|
||||
self._fill_buffer(receiver, 100)
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
assert receiver._ssrc_to_user[100] == 42
|
||||
|
||||
def test_automap_multiple_allowed_users_no_map(self):
|
||||
members = [
|
||||
SimpleNamespace(id=9999, name="Bot"),
|
||||
SimpleNamespace(id=42, name="Alice"),
|
||||
SimpleNamespace(id=43, name="Bob"),
|
||||
]
|
||||
receiver = self._make_receiver(allowed_ids={"42", "43"}, members=members)
|
||||
receiver.start()
|
||||
self._fill_buffer(receiver, 100)
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 0
|
||||
|
||||
def test_automap_no_allowlist_single_member(self):
|
||||
"""No allowed_user_ids → sole non-bot member inferred."""
|
||||
members = [
|
||||
SimpleNamespace(id=9999, name="Bot"),
|
||||
SimpleNamespace(id=42, name="Alice"),
|
||||
]
|
||||
receiver = self._make_receiver(allowed_ids=None, members=members)
|
||||
receiver.start()
|
||||
self._fill_buffer(receiver, 100)
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
|
||||
def test_automap_unallowed_user_rejected(self):
|
||||
"""User in channel but not in allowed list — not mapped."""
|
||||
members = [
|
||||
SimpleNamespace(id=9999, name="Bot"),
|
||||
SimpleNamespace(id=42, name="Alice"),
|
||||
]
|
||||
receiver = self._make_receiver(allowed_ids={"99"}, members=members)
|
||||
receiver.start()
|
||||
self._fill_buffer(receiver, 100)
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 0
|
||||
|
||||
def test_automap_only_bot_in_channel(self):
|
||||
"""Only bot in channel — no one to map to."""
|
||||
members = [SimpleNamespace(id=9999, name="Bot")]
|
||||
receiver = self._make_receiver(allowed_ids=None, members=members)
|
||||
receiver.start()
|
||||
self._fill_buffer(receiver, 100)
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 0
|
||||
|
||||
def test_automap_persists_across_calls(self):
|
||||
"""Auto-mapped SSRC stays mapped for subsequent checks."""
|
||||
members = [
|
||||
SimpleNamespace(id=9999, name="Bot"),
|
||||
SimpleNamespace(id=42, name="Alice"),
|
||||
]
|
||||
receiver = self._make_receiver(allowed_ids={"42"}, members=members)
|
||||
receiver.start()
|
||||
self._fill_buffer(receiver, 100)
|
||||
receiver.check_silence()
|
||||
assert receiver._ssrc_to_user[100] == 42
|
||||
# Second utterance — should use cached mapping
|
||||
self._fill_buffer(receiver, 100)
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
|
||||
# -- Stale buffer cleanup --
|
||||
|
||||
def test_stale_unknown_buffer_discarded(self):
|
||||
"""Buffer with no user and very old timestamp is discarded."""
|
||||
receiver = self._make_receiver()
|
||||
receiver.start()
|
||||
receiver._buffers[200] = bytearray(b"\x00" * 100)
|
||||
receiver._last_packet_time[200] = time.monotonic() - 10.0
|
||||
receiver.check_silence()
|
||||
assert 200 not in receiver._buffers
|
||||
|
||||
# -- Pause / resume (echo prevention) --
|
||||
|
||||
def test_paused_receiver_ignores_packets(self):
|
||||
receiver = self._make_receiver()
|
||||
receiver.start()
|
||||
receiver.pause()
|
||||
receiver._on_packet(b"\x00" * 100)
|
||||
assert len(receiver._buffers) == 0
|
||||
|
||||
def test_resumed_receiver_accepts_packets(self):
|
||||
receiver = self._make_receiver()
|
||||
receiver.start()
|
||||
receiver.pause()
|
||||
receiver.resume()
|
||||
assert receiver._paused is False
|
||||
|
||||
# -- _on_packet DAVE passthrough behavior --
|
||||
|
||||
def _make_receiver_with_nacl(self, dave_session=None, mapped_ssrcs=None):
|
||||
"""Create a receiver that can process _on_packet with mocked NaCl + Opus."""
|
||||
from gateway.platforms.discord import VoiceReceiver
|
||||
vc = MagicMock()
|
||||
vc._connection.secret_key = [0] * 32
|
||||
vc._connection.dave_session = dave_session
|
||||
vc._connection.ssrc = 9999
|
||||
vc._connection.add_socket_listener = MagicMock()
|
||||
vc._connection.remove_socket_listener = MagicMock()
|
||||
vc._connection.hook = None
|
||||
vc.user = SimpleNamespace(id=9999)
|
||||
vc.channel = MagicMock()
|
||||
vc.channel.members = []
|
||||
receiver = VoiceReceiver(vc)
|
||||
receiver.start()
|
||||
# Pre-map SSRCs if provided
|
||||
if mapped_ssrcs:
|
||||
for ssrc, uid in mapped_ssrcs.items():
|
||||
receiver.map_ssrc(ssrc, uid)
|
||||
return receiver
|
||||
|
||||
@staticmethod
|
||||
def _build_rtp_packet(ssrc=100, seq=1, timestamp=960):
|
||||
"""Build a minimal valid RTP packet for _on_packet.
|
||||
|
||||
We need: RTP header (12 bytes) + encrypted payload + 4-byte nonce.
|
||||
NaCl decrypt is mocked so payload content doesn't matter.
|
||||
"""
|
||||
import struct
|
||||
# RTP header: version=2, payload_type=0x78, no extension, no CSRC
|
||||
header = struct.pack(">BBHII", 0x80, 0x78, seq, timestamp, ssrc)
|
||||
# Fake encrypted payload (NaCl will be mocked) + 4 byte nonce
|
||||
payload = b"\x00" * 20 + b"\x00\x00\x00\x01"
|
||||
return header + payload
|
||||
|
||||
def _inject_mock_decoder(self, receiver, ssrc):
|
||||
"""Pre-inject a mock Opus decoder for the given SSRC."""
|
||||
mock_decoder = MagicMock()
|
||||
mock_decoder.decode.return_value = b"\x00" * 3840
|
||||
receiver._decoders[ssrc] = mock_decoder
|
||||
return mock_decoder
|
||||
|
||||
def test_on_packet_dave_known_user_decrypt_ok(self):
|
||||
"""Known SSRC + DAVE decrypt success → audio buffered."""
|
||||
dave = MagicMock()
|
||||
dave.decrypt.return_value = b"\xf8\xff\xfe"
|
||||
receiver = self._make_receiver_with_nacl(
|
||||
dave_session=dave, mapped_ssrcs={100: 42}
|
||||
)
|
||||
self._inject_mock_decoder(receiver, 100)
|
||||
|
||||
with patch("nacl.secret.Aead") as mock_aead:
|
||||
mock_aead.return_value.decrypt.return_value = b"\xf8\xff\xfe"
|
||||
receiver._on_packet(self._build_rtp_packet(ssrc=100))
|
||||
|
||||
assert 100 in receiver._buffers
|
||||
assert len(receiver._buffers[100]) > 0
|
||||
dave.decrypt.assert_called_once()
|
||||
|
||||
def test_on_packet_dave_unknown_ssrc_passthrough(self):
|
||||
"""Unknown SSRC + DAVE → skip DAVE, attempt Opus decode (passthrough)."""
|
||||
dave = MagicMock()
|
||||
receiver = self._make_receiver_with_nacl(dave_session=dave)
|
||||
self._inject_mock_decoder(receiver, 100)
|
||||
|
||||
with patch("nacl.secret.Aead") as mock_aead:
|
||||
mock_aead.return_value.decrypt.return_value = b"\xf8\xff\xfe"
|
||||
receiver._on_packet(self._build_rtp_packet(ssrc=100))
|
||||
|
||||
dave.decrypt.assert_not_called()
|
||||
assert 100 in receiver._buffers
|
||||
assert len(receiver._buffers[100]) > 0
|
||||
|
||||
def test_on_packet_dave_unencrypted_error_passthrough(self):
|
||||
"""DAVE decrypt 'Unencrypted' error → use data as-is, don't drop."""
|
||||
dave = MagicMock()
|
||||
dave.decrypt.side_effect = Exception(
|
||||
"Failed to decrypt: DecryptionFailed(UnencryptedWhenPassthroughDisabled)"
|
||||
)
|
||||
receiver = self._make_receiver_with_nacl(
|
||||
dave_session=dave, mapped_ssrcs={100: 42}
|
||||
)
|
||||
self._inject_mock_decoder(receiver, 100)
|
||||
|
||||
with patch("nacl.secret.Aead") as mock_aead:
|
||||
mock_aead.return_value.decrypt.return_value = b"\xf8\xff\xfe"
|
||||
receiver._on_packet(self._build_rtp_packet(ssrc=100))
|
||||
|
||||
assert 100 in receiver._buffers
|
||||
assert len(receiver._buffers[100]) > 0
|
||||
|
||||
def test_on_packet_dave_other_error_drops(self):
|
||||
"""DAVE decrypt non-Unencrypted error → packet dropped."""
|
||||
dave = MagicMock()
|
||||
dave.decrypt.side_effect = Exception("KeyRotationFailed")
|
||||
receiver = self._make_receiver_with_nacl(
|
||||
dave_session=dave, mapped_ssrcs={100: 42}
|
||||
)
|
||||
|
||||
with patch("nacl.secret.Aead") as mock_aead:
|
||||
mock_aead.return_value.decrypt.return_value = b"\xf8\xff\xfe"
|
||||
receiver._on_packet(self._build_rtp_packet(ssrc=100))
|
||||
|
||||
assert len(receiver._buffers.get(100, b"")) == 0
|
||||
|
||||
def test_on_packet_no_dave_direct_decode(self):
|
||||
"""No DAVE session → decode directly."""
|
||||
receiver = self._make_receiver_with_nacl(dave_session=None)
|
||||
self._inject_mock_decoder(receiver, 100)
|
||||
|
||||
with patch("nacl.secret.Aead") as mock_aead:
|
||||
mock_aead.return_value.decrypt.return_value = b"\xf8\xff\xfe"
|
||||
receiver._on_packet(self._build_rtp_packet(ssrc=100))
|
||||
|
||||
assert 100 in receiver._buffers
|
||||
assert len(receiver._buffers[100]) > 0
|
||||
|
||||
def test_on_packet_bot_own_ssrc_ignored(self):
|
||||
"""Bot's own SSRC → dropped (echo prevention)."""
|
||||
receiver = self._make_receiver_with_nacl()
|
||||
with patch("nacl.secret.Aead"):
|
||||
receiver._on_packet(self._build_rtp_packet(ssrc=9999))
|
||||
assert len(receiver._buffers) == 0
|
||||
|
||||
def test_on_packet_multiple_ssrcs_separate_buffers(self):
|
||||
"""Different SSRCs → separate buffers."""
|
||||
receiver = self._make_receiver_with_nacl(dave_session=None)
|
||||
self._inject_mock_decoder(receiver, 100)
|
||||
self._inject_mock_decoder(receiver, 200)
|
||||
|
||||
with patch("nacl.secret.Aead") as mock_aead:
|
||||
mock_aead.return_value.decrypt.return_value = b"\xf8\xff\xfe"
|
||||
receiver._on_packet(self._build_rtp_packet(ssrc=100))
|
||||
receiver._on_packet(self._build_rtp_packet(ssrc=200))
|
||||
|
||||
assert 100 in receiver._buffers
|
||||
assert 200 in receiver._buffers
|
||||
|
||||
|
||||
class TestVoiceTTSPlayback:
|
||||
"""TTS playback: play_tts in VC, dedup, fallback."""
|
||||
|
||||
@staticmethod
|
||||
def _make_discord_adapter():
|
||||
from gateway.platforms.discord import DiscordAdapter
|
||||
from gateway.config import PlatformConfig, Platform
|
||||
config = PlatformConfig(enabled=True, extra={})
|
||||
config.token = "fake-token"
|
||||
adapter = object.__new__(DiscordAdapter)
|
||||
adapter.platform = Platform.DISCORD
|
||||
adapter.config = config
|
||||
adapter._voice_clients = {}
|
||||
adapter._voice_text_channels = {}
|
||||
adapter._voice_receivers = {}
|
||||
return adapter
|
||||
|
||||
# -- play_tts behavior --
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_play_tts_plays_in_vc(self):
|
||||
"""play_tts calls play_in_voice_channel when bot is in VC."""
|
||||
adapter = self._make_discord_adapter()
|
||||
mock_vc = MagicMock()
|
||||
mock_vc.is_connected.return_value = True
|
||||
adapter._voice_clients[111] = mock_vc
|
||||
adapter._voice_text_channels[111] = 123
|
||||
|
||||
played = []
|
||||
async def fake_play(gid, path):
|
||||
played.append((gid, path))
|
||||
return True
|
||||
adapter.play_in_voice_channel = fake_play
|
||||
|
||||
result = await adapter.play_tts(chat_id="123", audio_path="/tmp/tts.ogg")
|
||||
assert result.success is True
|
||||
assert played == [(111, "/tmp/tts.ogg")]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_play_tts_fallback_when_not_in_vc(self):
|
||||
"""play_tts sends as file attachment when bot is not in VC."""
|
||||
adapter = self._make_discord_adapter()
|
||||
from gateway.platforms.base import SendResult
|
||||
adapter.send_voice = AsyncMock(return_value=SendResult(success=False, error="no client"))
|
||||
result = await adapter.play_tts(chat_id="123", audio_path="/tmp/tts.ogg")
|
||||
assert result.success is False
|
||||
adapter.send_voice.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_play_tts_wrong_channel_no_match(self):
|
||||
"""play_tts doesn't match if chat_id is for a different channel."""
|
||||
adapter = self._make_discord_adapter()
|
||||
mock_vc = MagicMock()
|
||||
mock_vc.is_connected.return_value = True
|
||||
adapter._voice_clients[111] = mock_vc
|
||||
adapter._voice_text_channels[111] = 123
|
||||
|
||||
from gateway.platforms.base import SendResult
|
||||
adapter.send_voice = AsyncMock(return_value=SendResult(success=True))
|
||||
# Different chat_id — shouldn't match VC
|
||||
result = await adapter.play_tts(chat_id="999", audio_path="/tmp/tts.ogg")
|
||||
adapter.send_voice.assert_called_once()
|
||||
|
||||
# -- Runner dedup --
|
||||
|
||||
@staticmethod
|
||||
def _make_runner():
|
||||
from gateway.run import GatewayRunner
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner._voice_mode = {}
|
||||
runner.adapters = {}
|
||||
return runner
|
||||
|
||||
def _call_should_reply(self, runner, voice_mode, msg_type, response="Hello", agent_msgs=None):
|
||||
from gateway.platforms.base import MessageType, MessageEvent, SessionSource
|
||||
from gateway.config import Platform
|
||||
runner._voice_mode["ch1"] = voice_mode
|
||||
source = SessionSource(
|
||||
platform=Platform.DISCORD, chat_id="ch1",
|
||||
user_id="1", user_name="test", chat_type="channel",
|
||||
)
|
||||
event = MessageEvent(source=source, text="test", message_type=msg_type)
|
||||
return runner._should_send_voice_reply(event, response, agent_msgs or [])
|
||||
|
||||
def test_voice_input_runner_skips(self):
|
||||
"""Voice input: runner skips — base adapter handles via play_tts."""
|
||||
from gateway.platforms.base import MessageType
|
||||
runner = self._make_runner()
|
||||
assert self._call_should_reply(runner, "all", MessageType.VOICE) is False
|
||||
|
||||
def test_text_input_voice_all_runner_fires(self):
|
||||
"""Text input + voice_mode=all: runner generates TTS."""
|
||||
from gateway.platforms.base import MessageType
|
||||
runner = self._make_runner()
|
||||
assert self._call_should_reply(runner, "all", MessageType.TEXT) is True
|
||||
|
||||
def test_text_input_voice_off_no_tts(self):
|
||||
"""Text input + voice_mode=off: no TTS."""
|
||||
from gateway.platforms.base import MessageType
|
||||
runner = self._make_runner()
|
||||
assert self._call_should_reply(runner, "off", MessageType.TEXT) is False
|
||||
|
||||
def test_text_input_voice_only_no_tts(self):
|
||||
"""Text input + voice_mode=voice_only: no TTS for text."""
|
||||
from gateway.platforms.base import MessageType
|
||||
runner = self._make_runner()
|
||||
assert self._call_should_reply(runner, "voice_only", MessageType.TEXT) is False
|
||||
|
||||
def test_error_response_no_tts(self):
|
||||
"""Error response: no TTS regardless of voice_mode."""
|
||||
from gateway.platforms.base import MessageType
|
||||
runner = self._make_runner()
|
||||
assert self._call_should_reply(runner, "all", MessageType.TEXT, response="Error: boom") is False
|
||||
|
||||
def test_empty_response_no_tts(self):
|
||||
"""Empty response: no TTS."""
|
||||
from gateway.platforms.base import MessageType
|
||||
runner = self._make_runner()
|
||||
assert self._call_should_reply(runner, "all", MessageType.TEXT, response="") is False
|
||||
|
||||
def test_agent_tts_tool_dedup(self):
|
||||
"""Agent already called text_to_speech tool: runner skips."""
|
||||
from gateway.platforms.base import MessageType
|
||||
runner = self._make_runner()
|
||||
agent_msgs = [{"role": "assistant", "tool_calls": [
|
||||
{"id": "1", "type": "function", "function": {"name": "text_to_speech", "arguments": "{}"}}
|
||||
]}]
|
||||
assert self._call_should_reply(runner, "all", MessageType.TEXT, agent_msgs=agent_msgs) is False
|
||||
|
||||
|
||||
class TestUDPKeepalive:
|
||||
"""UDP keepalive prevents Discord from dropping the voice session."""
|
||||
|
||||
def test_keepalive_interval_is_reasonable(self):
|
||||
from gateway.platforms.discord import DiscordAdapter
|
||||
interval = DiscordAdapter._KEEPALIVE_INTERVAL
|
||||
assert 5 <= interval <= 30, f"Keepalive interval {interval}s should be between 5-30s"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keepalive_sends_silence_frame(self):
|
||||
"""Listen loop sends silence frame via send_packet after interval."""
|
||||
from gateway.platforms.discord import DiscordAdapter
|
||||
from gateway.config import PlatformConfig, Platform
|
||||
|
||||
config = PlatformConfig(enabled=True, extra={})
|
||||
config.token = "fake"
|
||||
adapter = object.__new__(DiscordAdapter)
|
||||
adapter.platform = Platform.DISCORD
|
||||
adapter.config = config
|
||||
adapter._voice_clients = {}
|
||||
adapter._voice_text_channels = {}
|
||||
adapter._voice_receivers = {}
|
||||
adapter._voice_listen_tasks = {}
|
||||
|
||||
# Mock VC and receiver
|
||||
mock_vc = MagicMock()
|
||||
mock_vc.is_connected.return_value = True
|
||||
mock_conn = MagicMock()
|
||||
adapter._voice_clients[111] = mock_vc
|
||||
mock_vc._connection = mock_conn
|
||||
|
||||
from gateway.platforms.discord import VoiceReceiver
|
||||
mock_receiver_vc = MagicMock()
|
||||
mock_receiver_vc._connection.secret_key = [0] * 32
|
||||
mock_receiver_vc._connection.dave_session = None
|
||||
mock_receiver_vc._connection.ssrc = 9999
|
||||
mock_receiver_vc._connection.add_socket_listener = MagicMock()
|
||||
mock_receiver_vc._connection.remove_socket_listener = MagicMock()
|
||||
mock_receiver_vc._connection.hook = None
|
||||
receiver = VoiceReceiver(mock_receiver_vc)
|
||||
receiver.start()
|
||||
adapter._voice_receivers[111] = receiver
|
||||
|
||||
# Set keepalive interval very short for test
|
||||
original_interval = DiscordAdapter._KEEPALIVE_INTERVAL
|
||||
DiscordAdapter._KEEPALIVE_INTERVAL = 0.1
|
||||
|
||||
try:
|
||||
# Run listen loop briefly
|
||||
import asyncio
|
||||
loop_task = asyncio.create_task(adapter._voice_listen_loop(111))
|
||||
await asyncio.sleep(0.3)
|
||||
receiver._running = False # stop loop
|
||||
await asyncio.sleep(0.1)
|
||||
loop_task.cancel()
|
||||
try:
|
||||
await loop_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# send_packet should have been called with silence frame
|
||||
mock_conn.send_packet.assert_called_with(b'\xf8\xff\xfe')
|
||||
finally:
|
||||
DiscordAdapter._KEEPALIVE_INTERVAL = original_interval
|
||||
|
||||
70
tests/hermes_cli/test_env_loader.py
Normal file
70
tests/hermes_cli/test_env_loader.py
Normal file
@@ -0,0 +1,70 @@
|
||||
import importlib
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from hermes_cli.env_loader import load_hermes_dotenv
|
||||
|
||||
|
||||
def test_user_env_overrides_stale_shell_values(tmp_path, monkeypatch):
|
||||
home = tmp_path / "hermes"
|
||||
home.mkdir()
|
||||
env_file = home / ".env"
|
||||
env_file.write_text("OPENAI_BASE_URL=https://new.example/v1\n", encoding="utf-8")
|
||||
|
||||
monkeypatch.setenv("OPENAI_BASE_URL", "https://old.example/v1")
|
||||
|
||||
loaded = load_hermes_dotenv(hermes_home=home)
|
||||
|
||||
assert loaded == [env_file]
|
||||
assert os.getenv("OPENAI_BASE_URL") == "https://new.example/v1"
|
||||
|
||||
|
||||
def test_project_env_overrides_stale_shell_values_when_user_env_missing(tmp_path, monkeypatch):
|
||||
home = tmp_path / "hermes"
|
||||
project_env = tmp_path / ".env"
|
||||
project_env.write_text("OPENAI_BASE_URL=https://project.example/v1\n", encoding="utf-8")
|
||||
|
||||
monkeypatch.setenv("OPENAI_BASE_URL", "https://old.example/v1")
|
||||
|
||||
loaded = load_hermes_dotenv(hermes_home=home, project_env=project_env)
|
||||
|
||||
assert loaded == [project_env]
|
||||
assert os.getenv("OPENAI_BASE_URL") == "https://project.example/v1"
|
||||
|
||||
|
||||
def test_user_env_takes_precedence_over_project_env(tmp_path, monkeypatch):
|
||||
home = tmp_path / "hermes"
|
||||
home.mkdir()
|
||||
user_env = home / ".env"
|
||||
project_env = tmp_path / ".env"
|
||||
user_env.write_text("OPENAI_BASE_URL=https://user.example/v1\n", encoding="utf-8")
|
||||
project_env.write_text("OPENAI_BASE_URL=https://project.example/v1\nOPENAI_API_KEY=project-key\n", encoding="utf-8")
|
||||
|
||||
monkeypatch.setenv("OPENAI_BASE_URL", "https://old.example/v1")
|
||||
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
|
||||
|
||||
loaded = load_hermes_dotenv(hermes_home=home, project_env=project_env)
|
||||
|
||||
assert loaded == [user_env, project_env]
|
||||
assert os.getenv("OPENAI_BASE_URL") == "https://user.example/v1"
|
||||
assert os.getenv("OPENAI_API_KEY") == "project-key"
|
||||
|
||||
|
||||
def test_main_import_applies_user_env_over_shell_values(tmp_path, monkeypatch):
|
||||
home = tmp_path / "hermes"
|
||||
home.mkdir()
|
||||
(home / ".env").write_text(
|
||||
"OPENAI_BASE_URL=https://new.example/v1\nHERMES_INFERENCE_PROVIDER=custom\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||
monkeypatch.setenv("OPENAI_BASE_URL", "https://old.example/v1")
|
||||
monkeypatch.setenv("HERMES_INFERENCE_PROVIDER", "openrouter")
|
||||
|
||||
sys.modules.pop("hermes_cli.main", None)
|
||||
importlib.import_module("hermes_cli.main")
|
||||
|
||||
assert os.getenv("OPENAI_BASE_URL") == "https://new.example/v1"
|
||||
assert os.getenv("HERMES_INFERENCE_PROVIDER") == "custom"
|
||||
611
tests/integration/test_voice_channel_flow.py
Normal file
611
tests/integration/test_voice_channel_flow.py
Normal file
@@ -0,0 +1,611 @@
|
||||
"""Integration tests for Discord voice channel audio flow.
|
||||
|
||||
Uses real NaCl encryption and Opus codec (no mocks for crypto/codec).
|
||||
Does NOT require a Discord connection — tests the VoiceReceiver
|
||||
packet processing pipeline end-to-end.
|
||||
|
||||
Requires: PyNaCl>=1.5.0, discord.py[voice] (opus codec)
|
||||
"""
|
||||
|
||||
import struct
|
||||
import time
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
# Skip entire module if voice deps are missing
|
||||
pytest.importorskip("nacl.secret", reason="PyNaCl required for voice integration tests")
|
||||
discord = pytest.importorskip("discord", reason="discord.py required for voice integration tests")
|
||||
|
||||
import nacl.secret
|
||||
|
||||
try:
|
||||
if not discord.opus.is_loaded():
|
||||
import ctypes.util
|
||||
opus_path = ctypes.util.find_library("opus")
|
||||
if not opus_path:
|
||||
import sys
|
||||
for p in ("/opt/homebrew/lib/libopus.dylib", "/usr/local/lib/libopus.dylib"):
|
||||
import os
|
||||
if os.path.isfile(p):
|
||||
opus_path = p
|
||||
break
|
||||
if opus_path:
|
||||
discord.opus.load_opus(opus_path)
|
||||
OPUS_AVAILABLE = discord.opus.is_loaded()
|
||||
except Exception:
|
||||
OPUS_AVAILABLE = False
|
||||
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
from gateway.platforms.discord import VoiceReceiver
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_secret_key():
|
||||
"""Generate a random 32-byte key."""
|
||||
import os
|
||||
return os.urandom(32)
|
||||
|
||||
|
||||
def _build_encrypted_rtp_packet(secret_key, opus_payload, ssrc=100, seq=1, timestamp=960):
|
||||
"""Build a real NaCl-encrypted RTP packet matching Discord's format.
|
||||
|
||||
Format: RTP header (12 bytes) + encrypted(opus) + 4-byte nonce
|
||||
Encryption: aead_xchacha20_poly1305 with RTP header as AAD.
|
||||
"""
|
||||
# RTP header: version=2, payload_type=0x78, no extension, no CSRC
|
||||
header = struct.pack(">BBHII", 0x80, 0x78, seq, timestamp, ssrc)
|
||||
|
||||
# Encrypt with NaCl AEAD
|
||||
box = nacl.secret.Aead(secret_key)
|
||||
nonce_counter = struct.pack(">I", seq) # 4-byte counter as nonce seed
|
||||
# Full 24-byte nonce: counter in first 4 bytes, rest zeros
|
||||
full_nonce = nonce_counter + b'\x00' * 20
|
||||
|
||||
enc_msg = box.encrypt(opus_payload, header, full_nonce)
|
||||
ciphertext = enc_msg.ciphertext # without nonce prefix
|
||||
|
||||
# Discord format: header + ciphertext + 4-byte nonce
|
||||
return header + ciphertext + nonce_counter
|
||||
|
||||
|
||||
def _make_voice_receiver(secret_key, dave_session=None, bot_ssrc=9999,
|
||||
allowed_user_ids=None, members=None):
|
||||
"""Create a VoiceReceiver with real secret key."""
|
||||
vc = MagicMock()
|
||||
vc._connection.secret_key = list(secret_key)
|
||||
vc._connection.dave_session = dave_session
|
||||
vc._connection.ssrc = bot_ssrc
|
||||
vc._connection.add_socket_listener = MagicMock()
|
||||
vc._connection.remove_socket_listener = MagicMock()
|
||||
vc._connection.hook = None
|
||||
vc.user = SimpleNamespace(id=bot_ssrc)
|
||||
vc.channel = MagicMock()
|
||||
vc.channel.members = members or []
|
||||
receiver = VoiceReceiver(vc, allowed_user_ids=allowed_user_ids)
|
||||
receiver.start()
|
||||
return receiver
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRealNaClDecrypt:
|
||||
"""End-to-end: real NaCl encrypt → _on_packet decrypt → buffer."""
|
||||
|
||||
def test_valid_encrypted_packet_buffered(self):
|
||||
"""Real NaCl encrypted packet → decrypted → buffered."""
|
||||
key = _make_secret_key()
|
||||
opus_silence = b'\xf8\xff\xfe'
|
||||
receiver = _make_voice_receiver(key)
|
||||
|
||||
packet = _build_encrypted_rtp_packet(key, opus_silence, ssrc=100)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
assert 100 in receiver._buffers
|
||||
assert len(receiver._buffers[100]) > 0
|
||||
|
||||
def test_wrong_key_packet_dropped(self):
|
||||
"""Packet encrypted with wrong key → NaCl fails → not buffered."""
|
||||
real_key = _make_secret_key()
|
||||
wrong_key = _make_secret_key()
|
||||
opus_silence = b'\xf8\xff\xfe'
|
||||
receiver = _make_voice_receiver(real_key)
|
||||
|
||||
packet = _build_encrypted_rtp_packet(wrong_key, opus_silence, ssrc=100)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
assert len(receiver._buffers.get(100, b"")) == 0
|
||||
|
||||
def test_bot_ssrc_ignored(self):
|
||||
"""Packet from bot's own SSRC → ignored."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key, bot_ssrc=9999)
|
||||
|
||||
packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=9999)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
assert len(receiver._buffers) == 0
|
||||
|
||||
def test_multiple_packets_accumulate(self):
|
||||
"""Multiple valid packets → buffer grows."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
|
||||
for seq in range(1, 6):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
assert 100 in receiver._buffers
|
||||
buf_size = len(receiver._buffers[100])
|
||||
assert buf_size > 0, "Multiple packets should accumulate in buffer"
|
||||
|
||||
def test_different_ssrcs_separate_buffers(self):
|
||||
"""Packets from different SSRCs → separate buffers."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
|
||||
for ssrc in [100, 200, 300]:
|
||||
packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=ssrc)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
assert len(receiver._buffers) == 3
|
||||
for ssrc in [100, 200, 300]:
|
||||
assert ssrc in receiver._buffers
|
||||
|
||||
|
||||
class TestRealNaClWithDAVE:
|
||||
"""NaCl decrypt + DAVE passthrough scenarios with real crypto."""
|
||||
|
||||
def test_dave_unknown_ssrc_passthrough(self):
|
||||
"""DAVE enabled but SSRC unknown → skip DAVE, buffer audio."""
|
||||
key = _make_secret_key()
|
||||
dave = MagicMock() # DAVE session present but SSRC not mapped
|
||||
receiver = _make_voice_receiver(key, dave_session=dave)
|
||||
|
||||
packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=100)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
# DAVE decrypt not called (SSRC unknown)
|
||||
dave.decrypt.assert_not_called()
|
||||
# Audio still buffered via passthrough
|
||||
assert 100 in receiver._buffers
|
||||
assert len(receiver._buffers[100]) > 0
|
||||
|
||||
def test_dave_unencrypted_error_passthrough(self):
|
||||
"""DAVE raises 'Unencrypted' → use NaCl-decrypted data as-is."""
|
||||
key = _make_secret_key()
|
||||
dave = MagicMock()
|
||||
dave.decrypt.side_effect = Exception(
|
||||
"DecryptionFailed(UnencryptedWhenPassthroughDisabled)"
|
||||
)
|
||||
receiver = _make_voice_receiver(key, dave_session=dave)
|
||||
receiver.map_ssrc(100, 42)
|
||||
|
||||
packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=100)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
# DAVE was called but failed → passthrough
|
||||
dave.decrypt.assert_called_once()
|
||||
assert 100 in receiver._buffers
|
||||
assert len(receiver._buffers[100]) > 0
|
||||
|
||||
def test_dave_real_error_drops(self):
|
||||
"""DAVE raises non-Unencrypted error → packet dropped."""
|
||||
key = _make_secret_key()
|
||||
dave = MagicMock()
|
||||
dave.decrypt.side_effect = Exception("KeyRotationFailed")
|
||||
receiver = _make_voice_receiver(key, dave_session=dave)
|
||||
receiver.map_ssrc(100, 42)
|
||||
|
||||
packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=100)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
assert len(receiver._buffers.get(100, b"")) == 0
|
||||
|
||||
|
||||
class TestFullVoiceFlow:
|
||||
"""End-to-end: encrypt → receive → buffer → silence detect → complete."""
|
||||
|
||||
def test_single_utterance_flow(self):
|
||||
"""Encrypt packets → buffer → silence → check_silence returns utterance."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
receiver.map_ssrc(100, 42)
|
||||
|
||||
# Send enough packets to exceed MIN_SPEECH_DURATION (0.5s)
|
||||
# At 48kHz stereo 16-bit, each Opus silence frame decodes to ~3840 bytes
|
||||
# Need 96000 bytes = ~25 frames
|
||||
for seq in range(1, 30):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
# Simulate silence by setting last_packet_time in the past
|
||||
receiver._last_packet_time[100] = time.monotonic() - 3.0
|
||||
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 1
|
||||
user_id, pcm_data = completed[0]
|
||||
assert user_id == 42
|
||||
assert len(pcm_data) > 0
|
||||
|
||||
def test_utterance_with_ssrc_automap(self):
|
||||
"""No SPEAKING event → auto-map sole allowed user → utterance processed."""
|
||||
key = _make_secret_key()
|
||||
members = [
|
||||
SimpleNamespace(id=9999, name="Bot"),
|
||||
SimpleNamespace(id=42, name="Alice"),
|
||||
]
|
||||
receiver = _make_voice_receiver(
|
||||
key, allowed_user_ids={"42"}, members=members
|
||||
)
|
||||
# No map_ssrc call — simulating missing SPEAKING event
|
||||
|
||||
for seq in range(1, 30):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
receiver._last_packet_time[100] = time.monotonic() - 3.0
|
||||
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42 # auto-mapped to sole allowed user
|
||||
|
||||
def test_pause_blocks_during_playback(self):
|
||||
"""Pause receiver → packets ignored → resume → packets accepted."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
|
||||
# Pause (echo prevention during TTS playback)
|
||||
receiver.pause()
|
||||
packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=100)
|
||||
receiver._on_packet(packet)
|
||||
assert len(receiver._buffers.get(100, b"")) == 0
|
||||
|
||||
# Resume
|
||||
receiver.resume()
|
||||
receiver._on_packet(packet)
|
||||
assert 100 in receiver._buffers
|
||||
assert len(receiver._buffers[100]) > 0
|
||||
|
||||
def test_corrupted_packet_ignored(self):
|
||||
"""Corrupted/truncated packet → silently ignored."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
|
||||
# Too short
|
||||
receiver._on_packet(b"\x00" * 5)
|
||||
assert len(receiver._buffers) == 0
|
||||
|
||||
# Wrong RTP version
|
||||
bad_header = struct.pack(">BBHII", 0x00, 0x78, 1, 960, 100)
|
||||
receiver._on_packet(bad_header + b"\x00" * 20)
|
||||
assert len(receiver._buffers) == 0
|
||||
|
||||
# Wrong payload type
|
||||
bad_pt = struct.pack(">BBHII", 0x80, 0x00, 1, 960, 100)
|
||||
receiver._on_packet(bad_pt + b"\x00" * 20)
|
||||
assert len(receiver._buffers) == 0
|
||||
|
||||
def test_stop_cleans_everything(self):
|
||||
"""stop() clears all state cleanly."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
receiver.map_ssrc(100, 42)
|
||||
|
||||
for seq in range(1, 10):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
assert len(receiver._buffers[100]) > 0
|
||||
|
||||
receiver.stop()
|
||||
assert receiver._running is False
|
||||
assert len(receiver._buffers) == 0
|
||||
assert len(receiver._ssrc_to_user) == 0
|
||||
assert len(receiver._decoders) == 0
|
||||
|
||||
|
||||
class TestSPEAKINGHook:
|
||||
"""SPEAKING event hook correctly maps SSRC to user_id."""
|
||||
|
||||
def test_speaking_hook_installed(self):
|
||||
"""start() installs speaking hook on connection."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
conn = receiver._vc._connection
|
||||
# hook should be set (wrapped)
|
||||
assert conn.hook is not None
|
||||
|
||||
def test_map_ssrc_via_speaking(self):
|
||||
"""SPEAKING op 5 event maps SSRC to user_id."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
receiver.map_ssrc(500, 12345)
|
||||
assert receiver._ssrc_to_user[500] == 12345
|
||||
|
||||
def test_map_ssrc_overwrites(self):
|
||||
"""New SPEAKING event for same SSRC overwrites old mapping."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
receiver.map_ssrc(500, 111)
|
||||
receiver.map_ssrc(500, 222)
|
||||
assert receiver._ssrc_to_user[500] == 222
|
||||
|
||||
def test_speaking_mapped_audio_processed(self):
|
||||
"""After SSRC is mapped, audio from that SSRC gets correct user_id."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
receiver.map_ssrc(100, 42)
|
||||
|
||||
for seq in range(1, 30):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
receiver._last_packet_time[100] = time.monotonic() - 3.0
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
|
||||
|
||||
class TestAuthFiltering:
|
||||
"""Only allowed users' audio should be processed."""
|
||||
|
||||
def test_allowed_user_audio_processed(self):
|
||||
"""Allowed user's utterance is returned by check_silence."""
|
||||
key = _make_secret_key()
|
||||
members = [
|
||||
SimpleNamespace(id=9999, name="Bot"),
|
||||
SimpleNamespace(id=42, name="Alice"),
|
||||
]
|
||||
receiver = _make_voice_receiver(
|
||||
key, allowed_user_ids={"42"}, members=members,
|
||||
)
|
||||
receiver.map_ssrc(100, 42)
|
||||
|
||||
for seq in range(1, 30):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
receiver._last_packet_time[100] = time.monotonic() - 3.0
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
|
||||
def test_automap_rejects_unallowed_user(self):
|
||||
"""Auto-map refuses to map SSRC to user not in allowed list."""
|
||||
key = _make_secret_key()
|
||||
members = [
|
||||
SimpleNamespace(id=9999, name="Bot"),
|
||||
SimpleNamespace(id=42, name="Alice"),
|
||||
]
|
||||
receiver = _make_voice_receiver(
|
||||
key, allowed_user_ids={"99"}, # Alice not allowed
|
||||
members=members,
|
||||
)
|
||||
# No map_ssrc — SSRC unknown, auto-map should reject
|
||||
|
||||
for seq in range(1, 30):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
receiver._last_packet_time[100] = time.monotonic() - 3.0
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 0
|
||||
|
||||
def test_empty_allowlist_allows_all(self):
|
||||
"""Empty allowed_user_ids means no restriction."""
|
||||
key = _make_secret_key()
|
||||
members = [
|
||||
SimpleNamespace(id=9999, name="Bot"),
|
||||
SimpleNamespace(id=42, name="Alice"),
|
||||
]
|
||||
receiver = _make_voice_receiver(
|
||||
key, allowed_user_ids=None, members=members,
|
||||
)
|
||||
|
||||
for seq in range(1, 30):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
receiver._last_packet_time[100] = time.monotonic() - 3.0
|
||||
completed = receiver.check_silence()
|
||||
# Auto-mapped to sole non-bot member
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
|
||||
|
||||
class TestRejoinFlow:
|
||||
"""Leave and rejoin: state cleanup and fresh receiver."""
|
||||
|
||||
def test_stop_then_new_receiver_clean_state(self):
|
||||
"""After stop(), a new receiver starts with empty state."""
|
||||
key = _make_secret_key()
|
||||
receiver1 = _make_voice_receiver(key)
|
||||
receiver1.map_ssrc(100, 42)
|
||||
|
||||
for seq in range(1, 10):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver1._on_packet(packet)
|
||||
|
||||
assert len(receiver1._buffers[100]) > 0
|
||||
receiver1.stop()
|
||||
|
||||
# New receiver (simulates rejoin)
|
||||
receiver2 = _make_voice_receiver(key)
|
||||
assert len(receiver2._buffers) == 0
|
||||
assert len(receiver2._ssrc_to_user) == 0
|
||||
assert len(receiver2._decoders) == 0
|
||||
|
||||
def test_rejoin_new_ssrc_works(self):
|
||||
"""After rejoin, user may get new SSRC — still works."""
|
||||
key = _make_secret_key()
|
||||
receiver1 = _make_voice_receiver(key)
|
||||
receiver1.map_ssrc(100, 42) # old SSRC
|
||||
receiver1.stop()
|
||||
|
||||
receiver2 = _make_voice_receiver(key)
|
||||
receiver2.map_ssrc(200, 42) # new SSRC after rejoin
|
||||
|
||||
for seq in range(1, 30):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=200, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver2._on_packet(packet)
|
||||
|
||||
receiver2._last_packet_time[200] = time.monotonic() - 3.0
|
||||
completed = receiver2.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
|
||||
def test_rejoin_without_speaking_event_automap(self):
|
||||
"""Rejoin without SPEAKING event — auto-map sole allowed user."""
|
||||
key = _make_secret_key()
|
||||
members = [
|
||||
SimpleNamespace(id=9999, name="Bot"),
|
||||
SimpleNamespace(id=42, name="Alice"),
|
||||
]
|
||||
|
||||
# First session
|
||||
receiver1 = _make_voice_receiver(
|
||||
key, allowed_user_ids={"42"}, members=members,
|
||||
)
|
||||
receiver1.stop()
|
||||
|
||||
# Rejoin — new key (Discord may assign new secret_key)
|
||||
new_key = _make_secret_key()
|
||||
receiver2 = _make_voice_receiver(
|
||||
new_key, allowed_user_ids={"42"}, members=members,
|
||||
)
|
||||
# No map_ssrc — simulating missing SPEAKING event
|
||||
|
||||
for seq in range(1, 30):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
new_key, b'\xf8\xff\xfe', ssrc=300, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver2._on_packet(packet)
|
||||
|
||||
receiver2._last_packet_time[300] = time.monotonic() - 3.0
|
||||
completed = receiver2.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
|
||||
|
||||
class TestMultiGuildIsolation:
|
||||
"""Each guild has independent voice state."""
|
||||
|
||||
def test_separate_receivers_independent(self):
|
||||
"""Two receivers (different guilds) don't interfere."""
|
||||
key1 = _make_secret_key()
|
||||
key2 = _make_secret_key()
|
||||
|
||||
receiver1 = _make_voice_receiver(key1, bot_ssrc=1111)
|
||||
receiver2 = _make_voice_receiver(key2, bot_ssrc=2222)
|
||||
|
||||
receiver1.map_ssrc(100, 42)
|
||||
receiver2.map_ssrc(200, 99)
|
||||
|
||||
# Send to receiver1
|
||||
for seq in range(1, 10):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key1, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver1._on_packet(packet)
|
||||
|
||||
# receiver2 should be empty
|
||||
assert len(receiver2._buffers) == 0
|
||||
assert 100 in receiver1._buffers
|
||||
|
||||
def test_stop_one_doesnt_affect_other(self):
|
||||
"""Stopping one receiver doesn't affect another."""
|
||||
key1 = _make_secret_key()
|
||||
key2 = _make_secret_key()
|
||||
|
||||
receiver1 = _make_voice_receiver(key1)
|
||||
receiver2 = _make_voice_receiver(key2)
|
||||
|
||||
receiver1.map_ssrc(100, 42)
|
||||
receiver2.map_ssrc(200, 99)
|
||||
|
||||
for seq in range(1, 10):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key2, b'\xf8\xff\xfe', ssrc=200, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver2._on_packet(packet)
|
||||
|
||||
receiver1.stop()
|
||||
|
||||
# receiver2 still has data
|
||||
assert receiver2._running is True
|
||||
assert len(receiver2._buffers[200]) > 0
|
||||
|
||||
|
||||
class TestEchoPreventionFlow:
|
||||
"""Receiver pause/resume during TTS playback prevents echo."""
|
||||
|
||||
def test_audio_during_pause_ignored(self):
|
||||
"""Audio arriving while paused is completely ignored."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
receiver.map_ssrc(100, 42)
|
||||
receiver.pause()
|
||||
|
||||
for seq in range(1, 30):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
assert len(receiver._buffers.get(100, b"")) == 0
|
||||
|
||||
def test_audio_after_resume_processed(self):
|
||||
"""Audio arriving after resume is processed normally."""
|
||||
key = _make_secret_key()
|
||||
receiver = _make_voice_receiver(key)
|
||||
receiver.map_ssrc(100, 42)
|
||||
|
||||
# Pause → send packets → resume → send more packets
|
||||
receiver.pause()
|
||||
for seq in range(1, 5):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
assert len(receiver._buffers.get(100, b"")) == 0
|
||||
|
||||
receiver.resume()
|
||||
for seq in range(5, 35):
|
||||
packet = _build_encrypted_rtp_packet(
|
||||
key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq
|
||||
)
|
||||
receiver._on_packet(packet)
|
||||
|
||||
assert len(receiver._buffers[100]) > 0
|
||||
receiver._last_packet_time[100] = time.monotonic() - 3.0
|
||||
completed = receiver.check_silence()
|
||||
assert len(completed) == 1
|
||||
assert completed[0][0] == 42
|
||||
72
tests/test_dict_tool_call_args.py
Normal file
72
tests/test_dict_tool_call_args.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
|
||||
|
||||
def _tool_call(name: str, arguments):
|
||||
return SimpleNamespace(
|
||||
id="call_1",
|
||||
type="function",
|
||||
function=SimpleNamespace(name=name, arguments=arguments),
|
||||
)
|
||||
|
||||
|
||||
def _response_with_tool_call(arguments):
|
||||
assistant = SimpleNamespace(
|
||||
content=None,
|
||||
reasoning=None,
|
||||
tool_calls=[_tool_call("read_file", arguments)],
|
||||
)
|
||||
choice = SimpleNamespace(message=assistant, finish_reason="tool_calls")
|
||||
return SimpleNamespace(choices=[choice], usage=None)
|
||||
|
||||
|
||||
class _FakeChatCompletions:
|
||||
def __init__(self):
|
||||
self.calls = 0
|
||||
|
||||
def create(self, **kwargs):
|
||||
self.calls += 1
|
||||
if self.calls == 1:
|
||||
return _response_with_tool_call({"path": "README.md"})
|
||||
return SimpleNamespace(
|
||||
choices=[
|
||||
SimpleNamespace(
|
||||
message=SimpleNamespace(content="done", reasoning=None, tool_calls=[]),
|
||||
finish_reason="stop",
|
||||
)
|
||||
],
|
||||
usage=None,
|
||||
)
|
||||
|
||||
|
||||
class _FakeClient:
|
||||
def __init__(self):
|
||||
self.chat = SimpleNamespace(completions=_FakeChatCompletions())
|
||||
|
||||
|
||||
def test_tool_call_validation_accepts_dict_arguments(monkeypatch):
|
||||
from run_agent import AIAgent
|
||||
|
||||
monkeypatch.setattr("run_agent.OpenAI", lambda **kwargs: _FakeClient())
|
||||
monkeypatch.setattr(
|
||||
"run_agent.get_tool_definitions",
|
||||
lambda *args, **kwargs: [{"function": {"name": "read_file"}}],
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"run_agent.handle_function_call",
|
||||
lambda name, args, task_id=None, **kwargs: json.dumps({"ok": True, "args": args}),
|
||||
)
|
||||
|
||||
agent = AIAgent(
|
||||
model="test-model",
|
||||
api_key="test-key",
|
||||
base_url="http://localhost:8080/v1",
|
||||
platform="cli",
|
||||
max_iterations=3,
|
||||
quiet_mode=True,
|
||||
skip_memory=True,
|
||||
)
|
||||
|
||||
result = agent.run_conversation("read the file")
|
||||
|
||||
assert result["final_response"] == "done"
|
||||
@@ -153,6 +153,36 @@ class TestScheduleCronjob:
|
||||
assert job["provider"] == "custom"
|
||||
assert job["base_url"] == "http://127.0.0.1:4000/v1"
|
||||
|
||||
def test_thread_id_captured_in_origin(self, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_SESSION_PLATFORM", "telegram")
|
||||
monkeypatch.setenv("HERMES_SESSION_CHAT_ID", "123456")
|
||||
monkeypatch.setenv("HERMES_SESSION_THREAD_ID", "42")
|
||||
import cron.jobs as _jobs
|
||||
created = json.loads(schedule_cronjob(
|
||||
prompt="Thread test",
|
||||
schedule="every 1h",
|
||||
deliver="origin",
|
||||
))
|
||||
assert created["success"] is True
|
||||
job_id = created["job_id"]
|
||||
job = _jobs.get_job(job_id)
|
||||
assert job["origin"]["thread_id"] == "42"
|
||||
|
||||
def test_thread_id_absent_when_not_set(self, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_SESSION_PLATFORM", "telegram")
|
||||
monkeypatch.setenv("HERMES_SESSION_CHAT_ID", "123456")
|
||||
monkeypatch.delenv("HERMES_SESSION_THREAD_ID", raising=False)
|
||||
import cron.jobs as _jobs
|
||||
created = json.loads(schedule_cronjob(
|
||||
prompt="No thread test",
|
||||
schedule="every 1h",
|
||||
deliver="origin",
|
||||
))
|
||||
assert created["success"] is True
|
||||
job_id = created["job_id"]
|
||||
job = _jobs.get_job(job_id)
|
||||
assert job["origin"].get("thread_id") is None
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# list_cronjobs
|
||||
|
||||
@@ -26,8 +26,7 @@ def _make_fake_popen(captured: dict):
|
||||
proc = MagicMock()
|
||||
proc.poll.return_value = 0
|
||||
proc.returncode = 0
|
||||
proc.stdout = iter([])
|
||||
proc.stdout.close = lambda: None
|
||||
proc.stdout = MagicMock(__iter__=lambda s: iter([]), __next__=lambda s: (_ for _ in ()).throw(StopIteration))
|
||||
proc.stdin = MagicMock()
|
||||
return proc
|
||||
return fake_popen
|
||||
|
||||
152
tests/tools/test_local_persistent.py
Normal file
152
tests/tools/test_local_persistent.py
Normal file
@@ -0,0 +1,152 @@
|
||||
"""Tests for the local persistent shell backend."""
|
||||
|
||||
import glob as glob_mod
|
||||
|
||||
import pytest
|
||||
|
||||
from tools.environments.local import LocalEnvironment
|
||||
from tools.environments.persistent_shell import PersistentShellMixin
|
||||
|
||||
|
||||
class TestLocalConfig:
|
||||
def test_local_persistent_default_false(self, monkeypatch):
|
||||
monkeypatch.delenv("TERMINAL_LOCAL_PERSISTENT", raising=False)
|
||||
from tools.terminal_tool import _get_env_config
|
||||
assert _get_env_config()["local_persistent"] is False
|
||||
|
||||
def test_local_persistent_true(self, monkeypatch):
|
||||
monkeypatch.setenv("TERMINAL_LOCAL_PERSISTENT", "true")
|
||||
from tools.terminal_tool import _get_env_config
|
||||
assert _get_env_config()["local_persistent"] is True
|
||||
|
||||
def test_local_persistent_yes(self, monkeypatch):
|
||||
monkeypatch.setenv("TERMINAL_LOCAL_PERSISTENT", "yes")
|
||||
from tools.terminal_tool import _get_env_config
|
||||
assert _get_env_config()["local_persistent"] is True
|
||||
|
||||
|
||||
class TestMergeOutput:
|
||||
def test_stdout_only(self):
|
||||
assert PersistentShellMixin._merge_output("out", "") == "out"
|
||||
|
||||
def test_stderr_only(self):
|
||||
assert PersistentShellMixin._merge_output("", "err") == "err"
|
||||
|
||||
def test_both(self):
|
||||
assert PersistentShellMixin._merge_output("out", "err") == "out\nerr"
|
||||
|
||||
def test_empty(self):
|
||||
assert PersistentShellMixin._merge_output("", "") == ""
|
||||
|
||||
def test_strips_trailing_newlines(self):
|
||||
assert PersistentShellMixin._merge_output("out\n\n", "err\n") == "out\nerr"
|
||||
|
||||
|
||||
class TestLocalOneShotRegression:
|
||||
def test_echo(self):
|
||||
env = LocalEnvironment(persistent=False)
|
||||
r = env.execute("echo hello")
|
||||
assert r["returncode"] == 0
|
||||
assert "hello" in r["output"]
|
||||
env.cleanup()
|
||||
|
||||
def test_exit_code(self):
|
||||
env = LocalEnvironment(persistent=False)
|
||||
r = env.execute("exit 42")
|
||||
assert r["returncode"] == 42
|
||||
env.cleanup()
|
||||
|
||||
def test_state_does_not_persist(self):
|
||||
env = LocalEnvironment(persistent=False)
|
||||
env.execute("export HERMES_ONESHOT_LOCAL=yes")
|
||||
r = env.execute("echo $HERMES_ONESHOT_LOCAL")
|
||||
assert r["output"].strip() == ""
|
||||
env.cleanup()
|
||||
|
||||
|
||||
class TestLocalPersistent:
|
||||
@pytest.fixture
|
||||
def env(self):
|
||||
e = LocalEnvironment(persistent=True)
|
||||
yield e
|
||||
e.cleanup()
|
||||
|
||||
def test_echo(self, env):
|
||||
r = env.execute("echo hello-persistent")
|
||||
assert r["returncode"] == 0
|
||||
assert "hello-persistent" in r["output"]
|
||||
|
||||
def test_env_var_persists(self, env):
|
||||
env.execute("export HERMES_LOCAL_PERSIST_TEST=works")
|
||||
r = env.execute("echo $HERMES_LOCAL_PERSIST_TEST")
|
||||
assert r["output"].strip() == "works"
|
||||
|
||||
def test_cwd_persists(self, env):
|
||||
env.execute("cd /tmp")
|
||||
r = env.execute("pwd")
|
||||
assert r["output"].strip() == "/tmp"
|
||||
|
||||
def test_exit_code(self, env):
|
||||
r = env.execute("(exit 42)")
|
||||
assert r["returncode"] == 42
|
||||
|
||||
def test_stderr(self, env):
|
||||
r = env.execute("echo oops >&2")
|
||||
assert r["returncode"] == 0
|
||||
assert "oops" in r["output"]
|
||||
|
||||
def test_multiline_output(self, env):
|
||||
r = env.execute("echo a; echo b; echo c")
|
||||
lines = r["output"].strip().splitlines()
|
||||
assert lines == ["a", "b", "c"]
|
||||
|
||||
def test_timeout_then_recovery(self, env):
|
||||
r = env.execute("sleep 999", timeout=2)
|
||||
assert r["returncode"] in (124, 130)
|
||||
r = env.execute("echo alive")
|
||||
assert r["returncode"] == 0
|
||||
assert "alive" in r["output"]
|
||||
|
||||
def test_large_output(self, env):
|
||||
r = env.execute("seq 1 1000")
|
||||
assert r["returncode"] == 0
|
||||
lines = r["output"].strip().splitlines()
|
||||
assert len(lines) == 1000
|
||||
assert lines[0] == "1"
|
||||
assert lines[-1] == "1000"
|
||||
|
||||
def test_shell_variable_persists(self, env):
|
||||
env.execute("MY_LOCAL_VAR=hello123")
|
||||
r = env.execute("echo $MY_LOCAL_VAR")
|
||||
assert r["output"].strip() == "hello123"
|
||||
|
||||
def test_cleanup_removes_temp_files(self, env):
|
||||
env.execute("echo warmup")
|
||||
prefix = env._temp_prefix
|
||||
assert len(glob_mod.glob(f"{prefix}-*")) > 0
|
||||
env.cleanup()
|
||||
remaining = glob_mod.glob(f"{prefix}-*")
|
||||
assert remaining == []
|
||||
|
||||
def test_state_does_not_leak_between_instances(self):
|
||||
env1 = LocalEnvironment(persistent=True)
|
||||
env2 = LocalEnvironment(persistent=True)
|
||||
try:
|
||||
env1.execute("export LEAK_TEST=from_env1")
|
||||
r = env2.execute("echo $LEAK_TEST")
|
||||
assert r["output"].strip() == ""
|
||||
finally:
|
||||
env1.cleanup()
|
||||
env2.cleanup()
|
||||
|
||||
def test_special_characters_in_command(self, env):
|
||||
r = env.execute("echo 'hello world'")
|
||||
assert r["output"].strip() == "hello world"
|
||||
|
||||
def test_pipe_command(self, env):
|
||||
r = env.execute("echo hello | tr 'h' 'H'")
|
||||
assert r["output"].strip() == "Hello"
|
||||
|
||||
def test_multiple_commands_semicolon(self, env):
|
||||
r = env.execute("X=42; echo $X")
|
||||
assert r["output"].strip() == "42"
|
||||
167
tests/tools/test_ssh_environment.py
Normal file
167
tests/tools/test_ssh_environment.py
Normal file
@@ -0,0 +1,167 @@
|
||||
"""Tests for the SSH remote execution environment backend."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from tools.environments.ssh import SSHEnvironment
|
||||
|
||||
_SSH_HOST = os.getenv("TERMINAL_SSH_HOST", "")
|
||||
_SSH_USER = os.getenv("TERMINAL_SSH_USER", "")
|
||||
_SSH_PORT = int(os.getenv("TERMINAL_SSH_PORT", "22"))
|
||||
_SSH_KEY = os.getenv("TERMINAL_SSH_KEY", "")
|
||||
|
||||
_has_ssh = bool(_SSH_HOST and _SSH_USER)
|
||||
|
||||
requires_ssh = pytest.mark.skipif(
|
||||
not _has_ssh,
|
||||
reason="TERMINAL_SSH_HOST / TERMINAL_SSH_USER not set",
|
||||
)
|
||||
|
||||
|
||||
def _run(command, task_id="ssh_test", **kwargs):
|
||||
from tools.terminal_tool import terminal_tool
|
||||
return json.loads(terminal_tool(command, task_id=task_id, **kwargs))
|
||||
|
||||
|
||||
def _cleanup(task_id="ssh_test"):
|
||||
from tools.terminal_tool import cleanup_vm
|
||||
cleanup_vm(task_id)
|
||||
|
||||
|
||||
class TestBuildSSHCommand:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _mock_connection(self, monkeypatch):
|
||||
monkeypatch.setattr("tools.environments.ssh.subprocess.run",
|
||||
lambda *a, **k: subprocess.CompletedProcess([], 0))
|
||||
monkeypatch.setattr("tools.environments.ssh.subprocess.Popen",
|
||||
lambda *a, **k: MagicMock(stdout=iter([]),
|
||||
stderr=iter([]),
|
||||
stdin=MagicMock()))
|
||||
monkeypatch.setattr("tools.environments.ssh.time.sleep", lambda _: None)
|
||||
|
||||
def test_base_flags(self):
|
||||
env = SSHEnvironment(host="h", user="u")
|
||||
cmd = " ".join(env._build_ssh_command())
|
||||
for flag in ("ControlMaster=auto", "ControlPersist=300",
|
||||
"BatchMode=yes", "StrictHostKeyChecking=accept-new"):
|
||||
assert flag in cmd
|
||||
|
||||
def test_custom_port(self):
|
||||
env = SSHEnvironment(host="h", user="u", port=2222)
|
||||
cmd = env._build_ssh_command()
|
||||
assert "-p" in cmd and "2222" in cmd
|
||||
|
||||
def test_key_path(self):
|
||||
env = SSHEnvironment(host="h", user="u", key_path="/k")
|
||||
cmd = env._build_ssh_command()
|
||||
assert "-i" in cmd and "/k" in cmd
|
||||
|
||||
def test_user_host_suffix(self):
|
||||
env = SSHEnvironment(host="h", user="u")
|
||||
assert env._build_ssh_command()[-1] == "u@h"
|
||||
|
||||
|
||||
class TestTerminalToolConfig:
|
||||
def test_ssh_persistent_default_false(self, monkeypatch):
|
||||
monkeypatch.delenv("TERMINAL_SSH_PERSISTENT", raising=False)
|
||||
from tools.terminal_tool import _get_env_config
|
||||
assert _get_env_config()["ssh_persistent"] is False
|
||||
|
||||
def test_ssh_persistent_true(self, monkeypatch):
|
||||
monkeypatch.setenv("TERMINAL_SSH_PERSISTENT", "true")
|
||||
from tools.terminal_tool import _get_env_config
|
||||
assert _get_env_config()["ssh_persistent"] is True
|
||||
|
||||
|
||||
def _setup_ssh_env(monkeypatch, persistent: bool):
|
||||
monkeypatch.setenv("TERMINAL_ENV", "ssh")
|
||||
monkeypatch.setenv("TERMINAL_SSH_HOST", _SSH_HOST)
|
||||
monkeypatch.setenv("TERMINAL_SSH_USER", _SSH_USER)
|
||||
monkeypatch.setenv("TERMINAL_SSH_PERSISTENT", "true" if persistent else "false")
|
||||
if _SSH_PORT != 22:
|
||||
monkeypatch.setenv("TERMINAL_SSH_PORT", str(_SSH_PORT))
|
||||
if _SSH_KEY:
|
||||
monkeypatch.setenv("TERMINAL_SSH_KEY", _SSH_KEY)
|
||||
|
||||
|
||||
@requires_ssh
|
||||
class TestOneShotSSH:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _setup(self, monkeypatch):
|
||||
_setup_ssh_env(monkeypatch, persistent=False)
|
||||
yield
|
||||
_cleanup()
|
||||
|
||||
def test_echo(self):
|
||||
r = _run("echo hello")
|
||||
assert r["exit_code"] == 0
|
||||
assert "hello" in r["output"]
|
||||
|
||||
def test_exit_code(self):
|
||||
r = _run("exit 42")
|
||||
assert r["exit_code"] == 42
|
||||
|
||||
def test_state_does_not_persist(self):
|
||||
_run("export HERMES_ONESHOT_TEST=yes")
|
||||
r = _run("echo $HERMES_ONESHOT_TEST")
|
||||
assert r["output"].strip() == ""
|
||||
|
||||
|
||||
@requires_ssh
|
||||
class TestPersistentSSH:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _setup(self, monkeypatch):
|
||||
_setup_ssh_env(monkeypatch, persistent=True)
|
||||
yield
|
||||
_cleanup()
|
||||
|
||||
def test_echo(self):
|
||||
r = _run("echo hello-persistent")
|
||||
assert r["exit_code"] == 0
|
||||
assert "hello-persistent" in r["output"]
|
||||
|
||||
def test_env_var_persists(self):
|
||||
_run("export HERMES_PERSIST_TEST=works")
|
||||
r = _run("echo $HERMES_PERSIST_TEST")
|
||||
assert r["output"].strip() == "works"
|
||||
|
||||
def test_cwd_persists(self):
|
||||
_run("cd /tmp")
|
||||
r = _run("pwd")
|
||||
assert r["output"].strip() == "/tmp"
|
||||
|
||||
def test_exit_code(self):
|
||||
r = _run("(exit 42)")
|
||||
assert r["exit_code"] == 42
|
||||
|
||||
def test_stderr(self):
|
||||
r = _run("echo oops >&2")
|
||||
assert r["exit_code"] == 0
|
||||
assert "oops" in r["output"]
|
||||
|
||||
def test_multiline_output(self):
|
||||
r = _run("echo a; echo b; echo c")
|
||||
lines = r["output"].strip().splitlines()
|
||||
assert lines == ["a", "b", "c"]
|
||||
|
||||
def test_timeout_then_recovery(self):
|
||||
r = _run("sleep 999", timeout=2)
|
||||
assert r["exit_code"] == 124
|
||||
r = _run("echo alive")
|
||||
assert r["exit_code"] == 0
|
||||
assert "alive" in r["output"]
|
||||
|
||||
def test_large_output(self):
|
||||
r = _run("seq 1 1000")
|
||||
assert r["exit_code"] == 0
|
||||
lines = r["output"].strip().splitlines()
|
||||
assert len(lines) == 1000
|
||||
assert lines[0] == "1"
|
||||
assert lines[-1] == "1000"
|
||||
@@ -72,6 +72,7 @@ def _origin_from_env() -> Optional[Dict[str, str]]:
|
||||
"platform": origin_platform,
|
||||
"chat_id": origin_chat_id,
|
||||
"chat_name": os.getenv("HERMES_SESSION_CHAT_NAME"),
|
||||
"thread_id": os.getenv("HERMES_SESSION_THREAD_ID"),
|
||||
}
|
||||
return None
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Local execution environment with interrupt support and non-blocking I/O."""
|
||||
|
||||
import glob
|
||||
import os
|
||||
import platform
|
||||
import shutil
|
||||
@@ -11,6 +12,8 @@ import time
|
||||
_IS_WINDOWS = platform.system() == "Windows"
|
||||
|
||||
from tools.environments.base import BaseEnvironment
|
||||
from tools.environments.persistent_shell import PersistentShellMixin
|
||||
from tools.interrupt import is_interrupted
|
||||
|
||||
# Unique marker to isolate real command output from shell init/exit noise.
|
||||
# printf (no trailing newline) keeps the boundaries clean for splitting.
|
||||
@@ -244,6 +247,25 @@ def _clean_shell_noise(output: str) -> str:
|
||||
return result
|
||||
|
||||
|
||||
_SANE_PATH = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
|
||||
|
||||
|
||||
def _make_run_env(env: dict) -> dict:
|
||||
"""Build a run environment with a sane PATH and provider-var stripping."""
|
||||
merged = dict(os.environ | env)
|
||||
run_env = {}
|
||||
for k, v in merged.items():
|
||||
if k.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX):
|
||||
real_key = k[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):]
|
||||
run_env[real_key] = v
|
||||
elif k not in _HERMES_PROVIDER_ENV_BLOCKLIST:
|
||||
run_env[k] = v
|
||||
existing_path = run_env.get("PATH", "")
|
||||
if "/usr/bin" not in existing_path.split(":"):
|
||||
run_env["PATH"] = f"{existing_path}:{_SANE_PATH}" if existing_path else _SANE_PATH
|
||||
return run_env
|
||||
|
||||
|
||||
def _extract_fenced_output(raw: str) -> str:
|
||||
"""Extract real command output from between fence markers.
|
||||
|
||||
@@ -268,7 +290,7 @@ def _extract_fenced_output(raw: str) -> str:
|
||||
return raw[start:last]
|
||||
|
||||
|
||||
class LocalEnvironment(BaseEnvironment):
|
||||
class LocalEnvironment(PersistentShellMixin, BaseEnvironment):
|
||||
"""Run commands directly on the host machine.
|
||||
|
||||
Features:
|
||||
@@ -277,24 +299,66 @@ class LocalEnvironment(BaseEnvironment):
|
||||
- stdin_data support for piping content (bypasses ARG_MAX limits)
|
||||
- sudo -S transform via SUDO_PASSWORD env var
|
||||
- Uses interactive login shell so full user env is available
|
||||
- Optional persistent shell mode (cwd/env vars survive across calls)
|
||||
"""
|
||||
|
||||
def __init__(self, cwd: str = "", timeout: int = 60, env: dict = None):
|
||||
def __init__(self, cwd: str = "", timeout: int = 60, env: dict = None,
|
||||
persistent: bool = False):
|
||||
super().__init__(cwd=cwd or os.getcwd(), timeout=timeout, env=env)
|
||||
self.persistent = persistent
|
||||
if self.persistent:
|
||||
self._init_persistent_shell()
|
||||
|
||||
def execute(self, command: str, cwd: str = "", *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict:
|
||||
from tools.terminal_tool import _interrupt_event
|
||||
@property
|
||||
def _temp_prefix(self) -> str:
|
||||
return f"/tmp/hermes-local-{self._session_id}"
|
||||
|
||||
def _spawn_shell_process(self) -> subprocess.Popen:
|
||||
user_shell = _find_bash()
|
||||
run_env = _make_run_env(self.env)
|
||||
return subprocess.Popen(
|
||||
[user_shell, "-l"],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
env=run_env,
|
||||
preexec_fn=None if _IS_WINDOWS else os.setsid,
|
||||
)
|
||||
|
||||
def _read_temp_files(self, *paths: str) -> list[str]:
|
||||
results = []
|
||||
for path in paths:
|
||||
if os.path.exists(path):
|
||||
with open(path) as f:
|
||||
results.append(f.read())
|
||||
else:
|
||||
results.append("")
|
||||
return results
|
||||
|
||||
def _kill_shell_children(self):
|
||||
if self._shell_pid is None:
|
||||
return
|
||||
try:
|
||||
subprocess.run(
|
||||
["pkill", "-P", str(self._shell_pid)],
|
||||
capture_output=True, timeout=5,
|
||||
)
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
pass
|
||||
|
||||
def _cleanup_temp_files(self):
|
||||
for f in glob.glob(f"{self._temp_prefix}-*"):
|
||||
if os.path.exists(f):
|
||||
os.remove(f)
|
||||
|
||||
def _execute_oneshot(self, command: str, cwd: str = "", *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict:
|
||||
work_dir = cwd or self.cwd or os.getcwd()
|
||||
effective_timeout = timeout or self.timeout
|
||||
exec_command, sudo_stdin = self._prepare_command(command)
|
||||
|
||||
# Merge the sudo password (if any) with caller-supplied stdin_data.
|
||||
# sudo -S reads exactly one line (the password) then passes the rest
|
||||
# of stdin to the child, so prepending is safe even when stdin_data
|
||||
# is also present.
|
||||
if sudo_stdin is not None and stdin_data is not None:
|
||||
effective_stdin = sudo_stdin + stdin_data
|
||||
elif sudo_stdin is not None:
|
||||
@@ -302,110 +366,87 @@ class LocalEnvironment(BaseEnvironment):
|
||||
else:
|
||||
effective_stdin = stdin_data
|
||||
|
||||
try:
|
||||
# The fence wrapper uses bash syntax (semicolons, $?, printf).
|
||||
# Always use bash for the wrapper — NOT $SHELL which could be
|
||||
# fish, zsh, or another shell with incompatible syntax.
|
||||
# The -lic flags source rc files so tools like nvm/pyenv work.
|
||||
user_shell = _find_bash()
|
||||
# Wrap with output fences so we can later extract the real
|
||||
# command output and discard shell init/exit noise.
|
||||
fenced_cmd = (
|
||||
f"printf '{_OUTPUT_FENCE}';"
|
||||
f" {exec_command};"
|
||||
f" __hermes_rc=$?;"
|
||||
f" printf '{_OUTPUT_FENCE}';"
|
||||
f" exit $__hermes_rc"
|
||||
)
|
||||
# Ensure PATH always includes standard dirs — systemd services
|
||||
# and some terminal multiplexers inherit a minimal PATH.
|
||||
_SANE_PATH = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
|
||||
# Strip Hermes-managed provider/tool/gateway vars so external CLIs
|
||||
# are not silently misrouted or handed Hermes secrets. Callers that
|
||||
# truly need a blocked var can opt in by prefixing the key with
|
||||
# _HERMES_FORCE_ in self.env (e.g. _HERMES_FORCE_OPENAI_API_KEY).
|
||||
run_env = _sanitize_subprocess_env(os.environ, self.env)
|
||||
existing_path = run_env.get("PATH", "")
|
||||
if "/usr/bin" not in existing_path.split(":"):
|
||||
run_env["PATH"] = f"{existing_path}:{_SANE_PATH}" if existing_path else _SANE_PATH
|
||||
user_shell = _find_bash()
|
||||
fenced_cmd = (
|
||||
f"printf '{_OUTPUT_FENCE}';"
|
||||
f" {exec_command};"
|
||||
f" __hermes_rc=$?;"
|
||||
f" printf '{_OUTPUT_FENCE}';"
|
||||
f" exit $__hermes_rc"
|
||||
)
|
||||
run_env = _make_run_env(self.env)
|
||||
|
||||
proc = subprocess.Popen(
|
||||
[user_shell, "-lic", fenced_cmd],
|
||||
text=True,
|
||||
cwd=work_dir,
|
||||
env=run_env,
|
||||
encoding="utf-8",
|
||||
errors="replace",
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
stdin=subprocess.PIPE if effective_stdin is not None else subprocess.DEVNULL,
|
||||
preexec_fn=None if _IS_WINDOWS else os.setsid,
|
||||
)
|
||||
proc = subprocess.Popen(
|
||||
[user_shell, "-lic", fenced_cmd],
|
||||
text=True,
|
||||
cwd=work_dir,
|
||||
env=run_env,
|
||||
encoding="utf-8",
|
||||
errors="replace",
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
stdin=subprocess.PIPE if effective_stdin is not None else subprocess.DEVNULL,
|
||||
preexec_fn=None if _IS_WINDOWS else os.setsid,
|
||||
)
|
||||
|
||||
if effective_stdin is not None:
|
||||
def _write_stdin():
|
||||
try:
|
||||
proc.stdin.write(effective_stdin)
|
||||
proc.stdin.close()
|
||||
except (BrokenPipeError, OSError):
|
||||
pass
|
||||
threading.Thread(target=_write_stdin, daemon=True).start()
|
||||
|
||||
_output_chunks: list[str] = []
|
||||
|
||||
def _drain_stdout():
|
||||
if effective_stdin is not None:
|
||||
def _write_stdin():
|
||||
try:
|
||||
for line in proc.stdout:
|
||||
_output_chunks.append(line)
|
||||
except ValueError:
|
||||
proc.stdin.write(effective_stdin)
|
||||
proc.stdin.close()
|
||||
except (BrokenPipeError, OSError):
|
||||
pass
|
||||
finally:
|
||||
try:
|
||||
proc.stdout.close()
|
||||
except Exception:
|
||||
pass
|
||||
threading.Thread(target=_write_stdin, daemon=True).start()
|
||||
|
||||
reader = threading.Thread(target=_drain_stdout, daemon=True)
|
||||
reader.start()
|
||||
deadline = time.monotonic() + effective_timeout
|
||||
_output_chunks: list[str] = []
|
||||
|
||||
while proc.poll() is None:
|
||||
if _interrupt_event.is_set():
|
||||
try:
|
||||
if _IS_WINDOWS:
|
||||
proc.terminate()
|
||||
else:
|
||||
pgid = os.getpgid(proc.pid)
|
||||
os.killpg(pgid, signal.SIGTERM)
|
||||
try:
|
||||
proc.wait(timeout=1.0)
|
||||
except subprocess.TimeoutExpired:
|
||||
os.killpg(pgid, signal.SIGKILL)
|
||||
except (ProcessLookupError, PermissionError):
|
||||
proc.kill()
|
||||
reader.join(timeout=2)
|
||||
return {
|
||||
"output": "".join(_output_chunks) + "\n[Command interrupted — user sent a new message]",
|
||||
"returncode": 130,
|
||||
}
|
||||
if time.monotonic() > deadline:
|
||||
try:
|
||||
if _IS_WINDOWS:
|
||||
proc.terminate()
|
||||
else:
|
||||
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
|
||||
except (ProcessLookupError, PermissionError):
|
||||
proc.kill()
|
||||
reader.join(timeout=2)
|
||||
return self._timeout_result(effective_timeout)
|
||||
time.sleep(0.2)
|
||||
def _drain_stdout():
|
||||
try:
|
||||
for line in proc.stdout:
|
||||
_output_chunks.append(line)
|
||||
except ValueError:
|
||||
pass
|
||||
finally:
|
||||
try:
|
||||
proc.stdout.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
reader.join(timeout=5)
|
||||
output = _extract_fenced_output("".join(_output_chunks))
|
||||
return {"output": output, "returncode": proc.returncode}
|
||||
reader = threading.Thread(target=_drain_stdout, daemon=True)
|
||||
reader.start()
|
||||
deadline = time.monotonic() + effective_timeout
|
||||
|
||||
except Exception as e:
|
||||
return {"output": f"Execution error: {str(e)}", "returncode": 1}
|
||||
while proc.poll() is None:
|
||||
if is_interrupted():
|
||||
try:
|
||||
if _IS_WINDOWS:
|
||||
proc.terminate()
|
||||
else:
|
||||
pgid = os.getpgid(proc.pid)
|
||||
os.killpg(pgid, signal.SIGTERM)
|
||||
try:
|
||||
proc.wait(timeout=1.0)
|
||||
except subprocess.TimeoutExpired:
|
||||
os.killpg(pgid, signal.SIGKILL)
|
||||
except (ProcessLookupError, PermissionError):
|
||||
proc.kill()
|
||||
reader.join(timeout=2)
|
||||
return {
|
||||
"output": "".join(_output_chunks) + "\n[Command interrupted — user sent a new message]",
|
||||
"returncode": 130,
|
||||
}
|
||||
if time.monotonic() > deadline:
|
||||
try:
|
||||
if _IS_WINDOWS:
|
||||
proc.terminate()
|
||||
else:
|
||||
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
|
||||
except (ProcessLookupError, PermissionError):
|
||||
proc.kill()
|
||||
reader.join(timeout=2)
|
||||
return self._timeout_result(effective_timeout)
|
||||
time.sleep(0.2)
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
reader.join(timeout=5)
|
||||
output = _extract_fenced_output("".join(_output_chunks))
|
||||
return {"output": output, "returncode": proc.returncode}
|
||||
|
||||
272
tools/environments/persistent_shell.py
Normal file
272
tools/environments/persistent_shell.py
Normal file
@@ -0,0 +1,272 @@
|
||||
"""Persistent shell mixin: file-based IPC protocol for long-lived bash shells."""
|
||||
|
||||
import logging
|
||||
import shlex
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from abc import abstractmethod
|
||||
|
||||
from tools.interrupt import is_interrupted
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PersistentShellMixin:
|
||||
"""Mixin that adds persistent shell capability to any BaseEnvironment.
|
||||
|
||||
Subclasses must implement ``_spawn_shell_process()``, ``_read_temp_files()``,
|
||||
``_kill_shell_children()``, ``_execute_oneshot()``, and ``_cleanup_temp_files()``.
|
||||
"""
|
||||
|
||||
persistent: bool
|
||||
|
||||
@abstractmethod
|
||||
def _spawn_shell_process(self) -> subprocess.Popen: ...
|
||||
|
||||
@abstractmethod
|
||||
def _read_temp_files(self, *paths: str) -> list[str]: ...
|
||||
|
||||
@abstractmethod
|
||||
def _kill_shell_children(self): ...
|
||||
|
||||
@abstractmethod
|
||||
def _execute_oneshot(self, command: str, cwd: str, *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict: ...
|
||||
|
||||
@abstractmethod
|
||||
def _cleanup_temp_files(self): ...
|
||||
|
||||
_session_id: str = ""
|
||||
_poll_interval: float = 0.01
|
||||
|
||||
@property
|
||||
def _temp_prefix(self) -> str:
|
||||
return f"/tmp/hermes-persistent-{self._session_id}"
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _init_persistent_shell(self):
|
||||
self._shell_lock = threading.Lock()
|
||||
self._shell_proc: subprocess.Popen | None = None
|
||||
self._shell_alive: bool = False
|
||||
self._shell_pid: int | None = None
|
||||
|
||||
self._session_id = uuid.uuid4().hex[:12]
|
||||
p = self._temp_prefix
|
||||
self._pshell_stdout = f"{p}-stdout"
|
||||
self._pshell_stderr = f"{p}-stderr"
|
||||
self._pshell_status = f"{p}-status"
|
||||
self._pshell_cwd = f"{p}-cwd"
|
||||
self._pshell_pid_file = f"{p}-pid"
|
||||
|
||||
self._shell_proc = self._spawn_shell_process()
|
||||
self._shell_alive = True
|
||||
|
||||
self._drain_thread = threading.Thread(
|
||||
target=self._drain_shell_output, daemon=True,
|
||||
)
|
||||
self._drain_thread.start()
|
||||
|
||||
init_script = (
|
||||
f"export TERM=${{TERM:-dumb}}\n"
|
||||
f"touch {self._pshell_stdout} {self._pshell_stderr} "
|
||||
f"{self._pshell_status} {self._pshell_cwd} {self._pshell_pid_file}\n"
|
||||
f"echo $$ > {self._pshell_pid_file}\n"
|
||||
f"pwd > {self._pshell_cwd}\n"
|
||||
)
|
||||
self._send_to_shell(init_script)
|
||||
|
||||
deadline = time.monotonic() + 3.0
|
||||
while time.monotonic() < deadline:
|
||||
pid_str = self._read_temp_files(self._pshell_pid_file)[0].strip()
|
||||
if pid_str.isdigit():
|
||||
self._shell_pid = int(pid_str)
|
||||
break
|
||||
time.sleep(0.05)
|
||||
else:
|
||||
logger.warning("Could not read persistent shell PID")
|
||||
self._shell_pid = None
|
||||
|
||||
if self._shell_pid:
|
||||
logger.info(
|
||||
"Persistent shell started (session=%s, pid=%d)",
|
||||
self._session_id, self._shell_pid,
|
||||
)
|
||||
|
||||
reported_cwd = self._read_temp_files(self._pshell_cwd)[0].strip()
|
||||
if reported_cwd:
|
||||
self.cwd = reported_cwd
|
||||
|
||||
def _cleanup_persistent_shell(self):
|
||||
if self._shell_proc is None:
|
||||
return
|
||||
|
||||
if self._session_id:
|
||||
self._cleanup_temp_files()
|
||||
|
||||
try:
|
||||
self._shell_proc.stdin.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
self._shell_proc.terminate()
|
||||
self._shell_proc.wait(timeout=3)
|
||||
except subprocess.TimeoutExpired:
|
||||
self._shell_proc.kill()
|
||||
|
||||
self._shell_alive = False
|
||||
self._shell_proc = None
|
||||
|
||||
if hasattr(self, "_drain_thread") and self._drain_thread.is_alive():
|
||||
self._drain_thread.join(timeout=1.0)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# execute() / cleanup() — shared dispatcher, subclasses inherit
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def execute(self, command: str, cwd: str = "", *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict:
|
||||
if self.persistent:
|
||||
return self._execute_persistent(
|
||||
command, cwd, timeout=timeout, stdin_data=stdin_data,
|
||||
)
|
||||
return self._execute_oneshot(
|
||||
command, cwd, timeout=timeout, stdin_data=stdin_data,
|
||||
)
|
||||
|
||||
def cleanup(self):
|
||||
if self.persistent:
|
||||
self._cleanup_persistent_shell()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Shell I/O
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _drain_shell_output(self):
|
||||
try:
|
||||
for _ in self._shell_proc.stdout:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
self._shell_alive = False
|
||||
|
||||
def _send_to_shell(self, text: str):
|
||||
if not self._shell_alive or self._shell_proc is None:
|
||||
return
|
||||
try:
|
||||
self._shell_proc.stdin.write(text)
|
||||
self._shell_proc.stdin.flush()
|
||||
except (BrokenPipeError, OSError):
|
||||
self._shell_alive = False
|
||||
|
||||
def _read_persistent_output(self) -> tuple[str, int, str]:
|
||||
stdout, stderr, status_raw, cwd = self._read_temp_files(
|
||||
self._pshell_stdout, self._pshell_stderr,
|
||||
self._pshell_status, self._pshell_cwd,
|
||||
)
|
||||
output = self._merge_output(stdout, stderr)
|
||||
status = status_raw.strip()
|
||||
if ":" in status:
|
||||
status = status.split(":", 1)[1]
|
||||
try:
|
||||
exit_code = int(status.strip())
|
||||
except ValueError:
|
||||
exit_code = 1
|
||||
return output, exit_code, cwd.strip()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Execution
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _execute_persistent(self, command: str, cwd: str, *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict:
|
||||
if not self._shell_alive:
|
||||
logger.info("Persistent shell died, restarting...")
|
||||
self._init_persistent_shell()
|
||||
|
||||
exec_command, sudo_stdin = self._prepare_command(command)
|
||||
effective_timeout = timeout or self.timeout
|
||||
if stdin_data or sudo_stdin:
|
||||
return self._execute_oneshot(
|
||||
command, cwd, timeout=timeout, stdin_data=stdin_data,
|
||||
)
|
||||
|
||||
with self._shell_lock:
|
||||
return self._execute_persistent_locked(
|
||||
exec_command, cwd, effective_timeout,
|
||||
)
|
||||
|
||||
def _execute_persistent_locked(self, command: str, cwd: str,
|
||||
timeout: int) -> dict:
|
||||
work_dir = cwd or self.cwd
|
||||
cmd_id = uuid.uuid4().hex[:8]
|
||||
truncate = (
|
||||
f": > {self._pshell_stdout}\n"
|
||||
f": > {self._pshell_stderr}\n"
|
||||
f": > {self._pshell_status}\n"
|
||||
)
|
||||
self._send_to_shell(truncate)
|
||||
escaped = command.replace("'", "'\\''")
|
||||
|
||||
ipc_script = (
|
||||
f"cd {shlex.quote(work_dir)}\n"
|
||||
f"eval '{escaped}' < /dev/null > {self._pshell_stdout} 2> {self._pshell_stderr}\n"
|
||||
f"__EC=$?\n"
|
||||
f"pwd > {self._pshell_cwd}\n"
|
||||
f"echo {cmd_id}:$__EC > {self._pshell_status}\n"
|
||||
)
|
||||
self._send_to_shell(ipc_script)
|
||||
deadline = time.monotonic() + timeout
|
||||
poll_interval = self._poll_interval
|
||||
|
||||
while True:
|
||||
if is_interrupted():
|
||||
self._kill_shell_children()
|
||||
output, _, _ = self._read_persistent_output()
|
||||
return {
|
||||
"output": output + "\n[Command interrupted]",
|
||||
"returncode": 130,
|
||||
}
|
||||
|
||||
if time.monotonic() > deadline:
|
||||
self._kill_shell_children()
|
||||
output, _, _ = self._read_persistent_output()
|
||||
if output:
|
||||
return {
|
||||
"output": output + f"\n[Command timed out after {timeout}s]",
|
||||
"returncode": 124,
|
||||
}
|
||||
return self._timeout_result(timeout)
|
||||
|
||||
if not self._shell_alive:
|
||||
return {
|
||||
"output": "Persistent shell died during execution",
|
||||
"returncode": 1,
|
||||
}
|
||||
|
||||
status_content = self._read_temp_files(self._pshell_status)[0].strip()
|
||||
if status_content.startswith(cmd_id + ":"):
|
||||
break
|
||||
|
||||
time.sleep(poll_interval)
|
||||
|
||||
output, exit_code, new_cwd = self._read_persistent_output()
|
||||
if new_cwd:
|
||||
self.cwd = new_cwd
|
||||
return {"output": output, "returncode": exit_code}
|
||||
|
||||
@staticmethod
|
||||
def _merge_output(stdout: str, stderr: str) -> str:
|
||||
parts = []
|
||||
if stdout.strip():
|
||||
parts.append(stdout.rstrip("\n"))
|
||||
if stderr.strip():
|
||||
parts.append(stderr.rstrip("\n"))
|
||||
return "\n".join(parts)
|
||||
@@ -8,12 +8,13 @@ import time
|
||||
from pathlib import Path
|
||||
|
||||
from tools.environments.base import BaseEnvironment
|
||||
from tools.environments.persistent_shell import PersistentShellMixin
|
||||
from tools.interrupt import is_interrupted
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SSHEnvironment(BaseEnvironment):
|
||||
class SSHEnvironment(PersistentShellMixin, BaseEnvironment):
|
||||
"""Run commands on a remote machine over SSH.
|
||||
|
||||
Uses SSH ControlMaster for connection persistence so subsequent
|
||||
@@ -22,22 +23,33 @@ class SSHEnvironment(BaseEnvironment):
|
||||
|
||||
Foreground commands are interruptible: the local ssh process is killed
|
||||
and a remote kill is attempted over the ControlMaster socket.
|
||||
|
||||
When ``persistent=True``, a single long-lived bash shell is kept alive
|
||||
over SSH and state (cwd, env vars, shell variables) persists across
|
||||
``execute()`` calls. Output capture uses file-based IPC on the remote
|
||||
host (stdout/stderr/exit-code written to temp files, polled via fast
|
||||
ControlMaster one-shot reads).
|
||||
"""
|
||||
|
||||
def __init__(self, host: str, user: str, cwd: str = "~",
|
||||
timeout: int = 60, port: int = 22, key_path: str = ""):
|
||||
timeout: int = 60, port: int = 22, key_path: str = "",
|
||||
persistent: bool = False):
|
||||
super().__init__(cwd=cwd, timeout=timeout)
|
||||
self.host = host
|
||||
self.user = user
|
||||
self.port = port
|
||||
self.key_path = key_path
|
||||
self.persistent = persistent
|
||||
|
||||
self.control_dir = Path(tempfile.gettempdir()) / "hermes-ssh"
|
||||
self.control_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.control_socket = self.control_dir / f"{user}@{host}:{port}.sock"
|
||||
self._establish_connection()
|
||||
|
||||
def _build_ssh_command(self, extra_args: list = None) -> list:
|
||||
if self.persistent:
|
||||
self._init_persistent_shell()
|
||||
|
||||
def _build_ssh_command(self, extra_args: list | None = None) -> list:
|
||||
cmd = ["ssh"]
|
||||
cmd.extend(["-o", f"ControlPath={self.control_socket}"])
|
||||
cmd.extend(["-o", "ControlMaster=auto"])
|
||||
@@ -65,15 +77,76 @@ class SSHEnvironment(BaseEnvironment):
|
||||
except subprocess.TimeoutExpired:
|
||||
raise RuntimeError(f"SSH connection to {self.user}@{self.host} timed out")
|
||||
|
||||
def execute(self, command: str, cwd: str = "", *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict:
|
||||
_poll_interval: float = 0.15
|
||||
|
||||
@property
|
||||
def _temp_prefix(self) -> str:
|
||||
return f"/tmp/hermes-ssh-{self._session_id}"
|
||||
|
||||
def _spawn_shell_process(self) -> subprocess.Popen:
|
||||
cmd = self._build_ssh_command()
|
||||
cmd.append("bash -l")
|
||||
return subprocess.Popen(
|
||||
cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
)
|
||||
|
||||
def _read_temp_files(self, *paths: str) -> list[str]:
|
||||
if len(paths) == 1:
|
||||
cmd = self._build_ssh_command()
|
||||
cmd.append(f"cat {paths[0]} 2>/dev/null")
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
return [result.stdout]
|
||||
except (subprocess.TimeoutExpired, OSError):
|
||||
return [""]
|
||||
|
||||
delim = f"__HERMES_SEP_{self._session_id}__"
|
||||
script = "; ".join(
|
||||
f"cat {p} 2>/dev/null; echo '{delim}'" for p in paths
|
||||
)
|
||||
cmd = self._build_ssh_command()
|
||||
cmd.append(script)
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
parts = result.stdout.split(delim + "\n")
|
||||
return [parts[i] if i < len(parts) else "" for i in range(len(paths))]
|
||||
except (subprocess.TimeoutExpired, OSError):
|
||||
return [""] * len(paths)
|
||||
|
||||
def _kill_shell_children(self):
|
||||
if self._shell_pid is None:
|
||||
return
|
||||
cmd = self._build_ssh_command()
|
||||
cmd.append(f"pkill -P {self._shell_pid} 2>/dev/null; true")
|
||||
try:
|
||||
subprocess.run(cmd, capture_output=True, timeout=5)
|
||||
except (subprocess.TimeoutExpired, OSError):
|
||||
pass
|
||||
|
||||
def _cleanup_temp_files(self):
|
||||
cmd = self._build_ssh_command()
|
||||
cmd.append(f"rm -f {self._temp_prefix}-*")
|
||||
try:
|
||||
subprocess.run(cmd, capture_output=True, timeout=5)
|
||||
except (subprocess.TimeoutExpired, OSError):
|
||||
pass
|
||||
|
||||
def _execute_oneshot(self, command: str, cwd: str = "", *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict:
|
||||
work_dir = cwd or self.cwd
|
||||
exec_command, sudo_stdin = self._prepare_command(command)
|
||||
wrapped = f'cd {work_dir} && {exec_command}'
|
||||
effective_timeout = timeout or self.timeout
|
||||
|
||||
# Merge sudo password (if any) with caller-supplied stdin_data.
|
||||
if sudo_stdin is not None and stdin_data is not None:
|
||||
effective_stdin = sudo_stdin + stdin_data
|
||||
elif sudo_stdin is not None:
|
||||
@@ -82,66 +155,60 @@ class SSHEnvironment(BaseEnvironment):
|
||||
effective_stdin = stdin_data
|
||||
|
||||
cmd = self._build_ssh_command()
|
||||
cmd.extend(["bash", "-c", wrapped])
|
||||
cmd.append(wrapped)
|
||||
|
||||
try:
|
||||
kwargs = self._build_run_kwargs(timeout, effective_stdin)
|
||||
# Remove timeout from kwargs -- we handle it in the poll loop
|
||||
kwargs.pop("timeout", None)
|
||||
kwargs = self._build_run_kwargs(timeout, effective_stdin)
|
||||
kwargs.pop("timeout", None)
|
||||
_output_chunks = []
|
||||
proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
|
||||
text=True,
|
||||
)
|
||||
|
||||
_output_chunks = []
|
||||
if effective_stdin:
|
||||
try:
|
||||
proc.stdin.write(effective_stdin)
|
||||
proc.stdin.close()
|
||||
except (BrokenPipeError, OSError):
|
||||
pass
|
||||
|
||||
proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
|
||||
text=True,
|
||||
)
|
||||
def _drain():
|
||||
try:
|
||||
for line in proc.stdout:
|
||||
_output_chunks.append(line)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if effective_stdin:
|
||||
reader = threading.Thread(target=_drain, daemon=True)
|
||||
reader.start()
|
||||
deadline = time.monotonic() + effective_timeout
|
||||
|
||||
while proc.poll() is None:
|
||||
if is_interrupted():
|
||||
proc.terminate()
|
||||
try:
|
||||
proc.stdin.write(effective_stdin)
|
||||
proc.stdin.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _drain():
|
||||
try:
|
||||
for line in proc.stdout:
|
||||
_output_chunks.append(line)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
reader = threading.Thread(target=_drain, daemon=True)
|
||||
reader.start()
|
||||
deadline = time.monotonic() + effective_timeout
|
||||
|
||||
while proc.poll() is None:
|
||||
if is_interrupted():
|
||||
proc.terminate()
|
||||
try:
|
||||
proc.wait(timeout=1)
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.kill()
|
||||
reader.join(timeout=2)
|
||||
return {
|
||||
"output": "".join(_output_chunks) + "\n[Command interrupted]",
|
||||
"returncode": 130,
|
||||
}
|
||||
if time.monotonic() > deadline:
|
||||
proc.wait(timeout=1)
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.kill()
|
||||
reader.join(timeout=2)
|
||||
return self._timeout_result(effective_timeout)
|
||||
time.sleep(0.2)
|
||||
reader.join(timeout=2)
|
||||
return {
|
||||
"output": "".join(_output_chunks) + "\n[Command interrupted]",
|
||||
"returncode": 130,
|
||||
}
|
||||
if time.monotonic() > deadline:
|
||||
proc.kill()
|
||||
reader.join(timeout=2)
|
||||
return self._timeout_result(effective_timeout)
|
||||
time.sleep(0.2)
|
||||
|
||||
reader.join(timeout=5)
|
||||
return {"output": "".join(_output_chunks), "returncode": proc.returncode}
|
||||
|
||||
except Exception as e:
|
||||
return {"output": f"SSH execution error: {str(e)}", "returncode": 1}
|
||||
reader.join(timeout=5)
|
||||
return {"output": "".join(_output_chunks), "returncode": proc.returncode}
|
||||
|
||||
def cleanup(self):
|
||||
super().cleanup()
|
||||
if self.control_socket.exists():
|
||||
try:
|
||||
cmd = ["ssh", "-o", f"ControlPath={self.control_socket}",
|
||||
|
||||
@@ -101,12 +101,31 @@ def _get_file_ops(task_id: str = "default") -> ShellFileOperations:
|
||||
"container_persistent": config.get("container_persistent", True),
|
||||
"docker_volumes": config.get("docker_volumes", []),
|
||||
}
|
||||
|
||||
ssh_config = None
|
||||
if env_type == "ssh":
|
||||
ssh_config = {
|
||||
"host": config.get("ssh_host", ""),
|
||||
"user": config.get("ssh_user", ""),
|
||||
"port": config.get("ssh_port", 22),
|
||||
"key": config.get("ssh_key", ""),
|
||||
"persistent": config.get("ssh_persistent", False),
|
||||
}
|
||||
|
||||
local_config = None
|
||||
if env_type == "local":
|
||||
local_config = {
|
||||
"persistent": config.get("local_persistent", False),
|
||||
}
|
||||
|
||||
terminal_env = _create_environment(
|
||||
env_type=env_type,
|
||||
image=image,
|
||||
cwd=cwd,
|
||||
timeout=config["timeout"],
|
||||
ssh_config=ssh_config,
|
||||
container_config=container_config,
|
||||
local_config=local_config,
|
||||
task_id=task_id,
|
||||
)
|
||||
|
||||
|
||||
@@ -471,6 +471,8 @@ def _get_env_config() -> Dict[str, Any]:
|
||||
# is running inside the container/remote).
|
||||
if env_type == "local":
|
||||
default_cwd = os.getcwd()
|
||||
elif env_type == "ssh":
|
||||
default_cwd = "~"
|
||||
else:
|
||||
default_cwd = "/root"
|
||||
|
||||
@@ -503,6 +505,8 @@ def _get_env_config() -> Dict[str, Any]:
|
||||
"ssh_user": os.getenv("TERMINAL_SSH_USER", ""),
|
||||
"ssh_port": _parse_env_var("TERMINAL_SSH_PORT", "22"),
|
||||
"ssh_key": os.getenv("TERMINAL_SSH_KEY", ""),
|
||||
"ssh_persistent": os.getenv("TERMINAL_SSH_PERSISTENT", "false").lower() in ("true", "1", "yes"),
|
||||
"local_persistent": os.getenv("TERMINAL_LOCAL_PERSISTENT", "false").lower() in ("true", "1", "yes"),
|
||||
# Container resource config (applies to docker, singularity, modal, daytona -- ignored for local/ssh)
|
||||
"container_cpu": _parse_env_var("TERMINAL_CONTAINER_CPU", "1", float, "number"),
|
||||
"container_memory": _parse_env_var("TERMINAL_CONTAINER_MEMORY", "5120"), # MB (default 5GB)
|
||||
@@ -514,6 +518,7 @@ def _get_env_config() -> Dict[str, Any]:
|
||||
|
||||
def _create_environment(env_type: str, image: str, cwd: str, timeout: int,
|
||||
ssh_config: dict = None, container_config: dict = None,
|
||||
local_config: dict = None,
|
||||
task_id: str = "default"):
|
||||
"""
|
||||
Create an execution environment from mini-swe-agent.
|
||||
@@ -538,7 +543,9 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int,
|
||||
volumes = cc.get("docker_volumes", [])
|
||||
|
||||
if env_type == "local":
|
||||
return _LocalEnvironment(cwd=cwd, timeout=timeout)
|
||||
lc = local_config or {}
|
||||
return _LocalEnvironment(cwd=cwd, timeout=timeout,
|
||||
persistent=lc.get("persistent", False))
|
||||
|
||||
elif env_type == "docker":
|
||||
return _DockerEnvironment(
|
||||
@@ -594,6 +601,7 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int,
|
||||
key_path=ssh_config.get("key", ""),
|
||||
cwd=cwd,
|
||||
timeout=timeout,
|
||||
persistent=ssh_config.get("persistent", False),
|
||||
)
|
||||
|
||||
else:
|
||||
@@ -923,6 +931,7 @@ def terminal_tool(
|
||||
"user": config.get("ssh_user", ""),
|
||||
"port": config.get("ssh_port", 22),
|
||||
"key": config.get("ssh_key", ""),
|
||||
"persistent": config.get("ssh_persistent", False),
|
||||
}
|
||||
|
||||
container_config = None
|
||||
@@ -935,6 +944,12 @@ def terminal_tool(
|
||||
"docker_volumes": config.get("docker_volumes", []),
|
||||
}
|
||||
|
||||
local_config = None
|
||||
if env_type == "local":
|
||||
local_config = {
|
||||
"persistent": config.get("local_persistent", False),
|
||||
}
|
||||
|
||||
new_env = _create_environment(
|
||||
env_type=env_type,
|
||||
image=image,
|
||||
@@ -942,6 +957,7 @@ def terminal_tool(
|
||||
timeout=effective_timeout,
|
||||
ssh_config=ssh_config,
|
||||
container_config=container_config,
|
||||
local_config=local_config,
|
||||
task_id=effective_task_id,
|
||||
)
|
||||
except ImportError as e:
|
||||
|
||||
@@ -130,7 +130,41 @@ When an auxiliary task is configured with provider `main`, Hermes resolves that
|
||||
|
||||
## Fallback models
|
||||
|
||||
Hermes also supports a configured fallback model/provider, allowing runtime failover in supported error paths.
|
||||
Hermes supports a configured fallback model/provider pair, allowing runtime failover when the primary model encounters errors.
|
||||
|
||||
### How it works internally
|
||||
|
||||
1. **Storage**: `AIAgent.__init__` stores the `fallback_model` dict and sets `_fallback_activated = False`.
|
||||
|
||||
2. **Trigger points**: `_try_activate_fallback()` is called from three places in the main retry loop in `run_agent.py`:
|
||||
- After max retries on invalid API responses (None choices, missing content)
|
||||
- On non-retryable client errors (HTTP 401, 403, 404)
|
||||
- After max retries on transient errors (HTTP 429, 500, 502, 503)
|
||||
|
||||
3. **Activation flow** (`_try_activate_fallback`):
|
||||
- Returns `False` immediately if already activated or not configured
|
||||
- Calls `resolve_provider_client()` from `auxiliary_client.py` to build a new client with proper auth
|
||||
- Determines `api_mode`: `codex_responses` for openai-codex, `anthropic_messages` for anthropic, `chat_completions` for everything else
|
||||
- Swaps in-place: `self.model`, `self.provider`, `self.base_url`, `self.api_mode`, `self.client`, `self._client_kwargs`
|
||||
- For anthropic fallback: builds a native Anthropic client instead of OpenAI-compatible
|
||||
- Re-evaluates prompt caching (enabled for Claude models on OpenRouter)
|
||||
- Sets `_fallback_activated = True` — prevents firing again
|
||||
- Resets retry count to 0 and continues the loop
|
||||
|
||||
4. **Config flow**:
|
||||
- CLI: `cli.py` reads `CLI_CONFIG["fallback_model"]` → passes to `AIAgent(fallback_model=...)`
|
||||
- Gateway: `gateway/run.py._load_fallback_model()` reads `config.yaml` → passes to `AIAgent`
|
||||
- Validation: both `provider` and `model` keys must be non-empty, or fallback is disabled
|
||||
|
||||
### What does NOT support fallback
|
||||
|
||||
- **Subagent delegation** (`tools/delegate_tool.py`): subagents inherit the parent's provider but not the fallback config
|
||||
- **Cron jobs** (`cron/`): run with a fixed provider, no fallback mechanism
|
||||
- **Auxiliary tasks**: use their own independent provider auto-detection chain (see Auxiliary model routing above)
|
||||
|
||||
### Test coverage
|
||||
|
||||
See `tests/test_fallback_model.py` for comprehensive tests covering all supported providers, one-shot semantics, and edge cases.
|
||||
|
||||
## Related docs
|
||||
|
||||
|
||||
@@ -164,6 +164,7 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe
|
||||
| `HERMES_QUIET` | Suppress non-essential output (`true`/`false`) |
|
||||
| `HERMES_API_TIMEOUT` | LLM API call timeout in seconds (default: `900`) |
|
||||
| `HERMES_EXEC_ASK` | Enable execution approval prompts in gateway mode (`true`/`false`) |
|
||||
| `HERMES_BACKGROUND_NOTIFICATIONS` | Background process notification mode in gateway: `all` (default), `result`, `error`, `off` |
|
||||
|
||||
## Session Settings
|
||||
|
||||
@@ -197,6 +198,18 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe
|
||||
|
||||
For task-specific direct endpoints, Hermes uses the task's configured API key or `OPENAI_API_KEY`. It does not reuse `OPENROUTER_API_KEY` for those custom endpoints.
|
||||
|
||||
## Fallback Model (config.yaml only)
|
||||
|
||||
The primary model fallback is configured exclusively through `config.yaml` — there are no environment variables for it. Add a `fallback_model` section with `provider` and `model` keys to enable automatic failover when your main model encounters errors.
|
||||
|
||||
```yaml
|
||||
fallback_model:
|
||||
provider: openrouter
|
||||
model: anthropic/claude-sonnet-4
|
||||
```
|
||||
|
||||
See [Fallback Providers](/docs/user-guide/features/fallback-providers) for full details.
|
||||
|
||||
## Provider Routing (config.yaml only)
|
||||
|
||||
These go in `~/.hermes/config.yaml` under the `provider_routing` section:
|
||||
|
||||
@@ -31,7 +31,7 @@ Type `/` in the CLI to open the autocomplete menu. Built-in commands are case-in
|
||||
| `/title` | Set a title for the current session (usage: /title My Session Name) |
|
||||
| `/compress` | Manually compress conversation context (flush memories + summarize) |
|
||||
| `/rollback` | List or restore filesystem checkpoints (usage: /rollback [number]) |
|
||||
| `/background` | Run a prompt in the background (usage: /background <prompt>) |
|
||||
| `/background <prompt>` | Run a prompt in a separate background session. The agent processes your prompt independently — your current session stays free for other work. Results appear as a panel when the task finishes. See [CLI Background Sessions](/docs/user-guide/cli#background-sessions). |
|
||||
| `/plan [request]` | Load the bundled `plan` skill to write a markdown plan instead of executing the work. Plans are saved under `.hermes/plans/` relative to the active workspace/backend working directory. |
|
||||
|
||||
### Configuration
|
||||
@@ -109,7 +109,7 @@ The messaging gateway supports the following built-in commands inside Telegram,
|
||||
| `/reasoning [level\|show\|hide]` | Change reasoning effort or toggle reasoning display. |
|
||||
| `/voice [on\|off\|tts\|join\|channel\|leave\|status]` | Control spoken replies in chat. `join`/`channel`/`leave` manage Discord voice-channel mode. |
|
||||
| `/rollback [number]` | List or restore filesystem checkpoints. |
|
||||
| `/background <prompt>` | Run a prompt in a separate background session. |
|
||||
| `/background <prompt>` | Run a prompt in a separate background session. Results are delivered back to the same chat when the task finishes. See [Messaging Background Sessions](/docs/user-guide/messaging/#background-sessions). |
|
||||
| `/plan [request]` | Load the bundled `plan` skill to write a markdown plan instead of executing the work. Plans are saved under `.hermes/plans/` relative to the active workspace/backend working directory. |
|
||||
| `/reload-mcp` | Reload MCP servers from config. |
|
||||
| `/update` | Update Hermes Agent to the latest version. |
|
||||
@@ -119,6 +119,6 @@ The messaging gateway supports the following built-in commands inside Telegram,
|
||||
## Notes
|
||||
|
||||
- `/skin`, `/tools`, `/toolsets`, `/config`, `/prompt`, `/cron`, `/skills`, `/platforms`, `/paste`, and `/verbose` are **CLI-only** commands.
|
||||
- `/status`, `/stop`, `/sethome`, `/resume`, `/background`, and `/update` are **messaging-only** commands.
|
||||
- `/voice`, `/reload-mcp`, and `/rollback` work in **both** the CLI and the messaging gateway.
|
||||
- `/status`, `/stop`, `/sethome`, `/resume`, and `/update` are **messaging-only** commands.
|
||||
- `/background`, `/voice`, `/reload-mcp`, and `/rollback` work in **both** the CLI and the messaging gateway.
|
||||
- `/voice join`, `/voice channel`, and `/voice leave` are only meaningful on Discord.
|
||||
|
||||
@@ -259,6 +259,55 @@ compression:
|
||||
|
||||
When compression triggers, middle turns are summarized while the first 3 and last 4 turns are always preserved.
|
||||
|
||||
## Background Sessions
|
||||
|
||||
Run a prompt in a separate background session while continuing to use the CLI for other work:
|
||||
|
||||
```
|
||||
/background Analyze the logs in /var/log and summarize any errors from today
|
||||
```
|
||||
|
||||
Hermes immediately confirms the task and gives you back the prompt:
|
||||
|
||||
```
|
||||
🔄 Background task #1 started: "Analyze the logs in /var/log and summarize..."
|
||||
Task ID: bg_143022_a1b2c3
|
||||
```
|
||||
|
||||
### How It Works
|
||||
|
||||
Each `/background` prompt spawns a **completely separate agent session** in a daemon thread:
|
||||
|
||||
- **Isolated conversation** — the background agent has no knowledge of your current session's history. It receives only the prompt you provide.
|
||||
- **Same configuration** — the background agent inherits your model, provider, toolsets, reasoning settings, and fallback model from the current session.
|
||||
- **Non-blocking** — your foreground session stays fully interactive. You can chat, run commands, or even start more background tasks.
|
||||
- **Multiple tasks** — you can run several background tasks simultaneously. Each gets a numbered ID.
|
||||
|
||||
### Results
|
||||
|
||||
When a background task finishes, the result appears as a panel in your terminal:
|
||||
|
||||
```
|
||||
╭─ ⚕ Hermes (background #1) ──────────────────────────────────╮
|
||||
│ Found 3 errors in syslog from today: │
|
||||
│ 1. OOM killer invoked at 03:22 — killed process nginx │
|
||||
│ 2. Disk I/O error on /dev/sda1 at 07:15 │
|
||||
│ 3. Failed SSH login attempts from 192.168.1.50 at 14:30 │
|
||||
╰──────────────────────────────────────────────────────────────╯
|
||||
```
|
||||
|
||||
If the task fails, you'll see an error notification instead. If `display.bell_on_complete` is enabled in your config, the terminal bell rings when the task finishes.
|
||||
|
||||
### Use Cases
|
||||
|
||||
- **Long-running research** — "/background research the latest developments in quantum error correction" while you work on code
|
||||
- **File processing** — "/background analyze all Python files in this repo and list any security issues" while you continue a conversation
|
||||
- **Parallel investigations** — start multiple background tasks to explore different angles simultaneously
|
||||
|
||||
:::info
|
||||
Background sessions do not appear in your main conversation history. They are standalone sessions with their own task ID (e.g., `bg_143022_a1b2c3`).
|
||||
:::
|
||||
|
||||
## Quiet Mode
|
||||
|
||||
By default, the CLI runs in quiet mode which:
|
||||
|
||||
@@ -421,6 +421,26 @@ provider_routing:
|
||||
|
||||
**Shortcuts:** Append `:nitro` to any model name for throughput sorting (e.g., `anthropic/claude-sonnet-4:nitro`), or `:floor` for price sorting.
|
||||
|
||||
## Fallback Model
|
||||
|
||||
Configure a backup provider:model that Hermes switches to automatically when your primary model fails (rate limits, server errors, auth failures):
|
||||
|
||||
```yaml
|
||||
fallback_model:
|
||||
provider: openrouter # required
|
||||
model: anthropic/claude-sonnet-4 # required
|
||||
# base_url: http://localhost:8000/v1 # optional, for custom endpoints
|
||||
# api_key_env: MY_CUSTOM_KEY # optional, env var name for custom endpoint API key
|
||||
```
|
||||
|
||||
When activated, the fallback swaps the model and provider mid-session without losing your conversation. It fires **at most once** per session.
|
||||
|
||||
Supported providers: `openrouter`, `nous`, `openai-codex`, `anthropic`, `zai`, `kimi-coding`, `minimax`, `minimax-cn`, `custom`.
|
||||
|
||||
:::tip
|
||||
Fallback is configured exclusively through `config.yaml` — there are no environment variables for it. For full details on when it triggers, supported providers, and how it interacts with auxiliary tasks and delegation, see [Fallback Providers](/docs/user-guide/features/fallback-providers).
|
||||
:::
|
||||
|
||||
## Terminal Backend Configuration
|
||||
|
||||
Configure which environment the agent uses for terminal commands:
|
||||
@@ -733,6 +753,7 @@ display:
|
||||
resume_display: full # full (show previous messages on resume) | minimal (one-liner only)
|
||||
bell_on_complete: false # Play terminal bell when agent finishes (great for long tasks)
|
||||
show_reasoning: false # Show model reasoning/thinking above each response (toggle with /reasoning show|hide)
|
||||
background_process_notifications: all # all | result | error | off (gateway only)
|
||||
```
|
||||
|
||||
| Mode | What you see |
|
||||
|
||||
311
website/docs/user-guide/features/fallback-providers.md
Normal file
311
website/docs/user-guide/features/fallback-providers.md
Normal file
@@ -0,0 +1,311 @@
|
||||
---
|
||||
title: Fallback Providers
|
||||
description: Configure automatic failover to backup LLM providers when your primary model is unavailable.
|
||||
sidebar_label: Fallback Providers
|
||||
sidebar_position: 8
|
||||
---
|
||||
|
||||
# Fallback Providers
|
||||
|
||||
Hermes Agent has two separate fallback systems that keep your sessions running when providers hit issues:
|
||||
|
||||
1. **Primary model fallback** — automatically switches to a backup provider:model when your main model fails
|
||||
2. **Auxiliary task fallback** — independent provider resolution for side tasks like vision, compression, and web extraction
|
||||
|
||||
Both are optional and work independently.
|
||||
|
||||
## Primary Model Fallback
|
||||
|
||||
When your main LLM provider encounters errors — rate limits, server overload, auth failures, connection drops — Hermes can automatically switch to a backup provider:model pair mid-session without losing your conversation.
|
||||
|
||||
### Configuration
|
||||
|
||||
Add a `fallback_model` section to `~/.hermes/config.yaml`:
|
||||
|
||||
```yaml
|
||||
fallback_model:
|
||||
provider: openrouter
|
||||
model: anthropic/claude-sonnet-4
|
||||
```
|
||||
|
||||
Both `provider` and `model` are **required**. If either is missing, the fallback is disabled.
|
||||
|
||||
### Supported Providers
|
||||
|
||||
| Provider | Value | Requirements |
|
||||
|----------|-------|-------------|
|
||||
| OpenRouter | `openrouter` | `OPENROUTER_API_KEY` |
|
||||
| Nous Portal | `nous` | `hermes login` (OAuth) |
|
||||
| OpenAI Codex | `openai-codex` | `hermes model` (ChatGPT OAuth) |
|
||||
| Anthropic | `anthropic` | `ANTHROPIC_API_KEY` or Claude Code credentials |
|
||||
| z.ai / GLM | `zai` | `GLM_API_KEY` |
|
||||
| Kimi / Moonshot | `kimi-coding` | `KIMI_API_KEY` |
|
||||
| MiniMax | `minimax` | `MINIMAX_API_KEY` |
|
||||
| MiniMax (China) | `minimax-cn` | `MINIMAX_CN_API_KEY` |
|
||||
| Custom endpoint | `custom` | `base_url` + `api_key_env` (see below) |
|
||||
|
||||
### Custom Endpoint Fallback
|
||||
|
||||
For a custom OpenAI-compatible endpoint, add `base_url` and optionally `api_key_env`:
|
||||
|
||||
```yaml
|
||||
fallback_model:
|
||||
provider: custom
|
||||
model: my-local-model
|
||||
base_url: http://localhost:8000/v1
|
||||
api_key_env: MY_LOCAL_KEY # env var name containing the API key
|
||||
```
|
||||
|
||||
### When Fallback Triggers
|
||||
|
||||
The fallback activates automatically when the primary model fails with:
|
||||
|
||||
- **Rate limits** (HTTP 429) — after exhausting retry attempts
|
||||
- **Server errors** (HTTP 500, 502, 503) — after exhausting retry attempts
|
||||
- **Auth failures** (HTTP 401, 403) — immediately (no point retrying)
|
||||
- **Not found** (HTTP 404) — immediately
|
||||
- **Invalid responses** — when the API returns malformed or empty responses repeatedly
|
||||
|
||||
When triggered, Hermes:
|
||||
|
||||
1. Resolves credentials for the fallback provider
|
||||
2. Builds a new API client
|
||||
3. Swaps the model, provider, and client in-place
|
||||
4. Resets the retry counter and continues the conversation
|
||||
|
||||
The switch is seamless — your conversation history, tool calls, and context are preserved. The agent continues from exactly where it left off, just using a different model.
|
||||
|
||||
:::info One-Shot
|
||||
Fallback activates **at most once** per session. If the fallback provider also fails, normal error handling takes over (retries, then error message). This prevents cascading failover loops.
|
||||
:::
|
||||
|
||||
### Examples
|
||||
|
||||
**OpenRouter as fallback for Anthropic native:**
|
||||
```yaml
|
||||
model:
|
||||
provider: anthropic
|
||||
default: claude-sonnet-4-6
|
||||
|
||||
fallback_model:
|
||||
provider: openrouter
|
||||
model: anthropic/claude-sonnet-4
|
||||
```
|
||||
|
||||
**Nous Portal as fallback for OpenRouter:**
|
||||
```yaml
|
||||
model:
|
||||
provider: openrouter
|
||||
default: anthropic/claude-opus-4
|
||||
|
||||
fallback_model:
|
||||
provider: nous
|
||||
model: nous-hermes-3
|
||||
```
|
||||
|
||||
**Local model as fallback for cloud:**
|
||||
```yaml
|
||||
fallback_model:
|
||||
provider: custom
|
||||
model: llama-3.1-70b
|
||||
base_url: http://localhost:8000/v1
|
||||
api_key_env: LOCAL_API_KEY
|
||||
```
|
||||
|
||||
**Codex OAuth as fallback:**
|
||||
```yaml
|
||||
fallback_model:
|
||||
provider: openai-codex
|
||||
model: gpt-5.3-codex
|
||||
```
|
||||
|
||||
### Where Fallback Works
|
||||
|
||||
| Context | Fallback Supported |
|
||||
|---------|-------------------|
|
||||
| CLI sessions | ✔ |
|
||||
| Messaging gateway (Telegram, Discord, etc.) | ✔ |
|
||||
| Subagent delegation | ✘ (subagents do not inherit fallback config) |
|
||||
| Cron jobs | ✘ (run with a fixed provider) |
|
||||
| Auxiliary tasks (vision, compression) | ✘ (use their own provider chain — see below) |
|
||||
|
||||
:::tip
|
||||
There are no environment variables for `fallback_model` — it is configured exclusively through `config.yaml`. This is intentional: fallback configuration is a deliberate choice, not something a stale shell export should override.
|
||||
:::
|
||||
|
||||
---
|
||||
|
||||
## Auxiliary Task Fallback
|
||||
|
||||
Hermes uses separate lightweight models for side tasks. Each task has its own provider resolution chain that acts as a built-in fallback system.
|
||||
|
||||
### Tasks with Independent Provider Resolution
|
||||
|
||||
| Task | What It Does | Config Key |
|
||||
|------|-------------|-----------|
|
||||
| Vision | Image analysis, browser screenshots | `auxiliary.vision` |
|
||||
| Web Extract | Web page summarization | `auxiliary.web_extract` |
|
||||
| Compression | Context compression summaries | `auxiliary.compression` or `compression.summary_provider` |
|
||||
| Session Search | Past session summarization | `auxiliary.session_search` |
|
||||
| Skills Hub | Skill search and discovery | `auxiliary.skills_hub` |
|
||||
| MCP | MCP helper operations | `auxiliary.mcp` |
|
||||
| Memory Flush | Memory consolidation | `auxiliary.flush_memories` |
|
||||
|
||||
### Auto-Detection Chain
|
||||
|
||||
When a task's provider is set to `"auto"` (the default), Hermes tries providers in order until one works:
|
||||
|
||||
**For text tasks (compression, web extract, etc.):**
|
||||
|
||||
```text
|
||||
OpenRouter → Nous Portal → Custom endpoint → Codex OAuth →
|
||||
API-key providers (z.ai, Kimi, MiniMax, Anthropic) → give up
|
||||
```
|
||||
|
||||
**For vision tasks:**
|
||||
|
||||
```text
|
||||
Main provider (if vision-capable) → OpenRouter → Nous Portal →
|
||||
Codex OAuth → Anthropic → Custom endpoint → give up
|
||||
```
|
||||
|
||||
If the resolved provider fails at call time, Hermes also has an internal retry: if the provider is not OpenRouter and no explicit `base_url` is set, it tries OpenRouter as a last-resort fallback.
|
||||
|
||||
### Configuring Auxiliary Providers
|
||||
|
||||
Each task can be configured independently in `config.yaml`:
|
||||
|
||||
```yaml
|
||||
auxiliary:
|
||||
vision:
|
||||
provider: "auto" # auto | openrouter | nous | codex | main | anthropic
|
||||
model: "" # e.g. "openai/gpt-4o"
|
||||
base_url: "" # direct endpoint (takes precedence over provider)
|
||||
api_key: "" # API key for base_url
|
||||
|
||||
web_extract:
|
||||
provider: "auto"
|
||||
model: ""
|
||||
|
||||
compression:
|
||||
provider: "auto"
|
||||
model: ""
|
||||
|
||||
session_search:
|
||||
provider: "auto"
|
||||
model: ""
|
||||
|
||||
skills_hub:
|
||||
provider: "auto"
|
||||
model: ""
|
||||
|
||||
mcp:
|
||||
provider: "auto"
|
||||
model: ""
|
||||
|
||||
flush_memories:
|
||||
provider: "auto"
|
||||
model: ""
|
||||
```
|
||||
|
||||
Or via environment variables:
|
||||
|
||||
```bash
|
||||
AUXILIARY_VISION_PROVIDER=openrouter
|
||||
AUXILIARY_VISION_MODEL=openai/gpt-4o
|
||||
AUXILIARY_WEB_EXTRACT_PROVIDER=nous
|
||||
CONTEXT_COMPRESSION_PROVIDER=main
|
||||
CONTEXT_COMPRESSION_MODEL=google/gemini-3-flash-preview
|
||||
```
|
||||
|
||||
### Provider Options for Auxiliary Tasks
|
||||
|
||||
| Provider | Description | Requirements |
|
||||
|----------|-------------|-------------|
|
||||
| `"auto"` | Try providers in order until one works (default) | At least one provider configured |
|
||||
| `"openrouter"` | Force OpenRouter | `OPENROUTER_API_KEY` |
|
||||
| `"nous"` | Force Nous Portal | `hermes login` |
|
||||
| `"codex"` | Force Codex OAuth | `hermes model` → Codex |
|
||||
| `"main"` | Use whatever provider the main agent uses | Active main provider configured |
|
||||
| `"anthropic"` | Force Anthropic native | `ANTHROPIC_API_KEY` or Claude Code credentials |
|
||||
|
||||
### Direct Endpoint Override
|
||||
|
||||
For any auxiliary task, setting `base_url` bypasses provider resolution entirely and sends requests directly to that endpoint:
|
||||
|
||||
```yaml
|
||||
auxiliary:
|
||||
vision:
|
||||
base_url: "http://localhost:1234/v1"
|
||||
api_key: "local-key"
|
||||
model: "qwen2.5-vl"
|
||||
```
|
||||
|
||||
`base_url` takes precedence over `provider`. Hermes uses the configured `api_key` for authentication, falling back to `OPENAI_API_KEY` if not set. It does **not** reuse `OPENROUTER_API_KEY` for custom endpoints.
|
||||
|
||||
---
|
||||
|
||||
## Context Compression Fallback
|
||||
|
||||
Context compression has a legacy configuration path in addition to the auxiliary system:
|
||||
|
||||
```yaml
|
||||
compression:
|
||||
summary_provider: "auto" # auto | openrouter | nous | main
|
||||
summary_model: "google/gemini-3-flash-preview"
|
||||
```
|
||||
|
||||
This is equivalent to configuring `auxiliary.compression.provider` and `auxiliary.compression.model`. If both are set, the `auxiliary.compression` values take precedence.
|
||||
|
||||
If no provider is available for compression, Hermes drops middle conversation turns without generating a summary rather than failing the session.
|
||||
|
||||
---
|
||||
|
||||
## Delegation Provider Override
|
||||
|
||||
Subagents spawned by `delegate_task` do **not** use the primary fallback model. However, they can be routed to a different provider:model pair for cost optimization:
|
||||
|
||||
```yaml
|
||||
delegation:
|
||||
provider: "openrouter" # override provider for all subagents
|
||||
model: "google/gemini-3-flash-preview" # override model
|
||||
# base_url: "http://localhost:1234/v1" # or use a direct endpoint
|
||||
# api_key: "local-key"
|
||||
```
|
||||
|
||||
See [Subagent Delegation](/docs/user-guide/features/delegation) for full configuration details.
|
||||
|
||||
---
|
||||
|
||||
## Cron Job Providers
|
||||
|
||||
Cron jobs run with whatever provider is configured at execution time. They do not support a fallback model. To use a different provider for cron jobs, configure `provider` and `model` overrides on the cron job itself:
|
||||
|
||||
```python
|
||||
cronjob(
|
||||
action="create",
|
||||
schedule="every 2h",
|
||||
prompt="Check server status",
|
||||
provider="openrouter",
|
||||
model="google/gemini-3-flash-preview"
|
||||
)
|
||||
```
|
||||
|
||||
See [Scheduled Tasks (Cron)](/docs/user-guide/features/cron) for full configuration details.
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
| Feature | Fallback Mechanism | Config Location |
|
||||
|---------|-------------------|----------------|
|
||||
| Main agent model | `fallback_model` in config.yaml — one-shot failover on errors | `fallback_model:` (top-level) |
|
||||
| Vision | Auto-detection chain + internal OpenRouter retry | `auxiliary.vision` |
|
||||
| Web extraction | Auto-detection chain + internal OpenRouter retry | `auxiliary.web_extract` |
|
||||
| Context compression | Auto-detection chain, degrades to no-summary if unavailable | `auxiliary.compression` or `compression.summary_provider` |
|
||||
| Session search | Auto-detection chain | `auxiliary.session_search` |
|
||||
| Skills hub | Auto-detection chain | `auxiliary.skills_hub` |
|
||||
| MCP helpers | Auto-detection chain | `auxiliary.mcp` |
|
||||
| Memory flush | Auto-detection chain | `auxiliary.flush_memories` |
|
||||
| Delegation | Provider override only (no automatic fallback) | `delegation.provider` / `delegation.model` |
|
||||
| Cron jobs | Per-job provider override only (no automatic fallback) | Per-job `provider` / `model` |
|
||||
@@ -194,3 +194,7 @@ provider_routing:
|
||||
## Default Behavior
|
||||
|
||||
When no `provider_routing` section is configured (the default), OpenRouter uses its own default routing logic, which generally balances cost and availability automatically.
|
||||
|
||||
:::tip Provider Routing vs. Fallback Models
|
||||
Provider routing controls which **sub-providers within OpenRouter** handle your requests. For automatic failover to an entirely different provider when your primary model fails, see [Fallback Providers](/docs/user-guide/features/fallback-providers).
|
||||
:::
|
||||
|
||||
@@ -181,6 +181,63 @@ When enabled, the bot sends status messages as it works:
|
||||
🐍 execute_code...
|
||||
```
|
||||
|
||||
## Background Sessions
|
||||
|
||||
Run a prompt in a separate background session so the agent works on it independently while your main chat stays responsive:
|
||||
|
||||
```
|
||||
/background Check all servers in the cluster and report any that are down
|
||||
```
|
||||
|
||||
Hermes confirms immediately:
|
||||
|
||||
```
|
||||
🔄 Background task started: "Check all servers in the cluster..."
|
||||
Task ID: bg_143022_a1b2c3
|
||||
```
|
||||
|
||||
### How It Works
|
||||
|
||||
Each `/background` prompt spawns a **separate agent instance** that runs asynchronously:
|
||||
|
||||
- **Isolated session** — the background agent has its own session with its own conversation history. It has no knowledge of your current chat context and receives only the prompt you provide.
|
||||
- **Same configuration** — inherits your model, provider, toolsets, reasoning settings, and provider routing from the current gateway setup.
|
||||
- **Non-blocking** — your main chat stays fully interactive. Send messages, run other commands, or start more background tasks while it works.
|
||||
- **Result delivery** — when the task finishes, the result is sent back to the **same chat or channel** where you issued the command, prefixed with "✅ Background task complete". If it fails, you'll see "❌ Background task failed" with the error.
|
||||
|
||||
### Background Process Notifications
|
||||
|
||||
When the agent running a background session uses `terminal(background=true)` to start long-running processes (servers, builds, etc.), the gateway can push status updates to your chat. Control this with `display.background_process_notifications` in `~/.hermes/config.yaml`:
|
||||
|
||||
```yaml
|
||||
display:
|
||||
background_process_notifications: all # all | result | error | off
|
||||
```
|
||||
|
||||
| Mode | What you receive |
|
||||
|------|-----------------|
|
||||
| `all` | Running-output updates **and** the final completion message (default) |
|
||||
| `result` | Only the final completion message (regardless of exit code) |
|
||||
| `error` | Only the final message when the exit code is non-zero |
|
||||
| `off` | No process watcher messages at all |
|
||||
|
||||
You can also set this via environment variable:
|
||||
|
||||
```bash
|
||||
HERMES_BACKGROUND_NOTIFICATIONS=result
|
||||
```
|
||||
|
||||
### Use Cases
|
||||
|
||||
- **Server monitoring** — "/background Check the health of all services and alert me if anything is down"
|
||||
- **Long builds** — "/background Build and deploy the staging environment" while you continue chatting
|
||||
- **Research tasks** — "/background Research competitor pricing and summarize in a table"
|
||||
- **File operations** — "/background Organize the photos in ~/Downloads by date into folders"
|
||||
|
||||
:::tip
|
||||
Background tasks on messaging platforms are fire-and-forget — you don't need to wait or check on them. Results arrive in the same chat automatically when the task finishes.
|
||||
:::
|
||||
|
||||
## Service Management
|
||||
|
||||
### Linux (systemd)
|
||||
|
||||
@@ -91,6 +91,7 @@ const sidebars: SidebarsConfig = {
|
||||
'user-guide/features/mcp',
|
||||
'user-guide/features/honcho',
|
||||
'user-guide/features/provider-routing',
|
||||
'user-guide/features/fallback-providers',
|
||||
],
|
||||
},
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user