mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 06:51:16 +08:00
fix: follow-up for salvaged PR #10854
- Extract duplicated activity-callback polling into shared touch_activity_if_due() helper in tools/environments/base.py
- Use helper from both base.py _wait_for_process and code_execution_tool.py local polling loop (DRY)
- Add test assertion that timeout output field contains the timeout message and emoji (#10807)
- Add stream_consumer test for tool-boundary fallback scenario where continuation is empty but final_text differs from visible prefix (#10807)
This commit is contained in:
@@ -606,6 +606,56 @@ class TestSegmentBreakOnToolBoundary:
|
|||||||
assert sent_texts[0].startswith(prefix)
|
assert sent_texts[0].startswith(prefix)
|
||||||
assert sum(len(t) for t in sent_texts[1:]) == len(tail)
|
assert sum(len(t) for t in sent_texts[1:]) == len(tail)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_fallback_final_sends_full_text_at_tool_boundary(self):
    """Regression test for #10807: the fallback final send must not drop
    a post-tool response.

    The text already shown to the user comes from the pre-tool segment,
    so the post-tool reply shares nothing with it.  ``_send_fallback_final``
    is expected to deliver that reply through ``adapter.send`` anyway.
    """
    # Adapter double whose send/edit calls always report success.
    adapter = MagicMock()
    adapter.send = AsyncMock(
        return_value=SimpleNamespace(success=True, message_id="msg_1"),
    )
    adapter.edit_message = AsyncMock(
        return_value=SimpleNamespace(success=True),
    )
    adapter.MAX_MESSAGE_LENGTH = 4096

    consumer = GatewayStreamConsumer(
        adapter,
        "chat_123",
        StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
    )

    # Stream the pre-tool segment and give the consumer loop a moment to
    # pick it up, so it becomes the visible prefix.
    pre_tool_text = "I'll run that code now."
    consumer.on_delta(pre_tool_text)
    run_task = asyncio.create_task(consumer.run())
    await asyncio.sleep(0.05)

    # Pin the last-sent text to the pre-tool content, then finish the
    # stream.  The post-tool reply below is short and does NOT start with
    # that prefix, which is the tool-boundary situation under test.
    consumer._last_sent_text = pre_tool_text
    post_tool_response = "⏰ Script timed out after 30s and was killed."
    consumer.finish()
    await run_task

    # Drive the fallback path directly with the post-tool reply.
    await consumer._send_fallback_final(post_tool_response)

    def _sent_content(recorded):
        # Each recorded call is (args, kwargs); the content may have been
        # passed either as the ``content`` keyword or as the first
        # positional argument.
        args, kwargs = recorded[0], recorded[1]
        return kwargs.get("content", args[0] if args else "")

    # The reply must have gone out via adapter.send, not been dropped.
    delivered = any(
        "timed out" in str(_sent_content(recorded))
        for recorded in adapter.send.call_args_list
    )
    assert delivered, (
        "Post-tool timeout response was silently dropped by "
        "_send_fallback_final — the #10807 fix should prevent this"
    )
|
||||||
|
|
||||||
|
|
||||||
class TestInterimCommentaryMessages:
|
class TestInterimCommentaryMessages:
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|||||||
@@ -279,6 +279,10 @@ raise RuntimeError("deliberate crash")
|
|||||||
))
|
))
|
||||||
self.assertEqual(result["status"], "timeout")
|
self.assertEqual(result["status"], "timeout")
|
||||||
self.assertIn("timed out", result.get("error", ""))
|
self.assertIn("timed out", result.get("error", ""))
|
||||||
|
# The timeout message must also appear in output so the LLM always
|
||||||
|
# surfaces it to the user (#10807).
|
||||||
|
self.assertIn("timed out", result.get("output", ""))
|
||||||
|
self.assertIn("\u23f0", result.get("output", ""))
|
||||||
|
|
||||||
def test_web_search_tool(self):
|
def test_web_search_tool(self):
|
||||||
"""Script calls web_search and processes results."""
|
"""Script calls web_search and processes results."""
|
||||||
|
|||||||
@@ -1128,8 +1128,10 @@ def execute_code(
|
|||||||
stderr_reader.start()
|
stderr_reader.start()
|
||||||
|
|
||||||
status = "success"
|
status = "success"
|
||||||
_last_activity_touch = time.monotonic()
|
_activity_state = {
|
||||||
_ACTIVITY_INTERVAL = 10.0
|
"last_touch": time.monotonic(),
|
||||||
|
"start": exec_start,
|
||||||
|
}
|
||||||
while proc.poll() is None:
|
while proc.poll() is None:
|
||||||
if _is_interrupted():
|
if _is_interrupted():
|
||||||
_kill_process_group(proc)
|
_kill_process_group(proc)
|
||||||
@@ -1141,17 +1143,11 @@ def execute_code(
|
|||||||
break
|
break
|
||||||
# Periodic activity touch so the gateway's inactivity timeout
|
# Periodic activity touch so the gateway's inactivity timeout
|
||||||
# doesn't kill the agent during long code execution (#10807).
|
# doesn't kill the agent during long code execution (#10807).
|
||||||
_now = time.monotonic()
|
try:
|
||||||
if _now - _last_activity_touch >= _ACTIVITY_INTERVAL:
|
from tools.environments.base import touch_activity_if_due
|
||||||
_last_activity_touch = _now
|
touch_activity_if_due(_activity_state, "execute_code running")
|
||||||
try:
|
except Exception:
|
||||||
from tools.environments.base import _get_activity_callback
|
pass
|
||||||
_cb = _get_activity_callback()
|
|
||||||
if _cb:
|
|
||||||
_elapsed = int(_now - exec_start)
|
|
||||||
_cb(f"execute_code running ({_elapsed}s elapsed)")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
|
|
||||||
# Wait for readers to finish draining
|
# Wait for readers to finish draining
|
||||||
|
|||||||
@@ -37,6 +37,32 @@ def _get_activity_callback() -> Callable[[str], None] | None:
|
|||||||
return getattr(_activity_callback_local, "callback", None)
|
return getattr(_activity_callback_local, "callback", None)
|
||||||
|
|
||||||
|
|
||||||
|
def touch_activity_if_due(
    state: dict,
    label: str,
) -> None:
    """Fire the activity callback at most once per interval.

    Intended to be called on every iteration of a polling loop; it is a
    no-op until the interval has elapsed since the last touch.

    Args:
        state: Mutable bookkeeping dict owned by the caller.  Must contain
            ``last_touch`` (monotonic timestamp of the previous touch) and
            ``start`` (monotonic timestamp of the operation start).  An
            optional ``interval`` key overrides the default 10 s cadence.
            ``last_touch`` is updated in place whenever the interval has
            elapsed, whether or not a callback is registered.
        label: Human-readable prefix for the message passed to the
            callback, e.g. ``"terminal command running"``.

    This is strictly best-effort: every exception -- including one caused
    by a malformed *state* -- is swallowed, so callers don't need their own
    try/except and a keep-alive failure can never abort the operation
    being polled.
    """
    try:
        now = time.monotonic()
        interval = state.get("interval", 10.0)
        if now - state["last_touch"] < interval:
            return
        # Record the touch before invoking the callback so a callback that
        # raises is not retried on every poll iteration.
        state["last_touch"] = now
        cb = _get_activity_callback()
        if cb:
            elapsed = int(now - state["start"])
            cb(f"{label} ({elapsed}s elapsed)")
    except Exception:
        # Deliberate: the activity touch is a liveness hint only.
        pass
|
||||||
|
|
||||||
|
|
||||||
def get_sandbox_dir() -> Path:
|
def get_sandbox_dir() -> Path:
|
||||||
"""Return the host-side root for all sandbox storage (Docker workspaces,
|
"""Return the host-side root for all sandbox storage (Docker workspaces,
|
||||||
Singularity overlays/SIF cache, etc.).
|
Singularity overlays/SIF cache, etc.).
|
||||||
@@ -405,8 +431,11 @@ class BaseEnvironment(ABC):
|
|||||||
drain_thread = threading.Thread(target=_drain, daemon=True)
|
drain_thread = threading.Thread(target=_drain, daemon=True)
|
||||||
drain_thread.start()
|
drain_thread.start()
|
||||||
deadline = time.monotonic() + timeout
|
deadline = time.monotonic() + timeout
|
||||||
_last_activity_touch = time.monotonic()
|
_now = time.monotonic()
|
||||||
_ACTIVITY_INTERVAL = 10.0 # seconds between activity touches
|
_activity_state = {
|
||||||
|
"last_touch": _now,
|
||||||
|
"start": _now,
|
||||||
|
}
|
||||||
|
|
||||||
while proc.poll() is None:
|
while proc.poll() is None:
|
||||||
if is_interrupted():
|
if is_interrupted():
|
||||||
@@ -428,16 +457,7 @@ class BaseEnvironment(ABC):
|
|||||||
"returncode": 124,
|
"returncode": 124,
|
||||||
}
|
}
|
||||||
# Periodic activity touch so the gateway knows we're alive
|
# Periodic activity touch so the gateway knows we're alive
|
||||||
_now = time.monotonic()
|
touch_activity_if_due(_activity_state, "terminal command running")
|
||||||
if _now - _last_activity_touch >= _ACTIVITY_INTERVAL:
|
|
||||||
_last_activity_touch = _now
|
|
||||||
_cb = _get_activity_callback()
|
|
||||||
if _cb:
|
|
||||||
try:
|
|
||||||
_elapsed = int(_now - (deadline - timeout))
|
|
||||||
_cb(f"terminal command running ({_elapsed}s elapsed)")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
|
|
||||||
drain_thread.join(timeout=5)
|
drain_thread.join(timeout=5)
|
||||||
|
|||||||
@@ -105,9 +105,11 @@ class BaseModalExecutionEnvironment(BaseEnvironment):
|
|||||||
if self._client_timeout_grace_seconds is not None:
|
if self._client_timeout_grace_seconds is not None:
|
||||||
deadline = time.monotonic() + prepared.timeout + self._client_timeout_grace_seconds
|
deadline = time.monotonic() + prepared.timeout + self._client_timeout_grace_seconds
|
||||||
|
|
||||||
_last_activity_touch = time.monotonic()
|
_now = time.monotonic()
|
||||||
_modal_exec_start = time.monotonic()
|
_activity_state = {
|
||||||
_ACTIVITY_INTERVAL = 10.0 # match _wait_for_process cadence
|
"last_touch": _now,
|
||||||
|
"start": _now,
|
||||||
|
}
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if is_interrupted():
|
if is_interrupted():
|
||||||
@@ -133,20 +135,11 @@ class BaseModalExecutionEnvironment(BaseEnvironment):
|
|||||||
return self._timeout_result_for_modal(prepared.timeout)
|
return self._timeout_result_for_modal(prepared.timeout)
|
||||||
|
|
||||||
# Periodic activity touch so the gateway knows we're alive
|
# Periodic activity touch so the gateway knows we're alive
|
||||||
_now = time.monotonic()
|
try:
|
||||||
if _now - _last_activity_touch >= _ACTIVITY_INTERVAL:
|
from tools.environments.base import touch_activity_if_due
|
||||||
_last_activity_touch = _now
|
touch_activity_if_due(_activity_state, "modal command running")
|
||||||
try:
|
except Exception:
|
||||||
from tools.environments.base import _get_activity_callback
|
pass
|
||||||
_cb = _get_activity_callback()
|
|
||||||
except Exception:
|
|
||||||
_cb = None
|
|
||||||
if _cb:
|
|
||||||
try:
|
|
||||||
_elapsed = int(_now - _modal_exec_start)
|
|
||||||
_cb(f"modal command running ({_elapsed}s elapsed)")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
time.sleep(self._poll_interval_seconds)
|
time.sleep(self._poll_interval_seconds)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user