mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-05 18:27:04 +08:00
Compare commits
5 Commits
dependabot
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
58cea98774 | ||
|
|
46db738eb4 | ||
|
|
c9045bcbfb | ||
|
|
9abd8b27a5 | ||
|
|
5be2162922 |
@@ -53,6 +53,7 @@ DEFAULT_HOST = "127.0.0.1"
|
||||
DEFAULT_PORT = 8642
|
||||
MAX_STORED_RESPONSES = 100
|
||||
MAX_REQUEST_BYTES = 1_000_000 # 1 MB default limit for POST bodies
|
||||
CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS = 30.0
|
||||
|
||||
|
||||
def check_api_server_requirements() -> bool:
|
||||
@@ -762,7 +763,11 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
"""
|
||||
import queue as _q
|
||||
|
||||
sse_headers = {"Content-Type": "text/event-stream", "Cache-Control": "no-cache"}
|
||||
sse_headers = {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
"X-Accel-Buffering": "no",
|
||||
}
|
||||
# CORS middleware can't inject headers into StreamResponse after
|
||||
# prepare() flushes them, so resolve CORS headers up front.
|
||||
origin = request.headers.get("Origin", "")
|
||||
@@ -775,6 +780,8 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
await response.prepare(request)
|
||||
|
||||
try:
|
||||
last_activity = time.monotonic()
|
||||
|
||||
# Role chunk
|
||||
role_chunk = {
|
||||
"id": completion_id, "object": "chat.completion.chunk",
|
||||
@@ -782,6 +789,7 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
|
||||
}
|
||||
await response.write(f"data: {json.dumps(role_chunk)}\n\n".encode())
|
||||
last_activity = time.monotonic()
|
||||
|
||||
# Helper — route a queue item to the correct SSE event.
|
||||
async def _emit(item):
|
||||
@@ -805,6 +813,7 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
"choices": [{"index": 0, "delta": {"content": item}, "finish_reason": None}],
|
||||
}
|
||||
await response.write(f"data: {json.dumps(content_chunk)}\n\n".encode())
|
||||
return time.monotonic()
|
||||
|
||||
# Stream content chunks as they arrive from the agent
|
||||
loop = asyncio.get_event_loop()
|
||||
@@ -819,16 +828,19 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
delta = stream_q.get_nowait()
|
||||
if delta is None:
|
||||
break
|
||||
await _emit(delta)
|
||||
last_activity = await _emit(delta)
|
||||
except _q.Empty:
|
||||
break
|
||||
break
|
||||
if time.monotonic() - last_activity >= CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS:
|
||||
await response.write(b": keepalive\n\n")
|
||||
last_activity = time.monotonic()
|
||||
continue
|
||||
|
||||
if delta is None: # End of stream sentinel
|
||||
break
|
||||
|
||||
await _emit(delta)
|
||||
last_activity = await _emit(delta)
|
||||
|
||||
# Get usage from completed agent
|
||||
usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
|
||||
|
||||
@@ -1465,7 +1465,18 @@ class GatewayRunner:
|
||||
logger.info("Recovered %s background process(es) from previous run", recovered)
|
||||
except Exception as e:
|
||||
logger.warning("Process checkpoint recovery: %s", e)
|
||||
|
||||
|
||||
# Suspend sessions that were active when the gateway last exited.
|
||||
# This prevents stuck sessions from being blindly resumed on restart,
|
||||
# which can create an unrecoverable loop (#7536). Suspended sessions
|
||||
# auto-reset on the next incoming message, giving the user a clean start.
|
||||
try:
|
||||
suspended = self.session_store.suspend_recently_active()
|
||||
if suspended:
|
||||
logger.info("Suspended %d in-flight session(s) from previous run", suspended)
|
||||
except Exception as e:
|
||||
logger.warning("Session suspension on startup failed: %s", e)
|
||||
|
||||
connected_count = 0
|
||||
enabled_platform_count = 0
|
||||
startup_nonretryable_errors: list[str] = []
|
||||
@@ -2370,8 +2381,11 @@ class GatewayRunner:
|
||||
self._pending_messages.pop(_quick_key, None)
|
||||
if _quick_key in self._running_agents:
|
||||
del self._running_agents[_quick_key]
|
||||
logger.info("HARD STOP for session %s — session lock released", _quick_key[:20])
|
||||
return "⚡ Force-stopped. The session is unlocked — you can send a new message."
|
||||
# Mark session suspended so the next message starts fresh
|
||||
# instead of resuming the stuck context (#7536).
|
||||
self.session_store.suspend_session(_quick_key)
|
||||
logger.info("HARD STOP for session %s — suspended, session lock released", _quick_key[:20])
|
||||
return "⚡ Force-stopped. The session is suspended — your next message will start fresh."
|
||||
|
||||
# /reset and /new must bypass the running-agent guard so they
|
||||
# actually dispatch as commands instead of being queued as user
|
||||
@@ -2805,7 +2819,9 @@ class GatewayRunner:
|
||||
# so the agent knows this is a fresh conversation (not an intentional /reset).
|
||||
if getattr(session_entry, 'was_auto_reset', False):
|
||||
reset_reason = getattr(session_entry, 'auto_reset_reason', None) or 'idle'
|
||||
if reset_reason == "daily":
|
||||
if reset_reason == "suspended":
|
||||
context_note = "[System note: The user's previous session was stopped and suspended. This is a fresh conversation with no prior context.]"
|
||||
elif reset_reason == "daily":
|
||||
context_note = "[System note: The user's session was automatically reset by the daily schedule. This is a fresh conversation with no prior context.]"
|
||||
else:
|
||||
context_note = "[System note: The user's previous session expired due to inactivity. This is a fresh conversation with no prior context.]"
|
||||
@@ -2822,7 +2838,9 @@ class GatewayRunner:
|
||||
)
|
||||
platform_name = source.platform.value if source.platform else ""
|
||||
had_activity = getattr(session_entry, 'reset_had_activity', False)
|
||||
should_notify = (
|
||||
# Suspended sessions always notify (they were explicitly stopped
|
||||
# or crashed mid-operation) — skip the policy check.
|
||||
should_notify = reset_reason == "suspended" or (
|
||||
policy.notify
|
||||
and had_activity
|
||||
and platform_name not in policy.notify_exclude_platforms
|
||||
@@ -2830,7 +2848,9 @@ class GatewayRunner:
|
||||
if should_notify:
|
||||
adapter = self.adapters.get(source.platform)
|
||||
if adapter:
|
||||
if reset_reason == "daily":
|
||||
if reset_reason == "suspended":
|
||||
reason_text = "previous session was stopped or interrupted"
|
||||
elif reset_reason == "daily":
|
||||
reason_text = f"daily schedule at {policy.at_hour}:00"
|
||||
else:
|
||||
hours = policy.idle_minutes // 60
|
||||
@@ -3913,25 +3933,31 @@ class GatewayRunner:
|
||||
handles /stop before this method is reached. This handler fires
|
||||
only through normal command dispatch (no running agent) or as a
|
||||
fallback. Force-clean the session lock in all cases for safety.
|
||||
|
||||
When there IS a running/pending agent, the session is also marked
|
||||
as *suspended* so the next message starts a fresh session instead
|
||||
of resuming the stuck context (#7536).
|
||||
"""
|
||||
source = event.source
|
||||
session_entry = self.session_store.get_or_create_session(source)
|
||||
session_key = session_entry.session_key
|
||||
|
||||
|
||||
agent = self._running_agents.get(session_key)
|
||||
if agent is _AGENT_PENDING_SENTINEL:
|
||||
# Force-clean the sentinel so the session is unlocked.
|
||||
if session_key in self._running_agents:
|
||||
del self._running_agents[session_key]
|
||||
logger.info("HARD STOP (pending) for session %s — sentinel cleared", session_key[:20])
|
||||
return "⚡ Force-stopped. The agent was still starting — session unlocked."
|
||||
self.session_store.suspend_session(session_key)
|
||||
logger.info("HARD STOP (pending) for session %s — suspended, sentinel cleared", session_key[:20])
|
||||
return "⚡ Force-stopped. The agent was still starting — your next message will start fresh."
|
||||
if agent:
|
||||
agent.interrupt("Stop requested")
|
||||
# Force-clean the session lock so a truly hung agent doesn't
|
||||
# keep it locked forever.
|
||||
if session_key in self._running_agents:
|
||||
del self._running_agents[session_key]
|
||||
return "⚡ Force-stopped. The session is unlocked — you can send a new message."
|
||||
self.session_store.suspend_session(session_key)
|
||||
return "⚡ Force-stopped. Your next message will start a fresh session."
|
||||
else:
|
||||
return "No active task to stop."
|
||||
|
||||
|
||||
@@ -368,6 +368,11 @@ class SessionEntry:
|
||||
# survives gateway restarts (the old in-memory _pre_flushed_sessions
|
||||
# set was lost on restart, causing redundant re-flushes).
|
||||
memory_flushed: bool = False
|
||||
|
||||
# When True the next call to get_or_create_session() will auto-reset
|
||||
# this session (create a new session_id) so the user starts fresh.
|
||||
# Set by /stop to break stuck-resume loops (#7536).
|
||||
suspended: bool = False
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
result = {
|
||||
@@ -387,6 +392,7 @@ class SessionEntry:
|
||||
"estimated_cost_usd": self.estimated_cost_usd,
|
||||
"cost_status": self.cost_status,
|
||||
"memory_flushed": self.memory_flushed,
|
||||
"suspended": self.suspended,
|
||||
}
|
||||
if self.origin:
|
||||
result["origin"] = self.origin.to_dict()
|
||||
@@ -423,6 +429,7 @@ class SessionEntry:
|
||||
estimated_cost_usd=data.get("estimated_cost_usd", 0.0),
|
||||
cost_status=data.get("cost_status", "unknown"),
|
||||
memory_flushed=data.get("memory_flushed", False),
|
||||
suspended=data.get("suspended", False),
|
||||
)
|
||||
|
||||
|
||||
@@ -698,7 +705,12 @@ class SessionStore:
|
||||
if session_key in self._entries and not force_new:
|
||||
entry = self._entries[session_key]
|
||||
|
||||
reset_reason = self._should_reset(entry, source)
|
||||
# Auto-reset sessions marked as suspended (e.g. after /stop
|
||||
# broke a stuck loop — #7536).
|
||||
if entry.suspended:
|
||||
reset_reason = "suspended"
|
||||
else:
|
||||
reset_reason = self._should_reset(entry, source)
|
||||
if not reset_reason:
|
||||
entry.updated_at = now
|
||||
self._save()
|
||||
@@ -771,6 +783,44 @@ class SessionStore:
|
||||
entry.last_prompt_tokens = last_prompt_tokens
|
||||
self._save()
|
||||
|
||||
def suspend_session(self, session_key: str) -> bool:
|
||||
"""Mark a session as suspended so it auto-resets on next access.
|
||||
|
||||
Used by ``/stop`` to prevent stuck sessions from being resumed
|
||||
after a gateway restart (#7536). Returns True if the session
|
||||
existed and was marked.
|
||||
"""
|
||||
with self._lock:
|
||||
self._ensure_loaded_locked()
|
||||
if session_key in self._entries:
|
||||
self._entries[session_key].suspended = True
|
||||
self._save()
|
||||
return True
|
||||
return False
|
||||
|
||||
def suspend_recently_active(self, max_age_seconds: int = 120) -> int:
|
||||
"""Mark recently-active sessions as suspended.
|
||||
|
||||
Called on gateway startup to prevent sessions that were likely
|
||||
in-flight when the gateway last exited from being blindly resumed
|
||||
(#7536). Only suspends sessions updated within *max_age_seconds*
|
||||
to avoid resetting long-idle sessions that are harmless to resume.
|
||||
Returns the number of sessions that were suspended.
|
||||
"""
|
||||
import time as _time
|
||||
|
||||
cutoff = _time.time() - max_age_seconds
|
||||
count = 0
|
||||
with self._lock:
|
||||
self._ensure_loaded_locked()
|
||||
for entry in self._entries.values():
|
||||
if not entry.suspended and entry.updated_at >= cutoff:
|
||||
entry.suspended = True
|
||||
count += 1
|
||||
if count:
|
||||
self._save()
|
||||
return count
|
||||
|
||||
def reset_session(self, session_key: str) -> Optional[SessionEntry]:
|
||||
"""Force reset a session, creating a new session ID."""
|
||||
db_end_session_id = None
|
||||
|
||||
51
run_agent.py
51
run_agent.py
@@ -8167,8 +8167,24 @@ class AIAgent:
|
||||
_text_parts.append(getattr(_blk, "text", ""))
|
||||
_trunc_content = "\n".join(_text_parts) if _text_parts else None
|
||||
|
||||
# A response is "thinking exhausted" only when the model
|
||||
# actually produced reasoning blocks but no visible text after
|
||||
# them. Models that do not use <think> tags (e.g. GLM-4.7 on
|
||||
# NVIDIA Build, minimax) may return content=None or an empty
|
||||
# string for unrelated reasons — treat those as normal
|
||||
# truncations that deserve continuation retries, not as
|
||||
# thinking-budget exhaustion.
|
||||
_has_think_tags = bool(
|
||||
_trunc_content and re.search(
|
||||
r'<(?:think|thinking|reasoning|REASONING_SCRATCHPAD)[^>]*>',
|
||||
_trunc_content,
|
||||
re.IGNORECASE,
|
||||
)
|
||||
)
|
||||
_thinking_exhausted = (
|
||||
not _trunc_has_tool_calls and (
|
||||
not _trunc_has_tool_calls
|
||||
and _has_think_tags
|
||||
and (
|
||||
(_trunc_content is not None and not self._has_content_after_think_block(_trunc_content))
|
||||
or _trunc_content is None
|
||||
)
|
||||
@@ -9396,12 +9412,41 @@ class AIAgent:
|
||||
invalid_json_args.append((tc.function.name, str(e)))
|
||||
|
||||
if invalid_json_args:
|
||||
# Check if the invalid JSON is due to truncation rather
|
||||
# than a model formatting mistake. Routers sometimes
|
||||
# rewrite finish_reason from "length" to "tool_calls",
|
||||
# hiding the truncation from the length handler above.
|
||||
# Detect truncation: args that don't end with } or ]
|
||||
# (after stripping whitespace) are cut off mid-stream.
|
||||
_truncated = any(
|
||||
not (tc.function.arguments or "").rstrip().endswith(("}", "]"))
|
||||
for tc in assistant_message.tool_calls
|
||||
if tc.function.name in {n for n, _ in invalid_json_args}
|
||||
)
|
||||
if _truncated:
|
||||
self._vprint(
|
||||
f"{self.log_prefix}⚠️ Truncated tool call arguments detected "
|
||||
f"(finish_reason={finish_reason!r}) — refusing to execute.",
|
||||
force=True,
|
||||
)
|
||||
self._invalid_json_retries = 0
|
||||
self._cleanup_task_resources(effective_task_id)
|
||||
self._persist_session(messages, conversation_history)
|
||||
return {
|
||||
"final_response": None,
|
||||
"messages": messages,
|
||||
"api_calls": api_call_count,
|
||||
"completed": False,
|
||||
"partial": True,
|
||||
"error": "Response truncated due to output length limit",
|
||||
}
|
||||
|
||||
# Track retries for invalid JSON arguments
|
||||
self._invalid_json_retries += 1
|
||||
|
||||
|
||||
tool_name, error_msg = invalid_json_args[0]
|
||||
self._vprint(f"{self.log_prefix}⚠️ Invalid JSON in tool call arguments for '{tool_name}': {error_msg}")
|
||||
|
||||
|
||||
if self._invalid_json_retries < 3:
|
||||
self._vprint(f"{self.log_prefix}🔄 Retrying API call ({self._invalid_json_retries}/3)...")
|
||||
# Don't add anything to messages, just retry the API call
|
||||
|
||||
@@ -409,11 +409,50 @@ class TestChatCompletionsEndpoint:
|
||||
)
|
||||
assert resp.status == 200
|
||||
assert "text/event-stream" in resp.headers.get("Content-Type", "")
|
||||
assert resp.headers.get("X-Accel-Buffering") == "no"
|
||||
body = await resp.text()
|
||||
assert "data: " in body
|
||||
assert "[DONE]" in body
|
||||
assert "Hello!" in body
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_sends_keepalive_during_quiet_tool_gap(self, adapter):
|
||||
"""Idle SSE streams should send keepalive comments while tools run silently."""
|
||||
import asyncio
|
||||
import gateway.platforms.api_server as api_server_mod
|
||||
|
||||
app = _create_app(adapter)
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
async def _mock_run_agent(**kwargs):
|
||||
cb = kwargs.get("stream_delta_callback")
|
||||
if cb:
|
||||
cb("Working")
|
||||
await asyncio.sleep(0.65)
|
||||
cb("...done")
|
||||
return (
|
||||
{"final_response": "Working...done", "messages": [], "api_calls": 1},
|
||||
{"input_tokens": 10, "output_tokens": 5, "total_tokens": 15},
|
||||
)
|
||||
|
||||
with (
|
||||
patch.object(api_server_mod, "CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS", 0.01),
|
||||
patch.object(adapter, "_run_agent", side_effect=_mock_run_agent),
|
||||
):
|
||||
resp = await cli.post(
|
||||
"/v1/chat/completions",
|
||||
json={
|
||||
"model": "test",
|
||||
"messages": [{"role": "user", "content": "do the thing"}],
|
||||
"stream": True,
|
||||
},
|
||||
)
|
||||
assert resp.status == 200
|
||||
body = await resp.text()
|
||||
assert ": keepalive" in body
|
||||
assert "Working" in body
|
||||
assert "...done" in body
|
||||
assert "[DONE]" in body
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_survives_tool_call_none_sentinel(self, adapter):
|
||||
"""stream_delta_callback(None) mid-stream (tool calls) must NOT kill the SSE stream.
|
||||
|
||||
@@ -2087,8 +2087,9 @@ class TestRunConversation:
|
||||
assert "Thinking Budget Exhausted" in result["final_response"]
|
||||
assert "/thinkon" in result["final_response"]
|
||||
|
||||
def test_length_empty_content_detected_as_thinking_exhausted(self, agent):
|
||||
"""When finish_reason='length' and content is None/empty, detect exhaustion."""
|
||||
def test_length_empty_content_without_think_tags_retries_normally(self, agent):
|
||||
"""When finish_reason='length' and content is None but no think tags,
|
||||
fall through to normal continuation retry (not thinking-exhaustion)."""
|
||||
self._setup_agent(agent)
|
||||
resp = _mock_response(content=None, finish_reason="length")
|
||||
agent.client.chat.completions.create.return_value = resp
|
||||
@@ -2100,12 +2101,10 @@ class TestRunConversation:
|
||||
):
|
||||
result = agent.run_conversation("hello")
|
||||
|
||||
# Without think tags, the agent should attempt continuation retries
|
||||
# (up to 3), not immediately fire thinking-exhaustion.
|
||||
assert result["api_calls"] == 3
|
||||
assert result["completed"] is False
|
||||
assert result["api_calls"] == 1
|
||||
assert "reasoning" in result["error"].lower()
|
||||
# User-friendly message is returned
|
||||
assert result["final_response"] is not None
|
||||
assert "Thinking Budget Exhausted" in result["final_response"]
|
||||
|
||||
def test_length_with_tool_calls_returns_partial_without_executing_tools(self, agent):
|
||||
self._setup_agent(agent)
|
||||
@@ -2169,6 +2168,35 @@ class TestRunConversation:
|
||||
mock_hfc.assert_called_once()
|
||||
assert result["final_response"] == "Done!"
|
||||
|
||||
def test_truncated_tool_args_detected_when_finish_reason_not_length(self, agent):
|
||||
"""When a router rewrites finish_reason from 'length' to 'tool_calls',
|
||||
truncated JSON arguments should still be detected and refused rather
|
||||
than wasting 3 retry attempts."""
|
||||
self._setup_agent(agent)
|
||||
agent.valid_tool_names.add("write_file")
|
||||
bad_tc = _mock_tool_call(
|
||||
name="write_file",
|
||||
arguments='{"path":"report.md","content":"partial',
|
||||
call_id="c1",
|
||||
)
|
||||
resp = _mock_response(
|
||||
content="", finish_reason="tool_calls", tool_calls=[bad_tc],
|
||||
)
|
||||
agent.client.chat.completions.create.return_value = resp
|
||||
|
||||
with (
|
||||
patch("run_agent.handle_function_call") as mock_handle_function_call,
|
||||
patch.object(agent, "_persist_session"),
|
||||
patch.object(agent, "_save_trajectory"),
|
||||
patch.object(agent, "_cleanup_task_resources"),
|
||||
):
|
||||
result = agent.run_conversation("write the report")
|
||||
|
||||
assert result["completed"] is False
|
||||
assert result["partial"] is True
|
||||
assert "truncated due to output length limit" in result["error"]
|
||||
mock_handle_function_call.assert_not_called()
|
||||
|
||||
|
||||
class TestRetryExhaustion:
|
||||
"""Regression: retry_count > max_retries was dead code (off-by-one).
|
||||
|
||||
Reference in New Issue
Block a user