mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 15:01:34 +08:00
Compare commits
1 Commits
opencode-p
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
38eef67549 |
107
run_agent.py
107
run_agent.py
@@ -782,6 +782,25 @@ class AIAgent:
|
||||
}
|
||||
|
||||
self._client_kwargs = client_kwargs # stored for rebuilding after interrupt
|
||||
|
||||
# Enable fine-grained tool streaming for Claude on OpenRouter.
|
||||
# Without this, Anthropic buffers the entire tool call and goes
|
||||
# silent for minutes while thinking — OpenRouter's upstream proxy
|
||||
# times out during the silence. The beta header makes Anthropic
|
||||
# stream tool call arguments token-by-token, keeping the
|
||||
# connection alive.
|
||||
_effective_base = str(client_kwargs.get("base_url", "")).lower()
|
||||
if "openrouter" in _effective_base and "claude" in (self.model or "").lower():
|
||||
headers = client_kwargs.get("default_headers") or {}
|
||||
existing_beta = headers.get("x-anthropic-beta", "")
|
||||
_FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14"
|
||||
if _FINE_GRAINED not in existing_beta:
|
||||
if existing_beta:
|
||||
headers["x-anthropic-beta"] = f"{existing_beta},{_FINE_GRAINED}"
|
||||
else:
|
||||
headers["x-anthropic-beta"] = _FINE_GRAINED
|
||||
client_kwargs["default_headers"] = headers
|
||||
|
||||
self.api_key = client_kwargs.get("api_key", "")
|
||||
try:
|
||||
self.client = self._create_openai_client(client_kwargs, reason="agent_init", shared=True)
|
||||
@@ -3994,7 +4013,37 @@ class AIAgent:
|
||||
e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError)
|
||||
)
|
||||
|
||||
if _is_timeout or _is_conn_err:
|
||||
# SSE error events from proxies (e.g. OpenRouter sends
|
||||
# {"error":{"message":"Network connection lost."}}) are
|
||||
# raised as APIError by the OpenAI SDK. These are
|
||||
# semantically identical to httpx connection drops —
|
||||
# the upstream stream died — and should be retried with
|
||||
# a fresh connection. Distinguish from HTTP errors:
|
||||
# APIError from SSE has no status_code, while
|
||||
# APIStatusError (4xx/5xx) always has one.
|
||||
_is_sse_conn_err = False
|
||||
if not _is_timeout and not _is_conn_err:
|
||||
from openai import APIError as _APIError
|
||||
if isinstance(e, _APIError) and not getattr(e, "status_code", None):
|
||||
_err_lower_sse = str(e).lower()
|
||||
_SSE_CONN_PHRASES = (
|
||||
"connection lost",
|
||||
"connection reset",
|
||||
"connection closed",
|
||||
"connection terminated",
|
||||
"network error",
|
||||
"network connection",
|
||||
"terminated",
|
||||
"peer closed",
|
||||
"broken pipe",
|
||||
"upstream connect error",
|
||||
)
|
||||
_is_sse_conn_err = any(
|
||||
phrase in _err_lower_sse
|
||||
for phrase in _SSE_CONN_PHRASES
|
||||
)
|
||||
|
||||
if _is_timeout or _is_conn_err or _is_sse_conn_err:
|
||||
# Transient network / timeout error. Retry the
|
||||
# streaming request with a fresh connection first.
|
||||
if _stream_attempt < _max_stream_retries:
|
||||
@@ -4507,6 +4556,20 @@ class AIAgent:
|
||||
|
||||
if self.max_tokens is not None:
|
||||
api_kwargs.update(self._max_tokens_param(self.max_tokens))
|
||||
elif self._is_openrouter_url() and "claude" in (self.model or "").lower():
|
||||
# OpenRouter translates requests to Anthropic's Messages API,
|
||||
# which requires max_tokens as a mandatory field. When we omit
|
||||
# it, OpenRouter picks a default that can be too low — the model
|
||||
# spends its output budget on thinking and has almost nothing
|
||||
# left for the actual response (especially large tool calls like
|
||||
# write_file). Sending the model's real output limit ensures
|
||||
# full capacity. Other providers handle the default fine.
|
||||
try:
|
||||
from agent.anthropic_adapter import _get_anthropic_max_output
|
||||
_model_output_limit = _get_anthropic_max_output(self.model)
|
||||
api_kwargs["max_tokens"] = _model_output_limit
|
||||
except Exception:
|
||||
pass # fail open — let OpenRouter pick its default
|
||||
|
||||
extra_body = {}
|
||||
|
||||
@@ -6923,6 +6986,36 @@ class AIAgent:
|
||||
_final_summary = self._summarize_api_error(api_error)
|
||||
self._vprint(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded. Giving up.", force=True)
|
||||
self._vprint(f"{self.log_prefix} 💀 Final error: {_final_summary}", force=True)
|
||||
|
||||
# Detect SSE stream-drop pattern (e.g. "Network
|
||||
# connection lost") and surface actionable guidance.
|
||||
# This typically happens when the model generates a
|
||||
# very large tool call (write_file with huge content)
|
||||
# and the proxy/CDN drops the stream mid-response.
|
||||
_is_stream_drop = (
|
||||
not getattr(api_error, "status_code", None)
|
||||
and any(p in error_msg for p in (
|
||||
"connection lost", "connection reset",
|
||||
"connection closed", "network connection",
|
||||
"network error", "terminated",
|
||||
))
|
||||
)
|
||||
if _is_stream_drop:
|
||||
self._vprint(
|
||||
f"{self.log_prefix} 💡 The provider's stream "
|
||||
f"connection keeps dropping. This often happens "
|
||||
f"when the model tries to write a very large "
|
||||
f"file in a single tool call.",
|
||||
force=True,
|
||||
)
|
||||
self._vprint(
|
||||
f"{self.log_prefix} Try asking the model "
|
||||
f"to use execute_code with Python's open() for "
|
||||
f"large files, or to write the file in smaller "
|
||||
f"sections.",
|
||||
force=True,
|
||||
)
|
||||
|
||||
logging.error(
|
||||
"%sAPI call failed after %s retries. %s | provider=%s model=%s msgs=%s tokens=~%s",
|
||||
self.log_prefix, max_retries, _final_summary,
|
||||
@@ -6932,8 +7025,18 @@ class AIAgent:
|
||||
api_kwargs, reason="max_retries_exhausted", error=api_error,
|
||||
)
|
||||
self._persist_session(messages, conversation_history)
|
||||
_final_response = f"API call failed after {max_retries} retries: {_final_summary}"
|
||||
if _is_stream_drop:
|
||||
_final_response += (
|
||||
"\n\nThe provider's stream connection keeps "
|
||||
"dropping — this often happens when generating "
|
||||
"very large tool call responses (e.g. write_file "
|
||||
"with long content). Try asking me to use "
|
||||
"execute_code with Python's open() for large "
|
||||
"files, or to write in smaller sections."
|
||||
)
|
||||
return {
|
||||
"final_response": f"API call failed after {max_retries} retries: {_final_summary}",
|
||||
"final_response": _final_response,
|
||||
"messages": messages,
|
||||
"api_calls": api_call_count,
|
||||
"completed": False,
|
||||
|
||||
@@ -532,6 +532,121 @@ class TestStreamingFallback:
|
||||
mock_non_stream.assert_called_once()
|
||||
assert mock_close.call_count >= 1
|
||||
|
||||
@patch("run_agent.AIAgent._interruptible_api_call")
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_sse_connection_lost_retried_as_transient(self, mock_close, mock_create, mock_non_stream):
    """SSE 'Network connection lost' (APIError w/ no status_code) retries like httpx errors.

    OpenRouter sends {"error":{"message":"Network connection lost."}} as an SSE
    event when the upstream stream drops. The OpenAI SDK raises APIError from
    this. It should be retried at the streaming level, same as httpx connection
    errors, before falling back to non-streaming.
    """
    import httpx
    from run_agent import AIAgent
    from openai import APIError as OAIAPIError

    # Mimic the SDK surfacing an SSE error event: a plain APIError carrying
    # no status_code attribute (an HTTP-level APIStatusError would have one).
    stream_drop = OAIAPIError(
        message="Network connection lost.",
        request=httpx.Request("POST", "https://openrouter.ai/api/v1/chat/completions"),
        body={"message": "Network connection lost."},
    )

    fake_client = MagicMock()
    fake_client.chat.completions.create.side_effect = stream_drop
    mock_create.return_value = fake_client

    # Canned non-streaming result handed back once streaming gives up.
    fallback_msg = SimpleNamespace(
        role="assistant",
        content="fallback after SSE retries",
        tool_calls=None,
        reasoning_content=None,
    )
    fallback_choice = SimpleNamespace(
        index=0,
        message=fallback_msg,
        finish_reason="stop",
    )
    mock_non_stream.return_value = SimpleNamespace(
        id="fallback",
        model="test",
        choices=[fallback_choice],
        usage=None,
    )

    agent = AIAgent(
        model="test/model",
        quiet_mode=True,
        skip_context_files=True,
        skip_memory=True,
    )
    agent.api_mode = "chat_completions"
    agent._interrupt_requested = False

    result = agent._interruptible_streaming_api_call({})

    assert result.choices[0].message.content == "fallback after SSE retries"
    # Default HERMES_STREAM_RETRIES=2 → three streaming attempts in total
    # before the non-streaming fallback kicks in.
    assert fake_client.chat.completions.create.call_count == 3
    mock_non_stream.assert_called_once()
    # Each failed streaming attempt should tear down its connection.
    assert mock_close.call_count >= 2
|
||||
|
||||
@patch("run_agent.AIAgent._interruptible_api_call")
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_sse_non_connection_error_falls_back_immediately(self, mock_close, mock_create, mock_non_stream):
    """SSE errors that aren't connection-related still fall back immediately (no stream retry)."""
    import httpx
    from run_agent import AIAgent
    from openai import APIError as OAIAPIError

    # A non-connection SSE error: still no status_code, but the message does
    # not match any transient connection-drop phrase.
    config_err = OAIAPIError(
        message="Invalid model configuration.",
        request=httpx.Request("POST", "https://openrouter.ai/api/v1/chat/completions"),
        body={"message": "Invalid model configuration."},
    )

    fake_client = MagicMock()
    fake_client.chat.completions.create.side_effect = config_err
    mock_create.return_value = fake_client

    # Canned non-streaming result for the immediate fallback path.
    fallback_msg = SimpleNamespace(
        role="assistant",
        content="fallback no retry",
        tool_calls=None,
        reasoning_content=None,
    )
    fallback_choice = SimpleNamespace(
        index=0,
        message=fallback_msg,
        finish_reason="stop",
    )
    mock_non_stream.return_value = SimpleNamespace(
        id="fallback",
        model="test",
        choices=[fallback_choice],
        usage=None,
    )

    agent = AIAgent(
        model="test/model",
        quiet_mode=True,
        skip_context_files=True,
        skip_memory=True,
    )
    agent.api_mode = "chat_completions"
    agent._interrupt_requested = False

    result = agent._interruptible_streaming_api_call({})

    assert result.choices[0].message.content == "fallback no retry"
    # Exactly one streaming attempt — straight to the non-streaming path.
    assert fake_client.chat.completions.create.call_count == 1
    mock_non_stream.assert_called_once()
|
||||
|
||||
|
||||
# ── Test: Reasoning Streaming ────────────────────────────────────────────
|
||||
|
||||
|
||||
Reference in New Issue
Block a user