mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-29 15:31:38 +08:00
Compare commits
2 Commits
skill/gith
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e45e4107f6 | ||
|
|
d90178e21b |
@@ -449,6 +449,11 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
self._bot_task: Optional[asyncio.Task] = None
|
||||
# Cap to prevent unbounded growth (Discord threads get archived).
|
||||
self._MAX_TRACKED_THREADS = 500
|
||||
# Dedup cache: message_id → timestamp. Prevents duplicate bot
|
||||
# responses when Discord RESUME replays events after reconnects.
|
||||
self._seen_messages: Dict[str, float] = {}
|
||||
self._SEEN_TTL = 300 # 5 minutes
|
||||
self._SEEN_MAX = 2000 # prune threshold
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Discord and start receiving events."""
|
||||
@@ -539,6 +544,19 @@ class DiscordAdapter(BasePlatformAdapter):
|
||||
|
||||
@self._client.event
|
||||
async def on_message(message: DiscordMessage):
|
||||
# Dedup: Discord RESUME replays events after reconnects (#4777)
|
||||
msg_id = str(message.id)
|
||||
now = time.time()
|
||||
if msg_id in adapter_self._seen_messages:
|
||||
return
|
||||
adapter_self._seen_messages[msg_id] = now
|
||||
if len(adapter_self._seen_messages) > adapter_self._SEEN_MAX:
|
||||
cutoff = now - adapter_self._SEEN_TTL
|
||||
adapter_self._seen_messages = {
|
||||
k: v for k, v in adapter_self._seen_messages.items()
|
||||
if v > cutoff
|
||||
}
|
||||
|
||||
# Always ignore our own messages
|
||||
if message.author == self._client.user:
|
||||
return
|
||||
|
||||
@@ -13,6 +13,7 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from typing import Dict, Optional, Any
|
||||
|
||||
try:
|
||||
@@ -78,6 +79,11 @@ class SlackAdapter(BasePlatformAdapter):
|
||||
self._team_clients: Dict[str, AsyncWebClient] = {} # team_id → WebClient
|
||||
self._team_bot_user_ids: Dict[str, str] = {} # team_id → bot_user_id
|
||||
self._channel_team: Dict[str, str] = {} # channel_id → team_id
|
||||
# Dedup cache: event_ts → timestamp. Prevents duplicate bot
|
||||
# responses when Socket Mode reconnects redeliver events.
|
||||
self._seen_messages: Dict[str, float] = {}
|
||||
self._SEEN_TTL = 300 # 5 minutes
|
||||
self._SEEN_MAX = 2000 # prune threshold
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Slack via Socket Mode."""
|
||||
@@ -710,6 +716,20 @@ class SlackAdapter(BasePlatformAdapter):
|
||||
|
||||
async def _handle_slack_message(self, event: dict) -> None:
|
||||
"""Handle an incoming Slack message event."""
|
||||
# Dedup: Slack Socket Mode can redeliver events after reconnects (#4777)
|
||||
event_ts = event.get("ts", "")
|
||||
if event_ts:
|
||||
now = time.time()
|
||||
if event_ts in self._seen_messages:
|
||||
return
|
||||
self._seen_messages[event_ts] = now
|
||||
if len(self._seen_messages) > self._SEEN_MAX:
|
||||
cutoff = now - self._SEEN_TTL
|
||||
self._seen_messages = {
|
||||
k: v for k, v in self._seen_messages.items()
|
||||
if v > cutoff
|
||||
}
|
||||
|
||||
# Ignore bot messages (including our own)
|
||||
if event.get("bot_id") or event.get("subtype") == "bot_message":
|
||||
return
|
||||
|
||||
137
run_agent.py
137
run_agent.py
@@ -4027,6 +4027,7 @@ class AIAgent:
|
||||
request_client_holder = {"client": None}
|
||||
first_delta_fired = {"done": False}
|
||||
deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback)
|
||||
partial_streamed_text = {"content": ""} # Accumulates text delivered to platform
|
||||
# Wall-clock timestamp of the last real streaming chunk. The outer
|
||||
# poll loop uses this to detect stale connections that keep receiving
|
||||
# SSE keep-alive pings but no actual data.
|
||||
@@ -4114,6 +4115,7 @@ class AIAgent:
|
||||
_fire_first_delta()
|
||||
self._fire_stream_delta(delta.content)
|
||||
deltas_were_sent["yes"] = True
|
||||
partial_streamed_text["content"] += delta.content
|
||||
else:
|
||||
# Tool calls suppress regular content streaming (avoids
|
||||
# displaying chatty "I'll use the tool..." text alongside
|
||||
@@ -4264,6 +4266,8 @@ class AIAgent:
|
||||
if text and not has_tool_use:
|
||||
_fire_first_delta()
|
||||
self._fire_stream_delta(text)
|
||||
deltas_were_sent["yes"] = True
|
||||
partial_streamed_text["content"] += text
|
||||
elif delta_type == "thinking_delta":
|
||||
thinking_text = getattr(delta, "thinking", "")
|
||||
if thinking_text:
|
||||
@@ -4473,6 +4477,14 @@ class AIAgent:
|
||||
pass
|
||||
raise InterruptedError("Agent interrupted during streaming API call")
|
||||
if result["error"] is not None:
|
||||
if deltas_were_sent["yes"] and partial_streamed_text["content"]:
|
||||
# Streaming failed AFTER tokens were delivered to the platform.
|
||||
# Attach the partial text so the outer retry loop can attempt
|
||||
# continuation (Option A: trailing assistant, Option B: user
|
||||
# "continue" prompt) instead of duplicating or losing content.
|
||||
err = result["error"]
|
||||
err._partial_content = partial_streamed_text["content"]
|
||||
raise err
|
||||
raise result["error"]
|
||||
return result["response"]
|
||||
|
||||
@@ -7277,6 +7289,131 @@ class AIAgent:
|
||||
break
|
||||
|
||||
except Exception as api_error:
|
||||
# -----------------------------------------------------------
|
||||
# Partial stream recovery. When streaming fails AFTER some
|
||||
# tokens were delivered to the user, we attempt to continue
|
||||
# the response rather than duplicate or lose content.
|
||||
# Option A: append partial as assistant message, retry
|
||||
# Option B: also inject user "continue" prompt (fallback)
|
||||
# -----------------------------------------------------------
|
||||
_partial = getattr(api_error, "_partial_content", None)
|
||||
if _partial and not getattr(self, "_partial_recovery_done", False):
|
||||
logger.warning(
|
||||
"%sStream interrupted after %d chars delivered. "
|
||||
"Attempting continuation (Option A)...",
|
||||
self.log_prefix, len(_partial),
|
||||
)
|
||||
self._emit_status(
|
||||
"⚡ Stream interrupted — attempting to continue..."
|
||||
)
|
||||
# Option A: trailing assistant message
|
||||
api_messages.append({"role": "assistant", "content": _partial})
|
||||
try:
|
||||
_cont_resp = self._interruptible_api_call(
|
||||
{**api_kwargs, "messages": api_messages}
|
||||
)
|
||||
# Merge: prepend partial to continuation
|
||||
_cont_text = ""
|
||||
if self.api_mode == "anthropic_messages":
|
||||
for _blk in getattr(_cont_resp, "content", []):
|
||||
if getattr(_blk, "type", None) == "text":
|
||||
_cont_text = getattr(_blk, "text", "")
|
||||
break
|
||||
else:
|
||||
_cont_text = getattr(
|
||||
_cont_resp.choices[0].message, "content", ""
|
||||
) or ""
|
||||
if _cont_text:
|
||||
# Fire the continuation to the platform stream
|
||||
self._fire_stream_delta(_cont_text)
|
||||
self._partial_recovery_done = True
|
||||
# Build a merged response for the agent loop
|
||||
_merged = _partial + _cont_text
|
||||
_merged_msg = SimpleNamespace(
|
||||
role="assistant", content=_merged,
|
||||
tool_calls=None, reasoning_content=None,
|
||||
)
|
||||
response = SimpleNamespace(
|
||||
id="partial-recovery",
|
||||
model=getattr(_cont_resp, "model", self.model),
|
||||
choices=[SimpleNamespace(
|
||||
index=0, message=_merged_msg,
|
||||
finish_reason="stop",
|
||||
)],
|
||||
usage=getattr(_cont_resp, "usage", None),
|
||||
)
|
||||
break # Success — exit retry loop
|
||||
except Exception as opt_a_err:
|
||||
logger.warning(
|
||||
"%sOption A failed (%s), trying Option B...",
|
||||
self.log_prefix, opt_a_err,
|
||||
)
|
||||
# Option B: add user "continue" instruction
|
||||
api_messages.append({
|
||||
"role": "user",
|
||||
"content": (
|
||||
"Your response was cut off mid-sentence "
|
||||
"due to a connection error. Continue from "
|
||||
"exactly where you stopped — do not repeat "
|
||||
"what you already said."
|
||||
),
|
||||
})
|
||||
try:
|
||||
_cont_resp_b = self._interruptible_api_call(
|
||||
{**api_kwargs, "messages": api_messages}
|
||||
)
|
||||
_cont_text_b = ""
|
||||
if self.api_mode == "anthropic_messages":
|
||||
for _blk in getattr(_cont_resp_b, "content", []):
|
||||
if getattr(_blk, "type", None) == "text":
|
||||
_cont_text_b = getattr(_blk, "text", "")
|
||||
break
|
||||
else:
|
||||
_cont_text_b = getattr(
|
||||
_cont_resp_b.choices[0].message,
|
||||
"content", "",
|
||||
) or ""
|
||||
if _cont_text_b:
|
||||
self._fire_stream_delta(_cont_text_b)
|
||||
self._partial_recovery_done = True
|
||||
_merged_b = _partial + _cont_text_b
|
||||
_merged_msg_b = SimpleNamespace(
|
||||
role="assistant", content=_merged_b,
|
||||
tool_calls=None, reasoning_content=None,
|
||||
)
|
||||
response = SimpleNamespace(
|
||||
id="partial-recovery-b",
|
||||
model=getattr(_cont_resp_b, "model", self.model),
|
||||
choices=[SimpleNamespace(
|
||||
index=0, message=_merged_msg_b,
|
||||
finish_reason="stop",
|
||||
)],
|
||||
usage=getattr(_cont_resp_b, "usage", None),
|
||||
)
|
||||
break # Success via Option B
|
||||
except Exception as opt_b_err:
|
||||
logger.warning(
|
||||
"%sBoth recovery options failed. "
|
||||
"Returning partial content as final response.",
|
||||
self.log_prefix,
|
||||
)
|
||||
# Last resort: return what we have
|
||||
_partial_msg = SimpleNamespace(
|
||||
role="assistant", content=_partial,
|
||||
tool_calls=None, reasoning_content=None,
|
||||
)
|
||||
response = SimpleNamespace(
|
||||
id="partial-only",
|
||||
model=getattr(self, "model", "unknown"),
|
||||
choices=[SimpleNamespace(
|
||||
index=0, message=_partial_msg,
|
||||
finish_reason="stop",
|
||||
)],
|
||||
usage=None,
|
||||
)
|
||||
self._partial_recovery_done = True
|
||||
break
|
||||
|
||||
# Stop spinner before printing error messages
|
||||
if thinking_spinner:
|
||||
thinking_spinner.stop("(╥_╥) error, retrying...")
|
||||
|
||||
Reference in New Issue
Block a user