mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-14 14:19:29 +08:00
Compare commits
1 Commits
claude-cod
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f6224033c |
@@ -14565,6 +14565,32 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
_context_length = getattr(_agent.context_compressor, "context_length", 0) or 0
|
||||
_resolved_model = getattr(_agent, "model", None) if _agent else None
|
||||
|
||||
# Compression can rotate the agent to a new session before the
|
||||
# follow-up model call succeeds. Sync that rotation before any
|
||||
# early return so the next gateway turn reloads the compressed
|
||||
# child transcript instead of the stale pre-compression parent.
|
||||
agent = agent_holder[0]
|
||||
_session_was_split = False
|
||||
effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
|
||||
if agent and session_key and effective_session_id != session_id:
|
||||
_session_was_split = True
|
||||
logger.info(
|
||||
"Session split detected: %s → %s (compression)",
|
||||
session_id, effective_session_id,
|
||||
)
|
||||
entry = self.session_store._entries.get(session_key)
|
||||
if entry:
|
||||
entry.session_id = effective_session_id
|
||||
self.session_store._save()
|
||||
self._sync_telegram_topic_binding(
|
||||
source, entry, reason="agent-result-compression",
|
||||
)
|
||||
|
||||
# When compression created a new session, the messages list was
|
||||
# shortened. Using the original history offset would make gateway
|
||||
# persistence slice away the compressed handoff/tail on failure.
|
||||
_effective_history_offset = 0 if _session_was_split else len(agent_history)
|
||||
|
||||
if not final_response:
|
||||
error_msg = f"⚠️ {result['error']}" if result.get("error") else ""
|
||||
return {
|
||||
@@ -14579,7 +14605,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
"error": result.get("error"),
|
||||
"compression_exhausted": result.get("compression_exhausted", False),
|
||||
"tools": tools_holder[0] or [],
|
||||
"history_offset": len(agent_history),
|
||||
"history_offset": _effective_history_offset,
|
||||
"session_id": effective_session_id,
|
||||
"last_prompt_tokens": _last_prompt_toks,
|
||||
"input_tokens": _input_toks,
|
||||
"output_tokens": _output_toks,
|
||||
@@ -14625,23 +14652,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
unique_tags.insert(0, "[[audio_as_voice]]")
|
||||
final_response = final_response + "\n" + "\n".join(unique_tags)
|
||||
|
||||
# Sync session_id: the agent may have created a new session during
|
||||
# mid-run context compression (_compress_context splits sessions).
|
||||
# If so, update the session store entry so the NEXT message loads
|
||||
# the compressed transcript, not the stale pre-compression one.
|
||||
agent = agent_holder[0]
|
||||
_session_was_split = False
|
||||
if agent and session_key and hasattr(agent, 'session_id') and agent.session_id != session_id:
|
||||
_session_was_split = True
|
||||
logger.info(
|
||||
"Session split detected: %s → %s (compression)",
|
||||
session_id, agent.session_id,
|
||||
)
|
||||
entry = self.session_store._entries.get(session_key)
|
||||
if entry:
|
||||
entry.session_id = agent.session_id
|
||||
self.session_store._save()
|
||||
|
||||
if _session_was_split:
|
||||
# If this is a Telegram DM and source.thread_id was lost during
|
||||
# the session split (synthetic / recovered event), restore it
|
||||
# from the binding so _thread_metadata_for_source produces the
|
||||
@@ -14673,15 +14684,6 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
|
||||
|
||||
# When compression created a new session, the messages list was
|
||||
# shortened. Using the original history offset would produce an
|
||||
# empty new_messages slice, causing the gateway to write only a
|
||||
# user/assistant pair — losing the compressed summary and tail.
|
||||
# Reset to 0 so the gateway writes ALL compressed messages.
|
||||
_effective_history_offset = 0 if _session_was_split else len(agent_history)
|
||||
|
||||
# Auto-generate session title after first exchange (non-blocking)
|
||||
if final_response and self._session_db:
|
||||
try:
|
||||
|
||||
196
tests/gateway/test_compression_failure_session_sync.py
Normal file
196
tests/gateway/test_compression_failure_session_sync.py
Normal file
@@ -0,0 +1,196 @@
|
||||
import asyncio
|
||||
import sys
|
||||
import threading
|
||||
import types
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import gateway.run as gateway_run
|
||||
from gateway.config import Platform
|
||||
from gateway.session import SessionSource
|
||||
|
||||
|
||||
SESSION_KEY = "agent:main:telegram:dm:12345"
|
||||
|
||||
|
||||
class _SaveTrackingSessionStore:
|
||||
def __init__(self):
|
||||
self.entry = SimpleNamespace(session_id="session-before-compression")
|
||||
self._entries = {SESSION_KEY: self.entry}
|
||||
self.save_calls = 0
|
||||
self.topic_sync_calls = []
|
||||
|
||||
def _save(self):
|
||||
self.save_calls += 1
|
||||
|
||||
|
||||
class _CompressionThenFailureAgent:
|
||||
def __init__(self, **kwargs):
|
||||
self.session_id = kwargs["session_id"]
|
||||
self.model = kwargs["model"]
|
||||
self.tools = []
|
||||
self.context_compressor = SimpleNamespace(
|
||||
last_prompt_tokens=4321,
|
||||
context_length=200000,
|
||||
)
|
||||
self.session_prompt_tokens = 4321
|
||||
self.session_completion_tokens = 0
|
||||
|
||||
def run_conversation(self, user_message, conversation_history=None, task_id=None):
|
||||
self.session_id = "session-after-compression"
|
||||
return {
|
||||
"failed": True,
|
||||
"error": (
|
||||
"APIConnectionError: Codex auxiliary Responses stream exceeded "
|
||||
"120.0s total timeout"
|
||||
),
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": (
|
||||
"[Context compressed: previous long transcript was "
|
||||
"summarized before retry]"
|
||||
),
|
||||
},
|
||||
{"role": "user", "content": user_message},
|
||||
],
|
||||
"api_calls": 1,
|
||||
}
|
||||
|
||||
def interrupt(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class _ImmediateStreamConsumer:
|
||||
final_response_sent = False
|
||||
|
||||
def __init__(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
async def run(self):
|
||||
return None
|
||||
|
||||
def finish(self):
|
||||
pass
|
||||
|
||||
|
||||
class _QuietAdapter:
|
||||
SUPPORTS_MESSAGE_EDITING = True
|
||||
_pending_messages = {}
|
||||
|
||||
def get_pending_message(self, _session_key):
|
||||
return None
|
||||
|
||||
|
||||
def _install_fake_agent(monkeypatch):
|
||||
fake_run_agent = types.ModuleType("run_agent")
|
||||
fake_run_agent.AIAgent = _CompressionThenFailureAgent
|
||||
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
|
||||
|
||||
|
||||
def _make_runner(session_store):
|
||||
runner = object.__new__(gateway_run.GatewayRunner)
|
||||
runner.adapters = {
|
||||
Platform.TELEGRAM: _QuietAdapter(),
|
||||
}
|
||||
runner._ephemeral_system_prompt = ""
|
||||
runner._prefill_messages = []
|
||||
runner._reasoning_config = None
|
||||
runner._provider_routing = {}
|
||||
runner._fallback_model = None
|
||||
runner._running_agents = {}
|
||||
runner._pending_model_notes = {}
|
||||
runner._pending_skills_reload_notes = {}
|
||||
runner._session_db = None
|
||||
runner._agent_cache = {}
|
||||
runner._agent_cache_lock = threading.Lock()
|
||||
runner._session_model_overrides = {}
|
||||
runner._draining = False
|
||||
runner.config = SimpleNamespace(streaming=None)
|
||||
runner.hooks = SimpleNamespace(loaded_hooks=False, emit=AsyncMock())
|
||||
runner.session_store = session_store
|
||||
runner._get_proxy_url = lambda: None
|
||||
runner._resolve_session_agent_runtime = lambda **_kwargs: (
|
||||
"gpt-5.4",
|
||||
{
|
||||
"provider": "openai-codex",
|
||||
"api_mode": "codex_responses",
|
||||
"base_url": "https://chatgpt.com/backend-api/codex",
|
||||
"api_key": "token",
|
||||
},
|
||||
)
|
||||
runner._resolve_session_reasoning_config = lambda **_kwargs: None
|
||||
runner._resolve_turn_agent_config = lambda message, model, runtime: {
|
||||
"model": model,
|
||||
"runtime": runtime,
|
||||
}
|
||||
runner._load_service_tier = lambda: None
|
||||
runner._agent_config_signature = lambda *_args, **_kwargs: ("sig",)
|
||||
runner._extract_cache_busting_config = lambda _config: ()
|
||||
runner._thread_metadata_for_source = lambda *_args, **_kwargs: None
|
||||
runner._is_telegram_topic_lane = lambda _source: False
|
||||
runner._sync_telegram_topic_binding = lambda source, entry, *, reason: session_store.topic_sync_calls.append(
|
||||
(source, entry.session_id, reason)
|
||||
)
|
||||
runner._release_running_agent_state = MagicMock()
|
||||
return runner
|
||||
|
||||
|
||||
def _source():
|
||||
return SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="12345",
|
||||
chat_type="dm",
|
||||
user_id="user-1",
|
||||
)
|
||||
|
||||
|
||||
def test_failed_turn_still_syncs_compression_session_split(monkeypatch):
|
||||
"""A post-compression API failure must not leave the session store on the
|
||||
stale pre-compression transcript.
|
||||
"""
|
||||
_install_fake_agent(monkeypatch)
|
||||
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "off")
|
||||
monkeypatch.setenv("HERMES_AGENT_TIMEOUT", "0")
|
||||
monkeypatch.setattr(gateway_run, "_load_gateway_config", lambda: {})
|
||||
monkeypatch.setattr(
|
||||
"gateway.stream_consumer.GatewayStreamConsumer",
|
||||
_ImmediateStreamConsumer,
|
||||
)
|
||||
|
||||
import hermes_cli.tools_config as tools_config
|
||||
|
||||
monkeypatch.setattr(
|
||||
tools_config,
|
||||
"_get_platform_tools",
|
||||
lambda *_args, **_kwargs: {"core"},
|
||||
)
|
||||
|
||||
session_store = _SaveTrackingSessionStore()
|
||||
runner = _make_runner(session_store)
|
||||
|
||||
result = asyncio.run(
|
||||
asyncio.wait_for(
|
||||
runner._run_agent(
|
||||
message="continue",
|
||||
context_prompt="",
|
||||
history=[
|
||||
{"role": "user", "content": "old question"},
|
||||
{"role": "assistant", "content": "old answer"},
|
||||
],
|
||||
source=_source(),
|
||||
session_id="session-before-compression",
|
||||
session_key=SESSION_KEY,
|
||||
),
|
||||
timeout=2,
|
||||
),
|
||||
)
|
||||
|
||||
assert result["failed"] is True
|
||||
assert result["session_id"] == "session-after-compression"
|
||||
assert result["history_offset"] == 0
|
||||
assert session_store.entry.session_id == "session-after-compression"
|
||||
assert session_store.save_calls == 1
|
||||
assert session_store.topic_sync_calls == [
|
||||
(_source(), "session-after-compression", "agent-result-compression")
|
||||
]
|
||||
Reference in New Issue
Block a user