Compare commits

...

8 Commits

Author SHA1 Message Date
H-5-Isminiz
df1671d36b fix(gateway): implement platform-aware PID termination 2026-04-10 03:46:12 -07:00
KUSH42
43906fb89d fix(tests): repair three pre-existing gateway test failures
- test_background_autocompletes: pytest.importorskip("prompt_toolkit")
  so the test skips gracefully where the CLI dep is absent

- test_run_agent_progress_stays_in_originating_topic: update stale emoji
  💻⚙️ to match get_tool_emoji("terminal", default="⚙️") in run.py

- test_internal_event_bypass{_authorization,_pairing}: mock
  _handle_message_with_agent to raise immediately; avoids the 300s
  run_in_executor hang that caused the tests to time out
2026-04-10 03:46:08 -07:00
KUSH42
ef7b00d132 fix(gateway): prevent duplicate messages on no-message-id platforms
Platforms that don't return a message_id after the first send (Signal,
GitHub webhooks) were causing GatewayStreamConsumer to re-enter the
"first send" path on every tool boundary, posting one platform message
per tool call (observed as 155 PR comments on a single response).

Fix: treat _message_id == "__no_edit__" as a sentinel meaning "platform
accepted the send but cannot be edited". When a tool boundary arrives
in that state, skip the message_id/accumulated/last_sent_text reset so
all continuation text is delivered once via _send_fallback_final rather
than re-posted per segment.

Also make prompt_toolkit imports in hermes_cli/commands.py optional so
gateway and test environments that lack the package can still import
resolve_command, gateway_help_lines, and COMMAND_REGISTRY.
2026-04-10 03:46:08 -07:00
zhouboli
d48cd893f2 fix(telegram): harden HTTPX request pools during reconnect
- configure Telegram HTTPXRequest pool/timeouts with env-overridable defaults\n- use separate request/get_updates request objects to reduce pool contention\n- skip fallback-IP transport when proxy is configured (or explicitly disabled)\n\nThis mitigates recurrent pool-timeout failures during polling reconnect/bootstrap (delete_webhook).
2026-04-10 03:46:05 -07:00
coffee
595802e14b fix(gateway): replace assertions with proper error handling in Telegram and Feishu
Python assertions are stripped when running with `python -O` (optimized
mode), making them unsuitable for runtime error handling.

1. `telegram_network.py:113` — After exhausting all fallback IPs, the code
   uses `assert last_error is not None` before `raise last_error`. In
   optimized mode, the assert is skipped; if `last_error` is unexpectedly
   None, `raise None` produces a confusing `TypeError` instead of a
   meaningful error. Replace with an explicit `if` check that raises
   `RuntimeError` with a descriptive message.

2. `feishu.py:975` — The `_configure_with_overrides` closure uses
   `assert original_configure is not None` as a guard. While the outer
   scope only installs this closure when `original_configure` is not None,
   the assert would silently disappear in optimized mode. Replace with an
   explicit `if` check for defensive safety.
2026-04-10 03:46:01 -07:00
Tranquil-Flow
628d9bb729 test(gateway): add /background to active-session bypass tests
Adds a regression test verifying that /background bypasses the
active-session guard in the platform adapter, matching the existing
test pattern for /stop, /new, /approve, /deny, and /status.
2026-04-10 03:45:57 -07:00
Tranquil-Flow
10b070b4bd fix(gateway): route /background through active-session bypass
When /background was sent during an active run, it was not in the
platform adapter's bypass list and fell through to the interrupt path
instead of spawning a parallel background task.

Add "background" to the active-session command bypass in the platform
adapter, and add an early return in the gateway runner's running-agent
guard to route /background to _handle_background_command() before it
reaches the default interrupt logic.

Fixes #6827
2026-04-10 03:45:57 -07:00
Kenny Xie
c20d06878d fix(gateway): avoid false failure reactions on restart cancellation 2026-04-10 03:45:54 -07:00
23 changed files with 429 additions and 73 deletions

View File

@@ -502,6 +502,14 @@ class MessageType(Enum):
COMMAND = "command" # /command style
class ProcessingOutcome(Enum):
"""Result classification for message-processing lifecycle hooks."""
SUCCESS = "success"
FAILURE = "failure"
CANCELLED = "cancelled"
@dataclass
class MessageEvent:
"""
@@ -625,6 +633,7 @@ class BasePlatformAdapter(ABC):
# Gateway shutdown cancels these so an old gateway instance doesn't keep
# working on a task after --replace or manual restarts.
self._background_tasks: set[asyncio.Task] = set()
self._expected_cancelled_tasks: set[asyncio.Task] = set()
# Chats where auto-TTS on voice input is disabled (set by /voice off)
self._auto_tts_disabled_chats: set = set()
# Chats where typing indicator is paused (e.g. during approval waits).
@@ -1133,7 +1142,7 @@ class BasePlatformAdapter(ABC):
async def on_processing_start(self, event: MessageEvent) -> None:
"""Hook called when background processing begins."""
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
"""Hook called when background processing completes."""
async def _run_processing_hook(self, hook_name: str, *args: Any, **kwargs: Any) -> None:
@@ -1294,7 +1303,7 @@ class BasePlatformAdapter(ABC):
# session lifecycle and its cleanup races with the running task
# (see PR #4926).
cmd = event.get_command()
if cmd in ("approve", "deny", "status", "stop", "new", "reset"):
if cmd in ("approve", "deny", "status", "stop", "new", "reset", "background"):
logger.debug(
"[%s] Command '/%s' bypassing active-session guard for %s",
self.name, cmd, session_key,
@@ -1352,6 +1361,7 @@ class BasePlatformAdapter(ABC):
return
if hasattr(task, "add_done_callback"):
task.add_done_callback(self._background_tasks.discard)
task.add_done_callback(self._expected_cancelled_tasks.discard)
@staticmethod
def _get_human_delay() -> float:
@@ -1580,7 +1590,11 @@ class BasePlatformAdapter(ABC):
# Determine overall success for the processing hook
processing_ok = delivery_succeeded if delivery_attempted else not bool(response)
await self._run_processing_hook("on_processing_complete", event, processing_ok)
await self._run_processing_hook(
"on_processing_complete",
event,
ProcessingOutcome.SUCCESS if processing_ok else ProcessingOutcome.FAILURE,
)
# Check if there's a pending message that was queued during our processing
if session_key in self._pending_messages:
@@ -1599,10 +1613,14 @@ class BasePlatformAdapter(ABC):
return # Already cleaned up
except asyncio.CancelledError:
await self._run_processing_hook("on_processing_complete", event, False)
current_task = asyncio.current_task()
outcome = ProcessingOutcome.CANCELLED
if current_task is None or current_task not in self._expected_cancelled_tasks:
outcome = ProcessingOutcome.FAILURE
await self._run_processing_hook("on_processing_complete", event, outcome)
raise
except Exception as e:
await self._run_processing_hook("on_processing_complete", event, False)
await self._run_processing_hook("on_processing_complete", event, ProcessingOutcome.FAILURE)
logger.error("[%s] Error handling message: %s", self.name, e, exc_info=True)
# Send the error to the user so they aren't left with radio silence
try:
@@ -1646,10 +1664,12 @@ class BasePlatformAdapter(ABC):
"""
tasks = [task for task in self._background_tasks if not task.done()]
for task in tasks:
self._expected_cancelled_tasks.add(task)
task.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
self._background_tasks.clear()
self._expected_cancelled_tasks.clear()
self._pending_messages.clear()
self._active_sessions.clear()

View File

@@ -49,6 +49,7 @@ from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
ProcessingOutcome,
SendResult,
cache_image_from_url,
cache_audio_from_url,
@@ -754,14 +755,17 @@ class DiscordAdapter(BasePlatformAdapter):
if hasattr(message, "add_reaction"):
await self._add_reaction(message, "👀")
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
"""Swap the in-progress reaction for a final success/failure reaction."""
if not self._reactions_enabled():
return
message = event.raw_message
if hasattr(message, "add_reaction"):
await self._remove_reaction(message, "👀")
await self._add_reaction(message, "" if success else "")
if outcome == ProcessingOutcome.SUCCESS:
await self._add_reaction(message, "")
elif outcome == ProcessingOutcome.FAILURE:
await self._add_reaction(message, "")
async def send(
self,

View File

@@ -973,7 +973,8 @@ def _run_official_feishu_ws_client(ws_client: Any, adapter: Any) -> None:
return await original_connect(*args, **kwargs)
def _configure_with_overrides(conf: Any) -> Any:
assert original_configure is not None
if original_configure is None:
raise RuntimeError("Feishu _configure_with_overrides called but original_configure is None")
result = original_configure(conf)
_apply_runtime_ws_overrides()
return result

View File

@@ -40,6 +40,7 @@ from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
ProcessingOutcome,
SendResult,
)
@@ -1479,7 +1480,7 @@ class MatrixAdapter(BasePlatformAdapter):
await self._send_reaction(room_id, msg_id, "\U0001f440")
async def on_processing_complete(
self, event: MessageEvent, success: bool,
self, event: MessageEvent, outcome: ProcessingOutcome,
) -> None:
"""Replace eyes with checkmark (success) or cross (failure)."""
if not self._reactions_enabled:
@@ -1488,11 +1489,15 @@ class MatrixAdapter(BasePlatformAdapter):
room_id = event.source.chat_id
if not msg_id or not room_id:
return
if outcome == ProcessingOutcome.CANCELLED:
return
# Note: Matrix doesn't support removing a specific reaction easily
# without tracking the reaction event_id. We send the new reaction;
# the eyes stays (acceptable UX — both are visible).
await self._send_reaction(
room_id, msg_id, "\u2705" if success else "\u274c",
room_id,
msg_id,
"\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c",
)
async def _on_reaction(self, room: Any, event: Any) -> None:

View File

@@ -60,6 +60,7 @@ from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
ProcessingOutcome,
SendResult,
cache_image_from_bytes,
cache_audio_from_bytes,
@@ -517,6 +518,36 @@ class TelegramAdapter(BasePlatformAdapter):
# Build the application
builder = Application.builder().token(self.config.token)
# PTB defaults (pool_timeout=1s) are too aggressive on flaky networks and
# can trigger "Pool timeout: All connections in the connection pool are occupied"
# during reconnect/bootstrap. Use safer defaults and allow env overrides.
def _env_int(name: str, default: int) -> int:
try:
return int(os.getenv(name, str(default)))
except (TypeError, ValueError):
return default
def _env_float(name: str, default: float) -> float:
try:
return float(os.getenv(name, str(default)))
except (TypeError, ValueError):
return default
request_kwargs = {
"connection_pool_size": _env_int("HERMES_TELEGRAM_HTTP_POOL_SIZE", 512),
"pool_timeout": _env_float("HERMES_TELEGRAM_HTTP_POOL_TIMEOUT", 8.0),
"connect_timeout": _env_float("HERMES_TELEGRAM_HTTP_CONNECT_TIMEOUT", 10.0),
"read_timeout": _env_float("HERMES_TELEGRAM_HTTP_READ_TIMEOUT", 20.0),
"write_timeout": _env_float("HERMES_TELEGRAM_HTTP_WRITE_TIMEOUT", 20.0),
}
proxy_configured = any(
(os.getenv(k) or "").strip()
for k in ("HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY", "https_proxy", "http_proxy", "all_proxy")
)
disable_fallback = (os.getenv("HERMES_TELEGRAM_DISABLE_FALLBACK_IPS", "").strip().lower() in ("1", "true", "yes", "on"))
fallback_ips = self._fallback_ips()
if not fallback_ips:
fallback_ips = await discover_fallback_ips()
@@ -525,16 +556,32 @@ class TelegramAdapter(BasePlatformAdapter):
self.name,
", ".join(fallback_ips),
)
if fallback_ips:
if fallback_ips and not proxy_configured and not disable_fallback:
logger.info(
"[%s] Telegram fallback IPs active: %s",
self.name,
", ".join(fallback_ips),
)
transport = TelegramFallbackTransport(fallback_ips)
request = HTTPXRequest(httpx_kwargs={"transport": transport})
get_updates_request = HTTPXRequest(httpx_kwargs={"transport": transport})
builder = builder.request(request).get_updates_request(get_updates_request)
# Keep request/update pools separate to reduce contention during
# polling reconnect + bot API bootstrap/delete_webhook calls.
request = HTTPXRequest(
**request_kwargs,
httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
)
get_updates_request = HTTPXRequest(
**request_kwargs,
httpx_kwargs={"transport": TelegramFallbackTransport(fallback_ips)},
)
else:
if proxy_configured:
logger.info("[%s] Proxy configured; skipping Telegram fallback-IP transport", self.name)
elif disable_fallback:
logger.info("[%s] Telegram fallback-IP transport disabled via env", self.name)
request = HTTPXRequest(**request_kwargs)
get_updates_request = HTTPXRequest(**request_kwargs)
builder = builder.request(request).get_updates_request(get_updates_request)
self._app = builder.build()
self._bot = self._app.bot
@@ -2732,7 +2779,7 @@ class TelegramAdapter(BasePlatformAdapter):
if chat_id and message_id:
await self._set_reaction(chat_id, message_id, "\U0001f440")
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
"""Swap the in-progress reaction for a final success/failure reaction.
Unlike Discord (additive reactions), Telegram's set_message_reaction
@@ -2742,5 +2789,9 @@ class TelegramAdapter(BasePlatformAdapter):
return
chat_id = getattr(event.source, "chat_id", None)
message_id = getattr(event, "message_id", None)
if chat_id and message_id:
await self._set_reaction(chat_id, message_id, "\u2705" if success else "\u274c")
if chat_id and message_id and outcome != ProcessingOutcome.CANCELLED:
await self._set_reaction(
chat_id,
message_id,
"\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c",
)

View File

@@ -110,7 +110,8 @@ class TelegramFallbackTransport(httpx.AsyncBaseTransport):
logger.warning("[Telegram] Fallback IP %s failed: %s", ip, exc)
continue
assert last_error is not None
if last_error is None:
raise RuntimeError("All Telegram fallback IPs exhausted but no error was recorded")
raise last_error
async def aclose(self) -> None:

View File

@@ -1991,6 +1991,11 @@ class GatewayRunner:
return await self._handle_approve_command(event)
return await self._handle_deny_command(event)
# /background must bypass the running-agent guard — it starts a
# parallel task and must never interrupt the active conversation.
if _cmd_def_inner and _cmd_def_inner.name == "background":
return await self._handle_background_command(event)
if event.message_type == MessageType.PHOTO:
logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20])
adapter = self.adapters.get(source.platform)
@@ -7577,7 +7582,7 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
# setups (each profile using a distinct HERMES_HOME) will naturally
# allow concurrent instances without tripping this guard.
import time as _time
from gateway.status import get_running_pid, remove_pid_file
from gateway.status import get_running_pid, remove_pid_file, terminate_pid
existing_pid = get_running_pid()
if existing_pid is not None and existing_pid != os.getpid():
if replace:
@@ -7586,10 +7591,10 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
existing_pid,
)
try:
os.kill(existing_pid, signal.SIGTERM)
terminate_pid(existing_pid, force=False)
except ProcessLookupError:
pass # Already gone
except PermissionError:
except (PermissionError, OSError):
logger.error(
"Permission denied killing PID %d. Cannot replace.",
existing_pid,
@@ -7609,9 +7614,9 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
existing_pid,
)
try:
os.kill(existing_pid, signal.SIGKILL)
terminate_pid(existing_pid, force=True)
_time.sleep(0.5)
except (ProcessLookupError, PermissionError):
except (ProcessLookupError, PermissionError, OSError):
pass
remove_pid_file()
# Also release all scoped locks left by the old process.

View File

@@ -14,6 +14,8 @@ concurrently under distinct configurations).
import hashlib
import json
import os
import signal
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
@@ -23,6 +25,7 @@ from typing import Any, Optional
_GATEWAY_KIND = "hermes-gateway"
_RUNTIME_STATUS_FILE = "gateway_state.json"
_LOCKS_DIRNAME = "gateway-locks"
_IS_WINDOWS = sys.platform == "win32"
def _get_pid_path() -> Path:
@@ -49,6 +52,33 @@ def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def terminate_pid(pid: int, *, force: bool = False) -> None:
"""Terminate a PID with platform-appropriate force semantics.
POSIX uses SIGTERM/SIGKILL. Windows uses taskkill /T /F for true force-kill
because os.kill(..., SIGTERM) is not equivalent to a tree-killing hard stop.
"""
if force and _IS_WINDOWS:
try:
result = subprocess.run(
["taskkill", "/PID", str(pid), "/T", "/F"],
capture_output=True,
text=True,
timeout=10,
)
except FileNotFoundError:
os.kill(pid, signal.SIGTERM)
return
if result.returncode != 0:
details = (result.stderr or result.stdout or "").strip()
raise OSError(details or f"taskkill failed for PID {pid}")
return
sig = signal.SIGTERM if not force else getattr(signal, "SIGKILL", signal.SIGTERM)
os.kill(pid, sig)
def _scope_hash(identity: str) -> str:
return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]

View File

@@ -205,11 +205,20 @@ class GatewayStreamConsumer:
await self._send_or_edit(self._accumulated)
return
# Tool boundary: the should_edit block above already flushed
# accumulated text without a cursor. Reset state so the next
# text chunk creates a fresh message below any tool-progress
# messages the gateway sent in between.
if got_segment_break:
# Tool boundary: reset message state so the next text chunk
# creates a fresh message below any tool-progress messages.
#
# Exception: when _message_id is "__no_edit__" the platform
# never returned a real message ID (e.g. Signal, webhook with
# github_comment delivery). Resetting to None would re-enter
# the "first send" path on every tool boundary and post one
# platform message per tool call — that is what caused 155
# comments under a single PR. Instead, keep all state so the
# full continuation is delivered once via _send_fallback_final.
# (When editing fails mid-stream due to flood control the id is
# a real string like "msg_1", not "__no_edit__", so that case
# still resets and creates a fresh segment as intended.)
if got_segment_break and self._message_id != "__no_edit__":
self._message_id = None
self._accumulated = ""
self._last_sent_text = ""

View File

@@ -16,8 +16,18 @@ from collections.abc import Callable, Mapping
from dataclasses import dataclass
from typing import Any
from prompt_toolkit.auto_suggest import AutoSuggest, Suggestion
from prompt_toolkit.completion import Completer, Completion
# prompt_toolkit is an optional CLI dependency — only needed for
# SlashCommandCompleter and SlashCommandAutoSuggest. Gateway and test
# environments that lack it must still be able to import this module
# for resolve_command, gateway_help_lines, and COMMAND_REGISTRY.
try:
from prompt_toolkit.auto_suggest import AutoSuggest, Suggestion
from prompt_toolkit.completion import Completer, Completion
except ImportError: # pragma: no cover
AutoSuggest = object # type: ignore[assignment,misc]
Completer = object # type: ignore[assignment,misc]
Suggestion = None # type: ignore[assignment]
Completion = None # type: ignore[assignment]
# ---------------------------------------------------------------------------

View File

@@ -14,6 +14,7 @@ from pathlib import Path
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
from gateway.status import terminate_pid
from hermes_cli.config import get_env_value, get_hermes_home, save_env_value, is_managed, managed_error
# display_hermes_home is imported lazily at call sites to avoid ImportError
# when hermes_constants is cached from a pre-update version during `hermes update`.
@@ -162,7 +163,7 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
"""Kill any running gateway processes. Returns count killed.
Args:
force: Use SIGKILL instead of SIGTERM.
force: Use the platform's force-kill mechanism instead of graceful terminate.
exclude_pids: PIDs to skip (e.g. service-managed PIDs that were just
restarted and should not be killed).
"""
@@ -171,10 +172,7 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
for pid in pids:
try:
if force and not is_windows():
os.kill(pid, signal.SIGKILL)
else:
os.kill(pid, signal.SIGTERM)
terminate_pid(pid, force=force)
killed += 1
except ProcessLookupError:
# Process already gone
@@ -182,6 +180,8 @@ def kill_gateway_processes(force: bool = False, exclude_pids: set | None = None)
except PermissionError:
print(f"⚠ Permission denied to kill PID {pid}")
except OSError as exc:
print(f"Failed to kill PID {pid}: {exc}")
return killed
@@ -1208,7 +1208,7 @@ def _wait_for_gateway_exit(timeout: float = 10.0, force_after: float = 5.0):
Args:
timeout: Total seconds to wait before giving up.
force_after: Seconds of graceful waiting before sending SIGKILL.
force_after: Seconds of graceful waiting before escalating to force-kill.
"""
import time
from gateway.status import get_running_pid
@@ -1225,15 +1225,15 @@ def _wait_for_gateway_exit(timeout: float = 10.0, force_after: float = 5.0):
if not force_sent and time.monotonic() >= force_deadline:
# Grace period expired — force-kill the specific PID.
try:
os.kill(pid, signal.SIGKILL)
terminate_pid(pid, force=True)
print(f"⚠ Gateway PID {pid} did not exit gracefully; sent SIGKILL")
except (ProcessLookupError, PermissionError):
except (ProcessLookupError, PermissionError, OSError):
return # Already gone or we can't touch it.
force_sent = True
time.sleep(0.3)
# Timed out even after SIGKILL.
# Timed out even after force-kill.
remaining_pid = get_running_pid()
if remaining_pid is not None:
print(f"⚠ Gateway PID {remaining_pid} still running after {timeout}s — restart may fail")

View File

@@ -308,6 +308,7 @@ class TestBackgroundInCLICommands:
def test_background_autocompletes(self):
"""The /background command appears in autocomplete results."""
pytest.importorskip("prompt_toolkit")
from hermes_cli.commands import SlashCommandCompleter
from prompt_toolkit.document import Document

View File

@@ -6,7 +6,7 @@ from types import SimpleNamespace
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, ProcessingOutcome, SendResult
from gateway.session import SessionSource, build_session_key
@@ -44,8 +44,8 @@ class DummyTelegramAdapter(BasePlatformAdapter):
async def on_processing_start(self, event: MessageEvent) -> None:
self.processing_hooks.append(("start", event.message_id))
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
self.processing_hooks.append(("complete", event.message_id, success))
async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
self.processing_hooks.append(("complete", event.message_id, outcome))
def _make_event(chat_id: str, thread_id: str, message_id: str = "1") -> MessageEvent:
@@ -142,7 +142,7 @@ class TestBasePlatformTopicSessions:
]
assert adapter.processing_hooks == [
("start", "1"),
("complete", "1", True),
("complete", "1", ProcessingOutcome.SUCCESS),
]
@pytest.mark.asyncio
@@ -168,7 +168,7 @@ class TestBasePlatformTopicSessions:
assert adapter.processing_hooks == [
("start", "1"),
("complete", "1", False),
("complete", "1", ProcessingOutcome.FAILURE),
]
@pytest.mark.asyncio
@@ -190,7 +190,7 @@ class TestBasePlatformTopicSessions:
assert adapter.processing_hooks == [
("start", "1"),
("complete", "1", False),
("complete", "1", ProcessingOutcome.FAILURE),
]
@pytest.mark.asyncio
@@ -218,5 +218,31 @@ class TestBasePlatformTopicSessions:
assert adapter.processing_hooks == [
("start", "1"),
("complete", "1", False),
("complete", "1", ProcessingOutcome.FAILURE),
]
@pytest.mark.asyncio
async def test_cancel_background_tasks_marks_expected_cancellation_cancelled(self):
adapter = DummyTelegramAdapter()
release = asyncio.Event()
async def handler(_event):
await release.wait()
return "ack"
async def hold_typing(_chat_id, interval=2.0, metadata=None):
await asyncio.Event().wait()
adapter.set_message_handler(handler)
adapter._keep_typing = hold_typing
event = _make_event("-1001", "17585")
await adapter.handle_message(event)
await asyncio.sleep(0)
await adapter.cancel_background_tasks()
assert adapter.processing_hooks == [
("start", "1"),
("complete", "1", ProcessingOutcome.CANCELLED),
]

View File

@@ -160,6 +160,22 @@ class TestCommandBypassActiveSession:
assert sk not in adapter._pending_messages
assert any("handled:status" in r for r in adapter.sent_responses)
@pytest.mark.asyncio
async def test_background_bypasses_guard(self):
"""/background must bypass so it spawns a parallel task, not an interrupt."""
adapter = _make_adapter()
sk = _session_key()
adapter._active_sessions[sk] = asyncio.Event()
await adapter.handle_message(_make_event("/background summarize HN"))
assert sk not in adapter._pending_messages, (
"/background was queued as a pending message instead of being dispatched"
)
assert any("handled:background" in r for r in adapter.sent_responses), (
"/background response was not sent back to the user"
)
# ---------------------------------------------------------------------------
# Tests: non-bypass messages still get queued

View File

@@ -8,7 +8,7 @@ from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType, SendResult
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome, SendResult
from gateway.session import SessionSource, build_session_key
@@ -212,7 +212,7 @@ async def test_reactions_disabled_via_env_zero(adapter, monkeypatch):
event = _make_event("5", raw_message)
await adapter.on_processing_start(event)
await adapter.on_processing_complete(event, success=True)
await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
raw_message.add_reaction.assert_not_awaited()
raw_message.remove_reaction.assert_not_awaited()
@@ -232,3 +232,17 @@ async def test_reactions_enabled_by_default(adapter, monkeypatch):
await adapter.on_processing_start(event)
raw_message.add_reaction.assert_awaited_once_with("👀")
@pytest.mark.asyncio
async def test_on_processing_complete_cancelled_removes_eyes_without_terminal_reaction(adapter):
raw_message = SimpleNamespace(
add_reaction=AsyncMock(),
remove_reaction=AsyncMock(),
)
event = _make_event("7", raw_message)
await adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
raw_message.remove_reaction.assert_awaited_once_with("👀", adapter._client.user)
raw_message.add_reaction.assert_not_awaited()

View File

@@ -128,12 +128,16 @@ async def test_internal_event_bypasses_authorization(monkeypatch, tmp_path):
monkeypatch.setattr(GatewayRunner, "_is_user_authorized", tracking_auth)
# _handle_message will proceed past auth check and eventually fail on
# downstream logic. We just need to verify auth is skipped.
# Stop execution before the agent runner so the test doesn't block in
# run_in_executor. Auth check happens before _handle_message_with_agent.
async def _raise(*_a, **_kw):
raise RuntimeError("sentinel — stop here")
monkeypatch.setattr(GatewayRunner, "_handle_message_with_agent", _raise)
try:
await runner._handle_message(event)
except Exception:
pass # Expected — downstream code needs more setup
except RuntimeError:
pass # Expected sentinel
assert not auth_called, (
"_is_user_authorized should NOT be called for internal events"
@@ -175,10 +179,16 @@ async def test_internal_event_does_not_trigger_pairing(monkeypatch, tmp_path):
runner.pairing_store.generate_code = tracking_generate
# Stop execution before the agent runner so the test doesn't block in
# run_in_executor. Pairing check happens before _handle_message_with_agent.
async def _raise(*_a, **_kw):
raise RuntimeError("sentinel — stop here")
monkeypatch.setattr(GatewayRunner, "_handle_message_with_agent", _raise)
try:
await runner._handle_message(event)
except Exception:
pass # Expected — downstream code needs more setup
except RuntimeError:
pass # Expected sentinel
assert not generate_called, (
"Pairing code should NOT be generated for internal events"

View File

@@ -1980,7 +1980,7 @@ class TestMatrixReactions:
@pytest.mark.asyncio
async def test_on_processing_complete_sends_check(self):
from gateway.platforms.base import MessageEvent, MessageType
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
self.adapter._reactions_enabled = True
self.adapter._send_reaction = AsyncMock(return_value=True)
@@ -1994,9 +1994,28 @@ class TestMatrixReactions:
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_complete(event, success=True)
await self.adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
self.adapter._send_reaction.assert_called_once_with("!room:ex", "$msg1", "")
@pytest.mark.asyncio
async def test_on_processing_complete_cancelled_sends_no_terminal_reaction(self):
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
self.adapter._reactions_enabled = True
self.adapter._send_reaction = AsyncMock(return_value=True)
source = MagicMock()
source.chat_id = "!room:ex"
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
raw_message={},
message_id="$msg1",
)
await self.adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
self.adapter._send_reaction.assert_not_called()
@pytest.mark.asyncio
async def test_reactions_disabled(self):
from gateway.platforms.base import MessageEvent, MessageType

View File

@@ -144,7 +144,7 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa
assert adapter.sent == [
{
"chat_id": "-1001",
"content": '💻 terminal: "pwd"',
"content": '⚙️ terminal: "pwd"',
"reply_to": None,
"metadata": {"thread_id": "17585"},
}

View File

@@ -87,3 +87,42 @@ async def test_runner_allows_cron_only_mode_when_no_platforms_are_enabled(monkey
assert runner.adapters == {}
state = read_runtime_status()
assert state["gateway_state"] == "running"
@pytest.mark.asyncio
async def test_start_gateway_replace_force_uses_terminate_pid(monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
calls = []
class _CleanExitRunner:
def __init__(self, config):
self.config = config
self.should_exit_cleanly = True
self.exit_reason = None
self.adapters = {}
async def start(self):
return True
async def stop(self):
return None
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 42)
monkeypatch.setattr("gateway.status.remove_pid_file", lambda: None)
monkeypatch.setattr("gateway.status.release_all_scoped_locks", lambda: 0)
monkeypatch.setattr("gateway.status.terminate_pid", lambda pid, force=False: calls.append((pid, force)))
monkeypatch.setattr("gateway.run.os.getpid", lambda: 100)
monkeypatch.setattr("gateway.run.os.kill", lambda pid, sig: None)
monkeypatch.setattr("time.sleep", lambda _: None)
monkeypatch.setattr("tools.skills_sync.sync_skills", lambda quiet=True: None)
monkeypatch.setattr("hermes_logging.setup_logging", lambda hermes_home, mode: tmp_path)
monkeypatch.setattr("hermes_logging._add_rotating_handler", lambda *args, **kwargs: None)
monkeypatch.setattr("gateway.run.GatewayRunner", _CleanExitRunner)
from gateway.run import start_gateway
ok = await start_gateway(config=GatewayConfig(), replace=True, verbosity=None)
assert ok is True
assert calls == [(42, False), (42, True)]

View File

@@ -2,6 +2,7 @@
import json
import os
from types import SimpleNamespace
from gateway import status
@@ -104,6 +105,41 @@ class TestGatewayRuntimeStatus:
assert payload["platforms"]["telegram"]["error_message"] == "another poller is active"
class TestTerminatePid:
def test_force_uses_taskkill_on_windows(self, monkeypatch):
calls = []
monkeypatch.setattr(status, "_IS_WINDOWS", True)
def fake_run(cmd, capture_output=False, text=False, timeout=None):
calls.append((cmd, capture_output, text, timeout))
return SimpleNamespace(returncode=0, stdout="", stderr="")
monkeypatch.setattr(status.subprocess, "run", fake_run)
status.terminate_pid(123, force=True)
assert calls == [
(["taskkill", "/PID", "123", "/T", "/F"], True, True, 10)
]
def test_force_falls_back_to_sigterm_when_taskkill_missing(self, monkeypatch):
calls = []
monkeypatch.setattr(status, "_IS_WINDOWS", True)
def fake_run(*args, **kwargs):
raise FileNotFoundError
def fake_kill(pid, sig):
calls.append((pid, sig))
monkeypatch.setattr(status.subprocess, "run", fake_run)
monkeypatch.setattr(status.os, "kill", fake_kill)
status.terminate_pid(456, force=True)
assert calls == [(456, status.signal.SIGTERM)]
class TestScopedLocks:
def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))

View File

@@ -437,6 +437,45 @@ class TestSegmentBreakOnToolBoundary:
# Only one send call (the initial message)
assert adapter.send.call_count == 1
@pytest.mark.asyncio
async def test_no_message_id_segment_breaks_do_not_resend(self):
"""On a platform that never returns a message_id (e.g. webhook with
github_comment delivery), tool-call segment breaks must NOT trigger
a new adapter.send() per boundary. The fix: _message_id == '__no_edit__'
suppresses the reset so all text accumulates and is sent once."""
adapter = MagicMock()
# No message_id on first send, then one more for the fallback final
adapter.send = AsyncMock(side_effect=[
SimpleNamespace(success=True, message_id=None),
SimpleNamespace(success=True, message_id=None),
])
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
# Simulate: text → tool boundary → text → tool boundary → text (3 segments)
consumer.on_delta("Phase 1 text")
consumer.on_delta(None) # tool call boundary
consumer.on_delta("Phase 2 text")
consumer.on_delta(None) # another tool call boundary
consumer.on_delta("Phase 3 text")
consumer.finish()
await consumer.run()
# Before the fix this would post 3 comments (one per segment).
# After the fix: only the initial partial + one fallback-final continuation.
assert adapter.send.call_count == 2, (
f"Expected 2 sends (initial + fallback), got {adapter.send.call_count}"
)
assert consumer.already_sent
# The continuation must contain the text from segments 2 and 3
final_text = adapter.send.call_args_list[1][1]["content"]
assert "Phase 2" in final_text
assert "Phase 3" in final_text
@pytest.mark.asyncio
async def test_fallback_final_splits_long_continuation_without_dropping_text(self):
"""Long continuation tails should be chunked when fallback final-send runs."""

View File

@@ -6,7 +6,7 @@ from unittest.mock import AsyncMock
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.platforms.base import MessageEvent, MessageType, ProcessingOutcome
from gateway.session import SessionSource
@@ -180,7 +180,7 @@ async def test_on_processing_complete_success(monkeypatch):
adapter = _make_adapter()
event = _make_event()
await adapter.on_processing_complete(event, success=True)
await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
adapter._bot.set_message_reaction.assert_awaited_once_with(
chat_id=123,
@@ -196,7 +196,7 @@ async def test_on_processing_complete_failure(monkeypatch):
adapter = _make_adapter()
event = _make_event()
await adapter.on_processing_complete(event, success=False)
await adapter.on_processing_complete(event, ProcessingOutcome.FAILURE)
adapter._bot.set_message_reaction.assert_awaited_once_with(
chat_id=123,
@@ -212,7 +212,19 @@ async def test_on_processing_complete_skipped_when_disabled(monkeypatch):
adapter = _make_adapter()
event = _make_event()
await adapter.on_processing_complete(event, success=True)
await adapter.on_processing_complete(event, ProcessingOutcome.SUCCESS)
adapter._bot.set_message_reaction.assert_not_awaited()
@pytest.mark.asyncio
async def test_on_processing_complete_cancelled_keeps_existing_reaction(monkeypatch):
"""Expected cancellation should not replace the in-progress reaction."""
monkeypatch.setenv("TELEGRAM_REACTIONS", "true")
adapter = _make_adapter()
event = _make_event()
await adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
adapter._bot.set_message_reaction.assert_not_awaited()

View File

@@ -1,6 +1,5 @@
"""Tests for hermes_cli.gateway."""
import signal
from types import SimpleNamespace
from unittest.mock import patch, call
@@ -211,8 +210,7 @@ class TestWaitForGatewayExit:
assert poll_count == 3
def test_force_kills_after_grace_period(self, monkeypatch):
"""When the process doesn't exit, SIGKILL the saved PID."""
import time as _time
"""When the process doesn't exit, force-kill the saved PID."""
# Simulate monotonic time advancing past force_after
call_num = 0
@@ -224,8 +222,8 @@ class TestWaitForGatewayExit:
return call_num * 2.0 # 2, 4, 6, 8, ...
kills = []
def mock_kill(pid, sig):
kills.append((pid, sig))
def mock_terminate(pid, force=False):
kills.append((pid, force))
# get_running_pid returns the PID until kill is sent, then None
def mock_get_running_pid():
@@ -234,14 +232,13 @@ class TestWaitForGatewayExit:
monkeypatch.setattr("time.monotonic", fake_monotonic)
monkeypatch.setattr("time.sleep", lambda _: None)
monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid)
monkeypatch.setattr("os.kill", mock_kill)
monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
gateway._wait_for_gateway_exit(timeout=10.0, force_after=5.0)
assert (42, signal.SIGKILL) in kills
assert (42, True) in kills
def test_handles_process_already_gone_on_kill(self, monkeypatch):
"""ProcessLookupError during SIGKILL is not fatal."""
import time as _time
"""ProcessLookupError during force-kill is not fatal."""
call_num = 0
def fake_monotonic():
@@ -249,13 +246,24 @@ class TestWaitForGatewayExit:
call_num += 1
return call_num * 3.0 # Jump past force_after quickly
def mock_kill(pid, sig):
def mock_terminate(pid, force=False):
raise ProcessLookupError
monkeypatch.setattr("time.monotonic", fake_monotonic)
monkeypatch.setattr("time.sleep", lambda _: None)
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 99)
monkeypatch.setattr("os.kill", mock_kill)
monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
# Should not raise — ProcessLookupError means it's already gone.
gateway._wait_for_gateway_exit(timeout=10.0, force_after=2.0)
def test_kill_gateway_processes_force_uses_helper(self, monkeypatch):
calls = []
monkeypatch.setattr(gateway, "find_gateway_pids", lambda exclude_pids=None: [11, 22])
monkeypatch.setattr(gateway, "terminate_pid", lambda pid, force=False: calls.append((pid, force)))
killed = gateway.kill_gateway_processes(force=True)
assert killed == 2
assert calls == [(11, True), (22, True)]