Compare commits

...

8 Commits

Author SHA1 Message Date
H-5-Isminiz
df1671d36b fix(gateway): implement platform-aware PID termination 2026-04-10 03:46:12 -07:00
KUSH42
43906fb89d fix(tests): repair three pre-existing gateway test failures
- test_background_autocompletes: pytest.importorskip("prompt_toolkit")
  so the test skips gracefully where the CLI dep is absent

- test_run_agent_progress_stays_in_originating_topic: update stale emoji
  💻⚙️ to match get_tool_emoji("terminal", default="⚙️") in run.py

- test_internal_event_bypass{_authorization,_pairing}: mock
  _handle_message_with_agent to raise immediately; avoids the 300s
  run_in_executor hang that caused the tests to time out
2026-04-10 03:46:08 -07:00
KUSH42
ef7b00d132 fix(gateway): prevent duplicate messages on no-message-id platforms
Platforms that don't return a message_id after the first send (Signal,
GitHub webhooks) were causing GatewayStreamConsumer to re-enter the
"first send" path on every tool boundary, posting one platform message
per tool call (observed as 155 PR comments on a single response).

Fix: treat _message_id == "__no_edit__" as a sentinel meaning "platform
accepted the send but cannot be edited". When a tool boundary arrives
in that state, skip the message_id/accumulated/last_sent_text reset so
all continuation text is delivered once via _send_fallback_final rather
than re-posted per segment.

Also make prompt_toolkit imports in hermes_cli/commands.py optional so
gateway and test environments that lack the package can still import
resolve_command, gateway_help_lines, and COMMAND_REGISTRY.
2026-04-10 03:46:08 -07:00
zhouboli
d48cd893f2 fix(telegram): harden HTTPX request pools during reconnect
- configure Telegram HTTPXRequest pool/timeouts with env-overridable defaults\n- use separate request/get_updates request objects to reduce pool contention\n- skip fallback-IP transport when proxy is configured (or explicitly disabled)\n\nThis mitigates recurrent pool-timeout failures during polling reconnect/bootstrap (delete_webhook).
2026-04-10 03:46:05 -07:00
coffee
595802e14b fix(gateway): replace assertions with proper error handling in Telegram and Feishu
Python assertions are stripped when running with `python -O` (optimized
mode), making them unsuitable for runtime error handling.

1. `telegram_network.py:113` — After exhausting all fallback IPs, the code
   uses `assert last_error is not None` before `raise last_error`. In
   optimized mode, the assert is skipped; if `last_error` is unexpectedly
   None, `raise None` produces a confusing `TypeError` instead of a
   meaningful error. Replace with an explicit `if` check that raises
   `RuntimeError` with a descriptive message.

2. `feishu.py:975` — The `_configure_with_overrides` closure uses
   `assert original_configure is not None` as a guard. While the outer
   scope only installs this closure when `original_configure` is not None,
   the assert would silently disappear in optimized mode. Replace with an
   explicit `if` check for defensive safety.
2026-04-10 03:46:01 -07:00
Tranquil-Flow
628d9bb729 test(gateway): add /background to active-session bypass tests
Adds a regression test verifying that /background bypasses the
active-session guard in the platform adapter, matching the existing
test pattern for /stop, /new, /approve, /deny, and /status.
2026-04-10 03:45:57 -07:00
Tranquil-Flow
10b070b4bd fix(gateway): route /background through active-session bypass
When /background was sent during an active run, it was not in the
platform adapter's bypass list and fell through to the interrupt path
instead of spawning a parallel background task.

Add "background" to the active-session command bypass in the platform
adapter, and add an early return in the gateway runner's running-agent
guard to route /background to _handle_background_command() before it
reaches the default interrupt logic.

Fixes #6827
2026-04-10 03:45:57 -07:00
Kenny Xie
c20d06878d fix(gateway): avoid false failure reactions on restart cancellation 2026-04-10 03:45:54 -07:00
23 changed files with 429 additions and 73 deletions

View File

@@ -502,6 +502,14 @@ class MessageType(Enum):
COMMAND = "command" # /command style COMMAND = "command" # /command style
class ProcessingOutcome(Enum):
"""Result classification for message-processing lifecycle hooks."""
SUCCESS = "success"
FAILURE = "failure"
CANCELLED = "cancelled"
@dataclass @dataclass
class MessageEvent: class MessageEvent:
""" """
@@ -625,6 +633,7 @@ class BasePlatformAdapter(ABC):
# Gateway shutdown cancels these so an old gateway instance doesn't keep # Gateway shutdown cancels these so an old gateway instance doesn't keep
# working on a task after --replace or manual restarts. # working on a task after --replace or manual restarts.
self._background_tasks: set[asyncio.Task] = set() self._background_tasks: set[asyncio.Task] = set()
self._expected_cancelled_tasks: set[asyncio.Task] = set()
# Chats where auto-TTS on voice input is disabled (set by /voice off) # Chats where auto-TTS on voice input is disabled (set by /voice off)
self._auto_tts_disabled_chats: set = set() self._auto_tts_disabled_chats: set = set()
# Chats where typing indicator is paused (e.g. during approval waits). # Chats where typing indicator is paused (e.g. during approval waits).
@@ -1133,7 +1142,7 @@ class BasePlatformAdapter(ABC):
async def on_processing_start(self, event: MessageEvent) -> None: async def on_processing_start(self, event: MessageEvent) -> None:
"""Hook called when background processing begins.""" """Hook called when background processing begins."""
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None: async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
"""Hook called when background processing completes.""" """Hook called when background processing completes."""
async def _run_processing_hook(self, hook_name: str, *args: Any, **kwargs: Any) -> None: async def _run_processing_hook(self, hook_name: str, *args: Any, **kwargs: Any) -> None:
@@ -1294,7 +1303,7 @@ class BasePlatformAdapter(ABC):
# session lifecycle and its cleanup races with the running task # session lifecycle and its cleanup races with the running task
# (see PR #4926). # (see PR #4926).
cmd = event.get_command() cmd = event.get_command()
if cmd in ("approve", "deny", "status", "stop", "new", "reset"): if cmd in ("approve", "deny", "status", "stop", "new", "reset", "background"):
logger.debug( logger.debug(
"[%s] Command '/%s' bypassing active-session guard for %s", "[%s] Command '/%s' bypassing active-session guard for %s",
self.name, cmd, session_key, self.name, cmd, session_key,
@@ -1352,6 +1361,7 @@ class BasePlatformAdapter(ABC):
return return
if hasattr(task, "add_done_callback"): if hasattr(task, "add_done_callback"):
task.add_done_callback(self._background_tasks.discard) task.add_done_callback(self._background_tasks.discard)
task.add_done_callback(self._expected_cancelled_tasks.discard)
@staticmethod @staticmethod
def _get_human_delay() -> float: def _get_human_delay() -> float:
@@ -1580,7 +1590,11 @@ class BasePlatformAdapter(ABC):
# Determine overall success for the processing hook # Determine overall success for the processing hook
processing_ok = delivery_succeeded if delivery_attempted else not bool(response) processing_ok = delivery_succeeded if delivery_attempted else not bool(response)
await self._run_processing_hook("on_processing_complete", event, processing_ok) await self._run_processing_hook(
"on_processing_complete",
event,
ProcessingOutcome.SUCCESS if processing_ok else ProcessingOutcome.FAILURE,
)
# Check if there's a pending message that was queued during our processing # Check if there's a pending message that was queued during our processing
if session_key in self._pending_messages: if session_key in self._pending_messages:
@@ -1599,10 +1613,14 @@ class BasePlatformAdapter(ABC):
return # Already cleaned up return # Already cleaned up
except asyncio.CancelledError: except asyncio.CancelledError:
await self._run_processing_hook("on_processing_complete", event, False) current_task = asyncio.current_task()
outcome = ProcessingOutcome.CANCELLED
if current_task is None or current_task not in self._expected_cancelled_tasks:
outcome = ProcessingOutcome.FAILURE
await self._run_processing_hook("on_processing_complete", event, outcome)
raise raise
except Exception as e: except Exception as e:
await self._run_processing_hook("on_processing_complete", event, False) await self._run_processing_hook("on_processing_complete", event, ProcessingOutcome.FAILURE)
logger.error("[%s] Error handling message: %s", self.name, e, exc_info=True) logger.error("[%s] Error handling message: %s", self.name, e, exc_info=True)
# Send the error to the user so they aren't left with radio silence # Send the error to the user so they aren't left with radio silence
try: try:
@@ -1646,10 +1664,12 @@ class BasePlatformAdapter(ABC):
""" """
tasks = [task for task in self._background_tasks if not task.done()] tasks = [task for task in self._background_tasks if not task.done()]
for task in tasks: for task in tasks:
self._expected_cancelled_tasks.add(task)
task.cancel() task.cancel()
if tasks: if tasks:
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)
self._background_tasks.clear() self._background_tasks.clear()
self._expected_cancelled_tasks.clear()
self._pending_messages.clear() self._pending_messages.clear()
self._active_sessions.clear() self._active_sessions.clear()

View File

@@ -49,6 +49,7 @@ from gateway.platforms.base import (
BasePlatformAdapter, BasePlatformAdapter,
MessageEvent, MessageEvent,
MessageType, MessageType,
ProcessingOutcome,
SendResult, SendResult,
cache_image_from_url, cache_image_from_url,
cache_audio_from_url, cache_audio_from_url,
@@ -754,14 +755,17 @@ class DiscordAdapter(BasePlatformAdapter):
if hasattr(message, "add_reaction"): if hasattr(message, "add_reaction"):
await self._add_reaction(message, "👀") await self._add_reaction(message, "👀")
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None: async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
"""Swap the in-progress reaction for a final success/failure reaction.""" """Swap the in-progress reaction for a final success/failure reaction."""
if not self._reactions_enabled(): if not self._reactions_enabled():
return return
message = event.raw_message message = event.raw_message
if hasattr(message, "add_reaction"): if hasattr(message, "add_reaction"):
await self._remove_reaction(message, "👀") await self._remove_reaction(message, "👀")
await self._add_reaction(message, "" if success else "") if outcome == ProcessingOutcome.SUCCESS:
await self._add_reaction(message, "")
elif outcome == ProcessingOutcome.FAILURE:
await self._add_reaction(message, "")
async def send( async def send(
self, self,

View File

@@ -973,7 +973,8 @@ def _run_official_feishu_ws_client(ws_client: Any, adapter: Any) -> None:
return await original_connect(*args, **kwargs) return await original_connect(*args, **kwargs)
def _configure_with_overrides(conf: Any) -> Any: def _configure_with_overrides(conf: Any) -> Any:
assert original_configure is not None if original_configure is None:
raise RuntimeError("Feishu _configure_with_overrides called but original_configure is None")
result = original_configure(conf) result = original_configure(conf)
_apply_runtime_ws_overrides() _apply_runtime_ws_overrides()
return result return result

View File

@@ -40,6 +40,7 @@ from gateway.platforms.base import (
BasePlatformAdapter, BasePlatformAdapter,
MessageEvent, MessageEvent,
MessageType, MessageType,
ProcessingOutcome,
SendResult, SendResult,
) )
@@ -1479,7 +1480,7 @@ class MatrixAdapter(BasePlatformAdapter):
await self._send_reaction(room_id, msg_id, "\U0001f440") await self._send_reaction(room_id, msg_id, "\U0001f440")
async def on_processing_complete( async def on_processing_complete(
self, event: MessageEvent, success: bool, self, event: MessageEvent, outcome: ProcessingOutcome,
) -> None: ) -> None:
"""Replace eyes with checkmark (success) or cross (failure).""" """Replace eyes with checkmark (success) or cross (failure)."""
if not self._reactions_enabled: if not self._reactions_enabled:
@@ -1488,11 +1489,15 @@ class MatrixAdapter(BasePlatformAdapter):
room_id = event.source.chat_id room_id = event.source.chat_id
if not msg_id or not room_id: if not msg_id or not room_id:
return return
if outcome == ProcessingOutcome.CANCELLED:
return
# Note: Matrix doesn't support removing a specific reaction easily # Note: Matrix doesn't support removing a specific reaction easily
# without tracking the reaction event_id. We send the new reaction; # without tracking the reaction event_id. We send the new reaction;
# the eyes stays (acceptable UX — both are visible). # the eyes stays (acceptable UX — both are visible).
await self._send_reaction( await self._send_reaction(
room_id, msg_id, "\u2705" if success else "\u274c", room_id,
msg_id,
"\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c",
) )
async def _on_reaction(self, room: Any, event: Any) -> None: async def _on_reaction(self, room: Any, event: Any) -> None:

View File

@@ -60,6 +60,7 @@ from gateway.platforms.base import (
BasePlatformAdapter, BasePlatformAdapter,
MessageEvent, MessageEvent,
MessageType, MessageType,
ProcessingOutcome,
SendResult, SendResult,
cache_image_from_bytes, cache_image_from_bytes,
cache_audio_from_bytes, cache_audio_from_bytes,
@@ -517,6 +518,36 @@ class TelegramAdapter(BasePlatformAdapter):
# Build the application # Build the application
builder = Application.builder().token(self.config.token) builder = Application.builder().token(self.config.token)
# PTB defaults (pool_timeout=1s) are too aggressive on flaky networks and
# can trigger "Pool timeout: All connections in the connection pool are occupied"
# during reconnect/bootstrap. Use safer defaults and allow env overrides.
def _env_int(name: str, default: int) -> int:
try:
return int(os.getenv(name, str(default)))
except (TypeError, ValueError):
return default
def _env_float(name: str, default: float) -> float:
try:
return float(os.getenv(name, str(default)))
except (TypeError, ValueError):
return default
request_kwargs = {
"connection_pool_size": _env_int("HERMES_TELEGRAM_HTTP_POOL_SIZE", 512),
"pool_timeout": _env_float("HERMES_TELEGRAM_HTTP_POOL_TIMEOUT", 8.0),
"connect_timeout": _env_float("HERMES_TELEGRAM_HTTP_CONNECT_TIMEOUT", 10.0),
"read_timeout": _env_float("HERMES_TELEGRAM_HTTP_READ_TIMEOUT", 20.0),
"write_timeout": _env_float("HERMES_TELEGRAM_HTTP_WRITE_TIMEOUT", 20.0),
}
proxy_configured = any(
(os.getenv(k) or "").strip()
for k in ("HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY", "https_proxy", "http_proxy", "all_proxy")
)
disable_fallback = (os.getenv("HERMES_TELEGRAM_DISABLE_FALLBACK_IPS", "").strip().lower() in ("1", "true", "yes", "on"))
fallback_ips = self._fallback_ips() fallback_ips = self._fallback_ips()
if not fallback_ips: if not fallback_ips:
fallback_ips = await discover_fallback_ips() fallback_ips = await discover_fallback_ips()
@@ -525,15 +556,31 @@ class TelegramAdapter(BasePlatformAdapter):
self.name, self.name,
", ".join(fallback_ips), ", ".join(fallback_ips),
) )
if fallback_ips:
if fallback_ips and not proxy_configured and not disable_fallback:
logger.info( logger.info(
"[%s] Telegram fallback IPs active: %s", "[%s] Telegram fallback IPs active: %s",
self.name, self.name,
", ".join(fallback_ips), ", ".join(fallback_ips),
) )
transport = TelegramFallbackTransport(fallback_ips) # Keep request/update pools separate to reduce contention during
request = HTTPXRequest(httpx_kwargs={"transport": transport}) # polling reconnect + bot API bootstrap/delete_webhook calls.
get_updates_request = HTTPXRequest(httpx_kwargs={"transport": transport}) request = HTTPXRequest(
**request_kwargs,
httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
)
get_updates_request = HTTPXRequest(
**request_kwargs,
httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
)
else:
if proxy_configured:
logger.info("[%s] Proxy configured; skipping Telegram fallback-IP transport", self.name)
elif disable_fallback:
logger.info("[%s] Telegram fallback-IP transport disabled via env", self.name)
request = HTTPXRequest(**request_kwargs)
get_updates_request = HTTPXRequest(**request_kwargs)
builder = builder.request(request).get_updates_request(get_updates_request) builder = builder.request(request).get_updates_request(get_updates_request)
self._app = builder.build() self._app = builder.build()
self._bot = self._app.bot self._bot = self._app.bot
@@ -2732,7 +2779,7 @@ class TelegramAdapter(BasePlatformAdapter):
if chat_id and message_id: if chat_id and message_id:
await self._set_reaction(chat_id, message_id, "\U0001f440") await self._set_reaction(chat_id, message_id, "\U0001f440")
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None: async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
"""Swap the in-progress reaction for a final success/failure reaction. """Swap the in-progress reaction for a final success/failure reaction.
Unlike Discord (additive reactions), Telegram's set_message_reaction Unlike Discord (additive reactions), Telegram's set_message_reaction
@@ -2742,5 +2789,9 @@ class TelegramAdapter(BasePlatformAdapter):
return return
chat_id = getattr(event.source, "chat_id", None) chat_id = getattr(event.source, "chat_id", None)
message_id = getattr(event, "message_id", None) message_id = getattr(event, "message_id", None)
if chat_id and message_id: if chat_id and message_id and outcome != ProcessingOutcome.CANCELLED:
await self._set_reaction(chat_id, message_id, "\u2705" if success else "\u274c") await self._set_reaction(
chat_id,
message_id,
"\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c",
)

View File

@@ -110,7 +110,8 @@ class TelegramFallbackTransport(httpx.AsyncBaseTransport):
logger.warning("[Telegram] Fallback IP %s failed: %s", ip, exc) logger.warning("[Telegram] Fallback IP %s failed: %s", ip, exc)
continue continue
assert last_error is not None if last_error is None:
raise RuntimeError("All Telegram fallback IPs exhausted but no error was recorded")
raise last_error raise last_error
async def aclose(self) -> None: async def aclose(self) -> None:

View File

@@ -1991,6 +1991,11 @@ class GatewayRunner:
return await self._handle_approve_command(event) return await self._handle_approve_command(event)
return await self._handle_deny_command(event) return await self._handle_deny_command(event)
# /background must bypass the running-agent guard — it starts a
# parallel task and must never interrupt the active conversation.
if _cmd_def_inner and _cmd_def_inner.name == "background":
return await self._handle_background_command(event)
if event.message_type == MessageType.PHOTO: if event.message_type == MessageType.PHOTO:
logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20]) logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20])
adapter = self.adapters.get(source.platform) adapter = self.adapters.get(source.platform)
@@ -7577,7 +7582,7 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
# setups (each profile using a distinct HERMES_HOME) will naturally # setups (each profile using a distinct HERMES_HOME) will naturally
# allow concurrent instances without tripping this guard. # allow concurrent instances without tripping this guard.
import time as _time import time as _time
from gateway.status import get_running_pid, remove_pid_file from gateway.status import get_running_pid, remove_pid_file, terminate_pid
existing_pid = get_running_pid() existing_pid = get_running_pid()
if existing_pid is not None and existing_pid != os.getpid(): if existing_pid is not None and existing_pid != os.getpid():
if replace: if replace:
@@ -7586,10 +7591,10 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
existing_pid, existing_pid,
) )
try: try:
os.kill(existing_pid, signal.SIGTERM) terminate_pid(existing_pid, force=False)
except ProcessLookupError: except ProcessLookupError:
pass # Already gone pass # Already gone
except PermissionError: except (PermissionError, OSError):
logger.error( logger.error(
"Permission denied killing PID %d. Cannot replace.", "Permission denied killing PID %d. Cannot replace.",
existing_pid, existing_pid,
@@ -7609,9 +7614,9 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
existing_pid, existing_pid,
) )
try: try:
os.kill(existing_pid, signal.SIGKILL) terminate_pid(existing_pid, force=True)
_time.sleep(0.5) _time.sleep(0.5)
except (ProcessLookupError, PermissionError): except (ProcessLookupError, PermissionError, OSError):
pass pass
remove_pid_file() remove_pid_file()
# Also release all scoped locks left by the old process. # Also release all scoped locks left by the old process.

View File

@@ -14,6 +14,8 @@ concurrently under distinct configurations).
import hashlib import hashlib
import json import json
import os import os
import signal
import subprocess
import sys import sys
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
@@ -23,6 +25,7 @@ from typing import Any, Optional
_GATEWAY_KIND = "hermes-gateway" _GATEWAY_KIND = "hermes-gateway"
_RUNTIME_STATUS_FILE = "gateway_state.json" _RUNTIME_STATUS_FILE = "gateway_state.json"
_LOCKS_DIRNAME = "gateway-locks" _LOCKS_DIRNAME = "gateway-locks"
_IS_WINDOWS = sys.platform == "win32"
def _get_pid_path() -> Path: def _get_pid_path() -> Path:
@@ -49,6 +52,33 @@ def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat() return datetime.now(timezone.utc).isoformat()
def terminate_pid(pid: int, *, force: bool = False) -> None:
"""Terminate a PID with platform-appropriate force semantics.
POSIX uses SIGTERM/SIGKILL. Windows uses taskkill /T /F for true force-kill
because os.kill(..., SIGTERM) is not equivalent to a tree-killing hard stop.
"""
if force and _IS_WINDOWS:
try:
result = subprocess.run(
["taskkill", "/PID", str(pid), "/T", "/F"],
capture_output=True,
text=True,
timeout=10,
)
except FileNotFoundError:
os.kill(pid, signal.SIGTERM)
return
if result.returncode != 0:
details = (result.stderr or result.stdout or "").strip()
raise OSError(details or f"taskkill failed for PID {pid}")
return
sig = signal.SIGTERM if not force else getattr(signal, "SIGKILL", signal.SIGTERM)
os.kill(pid, sig)
def _scope_hash(identity: str) -> str: def _scope_hash(identity: str) -> str:
return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16] return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]

View File

@@ -205,11 +205,20 @@ class GatewayStreamConsumer:
await self._send_or_edit(self._accumulated) await self._send_or_edit(self._accumulated)
return return
# Tool boundary: the should_edit block above already flushed # Tool boundary: reset message state so the next text chunk
# accumulated text without a cursor. Reset state so the next # creates a fresh message below any tool-progress messages.
# text chunk creates a fresh message below any tool-progress #
# messages the gateway sent in between. # Exception: when _message_id is "__no_edit__" the platform
if got_segment_break: # never returned a real message ID (e.g. Signal, webhook with
# github_comment delivery). Resetting to None would re-enter
# the "first send" path on every tool boundary and post one
# platform message per tool call — that is what caused 155
# comments under a single PR. Instead, keep all state so the
# full continuation is delivered once via _send_fallback_final.
# (When editing fails mid-stream due to flood control the id is
# a real string like "msg_1", not "__no_edit__", so that case
# still resets and creates a fresh segment as intended.)
if got_segment_break and self._message_id != "__no_edit__":
self._message_id = None self._message_id = None
self._accumulated = "" self._accumulated = ""
self._last_sent_text = "" self._last_sent_text = ""

View File

@@ -16,8 +16,18 @@ from collections.abc import Callable, Mapping
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any
from prompt_toolkit.auto_suggest import AutoSuggest, Suggestion # prompt_toolkit is an optional CLI dependency — only needed for
from prompt_toolkit.completion import Completer, Completion # SlashCommandCompleter and SlashCommandAutoSuggest. Gateway and test
# environments that lack it must still be able to import this module
# for resolve_command, gateway_help_lines, and COMMAND_REGISTRY.
try:
from prompt_toolkit.auto_suggest import AutoSuggest, Suggestion
from prompt_toolkit.completion import Completer, Completion
except ImportError: # pragma: no cover
AutoSuggest = object # type: ignore[assignment,misc]
Completer = object # type: ignore[assignment,misc]
Suggestion = None # type: ignore[assignment]
Completion = None # type: ignore[assignment]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -14,6 +14,7 @@ from pathlib import Path
PROJECT_ROOT = Path(__file__).parent.parent.resolve() PROJECT_ROOT = Path(__file__).parent.parent.resolve()
from gateway.status import terminate_pid
from hermes_cli.config import get_env_value, get_hermes_home, save_env_value, is_managed, managed_error from hermes_cli.config import get_env_value, get_hermes_home, save_env_value, is_managed, managed_error
# display_hermes_home is imported lazily at call sites to avoid ImportError # display_hermes_home is imported lazily at call sites to avoid ImportError
# when hermes_constants is cached from a pre-update version during `hermes update`. # when hermes_constants is cached from a pre-update version during `hermes update`.
@@ -162,7 +163,7 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
"""Kill any running gateway processes. Returns count killed. """Kill any running gateway processes. Returns count killed.
Args: Args:
force: Use SIGKILL instead of SIGTERM. force: Use the platform's force-kill mechanism instead of graceful terminate.
exclude_pids: PIDs to skip (e.g. service-managed PIDs that were just exclude_pids: PIDs to skip (e.g. service-managed PIDs that were just
restarted and should not be killed). restarted and should not be killed).
""" """
@@ -171,10 +172,7 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
for pid in pids: for pid in pids:
try: try:
if force and not is_windows(): terminate_pid(pid, force=force)
os.kill(pid, signal.SIGKILL)
else:
os.kill(pid, signal.SIGTERM)
killed += 1 killed += 1
except ProcessLookupError: except ProcessLookupError:
# Process already gone # Process already gone
@@ -182,6 +180,8 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
except PermissionError: except PermissionError:
print(f"⚠ Permission denied to kill PID {pid}") print(f"⚠ Permission denied to kill PID {pid}")
except OSError as exc:
print(f"Failed to kill PID {pid}: {exc}")
return killed return killed
@@ -1208,7 +1208,7 @@ def _wait_for_gateway_exit(timeout: float = 10.0, force_after: float = 5.0):
Args: Args:
timeout: Total seconds to wait before giving up. timeout: Total seconds to wait before giving up.
force_after: Seconds of graceful waiting before sending SIGKILL. force_after: Seconds of graceful waiting before escalating to force-kill.
""" """
import time import time
from gateway.status import get_running_pid from gateway.status import get_running_pid
@@ -1225,15 +1225,15 @@ def _wait_for_gateway_exit(timeout: float = 10.0, force_after: float = 5.0):
if not force_sent and time.monotonic() >= force_deadline: if not force_sent and time.monotonic() >= force_deadline:
# Grace period expired — force-kill the specific PID. # Grace period expired — force-kill the specific PID.
try: try:
os.kill(pid, signal.SIGKILL) terminate_pid(pid, force=True)
print(f"⚠ Gateway PID {pid} did not exit gracefully; sent SIGKILL") print(f"⚠ Gateway PID {pid} did not exit gracefully; sent SIGKILL")
except (ProcessLookupError, PermissionError): except (ProcessLookupError, PermissionError, OSError):
return # Already gone or we can't touch it. return # Already gone or we can't touch it.
force_sent = True force_sent = True
time.sleep(0.3) time.sleep(0.3)
# Timed out even after SIGKILL. # Timed out even after force-kill.
remaining_pid = get_running_pid() remaining_pid = get_running_pid()
if remaining_pid is not None: if remaining_pid is not None:
print(f"⚠ Gateway PID {remaining_pid} still running after {timeout}s — restart may fail") print(f"⚠ Gateway PID {remaining_pid} still running after {timeout}s — restart may fail")

View File

@@ -308,6 +308,7 @@ class TestBackgroundInCLICommands:
def test_background_autocompletes(self): def test_background_autocompletes(self):
"""The /background command appears in autocomplete results.""" """The /background command appears in autocomplete results."""
pytest.importorskip("prompt_toolkit")
from hermes_cli.commands import SlashCommandCompleter from hermes_cli.commands import SlashCommandCompleter
from prompt_toolkit.document import Document from prompt_toolkit.document import Document

View File

@@ -6,7 +6,7 @@ from types import SimpleNamespace
import pytest import pytest
from gateway.config import Platform, PlatformConfig from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult from gateway.platforms.base import BasePlatformAdapter, MessageEvent, ProcessingOutcome, SendResult
from gateway.session import SessionSource, build_session_key from gateway.session import SessionSource, build_session_key
@@ -44,8 +44,8 @@ class DummyTelegramAdapter(BasePlatformAdapter):
async def on_processing_start(self, event: MessageEvent) -> None: async def on_processing_start(self, event: MessageEvent) -> None:
self.processing_hooks.append(("start", event.message_id)) self.processing_hooks.append(("start", event.message_id))
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None: async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
self.processing_hooks.append(("complete", event.message_id, success)) self.processing_hooks.append(("complete", event.message_id, outcome))
def _make_event(chat_id: str, thread_id: str, message_id: str = "1") -> MessageEvent: def _make_event(chat_id: str, thread_id: str, message_id: str = "1") -> MessageEvent:
@@ -142,7 +142,7 @@ class TestBasePlatformTopicSessions:
] ]
assert adapter.processing_hooks == [ assert adapter.processing_hooks == [
("start", "1"), ("start", "1"),
("complete", "1", True), ("complete", "1", ProcessingOutcome.SUCCESS),
] ]
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -168,7 +168,7 @@ class TestBasePlatformTopicSessions:
assert adapter.processing_hooks == [ assert adapter.processing_hooks == [
("start", "1"), ("start", "1"),
("complete", "1", False), ("complete", "1", ProcessingOutcome.FAILURE),
] ]
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -190,7 +190,7 @@ class TestBasePlatformTopicSessions:
assert adapter.processing_hooks == [ assert adapter.processing_hooks == [
("start", "1"), ("start", "1"),
("complete", "1", False), ("complete", "1", ProcessingOutcome.FAILURE),
] ]
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -218,5 +218,31 @@ class TestBasePlatformTopicSessions:
assert adapter.processing_hooks == [ assert adapter.processing_hooks == [
("start", "1"), ("start", "1"),
("complete", "1", False), ("complete", "1", ProcessingOutcome.FAILURE),
]
@pytest.mark.asyncio
async def test_cancel_background_tasks_marks_expected_cancellation_cancelled(self):
adapter = DummyTelegramAdapter()
release = asyncio.Event()
async def handler(_event):
await release.wait()
return "ack"
async def hold_typing(_chat_id, interval=2.0, metadata=None):
await asyncio.Event().wait()
adapter.set_message_handler(handler)
adapter._keep_typing = hold_typing
event = _make_event("-1001", "17585")
await adapter.handle_message(event)
await asyncio.sleep(0)
await adapter.cancel_background_tasks()
assert adapter.processing_hooks == [
("start", "1"),
("complete", "1", ProcessingOutcome.CANCELLED),
] ]

View File

@@ -160,6 +160,22 @@ class TestCommandBypassActiveSession:
assert sk not in adapter._pending_messages assert sk not in adapter._pending_messages
assert any("handled:status" in r for r in adapter.sent_responses) assert any("handled:status" in r for r in adapter.sent_responses)
@pytest.mark.asyncio
async def test_background_bypasses_guard(self):
"""/background must bypass so it spawns a parallel task, not an interrupt."""
adapter = _make_adapter()
sk = _session_key()
adapter._active_sessions[sk] = asyncio.Event()
await adapter.handle_message(_make_event("/background summarize HN"))
assert sk not in adapter._pending_messages, (
"/background was queued as a pending message instead of being dispatched"
)
assert any("handled:background" in r for r in adapter.sent_responses), (
"/background response was not sent back to the user"
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Tests: non-bypass messages still get queued # Tests: non-bypass messages still get queued

View File

@@ -8,7 +8,7 @@ from unittest.mock import AsyncMock, MagicMock
import pytest import pytest
from gateway.config import Platform, PlatformConfig from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType, SendResult from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome, SendResult
from gateway.session import SessionSource, build_session_key from gateway.session import SessionSource, build_session_key
@@ -212,7 +212,7 @@ async def test_reactions_disabled_via_env_zero(adapter, monkeypatch):
event = _make_event("5", raw_message) event = _make_event("5", raw_message)
await adapter.on_processing_start(event) await adapter.on_processing_start(event)
await adapter.on_processing_complete(event, success=True) await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
raw_message.add_reaction.assert_not_awaited() raw_message.add_reaction.assert_not_awaited()
raw_message.remove_reaction.assert_not_awaited() raw_message.remove_reaction.assert_not_awaited()
@@ -232,3 +232,17 @@ async def test_reactions_enabled_by_default(adapter, monkeypatch):
await adapter.on_processing_start(event) await adapter.on_processing_start(event)
raw_message.add_reaction.assert_awaited_once_with("👀") raw_message.add_reaction.assert_awaited_once_with("👀")
@pytest.mark.asyncio
async def test_on_processing_complete_cancelled_removes_eyes_without_terminal_reaction(adapter):
raw_message = SimpleNamespace(
add_reaction=AsyncMock(),
remove_reaction=AsyncMock(),
)
event = _make_event("7", raw_message)
await adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
raw_message.remove_reaction.assert_awaited_once_with("👀", adapter._client.user)
raw_message.add_reaction.assert_not_awaited()

View File

@@ -128,12 +128,16 @@ async def test_internal_event_bypasses_authorization(monkeypatch, tmp_path):
monkeypatch.setattr(GatewayRunner, "_is_user_authorized", tracking_auth) monkeypatch.setattr(GatewayRunner, "_is_user_authorized", tracking_auth)
# _handle_message will proceed past auth check and eventually fail on # Stop execution before the agent runner so the test doesn't block in
# downstream logic. We just need to verify auth is skipped. # run_in_executor. Auth check happens before _handle_message_with_agent.
async def _raise(*_a, **_kw):
raise RuntimeError("sentinel — stop here")
monkeypatch.setattr(GatewayRunner, "_handle_message_with_agent", _raise)
try: try:
await runner._handle_message(event) await runner._handle_message(event)
except Exception: except RuntimeError:
pass # Expected — downstream code needs more setup pass # Expected sentinel
assert not auth_called, ( assert not auth_called, (
"_is_user_authorized should NOT be called for internal events" "_is_user_authorized should NOT be called for internal events"
@@ -175,10 +179,16 @@ async def test_internal_event_does_not_trigger_pairing(monkeypatch, tmp_path):
runner.pairing_store.generate_code = tracking_generate runner.pairing_store.generate_code = tracking_generate
# Stop execution before the agent runner so the test doesn't block in
# run_in_executor. Pairing check happens before _handle_message_with_agent.
async def _raise(*_a, **_kw):
raise RuntimeError("sentinel — stop here")
monkeypatch.setattr(GatewayRunner, "_handle_message_with_agent", _raise)
try: try:
await runner._handle_message(event) await runner._handle_message(event)
except Exception: except RuntimeError:
pass # Expected — downstream code needs more setup pass # Expected sentinel
assert not generate_called, ( assert not generate_called, (
"Pairing code should NOT be generated for internal events" "Pairing code should NOT be generated for internal events"

View File

@@ -1980,7 +1980,7 @@ class TestMatrixReactions:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_on_processing_complete_sends_check(self): async def test_on_processing_complete_sends_check(self):
from gateway.platforms.base import MessageEvent, MessageType from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
self.adapter._reactions_enabled = True self.adapter._reactions_enabled = True
self.adapter._send_reaction = AsyncMock(return_value=True) self.adapter._send_reaction = AsyncMock(return_value=True)
@@ -1994,9 +1994,28 @@ class TestMatrixReactions:
raw_message={}, raw_message={},
message_id="$msg1", message_id="$msg1",
) )
await self.adapter.on_processing_complete(event, success=True) await self.adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "") self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "")
@pytest.mark.asyncio
async def test_on_processing_complete_cancelled_sends_no_terminal_reaction(self):
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
self.adapter._reactions_enabled = True
self.adapter._send_reaction = AsyncMock(return_value=True)
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
self.adapter._send_reaction.assert_not_called()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_reactions_disabled(self): async def test_reactions_disabled(self):
from gateway.platforms.base import MessageEvent, MessageType from gateway.platforms.base import MessageEvent, MessageType

View File

@@ -144,7 +144,7 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa
assert adapter.sent == [ assert adapter.sent == [
{ {
"chat_id": "-1001", "chat_id": "-1001",
"content": '💻 terminal: "pwd"', "content": '⚙️ terminal: "pwd"',
"reply_to": None, "reply_to": None,
"metadata": {"thread_id": "17585"}, "metadata": {"thread_id": "17585"},
} }

View File

@@ -87,3 +87,42 @@ async def test_runner_allows_cron_only_mode_when_no_platforms_are_enabled(monkey
assert runner.adapters == {} assert runner.adapters == {}
state = read_runtime_status() state = read_runtime_status()
assert state["gateway_state"] == "running" assert state["gateway_state"] == "running"
@pytest.mark.asyncio
async def test_start_gateway_replace_force_uses_terminate_pid(monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
calls = []
class _CleanExitRunner:
def __init__(self, config):
self.config = config
self.should_exit_cleanly = True
self.exit_reason = None
self.adapters = {}
async def start(self):
return True
async def stop(self):
return None
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 42)
monkeypatch.setattr("gateway.status.remove_pid_file", lambda: None)
monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0)
monkeypatch.setattr("gateway.status.terminate_pid", lambda pid, force=False: calls.append((pid, force)))
monkeypatch.setattr("gateway.run.os.getpid", lambda: 100)
monkeypatch.setattr("gateway.run.os.kill", lambda pid, sig: None)
monkeypatch.setattr("time.sleep", lambda _: None)
monkeypatch.setattr("tools.skills_sync.sync_skills", lambda quiet=True: None)
monkeypatch.setattr("hermes_logging.setup_logging", lambda hermes_home, mode: tmp_path)
monkeypatch.setattr("hermes_logging._add_rotating_handler", lambda *args, **kwargs: None)
monkeypatch.setattr("gateway.run.GatewayRunner", _CleanExitRunner)
from gateway.run import start_gateway
ok = await start_gateway(config=GatewayConfig(), replace=True, verbosity=None)
assert ok is True
assert calls == [(42, False), (42, True)]

View File

@@ -2,6 +2,7 @@
import json import json
import os import os
from types import SimpleNamespace
from gateway import status from gateway import status
@@ -104,6 +105,41 @@ class TestGatewayRuntimeStatus:
assert payload["platforms"]["telegram"]["error_message"] == "another poller is active" assert payload["platforms"]["telegram"]["error_message"] == "another poller is active"
class TestTerminatePid:
def test_force_uses_taskkill_on_windows(self, monkeypatch):
calls = []
monkeypatch.setattr(status, "_IS_WINDOWS", True)
def fake_run(cmd, capture_output=False, text=False, timeout=None):
calls.append((cmd, capture_output, text, timeout))
return SimpleNamespace(returncode=0, stdout="", stderr="")
monkeypatch.setattr(status.subprocess, "run", fake_run)
status.terminate_pid(123, force=True)
assert calls == [
(["taskkill", "/PID", "123", "/T", "/F"], True, True, 10)
]
def test_force_falls_back_to_sigterm_when_taskkill_missing(self, monkeypatch):
calls = []
monkeypatch.setattr(status, "_IS_WINDOWS", True)
def fake_run(*args, **kwargs):
raise FileNotFoundError
def fake_kill(pid, sig):
calls.append((pid, sig))
monkeypatch.setattr(status.subprocess, "run", fake_run)
monkeypatch.setattr(status.os, "kill", fake_kill)
status.terminate_pid(456, force=True)
assert calls == [(456, status.signal.SIGTERM)]
class TestScopedLocks: class TestScopedLocks:
def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch): def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks")) monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))

View File

@@ -437,6 +437,45 @@ class TestSegmentBreakOnToolBoundary:
# Only one send call (the initial message) # Only one send call (the initial message)
assert adapter.send.call_count == 1 assert adapter.send.call_count == 1
@pytest.mark.asyncio
async def test_no_message_id_segment_breaks_do_not_resend(self):
"""On a platform that never returns a message_id (e.g. webhook with
github_comment delivery), tool-call segment breaks must NOT trigger
a new adapter.send() per boundary. The fix: _message_id == '__no_edit__'
suppresses the reset so all text accumulates and is sent once."""
adapter = MagicMock()
# No message_id on first send, then one more for the fallback final
adapter.send = AsyncMock(side_effect=[
SimpleNamespace(success=True, message_id=None),
SimpleNamespace(success=True, message_id=None),
])
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
# Simulate: text → tool boundary → text → tool boundary → text (3 segments)
consumer.on_delta("Phase 1 text")
consumer.on_delta(None) # tool call boundary
consumer.on_delta("Phase 2 text")
consumer.on_delta(None) # another tool call boundary
consumer.on_delta("Phase 3 text")
consumer.finish()
await consumer.run()
# Before the fix this would post 3 comments (one per segment).
# After the fix: only the initial partial + one fallback-final continuation.
assert adapter.send.call_count == 2, (
f"Expected 2 sends (initial + fallback), got {adapter.send.call_count}"
)
assert consumer.already_sent
# The continuation must contain the text from segments 2 and 3
final_text = adapter.send.call_args_list[1][1]["content"]
assert "Phase 2" in final_text
assert "Phase 3" in final_text
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_fallback_final_splits_long_continuation_without_dropping_text(self): async def test_fallback_final_splits_long_continuation_without_dropping_text(self):
"""Long continuation tails should be chunked when fallback final-send runs.""" """Long continuation tails should be chunked when fallback final-send runs."""

View File

@@ -6,7 +6,7 @@ from unittest.mock import AsyncMock
import pytest import pytest
from gateway.config import Platform, PlatformConfig from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
from gateway.session import SessionSource from gateway.session import SessionSource
@@ -180,7 +180,7 @@ async def test_on_processing_complete_success(monkeypatch):
adapter = _make_adapter() adapter = _make_adapter()
event = _make_event() event = _make_event()
await adapter.on_processing_complete(event, success=True) await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
adapter._bot.set_message_reaction.assert_awaited_once_with( adapter._bot.set_message_reaction.assert_awaited_once_with(
chat_id=123, chat_id=123,
@@ -196,7 +196,7 @@ async def test_on_processing_complete_failure(monkeypatch):
adapter = _make_adapter() adapter = _make_adapter()
event = _make_event() event = _make_event()
await adapter.on_processing_complete(event, success=False) await adapter.on_processing_complete(event, ProcessingOutcome.FAILURE)
adapter._bot.set_message_reaction.assert_awaited_once_with( adapter._bot.set_message_reaction.assert_awaited_once_with(
chat_id=123, chat_id=123,
@@ -212,7 +212,19 @@ async def test_on_processing_complete_skipped_when_disabled(monkeypatch):
adapter = _make_adapter() adapter = _make_adapter()
event = _make_event() event = _make_event()
await adapter.on_processing_complete(event, success=True) await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
adapter._bot.set_message_reaction.assert_not_awaited()
@pytest.mark.asyncio
async def test_on_processing_complete_cancelled_keeps_existing_reaction(monkeypatch):
"""Expected cancellation should not replace the in-progress reaction."""
monkeypatch.setenv("TELEGRAM_REACTIONS", "true")
adapter = _make_adapter()
event = _make_event()
await adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
adapter._bot.set_message_reaction.assert_not_awaited() adapter._bot.set_message_reaction.assert_not_awaited()

View File

@@ -1,6 +1,5 @@
"""Tests for hermes_cli.gateway.""" """Tests for hermes_cli.gateway."""
import signal
from types import SimpleNamespace from types import SimpleNamespace
from unittest.mock import patch, call from unittest.mock import patch, call
@@ -211,8 +210,7 @@ class TestWaitForGatewayExit:
assert poll_count == 3 assert poll_count == 3
def test_force_kills_after_grace_period(self, monkeypatch): def test_force_kills_after_grace_period(self, monkeypatch):
"""When the process doesn't exit, SIGKILL the saved PID.""" """When the process doesn't exit, force-kill the saved PID."""
import time as _time
# Simulate monotonic time advancing past force_after # Simulate monotonic time advancing past force_after
call_num = 0 call_num = 0
@@ -224,8 +222,8 @@ class TestWaitForGatewayExit:
return call_num * 2.0 # 2, 4, 6, 8, ... return call_num * 2.0 # 2, 4, 6, 8, ...
kills = [] kills = []
def mock_kill(pid, sig): def mock_terminate(pid, force=False):
kills.append((pid, sig)) kills.append((pid, force))
# get_running_pid returns the PID until kill is sent, then None # get_running_pid returns the PID until kill is sent, then None
def mock_get_running_pid(): def mock_get_running_pid():
@@ -234,14 +232,13 @@ class TestWaitForGatewayExit:
monkeypatch.setattr("time.monotonic", fake_monotonic) monkeypatch.setattr("time.monotonic", fake_monotonic)
monkeypatch.setattr("time.sleep", lambda _: None) monkeypatch.setattr("time.sleep", lambda _: None)
monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid) monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid)
monkeypatch.setattr("os.kill", mock_kill) monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
gateway._wait_for_gateway_exit(timeout=10.0, force_after=5.0) gateway._wait_for_gateway_exit(timeout=10.0, force_after=5.0)
assert (42, signal.SIGKILL) in kills assert (42, True) in kills
def test_handles_process_already_gone_on_kill(self, monkeypatch): def test_handles_process_already_gone_on_kill(self, monkeypatch):
"""ProcessLookupError during SIGKILL is not fatal.""" """ProcessLookupError during force-kill is not fatal."""
import time as _time
call_num = 0 call_num = 0
def fake_monotonic(): def fake_monotonic():
@@ -249,13 +246,24 @@ class TestWaitForGatewayExit:
call_num += 1 call_num += 1
return call_num * 3.0 # Jump past force_after quickly return call_num * 3.0 # Jump past force_after quickly
def mock_kill(pid, sig): def mock_terminate(pid, force=False):
raise ProcessLookupError raise ProcessLookupError
monkeypatch.setattr("time.monotonic", fake_monotonic) monkeypatch.setattr("time.monotonic", fake_monotonic)
monkeypatch.setattr("time.sleep", lambda _: None) monkeypatch.setattr("time.sleep", lambda _: None)
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 99) monkeypatch.setattr("gateway.status.get_running_pid", lambda: 99)
monkeypatch.setattr("os.kill", mock_kill) monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
# Should not raise — ProcessLookupError means it's already gone. # Should not raise — ProcessLookupError means it's already gone.
gateway._wait_for_gateway_exit(timeout=10.0, force_after=2.0) gateway._wait_for_gateway_exit(timeout=10.0, force_after=2.0)
def test_kill_gateway_processes_force_uses_helper(self, monkeypatch):
calls = []
monkeypatch.setattr(gateway, "find_gateway_pids", lambda exclude_pids=None: [11, 22])
monkeypatch.setattr(gateway, "terminate_pid", lambda pid, force=False: calls.append((pid, force)))
killed = gateway.kill_gateway_processes(force=True)
assert killed == 2
assert calls == [(11, True), (22, True)]