mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-05 10:17:17 +08:00
Compare commits
9 Commits
streaming-
...
fix/event-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab6abc2c13 | ||
|
|
aafe86d81a | ||
|
|
1aa7027be1 | ||
|
|
f961937097 | ||
|
|
7a427d7b03 | ||
|
|
66a1942524 | ||
|
|
1173adbe86 | ||
|
|
a5beb6d8f0 | ||
|
|
8f6ecd5c64 |
@@ -1191,8 +1191,18 @@ def _get_cached_client(
|
||||
cache_key = (provider, async_mode, base_url or "", api_key or "")
|
||||
with _client_cache_lock:
|
||||
if cache_key in _client_cache:
|
||||
cached_client, cached_default = _client_cache[cache_key]
|
||||
return cached_client, model or cached_default
|
||||
cached_client, cached_default, cached_loop = _client_cache[cache_key]
|
||||
if async_mode:
|
||||
# Async clients are bound to the event loop that created them.
|
||||
# A cached async client whose loop has been closed will raise
|
||||
# "Event loop is closed" when httpx tries to clean up its
|
||||
# transport. Discard the stale client and create a fresh one.
|
||||
if cached_loop is not None and cached_loop.is_closed():
|
||||
del _client_cache[cache_key]
|
||||
else:
|
||||
return cached_client, model or cached_default
|
||||
else:
|
||||
return cached_client, model or cached_default
|
||||
# Build outside the lock
|
||||
client, default_model = resolve_provider_client(
|
||||
provider,
|
||||
@@ -1202,11 +1212,20 @@ def _get_cached_client(
|
||||
explicit_api_key=api_key,
|
||||
)
|
||||
if client is not None:
|
||||
# For async clients, remember which loop they were created on so we
|
||||
# can detect stale entries later.
|
||||
bound_loop = None
|
||||
if async_mode:
|
||||
try:
|
||||
import asyncio as _aio
|
||||
bound_loop = _aio.get_event_loop()
|
||||
except RuntimeError:
|
||||
pass
|
||||
with _client_cache_lock:
|
||||
if cache_key not in _client_cache:
|
||||
_client_cache[cache_key] = (client, default_model)
|
||||
_client_cache[cache_key] = (client, default_model, bound_loop)
|
||||
else:
|
||||
client, default_model = _client_cache[cache_key]
|
||||
client, default_model, _ = _client_cache[cache_key]
|
||||
return client, model or default_model
|
||||
|
||||
|
||||
|
||||
@@ -356,7 +356,7 @@ class CopilotACPClient:
|
||||
text_parts=text_parts,
|
||||
reasoning_parts=reasoning_parts,
|
||||
)
|
||||
return "".join(text_parts).strip(), "".join(reasoning_parts).strip()
|
||||
return "".join(text_parts), "".join(reasoning_parts)
|
||||
finally:
|
||||
self.close()
|
||||
|
||||
@@ -380,7 +380,7 @@ class CopilotACPClient:
|
||||
content = update.get("content") or {}
|
||||
chunk_text = ""
|
||||
if isinstance(content, dict):
|
||||
chunk_text = str(content.get("text") or "").strip()
|
||||
chunk_text = str(content.get("text") or "")
|
||||
if kind == "agent_message_chunk" and chunk_text and text_parts is not None:
|
||||
text_parts.append(chunk_text)
|
||||
elif kind == "agent_thought_chunk" and chunk_text and reasoning_parts is not None:
|
||||
|
||||
12
cli.py
12
cli.py
@@ -3678,6 +3678,18 @@ class HermesCLI:
|
||||
self._handle_stop_command()
|
||||
elif canonical == "background":
|
||||
self._handle_background_command(cmd_original)
|
||||
elif canonical == "queue":
|
||||
if not self._agent_running:
|
||||
_cprint(" /queue only works while Hermes is busy. Just type your message normally.")
|
||||
else:
|
||||
# Extract prompt after "/queue " or "/q "
|
||||
parts = cmd_original.split(None, 1)
|
||||
payload = parts[1].strip() if len(parts) > 1 else ""
|
||||
if not payload:
|
||||
_cprint(" Usage: /queue <prompt>")
|
||||
else:
|
||||
self._pending_input.put(payload)
|
||||
_cprint(f" Queued for the next turn: {payload[:80]}{'...' if len(payload) > 80 else ''}")
|
||||
elif canonical == "skin":
|
||||
self._handle_skin_command(cmd_original)
|
||||
elif canonical == "voice":
|
||||
|
||||
@@ -137,6 +137,9 @@ def _deliver_result(job: dict, content: str) -> None:
|
||||
"whatsapp": Platform.WHATSAPP,
|
||||
"signal": Platform.SIGNAL,
|
||||
"matrix": Platform.MATRIX,
|
||||
"mattermost": Platform.MATTERMOST,
|
||||
"homeassistant": Platform.HOMEASSISTANT,
|
||||
"dingtalk": Platform.DINGTALK,
|
||||
"email": Platform.EMAIL,
|
||||
"sms": Platform.SMS,
|
||||
}
|
||||
|
||||
@@ -182,9 +182,31 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
# Ensure session directory exists
|
||||
self._session_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Check if bridge is already running and connected
|
||||
import aiohttp
|
||||
import asyncio
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
f"http://127.0.0.1:{self._bridge_port}/health",
|
||||
timeout=aiohttp.ClientTimeout(total=2)
|
||||
) as resp:
|
||||
if resp.status == 200:
|
||||
data = await resp.json()
|
||||
bridge_status = data.get("status", "unknown")
|
||||
if bridge_status == "connected":
|
||||
print(f"[{self.name}] Using existing bridge (status: {bridge_status})")
|
||||
self._running = True
|
||||
self._bridge_process = None # Not managed by us
|
||||
asyncio.create_task(self._poll_messages())
|
||||
return True
|
||||
else:
|
||||
print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting")
|
||||
except Exception:
|
||||
pass # Bridge not running, start a new one
|
||||
|
||||
# Kill any orphaned bridge from a previous gateway run
|
||||
_kill_port_process(self._bridge_port)
|
||||
import asyncio
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Start the bridge process in its own process group.
|
||||
@@ -232,7 +254,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
f"http://localhost:{self._bridge_port}/health",
|
||||
f"http://127.0.0.1:{self._bridge_port}/health",
|
||||
timeout=aiohttp.ClientTimeout(total=2)
|
||||
) as resp:
|
||||
if resp.status == 200:
|
||||
@@ -264,7 +286,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
f"http://localhost:{self._bridge_port}/health",
|
||||
f"http://127.0.0.1:{self._bridge_port}/health",
|
||||
timeout=aiohttp.ClientTimeout(total=2)
|
||||
) as resp:
|
||||
if resp.status == 200:
|
||||
@@ -326,9 +348,9 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
self._bridge_process.kill()
|
||||
except Exception as e:
|
||||
print(f"[{self.name}] Error stopping bridge: {e}")
|
||||
|
||||
# Also kill any orphaned bridge processes on our port
|
||||
_kill_port_process(self._bridge_port)
|
||||
else:
|
||||
# Bridge was not started by us, don't kill it
|
||||
print(f"[{self.name}] Disconnecting (external bridge left running)")
|
||||
|
||||
self._running = False
|
||||
self._bridge_process = None
|
||||
@@ -358,7 +380,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
payload["replyTo"] = reply_to
|
||||
|
||||
async with session.post(
|
||||
f"http://localhost:{self._bridge_port}/send",
|
||||
f"http://127.0.0.1:{self._bridge_port}/send",
|
||||
json=payload,
|
||||
timeout=aiohttp.ClientTimeout(total=30)
|
||||
) as resp:
|
||||
@@ -394,7 +416,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
import aiohttp
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
f"http://localhost:{self._bridge_port}/edit",
|
||||
f"http://127.0.0.1:{self._bridge_port}/edit",
|
||||
json={
|
||||
"chatId": chat_id,
|
||||
"messageId": message_id,
|
||||
@@ -439,7 +461,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
f"http://localhost:{self._bridge_port}/send-media",
|
||||
f"http://127.0.0.1:{self._bridge_port}/send-media",
|
||||
json=payload,
|
||||
timeout=aiohttp.ClientTimeout(total=120),
|
||||
) as resp:
|
||||
@@ -515,7 +537,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
await session.post(
|
||||
f"http://localhost:{self._bridge_port}/typing",
|
||||
f"http://127.0.0.1:{self._bridge_port}/typing",
|
||||
json={"chatId": chat_id},
|
||||
timeout=aiohttp.ClientTimeout(total=5)
|
||||
)
|
||||
@@ -532,7 +554,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
f"http://localhost:{self._bridge_port}/chat/{chat_id}",
|
||||
f"http://127.0.0.1:{self._bridge_port}/chat/{chat_id}",
|
||||
timeout=aiohttp.ClientTimeout(total=10)
|
||||
) as resp:
|
||||
if resp.status == 200:
|
||||
@@ -559,7 +581,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
f"http://localhost:{self._bridge_port}/messages",
|
||||
f"http://127.0.0.1:{self._bridge_port}/messages",
|
||||
timeout=aiohttp.ClientTimeout(total=30)
|
||||
) as resp:
|
||||
if resp.status == 200:
|
||||
@@ -621,6 +643,11 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
print(f"[{self.name}] Failed to cache image: {e}", flush=True)
|
||||
cached_urls.append(url)
|
||||
media_types.append("image/jpeg")
|
||||
elif msg_type == MessageType.PHOTO and os.path.isabs(url):
|
||||
# Local file path — bridge already downloaded the image
|
||||
cached_urls.append(url)
|
||||
media_types.append("image/jpeg")
|
||||
print(f"[{self.name}] Using bridge-cached image: {url}", flush=True)
|
||||
elif msg_type == MessageType.VOICE and url.startswith(("http://", "https://")):
|
||||
try:
|
||||
cached_path = await cache_audio_from_url(url, ext=".ogg")
|
||||
|
||||
@@ -1369,6 +1369,23 @@ class GatewayRunner:
|
||||
del self._running_agents[_quick_key]
|
||||
return await self._handle_reset_command(event)
|
||||
|
||||
# /queue <prompt> — queue without interrupting
|
||||
if event.get_command() in ("queue", "q"):
|
||||
queued_text = event.get_command_args().strip()
|
||||
if not queued_text:
|
||||
return "Usage: /queue <prompt>"
|
||||
adapter = self.adapters.get(source.platform)
|
||||
if adapter:
|
||||
from gateway.platforms.base import MessageEvent as _ME, MessageType as _MT
|
||||
queued_event = _ME(
|
||||
text=queued_text,
|
||||
message_type=_MT.TEXT,
|
||||
source=event.source,
|
||||
message_id=event.message_id,
|
||||
)
|
||||
adapter._pending_messages[_quick_key] = queued_event
|
||||
return "Queued for the next turn."
|
||||
|
||||
if event.message_type == MessageType.PHOTO:
|
||||
logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20])
|
||||
adapter = self.adapters.get(source.platform)
|
||||
|
||||
@@ -67,6 +67,8 @@ COMMAND_REGISTRY: list[CommandDef] = [
|
||||
gateway_only=True),
|
||||
CommandDef("background", "Run a prompt in the background", "Session",
|
||||
aliases=("bg",), args_hint="<prompt>"),
|
||||
CommandDef("queue", "Queue a prompt for the next turn (doesn't interrupt)", "Session",
|
||||
aliases=("q",), args_hint="<prompt>"),
|
||||
CommandDef("status", "Show session info", "Session",
|
||||
gateway_only=True),
|
||||
CommandDef("sethome", "Set this chat as the home channel", "Session",
|
||||
|
||||
@@ -24,6 +24,7 @@ import json
|
||||
import asyncio
|
||||
import os
|
||||
import logging
|
||||
import threading
|
||||
from typing import Dict, Any, List, Optional, Tuple
|
||||
|
||||
from tools.registry import registry
|
||||
@@ -36,6 +37,48 @@ logger = logging.getLogger(__name__)
|
||||
# Async Bridging (single source of truth -- used by registry.dispatch too)
|
||||
# =============================================================================
|
||||
|
||||
_tool_loop = None # persistent loop for the main (CLI) thread
|
||||
_tool_loop_lock = threading.Lock()
|
||||
_worker_thread_local = threading.local() # per-worker-thread persistent loops
|
||||
|
||||
|
||||
def _get_tool_loop():
|
||||
"""Return a long-lived event loop for running async tool handlers.
|
||||
|
||||
Using a persistent loop (instead of asyncio.run() which creates and
|
||||
*closes* a fresh loop every time) prevents "Event loop is closed"
|
||||
errors that occur when cached httpx/AsyncOpenAI clients attempt to
|
||||
close their transport on a dead loop during garbage collection.
|
||||
"""
|
||||
global _tool_loop
|
||||
with _tool_loop_lock:
|
||||
if _tool_loop is None or _tool_loop.is_closed():
|
||||
_tool_loop = asyncio.new_event_loop()
|
||||
return _tool_loop
|
||||
|
||||
|
||||
def _get_worker_loop():
|
||||
"""Return a persistent event loop for the current worker thread.
|
||||
|
||||
Each worker thread (e.g., delegate_task's ThreadPoolExecutor threads)
|
||||
gets its own long-lived loop stored in thread-local storage. This
|
||||
prevents the "Event loop is closed" errors that occurred when
|
||||
asyncio.run() was used per-call: asyncio.run() creates a loop, runs
|
||||
the coroutine, then *closes* the loop — but cached httpx/AsyncOpenAI
|
||||
clients remain bound to that now-dead loop and raise RuntimeError
|
||||
during garbage collection or subsequent use.
|
||||
|
||||
By keeping the loop alive for the thread's lifetime, cached clients
|
||||
stay valid and their cleanup runs on a live loop.
|
||||
"""
|
||||
loop = getattr(_worker_thread_local, 'loop', None)
|
||||
if loop is None or loop.is_closed():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
_worker_thread_local.loop = loop
|
||||
return loop
|
||||
|
||||
|
||||
def _run_async(coro):
|
||||
"""Run an async coroutine from a sync context.
|
||||
|
||||
@@ -44,6 +87,15 @@ def _run_async(coro):
|
||||
disposable thread so asyncio.run() can create its own loop without
|
||||
conflicting.
|
||||
|
||||
For the common CLI path (no running loop), we use a persistent event
|
||||
loop so that cached async clients (httpx / AsyncOpenAI) remain bound
|
||||
to a live loop and don't trigger "Event loop is closed" on GC.
|
||||
|
||||
When called from a worker thread (parallel tool execution), we use a
|
||||
per-thread persistent loop to avoid both contention with the main
|
||||
thread's shared loop AND the "Event loop is closed" errors caused by
|
||||
asyncio.run()'s create-and-destroy lifecycle.
|
||||
|
||||
This is the single source of truth for sync->async bridging in tool
|
||||
handlers. The RL paths (agent_loop.py, tool_context.py) also provide
|
||||
outer thread-pool wrapping as defense-in-depth, but each handler is
|
||||
@@ -55,11 +107,23 @@ def _run_async(coro):
|
||||
loop = None
|
||||
|
||||
if loop and loop.is_running():
|
||||
# Inside an async context (gateway, RL env) — run in a fresh thread.
|
||||
import concurrent.futures
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
||||
future = pool.submit(asyncio.run, coro)
|
||||
return future.result(timeout=300)
|
||||
return asyncio.run(coro)
|
||||
|
||||
# If we're on a worker thread (e.g., parallel tool execution in
|
||||
# delegate_task), use a per-thread persistent loop. This avoids
|
||||
# contention with the main thread's shared loop while keeping cached
|
||||
# httpx/AsyncOpenAI clients bound to a live loop for the thread's
|
||||
# lifetime — preventing "Event loop is closed" on GC cleanup.
|
||||
if threading.current_thread() is not threading.main_thread():
|
||||
worker_loop = _get_worker_loop()
|
||||
return worker_loop.run_until_complete(coro)
|
||||
|
||||
tool_loop = _get_tool_loop()
|
||||
return tool_loop.run_until_complete(coro)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
|
||||
@@ -18,12 +18,13 @@
|
||||
* node bridge.js --port 3000 --session ~/.hermes/whatsapp/session
|
||||
*/
|
||||
|
||||
import { makeWASocket, useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion } from '@whiskeysockets/baileys';
|
||||
import { makeWASocket, useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion, downloadMediaMessage } from '@whiskeysockets/baileys';
|
||||
import express from 'express';
|
||||
import { Boom } from '@hapi/boom';
|
||||
import pino from 'pino';
|
||||
import path from 'path';
|
||||
import { mkdirSync, readFileSync, existsSync } from 'fs';
|
||||
import { mkdirSync, readFileSync, writeFileSync, existsSync, readdirSync } from 'fs';
|
||||
import { randomBytes } from 'crypto';
|
||||
import qrcode from 'qrcode-terminal';
|
||||
|
||||
// Parse CLI args
|
||||
@@ -41,6 +42,7 @@ const WHATSAPP_DEBUG =
|
||||
|
||||
const PORT = parseInt(getArg('port', '3000'), 10);
|
||||
const SESSION_DIR = getArg('session', path.join(process.env.HOME || '~', '.hermes', 'whatsapp', 'session'));
|
||||
const IMAGE_CACHE_DIR = path.join(process.env.HOME || '~', '.hermes', 'image_cache');
|
||||
const PAIR_ONLY = args.includes('--pair-only');
|
||||
const WHATSAPP_MODE = getArg('mode', process.env.WHATSAPP_MODE || 'self-chat'); // "bot" or "self-chat"
|
||||
const ALLOWED_USERS = (process.env.WHATSAPP_ALLOWED_USERS || '').split(',').map(s => s.trim()).filter(Boolean);
|
||||
@@ -55,6 +57,22 @@ function formatOutgoingMessage(message) {
|
||||
|
||||
mkdirSync(SESSION_DIR, { recursive: true });
|
||||
|
||||
// Build LID → phone reverse map from session files (lid-mapping-{phone}.json)
|
||||
function buildLidMap() {
|
||||
const map = {};
|
||||
try {
|
||||
for (const f of readdirSync(SESSION_DIR)) {
|
||||
const m = f.match(/^lid-mapping-(\d+)\.json$/);
|
||||
if (!m) continue;
|
||||
const phone = m[1];
|
||||
const lid = JSON.parse(readFileSync(path.join(SESSION_DIR, f), 'utf8'));
|
||||
if (lid) map[String(lid)] = phone;
|
||||
}
|
||||
} catch {}
|
||||
return map;
|
||||
}
|
||||
let lidToPhone = buildLidMap();
|
||||
|
||||
const logger = pino({ level: 'warn' });
|
||||
|
||||
// Message queue for polling
|
||||
@@ -80,9 +98,16 @@ async function startSocket() {
|
||||
browser: ['Hermes Agent', 'Chrome', '120.0'],
|
||||
syncFullHistory: false,
|
||||
markOnlineOnConnect: false,
|
||||
// Required for Baileys 7.x: without this, incoming messages that need
|
||||
// E2EE session re-establishment are silently dropped (msg.message === null)
|
||||
getMessage: async (key) => {
|
||||
// We don't maintain a message store, so return a placeholder.
|
||||
// This is enough for Baileys to complete the retry handshake.
|
||||
return { conversation: '' };
|
||||
},
|
||||
});
|
||||
|
||||
sock.ev.on('creds.update', saveCreds);
|
||||
sock.ev.on('creds.update', () => { saveCreds(); lidToPhone = buildLidMap(); });
|
||||
|
||||
sock.ev.on('connection.update', (update) => {
|
||||
const { connection, lastDisconnect, qr } = update;
|
||||
@@ -120,7 +145,7 @@ async function startSocket() {
|
||||
}
|
||||
});
|
||||
|
||||
sock.ev.on('messages.upsert', ({ messages, type }) => {
|
||||
sock.ev.on('messages.upsert', async ({ messages, type }) => {
|
||||
// In self-chat mode, your own messages commonly arrive as 'append' rather
|
||||
// than 'notify'. Accept both and filter agent echo-backs below.
|
||||
if (type !== 'notify' && type !== 'append') return;
|
||||
@@ -163,9 +188,10 @@ async function startSocket() {
|
||||
if (!isSelfChat) continue;
|
||||
}
|
||||
|
||||
// Check allowlist for messages from others
|
||||
if (!msg.key.fromMe && ALLOWED_USERS.length > 0 && !ALLOWED_USERS.includes(senderNumber)) {
|
||||
continue;
|
||||
// Check allowlist for messages from others (resolve LID → phone if needed)
|
||||
if (!msg.key.fromMe && ALLOWED_USERS.length > 0) {
|
||||
const resolvedNumber = lidToPhone[senderNumber] || senderNumber;
|
||||
if (!ALLOWED_USERS.includes(resolvedNumber)) continue;
|
||||
}
|
||||
|
||||
// Extract message body
|
||||
@@ -182,6 +208,18 @@ async function startSocket() {
|
||||
body = msg.message.imageMessage.caption || '';
|
||||
hasMedia = true;
|
||||
mediaType = 'image';
|
||||
try {
|
||||
const buf = await downloadMediaMessage(msg, 'buffer', {}, { logger, reuploadRequest: sock.updateMediaMessage });
|
||||
const mime = msg.message.imageMessage.mimetype || 'image/jpeg';
|
||||
const extMap = { 'image/jpeg': '.jpg', 'image/png': '.png', 'image/webp': '.webp', 'image/gif': '.gif' };
|
||||
const ext = extMap[mime] || '.jpg';
|
||||
mkdirSync(IMAGE_CACHE_DIR, { recursive: true });
|
||||
const filePath = path.join(IMAGE_CACHE_DIR, `img_${randomBytes(6).toString('hex')}${ext}`);
|
||||
writeFileSync(filePath, buf);
|
||||
mediaUrls.push(filePath);
|
||||
} catch (err) {
|
||||
console.error('[bridge] Failed to download image:', err.message);
|
||||
}
|
||||
} else if (msg.message.videoMessage) {
|
||||
body = msg.message.videoMessage.caption || '';
|
||||
hasMedia = true;
|
||||
@@ -195,6 +233,11 @@ async function startSocket() {
|
||||
mediaType = 'document';
|
||||
}
|
||||
|
||||
// For media without caption, use a placeholder so the API message is never empty
|
||||
if (hasMedia && !body) {
|
||||
body = `[${mediaType} received]`;
|
||||
}
|
||||
|
||||
// Ignore Hermes' own reply messages in self-chat mode to avoid loops.
|
||||
if (msg.key.fromMe && ((REPLY_PREFIX && body.startsWith(REPLY_PREFIX)) || recentlySentIds.has(msg.key.id))) {
|
||||
if (WHATSAPP_DEBUG) {
|
||||
@@ -433,7 +476,7 @@ if (PAIR_ONLY) {
|
||||
console.log();
|
||||
startSocket();
|
||||
} else {
|
||||
app.listen(PORT, () => {
|
||||
app.listen(PORT, '127.0.0.1', () => {
|
||||
console.log(`🌉 WhatsApp bridge listening on port ${PORT} (mode: ${WHATSAPP_MODE})`);
|
||||
console.log(`📁 Session stored in: ${SESSION_DIR}`);
|
||||
if (ALLOWED_USERS.length > 0) {
|
||||
|
||||
307
tests/test_model_tools_async_bridge.py
Normal file
307
tests/test_model_tools_async_bridge.py
Normal file
@@ -0,0 +1,307 @@
|
||||
"""Regression tests for the _run_async() event-loop lifecycle.
|
||||
|
||||
These tests verify the fix for GitHub issue #2104:
|
||||
"Event loop is closed" after vision_analyze used as first call in session.
|
||||
|
||||
Root cause: asyncio.run() creates and *closes* a fresh event loop on every
|
||||
call. Cached httpx/AsyncOpenAI clients that were bound to the now-dead loop
|
||||
would crash with RuntimeError("Event loop is closed") when garbage-collected.
|
||||
|
||||
The fix replaces asyncio.run() with a persistent event loop in _run_async().
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import threading
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _get_current_loop():
|
||||
"""Return the running event loop from inside a coroutine."""
|
||||
return asyncio.get_event_loop()
|
||||
|
||||
|
||||
async def _create_and_return_transport():
|
||||
"""Simulate an async client creating a transport on the current loop.
|
||||
|
||||
Returns a simple asyncio.Future bound to the running loop so we can
|
||||
later check whether the loop is still alive.
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
fut = loop.create_future()
|
||||
fut.set_result("ok")
|
||||
return loop, fut
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRunAsyncLoopLifecycle:
|
||||
"""Verify _run_async() keeps the event loop alive after returning."""
|
||||
|
||||
def test_loop_not_closed_after_run_async(self):
|
||||
"""The loop used by _run_async must still be open after the call."""
|
||||
from model_tools import _run_async
|
||||
|
||||
loop = _run_async(_get_current_loop())
|
||||
|
||||
assert not loop.is_closed(), (
|
||||
"_run_async() closed the event loop — cached async clients will "
|
||||
"crash with 'Event loop is closed' on GC (issue #2104)"
|
||||
)
|
||||
|
||||
def test_same_loop_reused_across_calls(self):
|
||||
"""Consecutive _run_async calls should reuse the same loop."""
|
||||
from model_tools import _run_async
|
||||
|
||||
loop1 = _run_async(_get_current_loop())
|
||||
loop2 = _run_async(_get_current_loop())
|
||||
|
||||
assert loop1 is loop2, (
|
||||
"_run_async() created a new loop on the second call — cached "
|
||||
"async clients from the first call would be orphaned"
|
||||
)
|
||||
|
||||
def test_cached_transport_survives_between_calls(self):
|
||||
"""A transport/future created in call 1 must be valid in call 2."""
|
||||
from model_tools import _run_async
|
||||
|
||||
loop, fut = _run_async(_create_and_return_transport())
|
||||
|
||||
assert not loop.is_closed()
|
||||
assert fut.result() == "ok"
|
||||
|
||||
loop2 = _run_async(_get_current_loop())
|
||||
assert loop2 is loop, "Loop changed between calls"
|
||||
assert not loop.is_closed(), "Loop closed before second call"
|
||||
|
||||
|
||||
class TestRunAsyncWorkerThread:
|
||||
"""Verify worker threads get persistent per-thread loops (delegate_task fix)."""
|
||||
|
||||
def test_worker_thread_loop_not_closed(self):
|
||||
"""A worker thread's loop must stay open after _run_async returns,
|
||||
so cached httpx/AsyncOpenAI clients don't crash on GC."""
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from model_tools import _run_async
|
||||
|
||||
def _run_on_worker():
|
||||
loop = _run_async(_get_current_loop())
|
||||
still_open = not loop.is_closed()
|
||||
return loop, still_open
|
||||
|
||||
with ThreadPoolExecutor(max_workers=1) as pool:
|
||||
loop, still_open = pool.submit(_run_on_worker).result()
|
||||
|
||||
assert still_open, (
|
||||
"Worker thread's event loop was closed after _run_async — "
|
||||
"cached async clients will crash with 'Event loop is closed'"
|
||||
)
|
||||
|
||||
def test_worker_thread_reuses_loop_across_calls(self):
|
||||
"""Multiple _run_async calls on the same worker thread should
|
||||
reuse the same persistent loop (not create-and-destroy each time)."""
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from model_tools import _run_async
|
||||
|
||||
def _run_twice_on_worker():
|
||||
loop1 = _run_async(_get_current_loop())
|
||||
loop2 = _run_async(_get_current_loop())
|
||||
return loop1, loop2
|
||||
|
||||
with ThreadPoolExecutor(max_workers=1) as pool:
|
||||
loop1, loop2 = pool.submit(_run_twice_on_worker).result()
|
||||
|
||||
assert loop1 is loop2, (
|
||||
"Worker thread created different loops for consecutive calls — "
|
||||
"cached clients from the first call would be orphaned"
|
||||
)
|
||||
assert not loop1.is_closed()
|
||||
|
||||
def test_parallel_workers_get_separate_loops(self):
|
||||
"""Different worker threads must get their own loops to avoid
|
||||
contention (the original reason for the worker-thread branch)."""
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from model_tools import _run_async
|
||||
|
||||
barrier = threading.Barrier(3, timeout=5)
|
||||
|
||||
def _get_loop_id():
|
||||
# Use a barrier to force all 3 threads to be alive simultaneously,
|
||||
# ensuring the ThreadPoolExecutor actually uses 3 distinct threads.
|
||||
loop = _run_async(_get_current_loop())
|
||||
barrier.wait()
|
||||
return id(loop), not loop.is_closed(), threading.current_thread().ident
|
||||
|
||||
with ThreadPoolExecutor(max_workers=3) as pool:
|
||||
futures = [pool.submit(_get_loop_id) for _ in range(3)]
|
||||
results = [f.result() for f in as_completed(futures)]
|
||||
|
||||
loop_ids = {r[0] for r in results}
|
||||
thread_ids = {r[2] for r in results}
|
||||
all_open = all(r[1] for r in results)
|
||||
|
||||
assert all_open, "At least one worker thread's loop was closed"
|
||||
# The barrier guarantees 3 distinct threads were used
|
||||
assert len(thread_ids) == 3, f"Expected 3 threads, got {len(thread_ids)}"
|
||||
# Each thread should have its own loop
|
||||
assert len(loop_ids) == 3, (
|
||||
f"Expected 3 distinct loops for 3 parallel workers, "
|
||||
f"got {len(loop_ids)} — workers may be contending on a shared loop"
|
||||
)
|
||||
|
||||
def test_worker_loop_separate_from_main_loop(self):
|
||||
"""Worker thread loops must be different from the main thread's
|
||||
persistent loop to avoid cross-thread contention."""
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from model_tools import _run_async, _get_tool_loop
|
||||
|
||||
main_loop = _get_tool_loop()
|
||||
|
||||
def _get_worker_loop_id():
|
||||
loop = _run_async(_get_current_loop())
|
||||
return id(loop)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=1) as pool:
|
||||
worker_loop_id = pool.submit(_get_worker_loop_id).result()
|
||||
|
||||
assert worker_loop_id != id(main_loop), (
|
||||
"Worker thread used the main thread's loop — this would cause "
|
||||
"cross-thread contention on the event loop"
|
||||
)
|
||||
|
||||
|
||||
class TestRunAsyncWithRunningLoop:
|
||||
"""When a loop is already running, _run_async falls back to a thread."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_async_from_async_context(self):
|
||||
"""_run_async should still work when called from inside an
|
||||
already-running event loop (gateway / Atropos path)."""
|
||||
from model_tools import _run_async
|
||||
|
||||
async def _simple():
|
||||
return 42
|
||||
|
||||
result = await asyncio.get_event_loop().run_in_executor(
|
||||
None, _run_async, _simple()
|
||||
)
|
||||
assert result == 42
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration: full vision_analyze dispatch chain
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _mock_vision_response():
|
||||
"""Build a fake LLM response matching async_call_llm's return shape."""
|
||||
message = SimpleNamespace(content="A cat sitting on a chair.")
|
||||
choice = SimpleNamespace(index=0, message=message, finish_reason="stop")
|
||||
return SimpleNamespace(choices=[choice], model="test/vision", usage=None)
|
||||
|
||||
|
||||
class TestVisionDispatchLoopSafety:
|
||||
"""Simulate the full registry.dispatch('vision_analyze') chain and
|
||||
verify the event loop stays alive afterwards — the exact scenario
|
||||
from issue #2104."""
|
||||
|
||||
def test_vision_dispatch_keeps_loop_alive(self, tmp_path):
|
||||
"""After dispatching vision_analyze via the registry, the event
|
||||
loop must remain open so cached async clients don't crash on GC."""
|
||||
from model_tools import _run_async, _get_tool_loop
|
||||
from tools.registry import registry
|
||||
|
||||
fake_response = _mock_vision_response()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"tools.vision_tools.async_call_llm",
|
||||
new_callable=AsyncMock,
|
||||
return_value=fake_response,
|
||||
),
|
||||
patch(
|
||||
"tools.vision_tools._download_image",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=lambda url, dest, **kw: _write_fake_image(dest),
|
||||
),
|
||||
patch(
|
||||
"tools.vision_tools._validate_image_url",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"tools.vision_tools._image_to_base64_data_url",
|
||||
return_value="data:image/jpeg;base64,abc",
|
||||
),
|
||||
):
|
||||
result_json = registry.dispatch(
|
||||
"vision_analyze",
|
||||
{"image_url": "https://example.com/cat.png", "question": "What is this?"},
|
||||
)
|
||||
|
||||
result = json.loads(result_json)
|
||||
assert result.get("success") is True, f"dispatch failed: {result}"
|
||||
assert "cat" in result.get("analysis", "").lower()
|
||||
|
||||
loop = _get_tool_loop()
|
||||
assert not loop.is_closed(), (
|
||||
"Event loop closed after vision_analyze dispatch — cached async "
|
||||
"clients will crash with 'Event loop is closed' (issue #2104)"
|
||||
)
|
||||
|
||||
def test_two_consecutive_vision_dispatches(self, tmp_path):
|
||||
"""Two back-to-back vision_analyze dispatches must both succeed
|
||||
and share the same loop (simulates 'first call fails, second
|
||||
works' from the issue report)."""
|
||||
from model_tools import _get_tool_loop
|
||||
from tools.registry import registry
|
||||
|
||||
fake_response = _mock_vision_response()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"tools.vision_tools.async_call_llm",
|
||||
new_callable=AsyncMock,
|
||||
return_value=fake_response,
|
||||
),
|
||||
patch(
|
||||
"tools.vision_tools._download_image",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=lambda url, dest, **kw: _write_fake_image(dest),
|
||||
),
|
||||
patch(
|
||||
"tools.vision_tools._validate_image_url",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"tools.vision_tools._image_to_base64_data_url",
|
||||
return_value="data:image/jpeg;base64,abc",
|
||||
),
|
||||
):
|
||||
args = {"image_url": "https://example.com/cat.png", "question": "Describe"}
|
||||
|
||||
r1 = json.loads(registry.dispatch("vision_analyze", args))
|
||||
loop_after_first = _get_tool_loop()
|
||||
|
||||
r2 = json.loads(registry.dispatch("vision_analyze", args))
|
||||
loop_after_second = _get_tool_loop()
|
||||
|
||||
assert r1.get("success") is True
|
||||
assert r2.get("success") is True
|
||||
assert loop_after_first is loop_after_second, "Loop changed between dispatches"
|
||||
assert not loop_after_second.is_closed()
|
||||
|
||||
|
||||
def _write_fake_image(dest):
|
||||
"""Write minimal bytes so vision_analyze_tool thinks download succeeded."""
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
dest.write_bytes(b"\xff\xd8\xff" + b"\x00" * 16)
|
||||
return dest
|
||||
@@ -370,7 +370,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
|
||||
},
|
||||
"deliver": {
|
||||
"type": "string",
|
||||
"description": "Delivery target: origin, local, telegram, discord, signal, sms, or platform:chat_id"
|
||||
"description": "Delivery target: origin, local, telegram, discord, slack, whatsapp, signal, matrix, mattermost, homeassistant, dingtalk, email, sms, or platform:chat_id"
|
||||
},
|
||||
"model": {
|
||||
"type": "string",
|
||||
|
||||
@@ -124,6 +124,10 @@ def _handle_send(args):
|
||||
"slack": Platform.SLACK,
|
||||
"whatsapp": Platform.WHATSAPP,
|
||||
"signal": Platform.SIGNAL,
|
||||
"matrix": Platform.MATRIX,
|
||||
"mattermost": Platform.MATTERMOST,
|
||||
"homeassistant": Platform.HOMEASSISTANT,
|
||||
"dingtalk": Platform.DINGTALK,
|
||||
"email": Platform.EMAIL,
|
||||
"sms": Platform.SMS,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user