Compare commits

...

21 Commits

Author SHA1 Message Date
alt-glitch
add160fd1b chore: remove dead _save_oversized_tool_result after merge
Superseded by maybe_persist_tool_result from tools/tool_result_storage.
Function had zero call sites — only its own test suite referenced it.
2026-04-07 01:35:31 +00:00
alt-glitch
51cf4e0bbc fix: address PR review — alias expansion + L3 budget enforcement
- Add `shopt -s expand_aliases` to snapshot so aliases captured by
  `alias -p` actually work under `bash -c` (review comment #2)
- Pass threshold=0 in enforce_turn_budget() so L3 can force-persist
  results below the 50K default when aggregate budget is exceeded
  (review comment #3)
- Add regression test: 6x42K results (each under 50K) exceeding 200K
  budget are now correctly persisted
2026-04-07 01:33:05 +00:00
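The alias issue is easy to reproduce outside the repo. A minimal sketch (Python driving bash; illustrative, not the repository's snapshot code): aliases replayed from `alias -p` output are inert under non-interactive `bash -c` until the script enables expand_aliases.

    import subprocess

    # Without expand_aliases, "ll" fails with "command not found" (stdout empty).
    snippet = "alias ll='echo expanded'\nll\n"
    broken = subprocess.run(["bash", "-c", snippet],
                            capture_output=True, text=True)
    fixed = subprocess.run(["bash", "-c", "shopt -s expand_aliases\n" + snippet],
                           capture_output=True, text=True)
    print(repr(broken.stdout))  # ''
    print(repr(fixed.stdout))   # 'expanded\n'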
alt-glitch
72bd14e09d perf(environments): reduce per-command overhead in _before_execute hooks
- Daytona: skip refresh_data() API call unless sandbox was interrupted/errored
- Docker: cache _build_forward_env_args() to avoid re-reading .env every command
- All remote backends: TTL-based sync skip (5s) to avoid redundant dir walks
2026-04-07 01:33:05 +00:00
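A hedged sketch of the TTL-based skip described above (the mixin shape and names are assumptions; only the 5s window comes from the commit):

    import time

    _SYNC_TTL_SECONDS = 5.0

    class _TtlSyncMixin:
        _last_sync = 0.0

        def _do_sync(self):
            pass  # stand-in; the concrete backend does the rsync/upload here

        def _before_execute(self):
            now = time.monotonic()
            if now - self._last_sync < _SYNC_TTL_SECONDS:
                return            # synced less than 5s ago; skip the dir walk
            self._do_sync()
            self._last_sync = now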
alt-glitch
2fe8fd8720 docs: replace L2/L3 jargon labels with descriptive comments
Expanded tool_result_storage.py module docstring to document the
three-level architecture. Replaced opaque L2/L3 labels at call
sites with self-describing comments.
2026-04-07 01:33:05 +00:00
alt-glitch
ab35753c52 refactor(managed_modal): replace sentinel-key dicts with _ExecStartResult dataclass 2026-04-07 01:33:05 +00:00
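The message is terse; a plausible reading of the refactor, with hypothetical field names (the tests below only pin down that `_immediate`/`_handle` sentinel keys must no longer leak into results):

    from dataclasses import dataclass
    from typing import Any, Optional

    @dataclass
    class _ExecStartResult:
        immediate: Optional[dict] = None  # exec completed synchronously
        handle: Optional[Any] = None      # poll this for an in-flight exec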
alt-glitch
51bd4aecff fix: update stale _ModalProcessHandle/_DaytonaProcessHandle references in docstrings
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:33:05 +00:00
alt-glitch
bead55bbcb refactor(environments): extract _ThreadedProcessHandle base class
Eliminates ~50 lines of duplicated pipe+thread+poll boilerplate between
_ModalProcessHandle and _DaytonaProcessHandle. Both now use closures
passed to the shared _ThreadedProcessHandle in base.py.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:33:05 +00:00
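The handle's contract can be reconstructed from its unit tests later in this diff; a simplified sketch (the real class feeds stdout through an OS pipe, not StringIO):

    import io
    import threading

    class _ThreadedProcessHandle:
        # exec_fn is a backend-supplied closure that makes one blocking SDK
        # call and returns (output, exit_code).
        def __init__(self, exec_fn):
            self.returncode = None
            self.stdout = io.StringIO()

            def _run():
                try:
                    output, code = exec_fn()
                except Exception as exc:  # SDK error -> rc 1, message in stdout
                    output, code = str(exc), 1
                self.stdout = io.StringIO(output)
                self.returncode = code

            self._thread = threading.Thread(target=_run, daemon=True)
            self._thread.start()

        def poll(self):
            return self.returncode        # None while the call is in flight

        def wait(self, timeout=None):
            self._thread.join(timeout)
            return self.returncode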
alt-glitch
5c491734f8 chore: reset uv.lock to main to fix nix build
The lockfile had drifted from main (debugpy, exa-py, version bump)
causing atomicwrites to fail building in the nix sandbox due to
missing setuptools in pyproject-build-systems.
2026-04-07 01:32:47 +00:00
alt-glitch
454edc7771 perf(ssh): add mtime-based caching to file sync
SSH _before_execute() ran rsync unconditionally before every command,
adding ~2.3s overhead even when zero bytes were transferred. This was
80% of per-command latency (actual execution: ~0.6s).

Add (mtime, size) caching — matching the pattern Modal and Daytona
already use — to skip rsync when local files haven't changed:

- Per-file mtime+size check for credential files
- Directory fingerprint (set of relpath/mtime/size tuples) for skills
- --delete flag on skills rsync to prune uninstalled skills
- Track created remote dirs to avoid redundant mkdir -p calls
- Cache invalidation on rsync failure (remote may have been wiped)
- force=True parameter as escape hatch for debugging

Before: ~3s per SSH command (2.3s rsync + 0.6s execution)
After:  ~0.6s per SSH command (mtime check + execution)
SSH test suite: 134s → 50s
2026-04-07 01:32:47 +00:00
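A sketch of the (mtime, size) check described above (illustrative names; per the commit, the real code also evicts entries when rsync fails, since the remote may have been wiped):

    import os

    _fingerprints = {}

    def needs_sync(path, force=False):
        st = os.stat(path)
        current = (st.st_mtime, st.st_size)
        if not force and _fingerprints.get(path) == current:
            return False               # local file unchanged since last rsync
        _fingerprints[path] = current  # caller should evict this if rsync fails
        return True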
alt-glitch
49d1390b40 fix(environments): move CWD tracking from remote file to in-band stdout
Previously, _wrap_command() wrote pwd to a file on the remote (container,
sandbox, SSH host), then _update_cwd_from_file() read it back via another
_run_bash() call. On Modal/Daytona this was a full API round-trip just to
read 20 bytes.

Now the wrapping template echoes the cwd to stdout with markers:
  printf '\n__HERMES_CWD__%s__HERMES_CWD__\n' "$(pwd -P)"

_extract_cwd_from_output() parses it from the output already in memory.
Zero extra round-trips on any backend. The cwdfile, _read_file_in_env(),
and per-backend overrides are all deleted.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:32:47 +00:00
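The parsing side is a few lines; a sketch of what _extract_cwd_from_output plausibly does, reconstructed from the printf marker above (illustrative, not the actual BaseEnvironment code):

    import re

    _CWD_MARKER = re.compile(r"\n?__HERMES_CWD__(.*?)__HERMES_CWD__\n?")

    def extract_cwd(output):
        # Returns (output with the marker stripped, reported cwd or None).
        m = _CWD_MARKER.search(output)
        if not m:
            return output, None
        return output[:m.start()] + output[m.end():], m.group(1)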
alt-glitch
7046d9b834 feat(agent_loop): add L2+L3 tool result persistence to eval path
- Add _tool_result_storage_dir to HermesAgentLoop.__init__
- Apply maybe_persist_tool_result() before tool message append
- Add enforce_turn_budget() after all tool calls in a turn
- Both wrapped in try/except (best-effort in eval path)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:32:47 +00:00
alt-glitch
acbbdc05d4 feat(run_agent): replace 100K truncation with persist-to-disk (L2+L3)
Layer 2: Replace destructive head-truncation with maybe_persist_tool_result()
in both concurrent and sequential tool execution paths. Large results are
now written to ~/.hermes/sessions/{id}/tool-results/ with a 2KB preview
in context. Model can read_file the persisted path for full content.

Layer 3: Add enforce_turn_budget() after all tool results in a turn.
If aggregate exceeds 200K chars, persist largest results first until
under budget. Runs after concurrent futures.wait() (single-threaded).

Callbacks still receive full untruncated results before persistence.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:32:47 +00:00
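A standalone sketch of the Layer 3 pass (message shapes and stub wording are assumptions; the real enforce_turn_budget lives in tools/tool_result_storage.py):

    from pathlib import Path

    def enforce_turn_budget(turn_msgs, storage_dir: Path, budget=200_000):
        storage_dir.mkdir(parents=True, exist_ok=True)
        total = sum(len(m.get("content") or "") for m in turn_msgs)
        # Persist the largest results first; stop once the aggregate fits.
        for msg in sorted(turn_msgs, key=lambda m: len(m.get("content") or ""),
                          reverse=True):
            if total <= budget:
                break
            content = msg.get("content") or ""
            path = storage_dir / f"{msg.get('tool_call_id', 'result')}.txt"
            path.write_text(content, encoding="utf-8")
            stub = f"[{len(content):,} chars persisted to {path}; use read_file]"
            msg["content"] = stub
            total -= len(content) - len(stub)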
alt-glitch
9cbfa1309b feat(tools): add per-tool result size thresholds and search_files output cap
Declares max_result_size_chars on each tool registration so the persistence
layer can apply per-tool limits instead of the global 50K default. Adds a
Layer 1 output cap inside search_tool() to prevent context overflow, and
adds a schema maximum of 10000 to the search_files limit parameter.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 01:31:48 +00:00
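The registry extension likely amounts to one optional field (only the field name comes from the commit; the surrounding shape is assumed):

    from dataclasses import dataclass
    from typing import Callable, Optional

    @dataclass
    class ToolEntry:
        name: str
        fn: Callable[..., str]
        # None means "use the global 50K default" from tool_result_storage.
        max_result_size_chars: Optional[int] = None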
alt-glitch
3dfce74099 feat(tools): add tool result persistence module + registry support
Add tools/tool_result_storage.py implementing Layer 2 (per-result) and
Layer 3 (per-turn budget) persistence for large tool outputs. Results
exceeding thresholds are written to disk with a <persisted-output>
preview block replacing the inline content. Extend ToolEntry and
ToolRegistry with max_result_size_chars for per-tool threshold control.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:31:48 +00:00
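Pieced together from the call sites and tests elsewhere in this diff, the Layer 2 entry point roughly looks like this (message wording and the threshold default are approximations):

    from pathlib import Path

    def maybe_persist_tool_result(content, tool_name, tool_use_id,
                                  storage_dir: Path, threshold=50_000):
        if len(content) <= threshold:
            return content                 # small results stay inline
        storage_dir.mkdir(parents=True, exist_ok=True)
        path = storage_dir / f"{tool_use_id}.txt"
        path.write_text(content, encoding="utf-8")
        preview = content[:2_000]          # ~2KB head preview stays in context
        return (f"<persisted-output>\n{preview}\n"
                f"[{tool_name} returned {len(content):,} chars; full output at "
                f"{path}; use read_file to access the rest]\n</persisted-output>")

Passing threshold=0 (as the review-fix commit above does from enforce_turn_budget) force-persists any non-empty result.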
alt-glitch
431262f9a5 test(daytona): update unit tests for unified execution model
- Update execute tests to account for init_session during __init__
- Fix CWD resolution tests for cwdfile reads
- Patch is_interrupted at base module level (where _wait_for_process uses it)
- Update stdin heredoc test for new call pattern
- 27/27 Daytona unit tests passing

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:31:48 +00:00
alt-glitch
ec43496d5a refactor(environments): delete persistent_shell.py + dead code — Phase 8
- DELETE persistent_shell.py entirely (277 lines removed)
- Remove _SHELL_NOISE_SUBSTRINGS, _clean_shell_noise, _extract_fenced_output
  from local.py (unused after fence marker removal)
- Adapt ManagedModalEnvironment to use BaseEnvironment + _wrap_command()
  while keeping its own HTTP-based execute()
- Remove _OUTPUT_FENCE constant

42/42 tests passing across all testable backends.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:31:48 +00:00
alt-glitch
ef8a985ee3 refactor(environments): migrate Modal + Daytona to unified model — Phase 5+6
ModalEnvironment:
- Create _ModalProcessHandle adapter (async SDK → ProcessHandle protocol)
- Routes async sandbox.exec through thread + OS pipe for stdout
- Remove BaseModalExecutionEnvironment inheritance, use BaseEnvironment
- Remove _start_modal_exec/_poll_modal_exec/_cancel_modal_exec
- Move file sync to _before_execute hook
- Preserve _AsyncWorker, sandbox lifecycle, snapshot management

DaytonaEnvironment:
- Create _DaytonaProcessHandle adapter (blocking SDK → ProcessHandle)
- Preserves shell timeout wrapper (SDK timeout unreliable)
- Add _run_bash with heredoc stdin embedding
- Move file sync to _before_execute hook
- Preserve sandbox lifecycle, persistent resume logic

ManagedModal left unchanged (HTTP-based, keeps modal_common.py dep).
42/42 local tests passing. SDK backends untested (require credentials).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:31:41 +00:00
alt-glitch
0482133559 refactor(terminal_tool): remove persistent shell params from factory
- Remove persistent= from LocalEnvironment creation
- Remove persistent= from SSHEnvironment creation
- Container persistent_filesystem params unchanged (different concept)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:31:34 +00:00
alt-glitch
637bbbee7e refactor(environments): migrate Docker + SSH to unified model — Phase 2+4
DockerEnvironment:
- Add _run_bash/_run_bash_login, extract _build_forward_env_args()
- Remove execute() override with duplicate timeout/interrupt loop
- Remove -w flag (CWD handled by wrapping template)
- Call init_session() after container creation
- 42/42 tests passing on debian:bookworm-slim (21.7s)

SSHEnvironment:
- Remove PersistentShellMixin inheritance entirely
- Remove all IPC methods: _read_temp_files, _kill_shell_children,
  _cleanup_temp_files, _spawn_shell_process, _execute_oneshot
- Add _run_bash/_run_bash_login with shlex.quote for SSH transport
- Override _read_file_in_env with capture_output=True to suppress
  SSH connection warnings (post-quantum key exchange etc.)
- Move _sync_skills_and_credentials to _before_execute hook
- Remove persistent parameter
- 42/42 tests passing on SSH (173s, no more polling overhead)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:31:34 +00:00
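The double-quoting concern called out for SSH can be shown in a few lines (hypothetical helper, not the backend's actual _run_bash):

    import shlex
    import subprocess

    def run_bash_over_ssh(user, host, script):
        # The wrapped script crosses two shells (local -> sshd -> remote bash),
        # so it gets one extra layer of quoting before being handed to ssh.
        remote_cmd = f"bash -c {shlex.quote(script)}"
        return subprocess.Popen(
            ["ssh", f"{user}@{host}", remote_cmd],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
        )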
alt-glitch
cd814392ae refactor(environments): migrate SingularityEnvironment to unified model — Phase 3
- Add _run_bash/_run_bash_login, remove execute() override
- Remove --pwd flag (CWD handled by wrapping template)
- Remove is_interrupted import (handled by BaseEnvironment)
- Call init_session() after instance start

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:31:29 +00:00
alt-glitch
fe2e1c16c6 refactor(environments): unify execution layer — Phase 0+1 (base + local)
Add spawn-per-call execution model to BaseEnvironment:
- ProcessHandle protocol for backend abstraction
- Shell snapshot creation (bash -l once, capture env to file)
- Command wrapping template (source snapshot + cd + eval + pwd tracking)
- Unified _wait_for_process with interrupt/timeout handling
- CWD tracking via cwdfile read after process exit

Migrate LocalEnvironment to unified model:
- Remove PersistentShellMixin inheritance
- Implement _run_bash (Popen + process group kill)
- Remove fence markers, oneshot method, shell noise cleanup
- Session snapshot captures env vars, functions, aliases

New test capabilities validated:
- CWD persists across execute() calls (previously 37% of commands needed a manual cd prefix)
- stdin_data piping works
- Exit codes preserved through wrapper
- Single quotes survive eval escaping
- Snapshot fallback works when creation fails

42/42 tests passing on local backend.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 01:31:08 +00:00
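An illustrative reconstruction of the wrapping template from the bullets above ({snapshot}, {cwd}, {quoted_command}, and {cwdfile} are placeholders; note a later commit in this compare replaces the cwdfile with an in-band stdout marker):

    WRAP_TEMPLATE = """\
    [ -f {snapshot} ] && source {snapshot}
    cd {cwd} 2>/dev/null || true
    eval {quoted_command}
    __hermes_rc=$?
    pwd -P > {cwdfile}
    exit $__hermes_rc
    """

The exit-code plumbing at the end is what the "Exit codes preserved through wrapper" test exercises.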
25 changed files with 2393 additions and 1618 deletions

View File

@@ -164,6 +164,11 @@ class HermesAgentLoop:
self.max_tokens = max_tokens
self.extra_body = extra_body
# Per-result and per-turn output persistence (see tools/tool_result_storage.py)
from pathlib import Path
self._tool_result_storage_dir = Path(f"/tmp/hermes_tool_results/{self.task_id}")
self._tool_result_storage_dir.mkdir(parents=True, exist_ok=True)
async def run(self, messages: List[Dict[str, Any]]) -> AgentResult:
"""
Execute the full agent loop using standard OpenAI tool calling.
@@ -446,8 +451,18 @@ class HermesAgentLoop:
except (json.JSONDecodeError, TypeError):
pass
# Add tool response to conversation
# Persist oversized results to disk
tc_id = tc.get("id", "") if isinstance(tc, dict) else tc.id
try:
from tools.tool_result_storage import maybe_persist_tool_result
tool_result = maybe_persist_tool_result(
content=tool_result,
tool_name=tool_name,
tool_use_id=tc_id,
storage_dir=self._tool_result_storage_dir,
)
except Exception:
pass # Persistence is best-effort in eval path
messages.append(
{
"role": "tool",
@@ -456,6 +471,17 @@ class HermesAgentLoop:
}
)
# Per-turn aggregate budget enforcement
try:
from tools.tool_result_storage import enforce_turn_budget
num_tcs = len(assistant_msg.tool_calls)
if num_tcs > 0:
turn_msgs = [m for m in messages[-num_tcs * 2:]
if m.get("role") == "tool"]
enforce_turn_budget(turn_msgs, self._tool_result_storage_dir)
except Exception:
pass # Best-effort in eval path
turn_elapsed = _time.monotonic() - turn_start
logger.info(
"[%s] turn %d: api=%.1fs, %d tools, turn_total=%.1fs",

View File

@@ -407,68 +407,6 @@ def _strip_budget_warnings_from_history(messages: list) -> None:
msg["content"] = cleaned
# =========================================================================
# Large tool result handler — save oversized output to temp file
# =========================================================================
# Threshold at which tool results are saved to a file instead of kept inline.
# 100K chars ≈ 25K tokens — generous for any reasonable output but prevents
# catastrophic context explosions.
_LARGE_RESULT_CHARS = 100_000
# How many characters of the original result to include as an inline preview
# so the model has immediate context about what the tool returned.
_LARGE_RESULT_PREVIEW_CHARS = 1_500
def _save_oversized_tool_result(function_name: str, function_result: str) -> str:
"""Replace oversized tool results with a file reference + preview.
When a tool returns more than ``_LARGE_RESULT_CHARS`` characters, the full
content is written to a temporary file under ``HERMES_HOME/cache/tool_responses/``
and the result sent to the model is replaced with:
• a brief head preview (first ``_LARGE_RESULT_PREVIEW_CHARS`` chars)
• the file path so the model can use ``read_file`` / ``search_files``
Falls back to destructive truncation if the file write fails.
"""
original_len = len(function_result)
if original_len <= _LARGE_RESULT_CHARS:
return function_result
# Build the target directory
try:
response_dir = os.path.join(get_hermes_home(), "cache", "tool_responses")
os.makedirs(response_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
# Sanitize tool name for use in filename
safe_name = re.sub(r"[^\w\-]", "_", function_name)[:40]
filename = f"{safe_name}_{timestamp}.txt"
filepath = os.path.join(response_dir, filename)
with open(filepath, "w", encoding="utf-8") as f:
f.write(function_result)
preview = function_result[:_LARGE_RESULT_PREVIEW_CHARS]
return (
f"{preview}\n\n"
f"[Large tool response: {original_len:,} characters total — "
f"only the first {_LARGE_RESULT_PREVIEW_CHARS:,} shown above. "
f"Full output saved to: {filepath}\n"
f"Use read_file or search_files on that path to access the rest.]"
)
except Exception as exc:
# Fall back to destructive truncation if file write fails
logger.warning("Failed to save large tool result to file: %s", exc)
return (
function_result[:_LARGE_RESULT_CHARS]
+ f"\n\n[Truncated: tool response was {original_len:,} chars, "
f"exceeding the {_LARGE_RESULT_CHARS:,} char limit. "
f"File save failed: {exc}]"
)
class AIAgent:
"""
AI Agent with tool calling capabilities.
@@ -961,6 +899,10 @@ class AIAgent:
short_uuid = uuid.uuid4().hex[:6]
self.session_id = f"{timestamp_str}_{short_uuid}"
# Per-result and per-turn output persistence (see tools/tool_result_storage.py)
from tools.tool_result_storage import get_storage_dir as _get_tool_storage_dir
self._tool_result_storage_dir = _get_tool_storage_dir(self.session_id)
# Session logs go into ~/.hermes/sessions/ alongside gateway sessions
hermes_home = get_hermes_home()
self.logs_dir = hermes_home / "sessions"
@@ -6253,8 +6195,15 @@ class AIAgent:
except Exception as cb_err:
logging.debug(f"Tool complete callback error: {cb_err}")
# Persist oversized results to disk (model can read_file to access full output)
from tools.tool_result_storage import maybe_persist_tool_result
function_result = maybe_persist_tool_result(
content=function_result,
tool_name=name,
tool_use_id=tc.id,
storage_dir=self._tool_result_storage_dir,
)
# Discover subdirectory context files from tool arguments
subdir_hints = self._subdirectory_hints.check_tool_call(name, args)
@@ -6269,6 +6218,13 @@ class AIAgent:
}
messages.append(tool_msg)
# ── Per-turn aggregate budget enforcement ─────────────────────────
from tools.tool_result_storage import enforce_turn_budget
num_tools = len(parsed_calls)
if num_tools > 0:
turn_tool_msgs = messages[-num_tools:]
enforce_turn_budget(turn_tool_msgs, self._tool_result_storage_dir)
# ── Budget pressure injection ────────────────────────────────────
budget_warning = self._get_budget_warning(api_call_count)
if budget_warning and messages and messages[-1].get("role") == "tool":
@@ -6553,8 +6509,15 @@ class AIAgent:
except Exception as cb_err:
logging.debug(f"Tool complete callback error: {cb_err}")
# Persist oversized results to disk (model can read_file to access full output)
from tools.tool_result_storage import maybe_persist_tool_result
function_result = maybe_persist_tool_result(
content=function_result,
tool_name=function_name,
tool_use_id=tool_call.id,
storage_dir=self._tool_result_storage_dir,
)
# Discover subdirectory context files from tool arguments
subdir_hints = self._subdirectory_hints.check_tool_call(function_name, function_args)
@@ -6592,6 +6555,14 @@ class AIAgent:
if self.tool_delay > 0 and i < len(assistant_message.tool_calls):
time.sleep(self.tool_delay)
# ── Per-turn aggregate budget enforcement ─────────────────────────
from tools.tool_result_storage import enforce_turn_budget as _enforce_budget
num_tools_seq = len(assistant_message.tool_calls)
if num_tools_seq > 0:
turn_tool_msgs_seq = [m for m in messages[-num_tools_seq * 2:]
if m.get("role") == "tool"]
_enforce_budget(turn_tool_msgs_seq, self._tool_result_storage_dir)
# ── Budget pressure injection ─────────────────────────────────
# After all tool calls in this turn are processed, check if we're
# approaching max_iterations. If so, inject a warning into the LAST

View File

@@ -0,0 +1,432 @@
"""
Cross-environment backend compatibility tests.
Derived from analysis of 218 real Hermes agent sessions (590 terminal calls).
Tests the command execution patterns the agent actually uses, against any
environment backend (local, docker, ssh, modal, daytona, singularity).
Usage:
# Local (default)
uv run pytest tests/test_env_backend_compat.py -v
# Docker
TERMINAL_ENV=docker TERMINAL_DOCKER_IMAGE=ubuntu:24.04 \
uv run pytest tests/test_env_backend_compat.py -v
# SSH
TERMINAL_ENV=ssh TERMINAL_SSH_HOST=... TERMINAL_SSH_USER=... \
uv run pytest tests/test_env_backend_compat.py -v
# Modal
TERMINAL_ENV=modal uv run pytest tests/test_env_backend_compat.py -v
"""
import json
import os
import time
import pytest
ENV_TYPE = os.getenv("TERMINAL_ENV", "local")
def _get_env():
"""Create and return an environment backend based on TERMINAL_ENV."""
from tools.terminal_tool import _create_environment
env_type = ENV_TYPE
image = os.getenv("TERMINAL_DOCKER_IMAGE", "ubuntu:24.04")
cwd = os.getenv("TERMINAL_CWD", "/tmp")
timeout = int(os.getenv("TERMINAL_TIMEOUT", "30"))
ssh_config = None
if env_type == "ssh":
ssh_config = {
"host": os.environ["TERMINAL_SSH_HOST"],
"user": os.environ["TERMINAL_SSH_USER"],
"port": int(os.getenv("TERMINAL_SSH_PORT", "22")),
"key": os.getenv("TERMINAL_SSH_KEY", ""),
}
container_config = {
"container_cpu": int(os.getenv("TERMINAL_CPU", "1")),
"container_memory": int(os.getenv("TERMINAL_MEMORY", "2048")),
"container_disk": int(os.getenv("TERMINAL_DISK", "10240")),
"container_persistent": True,
}
return _create_environment(
env_type=env_type,
image=image,
cwd=cwd,
timeout=timeout,
ssh_config=ssh_config,
container_config=container_config,
task_id="test_env_compat",
)
@pytest.fixture(scope="module")
def env():
"""Module-scoped environment — created once, reused across tests."""
e = _get_env()
yield e
if hasattr(e, "cleanup"):
try:
e.cleanup()
except Exception:
pass
def _exec(env, command: str, timeout: int = 30) -> dict:
"""Execute a command and parse the result dict."""
result = env.execute(command, timeout=timeout)
assert isinstance(result, dict), f"Expected dict, got {type(result)}"
return result
def _output(result: dict) -> str:
return result.get("output", "")
def _rc(result: dict) -> int:
return result.get("returncode", result.get("exit_code", -999))
# ---------------------------------------------------------------------------
# Category 1: Basic execution
# From session data: simple single commands are the foundation
# ---------------------------------------------------------------------------
class TestBasicExecution:
def test_echo(self, env):
"""Most basic: can we run a command and get output?"""
r = _exec(env, "echo hello")
assert "hello" in _output(r)
assert _rc(r) == 0
def test_exit_code_success(self, env):
r = _exec(env, "true")
assert _rc(r) == 0
def test_exit_code_failure(self, env):
r = _exec(env, "false")
assert _rc(r) != 0
def test_stderr_captured(self, env):
"""Agent relies on 2>&1 patterns; stderr must be captured."""
r = _exec(env, "echo err >&2")
# stderr may be merged into output or separate — just verify no crash
assert isinstance(_output(r), str)
def test_multiline_output(self, env):
r = _exec(env, "printf 'line1\\nline2\\nline3'")
lines = _output(r).strip().split("\n")
assert len(lines) >= 3
# ---------------------------------------------------------------------------
# Category 2: cd && command chains
# 37% of all terminal commands use this pattern. CRITICAL.
# ---------------------------------------------------------------------------
class TestCdAndChain:
def test_cd_and_command(self, env):
"""cd /tmp && ls — the most common pattern in session data."""
r = _exec(env, "cd /tmp && echo 'in_tmp'")
assert "in_tmp" in _output(r)
def test_chained_and(self, env):
"""Multiple && chains: agent does cd X && source Y && cmd Z."""
r = _exec(env, "echo a && echo b && echo c")
out = _output(r)
assert "a" in out and "b" in out and "c" in out
def test_chained_semicolon(self, env):
"""Semicolon chains: agent uses '; echo "---"' as separators."""
r = _exec(env, "echo first; echo '---'; echo second")
out = _output(r)
assert "first" in out and "---" in out and "second" in out
def test_cd_nonexistent_and_fails(self, env):
"""cd to bad dir && cmd should fail (not run cmd)."""
r = _exec(env, "cd /nonexistent_dir_xyz && echo should_not_see")
assert "should_not_see" not in _output(r)
def test_cwd_persists_across_calls(self, env):
"""CWD now persists via cwdfile tracking (unified execution model).
Previously: 37% of commands needed 'cd X &&' prefix. Now automatic."""
_exec(env, "cd /tmp")
r = _exec(env, "pwd")
assert "/tmp" in _output(r)
# ---------------------------------------------------------------------------
# Category 3: Pipes
# 46% of commands use pipes. Essential.
# ---------------------------------------------------------------------------
class TestPipes:
def test_simple_pipe(self, env):
r = _exec(env, "echo 'hello world' | wc -w")
assert "2" in _output(r)
def test_multi_pipe(self, env):
"""Agent chains: find X | grep Y | head -N"""
r = _exec(env, "echo -e 'a\\nb\\nc\\nd\\ne' | grep -v c | wc -l")
assert "4" in _output(r)
def test_pipe_with_grep(self, env):
"""Common pattern: cmd 2>&1 | grep pattern"""
r = _exec(env, "echo -e 'foo\\nbar\\nbaz' | grep ba")
out = _output(r)
assert "bar" in out and "baz" in out
assert "foo" not in out
# ---------------------------------------------------------------------------
# Category 4: Environment variables and source
# 19% of commands use source. Agent does: source ~/.bashrc && cmd
# ---------------------------------------------------------------------------
class TestEnvAndSource:
def test_inline_env_var(self, env):
r = _exec(env, "MY_VAR=hello && echo $MY_VAR")
assert "hello" in _output(r)
def test_export_and_use(self, env):
r = _exec(env, "export FOO=bar && echo $FOO")
assert "bar" in _output(r)
def test_env_does_not_persist(self, env):
"""Env vars don't persist across execute() calls."""
_exec(env, "export HERMES_TEST_VAR=1234")
r = _exec(env, "echo ${HERMES_TEST_VAR:-unset}")
assert "unset" in _output(r)
def test_source_inline_script(self, env):
"""Agent pattern: write a file, source it, use its vars."""
r = _exec(env, (
"echo 'export TEST_SOURCED=yes' > /tmp/hermes_test_source.sh && "
"source /tmp/hermes_test_source.sh && "
"echo $TEST_SOURCED"
))
assert "yes" in _output(r)
# ---------------------------------------------------------------------------
# Category 5: File I/O via shell
# Agent uses cat, heredoc, find, ls extensively
# ---------------------------------------------------------------------------
class TestFileIO:
def test_write_and_read(self, env):
r = _exec(env, (
"echo 'test content' > /tmp/hermes_test_file.txt && "
"cat /tmp/hermes_test_file.txt"
))
assert "test content" in _output(r)
def test_heredoc_write(self, env):
"""0.5% of commands use heredoc — rare but important for config files."""
r = _exec(env, """cat > /tmp/hermes_heredoc_test.txt << 'EOF'
line one
line two
line three
EOF
cat /tmp/hermes_heredoc_test.txt""")
out = _output(r)
assert "line one" in out and "line three" in out
def test_mkdir_p(self, env):
r = _exec(env, "mkdir -p /tmp/hermes_test_deep/a/b/c && ls /tmp/hermes_test_deep/a/b/")
assert "c" in _output(r)
def test_find(self, env):
r = _exec(env, (
"mkdir -p /tmp/hermes_find_test && "
"touch /tmp/hermes_find_test/a.py /tmp/hermes_find_test/b.txt && "
"find /tmp/hermes_find_test -name '*.py'"
))
assert "a.py" in _output(r)
def test_file_persistence_within_session(self, env):
"""Files written and read in a single execute() call."""
r = _exec(env, (
"echo 'persistent' > /tmp/hermes_persist_test.txt && "
"cat /tmp/hermes_persist_test.txt"
))
assert "persistent" in _output(r)
# ---------------------------------------------------------------------------
# Category 6: Multiline commands
# 6% of commands are multiline. Agent sends literal newlines.
# ---------------------------------------------------------------------------
class TestMultiline:
def test_multiline_script(self, env):
r = _exec(env, """echo "step 1"
echo "step 2"
echo "step 3" """)
out = _output(r)
assert "step 1" in out and "step 3" in out
def test_multiline_with_variable(self, env):
r = _exec(env, """X=42
echo "value is $X" """)
assert "value is 42" in _output(r)
# ---------------------------------------------------------------------------
# Category 7: Timeouts
# 50% of terminal calls specify a timeout. Some go up to 1800s.
# ---------------------------------------------------------------------------
class TestTimeouts:
def test_fast_command_with_timeout(self, env):
r = _exec(env, "echo fast", timeout=5)
assert "fast" in _output(r)
def test_slow_command_timeout(self, env):
"""Command that exceeds timeout should be killed."""
start = time.time()
r = _exec(env, "sleep 60", timeout=3)
elapsed = time.time() - start
# Should return in roughly timeout seconds, not 60
assert elapsed < 15, f"Command took {elapsed}s, should have timed out at ~3s"
# ---------------------------------------------------------------------------
# Category 8: Output handling
# Verifying the contract: {output: str, exit_code/returncode: int, error: ...}
# ---------------------------------------------------------------------------
class TestOutputContract:
def test_result_has_output_key(self, env):
r = _exec(env, "echo test")
assert "output" in r
def test_result_has_returncode(self, env):
r = _exec(env, "echo test")
assert "returncode" in r or "exit_code" in r
def test_large_output_not_truncated_at_execute_level(self, env):
"""The env.execute() should return raw output.
Truncation happens in terminal_tool.py, not in the backend."""
r = _exec(env, "seq 1 5000")
lines = _output(r).strip().split("\n")
# Should get all 5000 lines from the backend itself
assert len(lines) >= 4900, f"Expected ~5000 lines, got {len(lines)}"
def test_binary_output_doesnt_crash(self, env):
"""Agent sometimes runs commands that produce partial binary output."""
r = _exec(env, "echo -e '\\x00\\x01\\x02hello\\x03'")
# Just verify it doesn't crash
assert isinstance(_output(r), str)
# ---------------------------------------------------------------------------
# Category 9: Package/tool availability
# Agent frequently checks for tools before using them
# ---------------------------------------------------------------------------
class TestToolAvailability:
def test_which_pattern(self, env):
"""Agent pattern: which X 2>/dev/null || echo 'not found'"""
r = _exec(env, "which bash 2>/dev/null || echo 'not found'")
out = _output(r)
assert "bash" in out or "not found" in out
def test_python_available(self, env):
"""Agent uses python3 extensively."""
r = _exec(env, "which python3 2>/dev/null && python3 --version || echo 'no python3'")
assert "Python" in _output(r) or "no python3" in _output(r)
def test_git_available(self, env):
"""52 git operations in session data."""
r = _exec(env, "which git 2>/dev/null && git --version || echo 'no git'")
assert "git" in _output(r).lower() or "no git" in _output(r)
# ---------------------------------------------------------------------------
# Category 10: Error handling edge cases
# ---------------------------------------------------------------------------
class TestEdgeCases:
def test_empty_command(self, env):
"""Empty or whitespace command shouldn't crash."""
try:
r = _exec(env, "")
# May succeed with empty output or fail gracefully
except Exception:
pass # Acceptable to raise
def test_command_not_found(self, env):
r = _exec(env, "nonexistent_command_xyz_123 2>&1")
assert _rc(r) != 0
def test_special_characters_in_output(self, env):
"""Agent processes JSON, YAML, code — special chars must survive."""
r = _exec(env, """echo '{"key": "value", "list": [1,2,3]}'""")
out = _output(r)
assert '"key"' in out
def test_long_command_string(self, env):
"""Agent sends commands up to ~500 chars. Verify no truncation on input."""
long_val = "A" * 500
r = _exec(env, f"echo {long_val} | wc -c")
count = int(_output(r).strip())
assert count >= 500
# ---------------------------------------------------------------------------
# Category 11: Unified execution model — new capabilities
# ---------------------------------------------------------------------------
class TestUnifiedExecution:
def test_cwd_tracking_updates_env(self, env):
"""env.cwd should update after cd command."""
_exec(env, "cd /tmp")
assert env.cwd == "/tmp"
def test_stdin_data(self, env):
"""stdin_data should be piped to the command."""
r = env.execute("cat", stdin_data="hello from stdin\n")
assert "hello from stdin" in _output(r)
def test_snapshot_fallback(self, env):
"""Commands work even when snapshot is missing/broken."""
old_snapshot = env._snapshot_ready
old_path = env._snapshot_path
env._snapshot_ready = False
env._snapshot_path = None
try:
r = _exec(env, "echo still_works")
assert "still_works" in _output(r)
finally:
env._snapshot_ready = old_snapshot
env._snapshot_path = old_path
def test_exit_code_preserved_through_wrapper(self, env):
"""Exit code from the user command should pass through the wrapper."""
r = _exec(env, "exit 42")
assert _rc(r) == 42
def test_single_quotes_in_command(self, env):
"""Commands with single quotes must survive the eval wrapper."""
r = _exec(env, "echo 'it'\\''s a test'")
assert "it's a test" in _output(r)
# ---------------------------------------------------------------------------
# Cleanup
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True, scope="module")
def cleanup_test_files(env):
"""Clean up test artifacts after all tests."""
yield
try:
env.execute("rm -rf /tmp/hermes_test_* /tmp/hermes_find_test /tmp/hermes_heredoc_test.txt /tmp/hermes_persist_test.txt /tmp/hermes_test_source.sh", timeout=5)
except Exception:
pass

View File

@@ -1,162 +0,0 @@
"""Tests for _save_oversized_tool_result() — the large tool response handler.
When a tool returns more than _LARGE_RESULT_CHARS characters, the full content
is saved to a file and the model receives a preview + file path instead.
"""
import os
import re
import pytest
from run_agent import (
_save_oversized_tool_result,
_LARGE_RESULT_CHARS,
_LARGE_RESULT_PREVIEW_CHARS,
)
class TestSaveOversizedToolResult:
"""Unit tests for the large tool result handler."""
def test_small_result_returned_unchanged(self):
"""Results under the threshold pass through untouched."""
small = "x" * 1000
assert _save_oversized_tool_result("terminal", small) is small
def test_exactly_at_threshold_returned_unchanged(self):
"""Results exactly at the threshold pass through."""
exact = "y" * _LARGE_RESULT_CHARS
assert _save_oversized_tool_result("terminal", exact) is exact
def test_oversized_result_saved_to_file(self, tmp_path, monkeypatch):
"""Results over the threshold are written to a file."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
os.makedirs(tmp_path / ".hermes", exist_ok=True)
big = "A" * (_LARGE_RESULT_CHARS + 500)
result = _save_oversized_tool_result("terminal", big)
# Should contain the preview
assert result.startswith("A" * _LARGE_RESULT_PREVIEW_CHARS)
# Should mention the file path
assert "Full output saved to:" in result
# Should mention original size
assert f"{len(big):,}" in result
# Extract the file path and verify the file exists with full content
match = re.search(r"Full output saved to: (.+?)\n", result)
assert match, f"No file path found in result: {result[:300]}"
filepath = match.group(1)
assert os.path.isfile(filepath)
with open(filepath, "r", encoding="utf-8") as f:
saved = f.read()
assert saved == big
assert len(saved) == _LARGE_RESULT_CHARS + 500
def test_file_placed_in_cache_tool_responses(self, tmp_path, monkeypatch):
"""Saved file lives under HERMES_HOME/cache/tool_responses/."""
hermes_home = str(tmp_path / ".hermes")
monkeypatch.setenv("HERMES_HOME", hermes_home)
os.makedirs(hermes_home, exist_ok=True)
big = "B" * (_LARGE_RESULT_CHARS + 1)
result = _save_oversized_tool_result("web_search", big)
match = re.search(r"Full output saved to: (.+?)\n", result)
filepath = match.group(1)
expected_dir = os.path.join(hermes_home, "cache", "tool_responses")
assert filepath.startswith(expected_dir)
def test_filename_contains_tool_name(self, tmp_path, monkeypatch):
"""The saved filename includes a sanitized version of the tool name."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
os.makedirs(tmp_path / ".hermes", exist_ok=True)
big = "C" * (_LARGE_RESULT_CHARS + 1)
result = _save_oversized_tool_result("browser_navigate", big)
match = re.search(r"Full output saved to: (.+?)\n", result)
filename = os.path.basename(match.group(1))
assert filename.startswith("browser_navigate_")
assert filename.endswith(".txt")
def test_tool_name_sanitized(self, tmp_path, monkeypatch):
"""Special characters in tool names are replaced in the filename."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
os.makedirs(tmp_path / ".hermes", exist_ok=True)
big = "D" * (_LARGE_RESULT_CHARS + 1)
result = _save_oversized_tool_result("mcp:some/weird tool", big)
match = re.search(r"Full output saved to: (.+?)\n", result)
filename = os.path.basename(match.group(1))
# No slashes or colons in filename
assert "/" not in filename
assert ":" not in filename
def test_fallback_on_write_failure(self, tmp_path, monkeypatch):
"""When file write fails, falls back to destructive truncation."""
# Point HERMES_HOME to a path that will fail (file, not directory)
bad_path = str(tmp_path / "not_a_dir.txt")
with open(bad_path, "w") as f:
f.write("I'm a file, not a directory")
monkeypatch.setenv("HERMES_HOME", bad_path)
big = "E" * (_LARGE_RESULT_CHARS + 50_000)
result = _save_oversized_tool_result("terminal", big)
# Should still contain data (fallback truncation)
assert len(result) > 0
assert result.startswith("E" * 1000)
# Should mention the failure
assert "File save failed" in result
# Should be truncated to approximately _LARGE_RESULT_CHARS + error msg
assert len(result) < len(big)
def test_preview_length_capped(self, tmp_path, monkeypatch):
"""The inline preview is capped at _LARGE_RESULT_PREVIEW_CHARS."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
os.makedirs(tmp_path / ".hermes", exist_ok=True)
# Use distinct chars so we can measure the preview
big = "Z" * (_LARGE_RESULT_CHARS + 5000)
result = _save_oversized_tool_result("terminal", big)
# The preview section is the content before the "[Large tool response:" marker
marker_pos = result.index("[Large tool response:")
preview_section = result[:marker_pos].rstrip()
assert len(preview_section) == _LARGE_RESULT_PREVIEW_CHARS
def test_guidance_message_mentions_tools(self, tmp_path, monkeypatch):
"""The replacement message tells the model how to access the file."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
os.makedirs(tmp_path / ".hermes", exist_ok=True)
big = "F" * (_LARGE_RESULT_CHARS + 1)
result = _save_oversized_tool_result("terminal", big)
assert "read_file" in result
assert "search_files" in result
def test_empty_result_passes_through(self):
"""Empty strings are not oversized."""
assert _save_oversized_tool_result("terminal", "") == ""
def test_unicode_content_preserved(self, tmp_path, monkeypatch):
"""Unicode content is fully preserved in the saved file."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
os.makedirs(tmp_path / ".hermes", exist_ok=True)
# Mix of ASCII and multi-byte unicode to exceed threshold
unit = "Hello 世界! 🎉 " * 100 # ~1400 chars per repeat
big = unit * ((_LARGE_RESULT_CHARS // len(unit)) + 1)
assert len(big) > _LARGE_RESULT_CHARS
result = _save_oversized_tool_result("terminal", big)
match = re.search(r"Full output saved to: (.+?)\n", result)
filepath = match.group(1)
with open(filepath, "r", encoding="utf-8") as f:
saved = f.read()
assert saved == big

View File

@@ -123,14 +123,40 @@ class TestCwdResolution:
assert env.cwd == "/home/testuser"
def test_explicit_cwd_not_overridden(self, make_env):
env = make_env(cwd="/workspace", home_dir="/root")
"""Explicit cwd should be set before init_session.
After init_session(), the cwdfile may update cwd to whatever the
login shell reports. We make the mock return /workspace for the
cwdfile read so init_session doesn't override the explicit cwd.
"""
sb = _make_sandbox()
# Return /workspace for all exec calls including init_session's
# snapshot bootstrap and cwdfile reads
sb.process.exec.return_value = _make_exec_response(result="/workspace")
env = make_env(sandbox=sb, cwd="/workspace", home_dir="/workspace")
assert env.cwd == "/workspace"
def test_home_detection_failure_keeps_default_cwd(self, make_env):
"""When $HOME detection fails, cwd falls back to constructor default.
init_session() still runs but its cwdfile read returns empty,
so cwd is not overwritten.
"""
sb = _make_sandbox()
call_count = {"n": 0}
def _exec_side_effect(*args, **kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
# $HOME detection fails
raise RuntimeError("exec failed")
# All subsequent calls (init_session, cwdfile reads) succeed
# but return empty so they don't override cwd
return _make_exec_response(result="", exit_code=0)
sb.process.exec.side_effect = _exec_side_effect
env = make_env(sandbox=sb)
assert env.cwd == "/home/daytona" # keeps constructor default
assert env.cwd == "/home/daytona"
def test_empty_home_keeps_default_cwd(self, make_env):
env = make_env(home_dir="")
@@ -221,104 +247,107 @@ class TestCleanup:
class TestExecute:
def test_basic_command(self, make_env):
sb = _make_sandbox()
# Calls: $HOME detection, init_session bootstrap, init_session cat,
# _before_execute sandbox refresh, _run_bash command, _update_cwd cat
sb.process.exec.return_value = _make_exec_response(result="/root")
sb.state = "started"
env = make_env(sandbox=sb)
# Reset mock to control just the execute() calls
sb.process.exec.reset_mock()
sb.process.exec.return_value = _make_exec_response(result="hello", exit_code=0)
result = env.execute("echo hello")
assert result["output"] == "hello"
assert "hello" in result["output"]
assert result["returncode"] == 0
def test_command_wrapped_with_shell_timeout(self, make_env):
sb = _make_sandbox()
sb.process.exec.return_value = _make_exec_response(result="/root")
sb.state = "started"
env = make_env(sandbox=sb, timeout=42)
sb.process.exec.reset_mock()
sb.process.exec.return_value = _make_exec_response(result="ok", exit_code=0)
env.execute("echo hello")
# The command sent to _ThreadedProcessHandle should be wrapped with
# `timeout N bash -c '...'`
call_args = sb.process.exec.call_args_list[-1]
cmd = call_args[0][0]
assert cmd.startswith("timeout 42 sh -c ")
# SDK timeout param should NOT be passed
assert "timeout" not in call_args[1]
assert "timeout 42 bash -c " in cmd
def test_timeout_returns_exit_code_124(self, make_env):
"""Shell timeout utility returns exit code 124."""
sb = _make_sandbox()
sb.process.exec.return_value = _make_exec_response(result="/root")
sb.state = "started"
env = make_env(sandbox=sb)
sb.process.exec.reset_mock()
sb.process.exec.return_value = _make_exec_response(result="", exit_code=124)
result = env.execute("sleep 300", timeout=5)
assert result["returncode"] == 124
def test_nonzero_exit_code(self, make_env):
sb = _make_sandbox()
sb.process.exec.return_value = _make_exec_response(result="/root")
sb.state = "started"
env = make_env(sandbox=sb)
sb.process.exec.reset_mock()
sb.process.exec.return_value = _make_exec_response(result="not found", exit_code=127)
result = env.execute("bad_cmd")
assert result["returncode"] == 127
def test_stdin_data_wraps_heredoc(self, make_env):
sb = _make_sandbox()
sb.process.exec.return_value = _make_exec_response(result="/root")
sb.state = "started"
env = make_env(sandbox=sb)
sb.process.exec.reset_mock()
sb.process.exec.return_value = _make_exec_response(result="ok", exit_code=0)
env.execute("python3", stdin_data="print('hi')")
# Check that one of the exec calls contains heredoc markers.
# The last call may be the cwdfile read, so check all calls.
all_cmds = [
call_args[0][0]
for call_args in sb.process.exec.call_args_list
]
heredoc_cmd = [c for c in all_cmds if "HERMES_EOF_" in c]
assert heredoc_cmd, f"No heredoc found in exec calls: {all_cmds}"
cmd = heredoc_cmd[0]
assert "print" in cmd
assert "hi" in cmd
def test_custom_cwd_passed_through(self, make_env):
sb = _make_sandbox()
sb.process.exec.return_value = _make_exec_response(result="/root")
sb.state = "started"
env = make_env(sandbox=sb)
sb.process.exec.reset_mock()
sb.process.exec.return_value = _make_exec_response(result="/tmp", exit_code=0)
env.execute("pwd", cwd="/tmp")
# In the unified model, cwd is embedded in the _wrap_command output
# and the _ThreadedProcessHandle also passes cwd to the SDK
call_args = sb.process.exec.call_args_list[-1]
cmd = call_args[0][0]
# The wrapped command includes a cd to the cwd
assert "/tmp" in cmd
def test_daytona_error_returns_error_result(self, make_env, daytona_sdk):
"""In the unified model, SDK errors are caught by _ThreadedProcessHandle
and returned as error results (no automatic retry)."""
sb = _make_sandbox()
sb.state = "started"
sb.process.exec.return_value = _make_exec_response(result="/root")
env = make_env(sandbox=sb)
sb.process.exec.reset_mock()
sb.process.exec.side_effect = daytona_sdk.DaytonaError("transient")
result = env.execute("echo retry")
assert result["output"] == "ok"
assert result["returncode"] == 0
assert result["returncode"] == 1
assert "transient" in result["output"]
# ---------------------------------------------------------------------------
@@ -349,51 +378,47 @@ class TestResourceConversion:
# ---------------------------------------------------------------------------
class TestInterrupt:
def test_interrupt_returns_130(self, make_env, monkeypatch):
"""In the unified model, interrupt is handled by BaseEnvironment._wait_for_process."""
sb = _make_sandbox()
sb.state = "started"
sb.process.exec.return_value = _make_exec_response(result="/root")
env = make_env(sandbox=sb)
# Make the SDK exec block long enough for the interrupt check to fire
import time as time_mod
def slow_exec(*args, **kwargs):
time_mod.sleep(5)
return _make_exec_response(result="done", exit_code=0)
sb.process.exec.reset_mock()
sb.process.exec.side_effect = slow_exec
# Patch is_interrupted in the base module where _wait_for_process uses it
monkeypatch.setattr(
"tools.environments.daytona.is_interrupted", lambda: True
"tools.environments.base.is_interrupted", lambda: True
)
result = env.execute("sleep 10")
assert result["returncode"] == 130
# ---------------------------------------------------------------------------
# SDK error handling
# ---------------------------------------------------------------------------
class TestSdkError:
def test_sdk_error_returns_error_result(self, make_env, daytona_sdk):
"""SDK errors in _ThreadedProcessHandle are caught and returned cleanly."""
sb = _make_sandbox()
sb.state = "started"
sb.process.exec.return_value = _make_exec_response(result="/root")
env = make_env(sandbox=sb)
sb.process.exec.reset_mock()
sb.process.exec.side_effect = daytona_sdk.DaytonaError("fail")
result = env.execute("echo x")
assert result["returncode"] == 1
assert "Daytona execution error" in result["output"]
assert "fail" in result["output"]
# ---------------------------------------------------------------------------
@@ -403,12 +428,20 @@ class TestRetryExhausted:
class TestEnsureSandboxReady:
def test_restarts_stopped_sandbox(self, make_env):
env = make_env()
env._needs_refresh = True
env._sandbox.state = "stopped"
env._ensure_sandbox_ready()
env._sandbox.start.assert_called()
def test_no_restart_when_running(self, make_env):
env = make_env()
env._needs_refresh = True
env._sandbox.state = "started"
env._ensure_sandbox_ready()
env._sandbox.start.assert_not_called()
def test_skips_refresh_when_not_needed(self, make_env):
env = make_env()
env._needs_refresh = False
env._ensure_sandbox_ready()
env._sandbox.refresh_data.assert_not_called()

View File

@@ -239,10 +239,14 @@ def _make_execute_only_env(forward_env=None):
env = docker_env.DockerEnvironment.__new__(docker_env.DockerEnvironment)
env.cwd = "/root"
env.timeout = 60
env.env = {}
env._forward_env = forward_env or []
env._env = {}
env._prepare_command = lambda command: (command, None)
env._timeout_result = lambda timeout: {"output": f"timed out after {timeout}", "returncode": 124}
env._snapshot_path = None
env._snapshot_ready = False
env._session_id = ""
env._cached_forward_env_args = None
env._container_id = "test-container"
env._docker_exe = "/usr/bin/docker"
return env

View File

@@ -75,10 +75,32 @@ def _install_fake_tools_package(*, credential_mounts=None):
self.cwd = cwd
self.timeout = timeout
self.env = env or {}
self._snapshot_path = None
self._snapshot_ready = False
self._session_id = ""
def _prepare_command(self, command: str):
return command, None
def _wrap_command(self, command: str, cwd: str) -> str:
"""Simplified wrapper for tests — just returns the command."""
return command
def init_session(self):
pass
def _extract_cwd_from_output(self, result: dict) -> dict:
return result
def _before_execute(self):
pass
def stop(self):
self.cleanup()
def cleanup(self):
pass
sys.modules["tools.environments.base"] = types.SimpleNamespace(BaseEnvironment=_DummyBaseEnvironment)
sys.modules["tools.managed_tool_gateway"] = types.SimpleNamespace(
resolve_managed_tool_gateway=lambda vendor: types.SimpleNamespace(
@@ -107,10 +129,40 @@ class _FakeResponse:
return self._payload
def test_managed_modal_immediate_result_has_no_internal_keys(monkeypatch):
"""Immediate results must not leak _immediate or _handle keys."""
_install_fake_tools_package()
managed_modal = _load_tool_module("tools.environments.managed_modal", "environments/managed_modal.py")
def fake_request(method, url, headers=None, json=None, timeout=None):
if method == "POST" and url.endswith("/v1/sandboxes"):
return _FakeResponse(200, {"id": "sandbox-1"})
if method == "POST" and url.endswith("/execs"):
return _FakeResponse(200, {
"execId": json["execId"],
"status": "completed",
"output": "done",
"returncode": 0,
})
if method == "POST" and url.endswith("/terminate"):
return _FakeResponse(200, {"status": "terminated"})
raise AssertionError(f"Unexpected: {method} {url}")
monkeypatch.setattr(managed_modal.requests, "request", fake_request)
env = managed_modal.ManagedModalEnvironment(image="python:3.11")
result = env.execute("echo done")
assert "_immediate" not in result
assert "_handle" not in result
assert result["output"] == "done"
assert result["returncode"] == 0
env.cleanup()
def test_managed_modal_execute_polls_until_completed(monkeypatch):
_install_fake_tools_package()
managed_modal = _load_tool_module("tools.environments.managed_modal", "environments/managed_modal.py")
modal_common = sys.modules["tools.environments.modal_common"]
# time.sleep / time.monotonic now live in managed_modal (not modal_common)
modal_common = managed_modal
calls = []
poll_count = {"value": 0}
@@ -173,7 +225,8 @@ def test_managed_modal_create_sends_a_stable_idempotency_key(monkeypatch):
def test_managed_modal_execute_cancels_on_interrupt(monkeypatch):
interrupt_event = _install_fake_tools_package()
managed_modal = _load_tool_module("tools.environments.managed_modal", "environments/managed_modal.py")
modal_common = sys.modules["tools.environments.modal_common"]
# time.sleep / time.monotonic now live in managed_modal (not modal_common)
modal_common = managed_modal
calls = []
@@ -215,7 +268,8 @@ def test_managed_modal_execute_cancels_on_interrupt(monkeypatch):
def test_managed_modal_execute_returns_descriptive_error_on_missing_exec(monkeypatch):
_install_fake_tools_package()
managed_modal = _load_tool_module("tools.environments.managed_modal", "environments/managed_modal.py")
modal_common = sys.modules["tools.environments.modal_common"]
# time.sleep / time.monotonic now live in managed_modal (not modal_common)
modal_common = managed_modal
def fake_request(method, url, headers=None, json=None, timeout=None):
if method == "POST" and url.endswith("/v1/sandboxes"):
@@ -293,7 +347,8 @@ def test_managed_modal_rejects_host_credential_passthrough():
def test_managed_modal_execute_times_out_and_cancels(monkeypatch):
_install_fake_tools_package()
managed_modal = _load_tool_module("tools.environments.managed_modal", "environments/managed_modal.py")
modal_common = sys.modules["tools.environments.modal_common"]
# time.sleep / time.monotonic now live in managed_modal (not modal_common)
modal_common = managed_modal
calls = []
monotonic_values = iter([0.0, 12.5])

View File

@@ -0,0 +1,63 @@
"""Unit tests for the _ThreadedProcessHandle adapter in base.py."""
import threading
import pytest
def test_successful_execution():
"""exec_fn returns (output, 0) -> returncode==0 and stdout reads output."""
from tools.environments.base import _ThreadedProcessHandle
handle = _ThreadedProcessHandle(lambda: ("hello output", 0))
handle.wait()
assert handle.returncode == 0
text = handle.stdout.read()
assert text == "hello output"
handle.stdout.close()
def test_nonzero_exit_code():
"""exec_fn returns (output, 42) -> returncode==42."""
from tools.environments.base import _ThreadedProcessHandle
handle = _ThreadedProcessHandle(lambda: ("error output", 42))
handle.wait()
assert handle.returncode == 42
text = handle.stdout.read()
assert text == "error output"
handle.stdout.close()
def test_exception_returns_rc1():
"""exec_fn raises RuntimeError -> returncode==1 and error message in stdout."""
from tools.environments.base import _ThreadedProcessHandle
def failing_fn():
raise RuntimeError("boom")
handle = _ThreadedProcessHandle(failing_fn)
handle.wait()
assert handle.returncode == 1
text = handle.stdout.read()
assert "boom" in text
handle.stdout.close()
def test_poll_returns_none_while_running():
"""poll() returns None before exec_fn completes."""
from tools.environments.base import _ThreadedProcessHandle
barrier = threading.Event()
def blocking_fn():
barrier.wait(timeout=5)
return ("done", 0)
handle = _ThreadedProcessHandle(blocking_fn)
assert handle.poll() is None
barrier.set()
handle.wait()
assert handle.poll() == 0
handle.stdout.read()
handle.stdout.close()

View File

@@ -0,0 +1,422 @@
"""Tests for tools.tool_result_storage — Layer 2 + Layer 3 persistence logic."""
import pytest
from tools.tool_result_storage import (
DEFAULT_MAX_RESULT_SIZE_CHARS,
MAX_TURN_BUDGET_CHARS,
PERSISTED_OUTPUT_CLOSING_TAG,
PERSISTED_OUTPUT_TAG,
PREVIEW_SIZE_CHARS,
PersistedResult,
build_persisted_output_message,
enforce_turn_budget,
generate_preview,
maybe_persist_tool_result,
persist_large_result,
)
# ------------------------------------------------------------------ #
# generate_preview
# ------------------------------------------------------------------ #
class TestGeneratePreview:
def test_short_content_unchanged(self):
"""Content under limit returns as-is, has_more=False."""
text = "hello world"
preview, has_more = generate_preview(text)
assert preview == text
assert has_more is False
def test_truncates_at_newline_boundary(self):
"""Multi-line content truncated at last newline within budget."""
# Build content with lines that exceed the budget
lines = [f"line {i}: " + "x" * 80 for i in range(50)]
content = "\n".join(lines)
assert len(content) > PREVIEW_SIZE_CHARS
preview, has_more = generate_preview(content)
assert has_more is True
assert len(preview) <= PREVIEW_SIZE_CHARS
# Should end at a newline boundary
assert preview.endswith("\n")
def test_single_line_truncates_at_max(self):
"""Single long line truncated at max_bytes exactly."""
content = "x" * 5000 # No newlines
preview, has_more = generate_preview(content, max_chars=100)
assert has_more is True
assert len(preview) == 100
def test_empty_content(self):
"""Empty string returns ('', False)."""
preview, has_more = generate_preview("")
assert preview == ""
assert has_more is False
def test_exact_boundary(self):
"""Content exactly at max_bytes returns as-is."""
content = "x" * PREVIEW_SIZE_CHARS
preview, has_more = generate_preview(content)
assert preview == content
assert has_more is False
def test_newline_only_used_if_past_halfway(self):
"""Newline before halfway mark is ignored; truncation at max_bytes."""
# Newline at position 10 out of 100 — way before halfway (50)
content = "a" * 10 + "\n" + "b" * 200
preview, has_more = generate_preview(content, max_chars=100)
assert has_more is True
# Should NOT truncate at position 10 since it's before halfway
assert len(preview) == 100
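Taken together, these cases pin down the truncation contract. A minimal sketch of a conforming implementation (illustrative only; the shipped function may differ in details, and the default size here is a stand-in for PREVIEW_SIZE_CHARS):
def _generate_preview_sketch(content: str, max_chars: int = 2000) -> tuple[str, bool]:
    # Under the limit (or exactly at it): return unchanged.
    if len(content) <= max_chars:
        return content, False
    cut = content[:max_chars]
    nl = cut.rfind("\n")
    # Honor a newline boundary only past the halfway mark, so a single
    # early newline cannot shrink the preview to almost nothing.
    if nl >= max_chars // 2:
        return cut[: nl + 1], True
    return cut, True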
# ------------------------------------------------------------------ #
# persist_large_result
# ------------------------------------------------------------------ #
class TestPersistLargeResult:
def test_small_returns_none(self, tmp_path):
"""Content under DEFAULT_MAX_RESULT_SIZE_CHARS returns None."""
content = "small output"
result = persist_large_result(content, "tool_1", tmp_path)
assert result is None
def test_large_writes_file(self, tmp_path):
"""Content over threshold writes file, returns PersistedResult."""
content = "x" * (DEFAULT_MAX_RESULT_SIZE_CHARS + 1)
result = persist_large_result(content, "tool_2", tmp_path)
assert result is not None
assert isinstance(result, PersistedResult)
assert result.tool_use_id == "tool_2"
assert result.original_size == len(content)
assert result.file_path == str(tmp_path / "tool_2.txt")
assert (tmp_path / "tool_2.txt").exists()
def test_dedup_via_exclusive_create(self, tmp_path):
"""Second call with same tool_use_id doesn't crash, returns result."""
content = "x" * (DEFAULT_MAX_RESULT_SIZE_CHARS + 1)
result1 = persist_large_result(content, "tool_dup", tmp_path)
result2 = persist_large_result(content, "tool_dup", tmp_path)
assert result1 is not None
assert result2 is not None
# Both return valid PersistedResult
assert result1.tool_use_id == result2.tool_use_id
def test_file_contains_full_content(self, tmp_path):
"""Verify the written file has the complete original content."""
content = "line1\nline2\nline3\n" * 10000 # Well over threshold
result = persist_large_result(content, "tool_full", tmp_path)
assert result is not None
on_disk = (tmp_path / "tool_full.txt").read_text(encoding="utf-8")
assert on_disk == content
def test_exactly_at_threshold_returns_none(self, tmp_path):
"""Content exactly at DEFAULT_MAX_RESULT_SIZE_CHARS is not persisted."""
content = "x" * DEFAULT_MAX_RESULT_SIZE_CHARS
result = persist_large_result(content, "tool_exact", tmp_path)
assert result is None
# ------------------------------------------------------------------ #
# build_persisted_output_message
# ------------------------------------------------------------------ #
class TestBuildPersistedOutputMessage:
@pytest.fixture
def sample_result(self):
return PersistedResult(
tool_use_id="test_id",
original_size=100_000,
file_path="/tmp/test_id.txt",
preview="first line\nsecond line\n",
has_more=True,
)
def test_contains_file_path(self, sample_result):
msg = build_persisted_output_message(sample_result)
assert sample_result.file_path in msg
def test_contains_preview(self, sample_result):
msg = build_persisted_output_message(sample_result)
assert "first line\nsecond line\n" in msg
def test_contains_size_info(self, sample_result):
msg = build_persisted_output_message(sample_result)
assert "100,000 characters" in msg
assert "97.7 KB" in msg
def test_contains_tags(self, sample_result):
msg = build_persisted_output_message(sample_result)
assert msg.startswith(PERSISTED_OUTPUT_TAG)
assert msg.endswith(PERSISTED_OUTPUT_CLOSING_TAG)
def test_has_more_false_no_ellipsis(self):
result = PersistedResult(
tool_use_id="t",
original_size=60_000,
file_path="/tmp/t.txt",
preview="all content",
has_more=False,
)
msg = build_persisted_output_message(result)
assert "\n..." not in msg
def test_has_more_true_shows_ellipsis(self, sample_result):
msg = build_persisted_output_message(sample_result)
assert "\n..." in msg
def test_large_mb_size(self):
result = PersistedResult(
tool_use_id="big",
original_size=2_000_000,
file_path="/tmp/big.txt",
preview="preview",
has_more=True,
)
msg = build_persisted_output_message(result)
assert "MB" in msg
# ------------------------------------------------------------------ #
# maybe_persist_tool_result
# ------------------------------------------------------------------ #
class TestMaybePersistToolResult:
def test_small_passes_through(self, tmp_path, monkeypatch):
"""Under threshold, returns original content."""
monkeypatch.setattr(
"tools.registry.registry.get_max_result_size",
lambda name: DEFAULT_MAX_RESULT_SIZE_CHARS,
)
content = "small output"
result = maybe_persist_tool_result(content, "test_tool", "id_1", tmp_path)
assert result == content
def test_large_returns_persisted_block(self, tmp_path, monkeypatch):
"""Over threshold, returns <persisted-output> block."""
monkeypatch.setattr(
"tools.registry.registry.get_max_result_size",
lambda name: DEFAULT_MAX_RESULT_SIZE_CHARS,
)
content = "x" * (DEFAULT_MAX_RESULT_SIZE_CHARS + 1)
result = maybe_persist_tool_result(content, "test_tool", "id_2", tmp_path)
assert PERSISTED_OUTPUT_TAG in result
assert PERSISTED_OUTPUT_CLOSING_TAG in result
# File written
assert (tmp_path / "id_2.txt").exists()
def test_read_file_never_persisted(self, tmp_path, monkeypatch):
"""read_file with inf threshold always passes through."""
monkeypatch.setattr(
"tools.registry.registry.get_max_result_size",
lambda name: float('inf'),
)
content = "x" * (DEFAULT_MAX_RESULT_SIZE_CHARS * 3)
result = maybe_persist_tool_result(content, "read_file", "id_3", tmp_path)
assert result == content # Unchanged
def test_unknown_tool_uses_default(self, tmp_path, monkeypatch):
"""Unregistered tool name uses 50K default."""
monkeypatch.setattr(
"tools.registry.registry.get_max_result_size",
lambda name: DEFAULT_MAX_RESULT_SIZE_CHARS,
)
# Under default: passes through
content_under = "x" * (DEFAULT_MAX_RESULT_SIZE_CHARS - 1)
result = maybe_persist_tool_result(content_under, "no_such_tool", "id_4", tmp_path)
assert result == content_under
# Over default: persisted
content_over = "x" * (DEFAULT_MAX_RESULT_SIZE_CHARS + 1)
result = maybe_persist_tool_result(content_over, "no_such_tool", "id_5", tmp_path)
assert PERSISTED_OUTPUT_TAG in result
def test_custom_threshold_via_registry(self, tmp_path, monkeypatch):
"""Tool with custom lower threshold persists sooner."""
custom_limit = 1000
monkeypatch.setattr(
"tools.registry.registry.get_max_result_size",
lambda name: custom_limit,
)
content = "x" * (custom_limit + 1)
result = maybe_persist_tool_result(content, "small_tool", "id_6", tmp_path)
# Content exceeds the per-tool threshold (1001 > 1000) so it should
# be persisted even though it's well below the 50K default.
assert PERSISTED_OUTPUT_TAG in result
assert (tmp_path / "id_6.txt").exists()
# ------------------------------------------------------------------ #
# enforce_turn_budget
# ------------------------------------------------------------------ #
class TestEnforceTurnBudget:
def test_under_budget_no_changes(self, tmp_path):
"""All messages fit, nothing changed."""
messages = [
{"content": "short result", "tool_call_id": "t1"},
{"content": "another short", "tool_call_id": "t2"},
]
result = enforce_turn_budget(messages, tmp_path)
assert result[0]["content"] == "short result"
assert result[1]["content"] == "another short"
def test_over_budget_persists_largest(self, tmp_path):
"""Total > 200K, largest result gets persisted first."""
small = "s" * 50_001
large = "L" * (DEFAULT_MAX_RESULT_SIZE_CHARS + 100_001) # 150K+
messages = [
{"content": small, "tool_call_id": "small_1"},
{"content": large, "tool_call_id": "large_1"},
]
total_before = len(small) + len(large)
assert total_before > MAX_TURN_BUDGET_CHARS
result = enforce_turn_budget(messages, tmp_path)
# The large one should be persisted
assert PERSISTED_OUTPUT_TAG in result[1]["content"]
# The small one should be unchanged
assert result[0]["content"] == small
# File written
assert (tmp_path / "large_1.txt").exists()
def test_already_persisted_skipped(self, tmp_path):
"""Messages with <persisted-output> not re-persisted."""
already = f"{PERSISTED_OUTPUT_TAG}\nalready persisted\n{PERSISTED_OUTPUT_CLOSING_TAG}"
large = "x" * (DEFAULT_MAX_RESULT_SIZE_CHARS + 1)
messages = [
{"content": already, "tool_call_id": "p1"},
{"content": large, "tool_call_id": "new_1"},
]
result = enforce_turn_budget(messages, tmp_path, budget=100)
# already-persisted one is untouched (same object)
assert result[0]["content"] == already
# new large one gets persisted
assert PERSISTED_OUTPUT_TAG in result[1]["content"]
assert result[1]["content"] != already # Different from the first
def test_parallel_80k_results(self, tmp_path):
"""5 messages each 80K = 400K total, should persist enough to get under 200K."""
messages = [
{"content": "x" * 80_000, "tool_call_id": f"par_{i}"}
for i in range(5)
]
total_before = sum(len(m["content"]) for m in messages)
assert total_before == 400_000
assert total_before > MAX_TURN_BUDGET_CHARS
result = enforce_turn_budget(messages, tmp_path)
# Count how many were persisted vs kept inline
persisted_count = sum(
1 for m in result if PERSISTED_OUTPUT_TAG in m["content"]
)
# At least some must be persisted to get under budget.
# Each 80K result is > 50K threshold so persist_large_result will work.
assert persisted_count >= 1
# Total should now be under budget (or close — replacement text adds some)
total_after = sum(len(m["content"]) for m in result)
assert total_after < total_before # Definitely reduced
def test_empty_messages(self, tmp_path):
"""Empty list returns empty list."""
result = enforce_turn_budget([], tmp_path)
assert result == []
def test_medium_results_under_default_threshold_still_persisted(self, tmp_path):
"""Multiple 42K results (under 50K default) exceed 200K budget.
Regression test: L3 must force-persist even when individual results
        are below DEFAULT_MAX_RESULT_SIZE_CHARS. Without enforce_turn_budget
        passing threshold=0 down to persist_large_result, these would all be skipped.
"""
# 6 x 42K = 252K > 200K budget, but each is under 50K default
messages = [
{"content": "x" * 42_000, "tool_call_id": f"med_{i}"}
for i in range(6)
]
total_before = sum(len(m["content"]) for m in messages)
assert total_before == 252_000
assert total_before > MAX_TURN_BUDGET_CHARS
# Each individual result is under the default 50K threshold
assert all(len(m["content"]) < DEFAULT_MAX_RESULT_SIZE_CHARS for m in messages)
result = enforce_turn_budget(messages, tmp_path)
persisted_count = sum(
1 for m in result if PERSISTED_OUTPUT_TAG in m["content"]
)
# At least one must be persisted to bring total under budget
assert persisted_count >= 1
total_after = sum(len(m["content"]) for m in result)
assert total_after < total_before
def test_budget_parameter_respected(self, tmp_path):
"""Custom budget parameter is used instead of default."""
        # Two messages each just over 50K chars; budget=50_000 forces persistence
messages = [
{"content": "a" * (DEFAULT_MAX_RESULT_SIZE_CHARS + 100), "tool_call_id": "b1"},
{"content": "b" * (DEFAULT_MAX_RESULT_SIZE_CHARS + 100), "tool_call_id": "b2"},
]
result = enforce_turn_budget(messages, tmp_path, budget=50_000)
# At least one should be persisted
persisted_count = sum(
1 for m in result if PERSISTED_OUTPUT_TAG in m["content"]
)
assert persisted_count >= 1
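The behavior these tests exercise is consistent with a greedy largest-first pass. A sketch under stated assumptions (that persist_large_result accepts a threshold override, as the review-fix commit suggests, and that replacement text comes from build_persisted_output_message):
def _enforce_turn_budget_sketch(messages, storage_dir, budget=MAX_TURN_BUDGET_CHARS):
    # Walk results largest-first, force-persisting (threshold=0) until the
    # aggregate size fits the budget. Already-persisted blocks are skipped.
    total = sum(len(m["content"]) for m in messages)
    for msg in sorted(messages, key=lambda m: len(m["content"]), reverse=True):
        if total <= budget:
            break
        if PERSISTED_OUTPUT_TAG in msg["content"]:
            continue
        persisted = persist_large_result(
            msg["content"], msg["tool_call_id"], storage_dir, threshold=0)
        if persisted is not None:
            replacement = build_persisted_output_message(persisted)
            total -= len(msg["content"]) - len(replacement)
            msg["content"] = replacement
    return messages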
# ------------------------------------------------------------------ #
# Registry integration: get_max_result_size
# ------------------------------------------------------------------ #
class TestRegistryGetMaxResultSize:
def test_default_for_unknown_tool(self):
"""Unregistered tool returns DEFAULT_MAX_RESULT_SIZE_CHARS."""
from tools.registry import ToolRegistry
reg = ToolRegistry()
assert reg.get_max_result_size("nonexistent") == DEFAULT_MAX_RESULT_SIZE_CHARS
def test_custom_threshold(self):
"""Tool registered with max_result_size_chars returns that value."""
from tools.registry import ToolRegistry
reg = ToolRegistry()
reg.register(
name="custom_tool",
toolset="test",
schema={"description": "test"},
handler=lambda args: "ok",
max_result_size_chars=10_000,
)
assert reg.get_max_result_size("custom_tool") == 10_000
def test_inf_threshold(self):
"""Tool with inf threshold returns inf."""
from tools.registry import ToolRegistry
reg = ToolRegistry()
reg.register(
name="read_file",
toolset="test",
schema={"description": "test"},
handler=lambda args: "ok",
max_result_size_chars=float('inf'),
)
assert reg.get_max_result_size("read_file") == float('inf')
def test_none_falls_back_to_default(self):
"""Tool registered without max_result_size_chars uses default."""
from tools.registry import ToolRegistry
reg = ToolRegistry()
reg.register(
name="plain_tool",
toolset="test",
schema={"description": "test"},
handler=lambda args: "ok",
)
assert reg.get_max_result_size("plain_tool") == DEFAULT_MAX_RESULT_SIZE_CHARS
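Putting the pieces together, a hypothetical registration that opts a chatty tool into a tighter threshold, using the signature exercised above:
from tools.registry import ToolRegistry
reg = ToolRegistry()
reg.register(
    name="web_scrape",              # hypothetical tool name
    toolset="demo",
    schema={"description": "fetch a page"},
    handler=lambda args: "ok",
    max_result_size_chars=10_000,   # persist anything over ~10K
)
assert reg.get_max_result_size("web_scrape") == 10_000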

View File

@@ -1344,4 +1344,5 @@ registry.register(
enabled_tools=kw.get("enabled_tools")),
check_fn=check_sandbox_requirements,
emoji="🐍",
max_result_size_chars=30_000,
)

View File

@@ -1,11 +1,28 @@
"""Base class for all Hermes execution environment backends."""
from abc import ABC, abstractmethod
import logging
import os
import subprocess
import shlex
import threading
import time
import uuid
from pathlib import Path
from typing import Protocol, runtime_checkable
from hermes_constants import get_hermes_home
from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
# Marker echoed to stdout by the wrapping template so the local Hermes
# process can extract the remote shell's cwd without a separate round-trip.
_CWD_MARKER = "__HERMES_CWD__"
# Min seconds between file-sync checks in _before_execute hooks.
# Remote backends (SSH, Modal, Daytona) skip re-walking the skills
# directory and re-statting credential files within this window.
_SYNC_INTERVAL_SECONDS: float = 5.0
def get_sandbox_dir() -> Path:
@@ -23,29 +40,388 @@ def get_sandbox_dir() -> Path:
return p
@runtime_checkable
class ProcessHandle(Protocol):
"""Duck type for anything _run_bash returns.
subprocess.Popen satisfies this natively. SDK backends (Modal, Daytona)
return small adapters that wrap async/blocking calls in a thread + OS pipe.
"""
def poll(self) -> int | None: ...
def kill(self) -> None: ...
def wait(self, timeout: float | None = None) -> int: ...
@property
def stdout(self): ... # readable, iterable-of-str (for drain thread)
@property
def returncode(self) -> int | None: ...
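Because the protocol is runtime_checkable, a backend's handle can be sanity-checked with isinstance; subprocess.Popen passes with no adapter:
import subprocess
proc = subprocess.Popen(["echo", "hi"], stdout=subprocess.PIPE, text=True)
assert isinstance(proc, ProcessHandle)  # structural check: poll/kill/wait/stdout/returncode
proc.wait()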
class _ThreadedProcessHandle:
"""ProcessHandle adapter for SDK backends that run in a background thread."""
def __init__(self, exec_fn):
self._done = threading.Event()
self._returncode = None
self._read_fd, self._write_fd = os.pipe()
self.stdout = os.fdopen(self._read_fd, "r")
self.stdin = None
def _run():
# Open the write end exactly once to avoid double-close races.
writer = os.fdopen(self._write_fd, "w")
try:
output, exit_code = exec_fn()
writer.write(output)
self._returncode = exit_code
except Exception as e:
try:
writer.write(str(e))
except Exception:
pass
self._returncode = 1
finally:
try:
writer.close()
except Exception:
pass
self._done.set()
self._thread = threading.Thread(target=_run, daemon=True)
self._thread.start()
def poll(self):
return self._returncode if self._done.is_set() else None
def kill(self):
pass
def wait(self, timeout=None):
self._done.wait(timeout=timeout)
return self._returncode
@property
def returncode(self):
return self._returncode
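Usage mirrors the SDK backends below: wrap any blocking call that yields (output, exit_code) in a closure. Here a stand-in function replaces a real SDK call:
def run_remote() -> tuple[str, int]:
    return "remote says hi\n", 0    # stand-in for e.g. sandbox.process.exec(...)
handle = _ThreadedProcessHandle(run_remote)
handle.wait()
assert handle.returncode == 0
print(handle.stdout.read())         # "remote says hi\n"
handle.stdout.close()               # caller owns the read end of the pipe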
class BaseEnvironment(ABC):
"""Common interface for all Hermes execution backends.
Subclasses implement execute() and cleanup(). Shared helpers eliminate
duplicated subprocess boilerplate across backends.
**Unified execution model (spawn-per-call):**
Backends implement ``_run_bash()`` — the ONLY thing that differs per
backend. Everything else (command wrapping, CWD tracking, snapshot
management, timeout/interrupt handling, output collection) lives here.
Backends that cannot return a ProcessHandle (e.g. HTTP-based
ManagedModal) may override ``execute()`` directly and use
``_wrap_command()`` for command shaping only.
"""
def __init__(self, cwd: str, timeout: int, env: dict = None):
self.cwd = cwd
self.timeout = timeout
self.env = env or {}
self._snapshot_path: str | None = None
self._snapshot_ready: bool = False
self._session_id: str = ""
# ------------------------------------------------------------------
# Abstract — the ONLY thing backends implement
# ------------------------------------------------------------------
def _run_bash(self, cmd_string: str, *,
timeout: int | None = None,
stdin_data: str | None = None) -> ProcessHandle:
"""Spawn ``bash -c <cmd_string>`` in the backend.
Returns a ProcessHandle (subprocess.Popen or equivalent adapter).
The caller owns polling, timeout, output collection, and cleanup.
*timeout* is the effective per-command timeout. Backends that use
SDK-level or shell-level timeouts (Modal, Daytona) should forward
this value. Backends where timeout is enforced by
``_wait_for_process`` (local, docker, ssh, singularity) may ignore it.
If *stdin_data* is provided, write it to the process's stdin and
close. Backends that cannot pipe stdin (Modal, Daytona) must embed
it via heredoc in *cmd_string* before calling their SDK.
Subclasses MUST override this. The base implementation raises
NotImplementedError (not declared abstract so legacy backends that
still override execute() directly can be instantiated during migration).
"""
raise NotImplementedError(
f"{type(self).__name__} must implement _run_bash()"
)
    def cleanup(self):
        """Release backend resources (container, instance, connection).
        Subclasses should override; the base implementation is a no-op.
        """
        pass
# ------------------------------------------------------------------
# Snapshot — login-shell env capture (called once at session init)
# ------------------------------------------------------------------
def _run_bash_login(self, cmd_string: str, *,
timeout: int | None = None) -> ProcessHandle:
"""Spawn ``bash -l -c <cmd_string>`` for snapshot creation.
Defaults to ``_run_bash`` — backends override this when the login
flag needs different handling (e.g. local adds ``-l`` to Popen args).
"""
return self._run_bash(cmd_string, timeout=timeout)
_snapshot_timeout: int = 15
def init_session(self):
"""Capture the login-shell environment into a snapshot file.
Called once after ``__init__`` completes. If it fails, commands
still work — they just don't get env restoration.
"""
self._session_id = uuid.uuid4().hex[:12]
self._snapshot_path = f"/tmp/hermes-snap-{self._session_id}.sh"
bootstrap = (
f"set +e\n"
f"export -p > {self._snapshot_path}\n"
f"if type declare >/dev/null 2>&1; then "
f"declare -f >> {self._snapshot_path} 2>/dev/null; fi\n"
f"alias -p >> {self._snapshot_path} 2>/dev/null || true\n"
f"echo 'shopt -s expand_aliases' >> {self._snapshot_path}\n"
f"echo 'set +e' >> {self._snapshot_path}\n"
f"echo 'set +u' >> {self._snapshot_path}\n"
f"printf '{_CWD_MARKER}%s{_CWD_MARKER}' \"$(pwd -P)\"\n"
)
result = {}
try:
proc = self._run_bash_login(bootstrap, timeout=self._snapshot_timeout)
result = self._wait_for_process(proc, timeout=self._snapshot_timeout)
if result["returncode"] == 0:
self._snapshot_ready = True
logger.info(
"Snapshot created (session=%s)", self._session_id,
)
else:
logger.warning(
"Snapshot creation failed (rc=%d), commands will "
"run without env restoration", result["returncode"],
)
except Exception as e:
logger.warning("Snapshot creation failed: %s", e)
self._extract_cwd_from_output(result)
# ------------------------------------------------------------------
# Command wrapping
# ------------------------------------------------------------------
@staticmethod
def _embed_stdin_heredoc(cmd_string: str, stdin_data: str) -> str:
"""Wrap *stdin_data* as a shell heredoc appended to *cmd_string*.
Used by backends that cannot pipe stdin (Modal, Daytona, ManagedModal).
"""
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
while marker in stdin_data:
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
return f"{cmd_string} << '{marker}'\n{stdin_data}\n{marker}"
def _resolve_tilde(self, path: str) -> str:
"""Expand ``~`` to the actual home directory path.
Remote backends (SSH, Daytona) set ``_remote_home`` during init;
local uses ``os.path.expanduser``. Tilde must be resolved before
``shlex.quote`` since single-quoting prevents shell tilde expansion.
"""
if not path or not path.startswith("~"):
return path
home = getattr(self, "_remote_home", None) or os.path.expanduser("~")
if path == "~":
return home
if path.startswith("~/"):
return home + path[1:]
return path # ~otheruser — leave for shell to handle
def _wrap_command(self, command: str, cwd: str) -> str:
"""Wrap a user command with snapshot sourcing and CWD tracking.
Returns a bash script string.
"""
parts: list[str] = []
# 1. Source snapshot (if available)
if self._snapshot_ready and self._snapshot_path:
parts.append(
f"source {self._snapshot_path} 2>/dev/null || true"
)
# 2. cd to working directory (resolve ~ before quoting)
work_dir = cwd or self.cwd
if work_dir:
work_dir = self._resolve_tilde(work_dir)
parts.append(f"cd {shlex.quote(work_dir)} || exit 1")
# 3. The actual command (eval to handle complex shell syntax)
escaped = command.replace("'", "'\\''")
parts.append(f"eval '{escaped}'")
# 4. Capture exit code, record CWD
parts.append("__hermes_ec=$?")
parts.append(
f"printf '\\n{_CWD_MARKER}%s{_CWD_MARKER}\\n' \"$(pwd -P)\""
)
parts.append("exit $__hermes_ec")
return "\n".join(parts)
# ------------------------------------------------------------------
# Unified execute()
# ------------------------------------------------------------------
    def execute(self, command: str, cwd: str = "", *,
                timeout: int | None = None,
                stdin_data: str | None = None) -> dict:
        """Execute a command, return ``{"output": str, "returncode": int}``."""
        self._before_execute()
exec_command, sudo_stdin = self._prepare_command(command)
# Merge sudo stdin with caller stdin
effective_stdin: str | None = None
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
wrapped = self._wrap_command(exec_command, cwd)
effective_timeout = timeout or self.timeout
proc = self._run_bash(wrapped, timeout=effective_timeout,
stdin_data=effective_stdin)
result = self._wait_for_process(proc, timeout=effective_timeout)
self._extract_cwd_from_output(result)
return result
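Under this model a new backend only supplies the spawn primitive. A minimal hypothetical subclass (local bash as a stand-in transport; assumes the imports already present in this module):
class EchoBackend(BaseEnvironment):
    """Toy backend: everything except spawning is inherited."""
    def _run_bash(self, cmd_string, *, timeout=None, stdin_data=None):
        proc = subprocess.Popen(
            ["bash", "-c", cmd_string],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
            text=True,
        )
        if stdin_data is not None:
            proc.stdin.write(stdin_data)
            proc.stdin.close()
        return proc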
# ------------------------------------------------------------------
# Process lifecycle (shared — not overridden except _kill_process)
# ------------------------------------------------------------------
def _wait_for_process(self, proc: ProcessHandle,
timeout: int) -> dict:
"""Poll process with interrupt checking, drain stdout, enforce timeout."""
output_chunks: list[str] = []
def _drain():
try:
for line in proc.stdout:
output_chunks.append(line)
except (ValueError, OSError):
pass
reader = threading.Thread(target=_drain, daemon=True)
reader.start()
deadline = time.monotonic() + timeout
try:
while proc.poll() is None:
if is_interrupted():
self._kill_process(proc)
reader.join(timeout=2)
partial = "".join(output_chunks)
return {
"output": partial + "\n[Command interrupted]",
"returncode": 130,
}
if time.monotonic() > deadline:
self._kill_process(proc)
reader.join(timeout=2)
partial = "".join(output_chunks)
msg = f"\n[Command timed out after {timeout}s]"
return {
"output": (partial + msg) if partial else msg.lstrip(),
"returncode": 124,
}
time.sleep(0.2)
reader.join(timeout=5)
return {"output": "".join(output_chunks), "returncode": proc.returncode}
finally:
# Close the stdout pipe to prevent FD leaks, especially for
# SDK-backed handles (Modal, Daytona) that use os.pipe().
try:
proc.stdout.close()
except Exception:
pass
def _kill_process(self, proc: ProcessHandle):
"""Kill a process. Backends may override for process-group kill."""
try:
if hasattr(proc, "terminate"):
proc.terminate()
try:
proc.wait(timeout=1.0)
return
except Exception:
pass
proc.kill()
except (ProcessLookupError, PermissionError, OSError):
pass
# ------------------------------------------------------------------
# CWD tracking
# ------------------------------------------------------------------
def _extract_cwd_from_output(self, result: dict) -> dict:
"""Parse CWD marker from command output, update self.cwd, strip marker.
The wrapping template echoes ``__HERMES_CWD__/path__HERMES_CWD__``
to stdout. This method extracts the path, updates ``self.cwd``,
and removes the marker from the output so the caller sees clean output.
"""
output = result.get("output", "")
ml = len(_CWD_MARKER)
# Find the last pair: look for the second-to-last marker (open),
# then the last marker (close).
close = output.rfind(_CWD_MARKER)
if close == -1:
return result
open_ = output.rfind(_CWD_MARKER, 0, close)
if open_ == -1:
return result
cwd = output[open_ + ml:close].strip()
if cwd:
self.cwd = cwd
# Strip the marker and surrounding whitespace from output
before = output[:open_].rstrip("\n")
after = output[close + ml:].lstrip("\n")
result["output"] = (before + after) if after else before
return result
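Concretely (env is any BaseEnvironment instance; marker literal shown for clarity):
result = {"output": "total 0\n__HERMES_CWD__/home/user__HERMES_CWD__\n"}
env._extract_cwd_from_output(result)
assert env.cwd == "/home/user"
assert result["output"] == "total 0"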
# ------------------------------------------------------------------
# Hooks for subclasses
# ------------------------------------------------------------------
def _before_execute(self):
"""Hook for pre-execution sync (SSH rsync, Modal file push, etc.)."""
pass
# ------------------------------------------------------------------
# Compat
# ------------------------------------------------------------------
def stop(self):
"""Alias for cleanup (compat with older callers)."""
@@ -58,55 +434,10 @@ class BaseEnvironment(ABC):
pass
# ------------------------------------------------------------------
    # Shared helpers
# ------------------------------------------------------------------
def _prepare_command(self, command: str) -> tuple[str, str | None]:
"""Transform sudo commands if SUDO_PASSWORD is available.
Returns:
(transformed_command, sudo_stdin) — see _transform_sudo_command
for the full contract. Callers that drive a subprocess directly
should prepend sudo_stdin (when not None) to any stdin_data they
pass to Popen. Callers that embed stdin via heredoc (modal,
daytona) handle sudo_stdin in their own execute() method.
"""
"""Transform sudo commands if SUDO_PASSWORD is available."""
from tools.terminal_tool import _transform_sudo_command
return _transform_sudo_command(command)
def _build_run_kwargs(self, timeout: int | None,
stdin_data: str | None = None) -> dict:
"""Build common subprocess.run kwargs for non-interactive execution."""
kw = {
"text": True,
"timeout": timeout or self.timeout,
"encoding": "utf-8",
"errors": "replace",
"stdout": subprocess.PIPE,
"stderr": subprocess.STDOUT,
}
if stdin_data is not None:
kw["input"] = stdin_data
else:
kw["stdin"] = subprocess.DEVNULL
return kw
def execute_oneshot(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
"""Execute a command bypassing any persistent shell.
Safe for concurrent use alongside a long-running execute() call.
Backends that maintain a persistent shell (SSH, Local) override this
to route through their oneshot path, avoiding the shell lock.
Non-persistent backends delegate to execute().
"""
return self.execute(command, cwd=cwd, timeout=timeout,
stdin_data=stdin_data)
def _timeout_result(self, timeout: int | None) -> dict:
"""Standard return dict when a command times out."""
return {
"output": f"Command timed out after {timeout or self.timeout}s",
"returncode": 124,
}

View File

@@ -6,19 +6,20 @@ and resumed on next creation, preserving the filesystem across sessions.
"""
import logging
import time
import math
import shlex
import threading
import uuid
import time as _time
import warnings
from pathlib import Path
from typing import Dict, Optional
from tools.environments.base import BaseEnvironment
from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
_SYNC_INTERVAL_SECONDS = 5.0
class DaytonaEnvironment(BaseEnvironment):
"""Daytona cloud sandbox execution backend.
@@ -27,6 +28,8 @@ class DaytonaEnvironment(BaseEnvironment):
instead of snapshots, making it faster and stateless on the host.
"""
_snapshot_timeout = 60 # Daytona sandbox startup can be slow
def __init__(
self,
image: str,
@@ -113,6 +116,10 @@ class DaytonaEnvironment(BaseEnvironment):
logger.info("Daytona: created sandbox %s for task %s",
self._sandbox.id, task_id)
# The sandbox is freshly created/started — no need to refresh yet.
self._needs_refresh = False
self._last_sync_time: float = 0.0
# Detect remote home dir first so mounts go to the right place.
self._remote_home = "/root"
try:
@@ -132,6 +139,13 @@ class DaytonaEnvironment(BaseEnvironment):
# Upload credential files and skills directory into the sandbox.
self._sync_skills_and_credentials()
# Capture login-shell environment into a snapshot for the unified model
self.init_session()
# ------------------------------------------------------------------
# File sync
# ------------------------------------------------------------------
def _upload_if_changed(self, host_path: str, remote_path: str) -> bool:
"""Upload a file if its mtime/size changed since last sync."""
hp = Path(host_path)
@@ -171,121 +185,64 @@ class DaytonaEnvironment(BaseEnvironment):
def _ensure_sandbox_ready(self):
"""Restart sandbox if it was stopped (e.g., by a previous interrupt)."""
if not self._needs_refresh:
return
self._sandbox.refresh_data()
if self._sandbox.state in (self._SandboxState.STOPPED, self._SandboxState.ARCHIVED):
self._sandbox.start()
logger.info("Daytona: restarted sandbox %s", self._sandbox.id)
self._needs_refresh = False
    # ------------------------------------------------------------------
    # Unified execution hooks
    # ------------------------------------------------------------------
    def _before_execute(self) -> None:
        """Ensure sandbox is ready and sync credentials before each command."""
        with self._lock:
            self._ensure_sandbox_ready()
            # Incremental sync (TTL-gated) so mid-session credential
            # refreshes and skill updates are picked up without re-walking
            # directories on every command.
            now = _time.monotonic()
            if now - self._last_sync_time >= _SYNC_INTERVAL_SECONDS:
                self._sync_skills_and_credentials()
                self._last_sync_time = now
    def _run_bash(self, cmd_string: str, *, timeout: int | None = None,
                  stdin_data: str | None = None):
        """Spawn ``bash -c <cmd_string>`` inside the Daytona sandbox."""
        effective_timeout = timeout or self.timeout
        if stdin_data is not None:
            cmd_string = self._embed_stdin_heredoc(cmd_string, stdin_data)
        return self._daytona_exec(cmd_string, effective_timeout, login=False)
    def _run_bash_login(self, cmd_string: str, *,
                        timeout: int | None = None):
        """Spawn ``bash -l -c <cmd_string>`` for snapshot creation."""
        effective_timeout = timeout or self._snapshot_timeout
        return self._daytona_exec(cmd_string, effective_timeout, login=True)
    def _daytona_exec(self, cmd_string: str, timeout: int, login: bool):
        """Create a _ThreadedProcessHandle wrapping Daytona's blocking exec.
        The Daytona SDK's exec(timeout=...) parameter is unreliable (the
        server-side timeout is not enforced and the SDK has no client-side
        fallback), so the command is wrapped with the shell ``timeout``
        utility, which reliably kills the process and returns exit code 124.
        """
        from tools.environments.base import _ThreadedProcessHandle
        sandbox = self._sandbox
        shell_flag = "-l -c" if login else "-c"
        timed_cmd = f"timeout {timeout} bash {shell_flag} {shlex.quote(cmd_string)}"
        def exec_fn():
            response = sandbox.process.exec(timed_cmd)
            return response.result or "", response.exit_code
        return _ThreadedProcessHandle(exec_fn)
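Concretely, for timeout=30 the sandbox receives a command shaped like the following (script abbreviated, snapshot path illustrative):
timeout 30 bash -c 'source /tmp/hermes-snap-abc123.sh 2>/dev/null || true
cd /workspace || exit 1
...'
If the wrapped script outlives the deadline, the shell timeout utility kills it and exits with code 124, which _wait_for_process reports unchanged.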
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def cleanup(self):
with self._lock:
if self._sandbox is None:
return
self._needs_refresh = True
try:
if self._persistent:
self._sandbox.stop()

View File

@@ -8,17 +8,13 @@ persistence via bind mounts.
import logging
import os
import re
import shlex
import shutil
import subprocess
import sys
import threading
import time
import uuid
from typing import Optional
from tools.environments.base import BaseEnvironment
from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
@@ -61,36 +57,6 @@ def _normalize_forward_env_names(forward_env: list[str] | None) -> list[str]:
return normalized
def _normalize_env_dict(env: dict | None) -> dict[str, str]:
"""Validate and normalize a docker_env dict to {str: str}.
Filters out entries with invalid variable names or non-string values.
"""
if not env:
return {}
if not isinstance(env, dict):
logger.warning("docker_env is not a dict: %r", env)
return {}
normalized: dict[str, str] = {}
for key, value in env.items():
if not isinstance(key, str) or not _ENV_VAR_NAME_RE.match(key.strip()):
logger.warning("Ignoring invalid docker_env key: %r", key)
continue
key = key.strip()
if not isinstance(value, str):
# Coerce simple scalar types (int, bool, float) to string;
# reject complex types.
if isinstance(value, (int, float, bool)):
value = str(value)
else:
logger.warning("Ignoring non-string docker_env value for %r: %r", key, value)
continue
normalized[key] = value
return normalized
def _load_hermes_env_vars() -> dict[str, str]:
"""Load ~/.hermes/.env values without failing Docker command execution."""
try:
@@ -241,7 +207,6 @@ class DockerEnvironment(BaseEnvironment):
task_id: str = "default",
volumes: list = None,
forward_env: list[str] | None = None,
network: bool = True,
host_cwd: str = None,
auto_mount_cwd: bool = False,
@@ -253,7 +218,7 @@ class DockerEnvironment(BaseEnvironment):
self._persistent = persistent_filesystem
self._task_id = task_id
self._forward_env = _normalize_forward_env_names(forward_env)
self._cached_forward_env_args: list[str] | None = None
self._container_id: Optional[str] = None
logger.info(f"DockerEnvironment volumes: {volumes}")
# Ensure volumes is a list (config.yaml could be malformed)
@@ -348,11 +313,7 @@ class DockerEnvironment(BaseEnvironment):
# Mount credential files (OAuth tokens, etc.) declared by skills.
# Read-only so the container can authenticate but not modify host creds.
try:
from tools.credential_files import get_credential_file_mounts, get_skills_directory_mount
for mount_entry in get_credential_file_mounts():
volume_args.extend([
@@ -365,9 +326,10 @@ class DockerEnvironment(BaseEnvironment):
mount_entry["container_path"],
)
            # Mount the skills directory so skill scripts/templates are
            # available inside the container at the same relative path.
            skills_mount = get_skills_directory_mount()
            if skills_mount:
volume_args.extend([
"-v",
f"{skills_mount['host_path']}:{skills_mount['container_path']}:ro",
@@ -377,32 +339,11 @@ class DockerEnvironment(BaseEnvironment):
skills_mount["host_path"],
skills_mount["container_path"],
)
except Exception as e:
logger.debug("Docker: could not load credential file mounts: %s", e)
logger.info(f"Docker volume_args: {volume_args}")
all_run_args = list(_SECURITY_ARGS) + writable_args + resource_args + volume_args
logger.info(f"Docker run_args: {all_run_args}")
# Resolve the docker executable once so it works even when
@@ -430,6 +371,9 @@ class DockerEnvironment(BaseEnvironment):
self._container_id = result.stdout.strip()
logger.info(f"Started container {container_name} ({self._container_id[:12]})")
# Capture login-shell environment into a snapshot for the unified model
self.init_session()
@staticmethod
def _storage_opt_supported() -> bool:
"""Check if Docker's storage driver supports --storage-opt size=.
@@ -470,39 +414,23 @@ class DockerEnvironment(BaseEnvironment):
logger.debug("Docker --storage-opt support: %s", _storage_opt_ok)
return _storage_opt_ok
    # ------------------------------------------------------------------
    # Unified execution primitives
    # ------------------------------------------------------------------
    def _build_forward_env_args(self) -> list[str]:
        """Build ``-e KEY=VALUE`` arguments for docker exec env forwarding.
        Combines explicit ``docker_forward_env`` with skill-declared
        ``env_passthrough`` vars so skills that declare
        ``required_environment_variables`` (e.g. Notion) have their keys
        forwarded into the container automatically.
        Result is cached at instance level to avoid re-reading ~/.hermes/.env
        and rebuilding the arg list on every command.
        """
        if self._cached_forward_env_args is not None:
            return self._cached_forward_env_args
        forward_keys = set(self._forward_env)
        try:
@@ -510,70 +438,71 @@ class DockerEnvironment(BaseEnvironment):
            forward_keys |= get_all_passthrough()
        except Exception:
            pass
        hermes_env = _load_hermes_env_vars() if forward_keys else {}
        args: list[str] = []
        for key in sorted(forward_keys):
            value = os.getenv(key)
            if value is None:
                value = hermes_env.get(key)
            if value is not None:
                args.extend(["-e", f"{key}={value}"])
        self._cached_forward_env_args = args
        return args
    def _run_bash(self, cmd_string: str, *,
                  timeout: int | None = None,
                  stdin_data: str | None = None) -> subprocess.Popen:
        """Spawn ``bash -c <cmd_string>`` inside the Docker container."""
        assert self._container_id, "Container not started"
        cmd = [self._docker_exe, "exec"]
        if stdin_data is not None:
            cmd.append("-i")
        cmd.extend(self._build_forward_env_args())
        cmd.extend([self._container_id, "bash", "-c", cmd_string])
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
            text=True,
        )
        if stdin_data:
            try:
                proc.stdin.write(stdin_data)
                proc.stdin.close()
            except (BrokenPipeError, OSError):
                pass
        return proc
    def _run_bash_login(self, cmd_string: str, *,
                        timeout: int | None = None) -> subprocess.Popen:
        """Spawn ``bash -l -c <cmd_string>`` for snapshot creation."""
        assert self._container_id, "Container not started"
        cmd = [self._docker_exe, "exec", self._container_id,
               "bash", "-l", "-c", cmd_string]
        return subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL, text=True,
        )
def cleanup(self):
"""Stop and remove the container. Bind-mount dirs persist if persistent=True."""
if self._container_id:
# Clean up snapshot inside the container
paths_to_rm = " ".join(
p for p in (self._snapshot_path,) if p
)
if paths_to_rm:
try:
subprocess.run(
[self._docker_exe, "exec", self._container_id,
"rm", "-f", *paths_to_rm.split()],
capture_output=True, timeout=5,
)
except Exception:
pass
try:
# Stop in background so cleanup doesn't block
stop_cmd = (

View File

@@ -1,23 +1,18 @@
"""Local execution environment with interrupt support and non-blocking I/O."""
"""Local execution environment — spawn-per-call with snapshot."""
import glob
import logging
import os
import platform
import shutil
import signal
import subprocess
import threading
import time
_IS_WINDOWS = platform.system() == "Windows"
from tools.environments.base import BaseEnvironment
from tools.interrupt import is_interrupted
# Unique marker to isolate real command output from shell init/exit noise.
# printf (no trailing newline) keeps the boundaries clean for splitting.
_OUTPUT_FENCE = "__HERMES_FENCE_a9f7b3__"
logger = logging.getLogger(__name__)
# Hermes-internal env vars that should NOT leak into terminal subprocesses.
# These are loaded from ~/.hermes/.env for Hermes' own LLM/provider calls
@@ -209,58 +204,6 @@ def _find_bash() -> str:
_find_shell = _find_bash
# Noise lines emitted by interactive shells when stdin is not a terminal.
# Used as a fallback when output fence markers are missing.
_SHELL_NOISE_SUBSTRINGS = (
# bash
"bash: cannot set terminal process group",
"bash: no job control in this shell",
"no job control in this shell",
"cannot set terminal process group",
"tcsetattr: Inappropriate ioctl for device",
# zsh / oh-my-zsh / macOS terminal session
"Restored session:",
"Saving session...",
"Last login:",
"command not found:",
"Oh My Zsh",
"compinit:",
)
def _clean_shell_noise(output: str) -> str:
"""Strip shell startup/exit warnings that leak when using -i without a TTY.
Removes lines matching known noise patterns from both the beginning
and end of the output. Lines in the middle are left untouched.
"""
def _is_noise(line: str) -> bool:
return any(noise in line for noise in _SHELL_NOISE_SUBSTRINGS)
lines = output.split("\n")
# Strip leading noise
while lines and _is_noise(lines[0]):
lines.pop(0)
# Strip trailing noise (walk backwards, skip empty lines from split)
end = len(lines) - 1
while end >= 0 and (not lines[end] or _is_noise(lines[end])):
end -= 1
if end < 0:
return ""
cleaned = lines[: end + 1]
result = "\n".join(cleaned)
# Preserve trailing newline if original had one
if output.endswith("\n") and result and not result.endswith("\n"):
result += "\n"
return result
# Standard PATH entries for environments with minimal PATH (e.g. systemd services).
# Includes macOS Homebrew paths (/opt/homebrew/* for Apple Silicon).
_SANE_PATH = (
@@ -290,197 +233,87 @@ def _make_run_env(env: dict) -> dict:
return run_env
def _extract_fenced_output(raw: str) -> str:
"""Extract real command output from between fence markers.
The execute() method wraps each command with printf(FENCE) markers.
This function finds the first and last fence and returns only the
content between them, which is the actual command output free of
any shell init/exit noise.
Falls back to pattern-based _clean_shell_noise if fences are missing.
"""
first = raw.find(_OUTPUT_FENCE)
if first == -1:
return _clean_shell_noise(raw)
start = first + len(_OUTPUT_FENCE)
last = raw.rfind(_OUTPUT_FENCE)
if last <= first:
# Only start fence found (e.g. user command called `exit`)
return _clean_shell_noise(raw[start:])
return raw[start:last]
class LocalEnvironment(BaseEnvironment):
    """Run commands directly on the host machine.
    Uses the unified spawn-per-call model:
    - bash -l once at session start to capture env snapshot
    - bash -c for every subsequent command (fast, no shell init overhead)
    - CWD tracked via the in-band stdout marker after each command
    - Process group kill (os.setsid) for clean child cleanup
    - stdin_data support for piping content (bypasses ARG_MAX limits)
    - sudo -S transform via SUDO_PASSWORD env var
    """
def __init__(self, cwd: str = "", timeout: int = 60, env: dict = None,
persistent: bool = False):
**kwargs):
super().__init__(cwd=cwd or os.getcwd(), timeout=timeout, env=env)
self.persistent = persistent
if self.persistent:
self._init_persistent_shell()
self.init_session()
    def _run_bash(self, cmd_string: str, *,
                  timeout: int | None = None,
                  stdin_data: str | None = None) -> subprocess.Popen:
        run_env = _make_run_env(self.env)
        proc = subprocess.Popen(
            [_find_bash(), "-c", cmd_string],
            text=True,
            cwd="/",  # CWD set inside the script via cd
            env=run_env,
            encoding="utf-8",
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
            preexec_fn=None if _IS_WINDOWS else os.setsid,
        )
        if stdin_data is not None:
            def _write():
                try:
                    proc.stdin.write(stdin_data)
                    proc.stdin.close()
                except (BrokenPipeError, OSError):
                    pass
            threading.Thread(target=_write, daemon=True).start()
        return proc
    def _run_bash_login(self, cmd_string: str, *, timeout: int | None = None) -> subprocess.Popen:
        """For snapshot creation: uses bash -l -c."""
        run_env = _make_run_env(self.env)
        return subprocess.Popen(
            [_find_bash(), "-l", "-c", cmd_string],
            text=True,
            cwd="/",
            env=run_env,
            encoding="utf-8",
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL,
            preexec_fn=None if _IS_WINDOWS else os.setsid,
        )
    def _kill_process(self, proc):
        """Local override: kill process group for child cleanup."""
        try:
            if _IS_WINDOWS:
                proc.terminate()
            else:
                pgid = os.getpgid(proc.pid)
                os.killpg(pgid, signal.SIGTERM)
                try:
                    proc.wait(timeout=1.0)
                    return
                except subprocess.TimeoutExpired:
                    os.killpg(pgid, signal.SIGKILL)
        except (ProcessLookupError, PermissionError, OSError):
            try:
                proc.kill()
            except Exception:
                pass
    def cleanup(self):
        for p in (self._snapshot_path,):
            if p:
                try:
                    os.remove(p)
                except OSError:
                    pass

View File

@@ -1,20 +1,24 @@
"""Managed Modal environment backed by tool-gateway."""
"""Managed Modal environment backed by tool-gateway.
Uses ``BaseEnvironment`` for command shaping (``_wrap_command()``) but keeps
its own ``execute()`` override because the HTTP gateway cannot return a
ProcessHandle — all execution is request/response.
"""
from __future__ import annotations
import json
import logging
import os
import shlex
import requests
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional
from tools.environments.base import BaseEnvironment
from tools.interrupt import is_interrupted
from tools.managed_tool_gateway import resolve_managed_tool_gateway
logger = logging.getLogger(__name__)
@@ -33,15 +37,33 @@ class _ManagedModalExecHandle:
exec_id: str
@dataclass(frozen=True)
class _ExecStartResult:
"""Discriminated return from _start_exec — either immediate or async."""
handle: _ManagedModalExecHandle | None = None
immediate: dict | None = None
class ManagedModalEnvironment(BaseEnvironment):
"""Gateway-owned Modal sandbox with Hermes-compatible execute/cleanup.
Inherits from BaseEnvironment for _wrap_command() (CWD tracking,
snapshot sourcing) but keeps its own execute() since the HTTP gateway
cannot return a ProcessHandle.
**Design note — no init_session():**
Unlike direct backends (local, docker, ssh, modal, daytona, singularity),
ManagedModal does not call init_session() because _run_bash() raises
NotImplementedError (the gateway is HTTP request/response only).
Commands run without login-shell snapshot sourcing — the gateway itself
manages sandbox environment setup. _wrap_command() is still used for
CWD tracking via in-band stdout markers.
"""
_CONNECT_TIMEOUT_SECONDS = _request_timeout_env("TERMINAL_MANAGED_MODAL_CONNECT_TIMEOUT_SECONDS", 1.0)
_POLL_READ_TIMEOUT_SECONDS = _request_timeout_env("TERMINAL_MANAGED_MODAL_POLL_READ_TIMEOUT_SECONDS", 5.0)
_CANCEL_READ_TIMEOUT_SECONDS = _request_timeout_env("TERMINAL_MANAGED_MODAL_CANCEL_READ_TIMEOUT_SECONDS", 5.0)
_client_timeout_grace_seconds = 10.0
_interrupt_output = "[Command interrupted - Modal sandbox exec cancelled]"
_unexpected_error_prefix = "Managed Modal exec failed"
def __init__(
self,
@@ -69,16 +91,120 @@ class ManagedModalEnvironment(BaseModalExecutionEnvironment):
self._create_idempotency_key = str(uuid.uuid4())
self._sandbox_id = self._create_sandbox()
def _start_modal_exec(self, prepared: PreparedModalExec) -> ModalExecStart:
# ------------------------------------------------------------------
# _run_bash stub — ManagedModal cannot return a ProcessHandle
# ------------------------------------------------------------------
def _run_bash(self, cmd_string: str, *, timeout: int | None = None,
stdin_data: str | None = None):
"""ManagedModal is HTTP-based and cannot return a ProcessHandle.
This stub satisfies the BaseEnvironment interface. init_session()
is intentionally not called — the gateway manages the sandbox
environment directly. execute() is overridden with HTTP
request/response logic.
"""
raise NotImplementedError(
"ManagedModalEnvironment is HTTP-based and cannot return a "
"ProcessHandle. Use execute() directly."
)
# ------------------------------------------------------------------
# execute() override — HTTP request/response model
# ------------------------------------------------------------------
def execute(
self,
command: str,
cwd: str = "",
*,
timeout: int | None = None,
stdin_data: str | None = None,
) -> dict:
self._before_execute()
effective_timeout = timeout or self.timeout
# Handle stdin via heredoc embedding (gateway has payload support too)
exec_stdin = stdin_data
if stdin_data is not None:
command = self._embed_stdin_heredoc(command, stdin_data)
exec_stdin = None # embedded in command now
exec_command, sudo_stdin = self._prepare_command(command)
if sudo_stdin is not None:
exec_command = (
f"printf '%s\\n' {shlex.quote(sudo_stdin.rstrip())} | {exec_command}"
)
# Use _wrap_command for consistent CWD tracking via in-band markers
# (no snapshot sourcing here; see the class docstring). The wrapped
# script handles cd internally, so no separate cwd field is needed.
wrapped = self._wrap_command(exec_command, cwd)
# Start the exec via the gateway
start = self._start_exec(wrapped, effective_timeout, exec_stdin)
if start.immediate is not None:
self._update_cwd_from_gateway_output(start.immediate)
return start.immediate
if start.handle is None:
return self._error_result(
"Managed Modal exec start did not return an exec handle"
)
# Poll loop
deadline = None
if self._client_timeout_grace_seconds is not None:
deadline = time.monotonic() + effective_timeout + self._client_timeout_grace_seconds
while True:
if is_interrupted():
try:
self._cancel_exec(start.handle.exec_id)
except Exception:
pass
return self._result(
"[Command interrupted - Modal sandbox exec cancelled]", 130,
)
try:
result = self._poll_exec(start.handle)
except Exception as exc:
return self._error_result(f"Managed Modal exec poll failed: {exc}")
if result is not None:
self._update_cwd_from_gateway_output(result)
return result
if deadline is not None and time.monotonic() >= deadline:
try:
self._cancel_exec(start.handle.exec_id)
except Exception:
pass
return self._result(
f"Managed Modal exec timed out after {effective_timeout}s", 124,
)
time.sleep(0.25)
def _update_cwd_from_gateway_output(self, result: dict) -> None:
"""Extract CWD from in-band marker in command output."""
self._extract_cwd_from_output(result)
# ------------------------------------------------------------------
# Gateway transport
# ------------------------------------------------------------------
def _start_exec(self, command: str, timeout: int,
stdin_data: str | None) -> _ExecStartResult:
exec_id = str(uuid.uuid4())
payload: Dict[str, Any] = {
"execId": exec_id,
"command": prepared.command,
"cwd": prepared.cwd,
"timeoutMs": int(prepared.timeout * 1000),
"command": command,
"timeoutMs": int(timeout * 1000),
}
if prepared.stdin_data is not None:
payload["stdinData"] = prepared.stdin_data
if stdin_data is not None:
payload["stdinData"] = stdin_data
try:
response = self._request(
@@ -88,37 +214,26 @@ class ManagedModalEnvironment(BaseModalExecutionEnvironment):
timeout=10,
)
except Exception as exc:
return ModalExecStart(
immediate_result=self._error_result(f"Managed Modal exec failed: {exc}")
)
return _ExecStartResult(immediate=self._error_result(f"Managed Modal exec failed: {exc}"))
if response.status_code >= 400:
return ModalExecStart(
immediate_result=self._error_result(
self._format_error("Managed Modal exec failed", response)
)
)
return _ExecStartResult(immediate=self._error_result(
self._format_error("Managed Modal exec failed", response)
))
body = response.json()
status = body.get("status")
if status in {"completed", "failed", "cancelled", "timeout"}:
return ModalExecStart(
immediate_result=self._result(
body.get("output", ""),
body.get("returncode", 1),
)
)
return _ExecStartResult(immediate=self._result(body.get("output", ""), body.get("returncode", 1)))
if body.get("execId") != exec_id:
return ModalExecStart(
immediate_result=self._error_result(
"Managed Modal exec start did not return the expected exec id"
)
)
return _ExecStartResult(immediate=self._error_result(
"Managed Modal exec start did not return the expected exec id"
))
return ModalExecStart(handle=_ManagedModalExecHandle(exec_id=exec_id))
return _ExecStartResult(handle=_ManagedModalExecHandle(exec_id=exec_id))
def _poll_modal_exec(self, handle: _ManagedModalExecHandle) -> dict | None:
def _poll_exec(self, handle: _ManagedModalExecHandle) -> dict | None:
try:
status_response = self._request(
"GET",
@@ -145,11 +260,9 @@ class ManagedModalEnvironment(BaseModalExecutionEnvironment):
)
return None
def _cancel_modal_exec(self, handle: _ManagedModalExecHandle) -> None:
self._cancel_exec(handle.exec_id)
def _timeout_result_for_modal(self, timeout: int) -> dict:
return self._result(f"Managed Modal exec timed out after {timeout}s", 124)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def cleanup(self):
if not getattr(self, "_sandbox_id", None):
@@ -169,6 +282,10 @@ class ManagedModalEnvironment(BaseModalExecutionEnvironment):
finally:
self._sandbox_id = None
# ------------------------------------------------------------------
# Sandbox creation
# ------------------------------------------------------------------
def _create_sandbox(self) -> str:
cpu = self._coerce_number(self._sandbox_kwargs.get("cpu"), 1)
memory = self._coerce_number(
@@ -211,6 +328,10 @@ class ManagedModalEnvironment(BaseModalExecutionEnvironment):
raise RuntimeError("Managed Modal create did not return a sandbox id")
return sandbox_id
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _guard_unsupported_credential_passthrough(self) -> None:
"""Managed Modal does not sync or mount host credential files."""
try:
@@ -226,6 +347,12 @@ class ManagedModalEnvironment(BaseModalExecutionEnvironment):
"credential files inside the sandbox."
)
def _result(self, output: str, returncode: int) -> dict:
return {"output": output, "returncode": returncode}
def _error_result(self, output: str) -> dict:
return self._result(output, 1)
def _request(self, method: str, path: str, *,
json: Dict[str, Any] | None = None,
timeout: int = 30,

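For readers skimming the diff, the _ExecStartResult flow reduces to a small pattern: exactly one of the two fields is set, and the caller branches once, then polls. A condensed sketch with the HTTP calls replaced by injected callables:

    # Condensed sketch of the _ExecStartResult control flow above.
    # start_exec / poll_exec stand in for the gateway HTTP calls.
    from dataclasses import dataclass
    import time

    @dataclass(frozen=True)
    class ExecHandle:
        exec_id: str

    @dataclass(frozen=True)
    class ExecStartResult:
        handle: ExecHandle | None = None
        immediate: dict | None = None   # error or fast-path completion

    def run(start_exec, poll_exec, interval: float = 0.25) -> dict:
        start = start_exec()
        if start.immediate is not None:     # failed, or finished synchronously
            return start.immediate
        if start.handle is None:            # defensive: neither field set
            return {"output": "no exec handle", "returncode": 1}
        while True:                         # async path: poll until done
            result = poll_exec(start.handle)
            if result is not None:
                return result
            time.sleep(interval)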
View File

@@ -7,24 +7,29 @@ wrapper, while preserving Hermes' persistent snapshot behavior across sessions.
import asyncio
import json
import logging
import re
import shlex
import threading
from dataclasses import dataclass
import time as _time
from pathlib import Path
from typing import Any, Dict, Optional
from hermes_constants import get_hermes_home
from tools.environments.modal_common import (
BaseModalExecutionEnvironment,
ModalExecStart,
PreparedModalExec,
)
from tools.environments.base import BaseEnvironment
logger = logging.getLogger(__name__)
_SYNC_INTERVAL_SECONDS = 5.0
_SNAPSHOT_STORE = get_hermes_home() / "modal_snapshots.json"
_DIRECT_SNAPSHOT_NAMESPACE = "direct"
# Matches official Python Docker Hub tags: python:3.12, python:3.12-slim,
# python:3.12.3-bookworm, etc. Rejects alpine (not Debian-based) and
# ambiguous tags like python:3 or python:latest.
_PYTHON_IMAGE_RE = re.compile(
r"^python:(\d+\.\d+)(?:\.\d+)?(?:-(slim|bookworm|bullseye|slim-bookworm|slim-bullseye))?$"
)
def _load_snapshots() -> Dict[str, str]:
"""Load snapshot ID mapping from disk."""
@@ -88,7 +93,13 @@ def _delete_direct_snapshot(task_id: str, snapshot_id: str | None = None) -> Non
def _resolve_modal_image(image_spec: Any) -> Any:
"""Convert registry references or snapshot ids into Modal image objects."""
"""Convert registry references or snapshot ids into Modal image objects.
Official ``python:X.Y*`` images are mapped to
``modal.Image.debian_slim(python_version=X.Y)`` because the stock
Docker Hub images lack build tools (gcc/g++) required by Modal's
internal pip bootstrap.
"""
import modal as _modal
if not isinstance(image_spec, str):
@@ -97,6 +108,10 @@ def _resolve_modal_image(image_spec: Any) -> Any:
if image_spec.startswith("im-"):
return _modal.Image.from_id(image_spec)
m = _PYTHON_IMAGE_RE.match(image_spec)
if m:
return _modal.Image.debian_slim(python_version=m.group(1))
return _modal.Image.from_registry(
image_spec,
setup_dockerfile_commands=[
@@ -138,19 +153,15 @@ class _AsyncWorker:
self._thread.join(timeout=10)
@dataclass
class _DirectModalExecHandle:
thread: threading.Thread
result_holder: Dict[str, Any]
class ModalEnvironment(BaseEnvironment):
"""Modal cloud execution via native Modal sandboxes.
Uses the unified spawn-per-call model: _run_bash() returns a
_ThreadedProcessHandle that wraps Modal's async SDK in a thread + OS pipe,
satisfying the ProcessHandle protocol for BaseEnvironment._wait_for_process().
"""
class ModalEnvironment(BaseModalExecutionEnvironment):
"""Modal cloud execution via native Modal sandboxes."""
_stdin_mode = "heredoc"
_poll_interval_seconds = 0.2
_interrupt_output = "[Command interrupted - Modal sandbox terminated]"
_unexpected_error_prefix = "Modal execution error"
_snapshot_timeout = 60 # Modal sandbox cold start can be slow
def __init__(
self,
@@ -170,6 +181,7 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
self._app = None
self._worker = _AsyncWorker()
self._synced_files: Dict[str, tuple] = {}
self._last_sync_time: float = 0.0
sandbox_kwargs = dict(modal_sandbox_kwargs or {})
@@ -186,11 +198,7 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
cred_mounts = []
try:
from tools.credential_files import (
get_credential_file_mounts,
iter_skills_files,
iter_cache_files,
)
from tools.credential_files import get_credential_file_mounts, iter_skills_files
for mount_entry in get_credential_file_mounts():
cred_mounts.append(
@@ -216,20 +224,6 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
)
if skills_files:
logger.info("Modal: mounting %d skill files", len(skills_files))
# Mount host-side cache files (documents, images, audio,
# screenshots). New files arriving mid-session are picked up
# by _sync_files() before each command execution.
cache_files = iter_cache_files()
for entry in cache_files:
cred_mounts.append(
_modal.Mount.from_local_file(
entry["host_path"],
remote_path=entry["container_path"],
)
)
if cache_files:
logger.info("Modal: mounting %d cache files", len(cache_files))
except Exception as e:
logger.debug("Modal: could not load credential file mounts: %s", e)
@@ -255,10 +249,10 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
try:
target_image_spec = restored_snapshot_id or image
try:
# _resolve_modal_image keeps the Modal bootstrap fix together:
# it applies setup_dockerfile_commands with ensurepip before
# Modal builds registry images, while snapshot ids restore via
# modal.Image.from_id() without rebuilding.
# _resolve_modal_image routes python:X.Y images to
# debian_slim (avoiding build-tool failures) and applies
# setup_dockerfile_commands with ensurepip for other
# registry images. Snapshot ids restore via from_id().
effective_image = _resolve_modal_image(target_image_spec)
self._app, self._sandbox = self._worker.run_coroutine(
_create_sandbox(effective_image),
@@ -292,6 +286,13 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
logger.info("Modal: sandbox created (task=%s)", self._task_id)
# Capture login-shell environment into a snapshot for the unified model
self.init_session()
# ------------------------------------------------------------------
# File sync
# ------------------------------------------------------------------
def _push_file_to_sandbox(self, host_path: str, container_path: str) -> bool:
"""Push a single file into the sandbox if changed. Returns True if synced."""
hp = Path(host_path)
@@ -326,19 +327,13 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
return True
def _sync_files(self) -> None:
"""Push credential, skill, and cache files into the running sandbox.
"""Push credential files and skill files into the running sandbox.
Runs before each command. Uses mtime+size caching so only changed
files are pushed (~13μs overhead in the no-op case). Cache files
are especially important here — new uploads/screenshots may appear
mid-session after sandbox creation.
files are pushed (~13us overhead in the no-op case).
"""
try:
from tools.credential_files import (
get_credential_file_mounts,
iter_skills_files,
iter_cache_files,
)
from tools.credential_files import get_credential_file_mounts, iter_skills_files
for entry in get_credential_file_mounts():
if self._push_file_to_sandbox(entry["host_path"], entry["container_path"]):
@@ -347,64 +342,61 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
for entry in iter_skills_files():
if self._push_file_to_sandbox(entry["host_path"], entry["container_path"]):
logger.debug("Modal: synced skill file %s", entry["container_path"])
for entry in iter_cache_files():
if self._push_file_to_sandbox(entry["host_path"], entry["container_path"]):
logger.debug("Modal: synced cache file %s", entry["container_path"])
except Exception as e:
logger.debug("Modal: file sync failed: %s", e)
# ------------------------------------------------------------------
# Unified execution hooks
# ------------------------------------------------------------------
def _before_execute(self) -> None:
self._sync_files()
now = _time.monotonic()
if now - self._last_sync_time >= _SYNC_INTERVAL_SECONDS:
self._sync_files()
self._last_sync_time = now
def _start_modal_exec(self, prepared: PreparedModalExec) -> ModalExecStart:
full_command = f"cd {shlex.quote(prepared.cwd)} && {prepared.command}"
result_holder = {"value": None, "error": None}
def _run_bash(self, cmd_string: str, *, timeout: int | None = None,
stdin_data: str | None = None):
"""Spawn ``bash -c <cmd_string>`` inside the Modal sandbox."""
effective_timeout = timeout or self.timeout
if stdin_data is not None:
cmd_string = self._embed_stdin_heredoc(cmd_string, stdin_data)
return self._modal_exec(cmd_string, effective_timeout, login=False)
def _run():
try:
async def _do_execute():
process = await self._sandbox.exec.aio(
"bash",
"-c",
full_command,
timeout=prepared.timeout,
)
stdout = await process.stdout.read.aio()
stderr = await process.stderr.read.aio()
exit_code = await process.wait.aio()
if isinstance(stdout, bytes):
stdout = stdout.decode("utf-8", errors="replace")
if isinstance(stderr, bytes):
stderr = stderr.decode("utf-8", errors="replace")
output = stdout
if stderr:
output = f"{stdout}\n{stderr}" if stdout else stderr
return self._result(output, exit_code)
def _run_bash_login(self, cmd_string: str, *,
timeout: int | None = None):
"""Spawn ``bash -l -c <cmd_string>`` for snapshot creation."""
effective_timeout = timeout or self._snapshot_timeout
return self._modal_exec(cmd_string, effective_timeout, login=True)
result_holder["value"] = self._worker.run_coroutine(
_do_execute(),
timeout=prepared.timeout + 30,
)
except Exception as e:
result_holder["error"] = e
def _modal_exec(self, cmd_string: str, timeout: int, login: bool):
"""Create a _ThreadedProcessHandle wrapping Modal's async exec."""
from tools.environments.base import _ThreadedProcessHandle
worker = self._worker
sandbox = self._sandbox
t = threading.Thread(target=_run, daemon=True)
t.start()
return ModalExecStart(handle=_DirectModalExecHandle(thread=t, result_holder=result_holder))
def exec_fn():
async def _exec():
args = ["bash", "-l", "-c", cmd_string] if login else ["bash", "-c", cmd_string]
process = await sandbox.exec.aio(*args, timeout=timeout)
stdout = await process.stdout.read.aio()
stderr = await process.stderr.read.aio()
exit_code = await process.wait.aio()
return stdout, stderr, exit_code
def _poll_modal_exec(self, handle: _DirectModalExecHandle) -> dict | None:
if handle.thread.is_alive():
return None
if handle.result_holder["error"]:
return self._error_result(f"Modal execution error: {handle.result_holder['error']}")
return handle.result_holder["value"]
stdout, stderr, exit_code = worker.run_coroutine(
_exec(), timeout=timeout + 30,
)
output = stdout if isinstance(stdout, str) else stdout.decode("utf-8", errors="replace")
if stderr:
output += stderr if isinstance(stderr, str) else stderr.decode("utf-8", errors="replace")
return output, exit_code
def _cancel_modal_exec(self, handle: _DirectModalExecHandle) -> None:
self._worker.run_coroutine(
self._sandbox.terminate.aio(),
timeout=15,
)
return _ThreadedProcessHandle(exec_fn)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def cleanup(self):
"""Snapshot the filesystem (if persistent) then stop the sandbox."""

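The class docstring above describes the shape of _ThreadedProcessHandle without showing it (the real class lives in base.py and is not part of this diff). A minimal sketch of the thread + OS pipe idea, assuming exec_fn returns an (output, exit_code) pair as in _modal_exec above:

    # Minimal sketch of a thread + OS pipe process-handle wrapper.
    # The exec_fn contract (returns (output, code)) is taken from the
    # _modal_exec closure above; everything else is an assumption.
    import os
    import threading

    class ThreadedProcessHandle:
        def __init__(self, exec_fn):
            read_fd, self._write_fd = os.pipe()
            self.stdout = os.fdopen(read_fd, "r")   # readable like Popen.stdout
            self.returncode = None
            self._thread = threading.Thread(
                target=self._run, args=(exec_fn,), daemon=True,
            )
            self._thread.start()

        def _run(self, exec_fn):
            try:
                output, code = exec_fn()            # blocking work on the thread
            except Exception as exc:
                output, code = f"execution error: {exc}", 1
            with os.fdopen(self._write_fd, "w") as w:
                w.write(output)                     # closing the pipe signals EOF
            self.returncode = code

        def poll(self):
            """Popen-style poll: None while running, exit code once done."""
            return None if self._thread.is_alive() else self.returncode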
View File

@@ -1,290 +0,0 @@
"""Persistent shell mixin: file-based IPC protocol for long-lived bash shells."""
import logging
import shlex
import subprocess
import threading
import time
import uuid
from abc import abstractmethod
from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
class PersistentShellMixin:
"""Mixin that adds persistent shell capability to any BaseEnvironment.
Subclasses must implement ``_spawn_shell_process()``, ``_read_temp_files()``,
``_kill_shell_children()``, ``_execute_oneshot()``, and ``_cleanup_temp_files()``.
"""
persistent: bool
@abstractmethod
def _spawn_shell_process(self) -> subprocess.Popen: ...
@abstractmethod
def _read_temp_files(self, *paths: str) -> list[str]: ...
@abstractmethod
def _kill_shell_children(self): ...
@abstractmethod
def _execute_oneshot(self, command: str, cwd: str, *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict: ...
@abstractmethod
def _cleanup_temp_files(self): ...
_session_id: str = ""
_poll_interval_start: float = 0.01 # initial poll interval (10ms)
_poll_interval_max: float = 0.25 # max poll interval (250ms) — reduces I/O for long commands
@property
def _temp_prefix(self) -> str:
return f"/tmp/hermes-persistent-{self._session_id}"
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def _init_persistent_shell(self):
self._shell_lock = threading.Lock()
self._shell_proc: subprocess.Popen | None = None
self._shell_alive: bool = False
self._shell_pid: int | None = None
self._session_id = uuid.uuid4().hex[:12]
p = self._temp_prefix
self._pshell_stdout = f"{p}-stdout"
self._pshell_stderr = f"{p}-stderr"
self._pshell_status = f"{p}-status"
self._pshell_cwd = f"{p}-cwd"
self._pshell_pid_file = f"{p}-pid"
self._shell_proc = self._spawn_shell_process()
self._shell_alive = True
self._drain_thread = threading.Thread(
target=self._drain_shell_output, daemon=True,
)
self._drain_thread.start()
init_script = (
f"export TERM=${{TERM:-dumb}}\n"
f"touch {self._pshell_stdout} {self._pshell_stderr} "
f"{self._pshell_status} {self._pshell_cwd} {self._pshell_pid_file}\n"
f"echo $$ > {self._pshell_pid_file}\n"
f"pwd > {self._pshell_cwd}\n"
)
self._send_to_shell(init_script)
deadline = time.monotonic() + 3.0
while time.monotonic() < deadline:
pid_str = self._read_temp_files(self._pshell_pid_file)[0].strip()
if pid_str.isdigit():
self._shell_pid = int(pid_str)
break
time.sleep(0.05)
else:
logger.warning("Could not read persistent shell PID")
self._shell_pid = None
if self._shell_pid:
logger.info(
"Persistent shell started (session=%s, pid=%d)",
self._session_id, self._shell_pid,
)
reported_cwd = self._read_temp_files(self._pshell_cwd)[0].strip()
if reported_cwd:
self.cwd = reported_cwd
def _cleanup_persistent_shell(self):
if self._shell_proc is None:
return
if self._session_id:
self._cleanup_temp_files()
try:
self._shell_proc.stdin.close()
except Exception:
pass
try:
self._shell_proc.terminate()
self._shell_proc.wait(timeout=3)
except subprocess.TimeoutExpired:
self._shell_proc.kill()
self._shell_alive = False
self._shell_proc = None
if hasattr(self, "_drain_thread") and self._drain_thread.is_alive():
self._drain_thread.join(timeout=1.0)
# ------------------------------------------------------------------
# execute() / cleanup() — shared dispatcher, subclasses inherit
# ------------------------------------------------------------------
def execute(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
if self.persistent:
return self._execute_persistent(
command, cwd, timeout=timeout, stdin_data=stdin_data,
)
return self._execute_oneshot(
command, cwd, timeout=timeout, stdin_data=stdin_data,
)
def execute_oneshot(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
"""Always use the oneshot (non-persistent) execution path.
This bypasses _shell_lock so it can run concurrently with a
long-running command in the persistent shell — used by
execute_code's file-based RPC polling thread.
"""
return self._execute_oneshot(
command, cwd, timeout=timeout, stdin_data=stdin_data,
)
def cleanup(self):
if self.persistent:
self._cleanup_persistent_shell()
# ------------------------------------------------------------------
# Shell I/O
# ------------------------------------------------------------------
def _drain_shell_output(self):
try:
for _ in self._shell_proc.stdout:
pass
except Exception:
pass
self._shell_alive = False
def _send_to_shell(self, text: str):
if not self._shell_alive or self._shell_proc is None:
return
try:
self._shell_proc.stdin.write(text)
self._shell_proc.stdin.flush()
except (BrokenPipeError, OSError):
self._shell_alive = False
def _read_persistent_output(self) -> tuple[str, int, str]:
stdout, stderr, status_raw, cwd = self._read_temp_files(
self._pshell_stdout, self._pshell_stderr,
self._pshell_status, self._pshell_cwd,
)
output = self._merge_output(stdout, stderr)
status = status_raw.strip()
if ":" in status:
status = status.split(":", 1)[1]
try:
exit_code = int(status.strip())
except ValueError:
exit_code = 1
return output, exit_code, cwd.strip()
# ------------------------------------------------------------------
# Execution
# ------------------------------------------------------------------
def _execute_persistent(self, command: str, cwd: str, *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
if not self._shell_alive:
logger.info("Persistent shell died, restarting...")
self._init_persistent_shell()
exec_command, sudo_stdin = self._prepare_command(command)
effective_timeout = timeout or self.timeout
if stdin_data or sudo_stdin:
return self._execute_oneshot(
command, cwd, timeout=timeout, stdin_data=stdin_data,
)
with self._shell_lock:
return self._execute_persistent_locked(
exec_command, cwd, effective_timeout,
)
def _execute_persistent_locked(self, command: str, cwd: str,
timeout: int) -> dict:
work_dir = cwd or self.cwd
cmd_id = uuid.uuid4().hex[:8]
truncate = (
f": > {self._pshell_stdout}\n"
f": > {self._pshell_stderr}\n"
f": > {self._pshell_status}\n"
)
self._send_to_shell(truncate)
escaped = command.replace("'", "'\\''")
ipc_script = (
f"cd {shlex.quote(work_dir)}\n"
f"eval '{escaped}' < /dev/null > {self._pshell_stdout} 2> {self._pshell_stderr}\n"
f"__EC=$?\n"
f"pwd > {self._pshell_cwd}\n"
f"echo {cmd_id}:$__EC > {self._pshell_status}\n"
)
self._send_to_shell(ipc_script)
deadline = time.monotonic() + timeout
poll_interval = self._poll_interval_start # starts at 10ms, backs off to 250ms
while True:
if is_interrupted():
self._kill_shell_children()
output, _, _ = self._read_persistent_output()
return {
"output": output + "\n[Command interrupted]",
"returncode": 130,
}
if time.monotonic() > deadline:
self._kill_shell_children()
output, _, _ = self._read_persistent_output()
if output:
return {
"output": output + f"\n[Command timed out after {timeout}s]",
"returncode": 124,
}
return self._timeout_result(timeout)
if not self._shell_alive:
return {
"output": "Persistent shell died during execution",
"returncode": 1,
}
status_content = self._read_temp_files(self._pshell_status)[0].strip()
if status_content.startswith(cmd_id + ":"):
break
time.sleep(poll_interval)
# Exponential backoff: fast start (10ms) for quick commands,
# ramps up to 250ms for long-running commands — reduces I/O by 10-25x
# on WSL2 where polling keeps the VM hot and memory pressure high.
poll_interval = min(poll_interval * 1.5, self._poll_interval_max)
output, exit_code, new_cwd = self._read_persistent_output()
if new_cwd:
self.cwd = new_cwd
return {"output": output, "returncode": exit_code}
@staticmethod
def _merge_output(stdout: str, stderr: str) -> str:
parts = []
if stdout.strip():
parts.append(stdout.rstrip("\n"))
if stderr.strip():
parts.append(stderr.rstrip("\n"))
return "\n".join(parts)

View File

@@ -8,7 +8,6 @@ via writable overlay directories that survive across sessions.
import json
import logging
import os
import shlex
import shutil
import subprocess
import tempfile
@@ -19,7 +18,6 @@ from typing import Any, Dict, Optional
from hermes_constants import get_hermes_home
from tools.environments.base import BaseEnvironment
from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
@@ -241,6 +239,7 @@ class SingularityEnvironment(BaseEnvironment):
self._overlay_dir.mkdir(parents=True, exist_ok=True)
self._start_instance()
self.init_session()
def _start_instance(self):
cmd = [self.executable, "instance", "start"]
@@ -266,7 +265,8 @@ class SingularityEnvironment(BaseEnvironment):
mount_entry["host_path"],
mount_entry["container_path"],
)
for skills_mount in get_skills_directory_mount():
skills_mount = get_skills_directory_mount()
if skills_mount:
cmd.extend(["--bind", f"{skills_mount['host_path']}:{skills_mount['container_path']}:ro"])
logger.info(
"Singularity: binding skills dir %s -> %s",
@@ -294,86 +294,43 @@ class SingularityEnvironment(BaseEnvironment):
except subprocess.TimeoutExpired:
raise RuntimeError("Instance start timed out")
def execute(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
# ------------------------------------------------------------------
# Unified execution model — _run_bash is the only execution method
# ------------------------------------------------------------------
def _run_bash(self, cmd_string: str, *,
timeout: int | None = None,
stdin_data: str | None = None) -> subprocess.Popen:
if not self._instance_started:
return {"output": "Instance not started", "returncode": -1}
effective_timeout = timeout or self.timeout
work_dir = cwd or self.cwd
exec_command, sudo_stdin = self._prepare_command(command)
# Merge sudo password (if any) with caller-supplied stdin_data.
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
# apptainer exec --pwd doesn't expand ~, so prepend a cd into the command.
# Keep ~ unquoted (for shell expansion) and quote only the subpath.
if work_dir == "~":
exec_command = f"cd ~ && {exec_command}"
work_dir = "/tmp"
elif work_dir.startswith("~/"):
exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}"
work_dir = "/tmp"
cmd = [self.executable, "exec", "--pwd", work_dir,
raise RuntimeError("Singularity instance not started")
cmd = [self.executable, "exec",
f"instance://{self.instance_id}",
"bash", "-c", exec_command]
"bash", "-c", cmd_string]
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
text=True,
)
if stdin_data is not None:
try:
proc.stdin.write(stdin_data)
proc.stdin.close()
except (BrokenPipeError, OSError):
pass
return proc
try:
import time as _time
_output_chunks = []
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
text=True,
)
if effective_stdin:
try:
proc.stdin.write(effective_stdin)
proc.stdin.close()
except Exception:
pass
def _drain():
try:
for line in proc.stdout:
_output_chunks.append(line)
except Exception:
pass
reader = threading.Thread(target=_drain, daemon=True)
reader.start()
deadline = _time.monotonic() + effective_timeout
while proc.poll() is None:
if is_interrupted():
proc.terminate()
try:
proc.wait(timeout=1)
except subprocess.TimeoutExpired:
proc.kill()
reader.join(timeout=2)
return {
"output": "".join(_output_chunks) + "\n[Command interrupted]",
"returncode": 130,
}
if _time.monotonic() > deadline:
proc.kill()
reader.join(timeout=2)
return self._timeout_result(effective_timeout)
_time.sleep(0.2)
reader.join(timeout=5)
return {"output": "".join(_output_chunks), "returncode": proc.returncode}
except Exception as e:
return {"output": f"Singularity execution error: {e}", "returncode": 1}
def _run_bash_login(self, cmd_string: str) -> subprocess.Popen:
if not self._instance_started:
raise RuntimeError("Singularity instance not started")
cmd = [self.executable, "exec",
f"instance://{self.instance_id}",
"bash", "-l", "-c", cmd_string]
return subprocess.Popen(
cmd,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.DEVNULL, text=True,
)
def cleanup(self):
"""Stop the instance. If persistent, the overlay dir survives for next creation."""

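The _run_bash / _run_bash_login split exists so the base class can pay for one slow login shell at session start, then run every later command as a fast non-login shell that sources the captured environment. A rough sketch of that capture-and-reuse step, under the assumption that the snapshot is simply an export dump sourced later (the exact format used by init_session() is not shown in this diff):

    # Rough sketch of login-shell snapshot capture and reuse.
    # The snapshot path and format are assumptions for illustration.
    import subprocess

    SNAPSHOT = "/tmp/hermes-env-snapshot.sh"

    def capture_snapshot():
        # One slow login shell: runs /etc/profile, ~/.profile, etc.,
        # then dumps the resulting environment to a file.
        subprocess.run(["bash", "-l", "-c", f"export -p > {SNAPSHOT}"],
                       check=True)

    def run_with_snapshot(cmd: str) -> subprocess.CompletedProcess:
        # Every later command is a fast non-login shell that sources
        # the captured environment before running.
        wrapped = f". {SNAPSHOT} 2>/dev/null; {cmd}"
        return subprocess.run(["bash", "-c", wrapped],
                              capture_output=True, text=True)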
View File

@@ -5,16 +5,15 @@ import shlex
import shutil
import subprocess
import tempfile
import threading
import time
import time as _time
from pathlib import Path
from tools.environments.base import BaseEnvironment
from tools.environments.persistent_shell import PersistentShellMixin
from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
_SYNC_INTERVAL_SECONDS = 5.0
def _ensure_ssh_available() -> None:
"""Fail fast with a clear error when the SSH client is unavailable."""
@@ -24,7 +23,7 @@ def _ensure_ssh_available() -> None:
)
class SSHEnvironment(PersistentShellMixin, BaseEnvironment):
class SSHEnvironment(BaseEnvironment):
"""Run commands on a remote machine over SSH.
Uses SSH ControlMaster for connection persistence so subsequent
@@ -34,33 +33,45 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment):
Foreground commands are interruptible: the local ssh process is killed
and a remote kill is attempted over the ControlMaster socket.
When ``persistent=True``, a single long-lived bash shell is kept alive
over SSH and state (cwd, env vars, shell variables) persists across
``execute()`` calls. Output capture uses file-based IPC on the remote
host (stdout/stderr/exit-code written to temp files, polled via fast
ControlMaster one-shot reads).
Uses the unified spawn-per-call model:
- bash -l once at session start to capture env snapshot on the remote
- bash -c for every subsequent command (fast, no shell init overhead)
- CWD tracked via in-band stdout markers emitted after each command on the remote host
"""
def __init__(self, host: str, user: str, cwd: str = "~",
timeout: int = 60, port: int = 22, key_path: str = "",
persistent: bool = False):
**kwargs):
if kwargs.get("persistent") is not None:
import warnings
warnings.warn(
"The 'persistent' parameter is no longer supported. "
"SSH backend now uses the unified spawn-per-call model "
"with login-shell snapshot sourcing.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(cwd=cwd, timeout=timeout)
self.host = host
self.user = user
self.port = port
self.key_path = key_path
self.persistent = persistent
self.control_dir = Path(tempfile.gettempdir()) / "hermes-ssh"
self.control_dir.mkdir(parents=True, exist_ok=True)
self.control_socket = self.control_dir / f"{user}@{host}:{port}.sock"
# Sync caches — skip rsync when local files haven't changed.
self._synced_files: dict[str, tuple] = {} # remote_path → (mtime, size)
self._skills_fingerprint: set | None = None # {(relpath, mtime, size), ...}
self._created_remote_dirs: set[str] = set()
self._last_sync_time: float = 0.0
_ensure_ssh_available()
self._establish_connection()
self._remote_home = self._detect_remote_home()
self._sync_skills_and_credentials()
if self.persistent:
self._init_persistent_shell()
self.init_session()
def _build_ssh_command(self, extra_args: list | None = None) -> list:
cmd = ["ssh"]
@@ -107,8 +118,12 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment):
return "/root"
return f"/home/{self.user}"
def _sync_skills_and_credentials(self) -> None:
"""Rsync skills directory and credential files to the remote host."""
def _sync_skills_and_credentials(self, *, force: bool = False) -> None:
"""Rsync skills directory and credential files to the remote host.
Uses local mtime+size caching to skip rsync when nothing changed.
Pass force=True to bypass the cache (e.g. for debugging).
"""
try:
container_base = f"{self._remote_home}/.hermes"
from tools.credential_files import get_credential_file_mounts, get_skills_directory_mount
@@ -122,183 +137,136 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment):
rsync_base.extend(["-e", ssh_opts])
dest_prefix = f"{self.user}@{self.host}"
# Sync individual credential files (remap /root/.hermes to detected home)
# --- Credential files: per-file mtime check ---
cred_to_sync = []
for mount_entry in get_credential_file_mounts():
remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
parent_dir = str(Path(remote_path).parent)
hp = Path(mount_entry["host_path"])
remote_path = mount_entry["container_path"].replace(
"/root/.hermes", container_base, 1
)
try:
s = hp.stat()
key = (s.st_mtime, s.st_size)
except FileNotFoundError:
continue
if not force and self._synced_files.get(remote_path) == key:
continue
cred_to_sync.append((mount_entry["host_path"], remote_path, key))
# Ensure remote directories exist for any new credential paths.
# container_base is always included so skills rsync has its parent.
needed_dirs = {container_base}
for _, remote_path, _ in cred_to_sync:
needed_dirs.add(str(Path(remote_path).parent))
new_dirs = needed_dirs - self._created_remote_dirs
if new_dirs:
mkdir_cmd = self._build_ssh_command()
mkdir_cmd.append(f"mkdir -p {parent_dir}")
mkdir_cmd.append(f"mkdir -p {' '.join(shlex.quote(d) for d in new_dirs)}")
subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
cmd = rsync_base + [mount_entry["host_path"], f"{dest_prefix}:{remote_path}"]
self._created_remote_dirs |= new_dirs
# Rsync changed credential files
for host_path, remote_path, key in cred_to_sync:
cmd = rsync_base + [host_path, f"{dest_prefix}:{remote_path}"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
logger.info("SSH: synced credential %s -> %s", mount_entry["host_path"], remote_path)
self._synced_files[remote_path] = key
logger.info("SSH: synced credential %s -> %s", host_path, remote_path)
else:
self._invalidate_sync_cache()
logger.debug("SSH: rsync credential failed: %s", result.stderr.strip())
# Sync skill directories (local + external, remap to detected home)
for skills_mount in get_skills_directory_mount(container_base=container_base):
# --- Skills directory: fingerprint check + --delete for pruning ---
skills_mount = get_skills_directory_mount(container_base=container_base)
if skills_mount and (force or self._skills_dir_changed(skills_mount["host_path"])):
remote_path = skills_mount["container_path"]
mkdir_cmd = self._build_ssh_command()
mkdir_cmd.append(f"mkdir -p {remote_path}")
subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
cmd = rsync_base + [
"--delete",
skills_mount["host_path"].rstrip("/") + "/",
f"{dest_prefix}:{remote_path}/",
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
logger.info("SSH: synced skills dir %s -> %s", skills_mount["host_path"], remote_path)
logger.info("SSH: synced skills dir %s -> %s",
skills_mount["host_path"], remote_path)
else:
self._invalidate_sync_cache()
logger.debug("SSH: rsync skills dir failed: %s", result.stderr.strip())
except Exception as e:
logger.debug("SSH: could not sync skills/credentials: %s", e)
def execute(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
# Incremental sync before each command so mid-session credential
# refreshes and skill updates are picked up.
self._sync_skills_and_credentials()
return super().execute(command, cwd, timeout=timeout, stdin_data=stdin_data)
_poll_interval_start: float = 0.15 # SSH: higher initial interval (150ms) for network latency
@property
def _temp_prefix(self) -> str:
return f"/tmp/hermes-ssh-{self._session_id}"
def _spawn_shell_process(self) -> subprocess.Popen:
cmd = self._build_ssh_command()
cmd.append("bash -l")
return subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
)
def _read_temp_files(self, *paths: str) -> list[str]:
if len(paths) == 1:
cmd = self._build_ssh_command()
cmd.append(f"cat {paths[0]} 2>/dev/null")
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=10,
)
return [result.stdout]
except (subprocess.TimeoutExpired, OSError):
return [""]
delim = f"__HERMES_SEP_{self._session_id}__"
script = "; ".join(
f"cat {p} 2>/dev/null; echo '{delim}'" for p in paths
)
cmd = self._build_ssh_command()
cmd.append(script)
def _skills_dir_changed(self, host_path: str) -> bool:
"""Return True if any file in the skills dir has changed since last sync."""
root = Path(host_path)
if not root.is_dir():
return False
current: set[tuple] = set()
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=10,
)
parts = result.stdout.split(delim + "\n")
return [parts[i] if i < len(parts) else "" for i in range(len(paths))]
except (subprocess.TimeoutExpired, OSError):
return [""] * len(paths)
for f in root.rglob("*"):
if f.is_file() and not f.is_symlink():
s = f.stat()
current.add((str(f.relative_to(root)), s.st_mtime, s.st_size))
except OSError:
return True
if current == self._skills_fingerprint:
return False
self._skills_fingerprint = current
return True
def _kill_shell_children(self):
if self._shell_pid is None:
return
def _invalidate_sync_cache(self) -> None:
"""Clear sync caches — call on rsync failure or reconnect."""
self._synced_files.clear()
self._skills_fingerprint = None
self._created_remote_dirs.clear()
# ------------------------------------------------------------------
# Unified execution hooks
# ------------------------------------------------------------------
def _before_execute(self):
"""Incremental sync before each command so mid-session credential
refreshes and skill updates are picked up."""
now = _time.monotonic()
if now - self._last_sync_time >= _SYNC_INTERVAL_SECONDS:
self._sync_skills_and_credentials()
self._last_sync_time = now
def _run_bash(self, cmd_string, *, timeout=None, stdin_data=None):
cmd = self._build_ssh_command()
cmd.append(f"pkill -P {self._shell_pid} 2>/dev/null; true")
try:
subprocess.run(cmd, capture_output=True, timeout=5)
except (subprocess.TimeoutExpired, OSError):
pass
def _cleanup_temp_files(self):
cmd = self._build_ssh_command()
cmd.append(f"rm -f {self._temp_prefix}-*")
try:
subprocess.run(cmd, capture_output=True, timeout=5)
except (subprocess.TimeoutExpired, OSError):
pass
def _execute_oneshot(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
work_dir = cwd or self.cwd
exec_command, sudo_stdin = self._prepare_command(command)
# Keep ~ unquoted (for shell expansion) and quote only the subpath.
if work_dir == "~":
wrapped = f'cd ~ && {exec_command}'
elif work_dir.startswith("~/"):
wrapped = f'cd ~/{shlex.quote(work_dir[2:])} && {exec_command}'
else:
wrapped = f'cd {shlex.quote(work_dir)} && {exec_command}'
effective_timeout = timeout or self.timeout
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
cmd = self._build_ssh_command()
cmd.append(wrapped)
kwargs = self._build_run_kwargs(timeout, effective_stdin)
kwargs.pop("timeout", None)
_output_chunks = []
cmd.extend(["bash", "-c", shlex.quote(cmd_string)])
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
text=True,
)
if effective_stdin:
if stdin_data:
try:
proc.stdin.write(effective_stdin)
proc.stdin.write(stdin_data)
proc.stdin.close()
except (BrokenPipeError, OSError):
pass
return proc
def _drain():
def _run_bash_login(self, cmd_string, *, timeout=None):
cmd = self._build_ssh_command()
cmd.extend(["bash", "-l", "-c", shlex.quote(cmd_string)])
return subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.DEVNULL, text=True,
)
def cleanup(self):
# Clean up remote snapshot before closing ControlMaster
if self._snapshot_path:
paths = self._snapshot_path
try:
for line in proc.stdout:
_output_chunks.append(line)
cmd = self._build_ssh_command()
cmd.append(f"rm -f {paths}")
subprocess.run(cmd, capture_output=True, timeout=5)
except Exception:
pass
reader = threading.Thread(target=_drain, daemon=True)
reader.start()
deadline = time.monotonic() + effective_timeout
while proc.poll() is None:
if is_interrupted():
proc.terminate()
try:
proc.wait(timeout=1)
except subprocess.TimeoutExpired:
proc.kill()
reader.join(timeout=2)
return {
"output": "".join(_output_chunks) + "\n[Command interrupted]",
"returncode": 130,
}
if time.monotonic() > deadline:
proc.kill()
reader.join(timeout=2)
return self._timeout_result(effective_timeout)
time.sleep(0.2)
reader.join(timeout=5)
return {"output": "".join(_output_chunks), "returncode": proc.returncode}
def cleanup(self):
super().cleanup()
if self.control_socket.exists():
try:

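The caching above boils down to two checks: a per-file (mtime, size) key for credential files and a whole-directory fingerprint for the skills tree. A self-contained sketch of the same idea (the real code records a file's key only after its rsync succeeds; this sketch records eagerly for brevity):

    # Self-contained sketch of the (mtime, size) sync-skip pattern above.
    from pathlib import Path

    _synced: dict[str, tuple] = {}            # remote_path -> (mtime, size)
    _fingerprint: set[tuple] | None = None    # {(relpath, mtime, size), ...}

    def file_changed(host_path: str, remote_path: str) -> bool:
        """True if the file needs syncing; records the new key when it does."""
        try:
            s = Path(host_path).stat()
        except FileNotFoundError:
            return False
        key = (s.st_mtime, s.st_size)
        if _synced.get(remote_path) == key:
            return False
        _synced[remote_path] = key
        return True

    def dir_changed(root: str) -> bool:
        """True if any file under root was added, removed, or touched."""
        global _fingerprint
        current = {
            (str(f.relative_to(root)), f.stat().st_mtime, f.stat().st_size)
            for f in Path(root).rglob("*")
            if f.is_file() and not f.is_symlink()
        }
        if current == _fingerprint:
            return False
        _fingerprint = current
        return True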
View File

@@ -695,6 +695,15 @@ def search_tool(pattern: str, target: str = "content", path: str = ".",
)
result_json = json.dumps(result_dict, ensure_ascii=False)
# Layer 1: Cap search output to prevent context overflow
MAX_SEARCH_OUTPUT_CHARS = 50_000
if len(result_json) > MAX_SEARCH_OUTPUT_CHARS:
result_json = (
result_json[:MAX_SEARCH_OUTPUT_CHARS]
+ f"\n\n[Search output truncated: {len(result_json):,} chars exceeded "
f"{MAX_SEARCH_OUTPUT_CHARS:,} char limit. Use a more specific pattern, "
f"file_glob, or reduce limit/context to see complete results.]"
)
# Hint when results were truncated — explicit next offset is clearer
# than relying on the model to infer it from total_count vs match count.
if result_dict.get("truncated"):
@@ -783,7 +792,7 @@ SEARCH_FILES_SCHEMA = {
"target": {"type": "string", "enum": ["content", "files"], "description": "'content' searches inside file contents, 'files' searches for files by name", "default": "content"},
"path": {"type": "string", "description": "Directory or file to search in (default: current working directory)", "default": "."},
"file_glob": {"type": "string", "description": "Filter files by pattern in grep mode (e.g., '*.py' to only search Python files)"},
"limit": {"type": "integer", "description": "Maximum number of results to return (default: 50)", "default": 50},
"limit": {"type": "integer", "description": "Maximum number of results to return (default: 50)", "default": 50, "maximum": 10000},
"offset": {"type": "integer", "description": "Skip first N results for pagination (default: 0)", "default": 0},
"output_mode": {"type": "string", "enum": ["content", "files_only", "count"], "description": "Output format for grep mode: 'content' shows matching lines with line numbers, 'files_only' lists file paths, 'count' shows match counts per file", "default": "content"},
"context": {"type": "integer", "description": "Number of context lines before and after each match (grep mode only)", "default": 0}
@@ -822,7 +831,7 @@ def _handle_search_files(args, **kw):
output_mode=args.get("output_mode", "content"), context=args.get("context", 0), task_id=tid)
registry.register(name="read_file", toolset="file", schema=READ_FILE_SCHEMA, handler=_handle_read_file, check_fn=_check_file_reqs, emoji="📖")
registry.register(name="write_file", toolset="file", schema=WRITE_FILE_SCHEMA, handler=_handle_write_file, check_fn=_check_file_reqs, emoji="✍️")
registry.register(name="patch", toolset="file", schema=PATCH_SCHEMA, handler=_handle_patch, check_fn=_check_file_reqs, emoji="🔧")
registry.register(name="search_files", toolset="file", schema=SEARCH_FILES_SCHEMA, handler=_handle_search_files, check_fn=_check_file_reqs, emoji="🔎")
registry.register(name="read_file", toolset="file", schema=READ_FILE_SCHEMA, handler=_handle_read_file, check_fn=_check_file_reqs, emoji="📖", max_result_size_chars=float('inf'))
registry.register(name="write_file", toolset="file", schema=WRITE_FILE_SCHEMA, handler=_handle_write_file, check_fn=_check_file_reqs, emoji="✍️", max_result_size_chars=100_000)
registry.register(name="patch", toolset="file", schema=PATCH_SCHEMA, handler=_handle_patch, check_fn=_check_file_reqs, emoji="🔧", max_result_size_chars=100_000)
registry.register(name="search_files", toolset="file", schema=SEARCH_FILES_SCHEMA, handler=_handle_search_files, check_fn=_check_file_reqs, emoji="🔎", max_result_size_chars=20_000)

View File

@@ -27,10 +27,12 @@ class ToolEntry:
__slots__ = (
"name", "toolset", "schema", "handler", "check_fn",
"requires_env", "is_async", "description", "emoji",
"max_result_size_chars",
)
def __init__(self, name, toolset, schema, handler, check_fn,
requires_env, is_async, description, emoji):
requires_env, is_async, description, emoji,
max_result_size_chars=None):
self.name = name
self.toolset = toolset
self.schema = schema
@@ -40,6 +42,7 @@ class ToolEntry:
self.is_async = is_async
self.description = description
self.emoji = emoji
self.max_result_size_chars = max_result_size_chars
class ToolRegistry:
@@ -64,6 +67,7 @@ class ToolRegistry:
is_async: bool = False,
description: str = "",
emoji: str = "",
max_result_size_chars: int | float | None = None,
):
"""Register a tool. Called at module-import time by each tool file."""
existing = self._tools.get(name)
@@ -83,6 +87,7 @@ class ToolRegistry:
is_async=is_async,
description=description or schema.get("description", ""),
emoji=emoji,
max_result_size_chars=max_result_size_chars,
)
if check_fn and toolset not in self._toolset_checks:
self._toolset_checks[toolset] = check_fn
@@ -164,6 +169,14 @@ class ToolRegistry:
# Query helpers (replace redundant dicts in model_tools.py)
# ------------------------------------------------------------------
def get_max_result_size(self, name: str) -> int | float:
"""Return per-tool max result size, or global default."""
entry = self._tools.get(name)
if entry and entry.max_result_size_chars is not None:
return entry.max_result_size_chars
from tools.tool_result_storage import DEFAULT_MAX_RESULT_SIZE_CHARS
return DEFAULT_MAX_RESULT_SIZE_CHARS
def get_all_tool_names(self) -> List[str]:
"""Return sorted list of all registered tool names."""
return sorted(self._tools.keys())

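A short usage sketch of the per-tool override: registration passes max_result_size_chars, and get_max_result_size falls back to the module default when a tool never set one. The tool name and handler below are stand-ins; the expected return values follow from the registrations shown elsewhere in this diff:

    # Usage sketch for per-tool result-size thresholds (stand-in tool).
    from tools.registry import registry

    registry.register(
        name="example_tool",
        toolset="demo",
        schema={"description": "stand-in"},
        handler=lambda args, **kw: "ok",
        check_fn=None,
        max_result_size_chars=20_000,          # persist anything larger
    )

    registry.get_max_result_size("example_tool")   # -> 20000
    registry.get_max_result_size("read_file")      # -> inf (opts out)
    registry.get_max_result_size("no_such_tool")   # -> 50000 default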
View File

@@ -615,9 +615,7 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int,
docker_env = cc.get("docker_env", {})
if env_type == "local":
lc = local_config or {}
return _LocalEnvironment(cwd=cwd, timeout=timeout,
persistent=lc.get("persistent", False))
return _LocalEnvironment(cwd=cwd, timeout=timeout)
elif env_type == "docker":
return _DockerEnvironment(
@@ -709,7 +707,6 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int,
key_path=ssh_config.get("key", ""),
cwd=cwd,
timeout=timeout,
persistent=ssh_config.get("persistent", False),
)
else:
@@ -1597,4 +1594,5 @@ registry.register(
handler=_handle_terminal,
check_fn=check_terminal_requirements,
emoji="💻",
max_result_size_chars=30_000,
)

View File

@@ -0,0 +1,193 @@
"""Tool result persistence -- preserves large outputs to disk instead of truncating.
Defense against context-window overflow operates at three levels:
1. **Per-tool output cap** (inside each tool): Tools like search_files
pre-truncate their own output before returning. This is the first line
of defense and the only one the tool author controls.
2. **Per-result persistence** (maybe_persist_tool_result): After a tool
returns, if its output exceeds the tool's registered threshold
(registry.get_max_result_size), the full output is written to disk and
the in-context content is replaced with a preview + file path reference.
The model can read_file to access the full output.
3. **Per-turn aggregate budget** (enforce_turn_budget): After all tool
results in a single assistant turn are collected, if the total exceeds
MAX_TURN_BUDGET_CHARS (200K), the largest non-persisted results are
spilled to disk until the aggregate is under budget. This catches cases
where many medium-sized results combine to overflow context.
"""
import logging
from dataclasses import dataclass
from pathlib import Path
logger = logging.getLogger(__name__)
DEFAULT_MAX_RESULT_SIZE_CHARS: int = 50_000
MAX_TURN_BUDGET_CHARS: int = 200_000
PREVIEW_SIZE_CHARS: int = 2000
PERSISTED_OUTPUT_TAG = "<persisted-output>"
PERSISTED_OUTPUT_CLOSING_TAG = "</persisted-output>"
@dataclass
class PersistedResult:
tool_use_id: str
original_size: int
file_path: str
preview: str
has_more: bool
def get_storage_dir(session_id: str) -> Path:
"""Return ~/.hermes/sessions/{session_id}/tool-results/, creating if needed."""
from hermes_constants import get_hermes_home
d = get_hermes_home() / "sessions" / session_id / "tool-results"
d.mkdir(parents=True, exist_ok=True)
return d
def generate_preview(content: str, max_chars: int = PREVIEW_SIZE_CHARS) -> tuple[str, bool]:
"""Truncate at last newline within max_chars. Returns (preview, has_more)."""
if len(content) <= max_chars:
return content, False
# Find last newline within budget
truncated = content[:max_chars]
last_nl = truncated.rfind("\n")
if last_nl > max_chars // 2: # Only use newline boundary if it's past halfway
truncated = truncated[:last_nl + 1]
return truncated, True
def persist_large_result(
content: str,
tool_use_id: str,
storage_dir: Path,
threshold: int = DEFAULT_MAX_RESULT_SIZE_CHARS,
) -> PersistedResult | None:
"""Write full content to disk if it exceeds *threshold*.
Uses open(path, 'x') for atomic exclusive create — dedup on retry.
Returns None if content is small enough to keep inline.
"""
if len(content) <= threshold:
return None
file_path = storage_dir / f"{tool_use_id}.txt"
try:
with open(file_path, "x", encoding="utf-8") as f:
f.write(content)
except FileExistsError:
pass # Already persisted (e.g. retry) — fall through to preview
preview, has_more = generate_preview(content)
return PersistedResult(
tool_use_id=tool_use_id,
original_size=len(content),
file_path=str(file_path),
preview=preview,
has_more=has_more,
)
def build_persisted_output_message(result: PersistedResult) -> str:
"""Build the <persisted-output> replacement block."""
size_kb = result.original_size / 1024
if size_kb >= 1024:
size_str = f"{size_kb / 1024:.1f} MB"
else:
size_str = f"{size_kb:.1f} KB"
msg = f"{PERSISTED_OUTPUT_TAG}\n"
msg += f"This tool result was too large ({result.original_size:,} characters, {size_str}).\n"
msg += f"Full output saved to: {result.file_path}\n"
msg += "Use read_file to access specific sections of this output if needed.\n\n"
msg += f"Preview (first {len(result.preview)} chars):\n"
msg += result.preview
if result.has_more:
msg += "\n..."
msg += f"\n{PERSISTED_OUTPUT_CLOSING_TAG}"
return msg
def maybe_persist_tool_result(
content: str,
tool_name: str,
tool_use_id: str,
storage_dir: Path,
) -> str:
"""Per-result persistence entry point (level 2). Check per-tool threshold, persist if needed.
Returns original content (if small) or the <persisted-output> replacement.
"""
from tools.registry import registry
threshold = registry.get_max_result_size(tool_name)
# Infinity means never persist (e.g. read_file)
if not isinstance(threshold, (int, float)) or threshold == float('inf'):
return content
if len(content) <= threshold:
return content
result = persist_large_result(content, tool_use_id, storage_dir,
threshold=threshold)
if result is None:
return content
logger.info(
"Persisted large tool result: %s (%s, %d chars -> %s)",
tool_name, tool_use_id, result.original_size, result.file_path,
)
return build_persisted_output_message(result)
def enforce_turn_budget(
tool_messages: list[dict],
storage_dir: Path,
budget: int = MAX_TURN_BUDGET_CHARS,
) -> list[dict]:
"""Per-turn aggregate budget entry point (level 3). After all tool results in a turn, enforce aggregate budget.
If total chars exceed budget, persist the largest results first until under budget.
Already-persisted results (containing <persisted-output>) are skipped.
Mutates the list in-place and returns it.
"""
# Calculate total and identify candidates
candidates = [] # (index, size) for non-persisted results
total_size = 0
for i, msg in enumerate(tool_messages):
content = msg.get("content", "")
size = len(content)
total_size += size
if PERSISTED_OUTPUT_TAG not in content:
candidates.append((i, size))
if total_size <= budget:
return tool_messages
# Sort candidates by size descending — persist largest first
candidates.sort(key=lambda x: x[1], reverse=True)
for idx, size in candidates:
if total_size <= budget:
break
msg = tool_messages[idx]
content = msg["content"]
tool_use_id = msg.get("tool_call_id", f"budget_{idx}")
result = persist_large_result(content, tool_use_id, storage_dir,
threshold=0)
if result:
replacement = build_persisted_output_message(result)
total_size -= size
total_size += len(replacement)
tool_messages[idx]["content"] = replacement
logger.info(
"Budget enforcement: persisted tool result %s (%d chars)",
tool_use_id, size,
)
return tool_messages

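Putting levels 2 and 3 together, the calling side (not part of this diff) composes the two entry points roughly like this. Only the functions imported below come from this module; the sample results and the "role" key in the message dicts are assumptions (enforce_turn_budget itself only reads "content" and "tool_call_id"):

    # Rough sketch of how the two entry points above compose per turn.
    from tools.tool_result_storage import (
        get_storage_dir, maybe_persist_tool_result, enforce_turn_budget,
    )

    storage = get_storage_dir(session_id="demo-session")

    # Pretend two tools ran this turn; the first result is oversized
    # (search_files is registered with a 20K threshold in this diff).
    raw_results = [
        ("search_files", "call_1", "x" * 120_000),
        ("write_file", "call_2", "ok"),
    ]

    tool_messages = []
    for tool_name, tool_use_id, raw in raw_results:
        # Level 2: per-result cap against the tool's own threshold.
        content = maybe_persist_tool_result(raw, tool_name, tool_use_id, storage)
        tool_messages.append(
            {"role": "tool", "tool_call_id": tool_use_id, "content": content}
        )

    # Level 3: if the turn's combined results still exceed the 200K
    # budget, spill the largest remaining inline results to disk in place.
    enforce_turn_budget(tool_messages, storage)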
View File

@@ -2085,6 +2085,7 @@ registry.register(
check_fn=check_web_api_key,
requires_env=_web_requires_env(),
emoji="🔍",
max_result_size_chars=100_000,
)
registry.register(
name="web_extract",
@@ -2096,4 +2097,5 @@ registry.register(
requires_env=_web_requires_env(),
is_async=True,
emoji="📄",
max_result_size_chars=100_000,
)

uv.lock generated
View File

@@ -10,14 +10,14 @@ resolution-markers = [
 [[package]]
 name = "agent-client-protocol"
-version = "0.9.0"
+version = "0.8.1"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "pydantic" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/eb/13/3b893421369767e7043cc115d6ef0df417c298b84563be3a12df0416158d/agent_client_protocol-0.9.0.tar.gz", hash = "sha256:f744c48ab9af0f0b4452e5ab5498d61bcab97c26dbe7d6feec5fd36de49be30b", size = 71853, upload-time = "2026-03-26T01:21:00.379Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/1b/7b/7cdac86db388809d9e3bc58cac88cc7dfa49b7615b98fab304a828cd7f8a/agent_client_protocol-0.8.1.tar.gz", hash = "sha256:1bbf15663bf51f64942597f638e32a6284c5da918055d9672d3510e965143dbd", size = 68866, upload-time = "2026-02-13T15:34:54.567Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/8f/ed/c284543c08aa443a4ef2c8bd120be51da8433dd174c01749b5d87c333f22/agent_client_protocol-0.9.0-py3-none-any.whl", hash = "sha256:06911500b51d8cb69112544e2be01fc5e7db39ef88fecbc3848c5c6f194798ee", size = 56850, upload-time = "2026-03-26T01:20:59.252Z" },
+    { url = "https://files.pythonhosted.org/packages/4b/f3/219eeca0ad4a20843d4b9eaac5532f87018b9d25730a62a16f54f6c52d1a/agent_client_protocol-0.8.1-py3-none-any.whl", hash = "sha256:9421a11fd435b4831660272d169c3812d553bb7247049c138c3ca127e4b8af8e", size = 54529, upload-time = "2026-02-13T15:34:53.344Z" },
 ]
 [[package]]
@@ -1017,31 +1017,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/c6/45/e6dd0c6c740c67c07474f2eb5175bb5656598488db444c4abd2a4e948393/daytona_toolbox_api_client_async-0.155.0-py3-none-any.whl", hash = "sha256:6ecf6351a31686d8e33ff054db69e279c45b574018b6c9a1cae15a7940412951", size = 176355, upload-time = "2026-03-24T14:47:36.327Z" },
 ]
-[[package]]
-name = "debugpy"
-version = "1.8.20"
-source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/e0/b7/cd8080344452e4874aae67c40d8940e2b4d47b01601a8fd9f44786c757c7/debugpy-1.8.20.tar.gz", hash = "sha256:55bc8701714969f1ab89a6d5f2f3d40c36f91b2cbe2f65d98bf8196f6a6a2c33", size = 1645207, upload-time = "2026-01-29T23:03:28.199Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/51/56/c3baf5cbe4dd77427fd9aef99fcdade259ad128feeb8a786c246adb838e5/debugpy-1.8.20-cp311-cp311-macosx_15_0_universal2.whl", hash = "sha256:eada6042ad88fa1571b74bd5402ee8b86eded7a8f7b827849761700aff171f1b", size = 2208318, upload-time = "2026-01-29T23:03:36.481Z" },
-    { url = "https://files.pythonhosted.org/packages/9a/7d/4fa79a57a8e69fe0d9763e98d1110320f9ecd7f1f362572e3aafd7417c9d/debugpy-1.8.20-cp311-cp311-manylinux_2_34_x86_64.whl", hash = "sha256:7de0b7dfeedc504421032afba845ae2a7bcc32ddfb07dae2c3ca5442f821c344", size = 3171493, upload-time = "2026-01-29T23:03:37.775Z" },
-    { url = "https://files.pythonhosted.org/packages/7d/f2/1e8f8affe51e12a26f3a8a8a4277d6e60aa89d0a66512f63b1e799d424a4/debugpy-1.8.20-cp311-cp311-win32.whl", hash = "sha256:773e839380cf459caf73cc533ea45ec2737a5cc184cf1b3b796cd4fd98504fec", size = 5209240, upload-time = "2026-01-29T23:03:39.109Z" },
-    { url = "https://files.pythonhosted.org/packages/d5/92/1cb532e88560cbee973396254b21bece8c5d7c2ece958a67afa08c9f10dc/debugpy-1.8.20-cp311-cp311-win_amd64.whl", hash = "sha256:1f7650546e0eded1902d0f6af28f787fa1f1dbdbc97ddabaf1cd963a405930cb", size = 5233481, upload-time = "2026-01-29T23:03:40.659Z" },
-    { url = "https://files.pythonhosted.org/packages/14/57/7f34f4736bfb6e00f2e4c96351b07805d83c9a7b33d28580ae01374430f7/debugpy-1.8.20-cp312-cp312-macosx_15_0_universal2.whl", hash = "sha256:4ae3135e2089905a916909ef31922b2d733d756f66d87345b3e5e52b7a55f13d", size = 2550686, upload-time = "2026-01-29T23:03:42.023Z" },
-    { url = "https://files.pythonhosted.org/packages/ab/78/b193a3975ca34458f6f0e24aaf5c3e3da72f5401f6054c0dfd004b41726f/debugpy-1.8.20-cp312-cp312-manylinux_2_34_x86_64.whl", hash = "sha256:88f47850a4284b88bd2bfee1f26132147d5d504e4e86c22485dfa44b97e19b4b", size = 4310588, upload-time = "2026-01-29T23:03:43.314Z" },
-    { url = "https://files.pythonhosted.org/packages/c1/55/f14deb95eaf4f30f07ef4b90a8590fc05d9e04df85ee379712f6fb6736d7/debugpy-1.8.20-cp312-cp312-win32.whl", hash = "sha256:4057ac68f892064e5f98209ab582abfee3b543fb55d2e87610ddc133a954d390", size = 5331372, upload-time = "2026-01-29T23:03:45.526Z" },
-    { url = "https://files.pythonhosted.org/packages/a1/39/2bef246368bd42f9bd7cba99844542b74b84dacbdbea0833e610f384fee8/debugpy-1.8.20-cp312-cp312-win_amd64.whl", hash = "sha256:a1a8f851e7cf171330679ef6997e9c579ef6dd33c9098458bd9986a0f4ca52e3", size = 5372835, upload-time = "2026-01-29T23:03:47.245Z" },
-    { url = "https://files.pythonhosted.org/packages/15/e2/fc500524cc6f104a9d049abc85a0a8b3f0d14c0a39b9c140511c61e5b40b/debugpy-1.8.20-cp313-cp313-macosx_15_0_universal2.whl", hash = "sha256:5dff4bb27027821fdfcc9e8f87309a28988231165147c31730128b1c983e282a", size = 2539560, upload-time = "2026-01-29T23:03:48.738Z" },
-    { url = "https://files.pythonhosted.org/packages/90/83/fb33dcea789ed6018f8da20c5a9bc9d82adc65c0c990faed43f7c955da46/debugpy-1.8.20-cp313-cp313-manylinux_2_34_x86_64.whl", hash = "sha256:84562982dd7cf5ebebfdea667ca20a064e096099997b175fe204e86817f64eaf", size = 4293272, upload-time = "2026-01-29T23:03:50.169Z" },
-    { url = "https://files.pythonhosted.org/packages/a6/25/b1e4a01bfb824d79a6af24b99ef291e24189080c93576dfd9b1a2815cd0f/debugpy-1.8.20-cp313-cp313-win32.whl", hash = "sha256:da11dea6447b2cadbf8ce2bec59ecea87cc18d2c574980f643f2d2dfe4862393", size = 5331208, upload-time = "2026-01-29T23:03:51.547Z" },
-    { url = "https://files.pythonhosted.org/packages/13/f7/a0b368ce54ffff9e9028c098bd2d28cfc5b54f9f6c186929083d4c60ba58/debugpy-1.8.20-cp313-cp313-win_amd64.whl", hash = "sha256:eb506e45943cab2efb7c6eafdd65b842f3ae779f020c82221f55aca9de135ed7", size = 5372930, upload-time = "2026-01-29T23:03:53.585Z" },
-    { url = "https://files.pythonhosted.org/packages/33/2e/f6cb9a8a13f5058f0a20fe09711a7b726232cd5a78c6a7c05b2ec726cff9/debugpy-1.8.20-cp314-cp314-macosx_15_0_universal2.whl", hash = "sha256:9c74df62fc064cd5e5eaca1353a3ef5a5d50da5eb8058fcef63106f7bebe6173", size = 2538066, upload-time = "2026-01-29T23:03:54.999Z" },
-    { url = "https://files.pythonhosted.org/packages/c5/56/6ddca50b53624e1ca3ce1d1e49ff22db46c47ea5fb4c0cc5c9b90a616364/debugpy-1.8.20-cp314-cp314-manylinux_2_34_x86_64.whl", hash = "sha256:077a7447589ee9bc1ff0cdf443566d0ecf540ac8aa7333b775ebcb8ce9f4ecad", size = 4269425, upload-time = "2026-01-29T23:03:56.518Z" },
-    { url = "https://files.pythonhosted.org/packages/c5/d9/d64199c14a0d4c476df46c82470a3ce45c8d183a6796cfb5e66533b3663c/debugpy-1.8.20-cp314-cp314-win32.whl", hash = "sha256:352036a99dd35053b37b7803f748efc456076f929c6a895556932eaf2d23b07f", size = 5331407, upload-time = "2026-01-29T23:03:58.481Z" },
-    { url = "https://files.pythonhosted.org/packages/e0/d9/1f07395b54413432624d61524dfd98c1a7c7827d2abfdb8829ac92638205/debugpy-1.8.20-cp314-cp314-win_amd64.whl", hash = "sha256:a98eec61135465b062846112e5ecf2eebb855305acc1dfbae43b72903b8ab5be", size = 5372521, upload-time = "2026-01-29T23:03:59.864Z" },
-    { url = "https://files.pythonhosted.org/packages/e0/c3/7f67dea8ccf8fdcb9c99033bbe3e90b9e7395415843accb81428c441be2d/debugpy-1.8.20-py2.py3-none-any.whl", hash = "sha256:5be9bed9ae3be00665a06acaa48f8329d2b9632f15fd09f6a9a8c8d9907e54d7", size = 5337658, upload-time = "2026-01-29T23:04:17.404Z" },
-]
 [[package]]
 name = "deprecated"
 version = "1.3.1"
@@ -1158,24 +1133,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/97/a8/c070e1340636acb38d4e6a7e45c46d168a462b48b9b3257e14ca0e5af79b/environs-14.6.0-py3-none-any.whl", hash = "sha256:f8fb3d6c6a55872b0c6db077a28f5a8c7b8984b7c32029613d44cef95cfc0812", size = 17205, upload-time = "2026-02-20T04:02:07.299Z" },
 ]
-[[package]]
-name = "exa-py"
-version = "2.10.2"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
-    { name = "httpcore" },
-    { name = "httpx" },
-    { name = "openai" },
-    { name = "pydantic" },
-    { name = "python-dotenv" },
-    { name = "requests" },
-    { name = "typing-extensions" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/fe/4f/f06a6f277d668f143e330fe503b0027cc5fed753b22c3e161f8cbbccdf65/exa_py-2.10.2.tar.gz", hash = "sha256:f781f30b199f1102333384728adae64bb15a6bbcabfa97e91fd705f90acffc45", size = 53792, upload-time = "2026-03-26T20:29:35.764Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/e2/bc/7a34e904a415040ba626948d0b0a36a08cd073f12b13342578a68331be3c/exa_py-2.10.2-py3-none-any.whl", hash = "sha256:ecb2a7581f4b7a8aeb6b434acce1bbc40f92ed1d4126b2aa6029913acd904a47", size = 72248, upload-time = "2026-03-26T20:29:37.306Z" },
-]
 [[package]]
 name = "execnet"
 version = "2.1.2"
@@ -1643,13 +1600,13 @@ wheels = [
 [[package]]
 name = "hermes-agent"
-version = "0.7.0"
+version = "0.5.0"
 source = { editable = "." }
 dependencies = [
     { name = "anthropic" },
     { name = "edge-tts" },
     { name = "exa-py" },
     { name = "fal-client" },
     { name = "faster-whisper" },
     { name = "fire" },
     { name = "firecrawl-py" },
     { name = "httpx" },
@@ -1675,13 +1632,10 @@ all = [
     { name = "aiohttp" },
     { name = "croniter" },
     { name = "daytona" },
-    { name = "debugpy" },
     { name = "dingtalk-stream" },
     { name = "discord-py", extra = ["voice"] },
     { name = "elevenlabs" },
-    { name = "faster-whisper" },
     { name = "honcho-ai" },
-    { name = "lark-oapi" },
     { name = "mcp" },
     { name = "modal" },
     { name = "numpy" },
@@ -1689,7 +1643,7 @@ all = [
     { name = "pytest" },
     { name = "pytest-asyncio" },
     { name = "pytest-xdist" },
-    { name = "python-telegram-bot", extra = ["webhooks"] },
+    { name = "python-telegram-bot" },
     { name = "pywinpty", marker = "sys_platform == 'win32'" },
     { name = "simple-term-menu" },
     { name = "slack-bolt" },
@@ -1706,7 +1660,6 @@ daytona = [
     { name = "daytona" },
 ]
 dev = [
-    { name = "debugpy" },
     { name = "mcp" },
     { name = "pytest" },
     { name = "pytest-asyncio" },
@@ -1715,9 +1668,6 @@ dev = [
 dingtalk = [
     { name = "dingtalk-stream" },
 ]
-feishu = [
-    { name = "lark-oapi" },
-]
 homeassistant = [
     { name = "aiohttp" },
 ]
@@ -1725,7 +1675,6 @@ honcho = [
     { name = "honcho-ai" },
 ]
 matrix = [
-    { name = "markdown" },
     { name = "matrix-nio", extra = ["e2e"] },
 ]
 mcp = [
@@ -1734,7 +1683,7 @@ mcp = [
 messaging = [
     { name = "aiohttp" },
     { name = "discord-py", extra = ["voice"] },
-    { name = "python-telegram-bot", extra = ["webhooks"] },
+    { name = "python-telegram-bot" },
     { name = "slack-bolt" },
     { name = "slack-sdk" },
 ]
@@ -1763,7 +1712,6 @@ tts-premium = [
     { name = "elevenlabs" },
 ]
 voice = [
-    { name = "faster-whisper" },
     { name = "numpy" },
     { name = "sounddevice" },
 ]
@@ -1773,7 +1721,7 @@ yc-bench = [
 [package.metadata]
 requires-dist = [
-    { name = "agent-client-protocol", marker = "extra == 'acp'", specifier = ">=0.9.0,<1.0" },
+    { name = "agent-client-protocol", marker = "extra == 'acp'", specifier = ">=0.8.1,<0.9" },
     { name = "aiohttp", marker = "extra == 'homeassistant'", specifier = ">=3.9.0,<4" },
     { name = "aiohttp", marker = "extra == 'messaging'", specifier = ">=3.13.3,<4" },
     { name = "aiohttp", marker = "extra == 'sms'", specifier = ">=3.9.0,<4" },
@@ -1781,15 +1729,13 @@ requires-dist = [
     { name = "atroposlib", marker = "extra == 'rl'", git = "https://github.com/NousResearch/atropos.git" },
     { name = "croniter", marker = "extra == 'cron'", specifier = ">=6.0.0,<7" },
     { name = "daytona", marker = "extra == 'daytona'", specifier = ">=0.148.0,<1" },
-    { name = "debugpy", marker = "extra == 'dev'", specifier = ">=1.8.0,<2" },
     { name = "dingtalk-stream", marker = "extra == 'dingtalk'", specifier = ">=0.1.0,<1" },
     { name = "discord-py", extras = ["voice"], marker = "extra == 'messaging'", specifier = ">=2.7.1,<3" },
     { name = "edge-tts", specifier = ">=7.2.7,<8" },
     { name = "elevenlabs", marker = "extra == 'tts-premium'", specifier = ">=1.0,<2" },
     { name = "exa-py", specifier = ">=2.9.0,<3" },
     { name = "fal-client", specifier = ">=0.13.1,<1" },
     { name = "fastapi", marker = "extra == 'rl'", specifier = ">=0.104.0,<1" },
-    { name = "faster-whisper", marker = "extra == 'voice'", specifier = ">=1.0.0,<2" },
+    { name = "faster-whisper", specifier = ">=1.0.0,<2" },
     { name = "fire", specifier = ">=0.7.1,<1" },
     { name = "firecrawl-py", specifier = ">=4.16.0,<5" },
     { name = "hermes-agent", extras = ["acp"], marker = "extra == 'all'" },
@@ -1798,7 +1744,6 @@ requires-dist = [
     { name = "hermes-agent", extras = ["daytona"], marker = "extra == 'all'" },
     { name = "hermes-agent", extras = ["dev"], marker = "extra == 'all'" },
     { name = "hermes-agent", extras = ["dingtalk"], marker = "extra == 'all'" },
-    { name = "hermes-agent", extras = ["feishu"], marker = "extra == 'all'" },
     { name = "hermes-agent", extras = ["homeassistant"], marker = "extra == 'all'" },
     { name = "hermes-agent", extras = ["honcho"], marker = "extra == 'all'" },
     { name = "hermes-agent", extras = ["mcp"], marker = "extra == 'all'" },
@@ -1812,8 +1757,6 @@ requires-dist = [
     { name = "honcho-ai", marker = "extra == 'honcho'", specifier = ">=2.0.1,<3" },
     { name = "httpx", specifier = ">=0.28.1,<1" },
     { name = "jinja2", specifier = ">=3.1.5,<4" },
-    { name = "lark-oapi", marker = "extra == 'feishu'", specifier = ">=1.5.3,<2" },
-    { name = "markdown", marker = "extra == 'matrix'", specifier = ">=3.6,<4" },
     { name = "matrix-nio", extras = ["e2e"], marker = "extra == 'matrix'", specifier = ">=0.24.0,<1" },
     { name = "mcp", marker = "extra == 'dev'", specifier = ">=1.2.0,<2" },
     { name = "mcp", marker = "extra == 'mcp'", specifier = ">=1.2.0,<2" },
@@ -1829,7 +1772,7 @@ requires-dist = [
     { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=1.3.0,<2" },
     { name = "pytest-xdist", marker = "extra == 'dev'", specifier = ">=3.0,<4" },
     { name = "python-dotenv", specifier = ">=1.2.1,<2" },
-    { name = "python-telegram-bot", extras = ["webhooks"], marker = "extra == 'messaging'", specifier = ">=22.6,<23" },
+    { name = "python-telegram-bot", marker = "extra == 'messaging'", specifier = ">=22.6,<23" },
     { name = "pywinpty", marker = "sys_platform == 'win32' and extra == 'pty'", specifier = ">=2.0.0,<3" },
     { name = "pyyaml", specifier = ">=6.0.2,<7" },
     { name = "requests", specifier = ">=2.33.0,<3" },
@@ -1846,7 +1789,7 @@ requires-dist = [
     { name = "wandb", marker = "extra == 'rl'", specifier = ">=0.15.0,<1" },
     { name = "yc-bench", marker = "python_full_version >= '3.12' and extra == 'yc-bench'", git = "https://github.com/collinear-ai/yc-bench.git" },
 ]
-provides-extras = ["modal", "daytona", "dev", "messaging", "cron", "slack", "matrix", "cli", "tts-premium", "voice", "pty", "honcho", "mcp", "homeassistant", "sms", "acp", "dingtalk", "feishu", "rl", "yc-bench", "all"]
+provides-extras = ["modal", "daytona", "dev", "messaging", "cron", "slack", "matrix", "cli", "tts-premium", "voice", "pty", "honcho", "mcp", "homeassistant", "sms", "acp", "dingtalk", "rl", "yc-bench", "all"]
 [[package]]
 name = "hf-transfer"
@@ -2324,21 +2267,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/0a/dd/8050c947d435c8d4bc94e3252f4d8bb8a76cfb424f043a8680be637a57f1/kiwisolver-1.5.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:59cd8683f575d96df5bb48f6add94afc055012c29e28124fcae2b63661b9efb1", size = 73558, upload-time = "2026-03-09T13:15:52.112Z" },
 ]
-[[package]]
-name = "lark-oapi"
-version = "1.5.3"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
-    { name = "httpx" },
-    { name = "pycryptodome" },
-    { name = "requests" },
-    { name = "requests-toolbelt" },
-    { name = "websockets" },
-]
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/bf/ff/2ece5d735ebfa2af600a53176f2636ae47af2bf934e08effab64f0d1e047/lark_oapi-1.5.3-py3-none-any.whl", hash = "sha256:fda6b32bb38d21b6bdaae94979c600b94c7c521e985adade63a54e4b3e20cc36", size = 6993016, upload-time = "2026-01-27T08:21:49.307Z" },
-]
 [[package]]
 name = "latex2sympy2-extended"
 version = "1.11.0"
@@ -3966,11 +3894,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/13/97/7298f0e1afe3a1ae52ff4c5af5087ed4de319ea73eb3b5c8c4dd4e76e708/python_telegram_bot-22.6-py3-none-any.whl", hash = "sha256:e598fe171c3dde2dfd0f001619ee9110eece66761a677b34719fb18934935ce0", size = 737267, upload-time = "2026-01-24T13:56:58.06Z" },
 ]
-[package.optional-dependencies]
-webhooks = [
-    { name = "tornado" },
-]
 [[package]]
 name = "pytz"
 version = "2025.2"
@@ -4199,18 +4122,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/56/5d/c814546c2333ceea4ba42262d8c4d55763003e767fa169adc693bd524478/requests-2.33.0-py3-none-any.whl", hash = "sha256:3324635456fa185245e24865e810cecec7b4caf933d7eb133dcde67d48cee69b", size = 65017, upload-time = "2026-03-25T15:10:40.382Z" },
 ]
-[[package]]
-name = "requests-toolbelt"
-version = "1.0.0"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
-    { name = "requests" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/f3/61/d7545dafb7ac2230c70d38d31cbfe4cc64f7144dc41f6e4e4b78ecd9f5bb/requests-toolbelt-1.0.0.tar.gz", hash = "sha256:7681a0a3d047012b5bdc0ee37d7f8f07ebe76ab08caeccfc3921ce23c88d5bc6", size = 206888, upload-time = "2023-05-01T04:11:33.229Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/3f/51/d4db610ef29373b879047326cbf6fa98b6c1969d6f6dc423279de2b1be2c/requests_toolbelt-1.0.0-py2.py3-none-any.whl", hash = "sha256:cccfdd665f0a24fcf4726e690f65639d272bb0637b9b92dfd91a5568ccf6bd06", size = 54481, upload-time = "2023-05-01T04:11:28.427Z" },
-]
 [[package]]
 name = "rich"
 version = "14.3.3"