mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-17 07:31:21 +08:00
Compare commits
1 Commits
claude-cod
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f6224033c |
@@ -14565,6 +14565,32 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||||||
_context_length = getattr(_agent.context_compressor, "context_length", 0) or 0
|
_context_length = getattr(_agent.context_compressor, "context_length", 0) or 0
|
||||||
_resolved_model = getattr(_agent, "model", None) if _agent else None
|
_resolved_model = getattr(_agent, "model", None) if _agent else None
|
||||||
|
|
||||||
|
# Compression can rotate the agent to a new session before the
|
||||||
|
# follow-up model call succeeds. Sync that rotation before any
|
||||||
|
# early return so the next gateway turn reloads the compressed
|
||||||
|
# child transcript instead of the stale pre-compression parent.
|
||||||
|
agent = agent_holder[0]
|
||||||
|
_session_was_split = False
|
||||||
|
effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
|
||||||
|
if agent and session_key and effective_session_id != session_id:
|
||||||
|
_session_was_split = True
|
||||||
|
logger.info(
|
||||||
|
"Session split detected: %s → %s (compression)",
|
||||||
|
session_id, effective_session_id,
|
||||||
|
)
|
||||||
|
entry = self.session_store._entries.get(session_key)
|
||||||
|
if entry:
|
||||||
|
entry.session_id = effective_session_id
|
||||||
|
self.session_store._save()
|
||||||
|
self._sync_telegram_topic_binding(
|
||||||
|
source, entry, reason="agent-result-compression",
|
||||||
|
)
|
||||||
|
|
||||||
|
# When compression created a new session, the messages list was
|
||||||
|
# shortened. Using the original history offset would make gateway
|
||||||
|
# persistence slice away the compressed handoff/tail on failure.
|
||||||
|
_effective_history_offset = 0 if _session_was_split else len(agent_history)
|
||||||
|
|
||||||
if not final_response:
|
if not final_response:
|
||||||
error_msg = f"⚠️ {result['error']}" if result.get("error") else ""
|
error_msg = f"⚠️ {result['error']}" if result.get("error") else ""
|
||||||
return {
|
return {
|
||||||
@@ -14579,7 +14605,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||||||
"error": result.get("error"),
|
"error": result.get("error"),
|
||||||
"compression_exhausted": result.get("compression_exhausted", False),
|
"compression_exhausted": result.get("compression_exhausted", False),
|
||||||
"tools": tools_holder[0] or [],
|
"tools": tools_holder[0] or [],
|
||||||
"history_offset": len(agent_history),
|
"history_offset": _effective_history_offset,
|
||||||
|
"session_id": effective_session_id,
|
||||||
"last_prompt_tokens": _last_prompt_toks,
|
"last_prompt_tokens": _last_prompt_toks,
|
||||||
"input_tokens": _input_toks,
|
"input_tokens": _input_toks,
|
||||||
"output_tokens": _output_toks,
|
"output_tokens": _output_toks,
|
||||||
@@ -14625,23 +14652,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||||||
unique_tags.insert(0, "[[audio_as_voice]]")
|
unique_tags.insert(0, "[[audio_as_voice]]")
|
||||||
final_response = final_response + "\n" + "\n".join(unique_tags)
|
final_response = final_response + "\n" + "\n".join(unique_tags)
|
||||||
|
|
||||||
# Sync session_id: the agent may have created a new session during
|
if _session_was_split:
|
||||||
# mid-run context compression (_compress_context splits sessions).
|
|
||||||
# If so, update the session store entry so the NEXT message loads
|
|
||||||
# the compressed transcript, not the stale pre-compression one.
|
|
||||||
agent = agent_holder[0]
|
|
||||||
_session_was_split = False
|
|
||||||
if agent and session_key and hasattr(agent, 'session_id') and agent.session_id != session_id:
|
|
||||||
_session_was_split = True
|
|
||||||
logger.info(
|
|
||||||
"Session split detected: %s → %s (compression)",
|
|
||||||
session_id, agent.session_id,
|
|
||||||
)
|
|
||||||
entry = self.session_store._entries.get(session_key)
|
|
||||||
if entry:
|
|
||||||
entry.session_id = agent.session_id
|
|
||||||
self.session_store._save()
|
|
||||||
|
|
||||||
# If this is a Telegram DM and source.thread_id was lost during
|
# If this is a Telegram DM and source.thread_id was lost during
|
||||||
# the session split (synthetic / recovered event), restore it
|
# the session split (synthetic / recovered event), restore it
|
||||||
# from the binding so _thread_metadata_for_source produces the
|
# from the binding so _thread_metadata_for_source produces the
|
||||||
@@ -14673,15 +14684,6 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
|
|
||||||
|
|
||||||
# When compression created a new session, the messages list was
|
|
||||||
# shortened. Using the original history offset would produce an
|
|
||||||
# empty new_messages slice, causing the gateway to write only a
|
|
||||||
# user/assistant pair — losing the compressed summary and tail.
|
|
||||||
# Reset to 0 so the gateway writes ALL compressed messages.
|
|
||||||
_effective_history_offset = 0 if _session_was_split else len(agent_history)
|
|
||||||
|
|
||||||
# Auto-generate session title after first exchange (non-blocking)
|
# Auto-generate session title after first exchange (non-blocking)
|
||||||
if final_response and self._session_db:
|
if final_response and self._session_db:
|
||||||
try:
|
try:
|
||||||
|
|||||||
196
tests/gateway/test_compression_failure_session_sync.py
Normal file
196
tests/gateway/test_compression_failure_session_sync.py
Normal file
@@ -0,0 +1,196 @@
|
|||||||
|
import asyncio
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
import types
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
|
import gateway.run as gateway_run
|
||||||
|
from gateway.config import Platform
|
||||||
|
from gateway.session import SessionSource
|
||||||
|
|
||||||
|
|
||||||
|
SESSION_KEY = "agent:main:telegram:dm:12345"
|
||||||
|
|
||||||
|
|
||||||
|
class _SaveTrackingSessionStore:
|
||||||
|
def __init__(self):
|
||||||
|
self.entry = SimpleNamespace(session_id="session-before-compression")
|
||||||
|
self._entries = {SESSION_KEY: self.entry}
|
||||||
|
self.save_calls = 0
|
||||||
|
self.topic_sync_calls = []
|
||||||
|
|
||||||
|
def _save(self):
|
||||||
|
self.save_calls += 1
|
||||||
|
|
||||||
|
|
||||||
|
class _CompressionThenFailureAgent:
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self.session_id = kwargs["session_id"]
|
||||||
|
self.model = kwargs["model"]
|
||||||
|
self.tools = []
|
||||||
|
self.context_compressor = SimpleNamespace(
|
||||||
|
last_prompt_tokens=4321,
|
||||||
|
context_length=200000,
|
||||||
|
)
|
||||||
|
self.session_prompt_tokens = 4321
|
||||||
|
self.session_completion_tokens = 0
|
||||||
|
|
||||||
|
def run_conversation(self, user_message, conversation_history=None, task_id=None):
|
||||||
|
self.session_id = "session-after-compression"
|
||||||
|
return {
|
||||||
|
"failed": True,
|
||||||
|
"error": (
|
||||||
|
"APIConnectionError: Codex auxiliary Responses stream exceeded "
|
||||||
|
"120.0s total timeout"
|
||||||
|
),
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": (
|
||||||
|
"[Context compressed: previous long transcript was "
|
||||||
|
"summarized before retry]"
|
||||||
|
),
|
||||||
|
},
|
||||||
|
{"role": "user", "content": user_message},
|
||||||
|
],
|
||||||
|
"api_calls": 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
def interrupt(self, *_args, **_kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class _ImmediateStreamConsumer:
|
||||||
|
final_response_sent = False
|
||||||
|
|
||||||
|
def __init__(self, *_args, **_kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def run(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def finish(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class _QuietAdapter:
|
||||||
|
SUPPORTS_MESSAGE_EDITING = True
|
||||||
|
_pending_messages = {}
|
||||||
|
|
||||||
|
def get_pending_message(self, _session_key):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _install_fake_agent(monkeypatch):
|
||||||
|
fake_run_agent = types.ModuleType("run_agent")
|
||||||
|
fake_run_agent.AIAgent = _CompressionThenFailureAgent
|
||||||
|
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_runner(session_store):
|
||||||
|
runner = object.__new__(gateway_run.GatewayRunner)
|
||||||
|
runner.adapters = {
|
||||||
|
Platform.TELEGRAM: _QuietAdapter(),
|
||||||
|
}
|
||||||
|
runner._ephemeral_system_prompt = ""
|
||||||
|
runner._prefill_messages = []
|
||||||
|
runner._reasoning_config = None
|
||||||
|
runner._provider_routing = {}
|
||||||
|
runner._fallback_model = None
|
||||||
|
runner._running_agents = {}
|
||||||
|
runner._pending_model_notes = {}
|
||||||
|
runner._pending_skills_reload_notes = {}
|
||||||
|
runner._session_db = None
|
||||||
|
runner._agent_cache = {}
|
||||||
|
runner._agent_cache_lock = threading.Lock()
|
||||||
|
runner._session_model_overrides = {}
|
||||||
|
runner._draining = False
|
||||||
|
runner.config = SimpleNamespace(streaming=None)
|
||||||
|
runner.hooks = SimpleNamespace(loaded_hooks=False, emit=AsyncMock())
|
||||||
|
runner.session_store = session_store
|
||||||
|
runner._get_proxy_url = lambda: None
|
||||||
|
runner._resolve_session_agent_runtime = lambda **_kwargs: (
|
||||||
|
"gpt-5.4",
|
||||||
|
{
|
||||||
|
"provider": "openai-codex",
|
||||||
|
"api_mode": "codex_responses",
|
||||||
|
"base_url": "https://chatgpt.com/backend-api/codex",
|
||||||
|
"api_key": "token",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
runner._resolve_session_reasoning_config = lambda **_kwargs: None
|
||||||
|
runner._resolve_turn_agent_config = lambda message, model, runtime: {
|
||||||
|
"model": model,
|
||||||
|
"runtime": runtime,
|
||||||
|
}
|
||||||
|
runner._load_service_tier = lambda: None
|
||||||
|
runner._agent_config_signature = lambda *_args, **_kwargs: ("sig",)
|
||||||
|
runner._extract_cache_busting_config = lambda _config: ()
|
||||||
|
runner._thread_metadata_for_source = lambda *_args, **_kwargs: None
|
||||||
|
runner._is_telegram_topic_lane = lambda _source: False
|
||||||
|
runner._sync_telegram_topic_binding = lambda source, entry, *, reason: session_store.topic_sync_calls.append(
|
||||||
|
(source, entry.session_id, reason)
|
||||||
|
)
|
||||||
|
runner._release_running_agent_state = MagicMock()
|
||||||
|
return runner
|
||||||
|
|
||||||
|
|
||||||
|
def _source():
|
||||||
|
return SessionSource(
|
||||||
|
platform=Platform.TELEGRAM,
|
||||||
|
chat_id="12345",
|
||||||
|
chat_type="dm",
|
||||||
|
user_id="user-1",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_failed_turn_still_syncs_compression_session_split(monkeypatch):
|
||||||
|
"""A post-compression API failure must not leave the session store on the
|
||||||
|
stale pre-compression transcript.
|
||||||
|
"""
|
||||||
|
_install_fake_agent(monkeypatch)
|
||||||
|
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "off")
|
||||||
|
monkeypatch.setenv("HERMES_AGENT_TIMEOUT", "0")
|
||||||
|
monkeypatch.setattr(gateway_run, "_load_gateway_config", lambda: {})
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"gateway.stream_consumer.GatewayStreamConsumer",
|
||||||
|
_ImmediateStreamConsumer,
|
||||||
|
)
|
||||||
|
|
||||||
|
import hermes_cli.tools_config as tools_config
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
tools_config,
|
||||||
|
"_get_platform_tools",
|
||||||
|
lambda *_args, **_kwargs: {"core"},
|
||||||
|
)
|
||||||
|
|
||||||
|
session_store = _SaveTrackingSessionStore()
|
||||||
|
runner = _make_runner(session_store)
|
||||||
|
|
||||||
|
result = asyncio.run(
|
||||||
|
asyncio.wait_for(
|
||||||
|
runner._run_agent(
|
||||||
|
message="continue",
|
||||||
|
context_prompt="",
|
||||||
|
history=[
|
||||||
|
{"role": "user", "content": "old question"},
|
||||||
|
{"role": "assistant", "content": "old answer"},
|
||||||
|
],
|
||||||
|
source=_source(),
|
||||||
|
session_id="session-before-compression",
|
||||||
|
session_key=SESSION_KEY,
|
||||||
|
),
|
||||||
|
timeout=2,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result["failed"] is True
|
||||||
|
assert result["session_id"] == "session-after-compression"
|
||||||
|
assert result["history_offset"] == 0
|
||||||
|
assert session_store.entry.session_id == "session-after-compression"
|
||||||
|
assert session_store.save_calls == 1
|
||||||
|
assert session_store.topic_sync_calls == [
|
||||||
|
(_source(), "session-after-compression", "agent-result-compression")
|
||||||
|
]
|
||||||
Reference in New Issue
Block a user