mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-03 09:17:09 +08:00
Compare commits
8 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df1671d36b | ||
|
|
43906fb89d | ||
|
|
ef7b00d132 | ||
|
|
d48cd893f2 | ||
|
|
595802e14b | ||
|
|
628d9bb729 | ||
|
|
10b070b4bd | ||
|
|
c20d06878d |
@@ -502,6 +502,14 @@ class MessageType(Enum):
|
|||||||
COMMAND = "command" # /command style
|
COMMAND = "command" # /command style
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessingOutcome(Enum):
    """How a message-processing run ended, as passed to lifecycle hooks.

    Handed to ``on_processing_complete`` so adapters can tell a normal
    finish, a failure, and a cancellation apart.
    """

    # Processing finished and was judged OK.
    SUCCESS = "success"

    # Processing raised, delivery failed, or the task was cancelled
    # without having been marked as an expected cancellation.
    FAILURE = "failure"

    # The task was cancelled after being registered as an expected
    # cancellation (e.g. by the adapter's background-task shutdown).
    CANCELLED = "cancelled"
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class MessageEvent:
|
class MessageEvent:
|
||||||
"""
|
"""
|
||||||
@@ -625,6 +633,7 @@ class BasePlatformAdapter(ABC):
|
|||||||
# Gateway shutdown cancels these so an old gateway instance doesn't keep
|
# Gateway shutdown cancels these so an old gateway instance doesn't keep
|
||||||
# working on a task after --replace or manual restarts.
|
# working on a task after --replace or manual restarts.
|
||||||
self._background_tasks: set[asyncio.Task] = set()
|
self._background_tasks: set[asyncio.Task] = set()
|
||||||
|
self._expected_cancelled_tasks: set[asyncio.Task] = set()
|
||||||
# Chats where auto-TTS on voice input is disabled (set by /voice off)
|
# Chats where auto-TTS on voice input is disabled (set by /voice off)
|
||||||
self._auto_tts_disabled_chats: set = set()
|
self._auto_tts_disabled_chats: set = set()
|
||||||
# Chats where typing indicator is paused (e.g. during approval waits).
|
# Chats where typing indicator is paused (e.g. during approval waits).
|
||||||
@@ -1133,7 +1142,7 @@ class BasePlatformAdapter(ABC):
|
|||||||
async def on_processing_start(self, event: MessageEvent) -> None:
|
async def on_processing_start(self, event: MessageEvent) -> None:
|
||||||
"""Hook called when background processing begins."""
|
"""Hook called when background processing begins."""
|
||||||
|
|
||||||
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
|
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
|
||||||
"""Hook called when background processing completes."""
|
"""Hook called when background processing completes."""
|
||||||
|
|
||||||
async def _run_processing_hook(self, hook_name: str, *args: Any, **kwargs: Any) -> None:
|
async def _run_processing_hook(self, hook_name: str, *args: Any, **kwargs: Any) -> None:
|
||||||
@@ -1294,7 +1303,7 @@ class BasePlatformAdapter(ABC):
|
|||||||
# session lifecycle and its cleanup races with the running task
|
# session lifecycle and its cleanup races with the running task
|
||||||
# (see PR #4926).
|
# (see PR #4926).
|
||||||
cmd = event.get_command()
|
cmd = event.get_command()
|
||||||
if cmd in ("approve", "deny", "status", "stop", "new", "reset"):
|
if cmd in ("approve", "deny", "status", "stop", "new", "reset", "background"):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"[%s] Command '/%s' bypassing active-session guard for %s",
|
"[%s] Command '/%s' bypassing active-session guard for %s",
|
||||||
self.name, cmd, session_key,
|
self.name, cmd, session_key,
|
||||||
@@ -1352,6 +1361,7 @@ class BasePlatformAdapter(ABC):
|
|||||||
return
|
return
|
||||||
if hasattr(task, "add_done_callback"):
|
if hasattr(task, "add_done_callback"):
|
||||||
task.add_done_callback(self._background_tasks.discard)
|
task.add_done_callback(self._background_tasks.discard)
|
||||||
|
task.add_done_callback(self._expected_cancelled_tasks.discard)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_human_delay() -> float:
|
def _get_human_delay() -> float:
|
||||||
@@ -1580,7 +1590,11 @@ class BasePlatformAdapter(ABC):
|
|||||||
|
|
||||||
# Determine overall success for the processing hook
|
# Determine overall success for the processing hook
|
||||||
processing_ok = delivery_succeeded if delivery_attempted else not bool(response)
|
processing_ok = delivery_succeeded if delivery_attempted else not bool(response)
|
||||||
await self._run_processing_hook("on_processing_complete", event, processing_ok)
|
await self._run_processing_hook(
|
||||||
|
"on_processing_complete",
|
||||||
|
event,
|
||||||
|
ProcessingOutcome.SUCCESS if processing_ok else ProcessingOutcome.FAILURE,
|
||||||
|
)
|
||||||
|
|
||||||
# Check if there's a pending message that was queued during our processing
|
# Check if there's a pending message that was queued during our processing
|
||||||
if session_key in self._pending_messages:
|
if session_key in self._pending_messages:
|
||||||
@@ -1599,10 +1613,14 @@ class BasePlatformAdapter(ABC):
|
|||||||
return # Already cleaned up
|
return # Already cleaned up
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
await self._run_processing_hook("on_processing_complete", event, False)
|
current_task = asyncio.current_task()
|
||||||
|
outcome = ProcessingOutcome.CANCELLED
|
||||||
|
if current_task is None or current_task not in self._expected_cancelled_tasks:
|
||||||
|
outcome = ProcessingOutcome.FAILURE
|
||||||
|
await self._run_processing_hook("on_processing_complete", event, outcome)
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await self._run_processing_hook("on_processing_complete", event, False)
|
await self._run_processing_hook("on_processing_complete", event, ProcessingOutcome.FAILURE)
|
||||||
logger.error("[%s] Error handling message: %s", self.name, e, exc_info=True)
|
logger.error("[%s] Error handling message: %s", self.name, e, exc_info=True)
|
||||||
# Send the error to the user so they aren't left with radio silence
|
# Send the error to the user so they aren't left with radio silence
|
||||||
try:
|
try:
|
||||||
@@ -1646,10 +1664,12 @@ class BasePlatformAdapter(ABC):
|
|||||||
"""
|
"""
|
||||||
tasks = [task for task in self._background_tasks if not task.done()]
|
tasks = [task for task in self._background_tasks if not task.done()]
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
|
self._expected_cancelled_tasks.add(task)
|
||||||
task.cancel()
|
task.cancel()
|
||||||
if tasks:
|
if tasks:
|
||||||
await asyncio.gather(*tasks, return_exceptions=True)
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
self._background_tasks.clear()
|
self._background_tasks.clear()
|
||||||
|
self._expected_cancelled_tasks.clear()
|
||||||
self._pending_messages.clear()
|
self._pending_messages.clear()
|
||||||
self._active_sessions.clear()
|
self._active_sessions.clear()
|
||||||
|
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ from gateway.platforms.base import (
|
|||||||
BasePlatformAdapter,
|
BasePlatformAdapter,
|
||||||
MessageEvent,
|
MessageEvent,
|
||||||
MessageType,
|
MessageType,
|
||||||
|
ProcessingOutcome,
|
||||||
SendResult,
|
SendResult,
|
||||||
cache_image_from_url,
|
cache_image_from_url,
|
||||||
cache_audio_from_url,
|
cache_audio_from_url,
|
||||||
@@ -754,14 +755,17 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||||||
if hasattr(message, "add_reaction"):
|
if hasattr(message, "add_reaction"):
|
||||||
await self._add_reaction(message, "👀")
|
await self._add_reaction(message, "👀")
|
||||||
|
|
||||||
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
|
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
|
||||||
"""Swap the in-progress reaction for a final success/failure reaction."""
|
"""Swap the in-progress reaction for a final success/failure reaction."""
|
||||||
if not self._reactions_enabled():
|
if not self._reactions_enabled():
|
||||||
return
|
return
|
||||||
message = event.raw_message
|
message = event.raw_message
|
||||||
if hasattr(message, "add_reaction"):
|
if hasattr(message, "add_reaction"):
|
||||||
await self._remove_reaction(message, "👀")
|
await self._remove_reaction(message, "👀")
|
||||||
await self._add_reaction(message, "✅" if success else "❌")
|
if outcome == ProcessingOutcome.SUCCESS:
|
||||||
|
await self._add_reaction(message, "✅")
|
||||||
|
elif outcome == ProcessingOutcome.FAILURE:
|
||||||
|
await self._add_reaction(message, "❌")
|
||||||
|
|
||||||
async def send(
|
async def send(
|
||||||
self,
|
self,
|
||||||
|
|||||||
@@ -973,7 +973,8 @@ def _run_official_feishu_ws_client(ws_client: Any, adapter: Any) -> None:
|
|||||||
return await original_connect(*args, **kwargs)
|
return await original_connect(*args, **kwargs)
|
||||||
|
|
||||||
def _configure_with_overrides(conf: Any) -> Any:
|
def _configure_with_overrides(conf: Any) -> Any:
|
||||||
assert original_configure is not None
|
if original_configure is None:
|
||||||
|
raise RuntimeError("Feishu _configure_with_overrides called but original_configure is None")
|
||||||
result = original_configure(conf)
|
result = original_configure(conf)
|
||||||
_apply_runtime_ws_overrides()
|
_apply_runtime_ws_overrides()
|
||||||
return result
|
return result
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ from gateway.platforms.base import (
|
|||||||
BasePlatformAdapter,
|
BasePlatformAdapter,
|
||||||
MessageEvent,
|
MessageEvent,
|
||||||
MessageType,
|
MessageType,
|
||||||
|
ProcessingOutcome,
|
||||||
SendResult,
|
SendResult,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1479,7 +1480,7 @@ class MatrixAdapter(BasePlatformAdapter):
|
|||||||
await self._send_reaction(room_id, msg_id, "\U0001f440")
|
await self._send_reaction(room_id, msg_id, "\U0001f440")
|
||||||
|
|
||||||
async def on_processing_complete(
|
async def on_processing_complete(
|
||||||
self, event: MessageEvent, success: bool,
|
self, event: MessageEvent, outcome: ProcessingOutcome,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Replace eyes with checkmark (success) or cross (failure)."""
|
"""Replace eyes with checkmark (success) or cross (failure)."""
|
||||||
if not self._reactions_enabled:
|
if not self._reactions_enabled:
|
||||||
@@ -1488,11 +1489,15 @@ class MatrixAdapter(BasePlatformAdapter):
|
|||||||
room_id = event.source.chat_id
|
room_id = event.source.chat_id
|
||||||
if not msg_id or not room_id:
|
if not msg_id or not room_id:
|
||||||
return
|
return
|
||||||
|
if outcome == ProcessingOutcome.CANCELLED:
|
||||||
|
return
|
||||||
# Note: Matrix doesn't support removing a specific reaction easily
|
# Note: Matrix doesn't support removing a specific reaction easily
|
||||||
# without tracking the reaction event_id. We send the new reaction;
|
# without tracking the reaction event_id. We send the new reaction;
|
||||||
# the eyes stays (acceptable UX — both are visible).
|
# the eyes stays (acceptable UX — both are visible).
|
||||||
await self._send_reaction(
|
await self._send_reaction(
|
||||||
room_id, msg_id, "\u2705" if success else "\u274c",
|
room_id,
|
||||||
|
msg_id,
|
||||||
|
"\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c",
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _on_reaction(self, room: Any, event: Any) -> None:
|
async def _on_reaction(self, room: Any, event: Any) -> None:
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ from gateway.platforms.base import (
|
|||||||
BasePlatformAdapter,
|
BasePlatformAdapter,
|
||||||
MessageEvent,
|
MessageEvent,
|
||||||
MessageType,
|
MessageType,
|
||||||
|
ProcessingOutcome,
|
||||||
SendResult,
|
SendResult,
|
||||||
cache_image_from_bytes,
|
cache_image_from_bytes,
|
||||||
cache_audio_from_bytes,
|
cache_audio_from_bytes,
|
||||||
@@ -517,6 +518,36 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||||||
|
|
||||||
# Build the application
|
# Build the application
|
||||||
builder = Application.builder().token(self.config.token)
|
builder = Application.builder().token(self.config.token)
|
||||||
|
|
||||||
|
# PTB defaults (pool_timeout=1s) are too aggressive on flaky networks and
|
||||||
|
# can trigger "Pool timeout: All connections in the connection pool are occupied"
|
||||||
|
# during reconnect/bootstrap. Use safer defaults and allow env overrides.
|
||||||
|
def _env_int(name: str, default: int) -> int:
|
||||||
|
try:
|
||||||
|
return int(os.getenv(name, str(default)))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return default
|
||||||
|
|
||||||
|
def _env_float(name: str, default: float) -> float:
|
||||||
|
try:
|
||||||
|
return float(os.getenv(name, str(default)))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return default
|
||||||
|
|
||||||
|
request_kwargs = {
|
||||||
|
"connection_pool_size": _env_int("HERMES_TELEGRAM_HTTP_POOL_SIZE", 512),
|
||||||
|
"pool_timeout": _env_float("HERMES_TELEGRAM_HTTP_POOL_TIMEOUT", 8.0),
|
||||||
|
"connect_timeout": _env_float("HERMES_TELEGRAM_HTTP_CONNECT_TIMEOUT", 10.0),
|
||||||
|
"read_timeout": _env_float("HERMES_TELEGRAM_HTTP_READ_TIMEOUT", 20.0),
|
||||||
|
"write_timeout": _env_float("HERMES_TELEGRAM_HTTP_WRITE_TIMEOUT", 20.0),
|
||||||
|
}
|
||||||
|
|
||||||
|
proxy_configured = any(
|
||||||
|
(os.getenv(k) or "").strip()
|
||||||
|
for k in ("HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY", "https_proxy", "http_proxy", "all_proxy")
|
||||||
|
)
|
||||||
|
disable_fallback = (os.getenv("HERMES_TELEGRAM_DISABLE_FALLBACK_IPS", "").strip().lower() in ("1", "true", "yes", "on"))
|
||||||
|
|
||||||
fallback_ips = self._fallback_ips()
|
fallback_ips = self._fallback_ips()
|
||||||
if not fallback_ips:
|
if not fallback_ips:
|
||||||
fallback_ips = await discover_fallback_ips()
|
fallback_ips = await discover_fallback_ips()
|
||||||
@@ -525,15 +556,31 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||||||
self.name,
|
self.name,
|
||||||
", ".join(fallback_ips),
|
", ".join(fallback_ips),
|
||||||
)
|
)
|
||||||
if fallback_ips:
|
|
||||||
|
if fallback_ips and not proxy_configured and not disable_fallback:
|
||||||
logger.info(
|
logger.info(
|
||||||
"[%s] Telegram fallback IPs active: %s",
|
"[%s] Telegram fallback IPs active: %s",
|
||||||
self.name,
|
self.name,
|
||||||
", ".join(fallback_ips),
|
", ".join(fallback_ips),
|
||||||
)
|
)
|
||||||
transport = TelegramFallbackTransport(fallback_ips)
|
# Keep request/update pools separate to reduce contention during
|
||||||
request = HTTPXRequest(httpx_kwargs={"transport": transport})
|
# polling reconnect + bot API bootstrap/delete_webhook calls.
|
||||||
get_updates_request = HTTPXRequest(httpx_kwargs={"transport": transport})
|
request = HTTPXRequest(
|
||||||
|
**request_kwargs,
|
||||||
|
httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
|
||||||
|
)
|
||||||
|
get_updates_request = HTTPXRequest(
|
||||||
|
**request_kwargs,
|
||||||
|
httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
if proxy_configured:
|
||||||
|
logger.info("[%s] Proxy configured; skipping Telegram fallback-IP transport", self.name)
|
||||||
|
elif disable_fallback:
|
||||||
|
logger.info("[%s] Telegram fallback-IP transport disabled via env", self.name)
|
||||||
|
request = HTTPXRequest(**request_kwargs)
|
||||||
|
get_updates_request = HTTPXRequest(**request_kwargs)
|
||||||
|
|
||||||
builder = builder.request(request).get_updates_request(get_updates_request)
|
builder = builder.request(request).get_updates_request(get_updates_request)
|
||||||
self._app = builder.build()
|
self._app = builder.build()
|
||||||
self._bot = self._app.bot
|
self._bot = self._app.bot
|
||||||
@@ -2732,7 +2779,7 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||||||
if chat_id and message_id:
|
if chat_id and message_id:
|
||||||
await self._set_reaction(chat_id, message_id, "\U0001f440")
|
await self._set_reaction(chat_id, message_id, "\U0001f440")
|
||||||
|
|
||||||
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
|
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
|
||||||
"""Swap the in-progress reaction for a final success/failure reaction.
|
"""Swap the in-progress reaction for a final success/failure reaction.
|
||||||
|
|
||||||
Unlike Discord (additive reactions), Telegram's set_message_reaction
|
Unlike Discord (additive reactions), Telegram's set_message_reaction
|
||||||
@@ -2742,5 +2789,9 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||||||
return
|
return
|
||||||
chat_id = getattr(event.source, "chat_id", None)
|
chat_id = getattr(event.source, "chat_id", None)
|
||||||
message_id = getattr(event, "message_id", None)
|
message_id = getattr(event, "message_id", None)
|
||||||
if chat_id and message_id:
|
if chat_id and message_id and outcome != ProcessingOutcome.CANCELLED:
|
||||||
await self._set_reaction(chat_id, message_id, "\u2705" if success else "\u274c")
|
await self._set_reaction(
|
||||||
|
chat_id,
|
||||||
|
message_id,
|
||||||
|
"\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c",
|
||||||
|
)
|
||||||
|
|||||||
@@ -110,7 +110,8 @@ class TelegramFallbackTransport(httpx.AsyncBaseTransport):
|
|||||||
logger.warning("[Telegram] Fallback IP %s failed: %s", ip, exc)
|
logger.warning("[Telegram] Fallback IP %s failed: %s", ip, exc)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
assert last_error is not None
|
if last_error is None:
|
||||||
|
raise RuntimeError("All Telegram fallback IPs exhausted but no error was recorded")
|
||||||
raise last_error
|
raise last_error
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
|
|||||||
@@ -1991,6 +1991,11 @@ class GatewayRunner:
|
|||||||
return await self._handle_approve_command(event)
|
return await self._handle_approve_command(event)
|
||||||
return await self._handle_deny_command(event)
|
return await self._handle_deny_command(event)
|
||||||
|
|
||||||
|
# /background must bypass the running-agent guard — it starts a
|
||||||
|
# parallel task and must never interrupt the active conversation.
|
||||||
|
if _cmd_def_inner and _cmd_def_inner.name == "background":
|
||||||
|
return await self._handle_background_command(event)
|
||||||
|
|
||||||
if event.message_type == MessageType.PHOTO:
|
if event.message_type == MessageType.PHOTO:
|
||||||
logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20])
|
logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20])
|
||||||
adapter = self.adapters.get(source.platform)
|
adapter = self.adapters.get(source.platform)
|
||||||
@@ -7577,7 +7582,7 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
|||||||
# setups (each profile using a distinct HERMES_HOME) will naturally
|
# setups (each profile using a distinct HERMES_HOME) will naturally
|
||||||
# allow concurrent instances without tripping this guard.
|
# allow concurrent instances without tripping this guard.
|
||||||
import time as _time
|
import time as _time
|
||||||
from gateway.status import get_running_pid, remove_pid_file
|
from gateway.status import get_running_pid, remove_pid_file, terminate_pid
|
||||||
existing_pid = get_running_pid()
|
existing_pid = get_running_pid()
|
||||||
if existing_pid is not None and existing_pid != os.getpid():
|
if existing_pid is not None and existing_pid != os.getpid():
|
||||||
if replace:
|
if replace:
|
||||||
@@ -7586,10 +7591,10 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
|||||||
existing_pid,
|
existing_pid,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
os.kill(existing_pid, signal.SIGTERM)
|
terminate_pid(existing_pid, force=False)
|
||||||
except ProcessLookupError:
|
except ProcessLookupError:
|
||||||
pass # Already gone
|
pass # Already gone
|
||||||
except PermissionError:
|
except (PermissionError, OSError):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Permission denied killing PID %d. Cannot replace.",
|
"Permission denied killing PID %d. Cannot replace.",
|
||||||
existing_pid,
|
existing_pid,
|
||||||
@@ -7609,9 +7614,9 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
|||||||
existing_pid,
|
existing_pid,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
os.kill(existing_pid, signal.SIGKILL)
|
terminate_pid(existing_pid, force=True)
|
||||||
_time.sleep(0.5)
|
_time.sleep(0.5)
|
||||||
except (ProcessLookupError, PermissionError):
|
except (ProcessLookupError, PermissionError, OSError):
|
||||||
pass
|
pass
|
||||||
remove_pid_file()
|
remove_pid_file()
|
||||||
# Also release all scoped locks left by the old process.
|
# Also release all scoped locks left by the old process.
|
||||||
|
|||||||
@@ -14,6 +14,8 @@ concurrently under distinct configurations).
|
|||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import signal
|
||||||
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -23,6 +25,7 @@ from typing import Any, Optional
|
|||||||
_GATEWAY_KIND = "hermes-gateway"
|
_GATEWAY_KIND = "hermes-gateway"
|
||||||
_RUNTIME_STATUS_FILE = "gateway_state.json"
|
_RUNTIME_STATUS_FILE = "gateway_state.json"
|
||||||
_LOCKS_DIRNAME = "gateway-locks"
|
_LOCKS_DIRNAME = "gateway-locks"
|
||||||
|
_IS_WINDOWS = sys.platform == "win32"
|
||||||
|
|
||||||
|
|
||||||
def _get_pid_path() -> Path:
|
def _get_pid_path() -> Path:
|
||||||
@@ -49,6 +52,33 @@ def _utc_now_iso() -> str:
|
|||||||
return datetime.now(timezone.utc).isoformat()
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def terminate_pid(pid: int, *, force: bool = False) -> None:
    """Terminate a PID with platform-appropriate force semantics.

    A graceful request (``force=False``) sends ``SIGTERM`` via ``os.kill``.
    A forced request sends ``SIGKILL`` where the ``signal`` module defines
    it.  On Windows a forced request instead runs
    ``taskkill /PID <pid> /T /F``, because ``os.kill(..., SIGTERM)`` is not
    equivalent to a tree-killing hard stop.

    Args:
        pid: Process ID to terminate.
        force: Escalate to the platform's force-kill mechanism.

    Raises:
        ProcessLookupError: Propagated from ``os.kill`` when the PID is gone.
        PermissionError: Propagated from ``os.kill`` when signalling is denied.
        OSError: ``taskkill`` exited non-zero or did not finish within its
            timeout (Windows force path), or another ``os.kill`` failure.
    """
    if force and _IS_WINDOWS:
        taskkill_timeout = 10
        try:
            result = subprocess.run(
                ["taskkill", "/PID", str(pid), "/T", "/F"],
                capture_output=True,
                text=True,
                timeout=taskkill_timeout,
            )
        except FileNotFoundError:
            # taskkill executable is unavailable; fall back to os.kill.
            os.kill(pid, signal.SIGTERM)
            return
        except subprocess.TimeoutExpired as exc:
            # TimeoutExpired is not an OSError, so callers that only catch
            # ProcessLookupError/PermissionError/OSError would let it escape.
            # Surface it as the same error family as a taskkill failure.
            raise OSError(
                f"taskkill timed out after {taskkill_timeout}s for PID {pid}"
            ) from exc

        if result.returncode != 0:
            details = (result.stderr or result.stdout or "").strip()
            raise OSError(details or f"taskkill failed for PID {pid}")
        return

    # SIGKILL is looked up defensively and degrades to SIGTERM when the
    # signal module does not define it.
    sig = signal.SIGTERM if not force else getattr(signal, "SIGKILL", signal.SIGTERM)
    os.kill(pid, sig)
|
||||||
|
|
||||||
|
|
||||||
def _scope_hash(identity: str) -> str:
|
def _scope_hash(identity: str) -> str:
|
||||||
return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
|
return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
|
||||||
|
|
||||||
|
|||||||
@@ -205,11 +205,20 @@ class GatewayStreamConsumer:
|
|||||||
await self._send_or_edit(self._accumulated)
|
await self._send_or_edit(self._accumulated)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Tool boundary: the should_edit block above already flushed
|
# Tool boundary: reset message state so the next text chunk
|
||||||
# accumulated text without a cursor. Reset state so the next
|
# creates a fresh message below any tool-progress messages.
|
||||||
# text chunk creates a fresh message below any tool-progress
|
#
|
||||||
# messages the gateway sent in between.
|
# Exception: when _message_id is "__no_edit__" the platform
|
||||||
if got_segment_break:
|
# never returned a real message ID (e.g. Signal, webhook with
|
||||||
|
# github_comment delivery). Resetting to None would re-enter
|
||||||
|
# the "first send" path on every tool boundary and post one
|
||||||
|
# platform message per tool call — that is what caused 155
|
||||||
|
# comments under a single PR. Instead, keep all state so the
|
||||||
|
# full continuation is delivered once via _send_fallback_final.
|
||||||
|
# (When editing fails mid-stream due to flood control the id is
|
||||||
|
# a real string like "msg_1", not "__no_edit__", so that case
|
||||||
|
# still resets and creates a fresh segment as intended.)
|
||||||
|
if got_segment_break and self._message_id != "__no_edit__":
|
||||||
self._message_id = None
|
self._message_id = None
|
||||||
self._accumulated = ""
|
self._accumulated = ""
|
||||||
self._last_sent_text = ""
|
self._last_sent_text = ""
|
||||||
|
|||||||
@@ -16,8 +16,18 @@ from collections.abc import Callable, Mapping
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from prompt_toolkit.auto_suggest import AutoSuggest, Suggestion
|
# prompt_toolkit is an optional CLI dependency — only needed for
|
||||||
from prompt_toolkit.completion import Completer, Completion
|
# SlashCommandCompleter and SlashCommandAutoSuggest. Gateway and test
|
||||||
|
# environments that lack it must still be able to import this module
|
||||||
|
# for resolve_command, gateway_help_lines, and COMMAND_REGISTRY.
|
||||||
|
try:
|
||||||
|
from prompt_toolkit.auto_suggest import AutoSuggest, Suggestion
|
||||||
|
from prompt_toolkit.completion import Completer, Completion
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
AutoSuggest = object # type: ignore[assignment,misc]
|
||||||
|
Completer = object # type: ignore[assignment,misc]
|
||||||
|
Suggestion = None # type: ignore[assignment]
|
||||||
|
Completion = None # type: ignore[assignment]
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
|
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
|
||||||
|
|
||||||
|
from gateway.status import terminate_pid
|
||||||
from hermes_cli.config import get_env_value, get_hermes_home, save_env_value, is_managed, managed_error
|
from hermes_cli.config import get_env_value, get_hermes_home, save_env_value, is_managed, managed_error
|
||||||
# display_hermes_home is imported lazily at call sites to avoid ImportError
|
# display_hermes_home is imported lazily at call sites to avoid ImportError
|
||||||
# when hermes_constants is cached from a pre-update version during `hermes update`.
|
# when hermes_constants is cached from a pre-update version during `hermes update`.
|
||||||
@@ -162,7 +163,7 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
|
|||||||
"""Kill any running gateway processes. Returns count killed.
|
"""Kill any running gateway processes. Returns count killed.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
force: Use SIGKILL instead of SIGTERM.
|
force: Use the platform's force-kill mechanism instead of graceful terminate.
|
||||||
exclude_pids: PIDs to skip (e.g. service-managed PIDs that were just
|
exclude_pids: PIDs to skip (e.g. service-managed PIDs that were just
|
||||||
restarted and should not be killed).
|
restarted and should not be killed).
|
||||||
"""
|
"""
|
||||||
@@ -171,10 +172,7 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
|
|||||||
|
|
||||||
for pid in pids:
|
for pid in pids:
|
||||||
try:
|
try:
|
||||||
if force and not is_windows():
|
terminate_pid(pid, force=force)
|
||||||
os.kill(pid, signal.SIGKILL)
|
|
||||||
else:
|
|
||||||
os.kill(pid, signal.SIGTERM)
|
|
||||||
killed += 1
|
killed += 1
|
||||||
except ProcessLookupError:
|
except ProcessLookupError:
|
||||||
# Process already gone
|
# Process already gone
|
||||||
@@ -182,6 +180,8 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
|
|||||||
except PermissionError:
|
except PermissionError:
|
||||||
print(f"⚠ Permission denied to kill PID {pid}")
|
print(f"⚠ Permission denied to kill PID {pid}")
|
||||||
|
|
||||||
|
except OSError as exc:
|
||||||
|
print(f"Failed to kill PID {pid}: {exc}")
|
||||||
return killed
|
return killed
|
||||||
|
|
||||||
|
|
||||||
@@ -1208,7 +1208,7 @@ def _wait_for_gateway_exit(timeout: float = 10.0, force_after: float = 5.0):
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
timeout: Total seconds to wait before giving up.
|
timeout: Total seconds to wait before giving up.
|
||||||
force_after: Seconds of graceful waiting before sending SIGKILL.
|
force_after: Seconds of graceful waiting before escalating to force-kill.
|
||||||
"""
|
"""
|
||||||
import time
|
import time
|
||||||
from gateway.status import get_running_pid
|
from gateway.status import get_running_pid
|
||||||
@@ -1225,15 +1225,15 @@ def _wait_for_gateway_exit(timeout: float = 10.0, force_after: float = 5.0):
|
|||||||
if not force_sent and time.monotonic() >= force_deadline:
|
if not force_sent and time.monotonic() >= force_deadline:
|
||||||
# Grace period expired — force-kill the specific PID.
|
# Grace period expired — force-kill the specific PID.
|
||||||
try:
|
try:
|
||||||
os.kill(pid, signal.SIGKILL)
|
terminate_pid(pid, force=True)
|
||||||
print(f"⚠ Gateway PID {pid} did not exit gracefully; sent SIGKILL")
|
print(f"⚠ Gateway PID {pid} did not exit gracefully; sent SIGKILL")
|
||||||
except (ProcessLookupError, PermissionError):
|
except (ProcessLookupError, PermissionError, OSError):
|
||||||
return # Already gone or we can't touch it.
|
return # Already gone or we can't touch it.
|
||||||
force_sent = True
|
force_sent = True
|
||||||
|
|
||||||
time.sleep(0.3)
|
time.sleep(0.3)
|
||||||
|
|
||||||
# Timed out even after SIGKILL.
|
# Timed out even after force-kill.
|
||||||
remaining_pid = get_running_pid()
|
remaining_pid = get_running_pid()
|
||||||
if remaining_pid is not None:
|
if remaining_pid is not None:
|
||||||
print(f"⚠ Gateway PID {remaining_pid} still running after {timeout}s — restart may fail")
|
print(f"⚠ Gateway PID {remaining_pid} still running after {timeout}s — restart may fail")
|
||||||
|
|||||||
@@ -308,6 +308,7 @@ class TestBackgroundInCLICommands:
|
|||||||
|
|
||||||
def test_background_autocompletes(self):
|
def test_background_autocompletes(self):
|
||||||
"""The /background command appears in autocomplete results."""
|
"""The /background command appears in autocomplete results."""
|
||||||
|
pytest.importorskip("prompt_toolkit")
|
||||||
from hermes_cli.commands import SlashCommandCompleter
|
from hermes_cli.commands import SlashCommandCompleter
|
||||||
from prompt_toolkit.document import Document
|
from prompt_toolkit.document import Document
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from types import SimpleNamespace
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from gateway.config import Platform, PlatformConfig
|
from gateway.config import Platform, PlatformConfig
|
||||||
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
|
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, ProcessingOutcome, SendResult
|
||||||
from gateway.session import SessionSource, build_session_key
|
from gateway.session import SessionSource, build_session_key
|
||||||
|
|
||||||
|
|
||||||
@@ -44,8 +44,8 @@ class DummyTelegramAdapter(BasePlatformAdapter):
|
|||||||
async def on_processing_start(self, event: MessageEvent) -> None:
|
async def on_processing_start(self, event: MessageEvent) -> None:
|
||||||
self.processing_hooks.append(("start", event.message_id))
|
self.processing_hooks.append(("start", event.message_id))
|
||||||
|
|
||||||
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
|
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
|
||||||
self.processing_hooks.append(("complete", event.message_id, success))
|
self.processing_hooks.append(("complete", event.message_id, outcome))
|
||||||
|
|
||||||
|
|
||||||
def _make_event(chat_id: str, thread_id: str, message_id: str = "1") -> MessageEvent:
|
def _make_event(chat_id: str, thread_id: str, message_id: str = "1") -> MessageEvent:
|
||||||
@@ -142,7 +142,7 @@ class TestBasePlatformTopicSessions:
|
|||||||
]
|
]
|
||||||
assert adapter.processing_hooks == [
|
assert adapter.processing_hooks == [
|
||||||
("start", "1"),
|
("start", "1"),
|
||||||
("complete", "1", True),
|
("complete", "1", ProcessingOutcome.SUCCESS),
|
||||||
]
|
]
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@@ -168,7 +168,7 @@ class TestBasePlatformTopicSessions:
|
|||||||
|
|
||||||
assert adapter.processing_hooks == [
|
assert adapter.processing_hooks == [
|
||||||
("start", "1"),
|
("start", "1"),
|
||||||
("complete", "1", False),
|
("complete", "1", ProcessingOutcome.FAILURE),
|
||||||
]
|
]
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@@ -190,7 +190,7 @@ class TestBasePlatformTopicSessions:
|
|||||||
|
|
||||||
assert adapter.processing_hooks == [
|
assert adapter.processing_hooks == [
|
||||||
("start", "1"),
|
("start", "1"),
|
||||||
("complete", "1", False),
|
("complete", "1", ProcessingOutcome.FAILURE),
|
||||||
]
|
]
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@@ -218,5 +218,31 @@ class TestBasePlatformTopicSessions:
|
|||||||
|
|
||||||
assert adapter.processing_hooks == [
|
assert adapter.processing_hooks == [
|
||||||
("start", "1"),
|
("start", "1"),
|
||||||
("complete", "1", False),
|
("complete", "1", ProcessingOutcome.FAILURE),
|
||||||
|
]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cancel_background_tasks_marks_expected_cancellation_cancelled(self):
|
||||||
|
adapter = DummyTelegramAdapter()
|
||||||
|
release = asyncio.Event()
|
||||||
|
|
||||||
|
async def handler(_event):
|
||||||
|
await release.wait()
|
||||||
|
return "ack"
|
||||||
|
|
||||||
|
async def hold_typing(_chat_id, interval=2.0, metadata=None):
|
||||||
|
await asyncio.Event().wait()
|
||||||
|
|
||||||
|
adapter.set_message_handler(handler)
|
||||||
|
adapter._keep_typing = hold_typing
|
||||||
|
|
||||||
|
event = _make_event("-1001", "17585")
|
||||||
|
await adapter.handle_message(event)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
await adapter.cancel_background_tasks()
|
||||||
|
|
||||||
|
assert adapter.processing_hooks == [
|
||||||
|
("start", "1"),
|
||||||
|
("complete", "1", ProcessingOutcome.CANCELLED),
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -160,6 +160,22 @@ class TestCommandBypassActiveSession:
|
|||||||
assert sk not in adapter._pending_messages
|
assert sk not in adapter._pending_messages
|
||||||
assert any("handled:status" in r for r in adapter.sent_responses)
|
assert any("handled:status" in r for r in adapter.sent_responses)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_background_bypasses_guard(self):
|
||||||
|
"""/background must bypass so it spawns a parallel task, not an interrupt."""
|
||||||
|
adapter = _make_adapter()
|
||||||
|
sk = _session_key()
|
||||||
|
adapter._active_sessions[sk] = asyncio.Event()
|
||||||
|
|
||||||
|
await adapter.handle_message(_make_event("/background summarize HN"))
|
||||||
|
|
||||||
|
assert sk not in adapter._pending_messages, (
|
||||||
|
"/background was queued as a pending message instead of being dispatched"
|
||||||
|
)
|
||||||
|
assert any("handled:background" in r for r in adapter.sent_responses), (
|
||||||
|
"/background response was not sent back to the user"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Tests: non-bypass messages still get queued
|
# Tests: non-bypass messages still get queued
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ from unittest.mock import AsyncMock, MagicMock
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from gateway.config import Platform, PlatformConfig
|
from gateway.config import Platform, PlatformConfig
|
||||||
from gateway.platforms.base import MessageEvent, MessageType, SendResult
|
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome, SendResult
|
||||||
from gateway.session import SessionSource, build_session_key
|
from gateway.session import SessionSource, build_session_key
|
||||||
|
|
||||||
|
|
||||||
@@ -212,7 +212,7 @@ async def test_reactions_disabled_via_env_zero(adapter, monkeypatch):
|
|||||||
|
|
||||||
event = _make_event("5", raw_message)
|
event = _make_event("5", raw_message)
|
||||||
await adapter.on_processing_start(event)
|
await adapter.on_processing_start(event)
|
||||||
await adapter.on_processing_complete(event, success=True)
|
await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
|
||||||
|
|
||||||
raw_message.add_reaction.assert_not_awaited()
|
raw_message.add_reaction.assert_not_awaited()
|
||||||
raw_message.remove_reaction.assert_not_awaited()
|
raw_message.remove_reaction.assert_not_awaited()
|
||||||
@@ -232,3 +232,17 @@ async def test_reactions_enabled_by_default(adapter, monkeypatch):
|
|||||||
await adapter.on_processing_start(event)
|
await adapter.on_processing_start(event)
|
||||||
|
|
||||||
raw_message.add_reaction.assert_awaited_once_with("👀")
|
raw_message.add_reaction.assert_awaited_once_with("👀")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_on_processing_complete_cancelled_removes_eyes_without_terminal_reaction(adapter):
|
||||||
|
raw_message = SimpleNamespace(
|
||||||
|
add_reaction=AsyncMock(),
|
||||||
|
remove_reaction=AsyncMock(),
|
||||||
|
)
|
||||||
|
|
||||||
|
event = _make_event("7", raw_message)
|
||||||
|
await adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
|
||||||
|
|
||||||
|
raw_message.remove_reaction.assert_awaited_once_with("👀", adapter._client.user)
|
||||||
|
raw_message.add_reaction.assert_not_awaited()
|
||||||
|
|||||||
@@ -128,12 +128,16 @@ async def test_internal_event_bypasses_authorization(monkeypatch, tmp_path):
|
|||||||
|
|
||||||
monkeypatch.setattr(GatewayRunner, "_is_user_authorized", tracking_auth)
|
monkeypatch.setattr(GatewayRunner, "_is_user_authorized", tracking_auth)
|
||||||
|
|
||||||
# _handle_message will proceed past auth check and eventually fail on
|
# Stop execution before the agent runner so the test doesn't block in
|
||||||
# downstream logic. We just need to verify auth is skipped.
|
# run_in_executor. Auth check happens before _handle_message_with_agent.
|
||||||
|
async def _raise(*_a, **_kw):
|
||||||
|
raise RuntimeError("sentinel — stop here")
|
||||||
|
monkeypatch.setattr(GatewayRunner, "_handle_message_with_agent", _raise)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await runner._handle_message(event)
|
await runner._handle_message(event)
|
||||||
except Exception:
|
except RuntimeError:
|
||||||
pass # Expected — downstream code needs more setup
|
pass # Expected sentinel
|
||||||
|
|
||||||
assert not auth_called, (
|
assert not auth_called, (
|
||||||
"_is_user_authorized should NOT be called for internal events"
|
"_is_user_authorized should NOT be called for internal events"
|
||||||
@@ -175,10 +179,16 @@ async def test_internal_event_does_not_trigger_pairing(monkeypatch, tmp_path):
|
|||||||
|
|
||||||
runner.pairing_store.generate_code = tracking_generate
|
runner.pairing_store.generate_code = tracking_generate
|
||||||
|
|
||||||
|
# Stop execution before the agent runner so the test doesn't block in
|
||||||
|
# run_in_executor. Pairing check happens before _handle_message_with_agent.
|
||||||
|
async def _raise(*_a, **_kw):
|
||||||
|
raise RuntimeError("sentinel — stop here")
|
||||||
|
monkeypatch.setattr(GatewayRunner, "_handle_message_with_agent", _raise)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await runner._handle_message(event)
|
await runner._handle_message(event)
|
||||||
except Exception:
|
except RuntimeError:
|
||||||
pass # Expected — downstream code needs more setup
|
pass # Expected sentinel
|
||||||
|
|
||||||
assert not generate_called, (
|
assert not generate_called, (
|
||||||
"Pairing code should NOT be generated for internal events"
|
"Pairing code should NOT be generated for internal events"
|
||||||
|
|||||||
@@ -1980,7 +1980,7 @@ class TestMatrixReactions:
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_on_processing_complete_sends_check(self):
|
async def test_on_processing_complete_sends_check(self):
|
||||||
from gateway.platforms.base import MessageEvent, MessageType
|
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
|
||||||
|
|
||||||
self.adapter._reactions_enabled = True
|
self.adapter._reactions_enabled = True
|
||||||
self.adapter._send_reaction = AsyncMock(return_value=True)
|
self.adapter._send_reaction = AsyncMock(return_value=True)
|
||||||
@@ -1994,9 +1994,28 @@ class TestMatrixReactions:
|
|||||||
raw_message={},
|
raw_message={},
|
||||||
message_id="$msg1",
|
message_id="$msg1",
|
||||||
)
|
)
|
||||||
await self.adapter.on_processing_complete(event, success=True)
|
await self.adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
|
||||||
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "✅")
|
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "✅")
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_on_processing_complete_cancelled_sends_no_terminal_reaction(self):
|
||||||
|
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
|
||||||
|
|
||||||
|
self.adapter._reactions_enabled = True
|
||||||
|
self.adapter._send_reaction = AsyncMock(return_value=True)
|
||||||
|
|
||||||
|
source = MagicMock()
|
||||||
|
source.chat_id = "!room:ex"
|
||||||
|
event = MessageEvent(
|
||||||
|
text="hello",
|
||||||
|
message_type=MessageType.TEXT,
|
||||||
|
source=source,
|
||||||
|
raw_message={},
|
||||||
|
message_id="$msg1",
|
||||||
|
)
|
||||||
|
await self.adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
|
||||||
|
self.adapter._send_reaction.assert_not_called()
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_reactions_disabled(self):
|
async def test_reactions_disabled(self):
|
||||||
from gateway.platforms.base import MessageEvent, MessageType
|
from gateway.platforms.base import MessageEvent, MessageType
|
||||||
|
|||||||
@@ -144,7 +144,7 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa
|
|||||||
assert adapter.sent == [
|
assert adapter.sent == [
|
||||||
{
|
{
|
||||||
"chat_id": "-1001",
|
"chat_id": "-1001",
|
||||||
"content": '💻 terminal: "pwd"',
|
"content": '⚙️ terminal: "pwd"',
|
||||||
"reply_to": None,
|
"reply_to": None,
|
||||||
"metadata": {"thread_id": "17585"},
|
"metadata": {"thread_id": "17585"},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,3 +87,42 @@ async def test_runner_allows_cron_only_mode_when_no_platforms_are_enabled(monkey
|
|||||||
assert runner.adapters == {}
|
assert runner.adapters == {}
|
||||||
state = read_runtime_status()
|
state = read_runtime_status()
|
||||||
assert state["gateway_state"] == "running"
|
assert state["gateway_state"] == "running"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_start_gateway_replace_force_uses_terminate_pid(monkeypatch, tmp_path):
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||||
|
|
||||||
|
calls = []
|
||||||
|
|
||||||
|
class _CleanExitRunner:
|
||||||
|
def __init__(self, config):
|
||||||
|
self.config = config
|
||||||
|
self.should_exit_cleanly = True
|
||||||
|
self.exit_reason = None
|
||||||
|
self.adapters = {}
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
async def stop(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 42)
|
||||||
|
monkeypatch.setattr("gateway.status.remove_pid_file", lambda: None)
|
||||||
|
monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0)
|
||||||
|
monkeypatch.setattr("gateway.status.terminate_pid", lambda pid, force=False: calls.append((pid, force)))
|
||||||
|
monkeypatch.setattr("gateway.run.os.getpid", lambda: 100)
|
||||||
|
monkeypatch.setattr("gateway.run.os.kill", lambda pid, sig: None)
|
||||||
|
monkeypatch.setattr("time.sleep", lambda _: None)
|
||||||
|
monkeypatch.setattr("tools.skills_sync.sync_skills", lambda quiet=True: None)
|
||||||
|
monkeypatch.setattr("hermes_logging.setup_logging", lambda hermes_home, mode: tmp_path)
|
||||||
|
monkeypatch.setattr("hermes_logging._add_rotating_handler", lambda *args, **kwargs: None)
|
||||||
|
monkeypatch.setattr("gateway.run.GatewayRunner", _CleanExitRunner)
|
||||||
|
|
||||||
|
from gateway.run import start_gateway
|
||||||
|
|
||||||
|
ok = await start_gateway(config=GatewayConfig(), replace=True, verbosity=None)
|
||||||
|
|
||||||
|
assert ok is True
|
||||||
|
assert calls == [(42, False), (42, True)]
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
from types import SimpleNamespace
|
||||||
|
|
||||||
from gateway import status
|
from gateway import status
|
||||||
|
|
||||||
@@ -104,6 +105,41 @@ class TestGatewayRuntimeStatus:
|
|||||||
assert payload["platforms"]["telegram"]["error_message"] == "another poller is active"
|
assert payload["platforms"]["telegram"]["error_message"] == "another poller is active"
|
||||||
|
|
||||||
|
|
||||||
|
class TestTerminatePid:
|
||||||
|
def test_force_uses_taskkill_on_windows(self, monkeypatch):
|
||||||
|
calls = []
|
||||||
|
monkeypatch.setattr(status, "_IS_WINDOWS", True)
|
||||||
|
|
||||||
|
def fake_run(cmd, capture_output=False, text=False, timeout=None):
|
||||||
|
calls.append((cmd, capture_output, text, timeout))
|
||||||
|
return SimpleNamespace(returncode=0, stdout="", stderr="")
|
||||||
|
|
||||||
|
monkeypatch.setattr(status.subprocess, "run", fake_run)
|
||||||
|
|
||||||
|
status.terminate_pid(123, force=True)
|
||||||
|
|
||||||
|
assert calls == [
|
||||||
|
(["taskkill", "/PID", "123", "/T", "/F"], True, True, 10)
|
||||||
|
]
|
||||||
|
|
||||||
|
def test_force_falls_back_to_sigterm_when_taskkill_missing(self, monkeypatch):
|
||||||
|
calls = []
|
||||||
|
monkeypatch.setattr(status, "_IS_WINDOWS", True)
|
||||||
|
|
||||||
|
def fake_run(*args, **kwargs):
|
||||||
|
raise FileNotFoundError
|
||||||
|
|
||||||
|
def fake_kill(pid, sig):
|
||||||
|
calls.append((pid, sig))
|
||||||
|
|
||||||
|
monkeypatch.setattr(status.subprocess, "run", fake_run)
|
||||||
|
monkeypatch.setattr(status.os, "kill", fake_kill)
|
||||||
|
|
||||||
|
status.terminate_pid(456, force=True)
|
||||||
|
|
||||||
|
assert calls == [(456, status.signal.SIGTERM)]
|
||||||
|
|
||||||
|
|
||||||
class TestScopedLocks:
|
class TestScopedLocks:
|
||||||
def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
|
def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
|
||||||
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
|
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
|
||||||
|
|||||||
@@ -437,6 +437,45 @@ class TestSegmentBreakOnToolBoundary:
|
|||||||
# Only one send call (the initial message)
|
# Only one send call (the initial message)
|
||||||
assert adapter.send.call_count == 1
|
assert adapter.send.call_count == 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_message_id_segment_breaks_do_not_resend(self):
|
||||||
|
"""On a platform that never returns a message_id (e.g. webhook with
|
||||||
|
github_comment delivery), tool-call segment breaks must NOT trigger
|
||||||
|
a new adapter.send() per boundary. The fix: _message_id == '__no_edit__'
|
||||||
|
suppresses the reset so all text accumulates and is sent once."""
|
||||||
|
adapter = MagicMock()
|
||||||
|
# No message_id on first send, then one more for the fallback final
|
||||||
|
adapter.send = AsyncMock(side_effect=[
|
||||||
|
SimpleNamespace(success=True, message_id=None),
|
||||||
|
SimpleNamespace(success=True, message_id=None),
|
||||||
|
])
|
||||||
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
||||||
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||||
|
|
||||||
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
||||||
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
||||||
|
|
||||||
|
# Simulate: text → tool boundary → text → tool boundary → text (3 segments)
|
||||||
|
consumer.on_delta("Phase 1 text")
|
||||||
|
consumer.on_delta(None) # tool call boundary
|
||||||
|
consumer.on_delta("Phase 2 text")
|
||||||
|
consumer.on_delta(None) # another tool call boundary
|
||||||
|
consumer.on_delta("Phase 3 text")
|
||||||
|
consumer.finish()
|
||||||
|
|
||||||
|
await consumer.run()
|
||||||
|
|
||||||
|
# Before the fix this would post 3 comments (one per segment).
|
||||||
|
# After the fix: only the initial partial + one fallback-final continuation.
|
||||||
|
assert adapter.send.call_count == 2, (
|
||||||
|
f"Expected 2 sends (initial + fallback), got {adapter.send.call_count}"
|
||||||
|
)
|
||||||
|
assert consumer.already_sent
|
||||||
|
# The continuation must contain the text from segments 2 and 3
|
||||||
|
final_text = adapter.send.call_args_list[1][1]["content"]
|
||||||
|
assert "Phase 2" in final_text
|
||||||
|
assert "Phase 3" in final_text
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_fallback_final_splits_long_continuation_without_dropping_text(self):
|
async def test_fallback_final_splits_long_continuation_without_dropping_text(self):
|
||||||
"""Long continuation tails should be chunked when fallback final-send runs."""
|
"""Long continuation tails should be chunked when fallback final-send runs."""
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from unittest.mock import AsyncMock
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from gateway.config import Platform, PlatformConfig
|
from gateway.config import Platform, PlatformConfig
|
||||||
from gateway.platforms.base import MessageEvent, MessageType
|
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
|
||||||
from gateway.session import SessionSource
|
from gateway.session import SessionSource
|
||||||
|
|
||||||
|
|
||||||
@@ -180,7 +180,7 @@ async def test_on_processing_complete_success(monkeypatch):
|
|||||||
adapter = _make_adapter()
|
adapter = _make_adapter()
|
||||||
event = _make_event()
|
event = _make_event()
|
||||||
|
|
||||||
await adapter.on_processing_complete(event, success=True)
|
await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
|
||||||
|
|
||||||
adapter._bot.set_message_reaction.assert_awaited_once_with(
|
adapter._bot.set_message_reaction.assert_awaited_once_with(
|
||||||
chat_id=123,
|
chat_id=123,
|
||||||
@@ -196,7 +196,7 @@ async def test_on_processing_complete_failure(monkeypatch):
|
|||||||
adapter = _make_adapter()
|
adapter = _make_adapter()
|
||||||
event = _make_event()
|
event = _make_event()
|
||||||
|
|
||||||
await adapter.on_processing_complete(event, success=False)
|
await adapter.on_processing_complete(event, ProcessingOutcome.FAILURE)
|
||||||
|
|
||||||
adapter._bot.set_message_reaction.assert_awaited_once_with(
|
adapter._bot.set_message_reaction.assert_awaited_once_with(
|
||||||
chat_id=123,
|
chat_id=123,
|
||||||
@@ -212,7 +212,19 @@ async def test_on_processing_complete_skipped_when_disabled(monkeypatch):
|
|||||||
adapter = _make_adapter()
|
adapter = _make_adapter()
|
||||||
event = _make_event()
|
event = _make_event()
|
||||||
|
|
||||||
await adapter.on_processing_complete(event, success=True)
|
await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
|
||||||
|
|
||||||
|
adapter._bot.set_message_reaction.assert_not_awaited()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_on_processing_complete_cancelled_keeps_existing_reaction(monkeypatch):
|
||||||
|
"""Expected cancellation should not replace the in-progress reaction."""
|
||||||
|
monkeypatch.setenv("TELEGRAM_REACTIONS", "true")
|
||||||
|
adapter = _make_adapter()
|
||||||
|
event = _make_event()
|
||||||
|
|
||||||
|
await adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
|
||||||
|
|
||||||
adapter._bot.set_message_reaction.assert_not_awaited()
|
adapter._bot.set_message_reaction.assert_not_awaited()
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
"""Tests for hermes_cli.gateway."""
|
"""Tests for hermes_cli.gateway."""
|
||||||
|
|
||||||
import signal
|
|
||||||
from types import SimpleNamespace
|
from types import SimpleNamespace
|
||||||
from unittest.mock import patch, call
|
from unittest.mock import patch, call
|
||||||
|
|
||||||
@@ -211,8 +210,7 @@ class TestWaitForGatewayExit:
|
|||||||
assert poll_count == 3
|
assert poll_count == 3
|
||||||
|
|
||||||
def test_force_kills_after_grace_period(self, monkeypatch):
|
def test_force_kills_after_grace_period(self, monkeypatch):
|
||||||
"""When the process doesn't exit, SIGKILL the saved PID."""
|
"""When the process doesn't exit, force-kill the saved PID."""
|
||||||
import time as _time
|
|
||||||
|
|
||||||
# Simulate monotonic time advancing past force_after
|
# Simulate monotonic time advancing past force_after
|
||||||
call_num = 0
|
call_num = 0
|
||||||
@@ -224,8 +222,8 @@ class TestWaitForGatewayExit:
|
|||||||
return call_num * 2.0 # 2, 4, 6, 8, ...
|
return call_num * 2.0 # 2, 4, 6, 8, ...
|
||||||
|
|
||||||
kills = []
|
kills = []
|
||||||
def mock_kill(pid, sig):
|
def mock_terminate(pid, force=False):
|
||||||
kills.append((pid, sig))
|
kills.append((pid, force))
|
||||||
|
|
||||||
# get_running_pid returns the PID until kill is sent, then None
|
# get_running_pid returns the PID until kill is sent, then None
|
||||||
def mock_get_running_pid():
|
def mock_get_running_pid():
|
||||||
@@ -234,14 +232,13 @@ class TestWaitForGatewayExit:
|
|||||||
monkeypatch.setattr("time.monotonic", fake_monotonic)
|
monkeypatch.setattr("time.monotonic", fake_monotonic)
|
||||||
monkeypatch.setattr("time.sleep", lambda _: None)
|
monkeypatch.setattr("time.sleep", lambda _: None)
|
||||||
monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid)
|
monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid)
|
||||||
monkeypatch.setattr("os.kill", mock_kill)
|
monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
|
||||||
|
|
||||||
gateway._wait_for_gateway_exit(timeout=10.0, force_after=5.0)
|
gateway._wait_for_gateway_exit(timeout=10.0, force_after=5.0)
|
||||||
assert (42, signal.SIGKILL) in kills
|
assert (42, True) in kills
|
||||||
|
|
||||||
def test_handles_process_already_gone_on_kill(self, monkeypatch):
|
def test_handles_process_already_gone_on_kill(self, monkeypatch):
|
||||||
"""ProcessLookupError during SIGKILL is not fatal."""
|
"""ProcessLookupError during force-kill is not fatal."""
|
||||||
import time as _time
|
|
||||||
|
|
||||||
call_num = 0
|
call_num = 0
|
||||||
def fake_monotonic():
|
def fake_monotonic():
|
||||||
@@ -249,13 +246,24 @@ class TestWaitForGatewayExit:
|
|||||||
call_num += 1
|
call_num += 1
|
||||||
return call_num * 3.0 # Jump past force_after quickly
|
return call_num * 3.0 # Jump past force_after quickly
|
||||||
|
|
||||||
def mock_kill(pid, sig):
|
def mock_terminate(pid, force=False):
|
||||||
raise ProcessLookupError
|
raise ProcessLookupError
|
||||||
|
|
||||||
monkeypatch.setattr("time.monotonic", fake_monotonic)
|
monkeypatch.setattr("time.monotonic", fake_monotonic)
|
||||||
monkeypatch.setattr("time.sleep", lambda _: None)
|
monkeypatch.setattr("time.sleep", lambda _: None)
|
||||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 99)
|
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 99)
|
||||||
monkeypatch.setattr("os.kill", mock_kill)
|
monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
|
||||||
|
|
||||||
# Should not raise — ProcessLookupError means it's already gone.
|
# Should not raise — ProcessLookupError means it's already gone.
|
||||||
gateway._wait_for_gateway_exit(timeout=10.0, force_after=2.0)
|
gateway._wait_for_gateway_exit(timeout=10.0, force_after=2.0)
|
||||||
|
|
||||||
|
def test_kill_gateway_processes_force_uses_helper(self, monkeypatch):
|
||||||
|
calls = []
|
||||||
|
|
||||||
|
monkeypatch.setattr(gateway, "find_gateway_pids", lambda exclude_pids=None: [11, 22])
|
||||||
|
monkeypatch.setattr(gateway, "terminate_pid", lambda pid, force=False: calls.append((pid, force)))
|
||||||
|
|
||||||
|
killed = gateway.kill_gateway_processes(force=True)
|
||||||
|
|
||||||
|
assert killed == 2
|
||||||
|
assert calls == [(11, True), (22, True)]
|
||||||
|
|||||||
Reference in New Issue
Block a user