Compare commits

...

1 Commits

Author SHA1 Message Date
Black-Kylin
9f6224033c fix(gateway): sync compression split on failed turns
Sync gateway session pointers immediately after context compression rotates the agent session, even when the follow-up model call fails before a final response.

Co-authored-from: https://github.com/NousResearch/hermes-agent/pull/25747
2026-06-13 04:53:41 -07:00
2 changed files with 225 additions and 27 deletions

View File

@@ -14565,6 +14565,32 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
_context_length = getattr(_agent.context_compressor, "context_length", 0) or 0
_resolved_model = getattr(_agent, "model", None) if _agent else None
# Compression can rotate the agent to a new session before the
# follow-up model call succeeds. Sync that rotation before any
# early return so the next gateway turn reloads the compressed
# child transcript instead of the stale pre-compression parent.
agent = agent_holder[0]
_session_was_split = False
effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
if agent and session_key and effective_session_id != session_id:
_session_was_split = True
logger.info(
"Session split detected: %s%s (compression)",
session_id, effective_session_id,
)
entry = self.session_store._entries.get(session_key)
if entry:
entry.session_id = effective_session_id
self.session_store._save()
self._sync_telegram_topic_binding(
source, entry, reason="agent-result-compression",
)
# When compression created a new session, the messages list was
# shortened. Using the original history offset would make gateway
# persistence slice away the compressed handoff/tail on failure.
_effective_history_offset = 0 if _session_was_split else len(agent_history)
if not final_response:
error_msg = f"⚠️ {result['error']}" if result.get("error") else ""
return {
@@ -14579,7 +14605,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"error": result.get("error"),
"compression_exhausted": result.get("compression_exhausted", False),
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
"history_offset": _effective_history_offset,
"session_id": effective_session_id,
"last_prompt_tokens": _last_prompt_toks,
"input_tokens": _input_toks,
"output_tokens": _output_toks,
@@ -14625,23 +14652,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
unique_tags.insert(0, "[[audio_as_voice]]")
final_response = final_response + "\n" + "\n".join(unique_tags)
# Sync session_id: the agent may have created a new session during
# mid-run context compression (_compress_context splits sessions).
# If so, update the session store entry so the NEXT message loads
# the compressed transcript, not the stale pre-compression one.
agent = agent_holder[0]
_session_was_split = False
if agent and session_key and hasattr(agent, 'session_id') and agent.session_id != session_id:
_session_was_split = True
logger.info(
"Session split detected: %s%s (compression)",
session_id, agent.session_id,
)
entry = self.session_store._entries.get(session_key)
if entry:
entry.session_id = agent.session_id
self.session_store._save()
if _session_was_split:
# If this is a Telegram DM and source.thread_id was lost during
# the session split (synthetic / recovered event), restore it
# from the binding so _thread_metadata_for_source produces the
@@ -14673,15 +14684,6 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
exc_info=True,
)
effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
# When compression created a new session, the messages list was
# shortened. Using the original history offset would produce an
# empty new_messages slice, causing the gateway to write only a
# user/assistant pair — losing the compressed summary and tail.
# Reset to 0 so the gateway writes ALL compressed messages.
_effective_history_offset = 0 if _session_was_split else len(agent_history)
# Auto-generate session title after first exchange (non-blocking)
if final_response and self._session_db:
try:

View File

@@ -0,0 +1,196 @@
import asyncio
import sys
import threading
import types
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import gateway.run as gateway_run
from gateway.config import Platform
from gateway.session import SessionSource
SESSION_KEY = "agent:main:telegram:dm:12345"
class _SaveTrackingSessionStore:
def __init__(self):
self.entry = SimpleNamespace(session_id="session-before-compression")
self._entries = {SESSION_KEY: self.entry}
self.save_calls = 0
self.topic_sync_calls = []
def _save(self):
self.save_calls += 1
class _CompressionThenFailureAgent:
def __init__(self, **kwargs):
self.session_id = kwargs["session_id"]
self.model = kwargs["model"]
self.tools = []
self.context_compressor = SimpleNamespace(
last_prompt_tokens=4321,
context_length=200000,
)
self.session_prompt_tokens = 4321
self.session_completion_tokens = 0
def run_conversation(self, user_message, conversation_history=None, task_id=None):
self.session_id = "session-after-compression"
return {
"failed": True,
"error": (
"APIConnectionError: Codex auxiliary Responses stream exceeded "
"120.0s total timeout"
),
"messages": [
{
"role": "user",
"content": (
"[Context compressed: previous long transcript was "
"summarized before retry]"
),
},
{"role": "user", "content": user_message},
],
"api_calls": 1,
}
def interrupt(self, *_args, **_kwargs):
pass
class _ImmediateStreamConsumer:
final_response_sent = False
def __init__(self, *_args, **_kwargs):
pass
async def run(self):
return None
def finish(self):
pass
class _QuietAdapter:
SUPPORTS_MESSAGE_EDITING = True
_pending_messages = {}
def get_pending_message(self, _session_key):
return None
def _install_fake_agent(monkeypatch):
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = _CompressionThenFailureAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
def _make_runner(session_store):
runner = object.__new__(gateway_run.GatewayRunner)
runner.adapters = {
Platform.TELEGRAM: _QuietAdapter(),
}
runner._ephemeral_system_prompt = ""
runner._prefill_messages = []
runner._reasoning_config = None
runner._provider_routing = {}
runner._fallback_model = None
runner._running_agents = {}
runner._pending_model_notes = {}
runner._pending_skills_reload_notes = {}
runner._session_db = None
runner._agent_cache = {}
runner._agent_cache_lock = threading.Lock()
runner._session_model_overrides = {}
runner._draining = False
runner.config = SimpleNamespace(streaming=None)
runner.hooks = SimpleNamespace(loaded_hooks=False, emit=AsyncMock())
runner.session_store = session_store
runner._get_proxy_url = lambda: None
runner._resolve_session_agent_runtime = lambda **_kwargs: (
"gpt-5.4",
{
"provider": "openai-codex",
"api_mode": "codex_responses",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "token",
},
)
runner._resolve_session_reasoning_config = lambda **_kwargs: None
runner._resolve_turn_agent_config = lambda message, model, runtime: {
"model": model,
"runtime": runtime,
}
runner._load_service_tier = lambda: None
runner._agent_config_signature = lambda *_args, **_kwargs: ("sig",)
runner._extract_cache_busting_config = lambda _config: ()
runner._thread_metadata_for_source = lambda *_args, **_kwargs: None
runner._is_telegram_topic_lane = lambda _source: False
runner._sync_telegram_topic_binding = lambda source, entry, *, reason: session_store.topic_sync_calls.append(
(source, entry.session_id, reason)
)
runner._release_running_agent_state = MagicMock()
return runner
def _source():
return SessionSource(
platform=Platform.TELEGRAM,
chat_id="12345",
chat_type="dm",
user_id="user-1",
)
def test_failed_turn_still_syncs_compression_session_split(monkeypatch):
"""A post-compression API failure must not leave the session store on the
stale pre-compression transcript.
"""
_install_fake_agent(monkeypatch)
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "off")
monkeypatch.setenv("HERMES_AGENT_TIMEOUT", "0")
monkeypatch.setattr(gateway_run, "_load_gateway_config", lambda: {})
monkeypatch.setattr(
"gateway.stream_consumer.GatewayStreamConsumer",
_ImmediateStreamConsumer,
)
import hermes_cli.tools_config as tools_config
monkeypatch.setattr(
tools_config,
"_get_platform_tools",
lambda *_args, **_kwargs: {"core"},
)
session_store = _SaveTrackingSessionStore()
runner = _make_runner(session_store)
result = asyncio.run(
asyncio.wait_for(
runner._run_agent(
message="continue",
context_prompt="",
history=[
{"role": "user", "content": "old question"},
{"role": "assistant", "content": "old answer"},
],
source=_source(),
session_id="session-before-compression",
session_key=SESSION_KEY,
),
timeout=2,
),
)
assert result["failed"] is True
assert result["session_id"] == "session-after-compression"
assert result["history_offset"] == 0
assert session_store.entry.session_id == "session-after-compression"
assert session_store.save_calls == 1
assert session_store.topic_sync_calls == [
(_source(), "session-after-compression", "agent-result-compression")
]