mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 23:11:37 +08:00
Compare commits
1 Commits
opencode-p
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
38eef67549 |
107
run_agent.py
107
run_agent.py
@@ -782,6 +782,25 @@ class AIAgent:
|
|||||||
}
|
}
|
||||||
|
|
||||||
self._client_kwargs = client_kwargs # stored for rebuilding after interrupt
|
self._client_kwargs = client_kwargs # stored for rebuilding after interrupt
|
||||||
|
|
||||||
|
# Enable fine-grained tool streaming for Claude on OpenRouter.
|
||||||
|
# Without this, Anthropic buffers the entire tool call and goes
|
||||||
|
# silent for minutes while thinking — OpenRouter's upstream proxy
|
||||||
|
# times out during the silence. The beta header makes Anthropic
|
||||||
|
# stream tool call arguments token-by-token, keeping the
|
||||||
|
# connection alive.
|
||||||
|
_effective_base = str(client_kwargs.get("base_url", "")).lower()
|
||||||
|
if "openrouter" in _effective_base and "claude" in (self.model or "").lower():
|
||||||
|
headers = client_kwargs.get("default_headers") or {}
|
||||||
|
existing_beta = headers.get("x-anthropic-beta", "")
|
||||||
|
_FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14"
|
||||||
|
if _FINE_GRAINED not in existing_beta:
|
||||||
|
if existing_beta:
|
||||||
|
headers["x-anthropic-beta"] = f"{existing_beta},{_FINE_GRAINED}"
|
||||||
|
else:
|
||||||
|
headers["x-anthropic-beta"] = _FINE_GRAINED
|
||||||
|
client_kwargs["default_headers"] = headers
|
||||||
|
|
||||||
self.api_key = client_kwargs.get("api_key", "")
|
self.api_key = client_kwargs.get("api_key", "")
|
||||||
try:
|
try:
|
||||||
self.client = self._create_openai_client(client_kwargs, reason="agent_init", shared=True)
|
self.client = self._create_openai_client(client_kwargs, reason="agent_init", shared=True)
|
||||||
@@ -3994,7 +4013,37 @@ class AIAgent:
|
|||||||
e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError)
|
e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError)
|
||||||
)
|
)
|
||||||
|
|
||||||
if _is_timeout or _is_conn_err:
|
# SSE error events from proxies (e.g. OpenRouter sends
|
||||||
|
# {"error":{"message":"Network connection lost."}}) are
|
||||||
|
# raised as APIError by the OpenAI SDK. These are
|
||||||
|
# semantically identical to httpx connection drops —
|
||||||
|
# the upstream stream died — and should be retried with
|
||||||
|
# a fresh connection. Distinguish from HTTP errors:
|
||||||
|
# APIError from SSE has no status_code, while
|
||||||
|
# APIStatusError (4xx/5xx) always has one.
|
||||||
|
_is_sse_conn_err = False
|
||||||
|
if not _is_timeout and not _is_conn_err:
|
||||||
|
from openai import APIError as _APIError
|
||||||
|
if isinstance(e, _APIError) and not getattr(e, "status_code", None):
|
||||||
|
_err_lower_sse = str(e).lower()
|
||||||
|
_SSE_CONN_PHRASES = (
|
||||||
|
"connection lost",
|
||||||
|
"connection reset",
|
||||||
|
"connection closed",
|
||||||
|
"connection terminated",
|
||||||
|
"network error",
|
||||||
|
"network connection",
|
||||||
|
"terminated",
|
||||||
|
"peer closed",
|
||||||
|
"broken pipe",
|
||||||
|
"upstream connect error",
|
||||||
|
)
|
||||||
|
_is_sse_conn_err = any(
|
||||||
|
phrase in _err_lower_sse
|
||||||
|
for phrase in _SSE_CONN_PHRASES
|
||||||
|
)
|
||||||
|
|
||||||
|
if _is_timeout or _is_conn_err or _is_sse_conn_err:
|
||||||
# Transient network / timeout error. Retry the
|
# Transient network / timeout error. Retry the
|
||||||
# streaming request with a fresh connection first.
|
# streaming request with a fresh connection first.
|
||||||
if _stream_attempt < _max_stream_retries:
|
if _stream_attempt < _max_stream_retries:
|
||||||
@@ -4507,6 +4556,20 @@ class AIAgent:
|
|||||||
|
|
||||||
if self.max_tokens is not None:
|
if self.max_tokens is not None:
|
||||||
api_kwargs.update(self._max_tokens_param(self.max_tokens))
|
api_kwargs.update(self._max_tokens_param(self.max_tokens))
|
||||||
|
elif self._is_openrouter_url() and "claude" in (self.model or "").lower():
|
||||||
|
# OpenRouter translates requests to Anthropic's Messages API,
|
||||||
|
# which requires max_tokens as a mandatory field. When we omit
|
||||||
|
# it, OpenRouter picks a default that can be too low — the model
|
||||||
|
# spends its output budget on thinking and has almost nothing
|
||||||
|
# left for the actual response (especially large tool calls like
|
||||||
|
# write_file). Sending the model's real output limit ensures
|
||||||
|
# full capacity. Other providers handle the default fine.
|
||||||
|
try:
|
||||||
|
from agent.anthropic_adapter import _get_anthropic_max_output
|
||||||
|
_model_output_limit = _get_anthropic_max_output(self.model)
|
||||||
|
api_kwargs["max_tokens"] = _model_output_limit
|
||||||
|
except Exception:
|
||||||
|
pass # fail open — let OpenRouter pick its default
|
||||||
|
|
||||||
extra_body = {}
|
extra_body = {}
|
||||||
|
|
||||||
@@ -6923,6 +6986,36 @@ class AIAgent:
|
|||||||
_final_summary = self._summarize_api_error(api_error)
|
_final_summary = self._summarize_api_error(api_error)
|
||||||
self._vprint(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded. Giving up.", force=True)
|
self._vprint(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded. Giving up.", force=True)
|
||||||
self._vprint(f"{self.log_prefix} 💀 Final error: {_final_summary}", force=True)
|
self._vprint(f"{self.log_prefix} 💀 Final error: {_final_summary}", force=True)
|
||||||
|
|
||||||
|
# Detect SSE stream-drop pattern (e.g. "Network
|
||||||
|
# connection lost") and surface actionable guidance.
|
||||||
|
# This typically happens when the model generates a
|
||||||
|
# very large tool call (write_file with huge content)
|
||||||
|
# and the proxy/CDN drops the stream mid-response.
|
||||||
|
_is_stream_drop = (
|
||||||
|
not getattr(api_error, "status_code", None)
|
||||||
|
and any(p in error_msg for p in (
|
||||||
|
"connection lost", "connection reset",
|
||||||
|
"connection closed", "network connection",
|
||||||
|
"network error", "terminated",
|
||||||
|
))
|
||||||
|
)
|
||||||
|
if _is_stream_drop:
|
||||||
|
self._vprint(
|
||||||
|
f"{self.log_prefix} 💡 The provider's stream "
|
||||||
|
f"connection keeps dropping. This often happens "
|
||||||
|
f"when the model tries to write a very large "
|
||||||
|
f"file in a single tool call.",
|
||||||
|
force=True,
|
||||||
|
)
|
||||||
|
self._vprint(
|
||||||
|
f"{self.log_prefix} Try asking the model "
|
||||||
|
f"to use execute_code with Python's open() for "
|
||||||
|
f"large files, or to write the file in smaller "
|
||||||
|
f"sections.",
|
||||||
|
force=True,
|
||||||
|
)
|
||||||
|
|
||||||
logging.error(
|
logging.error(
|
||||||
"%sAPI call failed after %s retries. %s | provider=%s model=%s msgs=%s tokens=~%s",
|
"%sAPI call failed after %s retries. %s | provider=%s model=%s msgs=%s tokens=~%s",
|
||||||
self.log_prefix, max_retries, _final_summary,
|
self.log_prefix, max_retries, _final_summary,
|
||||||
@@ -6932,8 +7025,18 @@ class AIAgent:
|
|||||||
api_kwargs, reason="max_retries_exhausted", error=api_error,
|
api_kwargs, reason="max_retries_exhausted", error=api_error,
|
||||||
)
|
)
|
||||||
self._persist_session(messages, conversation_history)
|
self._persist_session(messages, conversation_history)
|
||||||
|
_final_response = f"API call failed after {max_retries} retries: {_final_summary}"
|
||||||
|
if _is_stream_drop:
|
||||||
|
_final_response += (
|
||||||
|
"\n\nThe provider's stream connection keeps "
|
||||||
|
"dropping — this often happens when generating "
|
||||||
|
"very large tool call responses (e.g. write_file "
|
||||||
|
"with long content). Try asking me to use "
|
||||||
|
"execute_code with Python's open() for large "
|
||||||
|
"files, or to write in smaller sections."
|
||||||
|
)
|
||||||
return {
|
return {
|
||||||
"final_response": f"API call failed after {max_retries} retries: {_final_summary}",
|
"final_response": _final_response,
|
||||||
"messages": messages,
|
"messages": messages,
|
||||||
"api_calls": api_call_count,
|
"api_calls": api_call_count,
|
||||||
"completed": False,
|
"completed": False,
|
||||||
|
|||||||
@@ -532,6 +532,121 @@ class TestStreamingFallback:
|
|||||||
mock_non_stream.assert_called_once()
|
mock_non_stream.assert_called_once()
|
||||||
assert mock_close.call_count >= 1
|
assert mock_close.call_count >= 1
|
||||||
|
|
||||||
|
@patch("run_agent.AIAgent._interruptible_api_call")
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_sse_connection_lost_retried_as_transient(self, mock_close, mock_create, mock_non_stream):
    """A status-less APIError saying "Network connection lost." is retried like an httpx drop.

    OpenRouter reports a dropped upstream stream as the SSE event
    {"error":{"message":"Network connection lost."}}, which the OpenAI SDK
    surfaces as an APIError. The streaming call is expected to retry such an
    error at the streaming level, exactly as it does for httpx connection
    errors, and only then fall back to the non-streaming path.
    """
    import httpx
    from openai import APIError as OAIAPIError
    from run_agent import AIAgent

    # Non-streaming fallback result, assembled from the inside out.
    reply = SimpleNamespace(
        role="assistant",
        content="fallback after SSE retries",
        tool_calls=None,
        reasoning_content=None,
    )
    only_choice = SimpleNamespace(index=0, message=reply, finish_reason="stop")
    mock_non_stream.return_value = SimpleNamespace(
        id="fallback",
        model="test",
        choices=[only_choice],
        usage=None,
    )

    # Mimic the error the SDK raises for an SSE error event: a plain APIError
    # built without any HTTP response, i.e. it carries no status_code
    # (APIStatusError, by contrast, has one).
    stream_drop = OAIAPIError(
        message="Network connection lost.",
        request=httpx.Request("POST", "https://openrouter.ai/api/v1/chat/completions"),
        body={"message": "Network connection lost."},
    )

    # Every streaming attempt made through the per-request client fails.
    streaming_client = MagicMock()
    streaming_client.chat.completions.create.side_effect = stream_drop
    mock_create.return_value = streaming_client

    agent = AIAgent(
        model="test/model",
        quiet_mode=True,
        skip_context_files=True,
        skip_memory=True,
    )
    agent.api_mode = "chat_completions"
    agent._interrupt_requested = False

    result = agent._interruptible_streaming_api_call({})

    assert result.choices[0].message.content == "fallback after SSE retries"
    # Default HERMES_STREAM_RETRIES=2 means three streaming attempts in total
    # before the non-streaming fallback takes over.
    assert streaming_client.chat.completions.create.call_count == 3
    mock_non_stream.assert_called_once()
    # The request client must have been closed after the failed attempts.
    assert mock_close.call_count >= 2
|
||||||
|
|
||||||
|
@patch("run_agent.AIAgent._interruptible_api_call")
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_sse_non_connection_error_falls_back_immediately(self, mock_close, mock_create, mock_non_stream):
    """A status-less APIError that is not connection-related skips the stream retry.

    The message "Invalid model configuration." is not a connection-drop
    message, so the streaming call is expected to make a single streaming
    attempt and go straight to the non-streaming fallback.
    """
    import httpx
    from openai import APIError as OAIAPIError
    from run_agent import AIAgent

    # Non-streaming fallback result, assembled from the inside out.
    reply = SimpleNamespace(
        role="assistant",
        content="fallback no retry",
        tool_calls=None,
        reasoning_content=None,
    )
    only_choice = SimpleNamespace(index=0, message=reply, finish_reason="stop")
    mock_non_stream.return_value = SimpleNamespace(
        id="fallback",
        model="test",
        choices=[only_choice],
        usage=None,
    )

    # Plain APIError built without an HTTP response, but with a message that
    # does not describe a dropped connection.
    config_error = OAIAPIError(
        message="Invalid model configuration.",
        request=httpx.Request("POST", "https://openrouter.ai/api/v1/chat/completions"),
        body={"message": "Invalid model configuration."},
    )

    # The single streaming attempt made through the per-request client fails.
    streaming_client = MagicMock()
    streaming_client.chat.completions.create.side_effect = config_error
    mock_create.return_value = streaming_client

    agent = AIAgent(
        model="test/model",
        quiet_mode=True,
        skip_context_files=True,
        skip_memory=True,
    )
    agent.api_mode = "chat_completions"
    agent._interrupt_requested = False

    result = agent._interruptible_streaming_api_call({})

    assert result.choices[0].message.content == "fallback no retry"
    # Exactly one streaming attempt: no retry before the fallback.
    assert streaming_client.chat.completions.create.call_count == 1
    mock_non_stream.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
# ── Test: Reasoning Streaming ────────────────────────────────────────────
|
# ── Test: Reasoning Streaming ────────────────────────────────────────────
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user