mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-04 09:47:54 +08:00
Compare commits
2 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e45e4107f6 | ||
|
|
d90178e21b |
@@ -449,6 +449,11 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||||||
self._bot_task: Optional[asyncio.Task] = None
|
self._bot_task: Optional[asyncio.Task] = None
|
||||||
# Cap to prevent unbounded growth (Discord threads get archived).
|
# Cap to prevent unbounded growth (Discord threads get archived).
|
||||||
self._MAX_TRACKED_THREADS = 500
|
self._MAX_TRACKED_THREADS = 500
|
||||||
|
# Dedup cache: message_id → timestamp. Prevents duplicate bot
|
||||||
|
# responses when Discord RESUME replays events after reconnects.
|
||||||
|
self._seen_messages: Dict[str, float] = {}
|
||||||
|
self._SEEN_TTL = 300 # 5 minutes
|
||||||
|
self._SEEN_MAX = 2000 # prune threshold
|
||||||
|
|
||||||
async def connect(self) -> bool:
|
async def connect(self) -> bool:
|
||||||
"""Connect to Discord and start receiving events."""
|
"""Connect to Discord and start receiving events."""
|
||||||
@@ -539,6 +544,19 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||||||
|
|
||||||
@self._client.event
|
@self._client.event
|
||||||
async def on_message(message: DiscordMessage):
|
async def on_message(message: DiscordMessage):
|
||||||
|
# Dedup: Discord RESUME replays events after reconnects (#4777)
|
||||||
|
msg_id = str(message.id)
|
||||||
|
now = time.time()
|
||||||
|
if msg_id in adapter_self._seen_messages:
|
||||||
|
return
|
||||||
|
adapter_self._seen_messages[msg_id] = now
|
||||||
|
if len(adapter_self._seen_messages) > adapter_self._SEEN_MAX:
|
||||||
|
cutoff = now - adapter_self._SEEN_TTL
|
||||||
|
adapter_self._seen_messages = {
|
||||||
|
k: v for k, v in adapter_self._seen_messages.items()
|
||||||
|
if v > cutoff
|
||||||
|
}
|
||||||
|
|
||||||
# Always ignore our own messages
|
# Always ignore our own messages
|
||||||
if message.author == self._client.user:
|
if message.author == self._client.user:
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import time
|
||||||
from typing import Dict, Optional, Any
|
from typing import Dict, Optional, Any
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -78,6 +79,11 @@ class SlackAdapter(BasePlatformAdapter):
|
|||||||
self._team_clients: Dict[str, AsyncWebClient] = {} # team_id → WebClient
|
self._team_clients: Dict[str, AsyncWebClient] = {} # team_id → WebClient
|
||||||
self._team_bot_user_ids: Dict[str, str] = {} # team_id → bot_user_id
|
self._team_bot_user_ids: Dict[str, str] = {} # team_id → bot_user_id
|
||||||
self._channel_team: Dict[str, str] = {} # channel_id → team_id
|
self._channel_team: Dict[str, str] = {} # channel_id → team_id
|
||||||
|
# Dedup cache: event_ts → timestamp. Prevents duplicate bot
|
||||||
|
# responses when Socket Mode reconnects redeliver events.
|
||||||
|
self._seen_messages: Dict[str, float] = {}
|
||||||
|
self._SEEN_TTL = 300 # 5 minutes
|
||||||
|
self._SEEN_MAX = 2000 # prune threshold
|
||||||
|
|
||||||
async def connect(self) -> bool:
|
async def connect(self) -> bool:
|
||||||
"""Connect to Slack via Socket Mode."""
|
"""Connect to Slack via Socket Mode."""
|
||||||
@@ -710,6 +716,20 @@ class SlackAdapter(BasePlatformAdapter):
|
|||||||
|
|
||||||
async def _handle_slack_message(self, event: dict) -> None:
|
async def _handle_slack_message(self, event: dict) -> None:
|
||||||
"""Handle an incoming Slack message event."""
|
"""Handle an incoming Slack message event."""
|
||||||
|
# Dedup: Slack Socket Mode can redeliver events after reconnects (#4777)
|
||||||
|
event_ts = event.get("ts", "")
|
||||||
|
if event_ts:
|
||||||
|
now = time.time()
|
||||||
|
if event_ts in self._seen_messages:
|
||||||
|
return
|
||||||
|
self._seen_messages[event_ts] = now
|
||||||
|
if len(self._seen_messages) > self._SEEN_MAX:
|
||||||
|
cutoff = now - self._SEEN_TTL
|
||||||
|
self._seen_messages = {
|
||||||
|
k: v for k, v in self._seen_messages.items()
|
||||||
|
if v > cutoff
|
||||||
|
}
|
||||||
|
|
||||||
# Ignore bot messages (including our own)
|
# Ignore bot messages (including our own)
|
||||||
if event.get("bot_id") or event.get("subtype") == "bot_message":
|
if event.get("bot_id") or event.get("subtype") == "bot_message":
|
||||||
return
|
return
|
||||||
|
|||||||
137
run_agent.py
137
run_agent.py
@@ -4027,6 +4027,7 @@ class AIAgent:
|
|||||||
request_client_holder = {"client": None}
|
request_client_holder = {"client": None}
|
||||||
first_delta_fired = {"done": False}
|
first_delta_fired = {"done": False}
|
||||||
deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback)
|
deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback)
|
||||||
|
partial_streamed_text = {"content": ""} # Accumulates text delivered to platform
|
||||||
# Wall-clock timestamp of the last real streaming chunk. The outer
|
# Wall-clock timestamp of the last real streaming chunk. The outer
|
||||||
# poll loop uses this to detect stale connections that keep receiving
|
# poll loop uses this to detect stale connections that keep receiving
|
||||||
# SSE keep-alive pings but no actual data.
|
# SSE keep-alive pings but no actual data.
|
||||||
@@ -4114,6 +4115,7 @@ class AIAgent:
|
|||||||
_fire_first_delta()
|
_fire_first_delta()
|
||||||
self._fire_stream_delta(delta.content)
|
self._fire_stream_delta(delta.content)
|
||||||
deltas_were_sent["yes"] = True
|
deltas_were_sent["yes"] = True
|
||||||
|
partial_streamed_text["content"] += delta.content
|
||||||
else:
|
else:
|
||||||
# Tool calls suppress regular content streaming (avoids
|
# Tool calls suppress regular content streaming (avoids
|
||||||
# displaying chatty "I'll use the tool..." text alongside
|
# displaying chatty "I'll use the tool..." text alongside
|
||||||
@@ -4264,6 +4266,8 @@ class AIAgent:
|
|||||||
if text and not has_tool_use:
|
if text and not has_tool_use:
|
||||||
_fire_first_delta()
|
_fire_first_delta()
|
||||||
self._fire_stream_delta(text)
|
self._fire_stream_delta(text)
|
||||||
|
deltas_were_sent["yes"] = True
|
||||||
|
partial_streamed_text["content"] += text
|
||||||
elif delta_type == "thinking_delta":
|
elif delta_type == "thinking_delta":
|
||||||
thinking_text = getattr(delta, "thinking", "")
|
thinking_text = getattr(delta, "thinking", "")
|
||||||
if thinking_text:
|
if thinking_text:
|
||||||
@@ -4473,6 +4477,14 @@ class AIAgent:
|
|||||||
pass
|
pass
|
||||||
raise InterruptedError("Agent interrupted during streaming API call")
|
raise InterruptedError("Agent interrupted during streaming API call")
|
||||||
if result["error"] is not None:
|
if result["error"] is not None:
|
||||||
|
if deltas_were_sent["yes"] and partial_streamed_text["content"]:
|
||||||
|
# Streaming failed AFTER tokens were delivered to the platform.
|
||||||
|
# Attach the partial text so the outer retry loop can attempt
|
||||||
|
# continuation (Option A: trailing assistant, Option B: user
|
||||||
|
# "continue" prompt) instead of duplicating or losing content.
|
||||||
|
err = result["error"]
|
||||||
|
err._partial_content = partial_streamed_text["content"]
|
||||||
|
raise err
|
||||||
raise result["error"]
|
raise result["error"]
|
||||||
return result["response"]
|
return result["response"]
|
||||||
|
|
||||||
@@ -7277,6 +7289,131 @@ class AIAgent:
|
|||||||
break
|
break
|
||||||
|
|
||||||
except Exception as api_error:
|
except Exception as api_error:
|
||||||
|
# -----------------------------------------------------------
|
||||||
|
# Partial stream recovery. When streaming fails AFTER some
|
||||||
|
# tokens were delivered to the user, we attempt to continue
|
||||||
|
# the response rather than duplicate or lose content.
|
||||||
|
# Option A: append partial as assistant message, retry
|
||||||
|
# Option B: also inject user "continue" prompt (fallback)
|
||||||
|
# -----------------------------------------------------------
|
||||||
|
_partial = getattr(api_error, "_partial_content", None)
|
||||||
|
if _partial and not getattr(self, "_partial_recovery_done", False):
|
||||||
|
logger.warning(
|
||||||
|
"%sStream interrupted after %d chars delivered. "
|
||||||
|
"Attempting continuation (Option A)...",
|
||||||
|
self.log_prefix, len(_partial),
|
||||||
|
)
|
||||||
|
self._emit_status(
|
||||||
|
"⚡ Stream interrupted — attempting to continue..."
|
||||||
|
)
|
||||||
|
# Option A: trailing assistant message
|
||||||
|
api_messages.append({"role": "assistant", "content": _partial})
|
||||||
|
try:
|
||||||
|
_cont_resp = self._interruptible_api_call(
|
||||||
|
{**api_kwargs, "messages": api_messages}
|
||||||
|
)
|
||||||
|
# Merge: prepend partial to continuation
|
||||||
|
_cont_text = ""
|
||||||
|
if self.api_mode == "anthropic_messages":
|
||||||
|
for _blk in getattr(_cont_resp, "content", []):
|
||||||
|
if getattr(_blk, "type", None) == "text":
|
||||||
|
_cont_text = getattr(_blk, "text", "")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
_cont_text = getattr(
|
||||||
|
_cont_resp.choices[0].message, "content", ""
|
||||||
|
) or ""
|
||||||
|
if _cont_text:
|
||||||
|
# Fire the continuation to the platform stream
|
||||||
|
self._fire_stream_delta(_cont_text)
|
||||||
|
self._partial_recovery_done = True
|
||||||
|
# Build a merged response for the agent loop
|
||||||
|
_merged = _partial + _cont_text
|
||||||
|
_merged_msg = SimpleNamespace(
|
||||||
|
role="assistant", content=_merged,
|
||||||
|
tool_calls=None, reasoning_content=None,
|
||||||
|
)
|
||||||
|
response = SimpleNamespace(
|
||||||
|
id="partial-recovery",
|
||||||
|
model=getattr(_cont_resp, "model", self.model),
|
||||||
|
choices=[SimpleNamespace(
|
||||||
|
index=0, message=_merged_msg,
|
||||||
|
finish_reason="stop",
|
||||||
|
)],
|
||||||
|
usage=getattr(_cont_resp, "usage", None),
|
||||||
|
)
|
||||||
|
break # Success — exit retry loop
|
||||||
|
except Exception as opt_a_err:
|
||||||
|
logger.warning(
|
||||||
|
"%sOption A failed (%s), trying Option B...",
|
||||||
|
self.log_prefix, opt_a_err,
|
||||||
|
)
|
||||||
|
# Option B: add user "continue" instruction
|
||||||
|
api_messages.append({
|
||||||
|
"role": "user",
|
||||||
|
"content": (
|
||||||
|
"Your response was cut off mid-sentence "
|
||||||
|
"due to a connection error. Continue from "
|
||||||
|
"exactly where you stopped — do not repeat "
|
||||||
|
"what you already said."
|
||||||
|
),
|
||||||
|
})
|
||||||
|
try:
|
||||||
|
_cont_resp_b = self._interruptible_api_call(
|
||||||
|
{**api_kwargs, "messages": api_messages}
|
||||||
|
)
|
||||||
|
_cont_text_b = ""
|
||||||
|
if self.api_mode == "anthropic_messages":
|
||||||
|
for _blk in getattr(_cont_resp_b, "content", []):
|
||||||
|
if getattr(_blk, "type", None) == "text":
|
||||||
|
_cont_text_b = getattr(_blk, "text", "")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
_cont_text_b = getattr(
|
||||||
|
_cont_resp_b.choices[0].message,
|
||||||
|
"content", "",
|
||||||
|
) or ""
|
||||||
|
if _cont_text_b:
|
||||||
|
self._fire_stream_delta(_cont_text_b)
|
||||||
|
self._partial_recovery_done = True
|
||||||
|
_merged_b = _partial + _cont_text_b
|
||||||
|
_merged_msg_b = SimpleNamespace(
|
||||||
|
role="assistant", content=_merged_b,
|
||||||
|
tool_calls=None, reasoning_content=None,
|
||||||
|
)
|
||||||
|
response = SimpleNamespace(
|
||||||
|
id="partial-recovery-b",
|
||||||
|
model=getattr(_cont_resp_b, "model", self.model),
|
||||||
|
choices=[SimpleNamespace(
|
||||||
|
index=0, message=_merged_msg_b,
|
||||||
|
finish_reason="stop",
|
||||||
|
)],
|
||||||
|
usage=getattr(_cont_resp_b, "usage", None),
|
||||||
|
)
|
||||||
|
break # Success via Option B
|
||||||
|
except Exception as opt_b_err:
|
||||||
|
logger.warning(
|
||||||
|
"%sBoth recovery options failed. "
|
||||||
|
"Returning partial content as final response.",
|
||||||
|
self.log_prefix,
|
||||||
|
)
|
||||||
|
# Last resort: return what we have
|
||||||
|
_partial_msg = SimpleNamespace(
|
||||||
|
role="assistant", content=_partial,
|
||||||
|
tool_calls=None, reasoning_content=None,
|
||||||
|
)
|
||||||
|
response = SimpleNamespace(
|
||||||
|
id="partial-only",
|
||||||
|
model=getattr(self, "model", "unknown"),
|
||||||
|
choices=[SimpleNamespace(
|
||||||
|
index=0, message=_partial_msg,
|
||||||
|
finish_reason="stop",
|
||||||
|
)],
|
||||||
|
usage=None,
|
||||||
|
)
|
||||||
|
self._partial_recovery_done = True
|
||||||
|
break
|
||||||
|
|
||||||
# Stop spinner before printing error messages
|
# Stop spinner before printing error messages
|
||||||
if thinking_spinner:
|
if thinking_spinner:
|
||||||
thinking_spinner.stop("(╥_╥) error, retrying...")
|
thinking_spinner.stop("(╥_╥) error, retrying...")
|
||||||
|
|||||||
Reference in New Issue
Block a user