Files
hermes-agent/tests/gateway/test_run_progress_interrupt.py
Teknium 35c57cc46b fix(gateway): suppress tool-progress bubbles after interrupt (#16034)
When the LLM response carries N parallel tool calls, the agent fires
N tool.started events back-to-back before its interrupt check runs.
A user sending /stop mid-batch would see the '⚡ Interrupting current
task' ack followed by a trail of 🔍 web_search bubbles for the remaining
events in the batch — making the interrupt feel ignored.

progress_callback and the drain loop in send_progress_messages now
check agent.is_interrupted (via agent_holder[0], the existing
cross-scope handle). Events that arrive after interrupt are dropped
at both the queueing and rendering stages. The '⚡ Interrupting'
message is sent through a separate adapter path and is unaffected.
2026-04-26 05:47:37 -07:00

216 lines
7.9 KiB
Python

"""Tests for interrupt-aware tool-progress suppression in gateway.
When a user sends `stop` while the agent is executing a batch of parallel
tool calls, the gateway's progress_callback should stop queuing 🔍 bubbles
and the drain loop should drop any already-queued events. Without this
guard, the stop acknowledgement appears first but is followed by a trail
of tool-progress bubbles for calls that were already parsed from the LLM
response — making the interrupt feel ignored.
"""
import asyncio
import importlib
import sys
import time
import types
from types import SimpleNamespace
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, SendResult
from gateway.session import SessionSource
class ProgressCaptureAdapter(BasePlatformAdapter):
    """Platform adapter double that records outgoing traffic in memory.

    ``send``, ``edit_message`` and ``send_typing`` append to the ``sent``,
    ``edits`` and ``typing`` lists respectively, so a test can inspect
    what the gateway tried to render. No network I/O is performed by
    any method defined here.
    """

    def __init__(self, platform=Platform.TELEGRAM):
        config = PlatformConfig(enabled=True, token="***")
        super().__init__(config, platform)
        # Capture buffers, filled in call order by the methods below.
        self.sent, self.edits, self.typing = [], [], []

    async def connect(self) -> bool:
        """Pretend the connection succeeded."""
        return True

    async def disconnect(self) -> None:
        """No-op: there is nothing to tear down."""

    async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
        """Record a new message and report success with a fixed message id."""
        self.sent.append(dict(chat_id=chat_id, content=content))
        return SendResult(success=True, message_id="progress-1")

    async def edit_message(self, chat_id, message_id, content) -> SendResult:
        """Record an edit and echo back the id of the edited message."""
        self.edits.append(dict(message_id=message_id, content=content))
        return SendResult(success=True, message_id=message_id)

    async def send_typing(self, chat_id, metadata=None) -> None:
        """Record which chat a typing indicator was requested for."""
        self.typing.append(chat_id)

    async def stop_typing(self, chat_id) -> None:
        """No-op: typing indicators are only recorded, never cleared."""

    async def get_chat_info(self, chat_id: str):
        """Return a minimal chat descriptor holding only the id."""
        return {"id": chat_id}
class PreInterruptAgent:
    """Fires tool-progress events BEFORE the interrupt lands.

    These should render normally. Baseline for comparison with the
    interrupted case — proves the harness renders events when no
    interrupt is active.

    Only the ``tool_progress_callback`` keyword argument is read from
    the constructor kwargs; every other keyword is ignored.
    """

    # Seconds run_conversation blocks after emitting its event so the
    # gateway's drain loop gets a chance to process it. Kept as a class
    # attribute so callers can override it instead of editing the body.
    DRAIN_WAIT_SECONDS = 0.35

    def __init__(self, **kwargs):
        self.tool_progress_callback = kwargs.get("tool_progress_callback")
        self.tools = []
        self._interrupt_requested = False

    @property
    def is_interrupted(self) -> bool:
        """Whether an interrupt has been requested (never, for this agent)."""
        return self._interrupt_requested

    def run_conversation(self, message, conversation_history=None, task_id=None):
        """Emit one ``tool.started`` event, wait, and return a canned result.

        The arguments are accepted for signature compatibility and are
        not inspected.
        """
        self.tool_progress_callback("tool.started", "web_search", "first search", {})
        time.sleep(self.DRAIN_WAIT_SECONDS)  # let the drain loop process
        return {"final_response": "done", "messages": [], "api_calls": 1}
class InterruptedAgent:
    """Fires tool.started events AFTER interrupt — all should be suppressed.

    Mirrors the failure mode in the bug report: LLM returned N parallel
    web_search calls, interrupt flag flipped, remaining events still
    rendered as bubbles. With the fix, none of these should appear.

    Only the ``tool_progress_callback`` keyword argument is read from
    the constructor kwargs; every other keyword is ignored.
    """

    # Previews of the parallel tool batch, emitted in this order. In
    # production these come from one LLM response with 5 tool_calls.
    POST_INTERRUPT_QUERIES = (
        "cognee hermes",
        "McBee deer hunting",
        "kuzu graph db",
        "moonshot kimi api",
        "platform.moonshot.cn",
    )

    # Seconds run_conversation blocks after emitting the batch so the
    # gateway's drain loop gets a chance to look at the queue.
    DRAIN_WAIT_SECONDS = 0.35

    def __init__(self, **kwargs):
        self.tool_progress_callback = kwargs.get("tool_progress_callback")
        self.tools = []
        # Start already interrupted — simulates stop having already landed
        # by the time the agent batch starts firing tool.started events.
        self._interrupt_requested = True

    @property
    def is_interrupted(self) -> bool:
        """Whether an interrupt has been requested (always, for this agent)."""
        return self._interrupt_requested

    def run_conversation(self, message, conversation_history=None, task_id=None):
        """Emit the whole post-interrupt batch, wait, and return a canned result.

        The arguments are accepted for signature compatibility and are
        not inspected.
        """
        # Every event gets its own fresh args dict, as the unrolled calls did.
        for query in self.POST_INTERRUPT_QUERIES:
            self.tool_progress_callback("tool.started", "web_search", query, {})
        # let the drain loop attempt to process the queue
        time.sleep(self.DRAIN_WAIT_SECONDS)
        return {"final_response": "interrupted", "messages": [], "api_calls": 1}
def _make_runner(adapter):
    """Build a bare ``GatewayRunner`` wired to *adapter*.

    The instance is allocated with ``object.__new__`` so that
    ``GatewayRunner.__init__`` never runs; the attributes below are
    then set by hand (presumably the state ``_run_agent`` reads —
    verify against ``gateway.run`` when adding fields).
    """
    runner_cls = importlib.import_module("gateway.run").GatewayRunner
    runner = object.__new__(runner_cls)

    stub_state = {
        "adapters": {adapter.platform: adapter},
        "_voice_mode": {},
        "_prefill_messages": [],
        "_ephemeral_system_prompt": "",
        "_reasoning_config": None,
        "_provider_routing": {},
        "_fallback_model": None,
        "_session_db": None,
        "_running_agents": {},
        "_session_run_generation": {},
        "hooks": SimpleNamespace(loaded_hooks=False),
        "config": SimpleNamespace(
            thread_sessions_per_user=False,
            group_sessions_per_user=False,
            stt_enabled=False,
        ),
    }
    # Dicts preserve insertion order, so attributes are set in the
    # same sequence as listed above.
    for attr_name, attr_value in stub_state.items():
        setattr(runner, attr_name, attr_value)
    return runner
async def _run_once(monkeypatch, tmp_path, agent_cls, session_id):
    """Run one gateway turn with *agent_cls* standing in for ``AIAgent``.

    Returns ``(adapter, result)``: the capturing adapter and whatever
    ``runner._run_agent`` returned. All patches go through
    *monkeypatch*, so they are undone when the test finishes.
    """
    monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all")

    # Replace ``dotenv`` and ``run_agent`` in sys.modules with stubs.
    # This happens before gateway.run is imported below (presumably so
    # the gateway picks up the fake AIAgent — confirm in gateway.run).
    stub_specs = {
        "dotenv": {"load_dotenv": lambda *args, **kwargs: None},
        "run_agent": {"AIAgent": agent_cls},
    }
    for module_name, attributes in stub_specs.items():
        stub_module = types.ModuleType(module_name)
        for attr_name, attr_value in attributes.items():
            setattr(stub_module, attr_name, attr_value)
        monkeypatch.setitem(sys.modules, module_name, stub_module)

    adapter = ProgressCaptureAdapter()
    runner = _make_runner(adapter)

    gateway_module = importlib.import_module("gateway.run")
    monkeypatch.setattr(gateway_module, "_hermes_home", tmp_path)
    monkeypatch.setattr(
        gateway_module,
        "_resolve_runtime_agent_kwargs",
        lambda: {"api_key": "fake"},
    )

    group_thread = SessionSource(
        platform=Platform.TELEGRAM,
        chat_id="-1001",
        chat_type="group",
        thread_id="17585",
    )
    outcome = await runner._run_agent(
        message="hi",
        context_prompt="",
        history=[],
        source=group_thread,
        session_id=session_id,
        session_key="agent:main:telegram:group:-1001:17585",
    )
    return adapter, outcome
@pytest.mark.asyncio
async def test_baseline_non_interrupted_agent_renders_progress(monkeypatch, tmp_path):
    """Harness check: with is_interrupted False the progress event is rendered."""
    adapter, result = await _run_once(
        monkeypatch, tmp_path, PreInterruptAgent, "sess-baseline"
    )
    assert result["final_response"] == "done"

    # Progress may surface as a fresh message or as an edit of an
    # earlier one, so search both capture buffers.
    sent_text = " ".join(entry["content"] for entry in adapter.sent)
    edited_text = " ".join(entry["content"] for entry in adapter.edits)
    rendered = f"{sent_text} {edited_text}"

    assert "first search" in rendered, (
        "baseline agent should render its tool-progress event — "
        "if this fails the test harness is broken, not the fix"
    )
@pytest.mark.asyncio
async def test_progress_suppressed_when_agent_is_interrupted(monkeypatch, tmp_path):
    """tool.started events fired after an interrupt must never be rendered.

    Reproduces the reported symptom: the user sends `stop`, the agent
    acks with ⚡ Interrupting, yet 5 more 🔍 web_search bubbles appear
    because their tool.started events were already parsed from the LLM
    response. The test passes only if none of the five query previews
    reaches the adapter, either as a sent message or as an edit.
    """
    adapter, result = await _run_once(
        monkeypatch, tmp_path, InterruptedAgent, "sess-interrupted"
    )
    assert result["final_response"] == "interrupted"

    sent_text = " ".join(entry["content"] for entry in adapter.sent)
    edited_text = " ".join(entry["content"] for entry in adapter.edits)
    rendered = f"{sent_text} {edited_text}"

    # Previews emitted by InterruptedAgent — none may show up in the UI.
    post_interrupt_queries = (
        "cognee hermes",
        "McBee deer hunting",
        "kuzu graph db",
        "moonshot kimi api",
        "platform.moonshot.cn",
    )
    for leaked_query in post_interrupt_queries:
        assert leaked_query not in rendered, (
            f"event '{leaked_query}' leaked into the UI after interrupt — "
            "progress_callback / drain loop is not checking is_interrupted"
        )