mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-06 10:47:12 +08:00
Compare commits
5 Commits
dependabot
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
58cea98774 | ||
|
|
46db738eb4 | ||
|
|
c9045bcbfb | ||
|
|
9abd8b27a5 | ||
|
|
5be2162922 |
@@ -53,6 +53,7 @@ DEFAULT_HOST = "127.0.0.1"
|
|||||||
DEFAULT_PORT = 8642
|
DEFAULT_PORT = 8642
|
||||||
MAX_STORED_RESPONSES = 100
|
MAX_STORED_RESPONSES = 100
|
||||||
MAX_REQUEST_BYTES = 1_000_000 # 1 MB default limit for POST bodies
|
MAX_REQUEST_BYTES = 1_000_000 # 1 MB default limit for POST bodies
|
||||||
|
CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS = 30.0
|
||||||
|
|
||||||
|
|
||||||
def check_api_server_requirements() -> bool:
|
def check_api_server_requirements() -> bool:
|
||||||
@@ -762,7 +763,11 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||||||
"""
|
"""
|
||||||
import queue as _q
|
import queue as _q
|
||||||
|
|
||||||
sse_headers = {"Content-Type": "text/event-stream", "Cache-Control": "no-cache"}
|
sse_headers = {
|
||||||
|
"Content-Type": "text/event-stream",
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"X-Accel-Buffering": "no",
|
||||||
|
}
|
||||||
# CORS middleware can't inject headers into StreamResponse after
|
# CORS middleware can't inject headers into StreamResponse after
|
||||||
# prepare() flushes them, so resolve CORS headers up front.
|
# prepare() flushes them, so resolve CORS headers up front.
|
||||||
origin = request.headers.get("Origin", "")
|
origin = request.headers.get("Origin", "")
|
||||||
@@ -775,6 +780,8 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||||||
await response.prepare(request)
|
await response.prepare(request)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
last_activity = time.monotonic()
|
||||||
|
|
||||||
# Role chunk
|
# Role chunk
|
||||||
role_chunk = {
|
role_chunk = {
|
||||||
"id": completion_id, "object": "chat.completion.chunk",
|
"id": completion_id, "object": "chat.completion.chunk",
|
||||||
@@ -782,6 +789,7 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||||||
"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
|
"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
|
||||||
}
|
}
|
||||||
await response.write(f"data: {json.dumps(role_chunk)}\n\n".encode())
|
await response.write(f"data: {json.dumps(role_chunk)}\n\n".encode())
|
||||||
|
last_activity = time.monotonic()
|
||||||
|
|
||||||
# Helper — route a queue item to the correct SSE event.
|
# Helper — route a queue item to the correct SSE event.
|
||||||
async def _emit(item):
|
async def _emit(item):
|
||||||
@@ -805,6 +813,7 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||||||
"choices": [{"index": 0, "delta": {"content": item}, "finish_reason": None}],
|
"choices": [{"index": 0, "delta": {"content": item}, "finish_reason": None}],
|
||||||
}
|
}
|
||||||
await response.write(f"data: {json.dumps(content_chunk)}\n\n".encode())
|
await response.write(f"data: {json.dumps(content_chunk)}\n\n".encode())
|
||||||
|
return time.monotonic()
|
||||||
|
|
||||||
# Stream content chunks as they arrive from the agent
|
# Stream content chunks as they arrive from the agent
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
@@ -819,16 +828,19 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||||||
delta = stream_q.get_nowait()
|
delta = stream_q.get_nowait()
|
||||||
if delta is None:
|
if delta is None:
|
||||||
break
|
break
|
||||||
await _emit(delta)
|
last_activity = await _emit(delta)
|
||||||
except _q.Empty:
|
except _q.Empty:
|
||||||
break
|
break
|
||||||
break
|
break
|
||||||
|
if time.monotonic() - last_activity >= CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS:
|
||||||
|
await response.write(b": keepalive\n\n")
|
||||||
|
last_activity = time.monotonic()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if delta is None: # End of stream sentinel
|
if delta is None: # End of stream sentinel
|
||||||
break
|
break
|
||||||
|
|
||||||
await _emit(delta)
|
last_activity = await _emit(delta)
|
||||||
|
|
||||||
# Get usage from completed agent
|
# Get usage from completed agent
|
||||||
usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
|
usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
|
||||||
|
|||||||
@@ -1465,7 +1465,18 @@ class GatewayRunner:
|
|||||||
logger.info("Recovered %s background process(es) from previous run", recovered)
|
logger.info("Recovered %s background process(es) from previous run", recovered)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Process checkpoint recovery: %s", e)
|
logger.warning("Process checkpoint recovery: %s", e)
|
||||||
|
|
||||||
|
# Suspend sessions that were active when the gateway last exited.
|
||||||
|
# This prevents stuck sessions from being blindly resumed on restart,
|
||||||
|
# which can create an unrecoverable loop (#7536). Suspended sessions
|
||||||
|
# auto-reset on the next incoming message, giving the user a clean start.
|
||||||
|
try:
|
||||||
|
suspended = self.session_store.suspend_recently_active()
|
||||||
|
if suspended:
|
||||||
|
logger.info("Suspended %d in-flight session(s) from previous run", suspended)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Session suspension on startup failed: %s", e)
|
||||||
|
|
||||||
connected_count = 0
|
connected_count = 0
|
||||||
enabled_platform_count = 0
|
enabled_platform_count = 0
|
||||||
startup_nonretryable_errors: list[str] = []
|
startup_nonretryable_errors: list[str] = []
|
||||||
@@ -2370,8 +2381,11 @@ class GatewayRunner:
|
|||||||
self._pending_messages.pop(_quick_key, None)
|
self._pending_messages.pop(_quick_key, None)
|
||||||
if _quick_key in self._running_agents:
|
if _quick_key in self._running_agents:
|
||||||
del self._running_agents[_quick_key]
|
del self._running_agents[_quick_key]
|
||||||
logger.info("HARD STOP for session %s — session lock released", _quick_key[:20])
|
# Mark session suspended so the next message starts fresh
|
||||||
return "⚡ Force-stopped. The session is unlocked — you can send a new message."
|
# instead of resuming the stuck context (#7536).
|
||||||
|
self.session_store.suspend_session(_quick_key)
|
||||||
|
logger.info("HARD STOP for session %s — suspended, session lock released", _quick_key[:20])
|
||||||
|
return "⚡ Force-stopped. The session is suspended — your next message will start fresh."
|
||||||
|
|
||||||
# /reset and /new must bypass the running-agent guard so they
|
# /reset and /new must bypass the running-agent guard so they
|
||||||
# actually dispatch as commands instead of being queued as user
|
# actually dispatch as commands instead of being queued as user
|
||||||
@@ -2805,7 +2819,9 @@ class GatewayRunner:
|
|||||||
# so the agent knows this is a fresh conversation (not an intentional /reset).
|
# so the agent knows this is a fresh conversation (not an intentional /reset).
|
||||||
if getattr(session_entry, 'was_auto_reset', False):
|
if getattr(session_entry, 'was_auto_reset', False):
|
||||||
reset_reason = getattr(session_entry, 'auto_reset_reason', None) or 'idle'
|
reset_reason = getattr(session_entry, 'auto_reset_reason', None) or 'idle'
|
||||||
if reset_reason == "daily":
|
if reset_reason == "suspended":
|
||||||
|
context_note = "[System note: The user's previous session was stopped and suspended. This is a fresh conversation with no prior context.]"
|
||||||
|
elif reset_reason == "daily":
|
||||||
context_note = "[System note: The user's session was automatically reset by the daily schedule. This is a fresh conversation with no prior context.]"
|
context_note = "[System note: The user's session was automatically reset by the daily schedule. This is a fresh conversation with no prior context.]"
|
||||||
else:
|
else:
|
||||||
context_note = "[System note: The user's previous session expired due to inactivity. This is a fresh conversation with no prior context.]"
|
context_note = "[System note: The user's previous session expired due to inactivity. This is a fresh conversation with no prior context.]"
|
||||||
@@ -2822,7 +2838,9 @@ class GatewayRunner:
|
|||||||
)
|
)
|
||||||
platform_name = source.platform.value if source.platform else ""
|
platform_name = source.platform.value if source.platform else ""
|
||||||
had_activity = getattr(session_entry, 'reset_had_activity', False)
|
had_activity = getattr(session_entry, 'reset_had_activity', False)
|
||||||
should_notify = (
|
# Suspended sessions always notify (they were explicitly stopped
|
||||||
|
# or crashed mid-operation) — skip the policy check.
|
||||||
|
should_notify = reset_reason == "suspended" or (
|
||||||
policy.notify
|
policy.notify
|
||||||
and had_activity
|
and had_activity
|
||||||
and platform_name not in policy.notify_exclude_platforms
|
and platform_name not in policy.notify_exclude_platforms
|
||||||
@@ -2830,7 +2848,9 @@ class GatewayRunner:
|
|||||||
if should_notify:
|
if should_notify:
|
||||||
adapter = self.adapters.get(source.platform)
|
adapter = self.adapters.get(source.platform)
|
||||||
if adapter:
|
if adapter:
|
||||||
if reset_reason == "daily":
|
if reset_reason == "suspended":
|
||||||
|
reason_text = "previous session was stopped or interrupted"
|
||||||
|
elif reset_reason == "daily":
|
||||||
reason_text = f"daily schedule at {policy.at_hour}:00"
|
reason_text = f"daily schedule at {policy.at_hour}:00"
|
||||||
else:
|
else:
|
||||||
hours = policy.idle_minutes // 60
|
hours = policy.idle_minutes // 60
|
||||||
@@ -3913,25 +3933,31 @@ class GatewayRunner:
|
|||||||
handles /stop before this method is reached. This handler fires
|
handles /stop before this method is reached. This handler fires
|
||||||
only through normal command dispatch (no running agent) or as a
|
only through normal command dispatch (no running agent) or as a
|
||||||
fallback. Force-clean the session lock in all cases for safety.
|
fallback. Force-clean the session lock in all cases for safety.
|
||||||
|
|
||||||
|
When there IS a running/pending agent, the session is also marked
|
||||||
|
as *suspended* so the next message starts a fresh session instead
|
||||||
|
of resuming the stuck context (#7536).
|
||||||
"""
|
"""
|
||||||
source = event.source
|
source = event.source
|
||||||
session_entry = self.session_store.get_or_create_session(source)
|
session_entry = self.session_store.get_or_create_session(source)
|
||||||
session_key = session_entry.session_key
|
session_key = session_entry.session_key
|
||||||
|
|
||||||
agent = self._running_agents.get(session_key)
|
agent = self._running_agents.get(session_key)
|
||||||
if agent is _AGENT_PENDING_SENTINEL:
|
if agent is _AGENT_PENDING_SENTINEL:
|
||||||
# Force-clean the sentinel so the session is unlocked.
|
# Force-clean the sentinel so the session is unlocked.
|
||||||
if session_key in self._running_agents:
|
if session_key in self._running_agents:
|
||||||
del self._running_agents[session_key]
|
del self._running_agents[session_key]
|
||||||
logger.info("HARD STOP (pending) for session %s — sentinel cleared", session_key[:20])
|
self.session_store.suspend_session(session_key)
|
||||||
return "⚡ Force-stopped. The agent was still starting — session unlocked."
|
logger.info("HARD STOP (pending) for session %s — suspended, sentinel cleared", session_key[:20])
|
||||||
|
return "⚡ Force-stopped. The agent was still starting — your next message will start fresh."
|
||||||
if agent:
|
if agent:
|
||||||
agent.interrupt("Stop requested")
|
agent.interrupt("Stop requested")
|
||||||
# Force-clean the session lock so a truly hung agent doesn't
|
# Force-clean the session lock so a truly hung agent doesn't
|
||||||
# keep it locked forever.
|
# keep it locked forever.
|
||||||
if session_key in self._running_agents:
|
if session_key in self._running_agents:
|
||||||
del self._running_agents[session_key]
|
del self._running_agents[session_key]
|
||||||
return "⚡ Force-stopped. The session is unlocked — you can send a new message."
|
self.session_store.suspend_session(session_key)
|
||||||
|
return "⚡ Force-stopped. Your next message will start a fresh session."
|
||||||
else:
|
else:
|
||||||
return "No active task to stop."
|
return "No active task to stop."
|
||||||
|
|
||||||
|
|||||||
@@ -368,6 +368,11 @@ class SessionEntry:
|
|||||||
# survives gateway restarts (the old in-memory _pre_flushed_sessions
|
# survives gateway restarts (the old in-memory _pre_flushed_sessions
|
||||||
# set was lost on restart, causing redundant re-flushes).
|
# set was lost on restart, causing redundant re-flushes).
|
||||||
memory_flushed: bool = False
|
memory_flushed: bool = False
|
||||||
|
|
||||||
|
# When True the next call to get_or_create_session() will auto-reset
|
||||||
|
# this session (create a new session_id) so the user starts fresh.
|
||||||
|
# Set by /stop to break stuck-resume loops (#7536).
|
||||||
|
suspended: bool = False
|
||||||
|
|
||||||
def to_dict(self) -> Dict[str, Any]:
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
result = {
|
result = {
|
||||||
@@ -387,6 +392,7 @@ class SessionEntry:
|
|||||||
"estimated_cost_usd": self.estimated_cost_usd,
|
"estimated_cost_usd": self.estimated_cost_usd,
|
||||||
"cost_status": self.cost_status,
|
"cost_status": self.cost_status,
|
||||||
"memory_flushed": self.memory_flushed,
|
"memory_flushed": self.memory_flushed,
|
||||||
|
"suspended": self.suspended,
|
||||||
}
|
}
|
||||||
if self.origin:
|
if self.origin:
|
||||||
result["origin"] = self.origin.to_dict()
|
result["origin"] = self.origin.to_dict()
|
||||||
@@ -423,6 +429,7 @@ class SessionEntry:
|
|||||||
estimated_cost_usd=data.get("estimated_cost_usd", 0.0),
|
estimated_cost_usd=data.get("estimated_cost_usd", 0.0),
|
||||||
cost_status=data.get("cost_status", "unknown"),
|
cost_status=data.get("cost_status", "unknown"),
|
||||||
memory_flushed=data.get("memory_flushed", False),
|
memory_flushed=data.get("memory_flushed", False),
|
||||||
|
suspended=data.get("suspended", False),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -698,7 +705,12 @@ class SessionStore:
|
|||||||
if session_key in self._entries and not force_new:
|
if session_key in self._entries and not force_new:
|
||||||
entry = self._entries[session_key]
|
entry = self._entries[session_key]
|
||||||
|
|
||||||
reset_reason = self._should_reset(entry, source)
|
# Auto-reset sessions marked as suspended (e.g. after /stop
|
||||||
|
# broke a stuck loop — #7536).
|
||||||
|
if entry.suspended:
|
||||||
|
reset_reason = "suspended"
|
||||||
|
else:
|
||||||
|
reset_reason = self._should_reset(entry, source)
|
||||||
if not reset_reason:
|
if not reset_reason:
|
||||||
entry.updated_at = now
|
entry.updated_at = now
|
||||||
self._save()
|
self._save()
|
||||||
@@ -771,6 +783,44 @@ class SessionStore:
|
|||||||
entry.last_prompt_tokens = last_prompt_tokens
|
entry.last_prompt_tokens = last_prompt_tokens
|
||||||
self._save()
|
self._save()
|
||||||
|
|
||||||
|
def suspend_session(self, session_key: str) -> bool:
    """Flag a single session as suspended.

    A suspended entry is auto-reset the next time it is looked up, so
    the user starts from a fresh session instead of resuming a stuck
    one.  The ``/stop`` paths call this (#7536).

    Args:
        session_key: Key of the session entry to flag.

    Returns:
        True when the entry exists and was flagged (the store is saved
        in that case); False when no entry has that key.
    """
    with self._lock:
        self._ensure_loaded_locked()
        # Guard clause: unknown keys leave the store untouched.
        if session_key not in self._entries:
            return False
        target = self._entries[session_key]
        target.suspended = True
        self._save()
        return True
|
||||||
|
|
||||||
|
def suspend_recently_active(self, max_age_seconds: int = 120) -> int:
    """Mark recently-updated sessions as suspended.

    Called on gateway startup so that sessions which were likely
    in-flight when the gateway last exited are not blindly resumed
    (#7536).  Only entries whose ``updated_at`` lies within the last
    *max_age_seconds* are touched; long-idle sessions are left alone
    because resuming them is harmless.

    ``updated_at`` may be either an epoch number or a ``datetime``.
    Comparing a ``datetime`` directly against ``time.time()`` raises
    ``TypeError``, so the value is normalised to epoch seconds first.

    Args:
        max_age_seconds: Size of the "recently active" window, in
            seconds before now.

    Returns:
        The number of sessions newly marked as suspended.  The store is
        saved only when that number is non-zero.
    """
    import time as _time
    from datetime import datetime as _datetime

    cutoff = _time.time() - max_age_seconds

    def _as_epoch(stamp):
        # NOTE(review): the declared type of ``updated_at`` is not
        # visible from here — confirm.  Numeric stamps pass through
        # unchanged; datetimes are converted (a naive datetime is
        # interpreted as local time by ``datetime.timestamp()``).
        if isinstance(stamp, _datetime):
            return stamp.timestamp()
        return stamp

    count = 0
    with self._lock:
        self._ensure_loaded_locked()
        for entry in self._entries.values():
            if entry.suspended:
                # Already flagged; do not count it again.
                continue
            if _as_epoch(entry.updated_at) >= cutoff:
                entry.suspended = True
                count += 1
        if count:
            self._save()
    return count
|
||||||
|
|
||||||
def reset_session(self, session_key: str) -> Optional[SessionEntry]:
|
def reset_session(self, session_key: str) -> Optional[SessionEntry]:
|
||||||
"""Force reset a session, creating a new session ID."""
|
"""Force reset a session, creating a new session ID."""
|
||||||
db_end_session_id = None
|
db_end_session_id = None
|
||||||
|
|||||||
51
run_agent.py
51
run_agent.py
@@ -8167,8 +8167,24 @@ class AIAgent:
|
|||||||
_text_parts.append(getattr(_blk, "text", ""))
|
_text_parts.append(getattr(_blk, "text", ""))
|
||||||
_trunc_content = "\n".join(_text_parts) if _text_parts else None
|
_trunc_content = "\n".join(_text_parts) if _text_parts else None
|
||||||
|
|
||||||
|
# A response is "thinking exhausted" only when the model
|
||||||
|
# actually produced reasoning blocks but no visible text after
|
||||||
|
# them. Models that do not use <think> tags (e.g. GLM-4.7 on
|
||||||
|
# NVIDIA Build, minimax) may return content=None or an empty
|
||||||
|
# string for unrelated reasons — treat those as normal
|
||||||
|
# truncations that deserve continuation retries, not as
|
||||||
|
# thinking-budget exhaustion.
|
||||||
|
_has_think_tags = bool(
|
||||||
|
_trunc_content and re.search(
|
||||||
|
r'<(?:think|thinking|reasoning|REASONING_SCRATCHPAD)[^>]*>',
|
||||||
|
_trunc_content,
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
)
|
||||||
_thinking_exhausted = (
|
_thinking_exhausted = (
|
||||||
not _trunc_has_tool_calls and (
|
not _trunc_has_tool_calls
|
||||||
|
and _has_think_tags
|
||||||
|
and (
|
||||||
(_trunc_content is not None and not self._has_content_after_think_block(_trunc_content))
|
(_trunc_content is not None and not self._has_content_after_think_block(_trunc_content))
|
||||||
or _trunc_content is None
|
or _trunc_content is None
|
||||||
)
|
)
|
||||||
@@ -9396,12 +9412,41 @@ class AIAgent:
|
|||||||
invalid_json_args.append((tc.function.name, str(e)))
|
invalid_json_args.append((tc.function.name, str(e)))
|
||||||
|
|
||||||
if invalid_json_args:
|
if invalid_json_args:
|
||||||
|
# Check if the invalid JSON is due to truncation rather
|
||||||
|
# than a model formatting mistake. Routers sometimes
|
||||||
|
# rewrite finish_reason from "length" to "tool_calls",
|
||||||
|
# hiding the truncation from the length handler above.
|
||||||
|
# Detect truncation: args that don't end with } or ]
|
||||||
|
# (after stripping whitespace) are cut off mid-stream.
|
||||||
|
_truncated = any(
|
||||||
|
not (tc.function.arguments or "").rstrip().endswith(("}", "]"))
|
||||||
|
for tc in assistant_message.tool_calls
|
||||||
|
if tc.function.name in {n for n, _ in invalid_json_args}
|
||||||
|
)
|
||||||
|
if _truncated:
|
||||||
|
self._vprint(
|
||||||
|
f"{self.log_prefix}⚠️ Truncated tool call arguments detected "
|
||||||
|
f"(finish_reason={finish_reason!r}) — refusing to execute.",
|
||||||
|
force=True,
|
||||||
|
)
|
||||||
|
self._invalid_json_retries = 0
|
||||||
|
self._cleanup_task_resources(effective_task_id)
|
||||||
|
self._persist_session(messages, conversation_history)
|
||||||
|
return {
|
||||||
|
"final_response": None,
|
||||||
|
"messages": messages,
|
||||||
|
"api_calls": api_call_count,
|
||||||
|
"completed": False,
|
||||||
|
"partial": True,
|
||||||
|
"error": "Response truncated due to output length limit",
|
||||||
|
}
|
||||||
|
|
||||||
# Track retries for invalid JSON arguments
|
# Track retries for invalid JSON arguments
|
||||||
self._invalid_json_retries += 1
|
self._invalid_json_retries += 1
|
||||||
|
|
||||||
tool_name, error_msg = invalid_json_args[0]
|
tool_name, error_msg = invalid_json_args[0]
|
||||||
self._vprint(f"{self.log_prefix}⚠️ Invalid JSON in tool call arguments for '{tool_name}': {error_msg}")
|
self._vprint(f"{self.log_prefix}⚠️ Invalid JSON in tool call arguments for '{tool_name}': {error_msg}")
|
||||||
|
|
||||||
if self._invalid_json_retries < 3:
|
if self._invalid_json_retries < 3:
|
||||||
self._vprint(f"{self.log_prefix}🔄 Retrying API call ({self._invalid_json_retries}/3)...")
|
self._vprint(f"{self.log_prefix}🔄 Retrying API call ({self._invalid_json_retries}/3)...")
|
||||||
# Don't add anything to messages, just retry the API call
|
# Don't add anything to messages, just retry the API call
|
||||||
|
|||||||
@@ -409,11 +409,50 @@ class TestChatCompletionsEndpoint:
|
|||||||
)
|
)
|
||||||
assert resp.status == 200
|
assert resp.status == 200
|
||||||
assert "text/event-stream" in resp.headers.get("Content-Type", "")
|
assert "text/event-stream" in resp.headers.get("Content-Type", "")
|
||||||
|
assert resp.headers.get("X-Accel-Buffering") == "no"
|
||||||
body = await resp.text()
|
body = await resp.text()
|
||||||
assert "data: " in body
|
assert "data: " in body
|
||||||
assert "[DONE]" in body
|
assert "[DONE]" in body
|
||||||
assert "Hello!" in body
|
assert "Hello!" in body
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_stream_sends_keepalive_during_quiet_tool_gap(self, adapter):
    """A quiet gap between deltas must produce SSE keepalive comments.

    The mocked agent emits one delta, stays silent for 0.65 s, then
    emits a second delta.  With the keepalive interval patched down to
    0.01 s, the response body is expected to contain a ``: keepalive``
    comment in addition to both deltas and the ``[DONE]`` marker.
    """
    import asyncio
    import gateway.platforms.api_server as api_server_mod

    async def _mock_run_agent(**kwargs):
        emit_delta = kwargs.get("stream_delta_callback")
        if emit_delta:
            emit_delta("Working")
            # Silent period standing in for a tool that produces no output.
            await asyncio.sleep(0.65)
            emit_delta("...done")
        result = {"final_response": "Working...done", "messages": [], "api_calls": 1}
        usage = {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}
        return (result, usage)

    request_json = {
        "model": "test",
        "messages": [{"role": "user", "content": "do the thing"}],
        "stream": True,
    }

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as client:
        with (
            patch.object(api_server_mod, "CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS", 0.01),
            patch.object(adapter, "_run_agent", side_effect=_mock_run_agent),
        ):
            response = await client.post("/v1/chat/completions", json=request_json)
            assert response.status == 200
            payload = await response.text()
            assert ": keepalive" in payload
            assert "Working" in payload
            assert "...done" in payload
            assert "[DONE]" in payload
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_stream_survives_tool_call_none_sentinel(self, adapter):
|
async def test_stream_survives_tool_call_none_sentinel(self, adapter):
|
||||||
"""stream_delta_callback(None) mid-stream (tool calls) must NOT kill the SSE stream.
|
"""stream_delta_callback(None) mid-stream (tool calls) must NOT kill the SSE stream.
|
||||||
|
|||||||
@@ -2087,8 +2087,9 @@ class TestRunConversation:
|
|||||||
assert "Thinking Budget Exhausted" in result["final_response"]
|
assert "Thinking Budget Exhausted" in result["final_response"]
|
||||||
assert "/thinkon" in result["final_response"]
|
assert "/thinkon" in result["final_response"]
|
||||||
|
|
||||||
def test_length_empty_content_detected_as_thinking_exhausted(self, agent):
|
def test_length_empty_content_without_think_tags_retries_normally(self, agent):
|
||||||
"""When finish_reason='length' and content is None/empty, detect exhaustion."""
|
"""When finish_reason='length' and content is None but no think tags,
|
||||||
|
fall through to normal continuation retry (not thinking-exhaustion)."""
|
||||||
self._setup_agent(agent)
|
self._setup_agent(agent)
|
||||||
resp = _mock_response(content=None, finish_reason="length")
|
resp = _mock_response(content=None, finish_reason="length")
|
||||||
agent.client.chat.completions.create.return_value = resp
|
agent.client.chat.completions.create.return_value = resp
|
||||||
@@ -2100,12 +2101,10 @@ class TestRunConversation:
|
|||||||
):
|
):
|
||||||
result = agent.run_conversation("hello")
|
result = agent.run_conversation("hello")
|
||||||
|
|
||||||
|
# Without think tags, the agent should attempt continuation retries
|
||||||
|
# (up to 3), not immediately fire thinking-exhaustion.
|
||||||
|
assert result["api_calls"] == 3
|
||||||
assert result["completed"] is False
|
assert result["completed"] is False
|
||||||
assert result["api_calls"] == 1
|
|
||||||
assert "reasoning" in result["error"].lower()
|
|
||||||
# User-friendly message is returned
|
|
||||||
assert result["final_response"] is not None
|
|
||||||
assert "Thinking Budget Exhausted" in result["final_response"]
|
|
||||||
|
|
||||||
def test_length_with_tool_calls_returns_partial_without_executing_tools(self, agent):
|
def test_length_with_tool_calls_returns_partial_without_executing_tools(self, agent):
|
||||||
self._setup_agent(agent)
|
self._setup_agent(agent)
|
||||||
@@ -2169,6 +2168,35 @@ class TestRunConversation:
|
|||||||
mock_hfc.assert_called_once()
|
mock_hfc.assert_called_once()
|
||||||
assert result["final_response"] == "Done!"
|
assert result["final_response"] == "Done!"
|
||||||
|
|
||||||
|
def test_truncated_tool_args_detected_when_finish_reason_not_length(self, agent):
    """Truncated tool-call JSON is refused even when finish_reason is 'tool_calls'.

    A router may rewrite finish_reason from 'length' to 'tool_calls'.
    The agent should still recognise the cut-off arguments and return a
    partial result instead of spending 3 retry attempts, and must not
    execute the tool.
    """
    self._setup_agent(agent)
    agent.valid_tool_names.add("write_file")

    # Arguments end mid-string, i.e. not on a closing brace or bracket.
    truncated_call = _mock_tool_call(
        name="write_file",
        arguments='{"path":"report.md","content":"partial',
        call_id="c1",
    )
    agent.client.chat.completions.create.return_value = _mock_response(
        content="",
        finish_reason="tool_calls",
        tool_calls=[truncated_call],
    )

    with (
        patch("run_agent.handle_function_call") as mock_handle_function_call,
        patch.object(agent, "_persist_session"),
        patch.object(agent, "_save_trajectory"),
        patch.object(agent, "_cleanup_task_resources"),
    ):
        outcome = agent.run_conversation("write the report")

    assert outcome["completed"] is False
    assert outcome["partial"] is True
    assert "truncated due to output length limit" in outcome["error"]
    mock_handle_function_call.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
class TestRetryExhaustion:
|
class TestRetryExhaustion:
|
||||||
"""Regression: retry_count > max_retries was dead code (off-by-one).
|
"""Regression: retry_count > max_retries was dead code (off-by-one).
|
||||||
|
|||||||
Reference in New Issue
Block a user