mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 06:51:16 +08:00
Merge pull request #963 from NousResearch/hermes/hermes-cf9f7d54
fix: guard all print() against OSError with _SafeWriter
This commit is contained in:
50
run_agent.py
50
run_agent.py
@@ -99,6 +99,51 @@ from agent.trajectory import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class _SafeWriter:
|
||||||
|
"""Transparent stdout wrapper that catches OSError from broken pipes.
|
||||||
|
|
||||||
|
When hermes-agent runs as a systemd service, Docker container, or headless
|
||||||
|
daemon, the stdout pipe can become unavailable (idle timeout, buffer
|
||||||
|
exhaustion, socket reset). Any print() call then raises
|
||||||
|
``OSError: [Errno 5] Input/output error``, which can crash
|
||||||
|
run_conversation() — especially via double-fault when the except handler
|
||||||
|
also tries to print.
|
||||||
|
|
||||||
|
This wrapper delegates all writes to the underlying stream and silently
|
||||||
|
catches OSError. It is installed once at the start of run_conversation()
|
||||||
|
and is transparent when stdout is healthy (zero overhead on the happy path).
|
||||||
|
"""
|
||||||
|
|
||||||
|
__slots__ = ("_inner",)
|
||||||
|
|
||||||
|
def __init__(self, inner):
|
||||||
|
object.__setattr__(self, "_inner", inner)
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
try:
|
||||||
|
return self._inner.write(data)
|
||||||
|
except OSError:
|
||||||
|
return len(data) if isinstance(data, str) else 0
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
try:
|
||||||
|
self._inner.flush()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def fileno(self):
|
||||||
|
return self._inner.fileno()
|
||||||
|
|
||||||
|
def isatty(self):
|
||||||
|
try:
|
||||||
|
return self._inner.isatty()
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __getattr__(self, name):
|
||||||
|
return getattr(self._inner, name)
|
||||||
|
|
||||||
|
|
||||||
class IterationBudget:
|
class IterationBudget:
|
||||||
"""Thread-safe shared iteration counter for parent and child agents.
|
"""Thread-safe shared iteration counter for parent and child agents.
|
||||||
|
|
||||||
@@ -3164,6 +3209,11 @@ class AIAgent:
|
|||||||
Returns:
|
Returns:
|
||||||
Dict: Complete conversation result with final response and message history
|
Dict: Complete conversation result with final response and message history
|
||||||
"""
|
"""
|
||||||
|
# Guard stdout against OSError from broken pipes (systemd/headless/daemon).
|
||||||
|
# Installed once, transparent when stdout is healthy, prevents crash on write.
|
||||||
|
if not isinstance(sys.stdout, _SafeWriter):
|
||||||
|
sys.stdout = _SafeWriter(sys.stdout)
|
||||||
|
|
||||||
# Generate unique task_id if not provided to isolate VMs between concurrent tasks
|
# Generate unique task_id if not provided to isolate VMs between concurrent tasks
|
||||||
effective_task_id = task_id or str(uuid.uuid4())
|
effective_task_id = task_id or str(uuid.uuid4())
|
||||||
|
|
||||||
|
|||||||
@@ -1283,3 +1283,83 @@ class TestBudgetPressure:
|
|||||||
messages[-1]["content"] = last_content + f"\n\n{warning}"
|
messages[-1]["content"] = last_content + f"\n\n{warning}"
|
||||||
assert "plain text result" in messages[-1]["content"]
|
assert "plain text result" in messages[-1]["content"]
|
||||||
assert "BUDGET WARNING" in messages[-1]["content"]
|
assert "BUDGET WARNING" in messages[-1]["content"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestSafeWriter:
|
||||||
|
"""Verify _SafeWriter guards stdout against OSError (broken pipes)."""
|
||||||
|
|
||||||
|
def test_write_delegates_normally(self):
|
||||||
|
"""When stdout is healthy, _SafeWriter is transparent."""
|
||||||
|
from run_agent import _SafeWriter
|
||||||
|
from io import StringIO
|
||||||
|
inner = StringIO()
|
||||||
|
writer = _SafeWriter(inner)
|
||||||
|
writer.write("hello")
|
||||||
|
assert inner.getvalue() == "hello"
|
||||||
|
|
||||||
|
def test_write_catches_oserror(self):
|
||||||
|
"""OSError on write is silently caught, returns len(data)."""
|
||||||
|
from run_agent import _SafeWriter
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
inner = MagicMock()
|
||||||
|
inner.write.side_effect = OSError(5, "Input/output error")
|
||||||
|
writer = _SafeWriter(inner)
|
||||||
|
result = writer.write("hello")
|
||||||
|
assert result == 5 # len("hello")
|
||||||
|
|
||||||
|
def test_flush_catches_oserror(self):
|
||||||
|
"""OSError on flush is silently caught."""
|
||||||
|
from run_agent import _SafeWriter
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
inner = MagicMock()
|
||||||
|
inner.flush.side_effect = OSError(5, "Input/output error")
|
||||||
|
writer = _SafeWriter(inner)
|
||||||
|
writer.flush() # should not raise
|
||||||
|
|
||||||
|
def test_print_survives_broken_stdout(self, monkeypatch):
|
||||||
|
"""print() through _SafeWriter doesn't crash on broken pipe."""
|
||||||
|
import sys
|
||||||
|
from run_agent import _SafeWriter
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
broken = MagicMock()
|
||||||
|
broken.write.side_effect = OSError(5, "Input/output error")
|
||||||
|
original = sys.stdout
|
||||||
|
sys.stdout = _SafeWriter(broken)
|
||||||
|
try:
|
||||||
|
print("this should not crash") # would raise without _SafeWriter
|
||||||
|
finally:
|
||||||
|
sys.stdout = original
|
||||||
|
|
||||||
|
def test_installed_in_run_conversation(self, agent):
|
||||||
|
"""run_conversation installs _SafeWriter on sys.stdout."""
|
||||||
|
import sys
|
||||||
|
from run_agent import _SafeWriter
|
||||||
|
resp = _mock_response(content="Done", finish_reason="stop")
|
||||||
|
agent.client.chat.completions.create.return_value = resp
|
||||||
|
original = sys.stdout
|
||||||
|
try:
|
||||||
|
with (
|
||||||
|
patch.object(agent, "_persist_session"),
|
||||||
|
patch.object(agent, "_save_trajectory"),
|
||||||
|
patch.object(agent, "_cleanup_task_resources"),
|
||||||
|
):
|
||||||
|
agent.run_conversation("test")
|
||||||
|
assert isinstance(sys.stdout, _SafeWriter)
|
||||||
|
finally:
|
||||||
|
sys.stdout = original
|
||||||
|
|
||||||
|
def test_double_wrap_prevented(self):
|
||||||
|
"""Wrapping an already-wrapped stream doesn't add layers."""
|
||||||
|
import sys
|
||||||
|
from run_agent import _SafeWriter
|
||||||
|
from io import StringIO
|
||||||
|
inner = StringIO()
|
||||||
|
wrapped = _SafeWriter(inner)
|
||||||
|
# isinstance check should prevent double-wrapping
|
||||||
|
assert isinstance(wrapped, _SafeWriter)
|
||||||
|
# The guard in run_conversation checks isinstance before wrapping
|
||||||
|
if not isinstance(wrapped, _SafeWriter):
|
||||||
|
wrapped = _SafeWriter(wrapped)
|
||||||
|
# Still just one layer
|
||||||
|
wrapped.write("test")
|
||||||
|
assert inner.getvalue() == "test"
|
||||||
|
|||||||
Reference in New Issue
Block a user