fix(gateway): defer background review notifications until after main reply

Background review notifications ("💾 Skill created", "💾 Memory updated")
could race ahead of the main assistant reply in chat, making it look like
the agent stopped after creating a skill.

Gate bg-review notifications behind a threading.Event + pending queue.
Register a release callback on the adapter's _post_delivery_callbacks dict
so base.py's finally block fires it after the main response is delivered.

The queued-message path in _run_agent pops and calls the callback directly
to prevent double-fire.

Co-authored-by: Hermes Agent <hermes@nousresearch.com>
Closes #10541
This commit is contained in:
Greer Guthrie
2026-04-15 16:40:38 -07:00
committed by Teknium
parent 44941f0ed1
commit 33ff29dfae
3 changed files with 130 additions and 2 deletions

View File

@@ -839,6 +839,11 @@ class BasePlatformAdapter(ABC):
# Gateway shutdown cancels these so an old gateway instance doesn't keep
# working on a task after --replace or manual restarts.
self._background_tasks: set[asyncio.Task] = set()
# One-shot callbacks to fire after the main response is delivered.
# Keyed by session_key. GatewayRunner uses this to defer
# background-review notifications ("💾 Skill created") until the
# primary reply has been sent.
self._post_delivery_callbacks: Dict[str, Callable] = {}
self._expected_cancelled_tasks: set[asyncio.Task] = set()
self._busy_session_handler: Optional[Callable[[MessageEvent, str], Awaitable[bool]]] = None
# Chats where auto-TTS on voice input is disabled (set by /voice off)
@@ -1894,6 +1899,14 @@ class BasePlatformAdapter(ABC):
except Exception:
pass # Last resort — don't let error reporting crash the handler
finally:
# Fire any one-shot post-delivery callback registered for this
# session (e.g. deferred background-review notifications).
_post_cb = getattr(self, "_post_delivery_callbacks", {}).pop(session_key, None)
if callable(_post_cb):
try:
_post_cb()
except Exception:
pass
# Stop typing indicator
typing_task.cancel()
try:

View File

@@ -8616,8 +8616,11 @@ class GatewayRunner:
agent.service_tier = self._service_tier
agent.request_overrides = turn_route.get("request_overrides")
# Background review delivery — send "💾 Memory updated" etc. to user
def _bg_review_send(message: str) -> None:
_bg_review_release = threading.Event()
_bg_review_pending: list[str] = []
_bg_review_pending_lock = threading.Lock()
def _deliver_bg_review_message(message: str) -> None:
if not _status_adapter:
return
try:
@@ -8632,7 +8635,32 @@ class GatewayRunner:
except Exception as _e:
logger.debug("background_review_callback error: %s", _e)
def _release_bg_review_messages() -> None:
_bg_review_release.set()
with _bg_review_pending_lock:
pending = list(_bg_review_pending)
_bg_review_pending.clear()
for queued in pending:
_deliver_bg_review_message(queued)
# Background review delivery — send "💾 Memory updated" etc. to user
def _bg_review_send(message: str) -> None:
if not _status_adapter:
return
if not _bg_review_release.is_set():
with _bg_review_pending_lock:
if not _bg_review_release.is_set():
_bg_review_pending.append(message)
return
_deliver_bg_review_message(message)
agent.background_review_callback = _bg_review_send
# Register the release hook on the adapter so base.py's finally
# block can fire it after delivering the main response.
if _status_adapter and session_key:
_pdc = getattr(_status_adapter, "_post_delivery_callbacks", None)
if _pdc is not None:
_pdc[session_key] = _release_bg_review_messages
# Store agent reference for interrupt support
agent_holder[0] = agent
@@ -9356,6 +9384,17 @@ class GatewayRunner:
)
except Exception as e:
logger.warning("Failed to send first response before queued message: %s", e)
# Release deferred bg-review notifications now that the
# first response has been delivered. Pop from the
# adapter's callback dict (prevents double-fire in
# base.py's finally block) and call it.
if adapter and hasattr(adapter, "_post_delivery_callbacks"):
_bg_cb = adapter._post_delivery_callbacks.pop(session_key, None)
if callable(_bg_cb):
try:
_bg_cb()
except Exception:
pass
# else: interrupted — discard the interrupted response ("Operation
# interrupted." is just noise; the user already knows they sent a
# new message).

View File

@@ -1,5 +1,6 @@
"""Tests for topic-aware gateway progress updates."""
import asyncio
import importlib
import sys
import time
@@ -415,6 +416,21 @@ class QueuedCommentaryAgent:
}
class BackgroundReviewAgent:
def __init__(self, **kwargs):
self.background_review_callback = kwargs.get("background_review_callback")
self.tools = []
def run_conversation(self, message, conversation_history=None, task_id=None):
if self.background_review_callback:
self.background_review_callback("💾 Skill 'prospect-scanner' created.")
return {
"final_response": "done",
"messages": [],
"api_calls": 1,
}
class VerboseAgent:
"""Agent that emits a tool call with args whose JSON exceeds 200 chars."""
LONG_CODE = "x" * 300
@@ -668,6 +684,66 @@ async def test_run_agent_queued_message_does_not_treat_commentary_as_final(monke
assert "final response 1" in sent_texts
@pytest.mark.asyncio
async def test_run_agent_defers_background_review_notification_until_release(monkeypatch, tmp_path):
adapter, result = await _run_with_agent(
monkeypatch,
tmp_path,
BackgroundReviewAgent,
session_id="sess-bg-review-order",
config_data={"display": {"interim_assistant_messages": True}},
)
assert result["final_response"] == "done"
assert adapter.sent == []
@pytest.mark.asyncio
async def test_base_processing_releases_post_delivery_callback_after_main_send():
"""Post-delivery callbacks on the adapter fire after the main response."""
adapter = ProgressCaptureAdapter()
async def _handler(event):
return "done"
adapter.set_message_handler(_handler)
released = []
def _post_delivery_cb():
released.append(True)
adapter.sent.append(
{
"chat_id": "bg-review",
"content": "💾 Skill 'prospect-scanner' created.",
"reply_to": None,
"metadata": None,
}
)
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1001",
chat_type="group",
thread_id="17585",
)
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=source,
message_id="msg-1",
)
session_key = "agent:main:telegram:group:-1001:17585"
adapter._active_sessions[session_key] = asyncio.Event()
adapter._post_delivery_callbacks[session_key] = _post_delivery_cb
await adapter._process_message_background(event, session_key)
sent_texts = [call["content"] for call in adapter.sent]
assert sent_texts == ["done", "💾 Skill 'prospect-scanner' created."]
assert released == [True]
@pytest.mark.asyncio
async def test_verbose_mode_does_not_truncate_args_by_default(monkeypatch, tmp_path):
"""Verbose mode with default tool_preview_length (0) should NOT truncate args.