mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-03 01:07:31 +08:00
Compare commits
8 Commits
skill/gith
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df1671d36b | ||
|
|
43906fb89d | ||
|
|
ef7b00d132 | ||
|
|
d48cd893f2 | ||
|
|
595802e14b | ||
|
|
628d9bb729 | ||
|
|
10b070b4bd | ||
|
|
c20d06878d |
@@ -502,6 +502,14 @@ class MessageType(Enum):
|
||||
COMMAND = "command" # /command style
|
||||
|
||||
|
||||
class ProcessingOutcome(Enum):
|
||||
"""Result classification for message-processing lifecycle hooks."""
|
||||
|
||||
SUCCESS = "success"
|
||||
FAILURE = "failure"
|
||||
CANCELLED = "cancelled"
|
||||
|
||||
|
||||
@dataclass
|
||||
class MessageEvent:
|
||||
"""
|
||||
@@ -625,6 +633,7 @@ class BasePlatformAdapter(ABC):
|
||||
# Gateway shutdown cancels these so an old gateway instance doesn't keep
|
||||
# working on a task after --replace or manual restarts.
|
||||
self._background_tasks: set[asyncio.Task] = set()
|
||||
self._expected_cancelled_tasks: set[asyncio.Task] = set()
|
||||
# Chats where auto-TTS on voice input is disabled (set by /voice off)
|
||||
self._auto_tts_disabled_chats: set = set()
|
||||
# Chats where typing indicator is paused (e.g. during approval waits).
|
||||
@@ -1133,7 +1142,7 @@ class BasePlatformAdapter(ABC):
|
||||
async def on_processing_start(self, event: MessageEvent) -> None:
|
||||
"""Hook called when background processing begins."""
|
||||
|
||||
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
|
||||
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
|
||||
"""Hook called when background processing completes."""
|
||||
|
||||
async def _run_processing_hook(self, hook_name: str, *args: Any, **kwargs: Any) -> None:
|
||||
@@ -1294,7 +1303,7 @@ class BasePlatformAdapter(ABC):
|
||||
# session lifecycle and its cleanup races with the running task
|
||||
# (see PR #4926).
|
||||
cmd = event.get_command()
|
||||
if cmd in ("approve", "deny", "status", "stop", "new", "reset"):
|
||||
if cmd in ("approve", "deny", "status", "stop", "new", "reset", "background"):
|
||||
logger.debug(
|
||||
"[%s] Command '/%s' bypassing active-session guard for %s",
|
||||
self.name, cmd, session_key,
|
||||
@@ -1352,6 +1361,7 @@ class BasePlatformAdapter(ABC):
|
||||
return
|
||||
if hasattr(task, "add_done_callback"):
|
||||
task.add_done_callback(self._background_tasks.discard)
|
||||
task.add_done_callback(self._expected_cancelled_tasks.discard)
|
||||
|
||||
@staticmethod
|
||||
def _get_human_delay() -> float:
|
||||
@@ -1580,7 +1590,11 @@ class BasePlatformAdapter(ABC):
|
||||
|
||||
# Determine overall success for the processing hook
|
||||
processing_ok = delivery_succeeded if delivery_attempted else not bool(response)
|
||||
await self._run_processing_hook("on_processing_complete", event, processing_ok)
|
||||
await self._run_processing_hook(
|
||||
"on_processing_complete",
|
||||
event,
|
||||
ProcessingOutcome.SUCCESS if processing_ok else ProcessingOutcome.FAILURE,
|
||||
)
|
||||
|
||||
# Check if there's a pending message that was queued during our processing
|
||||
if session_key in self._pending_messages:
|
||||
@@ -1599,10 +1613,14 @@ class BasePlatformAdapter(ABC):
|
||||
return # Already cleaned up
|
||||
|
||||
except asyncio.CancelledError:
|
||||
await self._run_processing_hook("on_processing_complete", event, False)
|
||||
current_task = asyncio.current_task()
|
||||
outcome = ProcessingOutcome.CANCELLED
|
||||
if current_task is None or current_task not in self._expected_cancelled_tasks:
|
||||
outcome = ProcessingOutcome.FAILURE
|
||||
await self._run_processing_hook("on_processing_complete", event, outcome)
|
||||
raise
|
||||
except Exception as e:
|
||||
await self._run_processing_hook("on_processing_complete", event, False)
|
||||
await self._run_processing_hook("on_processing_complete", event, ProcessingOutcome.FAILURE)
|
||||
logger.error("[%s] Error handling message: %s", self.name, e, exc_info=True)
|
||||
# Send the error to the user so they aren't left with radio silence
|
||||
try:
|
||||
@@ -1646,10 +1664,12 @@ class BasePlatformAdapter(ABC):
|
||||
"""
|
||||
tasks = [task for task in self._background_tasks if not task.done()]
|
||||
for task in tasks:
|
||||
self._expected_cancelled_tasks.add(task)
|
||||
task.cancel()
|
||||
if tasks:
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
self._background_tasks.clear()
|
||||
self._expected_cancelled_tasks.clear()
|
||||
self._pending_messages.clear()
|
||||
self._active_sessions.clear()
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ from gateway.platforms.base import (
|
||||
BasePlatformAdapter,
|
||||
MessageEvent,
|
||||
MessageType,
|
||||
ProcessingOutcome,
|
||||
SendResult,
|
||||
cache_image_from_url,
|
||||
cache_audio_from_url,
|
||||
@@ -754,14 +755,17 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
if hasattr(message, "add_reaction"):
|
||||
await self._add_reaction(message, "👀")
|
||||
|
||||
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
|
||||
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
|
||||
"""Swap the in-progress reaction for a final success/failure reaction."""
|
||||
if not self._reactions_enabled():
|
||||
return
|
||||
message = event.raw_message
|
||||
if hasattr(message, "add_reaction"):
|
||||
await self._remove_reaction(message, "👀")
|
||||
await self._add_reaction(message, "✅" if success else "❌")
|
||||
if outcome == ProcessingOutcome.SUCCESS:
|
||||
await self._add_reaction(message, "✅")
|
||||
elif outcome == ProcessingOutcome.FAILURE:
|
||||
await self._add_reaction(message, "❌")
|
||||
|
||||
async def send(
|
||||
self,
|
||||
|
||||
@@ -973,7 +973,8 @@ def _run_official_feishu_ws_client(ws_client: Any, adapter: Any) -> None:
|
||||
return await original_connect(*args, **kwargs)
|
||||
|
||||
def _configure_with_overrides(conf: Any) -> Any:
|
||||
assert original_configure is not None
|
||||
if original_configure is None:
|
||||
raise RuntimeError("Feishu _configure_with_overrides called but original_configure is None")
|
||||
result = original_configure(conf)
|
||||
_apply_runtime_ws_overrides()
|
||||
return result
|
||||
|
||||
@@ -40,6 +40,7 @@ from gateway.platforms.base import (
|
||||
BasePlatformAdapter,
|
||||
MessageEvent,
|
||||
MessageType,
|
||||
ProcessingOutcome,
|
||||
SendResult,
|
||||
)
|
||||
|
||||
@@ -1479,7 +1480,7 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||
await self._send_reaction(room_id, msg_id, "\U0001f440")
|
||||
|
||||
async def on_processing_complete(
|
||||
self, event: MessageEvent, success: bool,
|
||||
self, event: MessageEvent, outcome: ProcessingOutcome,
|
||||
) -> None:
|
||||
"""Replace eyes with checkmark (success) or cross (failure)."""
|
||||
if not self._reactions_enabled:
|
||||
@@ -1488,11 +1489,15 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||
room_id = event.source.chat_id
|
||||
if not msg_id or not room_id:
|
||||
return
|
||||
if outcome == ProcessingOutcome.CANCELLED:
|
||||
return
|
||||
# Note: Matrix doesn't support removing a specific reaction easily
|
||||
# without tracking the reaction event_id. We send the new reaction;
|
||||
# the eyes stays (acceptable UX — both are visible).
|
||||
await self._send_reaction(
|
||||
room_id, msg_id, "\u2705" if success else "\u274c",
|
||||
room_id,
|
||||
msg_id,
|
||||
"\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c",
|
||||
)
|
||||
|
||||
async def _on_reaction(self, room: Any, event: Any) -> None:
|
||||
|
||||
@@ -60,6 +60,7 @@ from gateway.platforms.base import (
|
||||
BasePlatformAdapter,
|
||||
MessageEvent,
|
||||
MessageType,
|
||||
ProcessingOutcome,
|
||||
SendResult,
|
||||
cache_image_from_bytes,
|
||||
cache_audio_from_bytes,
|
||||
@@ -517,6 +518,36 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
|
||||
# Build the application
|
||||
builder = Application.builder().token(self.config.token)
|
||||
|
||||
# PTB defaults (pool_timeout=1s) are too aggressive on flaky networks and
|
||||
# can trigger "Pool timeout: All connections in the connection pool are occupied"
|
||||
# during reconnect/bootstrap. Use safer defaults and allow env overrides.
|
||||
def _env_int(name: str, default: int) -> int:
|
||||
try:
|
||||
return int(os.getenv(name, str(default)))
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
def _env_float(name: str, default: float) -> float:
|
||||
try:
|
||||
return float(os.getenv(name, str(default)))
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
request_kwargs = {
|
||||
"connection_pool_size": _env_int("HERMES_TELEGRAM_HTTP_POOL_SIZE", 512),
|
||||
"pool_timeout": _env_float("HERMES_TELEGRAM_HTTP_POOL_TIMEOUT", 8.0),
|
||||
"connect_timeout": _env_float("HERMES_TELEGRAM_HTTP_CONNECT_TIMEOUT", 10.0),
|
||||
"read_timeout": _env_float("HERMES_TELEGRAM_HTTP_READ_TIMEOUT", 20.0),
|
||||
"write_timeout": _env_float("HERMES_TELEGRAM_HTTP_WRITE_TIMEOUT", 20.0),
|
||||
}
|
||||
|
||||
proxy_configured = any(
|
||||
(os.getenv(k) or "").strip()
|
||||
for k in ("HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY", "https_proxy", "http_proxy", "all_proxy")
|
||||
)
|
||||
disable_fallback = (os.getenv("HERMES_TELEGRAM_DISABLE_FALLBACK_IPS", "").strip().lower() in ("1", "true", "yes", "on"))
|
||||
|
||||
fallback_ips = self._fallback_ips()
|
||||
if not fallback_ips:
|
||||
fallback_ips = await discover_fallback_ips()
|
||||
@@ -525,16 +556,32 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
self.name,
|
||||
", ".join(fallback_ips),
|
||||
)
|
||||
if fallback_ips:
|
||||
|
||||
if fallback_ips and not proxy_configured and not disable_fallback:
|
||||
logger.info(
|
||||
"[%s] Telegram fallback IPs active: %s",
|
||||
self.name,
|
||||
", ".join(fallback_ips),
|
||||
)
|
||||
transport = TelegramFallbackTransport(fallback_ips)
|
||||
request = HTTPXRequest(httpx_kwargs={"transport": transport})
|
||||
get_updates_request = HTTPXRequest(httpx_kwargs={"transport": transport})
|
||||
builder = builder.request(request).get_updates_request(get_updates_request)
|
||||
# Keep request/update pools separate to reduce contention during
|
||||
# polling reconnect + bot API bootstrap/delete_webhook calls.
|
||||
request = HTTPXRequest(
|
||||
**request_kwargs,
|
||||
httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
|
||||
)
|
||||
get_updates_request = HTTPXRequest(
|
||||
**request_kwargs,
|
||||
httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
|
||||
)
|
||||
else:
|
||||
if proxy_configured:
|
||||
logger.info("[%s] Proxy configured; skipping Telegram fallback-IP transport", self.name)
|
||||
elif disable_fallback:
|
||||
logger.info("[%s] Telegram fallback-IP transport disabled via env", self.name)
|
||||
request = HTTPXRequest(**request_kwargs)
|
||||
get_updates_request = HTTPXRequest(**request_kwargs)
|
||||
|
||||
builder = builder.request(request).get_updates_request(get_updates_request)
|
||||
self._app = builder.build()
|
||||
self._bot = self._app.bot
|
||||
|
||||
@@ -2732,7 +2779,7 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
if chat_id and message_id:
|
||||
await self._set_reaction(chat_id, message_id, "\U0001f440")
|
||||
|
||||
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
|
||||
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
|
||||
"""Swap the in-progress reaction for a final success/failure reaction.
|
||||
|
||||
Unlike Discord (additive reactions), Telegram's set_message_reaction
|
||||
@@ -2742,5 +2789,9 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
return
|
||||
chat_id = getattr(event.source, "chat_id", None)
|
||||
message_id = getattr(event, "message_id", None)
|
||||
if chat_id and message_id:
|
||||
await self._set_reaction(chat_id, message_id, "\u2705" if success else "\u274c")
|
||||
if chat_id and message_id and outcome != ProcessingOutcome.CANCELLED:
|
||||
await self._set_reaction(
|
||||
chat_id,
|
||||
message_id,
|
||||
"\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c",
|
||||
)
|
||||
|
||||
@@ -110,7 +110,8 @@ class TelegramFallbackTransport(httpx.AsyncBaseTransport):
|
||||
logger.warning("[Telegram] Fallback IP %s failed: %s", ip, exc)
|
||||
continue
|
||||
|
||||
assert last_error is not None
|
||||
if last_error is None:
|
||||
raise RuntimeError("All Telegram fallback IPs exhausted but no error was recorded")
|
||||
raise last_error
|
||||
|
||||
async def aclose(self) -> None:
|
||||
|
||||
@@ -1991,6 +1991,11 @@ class GatewayRunner:
|
||||
return await self._handle_approve_command(event)
|
||||
return await self._handle_deny_command(event)
|
||||
|
||||
# /background must bypass the running-agent guard — it starts a
|
||||
# parallel task and must never interrupt the active conversation.
|
||||
if _cmd_def_inner and _cmd_def_inner.name == "background":
|
||||
return await self._handle_background_command(event)
|
||||
|
||||
if event.message_type == MessageType.PHOTO:
|
||||
logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20])
|
||||
adapter = self.adapters.get(source.platform)
|
||||
@@ -7577,7 +7582,7 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
||||
# setups (each profile using a distinct HERMES_HOME) will naturally
|
||||
# allow concurrent instances without tripping this guard.
|
||||
import time as _time
|
||||
from gateway.status import get_running_pid, remove_pid_file
|
||||
from gateway.status import get_running_pid, remove_pid_file, terminate_pid
|
||||
existing_pid = get_running_pid()
|
||||
if existing_pid is not None and existing_pid != os.getpid():
|
||||
if replace:
|
||||
@@ -7586,10 +7591,10 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
||||
existing_pid,
|
||||
)
|
||||
try:
|
||||
os.kill(existing_pid, signal.SIGTERM)
|
||||
terminate_pid(existing_pid, force=False)
|
||||
except ProcessLookupError:
|
||||
pass # Already gone
|
||||
except PermissionError:
|
||||
except (PermissionError, OSError):
|
||||
logger.error(
|
||||
"Permission denied killing PID %d. Cannot replace.",
|
||||
existing_pid,
|
||||
@@ -7609,9 +7614,9 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
||||
existing_pid,
|
||||
)
|
||||
try:
|
||||
os.kill(existing_pid, signal.SIGKILL)
|
||||
terminate_pid(existing_pid, force=True)
|
||||
_time.sleep(0.5)
|
||||
except (ProcessLookupError, PermissionError):
|
||||
except (ProcessLookupError, PermissionError, OSError):
|
||||
pass
|
||||
remove_pid_file()
|
||||
# Also release all scoped locks left by the old process.
|
||||
|
||||
@@ -14,6 +14,8 @@ concurrently under distinct configurations).
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
@@ -23,6 +25,7 @@ from typing import Any, Optional
|
||||
_GATEWAY_KIND = "hermes-gateway"
|
||||
_RUNTIME_STATUS_FILE = "gateway_state.json"
|
||||
_LOCKS_DIRNAME = "gateway-locks"
|
||||
_IS_WINDOWS = sys.platform == "win32"
|
||||
|
||||
|
||||
def _get_pid_path() -> Path:
|
||||
@@ -49,6 +52,33 @@ def _utc_now_iso() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def terminate_pid(pid: int, *, force: bool = False) -> None:
|
||||
"""Terminate a PID with platform-appropriate force semantics.
|
||||
|
||||
POSIX uses SIGTERM/SIGKILL. Windows uses taskkill /T /F for true force-kill
|
||||
because os.kill(..., SIGTERM) is not equivalent to a tree-killing hard stop.
|
||||
"""
|
||||
if force and _IS_WINDOWS:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["taskkill", "/PID", str(pid), "/T", "/F"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
return
|
||||
|
||||
if result.returncode != 0:
|
||||
details = (result.stderr or result.stdout or "").strip()
|
||||
raise OSError(details or f"taskkill failed for PID {pid}")
|
||||
return
|
||||
|
||||
sig = signal.SIGTERM if not force else getattr(signal, "SIGKILL", signal.SIGTERM)
|
||||
os.kill(pid, sig)
|
||||
|
||||
|
||||
def _scope_hash(identity: str) -> str:
|
||||
return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
|
||||
|
||||
|
||||
@@ -205,11 +205,20 @@ class GatewayStreamConsumer:
|
||||
await self._send_or_edit(self._accumulated)
|
||||
return
|
||||
|
||||
# Tool boundary: the should_edit block above already flushed
|
||||
# accumulated text without a cursor. Reset state so the next
|
||||
# text chunk creates a fresh message below any tool-progress
|
||||
# messages the gateway sent in between.
|
||||
if got_segment_break:
|
||||
# Tool boundary: reset message state so the next text chunk
|
||||
# creates a fresh message below any tool-progress messages.
|
||||
#
|
||||
# Exception: when _message_id is "__no_edit__" the platform
|
||||
# never returned a real message ID (e.g. Signal, webhook with
|
||||
# github_comment delivery). Resetting to None would re-enter
|
||||
# the "first send" path on every tool boundary and post one
|
||||
# platform message per tool call — that is what caused 155
|
||||
# comments under a single PR. Instead, keep all state so the
|
||||
# full continuation is delivered once via _send_fallback_final.
|
||||
# (When editing fails mid-stream due to flood control the id is
|
||||
# a real string like "msg_1", not "__no_edit__", so that case
|
||||
# still resets and creates a fresh segment as intended.)
|
||||
if got_segment_break and self._message_id != "__no_edit__":
|
||||
self._message_id = None
|
||||
self._accumulated = ""
|
||||
self._last_sent_text = ""
|
||||
|
||||
@@ -16,8 +16,18 @@ from collections.abc import Callable, Mapping
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from prompt_toolkit.auto_suggest import AutoSuggest, Suggestion
|
||||
from prompt_toolkit.completion import Completer, Completion
|
||||
# prompt_toolkit is an optional CLI dependency — only needed for
|
||||
# SlashCommandCompleter and SlashCommandAutoSuggest. Gateway and test
|
||||
# environments that lack it must still be able to import this module
|
||||
# for resolve_command, gateway_help_lines, and COMMAND_REGISTRY.
|
||||
try:
|
||||
from prompt_toolkit.auto_suggest import AutoSuggest, Suggestion
|
||||
from prompt_toolkit.completion import Completer, Completion
|
||||
except ImportError: # pragma: no cover
|
||||
AutoSuggest = object # type: ignore[assignment,misc]
|
||||
Completer = object # type: ignore[assignment,misc]
|
||||
Suggestion = None # type: ignore[assignment]
|
||||
Completion = None # type: ignore[assignment]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -14,6 +14,7 @@ from pathlib import Path
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
|
||||
|
||||
from gateway.status import terminate_pid
|
||||
from hermes_cli.config import get_env_value, get_hermes_home, save_env_value, is_managed, managed_error
|
||||
# display_hermes_home is imported lazily at call sites to avoid ImportError
|
||||
# when hermes_constants is cached from a pre-update version during `hermes update`.
|
||||
@@ -162,7 +163,7 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
|
||||
"""Kill any running gateway processes. Returns count killed.
|
||||
|
||||
Args:
|
||||
force: Use SIGKILL instead of SIGTERM.
|
||||
force: Use the platform's force-kill mechanism instead of graceful terminate.
|
||||
exclude_pids: PIDs to skip (e.g. service-managed PIDs that were just
|
||||
restarted and should not be killed).
|
||||
"""
|
||||
@@ -171,10 +172,7 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
|
||||
|
||||
for pid in pids:
|
||||
try:
|
||||
if force and not is_windows():
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
else:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
terminate_pid(pid, force=force)
|
||||
killed += 1
|
||||
except ProcessLookupError:
|
||||
# Process already gone
|
||||
@@ -182,6 +180,8 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
|
||||
except PermissionError:
|
||||
print(f"⚠ Permission denied to kill PID {pid}")
|
||||
|
||||
except OSError as exc:
|
||||
print(f"Failed to kill PID {pid}: {exc}")
|
||||
return killed
|
||||
|
||||
|
||||
@@ -1208,7 +1208,7 @@ def _wait_for_gateway_exit(timeout: float = 10.0, force_after: float = 5.0):
|
||||
|
||||
Args:
|
||||
timeout: Total seconds to wait before giving up.
|
||||
force_after: Seconds of graceful waiting before sending SIGKILL.
|
||||
force_after: Seconds of graceful waiting before escalating to force-kill.
|
||||
"""
|
||||
import time
|
||||
from gateway.status import get_running_pid
|
||||
@@ -1225,15 +1225,15 @@ def _wait_for_gateway_exit(timeout: float = 10.0, force_after: float = 5.0):
|
||||
if not force_sent and time.monotonic() >= force_deadline:
|
||||
# Grace period expired — force-kill the specific PID.
|
||||
try:
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
terminate_pid(pid, force=True)
|
||||
print(f"⚠ Gateway PID {pid} did not exit gracefully; sent SIGKILL")
|
||||
except (ProcessLookupError, PermissionError):
|
||||
except (ProcessLookupError, PermissionError, OSError):
|
||||
return # Already gone or we can't touch it.
|
||||
force_sent = True
|
||||
|
||||
time.sleep(0.3)
|
||||
|
||||
# Timed out even after SIGKILL.
|
||||
# Timed out even after force-kill.
|
||||
remaining_pid = get_running_pid()
|
||||
if remaining_pid is not None:
|
||||
print(f"⚠ Gateway PID {remaining_pid} still running after {timeout}s — restart may fail")
|
||||
|
||||
@@ -308,6 +308,7 @@ class TestBackgroundInCLICommands:
|
||||
|
||||
def test_background_autocompletes(self):
|
||||
"""The /background command appears in autocomplete results."""
|
||||
pytest.importorskip("prompt_toolkit")
|
||||
from hermes_cli.commands import SlashCommandCompleter
|
||||
from prompt_toolkit.document import Document
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from types import SimpleNamespace
|
||||
import pytest
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
|
||||
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, ProcessingOutcome, SendResult
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
|
||||
|
||||
@@ -44,8 +44,8 @@ class DummyTelegramAdapter(BasePlatformAdapter):
|
||||
async def on_processing_start(self, event: MessageEvent) -> None:
|
||||
self.processing_hooks.append(("start", event.message_id))
|
||||
|
||||
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
|
||||
self.processing_hooks.append(("complete", event.message_id, success))
|
||||
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
|
||||
self.processing_hooks.append(("complete", event.message_id, outcome))
|
||||
|
||||
|
||||
def _make_event(chat_id: str, thread_id: str, message_id: str = "1") -> MessageEvent:
|
||||
@@ -142,7 +142,7 @@ class TestBasePlatformTopicSessions:
|
||||
]
|
||||
assert adapter.processing_hooks == [
|
||||
("start", "1"),
|
||||
("complete", "1", True),
|
||||
("complete", "1", ProcessingOutcome.SUCCESS),
|
||||
]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -168,7 +168,7 @@ class TestBasePlatformTopicSessions:
|
||||
|
||||
assert adapter.processing_hooks == [
|
||||
("start", "1"),
|
||||
("complete", "1", False),
|
||||
("complete", "1", ProcessingOutcome.FAILURE),
|
||||
]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -190,7 +190,7 @@ class TestBasePlatformTopicSessions:
|
||||
|
||||
assert adapter.processing_hooks == [
|
||||
("start", "1"),
|
||||
("complete", "1", False),
|
||||
("complete", "1", ProcessingOutcome.FAILURE),
|
||||
]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -218,5 +218,31 @@ class TestBasePlatformTopicSessions:
|
||||
|
||||
assert adapter.processing_hooks == [
|
||||
("start", "1"),
|
||||
("complete", "1", False),
|
||||
("complete", "1", ProcessingOutcome.FAILURE),
|
||||
]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_background_tasks_marks_expected_cancellation_cancelled(self):
|
||||
adapter = DummyTelegramAdapter()
|
||||
release = asyncio.Event()
|
||||
|
||||
async def handler(_event):
|
||||
await release.wait()
|
||||
return "ack"
|
||||
|
||||
async def hold_typing(_chat_id, interval=2.0, metadata=None):
|
||||
await asyncio.Event().wait()
|
||||
|
||||
adapter.set_message_handler(handler)
|
||||
adapter._keep_typing = hold_typing
|
||||
|
||||
event = _make_event("-1001", "17585")
|
||||
await adapter.handle_message(event)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
await adapter.cancel_background_tasks()
|
||||
|
||||
assert adapter.processing_hooks == [
|
||||
("start", "1"),
|
||||
("complete", "1", ProcessingOutcome.CANCELLED),
|
||||
]
|
||||
|
||||
@@ -160,6 +160,22 @@ class TestCommandBypassActiveSession:
|
||||
assert sk not in adapter._pending_messages
|
||||
assert any("handled:status" in r for r in adapter.sent_responses)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_background_bypasses_guard(self):
|
||||
"""/background must bypass so it spawns a parallel task, not an interrupt."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/background summarize HN"))
|
||||
|
||||
assert sk not in adapter._pending_messages, (
|
||||
"/background was queued as a pending message instead of being dispatched"
|
||||
)
|
||||
assert any("handled:background" in r for r in adapter.sent_responses), (
|
||||
"/background response was not sent back to the user"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: non-bypass messages still get queued
|
||||
|
||||
@@ -8,7 +8,7 @@ from unittest.mock import AsyncMock, MagicMock
|
||||
import pytest
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import MessageEvent, MessageType, SendResult
|
||||
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome, SendResult
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
|
||||
|
||||
@@ -212,7 +212,7 @@ async def test_reactions_disabled_via_env_zero(adapter, monkeypatch):
|
||||
|
||||
event = _make_event("5", raw_message)
|
||||
await adapter.on_processing_start(event)
|
||||
await adapter.on_processing_complete(event, success=True)
|
||||
await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
|
||||
|
||||
raw_message.add_reaction.assert_not_awaited()
|
||||
raw_message.remove_reaction.assert_not_awaited()
|
||||
@@ -232,3 +232,17 @@ async def test_reactions_enabled_by_default(adapter, monkeypatch):
|
||||
await adapter.on_processing_start(event)
|
||||
|
||||
raw_message.add_reaction.assert_awaited_once_with("👀")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_on_processing_complete_cancelled_removes_eyes_without_terminal_reaction(adapter):
|
||||
raw_message = SimpleNamespace(
|
||||
add_reaction=AsyncMock(),
|
||||
remove_reaction=AsyncMock(),
|
||||
)
|
||||
|
||||
event = _make_event("7", raw_message)
|
||||
await adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
|
||||
|
||||
raw_message.remove_reaction.assert_awaited_once_with("👀", adapter._client.user)
|
||||
raw_message.add_reaction.assert_not_awaited()
|
||||
|
||||
@@ -128,12 +128,16 @@ async def test_internal_event_bypasses_authorization(monkeypatch, tmp_path):
|
||||
|
||||
monkeypatch.setattr(GatewayRunner, "_is_user_authorized", tracking_auth)
|
||||
|
||||
# _handle_message will proceed past auth check and eventually fail on
|
||||
# downstream logic. We just need to verify auth is skipped.
|
||||
# Stop execution before the agent runner so the test doesn't block in
|
||||
# run_in_executor. Auth check happens before _handle_message_with_agent.
|
||||
async def _raise(*_a, **_kw):
|
||||
raise RuntimeError("sentinel — stop here")
|
||||
monkeypatch.setattr(GatewayRunner, "_handle_message_with_agent", _raise)
|
||||
|
||||
try:
|
||||
await runner._handle_message(event)
|
||||
except Exception:
|
||||
pass # Expected — downstream code needs more setup
|
||||
except RuntimeError:
|
||||
pass # Expected sentinel
|
||||
|
||||
assert not auth_called, (
|
||||
"_is_user_authorized should NOT be called for internal events"
|
||||
@@ -175,10 +179,16 @@ async def test_internal_event_does_not_trigger_pairing(monkeypatch, tmp_path):
|
||||
|
||||
runner.pairing_store.generate_code = tracking_generate
|
||||
|
||||
# Stop execution before the agent runner so the test doesn't block in
|
||||
# run_in_executor. Pairing check happens before _handle_message_with_agent.
|
||||
async def _raise(*_a, **_kw):
|
||||
raise RuntimeError("sentinel — stop here")
|
||||
monkeypatch.setattr(GatewayRunner, "_handle_message_with_agent", _raise)
|
||||
|
||||
try:
|
||||
await runner._handle_message(event)
|
||||
except Exception:
|
||||
pass # Expected — downstream code needs more setup
|
||||
except RuntimeError:
|
||||
pass # Expected sentinel
|
||||
|
||||
assert not generate_called, (
|
||||
"Pairing code should NOT be generated for internal events"
|
||||
|
||||
@@ -1980,7 +1980,7 @@ class TestMatrixReactions:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_on_processing_complete_sends_check(self):
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
|
||||
|
||||
self.adapter._reactions_enabled = True
|
||||
self.adapter._send_reaction = AsyncMock(return_value=True)
|
||||
@@ -1994,9 +1994,28 @@ class TestMatrixReactions:
|
||||
raw_message={},
|
||||
message_id="$msg1",
|
||||
)
|
||||
await self.adapter.on_processing_complete(event, success=True)
|
||||
await self.adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
|
||||
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "✅")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_on_processing_complete_cancelled_sends_no_terminal_reaction(self):
|
||||
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
|
||||
|
||||
self.adapter._reactions_enabled = True
|
||||
self.adapter._send_reaction = AsyncMock(return_value=True)
|
||||
|
||||
source = MagicMock()
|
||||
source.chat_id = "!room:ex"
|
||||
event = MessageEvent(
|
||||
text="hello",
|
||||
message_type=MessageType.TEXT,
|
||||
source=source,
|
||||
raw_message={},
|
||||
message_id="$msg1",
|
||||
)
|
||||
await self.adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
|
||||
self.adapter._send_reaction.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reactions_disabled(self):
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
|
||||
@@ -144,7 +144,7 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa
|
||||
assert adapter.sent == [
|
||||
{
|
||||
"chat_id": "-1001",
|
||||
"content": '💻 terminal: "pwd"',
|
||||
"content": '⚙️ terminal: "pwd"',
|
||||
"reply_to": None,
|
||||
"metadata": {"thread_id": "17585"},
|
||||
}
|
||||
|
||||
@@ -87,3 +87,42 @@ async def test_runner_allows_cron_only_mode_when_no_platforms_are_enabled(monkey
|
||||
assert runner.adapters == {}
|
||||
state = read_runtime_status()
|
||||
assert state["gateway_state"] == "running"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_gateway_replace_force_uses_terminate_pid(monkeypatch, tmp_path):
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
|
||||
calls = []
|
||||
|
||||
class _CleanExitRunner:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.should_exit_cleanly = True
|
||||
self.exit_reason = None
|
||||
self.adapters = {}
|
||||
|
||||
async def start(self):
|
||||
return True
|
||||
|
||||
async def stop(self):
|
||||
return None
|
||||
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 42)
|
||||
monkeypatch.setattr("gateway.status.remove_pid_file", lambda: None)
|
||||
monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0)
|
||||
monkeypatch.setattr("gateway.status.terminate_pid", lambda pid, force=False: calls.append((pid, force)))
|
||||
monkeypatch.setattr("gateway.run.os.getpid", lambda: 100)
|
||||
monkeypatch.setattr("gateway.run.os.kill", lambda pid, sig: None)
|
||||
monkeypatch.setattr("time.sleep", lambda _: None)
|
||||
monkeypatch.setattr("tools.skills_sync.sync_skills", lambda quiet=True: None)
|
||||
monkeypatch.setattr("hermes_logging.setup_logging", lambda hermes_home, mode: tmp_path)
|
||||
monkeypatch.setattr("hermes_logging._add_rotating_handler", lambda *args, **kwargs: None)
|
||||
monkeypatch.setattr("gateway.run.GatewayRunner", _CleanExitRunner)
|
||||
|
||||
from gateway.run import start_gateway
|
||||
|
||||
ok = await start_gateway(config=GatewayConfig(), replace=True, verbosity=None)
|
||||
|
||||
assert ok is True
|
||||
assert calls == [(42, False), (42, True)]
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import json
|
||||
import os
|
||||
from types import SimpleNamespace
|
||||
|
||||
from gateway import status
|
||||
|
||||
@@ -104,6 +105,41 @@ class TestGatewayRuntimeStatus:
|
||||
assert payload["platforms"]["telegram"]["error_message"] == "another poller is active"
|
||||
|
||||
|
||||
class TestTerminatePid:
|
||||
def test_force_uses_taskkill_on_windows(self, monkeypatch):
|
||||
calls = []
|
||||
monkeypatch.setattr(status, "_IS_WINDOWS", True)
|
||||
|
||||
def fake_run(cmd, capture_output=False, text=False, timeout=None):
|
||||
calls.append((cmd, capture_output, text, timeout))
|
||||
return SimpleNamespace(returncode=0, stdout="", stderr="")
|
||||
|
||||
monkeypatch.setattr(status.subprocess, "run", fake_run)
|
||||
|
||||
status.terminate_pid(123, force=True)
|
||||
|
||||
assert calls == [
|
||||
(["taskkill", "/PID", "123", "/T", "/F"], True, True, 10)
|
||||
]
|
||||
|
||||
def test_force_falls_back_to_sigterm_when_taskkill_missing(self, monkeypatch):
|
||||
calls = []
|
||||
monkeypatch.setattr(status, "_IS_WINDOWS", True)
|
||||
|
||||
def fake_run(*args, **kwargs):
|
||||
raise FileNotFoundError
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
calls.append((pid, sig))
|
||||
|
||||
monkeypatch.setattr(status.subprocess, "run", fake_run)
|
||||
monkeypatch.setattr(status.os, "kill", fake_kill)
|
||||
|
||||
status.terminate_pid(456, force=True)
|
||||
|
||||
assert calls == [(456, status.signal.SIGTERM)]
|
||||
|
||||
|
||||
class TestScopedLocks:
|
||||
def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
|
||||
|
||||
@@ -437,6 +437,45 @@ class TestSegmentBreakOnToolBoundary:
|
||||
# Only one send call (the initial message)
|
||||
assert adapter.send.call_count == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_message_id_segment_breaks_do_not_resend(self):
|
||||
"""On a platform that never returns a message_id (e.g. webhook with
|
||||
github_comment delivery), tool-call segment breaks must NOT trigger
|
||||
a new adapter.send() per boundary. The fix: _message_id == '__no_edit__'
|
||||
suppresses the reset so all text accumulates and is sent once."""
|
||||
adapter = MagicMock()
|
||||
# No message_id on first send, then one more for the fallback final
|
||||
adapter.send = AsyncMock(side_effect=[
|
||||
SimpleNamespace(success=True, message_id=None),
|
||||
SimpleNamespace(success=True, message_id=None),
|
||||
])
|
||||
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
||||
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||
|
||||
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
||||
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
||||
|
||||
# Simulate: text → tool boundary → text → tool boundary → text (3 segments)
|
||||
consumer.on_delta("Phase 1 text")
|
||||
consumer.on_delta(None) # tool call boundary
|
||||
consumer.on_delta("Phase 2 text")
|
||||
consumer.on_delta(None) # another tool call boundary
|
||||
consumer.on_delta("Phase 3 text")
|
||||
consumer.finish()
|
||||
|
||||
await consumer.run()
|
||||
|
||||
# Before the fix this would post 3 comments (one per segment).
|
||||
# After the fix: only the initial partial + one fallback-final continuation.
|
||||
assert adapter.send.call_count == 2, (
|
||||
f"Expected 2 sends (initial + fallback), got {adapter.send.call_count}"
|
||||
)
|
||||
assert consumer.already_sent
|
||||
# The continuation must contain the text from segments 2 and 3
|
||||
final_text = adapter.send.call_args_list[1][1]["content"]
|
||||
assert "Phase 2" in final_text
|
||||
assert "Phase 3" in final_text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_final_splits_long_continuation_without_dropping_text(self):
|
||||
"""Long continuation tails should be chunked when fallback final-send runs."""
|
||||
|
||||
@@ -6,7 +6,7 @@ from unittest.mock import AsyncMock
|
||||
import pytest
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
|
||||
from gateway.session import SessionSource
|
||||
|
||||
|
||||
@@ -180,7 +180,7 @@ async def test_on_processing_complete_success(monkeypatch):
|
||||
adapter = _make_adapter()
|
||||
event = _make_event()
|
||||
|
||||
await adapter.on_processing_complete(event, success=True)
|
||||
await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
|
||||
|
||||
adapter._bot.set_message_reaction.assert_awaited_once_with(
|
||||
chat_id=123,
|
||||
@@ -196,7 +196,7 @@ async def test_on_processing_complete_failure(monkeypatch):
|
||||
adapter = _make_adapter()
|
||||
event = _make_event()
|
||||
|
||||
await adapter.on_processing_complete(event, success=False)
|
||||
await adapter.on_processing_complete(event, ProcessingOutcome.FAILURE)
|
||||
|
||||
adapter._bot.set_message_reaction.assert_awaited_once_with(
|
||||
chat_id=123,
|
||||
@@ -212,7 +212,19 @@ async def test_on_processing_complete_skipped_when_disabled(monkeypatch):
|
||||
adapter = _make_adapter()
|
||||
event = _make_event()
|
||||
|
||||
await adapter.on_processing_complete(event, success=True)
|
||||
await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
|
||||
|
||||
adapter._bot.set_message_reaction.assert_not_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_on_processing_complete_cancelled_keeps_existing_reaction(monkeypatch):
|
||||
"""Expected cancellation should not replace the in-progress reaction."""
|
||||
monkeypatch.setenv("TELEGRAM_REACTIONS", "true")
|
||||
adapter = _make_adapter()
|
||||
event = _make_event()
|
||||
|
||||
await adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
|
||||
|
||||
adapter._bot.set_message_reaction.assert_not_awaited()
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Tests for hermes_cli.gateway."""
|
||||
|
||||
import signal
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import patch, call
|
||||
|
||||
@@ -211,8 +210,7 @@ class TestWaitForGatewayExit:
|
||||
assert poll_count == 3
|
||||
|
||||
def test_force_kills_after_grace_period(self, monkeypatch):
|
||||
"""When the process doesn't exit, SIGKILL the saved PID."""
|
||||
import time as _time
|
||||
"""When the process doesn't exit, force-kill the saved PID."""
|
||||
|
||||
# Simulate monotonic time advancing past force_after
|
||||
call_num = 0
|
||||
@@ -224,8 +222,8 @@ class TestWaitForGatewayExit:
|
||||
return call_num * 2.0 # 2, 4, 6, 8, ...
|
||||
|
||||
kills = []
|
||||
def mock_kill(pid, sig):
|
||||
kills.append((pid, sig))
|
||||
def mock_terminate(pid, force=False):
|
||||
kills.append((pid, force))
|
||||
|
||||
# get_running_pid returns the PID until kill is sent, then None
|
||||
def mock_get_running_pid():
|
||||
@@ -234,14 +232,13 @@ class TestWaitForGatewayExit:
|
||||
monkeypatch.setattr("time.monotonic", fake_monotonic)
|
||||
monkeypatch.setattr("time.sleep", lambda _: None)
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid)
|
||||
monkeypatch.setattr("os.kill", mock_kill)
|
||||
monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
|
||||
|
||||
gateway._wait_for_gateway_exit(timeout=10.0, force_after=5.0)
|
||||
assert (42, signal.SIGKILL) in kills
|
||||
assert (42, True) in kills
|
||||
|
||||
def test_handles_process_already_gone_on_kill(self, monkeypatch):
|
||||
"""ProcessLookupError during SIGKILL is not fatal."""
|
||||
import time as _time
|
||||
"""ProcessLookupError during force-kill is not fatal."""
|
||||
|
||||
call_num = 0
|
||||
def fake_monotonic():
|
||||
@@ -249,13 +246,24 @@ class TestWaitForGatewayExit:
|
||||
call_num += 1
|
||||
return call_num * 3.0 # Jump past force_after quickly
|
||||
|
||||
def mock_kill(pid, sig):
|
||||
def mock_terminate(pid, force=False):
|
||||
raise ProcessLookupError
|
||||
|
||||
monkeypatch.setattr("time.monotonic", fake_monotonic)
|
||||
monkeypatch.setattr("time.sleep", lambda _: None)
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 99)
|
||||
monkeypatch.setattr("os.kill", mock_kill)
|
||||
monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
|
||||
|
||||
# Should not raise — ProcessLookupError means it's already gone.
|
||||
gateway._wait_for_gateway_exit(timeout=10.0, force_after=2.0)
|
||||
|
||||
def test_kill_gateway_processes_force_uses_helper(self, monkeypatch):
|
||||
calls = []
|
||||
|
||||
monkeypatch.setattr(gateway, "find_gateway_pids", lambda exclude_pids=None: [11, 22])
|
||||
monkeypatch.setattr(gateway, "terminate_pid", lambda pid, force=False: calls.append((pid, force)))
|
||||
|
||||
killed = gateway.kill_gateway_processes(force=True)
|
||||
|
||||
assert killed == 2
|
||||
assert calls == [(11, True), (22, True)]
|
||||
|
||||
Reference in New Issue
Block a user