Compare commits

...

3 Commits

Author SHA1 Message Date
Shannon Sands
354e668eee Nomad backend: draft raw_exec/Apptainer driver support + docs 2026-02-14 15:11:43 +10:00
Shannon Sands
327eb38b15 Draft Nomad SlotPool terminal backend (opt-in)
- Add optional 'nomad' extra (aiohttp)
- Add tools/nomad_slotpool.py: minimal Nomad client + slot pool + sync thread wrapper
- Add tools/sandbox_server.py + tools/sandbox/Dockerfile for sandbox container image
- Extend terminal_tool to support TERMINAL_ENV=nomad (acquire slot per task_id)
- Document draft usage in README and add nomad-dev.hcl

NOTE: Not tested on cluster; intended for later live validation.
2026-02-14 14:30:07 +10:00
Shannon Sands
ae6435f787 Env robustness: context-safe prompting + tool arg normalization
- Preserve full trajectory while truncating prompt view per turn (avoids context overflow)
- Add max_context_tokens support and wire from env config
- Normalize tool call arguments robustly (dict / stringified JSON / plain string)
- Avoid double-encoding tool arguments in Hermes parser
- Add tool-call metrics to AgentResult for debugging/optional shaping

Scope: environments/* only
2026-02-14 13:13:00 +10:00
12 changed files with 2902 additions and 44 deletions

View File

@@ -199,7 +199,7 @@ The terminal tool can execute commands in different environments:
**Configure in `~/.hermes/config.yaml`:**
```yaml
terminal:
backend: local # or: docker, ssh, singularity, modal
backend: local # or: docker, ssh, singularity, modal, nomad
cwd: "." # Working directory ("." = current dir)
timeout: 180 # Command timeout in seconds
```
@@ -240,6 +240,38 @@ modal setup # Authenticate with Modal
hermes config set terminal.backend modal
```
**Nomad SlotPool (optional, draft):** a local Nomad dev agent plus a Docker sandbox-server container.
This backend is under active development and must be explicitly enabled.
```bash
uv pip install "hermes-agent[nomad]"
# Start Nomad (dev mode)
# Docker driver:
nomad agent -dev -config=nomad-dev.hcl
# Apptainer/raw_exec driver (draft):
# nomad agent -dev -config=nomad-singularity.hcl
# Build sandbox-server image
docker build -t hermes-sandbox:local -f tools/sandbox/Dockerfile .
# Enable Nomad backend
export TERMINAL_ENV=nomad
export TERMINAL_NOMAD_ADDRESS=http://localhost:4646
export TERMINAL_NOMAD_JOB_ID=hermes-sandbox
export TERMINAL_NOMAD_IMAGE=hermes-sandbox:local
export TERMINAL_NOMAD_DRIVER=docker # or: raw_exec (Apptainer draft)
export TERMINAL_NOMAD_SLOTS=10
export TERMINAL_NOMAD_MIN=1
export TERMINAL_NOMAD_MAX=10
# raw_exec only:
# export TERMINAL_NOMAD_APPTAINER_IMAGE=/path/to/hermes-sandbox.sif
# export TERMINAL_NOMAD_RAW_EXEC_PORT=8080
```
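To sanity-check the wiring before running the agent (a sketch; the health endpoint is standard Nomad, and the job ID matches the defaults above):
```bash
# Nomad dev agent reachable?
curl -s http://localhost:4646/v1/status/leader
# Sandbox job running? (the slot pool submits it on first use)
nomad job status hermes-sandbox
```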
**Sudo Support:** If a command needs sudo, you'll be prompted for your password (cached for the session). Or set `SUDO_PASSWORD` in `~/.hermes/.env`.
### 📱 Messaging Gateway

View File

@@ -0,0 +1,68 @@
# PR3 Plan: Optional Nomad SlotPool Backend (Draft)
Goal: Reintroduce the old `atropos-agent` Nomad SlotPool sandbox backend **without competing** with the current default terminal/Modal code path.
## Non-goals
- Do not change the model-facing tool schema.
- Do not change the default behavior.
- Do not require Nomad unless explicitly enabled.
## Desired UX
Default (today):
- `TERMINAL_ENV=local|docker|singularity|ssh|modal`
Add (PR3):
- `TERMINAL_ENV=nomad` (or `TERMINAL_ENV=sandbox`)
- Uses slot-based multiplexing: tasks acquire a slot, execute commands inside it, release.
- Driver can be `docker` or `singularity` (Apptainer).
Minimal switching friction:
- same `terminal_tool(command, task_id=...)` interface (example below)
- only env vars change
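For illustration, the call site does not change across backends (a sketch; the import path is assumed, while `terminal_tool` and `task_id` are the existing interface named above):
```python
import os
os.environ["TERMINAL_ENV"] = "nomad"  # the only switch vs. local/docker/modal

from tools.terminal_tool import terminal_tool  # assumed import path

result = terminal_tool("echo hello", task_id="traj-001")
```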
## Proposed integration approach
Because the full Nomad/SlotPool implementation lives in `/Users/shannon/Workspace/Nous/atropos-agent`, we should not duplicate it in Hermes-Agent.
Instead:
1) Add an **optional dependency** on `atropos-agent` (package name TBD) behind an extra, e.g.
- `pip install hermes-agent[nomad]`
2) Add a small adapter module in Hermes-Agent:
- `tools/nomad_pool.py` (adapter)
3) In `tools/terminal_tool.py` extend `_create_environment()`:
- accept `task_id` (already added in PR2 for modal pooling)
- add a new env_type:
- `env_type == "nomad"` → return `NomadPooledTaskEnvironment`
4) `NomadPooledTaskEnvironment` mirrors `ModalPooledTaskEnvironment` (sketched below):
- owns a slot lease
- executes command within slot workspace
- releases slot on cleanup
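A minimal sketch of the adapter shape (names follow this plan; the `execute`/`cleanup` pair mirrors the other backends, and the manager API is assumed from the ported slot-pool code):
```python
class NomadPooledTaskEnvironment:
    """One slot lease per task; released on cleanup."""

    def __init__(self, task_id: str, manager):
        self._manager = manager
        self._slot = manager.acquire(task_id)  # blocks until a slot is free

    def execute(self, command: str, timeout: int = 60) -> dict:
        # Commands run inside the slot's workspace on the sandbox-server
        res = self._manager.execute_bash(self._slot, command, timeout_s=float(timeout))
        return {"output": res.output, "returncode": 0 if res.success else 1}

    def cleanup(self):
        # Must run on all exit paths so the slot returns to the pool
        self._manager.release(self._slot, reset_workspace=True)
```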
## Configuration env vars
- `TERMINAL_ENV=nomad`
- `TERMINAL_NOMAD_ADDRESS=http://localhost:4646`
- `TERMINAL_NOMAD_DRIVER=docker|singularity`
- `TERMINAL_NOMAD_IMAGE=...`
- `TERMINAL_NOMAD_SLOTS=10`
- `TERMINAL_NOMAD_MIN=1`
- `TERMINAL_NOMAD_MAX=10`
Optionally:
- `TERMINAL_NOMAD_AUTOSTART=1` (start `nomad agent -dev ...` locally if not running)
## Testing plan (when compute available)
- Sanity: `TERMINAL_ENV=nomad` run `terminal_tool("echo hello")`
- Concurrency: run N parallel tasks, ensure slot leases are unique and isolated (see the sketch after this list)
- Cleanup: ensure slot release happens even on exceptions
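A rough shape for the concurrency/cleanup checks (assumes the `SlotPool` API from `tools/nomad_slotpool.py` and a running Nomad dev agent):
```python
import asyncio

from tools.nomad_slotpool import SlotPool, SlotPoolConfig

async def check_unique_leases(n: int = 8) -> None:
    pool = SlotPool(SlotPoolConfig())
    await pool.start()
    slots = await asyncio.gather(*(pool.acquire(f"traj-{i}") for i in range(n)))
    try:
        # Every task must hold a distinct (allocation, slot) lease
        assert len({(s.alloc_id, s.slot_id) for s in slots}) == n
    finally:
        for s in slots:
            await pool.release(s)  # release must happen even if the assert fails
        await pool.close()

asyncio.run(check_unique_leases())
```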
## Current status
- Hermes-Agent `main` does **not** contain the Nomad backend. The full implementation is in `atropos-agent`.
- PR3 should be an adapter + optional extra, not a rewrite.

View File

@@ -73,6 +73,12 @@ class AgentResult:
# Tool errors encountered during the loop
tool_errors: List[ToolError] = field(default_factory=list)
# Tool-call metrics (debugging / optional reward shaping)
tool_calls_attempted: int = 0
tool_calls_schema_valid: int = 0
tool_calls_executed_ok: int = 0
tool_calls_exec_error: int = 0
def _extract_reasoning_from_message(message) -> Optional[str]:
"""
@@ -136,6 +142,8 @@ class HermesAgentLoop:
temperature: float = 1.0,
max_tokens: Optional[int] = None,
extra_body: Optional[Dict[str, Any]] = None,
tool_handler=None,
max_context_tokens: Optional[int] = None,
):
"""
Initialize the agent loop.
@@ -152,6 +160,13 @@ class HermesAgentLoop:
extra_body: Extra parameters passed to the OpenAI client's create() call.
Used for OpenRouter provider preferences, transforms, etc.
e.g. {"provider": {"ignore": ["DeepInfra"]}}
tool_handler: Optional async callable(tool_name, args, task_id) -> str.
When provided, used INSTEAD of handle_function_call() for
tool dispatch. This allows sandbox backends (Modal, Nomad)
to route tool calls through their slot-based execution.
max_context_tokens: Maximum prompt tokens before truncation.
If None, no truncation is applied.
Recommended: set to max_model_len - max_tokens - 512 (safety margin).
"""
self.server = server
self.tool_schemas = tool_schemas
@@ -161,6 +176,123 @@ class HermesAgentLoop:
self.temperature = temperature
self.max_tokens = max_tokens
self.extra_body = extra_body
self.tool_handler = tool_handler
self.max_context_tokens = max_context_tokens
def _truncate_context(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Truncate conversation history to fit within max_context_tokens.
Strategy:
- Keep system message (index 0) and initial user message (index 1) always
- Keep last 6 messages (recent context) always
- For everything in between, progressively truncate tool result content
- If still too long, drop oldest middle messages entirely
Uses rough char/4 token estimate (fast, no tokenizer needed).
NOTE: This function mutates the provided list (it may pop/replace entries).
Call it on a copy when you want to preserve the full trajectory.
"""
if self.max_context_tokens is None:
return messages
def estimate_tokens(msgs):
total = 0
for m in msgs:
content = m.get("content", "") or ""
total += len(content) // 4 + 10 # ~4 chars per token + overhead
if "tool_calls" in m:
total += 50 * len(m["tool_calls"]) # tool call overhead
return total
if estimate_tokens(messages) <= self.max_context_tokens:
return messages
protect_head = 2
protect_tail = max(0, min(6, len(messages) - protect_head))
middle_start = protect_head
middle_end = len(messages) - protect_tail
# Phase 1: truncate tool outputs in the middle
if middle_start < middle_end:
for i in range(middle_start, middle_end):
if messages[i].get("role") == "tool":
content = messages[i].get("content", "") or ""
if len(content) > 200:
messages[i] = dict(messages[i])
messages[i]["content"] = content[:100] + "\n...[truncated]...\n" + content[-50:]
if estimate_tokens(messages) <= self.max_context_tokens:
return messages
# Phase 2: drop oldest middle messages (try to keep assistant+tool pairs)
while middle_start < middle_end and estimate_tokens(messages) > self.max_context_tokens:
msg = messages[middle_start]
messages.pop(middle_start)
middle_end -= 1
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tool_ids = {
tc.get("id") or tc.get("tool_call_id", "")
for tc in msg.get("tool_calls", [])
if isinstance(tc, dict)
}
i = middle_start
while i < middle_end:
if messages[i].get("role") == "tool" and messages[i].get("tool_call_id", "") in tool_ids:
messages.pop(i)
middle_end -= 1
else:
i += 1
return messages
def _normalize_tool_args(self, tool_name: str, tool_args_raw: str) -> tuple[Dict[str, Any], bool]:
"""Normalize tool arguments into a dict.
Returns: (args_dict, schema_valid)
schema_valid is True only when arguments decode directly into a dict
(no double-decoding and no coercion/wrapping required).
Goal: keep environments robust (never crash on args format drift) while
still allowing reward functions to penalize malformed formats if desired.
"""
try:
decoded = json.loads(tool_args_raw)
except json.JSONDecodeError:
# Not JSON at all — treat as a plain string
if tool_name == "terminal":
return {"command": tool_args_raw}, False
return {"input": tool_args_raw}, False
if isinstance(decoded, dict):
if tool_name == "terminal":
cmd = decoded.get("command")
if isinstance(cmd, str) and cmd.strip():
return decoded, True
if isinstance(decoded.get("input"), str):
return {"command": decoded.get("input")}, False
return decoded, False
return decoded, True
if isinstance(decoded, str):
s = decoded.strip()
if (s.startswith("{") and s.endswith("}")) or (s.startswith("[") and s.endswith("]")):
try:
decoded2 = json.loads(s)
except json.JSONDecodeError:
decoded2 = None
if isinstance(decoded2, dict):
return decoded2, False
if tool_name == "terminal":
return {"command": decoded}, False
return {"input": decoded}, False
if tool_name == "terminal":
return {"command": str(decoded)}, False
return {"input": decoded}, False
async def run(self, messages: List[Dict[str, Any]]) -> AgentResult:
"""
@@ -176,14 +308,22 @@ class HermesAgentLoop:
reasoning_per_turn = []
tool_errors: List[ToolError] = []
tool_calls_attempted = 0
tool_calls_schema_valid = 0
tool_calls_executed_ok = 0
tool_calls_exec_error = 0
import time as _time
for turn in range(self.max_turns):
turn_start = _time.monotonic()
# Truncate prompt view on a copy (preserve full trajectory in `messages`)
prompt_messages = self._truncate_context(list(messages))
# Build the chat_completion kwargs
chat_kwargs = {
"messages": messages,
"messages": prompt_messages,
"n": 1,
"temperature": self.temperature,
}
@@ -215,6 +355,10 @@ class HermesAgentLoop:
finished_naturally=False,
reasoning_per_turn=reasoning_per_turn,
tool_errors=tool_errors,
tool_calls_attempted=tool_calls_attempted,
tool_calls_schema_valid=tool_calls_schema_valid,
tool_calls_executed_ok=tool_calls_executed_ok,
tool_calls_exec_error=tool_calls_exec_error,
)
api_elapsed = _time.monotonic() - api_start
@@ -228,6 +372,10 @@ class HermesAgentLoop:
finished_naturally=False,
reasoning_per_turn=reasoning_per_turn,
tool_errors=tool_errors,
tool_calls_attempted=tool_calls_attempted,
tool_calls_schema_valid=tool_calls_schema_valid,
tool_calls_executed_ok=tool_calls_executed_ok,
tool_calls_exec_error=tool_calls_exec_error,
)
assistant_msg = response.choices[0].message
@@ -270,6 +418,7 @@ class HermesAgentLoop:
# Validate tool name
if tool_name not in self.valid_tool_names:
tool_calls_exec_error += 1
tool_result = json.dumps(
{
"error": f"Unknown tool '{tool_name}'. "
@@ -287,35 +436,35 @@ class HermesAgentLoop:
tool_name, turn + 1,
)
else:
# Parse arguments and dispatch
try:
args = json.loads(tool_args_raw)
except json.JSONDecodeError:
args = {}
logger.warning(
"Invalid JSON in tool call arguments for '%s': %s",
tool_name, tool_args_raw[:200],
)
tool_calls_attempted += 1
args, schema_valid = self._normalize_tool_args(tool_name, tool_args_raw)
if schema_valid:
tool_calls_schema_valid += 1
try:
if tool_name == "terminal":
backend = os.getenv("TERMINAL_ENV", "local")
cmd_preview = args.get("command", "")[:80]
cmd_preview = str(args.get("command", ""))[:80]
logger.info(
"[%s] $ %s", self.task_id[:8], cmd_preview,
)
# Run tool calls in a thread pool so backends that use
# asyncio.run() internally (modal, docker) get a clean
# event loop instead of deadlocking inside Atropos's loop.
tool_submit_time = _time.monotonic()
loop = asyncio.get_event_loop()
tool_result = await loop.run_in_executor(
_tool_executor,
lambda: handle_function_call(
tool_name, args, task_id=self.task_id
),
)
if self.tool_handler:
tool_result = await self.tool_handler(tool_name, args, self.task_id)
else:
# Run tool calls in a thread pool so backends that use
# asyncio.run() internally (modal, docker) get a clean
# event loop instead of deadlocking inside Atropos's loop.
loop = asyncio.get_event_loop()
tool_result = await loop.run_in_executor(
_tool_executor,
lambda: handle_function_call(
tool_name, args, task_id=self.task_id
),
)
tool_elapsed = _time.monotonic() - tool_submit_time
# Log slow tools and thread pool stats for debugging
@@ -327,6 +476,7 @@ class HermesAgentLoop:
tool_elapsed, pool_active,
)
except Exception as e:
tool_calls_exec_error += 1
tool_result = json.dumps(
{"error": f"Tool execution failed: {type(e).__name__}: {str(e)}"}
)
@@ -340,22 +490,31 @@ class HermesAgentLoop:
"Tool '%s' execution failed on turn %d: %s",
tool_name, turn + 1, e,
)
else:
tool_err = False
try:
result_data = json.loads(tool_result)
if isinstance(result_data, dict):
err = result_data.get("error")
if err:
tool_err = True
# Also check if the tool returned an error in its JSON result
try:
result_data = json.loads(tool_result)
if isinstance(result_data, dict):
err = result_data.get("error")
exit_code = result_data.get("exit_code")
if err and exit_code and exit_code < 0:
tool_errors.append(ToolError(
turn=turn + 1, tool_name=tool_name,
arguments=tool_args_raw[:200],
error=str(err),
tool_result=tool_result[:500],
))
except (json.JSONDecodeError, TypeError):
pass
exit_code = result_data.get("exit_code")
if isinstance(exit_code, int) and exit_code < 0:
tool_err = True
tool_errors.append(ToolError(
turn=turn + 1, tool_name=tool_name,
arguments=tool_args_raw[:200],
error=str(err) if err else "negative exit_code",
tool_result=tool_result[:500],
))
except (json.JSONDecodeError, TypeError):
pass
if tool_err:
tool_calls_exec_error += 1
else:
tool_calls_executed_ok += 1
# Add tool response to conversation
messages.append(
@@ -396,6 +555,10 @@ class HermesAgentLoop:
finished_naturally=True,
reasoning_per_turn=reasoning_per_turn,
tool_errors=tool_errors,
tool_calls_attempted=tool_calls_attempted,
tool_calls_schema_valid=tool_calls_schema_valid,
tool_calls_executed_ok=tool_calls_executed_ok,
tool_calls_exec_error=tool_calls_exec_error,
)
# Hit max turns without the model stopping
@@ -407,6 +570,10 @@ class HermesAgentLoop:
finished_naturally=False,
reasoning_per_turn=reasoning_per_turn,
tool_errors=tool_errors,
tool_calls_attempted=tool_calls_attempted,
tool_calls_schema_valid=tool_calls_schema_valid,
tool_calls_executed_ok=tool_calls_executed_ok,
tool_calls_exec_error=tool_calls_exec_error,
)
def _get_managed_state(self) -> Optional[Dict[str, Any]]:

View File

@@ -478,6 +478,7 @@ class HermesAgentBaseEnv(BaseEnv):
tokenizer=self.tokenizer,
tool_call_parser=tc_parser,
) as managed:
_max_ctx = self.config.max_token_length if (self.config.max_token_length and self.config.max_token_length > 0) else None
agent = HermesAgentLoop(
server=managed,
tool_schemas=tools,
@@ -487,6 +488,7 @@ class HermesAgentBaseEnv(BaseEnv):
temperature=self.config.agent_temperature,
max_tokens=self.config.max_token_length,
extra_body=self.config.extra_body,
max_context_tokens=_max_ctx,
)
result = await agent.run(messages)
except NotImplementedError:
@@ -495,6 +497,7 @@ class HermesAgentBaseEnv(BaseEnv):
"ManagedServer not available (OpenAI server?). "
"Falling back to direct server mode."
)
_max_ctx = self.config.max_token_length if (self.config.max_token_length and self.config.max_token_length > 0) else None
agent = HermesAgentLoop(
server=self.server,
tool_schemas=tools,
@@ -504,10 +507,12 @@ class HermesAgentBaseEnv(BaseEnv):
temperature=self.config.agent_temperature,
max_tokens=self.config.max_token_length,
extra_body=self.config.extra_body,
max_context_tokens=_max_ctx,
)
result = await agent.run(messages)
else:
# Phase 1: OpenAI server -- native tool_calls, placeholder tokens
_max_ctx = self.config.max_token_length if (self.config.max_token_length and self.config.max_token_length > 0) else None
agent = HermesAgentLoop(
server=self.server,
tool_schemas=tools,
@@ -517,6 +522,7 @@ class HermesAgentBaseEnv(BaseEnv):
temperature=self.config.agent_temperature,
max_tokens=self.config.max_token_length,
extra_body=self.config.extra_body,
max_context_tokens=_max_ctx,
)
result = await agent.run(messages)

View File

@@ -49,15 +49,22 @@ class HermesToolCallParser(ToolCallParser):
continue
tc_data = json.loads(raw_json)
# Handle arguments: could be dict or already a JSON string
raw_args = tc_data.get("arguments", {})
if isinstance(raw_args, str):
# Already a string — pass through as-is.
# It may be a JSON string ("{...}") or a plain string ("ls").
args_str = raw_args
else:
# Dict — serialize to JSON
args_str = json.dumps(raw_args, ensure_ascii=False)
tool_calls.append(
ChatCompletionMessageToolCall(
id=f"call_{uuid.uuid4().hex[:8]}",
type="function",
function=Function(
name=tc_data["name"],
arguments=json.dumps(
tc_data.get("arguments", {}), ensure_ascii=False
),
arguments=args_str,
),
)
)

2
nomad-dev.hcl Normal file
View File

@@ -0,0 +1,2 @@
## Moved to `Hermes-Agent/nomad-dev.hcl`.
## Kept temporarily for convenience; remove after downstream docs/scripts are updated.

25
nomad-singularity.hcl Normal file
View File

@@ -0,0 +1,25 @@
# Nomad Configuration for Singularity/Apptainer Sandbox
# Run with: nomad agent -dev -config=nomad-singularity.hcl
#
# This enables the raw_exec driver, which can be used to run Apptainer
# commands on hosts where Docker is unavailable.
#
# NOTE: Hermes-Agent's Nomad backend support is a draft; this file is provided
# as a starting point for local testing.
client {
enabled = true
options {
"driver.raw_exec.enable" = "1"
}
}
plugin "raw_exec" {
config {
enabled = true
}
}
# If you have a dedicated Nomad Singularity/Apptainer driver plugin installed,
# you can configure that instead of raw_exec.

View File

@@ -35,12 +35,16 @@ dependencies = [
[project.optional-dependencies]
modal = ["swe-rex[modal]>=1.4.0"]
# Nomad SlotPool sandbox backend (local Nomad dev agent + Docker/Apptainer)
# Uses aiohttp for the Nomad HTTP API and sandbox-server execution.
nomad = ["aiohttp>=3.9.0"]
dev = ["pytest", "pytest-asyncio"]
messaging = ["python-telegram-bot>=20.0", "discord.py>=2.0", "aiohttp>=3.9.0"]
cron = ["croniter"]
cli = ["simple-term-menu"]
all = [
"hermes-agent[modal]",
"hermes-agent[nomad]",
"hermes-agent[messaging]",
"hermes-agent[cron]",
"hermes-agent[cli]",

569
tools/nomad_slotpool.py Normal file
View File

@@ -0,0 +1,569 @@
"""Nomad SlotPool sandbox backend (ported from Nous/atropos-agent).
This module provides an OPTIONAL slot-based sandbox execution backend.
Design goals:
- Keep Hermes-Agent's default terminal backends unchanged.
- Enable the Nomad backend with minimal friction:
TERMINAL_ENV=nomad
How it works (high level):
- A local Nomad dev agent manages one or more sandbox-server containers.
- Each container hosts N "slots" (workspace dirs).
- Each agent trajectory acquires a slot, runs tools inside it, then releases.
This file is intentionally self-contained so we can iterate without touching
other tool implementations.
NOTE: This backend requires aiohttp and a working Nomad installation.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import threading
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
import aiohttp
logger = logging.getLogger(__name__)
# =============================================================================
# Nomad API client (async)
# =============================================================================
class AllocationStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETE = "complete"
FAILED = "failed"
LOST = "lost"
@dataclass
class Allocation:
id: str
job_id: str
task_group: str
node_id: str
status: AllocationStatus
address: Optional[str] = None
port: Optional[int] = None
@property
def http_address(self) -> Optional[str]:
if self.address and self.port:
return f"http://{self.address}:{self.port}"
return None
class NomadClient:
def __init__(
self,
address: str = "http://localhost:4646",
token: Optional[str] = None,
timeout: float = 30.0,
):
self.address = address.rstrip("/")
self.token = token or os.environ.get("NOMAD_TOKEN")
self.timeout = aiohttp.ClientTimeout(total=timeout)
self._session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
headers = {}
if self.token:
headers["X-Nomad-Token"] = self.token
self._session = aiohttp.ClientSession(timeout=self.timeout, headers=headers)
return self._session
async def close(self):
if self._session and not self._session.closed:
await self._session.close()
async def _request(self, method: str, path: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
session = await self._get_session()
url = f"{self.address}{path}"
async with session.request(method, url, json=data) as resp:
if resp.status == 404:
return {"error": "not_found", "status": 404}
text = await resp.text()
if not text:
return {"status": resp.status}
try:
parsed = json.loads(text)
except json.JSONDecodeError:
return {"text": text, "status": resp.status}
if resp.status >= 400:
return {"error": parsed, "status": resp.status}
return parsed if isinstance(parsed, dict) else {"data": parsed, "status": resp.status}
async def is_healthy(self) -> bool:
try:
res = await self._request("GET", "/v1/status/leader")
return "error" not in res
except Exception:
return False
async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
res = await self._request("GET", f"/v1/job/{job_id}")
if res.get("error") == "not_found":
return None
if "error" in res:
raise RuntimeError(f"Nomad get_job failed: {res}")
return res
async def submit_job(self, job_spec: Dict[str, Any]) -> Dict[str, Any]:
return await self._request("POST", "/v1/jobs", data=job_spec)
async def stop_job(self, job_id: str, purge: bool = True) -> Dict[str, Any]:
return await self._request("DELETE", f"/v1/job/{job_id}?purge={'true' if purge else 'false'}")
async def get_job_allocations(self, job_id: str) -> List[Allocation]:
res = await self._request("GET", f"/v1/job/{job_id}/allocations")
if "error" in res:
# Some Nomad builds return list directly, not dict
if isinstance(res.get("data"), list):
allocs_data = res["data"]
else:
raise RuntimeError(f"Nomad allocations failed: {res}")
allocs_data = res if isinstance(res, list) else res.get("data", res)
if not isinstance(allocs_data, list):
return []
allocs: List[Allocation] = []
for a in allocs_data:
try:
allocs.append(
Allocation(
id=a["ID"],
job_id=a.get("JobID", job_id),
task_group=a.get("TaskGroup", "sandbox"),
node_id=a.get("NodeID", ""),
status=AllocationStatus(str(a.get("ClientStatus", "pending"))),
)
)
except Exception:
continue
return allocs
async def get_allocation(self, alloc_id: str) -> Dict[str, Any]:
res = await self._request("GET", f"/v1/allocation/{alloc_id}")
if "error" in res:
raise RuntimeError(f"Nomad get_allocation failed: {res}")
return res
def create_sandbox_job(
job_id: str,
image: str,
count: int,
slots_per_container: int,
privileged: bool,
cpu: int,
memory: int,
port: int = 8080,
datacenter: str = "dc1",
driver: str = "docker",
) -> Dict[str, Any]:
"""Create a sandbox-server Nomad job spec.
driver:
- docker: runs tools/sandbox_server.py inside a Docker container image.
- raw_exec: runs Apptainer/Singularity via raw_exec on the host.
raw_exec is provided as a draft option for HPC-like hosts without Docker.
"""
if driver == "raw_exec":
# For raw_exec, we bind to a fixed port. This is intended for dev/testing
# on a single host. Scaling to multiple allocations requires more robust
# service discovery.
apptainer_image = os.getenv("TERMINAL_NOMAD_APPTAINER_IMAGE", "")
if not apptainer_image:
raise RuntimeError(
"TERMINAL_NOMAD_APPTAINER_IMAGE is required for raw_exec driver"
)
cmd = (
"apptainer exec "
"--bind \"$NOMAD_ALLOC_DIR/data:/data\" "
f"{apptainer_image} "
f"python /sandbox_server.py --port {port} --slots {slots_per_container} --data-dir /data"
)
return {
"ID": job_id,
"Name": job_id,
"Type": "service",
"Datacenters": [datacenter],
"TaskGroups": [
{
"Name": "sandbox",
"Count": count,
"Update": {"HealthCheck": "task_states", "MinHealthyTime": 0},
"Networks": [{"Mode": "host"}],
"Tasks": [
{
"Name": "sandbox-server",
"Driver": "raw_exec",
"Config": {
"command": "bash",
"args": ["-lc", cmd],
},
"Env": {"PYTHONUNBUFFERED": "1", "NOMAD_ALLOC_DIR": "${NOMAD_ALLOC_DIR}"},
"Resources": {"CPU": cpu, "MemoryMB": memory},
}
],
}
],
}
# Default: docker
return {
"ID": job_id,
"Name": job_id,
"Type": "service",
"Datacenters": [datacenter],
"TaskGroups": [
{
"Name": "sandbox",
"Count": count,
"Update": {"HealthCheck": "task_states", "MinHealthyTime": 0},
"Networks": [
{
"Mode": "host",
"DynamicPorts": [{"Label": "http", "To": port}],
}
],
"Tasks": [
{
"Name": "sandbox-server",
"Driver": "docker",
"Config": {
"image": image,
"force_pull": False,
"ports": ["http"],
"privileged": privileged,
"command": "python",
"args": [
"/sandbox_server.py",
"--port",
str(port),
"--slots",
str(slots_per_container),
"--data-dir",
"/data",
],
},
"Env": {"PYTHONUNBUFFERED": "1", "NOMAD_ALLOC_DIR": "${NOMAD_ALLOC_DIR}"},
"Resources": {"CPU": cpu, "MemoryMB": memory},
}
],
}
],
}
# =============================================================================
# Slot + executor
# =============================================================================
class SlotState(Enum):
AVAILABLE = "available"
ACQUIRED = "acquired"
EXECUTING = "executing"
ERROR = "error"
@dataclass
class Slot:
slot_id: str
alloc_id: str
container_addr: str
workspace_dir: str = ""
state: SlotState = SlotState.AVAILABLE
trajectory_id: Optional[str] = None
def __post_init__(self):
if not self.workspace_dir:
self.workspace_dir = f"/data/{self.slot_id}"
@property
def is_available(self) -> bool:
return self.state == SlotState.AVAILABLE
def acquire(self, trajectory_id: str):
if not self.is_available:
raise RuntimeError(f"Slot not available: {self.slot_id} ({self.state})")
self.state = SlotState.ACQUIRED
self.trajectory_id = trajectory_id
def release(self):
self.state = SlotState.AVAILABLE
self.trajectory_id = None
def create_slots_for_allocation(alloc_id: str, container_addr: str, num_slots: int) -> List[Slot]:
return [
Slot(slot_id=f"slot_{i}", alloc_id=alloc_id, container_addr=container_addr)
for i in range(num_slots)
]
@dataclass
class ExecutionResult:
success: bool
output: str = ""
error: str = ""
metadata: Dict[str, Any] = field(default_factory=dict)
class SandboxExecutor:
def __init__(self, timeout: float = 30.0):
self.timeout = aiohttp.ClientTimeout(total=timeout)
self._session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(timeout=self.timeout)
return self._session
async def close(self):
if self._session and not self._session.closed:
await self._session.close()
async def execute(self, slot: Slot, tool_name: str, args: Dict[str, Any], timeout: float) -> ExecutionResult:
session = await self._get_session()
url = f"{slot.container_addr}/execute"
payload = {
"slot_id": slot.slot_id,
"tool": tool_name,
"args": args,
"execution_id": str(uuid.uuid4()),
"timeout": timeout,
}
async with session.post(url, json=payload) as resp:
data = await resp.json()
return ExecutionResult(
success=bool(data.get("success", False)),
output=str(data.get("output", "")),
error=str(data.get("error", "")),
metadata=dict(data.get("metadata", {}) or {}),
)
async def reset_slot(self, slot: Slot) -> ExecutionResult:
session = await self._get_session()
url = f"{slot.container_addr}/reset"
payload = {"slot_id": slot.slot_id}
async with session.post(url, json=payload) as resp:
data = await resp.json()
return ExecutionResult(
success=bool(data.get("success", False)),
output=str(data.get("output", "")),
error=str(data.get("error", "")),
metadata=dict(data.get("metadata", {}) or {}),
)
@dataclass
class SlotPoolConfig:
nomad_address: str = "http://localhost:4646"
job_id: str = "hermes-sandbox"
datacenter: str = "dc1"
image: str = "hermes-sandbox:local"
# Driver selection: docker (default) or raw_exec (for Apptainer/Singularity)
driver: str = "docker" # docker | raw_exec
slots_per_container: int = 10
privileged: bool = False
cpu: int = 500
memory: int = 512
min_containers: int = 1
max_containers: int = 10
acquire_timeout: float = 30.0
# raw_exec: fixed port (dynamic ports are harder to discover reliably)
raw_exec_port: int = 8080
class SlotPool:
def __init__(self, cfg: SlotPoolConfig):
self.cfg = cfg
self.nomad = NomadClient(address=cfg.nomad_address)
self.executor = SandboxExecutor(timeout=cfg.acquire_timeout)
self._slots: Dict[str, Slot] = {}
self._queue: asyncio.Queue[str] = asyncio.Queue()
self._started = False
def _slot_key(self, alloc_id: str, slot_id: str) -> str:
return f"{alloc_id}:{slot_id}"
async def start(self):
if self._started:
return
if not await self.nomad.is_healthy():
raise RuntimeError(f"Nomad not reachable at {self.cfg.nomad_address}")
job = await self.nomad.get_job(self.cfg.job_id)
if job is None:
spec = create_sandbox_job(
job_id=self.cfg.job_id,
image=self.cfg.image,
count=self.cfg.min_containers,
slots_per_container=self.cfg.slots_per_container,
privileged=self.cfg.privileged,
cpu=self.cfg.cpu,
memory=self.cfg.memory,
datacenter=self.cfg.datacenter,
driver=self.cfg.driver,
port=self.cfg.raw_exec_port,
)
res = await self.nomad.submit_job(spec)
if "error" in res:
raise RuntimeError(f"Nomad submit job failed: {res}")
await self._refresh_slots()
self._started = True
async def close(self):
await self.executor.close()
await self.nomad.close()
async def _refresh_slots(self):
allocs = await self.nomad.get_job_allocations(self.cfg.job_id)
for alloc in allocs:
detail = await self.nomad.get_allocation(alloc.id)
# Find the container address and the mapped host port for "http".
# Prefer the explicit address in allocation resources; Nomad sometimes
# stores addresses under Resources->Networks.
net = (detail.get("Resources") or {}).get("Networks") or []
address = None
port = None
if self.cfg.driver == "raw_exec":
# raw_exec: use fixed port and best-effort node address discovery
port = self.cfg.raw_exec_port
address = (
detail.get("NodeAddress")
or detail.get("NodeName")
or detail.get("NodeID")
)
else:
# docker: discover dynamic port mapping
if net and isinstance(net, list):
n0 = net[0]
address = n0.get("IP")
ports = n0.get("DynamicPorts") or []
for p in ports:
if p.get("Label") == "http":
port = p.get("Value")
if not address or not port:
# Fall back: allocation node identity
address = detail.get("NodeAddress") or detail.get("NodeName") or detail.get("NodeID")
if not address or not port:
# Can't use this alloc
continue
container_addr = f"http://{address}:{port}"
for s in create_slots_for_allocation(alloc.id, container_addr, self.cfg.slots_per_container):
key = self._slot_key(s.alloc_id, s.slot_id)
if key in self._slots:
continue
self._slots[key] = s
await self._queue.put(key)
async def acquire(self, trajectory_id: str) -> Slot:
if not self._started:
raise RuntimeError("SlotPool not started")
while True:
key = await asyncio.wait_for(self._queue.get(), timeout=self.cfg.acquire_timeout)
slot = self._slots.get(key)
if not slot:
continue
try:
slot.acquire(trajectory_id)
return slot
except Exception:
continue
async def release(self, slot: Slot, reset_workspace: bool = True):
if reset_workspace:
try:
await self.executor.reset_slot(slot)
except Exception:
pass
slot.release()
await self._queue.put(self._slot_key(slot.alloc_id, slot.slot_id))
async def execute_bash(self, slot: Slot, command: str, timeout_s: float) -> ExecutionResult:
return await self.executor.execute(slot, "bash", {"command": command}, timeout=timeout_s)
# =============================================================================
# Sync wrapper (thread + event loop)
# =============================================================================
class NomadSlotPoolManager:
"""Runs a SlotPool on a dedicated event loop thread for sync callers."""
def __init__(self, cfg: SlotPoolConfig):
self.cfg = cfg
self._pool = SlotPool(cfg)
self._thread: Optional[threading.Thread] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._started = threading.Event()
def _run_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._loop = loop
loop.run_until_complete(self._pool.start())
self._started.set()
loop.run_forever()
def start(self):
if self._thread and self._thread.is_alive():
return
self._thread = threading.Thread(target=self._run_loop, name="nomad-slotpool", daemon=True)
self._thread.start()
self._started.wait(timeout=120)
def _call(self, coro):
if not self._loop:
raise RuntimeError("NomadSlotPoolManager not started")
fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
return fut.result()
def acquire(self, trajectory_id: str) -> Slot:
return self._call(self._pool.acquire(trajectory_id))
def release(self, slot: Slot, reset_workspace: bool = True):
return self._call(self._pool.release(slot, reset_workspace=reset_workspace))
def execute_bash(self, slot: Slot, command: str, timeout_s: float) -> ExecutionResult:
return self._call(self._pool.execute_bash(slot, command, timeout_s=timeout_s))
_global_manager: Optional[NomadSlotPoolManager] = None
def get_global_manager(cfg: SlotPoolConfig) -> NomadSlotPoolManager:
global _global_manager
if _global_manager is None:
_global_manager = NomadSlotPoolManager(cfg)
_global_manager.start()
return _global_manager
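# Example usage (a sketch; assumes a reachable Nomad dev agent and the
# sandbox image from tools/sandbox/Dockerfile):
#
#   cfg = SlotPoolConfig(nomad_address="http://localhost:4646")
#   manager = get_global_manager(cfg)   # starts the pool on a daemon thread
#   slot = manager.acquire("traj-123")
#   try:
#       result = manager.execute_bash(slot, "echo hello", timeout_s=30.0)
#   finally:
#       manager.release(slot)           # returns the slot to the pool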

13
tools/sandbox/Dockerfile Normal file
View File

@@ -0,0 +1,13 @@
FROM python:3.11-slim
WORKDIR /
# Minimal deps for sandbox_server
RUN pip install --no-cache-dir aiohttp
# Copy sandbox server
COPY tools/sandbox_server.py /sandbox_server.py
EXPOSE 8080
CMD ["python", "/sandbox_server.py", "--port", "8080", "--slots", "10", "--data-dir", "/data"]

1912
tools/sandbox_server.py Normal file

File diff suppressed because it is too large.

View File

@@ -1215,6 +1215,16 @@ def _get_env_config() -> Dict[str, Any]:
"cwd": os.getenv("TERMINAL_CWD", default_cwd),
"timeout": int(os.getenv("TERMINAL_TIMEOUT", "60")),
"lifetime_seconds": int(os.getenv("TERMINAL_LIFETIME_SECONDS", "300")),
# Nomad sandbox backend (slot pool)
"nomad_address": os.getenv("TERMINAL_NOMAD_ADDRESS", "http://localhost:4646"),
"nomad_job_id": os.getenv("TERMINAL_NOMAD_JOB_ID", "hermes-sandbox"),
"nomad_image": os.getenv("TERMINAL_NOMAD_IMAGE", "hermes-sandbox:local"),
"nomad_slots": int(os.getenv("TERMINAL_NOMAD_SLOTS", "10")),
"nomad_min": int(os.getenv("TERMINAL_NOMAD_MIN", "1")),
"nomad_max": int(os.getenv("TERMINAL_NOMAD_MAX", "10")),
"nomad_privileged": os.getenv("TERMINAL_NOMAD_PRIVILEGED", "").lower() in ("1", "true", "yes"),
# SSH-specific config
"ssh_host": os.getenv("TERMINAL_SSH_HOST", ""),
"ssh_user": os.getenv("TERMINAL_SSH_USER", ""),
@@ -1223,15 +1233,16 @@ def _get_env_config() -> Dict[str, Any]:
}
def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_config: dict = None):
def _create_environment(env_type: str, image: str, cwd: str, timeout: int, task_id: str = "", ssh_config: dict = None):
"""
Create an execution environment from mini-swe-agent.
Args:
env_type: One of "local", "docker", "singularity", "modal", "ssh"
image: Docker/Singularity/Modal image name (ignored for local/ssh)
env_type: One of "local", "docker", "singularity", "modal", "ssh", "nomad"
image: Docker/Singularity/Modal image name (ignored for local/ssh/nomad)
cwd: Working directory
timeout: Default command timeout
task_id: Used for per-task sandbox isolation (nomad/modal pooling)
ssh_config: SSH connection config (for env_type="ssh")
Returns:
@@ -1252,7 +1263,48 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_c
elif env_type == "modal":
# Use custom Modal wrapper with sudo support
return _ModalEnvironment(image=image, cwd=cwd, timeout=timeout)
elif env_type == "nomad":
# Slot-based sandbox backend via local Nomad dev agent.
# Requires: pip install "hermes-agent[nomad]" and a running Nomad agent.
from tools.nomad_slotpool import SlotPoolConfig, get_global_manager
cfg = SlotPoolConfig(
nomad_address=os.getenv("TERMINAL_NOMAD_ADDRESS", "http://localhost:4646"),
job_id=os.getenv("TERMINAL_NOMAD_JOB_ID", "hermes-sandbox"),
image=os.getenv("TERMINAL_NOMAD_IMAGE", "hermes-sandbox:local"),
driver=os.getenv("TERMINAL_NOMAD_DRIVER", "docker"),
raw_exec_port=int(os.getenv("TERMINAL_NOMAD_RAW_EXEC_PORT", "8080")),
slots_per_container=int(os.getenv("TERMINAL_NOMAD_SLOTS", "10")),
min_containers=int(os.getenv("TERMINAL_NOMAD_MIN", "1")),
max_containers=int(os.getenv("TERMINAL_NOMAD_MAX", "10")),
privileged=os.getenv("TERMINAL_NOMAD_PRIVILEGED", "").lower() in ("1", "true", "yes"),
)
manager = get_global_manager(cfg)
slot = manager.acquire(task_id or str(uuid.uuid4()))
class _NomadSlotEnvironment:
def __init__(self, _slot):
self._slot = _slot
def execute(self, command: str, cwd: str = "", *, timeout: int | None = None) -> dict:
# cwd ignored: sandbox_server executes within slot workspace.
timeout_s = float(timeout if timeout is not None else 60)
res = manager.execute_bash(self._slot, command, timeout_s=timeout_s)
exit_code = int(res.metadata.get("exit_code", 0)) if isinstance(res.metadata, dict) else 0
return {"output": res.output, "returncode": exit_code}
def cleanup(self):
try:
manager.release(self._slot, reset_workspace=True)
except Exception:
pass
def stop(self):
self.cleanup()
return _NomadSlotEnvironment(slot)
elif env_type == "ssh":
if not ssh_config or not ssh_config.get("host") or not ssh_config.get("user"):
raise ValueError("SSH environment requires ssh_host and ssh_user to be configured")
@@ -1578,6 +1630,7 @@ def terminal_tool(
image=image,
cwd=cwd,
timeout=effective_timeout,
task_id=effective_task_id,
ssh_config=ssh_config
)
except ImportError as e: