mirror of https://github.com/NousResearch/hermes-agent.git
synced 2026-04-29 15:31:38 +08:00

Compare commits: skill/gith...nomad-back (3 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 354e668eee | |
| | 327eb38b15 | |
| | ae6435f787 | |

README.md (34 changed lines)
@@ -199,7 +199,7 @@ The terminal tool can execute commands in different environments:
 **Configure in `~/.hermes/config.yaml`:**
 ```yaml
 terminal:
-  backend: local  # or: docker, ssh, singularity, modal
+  backend: local  # or: docker, ssh, singularity, modal, nomad
   cwd: "."        # Working directory ("." = current dir)
   timeout: 180    # Command timeout in seconds
 ```
@@ -240,6 +240,38 @@ modal setup # Authenticate with Modal
 hermes config set terminal.backend modal
 ```
 
+**Nomad SlotPool (optional, draft):** local Nomad dev agent + Docker sandbox-server.
+
+This backend is under active development and must be explicitly enabled.
+
+```bash
+uv pip install "hermes-agent[nomad]"
+
+# Start Nomad (dev mode)
+# Docker driver:
+nomad agent -dev -config=nomad-dev.hcl
+
+# Apptainer/raw_exec driver (draft):
+# nomad agent -dev -config=nomad-singularity.hcl
+
+# Build sandbox-server image
+docker build -t hermes-sandbox:local -f tools/sandbox/Dockerfile .
+
+# Enable Nomad backend
+export TERMINAL_ENV=nomad
+export TERMINAL_NOMAD_ADDRESS=http://localhost:4646
+export TERMINAL_NOMAD_JOB_ID=hermes-sandbox
+export TERMINAL_NOMAD_IMAGE=hermes-sandbox:local
+export TERMINAL_NOMAD_DRIVER=docker  # or: raw_exec (Apptainer draft)
+export TERMINAL_NOMAD_SLOTS=10
+export TERMINAL_NOMAD_MIN=1
+export TERMINAL_NOMAD_MAX=10
+
+# raw_exec only:
+# export TERMINAL_NOMAD_APPTAINER_IMAGE=/path/to/hermes-sandbox.sif
+# export TERMINAL_NOMAD_RAW_EXEC_PORT=8080
+```
+
 **Sudo Support:** If a command needs sudo, you'll be prompted for your password (cached for the session). Or set `SUDO_PASSWORD` in `~/.hermes/.env`.
 
 ### 📱 Messaging Gateway
docs/nomad_backend_plan.md (new file, 68 lines)

@@ -0,0 +1,68 @@
# PR3 Plan: Optional Nomad SlotPool Backend (Draft)

Goal: Reintroduce the old `atropos-agent` Nomad SlotPool sandbox backend **without competing** with the current default terminal/Modal code path.

## Non-goals
- Do not change the model-facing tool schema.
- Do not change the default behavior.
- Do not require Nomad unless explicitly enabled.

## Desired UX

Default (today):
- `TERMINAL_ENV=local|docker|singularity|ssh|modal`

Add (PR3):
- `TERMINAL_ENV=nomad` (or `TERMINAL_ENV=sandbox`)
- Uses slot-based multiplexing: tasks acquire a slot, execute commands inside it, then release it.
- Driver can be `docker` or `singularity` (Apptainer).

Minimal switching friction:
- same `terminal_tool(command, task_id=...)` interface
- only env vars change

## Proposed integration approach

Because the full Nomad/SlotPool implementation lives in `/Users/shannon/Workspace/Nous/atropos-agent`, we should not duplicate it in Hermes-Agent.

Instead:

1) Add an **optional dependency** on `atropos-agent` (package name TBD) behind an extra, e.g.
   - `pip install hermes-agent[nomad]`

2) Add a small adapter module in Hermes-Agent:
   - `tools/nomad_pool.py` (adapter)

3) In `tools/terminal_tool.py` extend `_create_environment()`:
   - accept `task_id` (already added in PR2 for modal pooling)
   - add a new env_type:
     - `env_type == "nomad"` → return `NomadPooledTaskEnvironment`

4) `NomadPooledTaskEnvironment` mirrors `ModalPooledTaskEnvironment` (see the sketch below):
   - owns a slot lease
   - executes commands within the slot workspace
   - releases the slot on cleanup
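As a rough sketch of the adapter, assuming the `SlotPoolConfig`/`get_global_manager` API that `tools/nomad_slotpool.py` exposes later in this diff (the class name and method shapes come from this plan, not a finished implementation):

```python
# Sketch only: mirrors the inline _NomadSlotEnvironment that terminal_tool.py
# later creates; names follow this plan, not a final API.
from tools.nomad_slotpool import SlotPoolConfig, get_global_manager


class NomadPooledTaskEnvironment:
    """Terminal environment that leases one sandbox slot per task."""

    def __init__(self, task_id: str, cfg: SlotPoolConfig):
        self._manager = get_global_manager(cfg)      # shared pool, started once per process
        self._slot = self._manager.acquire(task_id)  # blocks until a slot is free

    def execute(self, command: str, timeout: int = 60) -> dict:
        # Commands run inside this slot's workspace on the sandbox-server.
        res = self._manager.execute_bash(self._slot, command, timeout_s=float(timeout))
        return {"output": res.output, "returncode": int(res.metadata.get("exit_code", 0))}

    def cleanup(self):
        # Release even on error paths; reset wipes the slot workspace.
        self._manager.release(self._slot, reset_workspace=True)
```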

## Configuration env vars

- `TERMINAL_ENV=nomad`
- `TERMINAL_NOMAD_ADDRESS=http://localhost:4646`
- `TERMINAL_NOMAD_DRIVER=docker|singularity`
- `TERMINAL_NOMAD_IMAGE=...`
- `TERMINAL_NOMAD_SLOTS=10`
- `TERMINAL_NOMAD_MIN=1`
- `TERMINAL_NOMAD_MAX=10`

Optionally:
- `TERMINAL_NOMAD_AUTOSTART=1` (start `nomad agent -dev ...` locally if not running)

## Testing plan (when compute available)

- Sanity: with `TERMINAL_ENV=nomad`, run `terminal_tool("echo hello")`
- Concurrency: run N parallel tasks, ensure slot leases are unique and isolated (see the sketch below)
- Cleanup: ensure slot release happens even on exceptions
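For the concurrency item, a test along these lines would pin down the lease invariant (a hedged sketch: assumes `pytest-asyncio` from the `dev` extra and a running local Nomad dev agent):

```python
# Hypothetical concurrency check against the SlotPool API in tools/nomad_slotpool.py.
import pytest

from tools.nomad_slotpool import SlotPool, SlotPoolConfig


@pytest.mark.asyncio
async def test_slot_leases_are_unique():
    pool = SlotPool(SlotPoolConfig())  # requires a local Nomad dev agent
    await pool.start()
    slots = []
    try:
        for i in range(4):
            slots.append(await pool.acquire(f"traj-{i}"))
        # No two concurrent trajectories may share the same (alloc_id, slot_id).
        assert len({(s.alloc_id, s.slot_id) for s in slots}) == len(slots)
    finally:
        for s in slots:
            await pool.release(s)
        await pool.close()
```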

## Current status

- Hermes-Agent `main` does **not** contain the Nomad backend. The full implementation is in `atropos-agent`.
- PR3 should be an adapter + optional extra, not a rewrite.
@@ -73,6 +73,12 @@ class AgentResult:
     # Tool errors encountered during the loop
     tool_errors: List[ToolError] = field(default_factory=list)
 
+    # Tool-call metrics (debugging / optional reward shaping)
+    tool_calls_attempted: int = 0
+    tool_calls_schema_valid: int = 0
+    tool_calls_executed_ok: int = 0
+    tool_calls_exec_error: int = 0
+
 
 def _extract_reasoning_from_message(message) -> Optional[str]:
     """
@@ -136,6 +142,8 @@ class HermesAgentLoop:
         temperature: float = 1.0,
         max_tokens: Optional[int] = None,
         extra_body: Optional[Dict[str, Any]] = None,
+        tool_handler=None,
+        max_context_tokens: Optional[int] = None,
     ):
         """
         Initialize the agent loop.
@@ -152,6 +160,13 @@ class HermesAgentLoop:
             extra_body: Extra parameters passed to the OpenAI client's create() call.
                 Used for OpenRouter provider preferences, transforms, etc.
                 e.g. {"provider": {"ignore": ["DeepInfra"]}}
+            tool_handler: Optional async callable(tool_name, args, task_id) -> str.
+                When provided, used INSTEAD of handle_function_call() for
+                tool dispatch. This allows sandbox backends (Modal, Nomad)
+                to route tool calls through their slot-based execution.
+            max_context_tokens: Maximum prompt tokens before truncation.
+                If None, no truncation is applied.
+                Recommended: set to max_model_len - max_tokens - 512 (safety margin).
         """
         self.server = server
         self.tool_schemas = tool_schemas
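For orientation, the `tool_handler` contract described above is just an async callable returning a JSON string. A hypothetical handler routing terminal calls through a leased Nomad slot might look like this (`pool` and `slot` are parameters here, and the result shape is an assumption, not part of the diff):

```python
# Hypothetical tool_handler conforming to: async callable(tool_name, args, task_id) -> str.
import json


def make_nomad_tool_handler(pool, slot):
    """Build a tool_handler bound to one leased slot (pool: SlotPool, slot: Slot)."""

    async def tool_handler(tool_name: str, args: dict, task_id: str) -> str:
        if tool_name != "terminal":
            return json.dumps({"error": f"Unsupported tool: {tool_name}"})
        res = await pool.execute_bash(slot, args.get("command", ""), timeout_s=60.0)
        return json.dumps({
            "output": res.output,
            "exit_code": int(res.metadata.get("exit_code", 0)),
        })

    return tool_handler
```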
@@ -161,6 +176,123 @@
         self.temperature = temperature
         self.max_tokens = max_tokens
         self.extra_body = extra_body
+        self.tool_handler = tool_handler
+        self.max_context_tokens = max_context_tokens
+
+    def _truncate_context(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """Truncate conversation history to fit within max_context_tokens.
+
+        Strategy:
+        - Keep system message (index 0) and initial user message (index 1) always
+        - Keep last 6 messages (recent context) always
+        - For everything in between, progressively truncate tool result content
+        - If still too long, drop oldest middle messages entirely
+
+        Uses rough char/4 token estimate (fast, no tokenizer needed).
+
+        NOTE: This function mutates the provided list (it may pop/replace entries).
+        Call it on a copy when you want to preserve the full trajectory.
+        """
+        if self.max_context_tokens is None:
+            return messages
+
+        def estimate_tokens(msgs):
+            total = 0
+            for m in msgs:
+                content = m.get("content", "") or ""
+                total += len(content) // 4 + 10  # ~4 chars per token + overhead
+                if "tool_calls" in m:
+                    total += 50 * len(m["tool_calls"])  # tool call overhead
+            return total
+
+        if estimate_tokens(messages) <= self.max_context_tokens:
+            return messages
+
+        protect_head = 2
+        protect_tail = max(0, min(6, len(messages) - protect_head))
+        middle_start = protect_head
+        middle_end = len(messages) - protect_tail
+
+        # Phase 1: truncate tool outputs in the middle
+        if middle_start < middle_end:
+            for i in range(middle_start, middle_end):
+                if messages[i].get("role") == "tool":
+                    content = messages[i].get("content", "") or ""
+                    if len(content) > 200:
+                        messages[i] = dict(messages[i])
+                        messages[i]["content"] = content[:100] + "\n...[truncated]...\n" + content[-50:]
+
+        if estimate_tokens(messages) <= self.max_context_tokens:
+            return messages
+
+        # Phase 2: drop oldest middle messages (try to keep assistant+tool pairs)
+        while middle_start < middle_end and estimate_tokens(messages) > self.max_context_tokens:
+            msg = messages[middle_start]
+            messages.pop(middle_start)
+            middle_end -= 1
+
+            if msg.get("role") == "assistant" and msg.get("tool_calls"):
+                tool_ids = {
+                    tc.get("id") or tc.get("tool_call_id", "")
+                    for tc in msg.get("tool_calls", [])
+                    if isinstance(tc, dict)
+                }
+                i = middle_start
+                while i < middle_end:
+                    if messages[i].get("role") == "tool" and messages[i].get("tool_call_id", "") in tool_ids:
+                        messages.pop(i)
+                        middle_end -= 1
+                    else:
+                        i += 1
+
+        return messages
+
+    def _normalize_tool_args(self, tool_name: str, tool_args_raw: str) -> (Dict[str, Any], bool):
+        """Normalize tool arguments into a dict.
+
+        Returns: (args_dict, schema_valid)
+
+        schema_valid is True only when arguments decode directly into a dict
+        (no double-decoding and no coercion/wrapping required).
+
+        Goal: keep environments robust (never crash on args format drift) while
+        still allowing reward functions to penalize malformed formats if desired.
+        """
+        try:
+            decoded = json.loads(tool_args_raw)
+        except json.JSONDecodeError:
+            # Not JSON at all — treat as a plain string
+            if tool_name == "terminal":
+                return {"command": tool_args_raw}, False
+            return {"input": tool_args_raw}, False
+
+        if isinstance(decoded, dict):
+            if tool_name == "terminal":
+                cmd = decoded.get("command")
+                if isinstance(cmd, str) and cmd.strip():
+                    return decoded, True
+                if isinstance(decoded.get("input"), str):
+                    return {"command": decoded.get("input")}, False
+                return decoded, False
+            return decoded, True
+
+        if isinstance(decoded, str):
+            s = decoded.strip()
+            if (s.startswith("{") and s.endswith("}")) or (s.startswith("[") and s.endswith("]")):
+                try:
+                    decoded2 = json.loads(s)
+                except json.JSONDecodeError:
+                    decoded2 = None
+                if isinstance(decoded2, dict):
+                    return decoded2, False
+
+            if tool_name == "terminal":
+                return {"command": decoded}, False
+            return {"input": decoded}, False
+
+        if tool_name == "terminal":
+            return {"command": str(decoded)}, False
+        return {"input": decoded}, False
+
     async def run(self, messages: List[Dict[str, Any]]) -> AgentResult:
         """
@@ -176,14 +308,22 @@
         reasoning_per_turn = []
         tool_errors: List[ToolError] = []
 
+        tool_calls_attempted = 0
+        tool_calls_schema_valid = 0
+        tool_calls_executed_ok = 0
+        tool_calls_exec_error = 0
+
         import time as _time
 
         for turn in range(self.max_turns):
             turn_start = _time.monotonic()
 
+            # Truncate prompt view on a copy (preserve full trajectory in `messages`)
+            prompt_messages = self._truncate_context(list(messages))
+
             # Build the chat_completion kwargs
             chat_kwargs = {
-                "messages": messages,
+                "messages": prompt_messages,
                 "n": 1,
                 "temperature": self.temperature,
             }
@@ -215,6 +355,10 @@
                     finished_naturally=False,
                     reasoning_per_turn=reasoning_per_turn,
                     tool_errors=tool_errors,
+                    tool_calls_attempted=tool_calls_attempted,
+                    tool_calls_schema_valid=tool_calls_schema_valid,
+                    tool_calls_executed_ok=tool_calls_executed_ok,
+                    tool_calls_exec_error=tool_calls_exec_error,
                 )
 
             api_elapsed = _time.monotonic() - api_start
@@ -228,6 +372,10 @@
                     finished_naturally=False,
                     reasoning_per_turn=reasoning_per_turn,
                     tool_errors=tool_errors,
+                    tool_calls_attempted=tool_calls_attempted,
+                    tool_calls_schema_valid=tool_calls_schema_valid,
+                    tool_calls_executed_ok=tool_calls_executed_ok,
+                    tool_calls_exec_error=tool_calls_exec_error,
                 )
 
             assistant_msg = response.choices[0].message
@@ -270,6 +418,7 @@
 
                 # Validate tool name
                 if tool_name not in self.valid_tool_names:
+                    tool_calls_exec_error += 1
                     tool_result = json.dumps(
                         {
                             "error": f"Unknown tool '{tool_name}'. "
@@ -287,35 +436,35 @@
                         tool_name, turn + 1,
                     )
                 else:
                     # Parse arguments and dispatch
-                    try:
-                        args = json.loads(tool_args_raw)
-                    except json.JSONDecodeError:
-                        args = {}
-                        logger.warning(
-                            "Invalid JSON in tool call arguments for '%s': %s",
-                            tool_name, tool_args_raw[:200],
-                        )
+                    tool_calls_attempted += 1
+                    args, schema_valid = self._normalize_tool_args(tool_name, tool_args_raw)
+                    if schema_valid:
+                        tool_calls_schema_valid += 1
 
                     try:
                         if tool_name == "terminal":
                             backend = os.getenv("TERMINAL_ENV", "local")
-                            cmd_preview = args.get("command", "")[:80]
+                            cmd_preview = str(args.get("command", ""))[:80]
                             logger.info(
                                 "[%s] $ %s", self.task_id[:8], cmd_preview,
                             )
 
-                        # Run tool calls in a thread pool so backends that use
-                        # asyncio.run() internally (modal, docker) get a clean
-                        # event loop instead of deadlocking inside Atropos's loop.
                         tool_submit_time = _time.monotonic()
-                        loop = asyncio.get_event_loop()
-                        tool_result = await loop.run_in_executor(
-                            _tool_executor,
-                            lambda: handle_function_call(
-                                tool_name, args, task_id=self.task_id
-                            ),
-                        )
+                        if self.tool_handler:
+                            tool_result = await self.tool_handler(tool_name, args, self.task_id)
+                        else:
+                            # Run tool calls in a thread pool so backends that use
+                            # asyncio.run() internally (modal, docker) get a clean
+                            # event loop instead of deadlocking inside Atropos's loop.
+                            loop = asyncio.get_event_loop()
+                            tool_result = await loop.run_in_executor(
+                                _tool_executor,
+                                lambda: handle_function_call(
+                                    tool_name, args, task_id=self.task_id
+                                ),
+                            )
 
                         tool_elapsed = _time.monotonic() - tool_submit_time
 
                         # Log slow tools and thread pool stats for debugging
@@ -327,6 +476,7 @@
                             tool_elapsed, pool_active,
                         )
                     except Exception as e:
+                        tool_calls_exec_error += 1
                         tool_result = json.dumps(
                             {"error": f"Tool execution failed: {type(e).__name__}: {str(e)}"}
                         )
@@ -340,22 +490,31 @@
                             "Tool '%s' execution failed on turn %d: %s",
                             tool_name, turn + 1, e,
                         )
-                        # Also check if the tool returned an error in its JSON result
-                        try:
-                            result_data = json.loads(tool_result)
-                            if isinstance(result_data, dict):
-                                err = result_data.get("error")
-                                exit_code = result_data.get("exit_code")
-                                if err and exit_code and exit_code < 0:
-                                    tool_errors.append(ToolError(
-                                        turn=turn + 1, tool_name=tool_name,
-                                        arguments=tool_args_raw[:200],
-                                        error=str(err),
-                                        tool_result=tool_result[:500],
-                                    ))
-                        except (json.JSONDecodeError, TypeError):
-                            pass
+                    else:
+                        tool_err = False
+                        try:
+                            result_data = json.loads(tool_result)
+                            if isinstance(result_data, dict):
+                                err = result_data.get("error")
+                                if err:
+                                    tool_err = True
+                                exit_code = result_data.get("exit_code")
+                                if exit_code is not None and isinstance(exit_code, int) and exit_code < 0:
+                                    tool_err = True
+                                    tool_errors.append(ToolError(
+                                        turn=turn + 1, tool_name=tool_name,
+                                        arguments=tool_args_raw[:200],
+                                        error=str(err) if err else "nonzero exit_code",
+                                        tool_result=tool_result[:500],
+                                    ))
+                        except (json.JSONDecodeError, TypeError):
+                            pass
+
+                        if tool_err:
+                            tool_calls_exec_error += 1
+                        else:
+                            tool_calls_executed_ok += 1
 
                 # Add tool response to conversation
                 messages.append(
@@ -396,6 +555,10 @@
                     finished_naturally=True,
                     reasoning_per_turn=reasoning_per_turn,
                     tool_errors=tool_errors,
+                    tool_calls_attempted=tool_calls_attempted,
+                    tool_calls_schema_valid=tool_calls_schema_valid,
+                    tool_calls_executed_ok=tool_calls_executed_ok,
+                    tool_calls_exec_error=tool_calls_exec_error,
                 )
 
         # Hit max turns without the model stopping
@@ -407,6 +570,10 @@
             finished_naturally=False,
             reasoning_per_turn=reasoning_per_turn,
             tool_errors=tool_errors,
+            tool_calls_attempted=tool_calls_attempted,
+            tool_calls_schema_valid=tool_calls_schema_valid,
+            tool_calls_executed_ok=tool_calls_executed_ok,
+            tool_calls_exec_error=tool_calls_exec_error,
        )
 
    def _get_managed_state(self) -> Optional[Dict[str, Any]]:
@@ -478,6 +478,7 @@ class HermesAgentBaseEnv(BaseEnv):
                     tokenizer=self.tokenizer,
                     tool_call_parser=tc_parser,
                 ) as managed:
+                    _max_ctx = self.config.max_token_length if (self.config.max_token_length and self.config.max_token_length > 0) else None
                     agent = HermesAgentLoop(
                         server=managed,
                         tool_schemas=tools,
@@ -487,6 +488,7 @@ class HermesAgentBaseEnv(BaseEnv):
                         temperature=self.config.agent_temperature,
                         max_tokens=self.config.max_token_length,
                         extra_body=self.config.extra_body,
+                        max_context_tokens=_max_ctx,
                     )
                     result = await agent.run(messages)
             except NotImplementedError:
@@ -495,6 +497,7 @@ class HermesAgentBaseEnv(BaseEnv):
                     "ManagedServer not available (OpenAI server?). "
                     "Falling back to direct server mode."
                 )
+                _max_ctx = self.config.max_token_length if (self.config.max_token_length and self.config.max_token_length > 0) else None
                 agent = HermesAgentLoop(
                     server=self.server,
                     tool_schemas=tools,
@@ -504,10 +507,12 @@ class HermesAgentBaseEnv(BaseEnv):
                     temperature=self.config.agent_temperature,
                     max_tokens=self.config.max_token_length,
                     extra_body=self.config.extra_body,
+                    max_context_tokens=_max_ctx,
                 )
                 result = await agent.run(messages)
         else:
             # Phase 1: OpenAI server -- native tool_calls, placeholder tokens
+            _max_ctx = self.config.max_token_length if (self.config.max_token_length and self.config.max_token_length > 0) else None
             agent = HermesAgentLoop(
                 server=self.server,
                 tool_schemas=tools,
@@ -517,6 +522,7 @@ class HermesAgentBaseEnv(BaseEnv):
                 temperature=self.config.agent_temperature,
                 max_tokens=self.config.max_token_length,
                 extra_body=self.config.extra_body,
+                max_context_tokens=_max_ctx,
             )
             result = await agent.run(messages)
@@ -49,15 +49,22 @@ class HermesToolCallParser(ToolCallParser):
                 continue
 
             tc_data = json.loads(raw_json)
+            # Handle arguments: could be dict or already a JSON string
+            raw_args = tc_data.get("arguments", {})
+            if isinstance(raw_args, str):
+                # Already a string — pass through as-is.
+                # It may be a JSON string ("{...}") or a plain string ("ls").
+                args_str = raw_args
+            else:
+                # Dict — serialize to JSON
+                args_str = json.dumps(raw_args, ensure_ascii=False)
             tool_calls.append(
                 ChatCompletionMessageToolCall(
                     id=f"call_{uuid.uuid4().hex[:8]}",
                     type="function",
                     function=Function(
                         name=tc_data["name"],
-                        arguments=json.dumps(
-                            tc_data.get("arguments", {}), ensure_ascii=False
-                        ),
+                        arguments=args_str,
                     ),
                 )
             )
nomad-dev.hcl (new file, 2 lines)

@@ -0,0 +1,2 @@
## Moved to `Hermes-Agent/nomad-dev.hcl`.
## Kept temporarily for convenience; remove after downstream docs/scripts are updated.
nomad-singularity.hcl (new file, 25 lines)

@@ -0,0 +1,25 @@
# Nomad Configuration for Singularity/Apptainer Sandbox
# Run with: nomad agent -dev -config=nomad-singularity.hcl
#
# This enables the raw_exec driver, which can be used to run Apptainer
# commands on hosts where Docker is unavailable.
#
# NOTE: Hermes-Agent's Nomad backend support is a draft; this file is provided
# as a starting point for local testing.

client {
  enabled = true

  options {
    "driver.raw_exec.enable" = "1"
  }
}

plugin "raw_exec" {
  config {
    enabled = true
  }
}

# If you have a dedicated Nomad Singularity/Apptainer driver plugin installed,
# you can configure that instead of raw_exec.
@@ -35,12 +35,16 @@ dependencies = [
 
 [project.optional-dependencies]
 modal = ["swe-rex[modal]>=1.4.0"]
+# Nomad SlotPool sandbox backend (local Nomad dev agent + Docker/Apptainer)
+# Uses aiohttp for the Nomad HTTP API and sandbox-server execution.
+nomad = ["aiohttp>=3.9.0"]
 dev = ["pytest", "pytest-asyncio"]
 messaging = ["python-telegram-bot>=20.0", "discord.py>=2.0", "aiohttp>=3.9.0"]
 cron = ["croniter"]
 cli = ["simple-term-menu"]
 all = [
     "hermes-agent[modal]",
+    "hermes-agent[nomad]",
     "hermes-agent[messaging]",
     "hermes-agent[cron]",
     "hermes-agent[cli]",
tools/nomad_slotpool.py (new file, 569 lines)

@@ -0,0 +1,569 @@
"""Nomad SlotPool sandbox backend (ported from Nous/atropos-agent).

This module provides an OPTIONAL slot-based sandbox execution backend.

Design goals:
- Keep Hermes-Agent's default terminal backends unchanged.
- Enable the Nomad backend with minimal friction:
      TERMINAL_ENV=nomad

How it works (high level):
- A local Nomad dev agent manages one or more sandbox-server containers.
- Each container hosts N "slots" (workspace dirs).
- Each agent trajectory acquires a slot, runs tools inside it, then releases.

This file is intentionally self-contained so we can iterate without touching
other tool implementations.

NOTE: This backend requires aiohttp and a working Nomad installation.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import threading
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

import aiohttp

logger = logging.getLogger(__name__)


# =============================================================================
# Nomad API client (async)
# =============================================================================


class AllocationStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"
    LOST = "lost"


@dataclass
class Allocation:
    id: str
    job_id: str
    task_group: str
    node_id: str
    status: AllocationStatus
    address: Optional[str] = None
    port: Optional[int] = None

    @property
    def http_address(self) -> Optional[str]:
        if self.address and self.port:
            return f"http://{self.address}:{self.port}"
        return None


class NomadClient:
    def __init__(
        self,
        address: str = "http://localhost:4646",
        token: Optional[str] = None,
        timeout: float = 30.0,
    ):
        self.address = address.rstrip("/")
        self.token = token or os.environ.get("NOMAD_TOKEN")
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            headers = {}
            if self.token:
                headers["X-Nomad-Token"] = self.token
            self._session = aiohttp.ClientSession(timeout=self.timeout, headers=headers)
        return self._session

    async def close(self):
        if self._session and not self._session.closed:
            await self._session.close()

    async def _request(self, method: str, path: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        session = await self._get_session()
        url = f"{self.address}{path}"
        async with session.request(method, url, json=data) as resp:
            if resp.status == 404:
                return {"error": "not_found", "status": 404}
            text = await resp.text()
            if not text:
                return {"status": resp.status}
            try:
                parsed = json.loads(text)
            except json.JSONDecodeError:
                return {"text": text, "status": resp.status}
            if resp.status >= 400:
                return {"error": parsed, "status": resp.status}
            return parsed if isinstance(parsed, dict) else {"data": parsed, "status": resp.status}

    async def is_healthy(self) -> bool:
        try:
            res = await self._request("GET", "/v1/status/leader")
            return "error" not in res
        except Exception:
            return False

    async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        res = await self._request("GET", f"/v1/job/{job_id}")
        if res.get("error") == "not_found":
            return None
        if "error" in res:
            raise RuntimeError(f"Nomad get_job failed: {res}")
        return res

    async def submit_job(self, job_spec: Dict[str, Any]) -> Dict[str, Any]:
        return await self._request("POST", "/v1/jobs", data=job_spec)

    async def stop_job(self, job_id: str, purge: bool = True) -> Dict[str, Any]:
        return await self._request("DELETE", f"/v1/job/{job_id}?purge={'true' if purge else 'false'}")

    async def get_job_allocations(self, job_id: str) -> List[Allocation]:
        res = await self._request("GET", f"/v1/job/{job_id}/allocations")
        if "error" in res:
            # Some Nomad builds return list directly, not dict
            if isinstance(res.get("data"), list):
                allocs_data = res["data"]
            else:
                raise RuntimeError(f"Nomad allocations failed: {res}")
        allocs_data = res if isinstance(res, list) else res.get("data", res)
        if not isinstance(allocs_data, list):
            return []

        allocs: List[Allocation] = []
        for a in allocs_data:
            try:
                allocs.append(
                    Allocation(
                        id=a["ID"],
                        job_id=a.get("JobID", job_id),
                        task_group=a.get("TaskGroup", "sandbox"),
                        node_id=a.get("NodeID", ""),
                        status=AllocationStatus(str(a.get("ClientStatus", "pending"))),
                    )
                )
            except Exception:
                continue
        return allocs

    async def get_allocation(self, alloc_id: str) -> Dict[str, Any]:
        res = await self._request("GET", f"/v1/allocation/{alloc_id}")
        if "error" in res:
            raise RuntimeError(f"Nomad get_allocation failed: {res}")
        return res


def create_sandbox_job(
    job_id: str,
    image: str,
    count: int,
    slots_per_container: int,
    privileged: bool,
    cpu: int,
    memory: int,
    port: int = 8080,
    datacenter: str = "dc1",
    driver: str = "docker",
) -> Dict[str, Any]:
    """Create a sandbox-server Nomad job spec.

    driver:
      - docker: runs tools/sandbox_server.py inside a Docker container image.
      - raw_exec: runs Apptainer/Singularity via raw_exec on the host.

    raw_exec is provided as a draft option for HPC-like hosts without Docker.
    """

    if driver == "raw_exec":
        # For raw_exec, we bind to a fixed port. This is intended for dev/testing
        # on a single host. Scaling to multiple allocations requires more robust
        # service discovery.
        apptainer_image = os.getenv("TERMINAL_NOMAD_APPTAINER_IMAGE", "")
        if not apptainer_image:
            raise RuntimeError(
                "TERMINAL_NOMAD_APPTAINER_IMAGE is required for raw_exec driver"
            )

        cmd = (
            "apptainer exec "
            "--bind \"$NOMAD_ALLOC_DIR/data:/data\" "
            f"{apptainer_image} "
            f"python /sandbox_server.py --port {port} --slots {slots_per_container} --data-dir /data"
        )

        return {
            "ID": job_id,
            "Name": job_id,
            "Type": "service",
            "Datacenters": [datacenter],
            "TaskGroups": [
                {
                    "Name": "sandbox",
                    "Count": count,
                    "Update": {"HealthCheck": "task_states", "MinHealthyTime": 0},
                    "Networks": [{"Mode": "host"}],
                    "Tasks": [
                        {
                            "Name": "sandbox-server",
                            "Driver": "raw_exec",
                            "Config": {
                                "command": "bash",
                                "args": ["-lc", cmd],
                            },
                            "Env": {"PYTHONUNBUFFERED": "1", "NOMAD_ALLOC_DIR": "${NOMAD_ALLOC_DIR}"},
                            "Resources": {"CPU": cpu, "MemoryMB": memory},
                        }
                    ],
                }
            ],
        }

    # Default: docker
    return {
        "ID": job_id,
        "Name": job_id,
        "Type": "service",
        "Datacenters": [datacenter],
        "TaskGroups": [
            {
                "Name": "sandbox",
                "Count": count,
                "Update": {"HealthCheck": "task_states", "MinHealthyTime": 0},
                "Networks": [
                    {
                        "Mode": "host",
                        "DynamicPorts": [{"Label": "http", "To": port}],
                    }
                ],
                "Tasks": [
                    {
                        "Name": "sandbox-server",
                        "Driver": "docker",
                        "Config": {
                            "image": image,
                            "force_pull": False,
                            "ports": ["http"],
                            "privileged": privileged,
                            "command": "python",
                            "args": [
                                "/sandbox_server.py",
                                "--port",
                                str(port),
                                "--slots",
                                str(slots_per_container),
                                "--data-dir",
                                "/data",
                            ],
                        },
                        "Env": {"PYTHONUNBUFFERED": "1", "NOMAD_ALLOC_DIR": "${NOMAD_ALLOC_DIR}"},
                        "Resources": {"CPU": cpu, "MemoryMB": memory},
                    }
                ],
            }
        ],
    }


# =============================================================================
# Slot + executor
# =============================================================================


class SlotState(Enum):
    AVAILABLE = "available"
    ACQUIRED = "acquired"
    EXECUTING = "executing"
    ERROR = "error"


@dataclass
class Slot:
    slot_id: str
    alloc_id: str
    container_addr: str
    workspace_dir: str = ""
    state: SlotState = SlotState.AVAILABLE
    trajectory_id: Optional[str] = None

    def __post_init__(self):
        if not self.workspace_dir:
            self.workspace_dir = f"/data/{self.slot_id}"

    @property
    def is_available(self) -> bool:
        return self.state == SlotState.AVAILABLE

    def acquire(self, trajectory_id: str):
        if not self.is_available:
            raise RuntimeError(f"Slot not available: {self.slot_id} ({self.state})")
        self.state = SlotState.ACQUIRED
        self.trajectory_id = trajectory_id

    def release(self):
        self.state = SlotState.AVAILABLE
        self.trajectory_id = None


def create_slots_for_allocation(alloc_id: str, container_addr: str, num_slots: int) -> List[Slot]:
    return [
        Slot(slot_id=f"slot_{i}", alloc_id=alloc_id, container_addr=container_addr)
        for i in range(num_slots)
    ]


@dataclass
class ExecutionResult:
    success: bool
    output: str = ""
    error: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)


class SandboxExecutor:
    def __init__(self, timeout: float = 30.0):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(timeout=self.timeout)
        return self._session

    async def close(self):
        if self._session and not self._session.closed:
            await self._session.close()

    async def execute(self, slot: Slot, tool_name: str, args: Dict[str, Any], timeout: float) -> ExecutionResult:
        session = await self._get_session()
        url = f"{slot.container_addr}/execute"
        payload = {
            "slot_id": slot.slot_id,
            "tool": tool_name,
            "args": args,
            "execution_id": str(uuid.uuid4()),
            "timeout": timeout,
        }
        async with session.post(url, json=payload) as resp:
            data = await resp.json()
            return ExecutionResult(
                success=bool(data.get("success", False)),
                output=str(data.get("output", "")),
                error=str(data.get("error", "")),
                metadata=dict(data.get("metadata", {}) or {}),
            )

    async def reset_slot(self, slot: Slot) -> ExecutionResult:
        session = await self._get_session()
        url = f"{slot.container_addr}/reset"
        payload = {"slot_id": slot.slot_id}
        async with session.post(url, json=payload) as resp:
            data = await resp.json()
            return ExecutionResult(
                success=bool(data.get("success", False)),
                output=str(data.get("output", "")),
                error=str(data.get("error", "")),
                metadata=dict(data.get("metadata", {}) or {}),
            )


@dataclass
class SlotPoolConfig:
    nomad_address: str = "http://localhost:4646"
    job_id: str = "hermes-sandbox"
    datacenter: str = "dc1"
    image: str = "hermes-sandbox:local"

    # Driver selection: docker (default) or raw_exec (for Apptainer/Singularity)
    driver: str = "docker"  # docker | raw_exec

    slots_per_container: int = 10
    privileged: bool = False
    cpu: int = 500
    memory: int = 512
    min_containers: int = 1
    max_containers: int = 10
    acquire_timeout: float = 30.0

    # raw_exec: fixed port (dynamic ports are harder to discover reliably)
    raw_exec_port: int = 8080


class SlotPool:
    def __init__(self, cfg: SlotPoolConfig):
        self.cfg = cfg
        self.nomad = NomadClient(address=cfg.nomad_address)
        self.executor = SandboxExecutor(timeout=cfg.acquire_timeout)
        self._slots: Dict[str, Slot] = {}
        self._queue: asyncio.Queue[str] = asyncio.Queue()
        self._started = False

    def _slot_key(self, alloc_id: str, slot_id: str) -> str:
        return f"{alloc_id}:{slot_id}"

    async def start(self):
        if self._started:
            return
        if not await self.nomad.is_healthy():
            raise RuntimeError(f"Nomad not reachable at {self.cfg.nomad_address}")
        job = await self.nomad.get_job(self.cfg.job_id)
        if job is None:
            spec = create_sandbox_job(
                job_id=self.cfg.job_id,
                image=self.cfg.image,
                count=self.cfg.min_containers,
                slots_per_container=self.cfg.slots_per_container,
                privileged=self.cfg.privileged,
                cpu=self.cfg.cpu,
                memory=self.cfg.memory,
                datacenter=self.cfg.datacenter,
                driver=self.cfg.driver,
                port=self.cfg.raw_exec_port,
            )
            res = await self.nomad.submit_job(spec)
            if "error" in res:
                raise RuntimeError(f"Nomad submit job failed: {res}")
        await self._refresh_slots()
        self._started = True

    async def close(self):
        await self.executor.close()
        await self.nomad.close()

    async def _refresh_slots(self):
        allocs = await self.nomad.get_job_allocations(self.cfg.job_id)
        for alloc in allocs:
            detail = await self.nomad.get_allocation(alloc.id)
            # Find the mapped host port for "http".
            addr = detail.get("NodeName") or detail.get("NodeID")
            # Prefer explicit address in allocation resources
            # Nomad stores addresses under Resources->Networks sometimes.
            net = (detail.get("Resources") or {}).get("Networks") or []
            address = None
            port = None

            if self.cfg.driver == "raw_exec":
                # raw_exec: use fixed port and best-effort node address discovery
                port = self.cfg.raw_exec_port
                address = (
                    detail.get("NodeAddress")
                    or detail.get("NodeName")
                    or detail.get("NodeID")
                )
            else:
                # docker: discover dynamic port mapping
                if net and isinstance(net, list):
                    n0 = net[0]
                    address = n0.get("IP")
                    ports = n0.get("DynamicPorts") or []
                    for p in ports:
                        if p.get("Label") == "http":
                            port = p.get("Value")

                if not address or not port:
                    # Fall back: allocation node identity
                    address = detail.get("NodeAddress") or detail.get("NodeName") or detail.get("NodeID")

            if not address or not port:
                # Can't use this alloc
                continue
            container_addr = f"http://{address}:{port}"

            for s in create_slots_for_allocation(alloc.id, container_addr, self.cfg.slots_per_container):
                key = self._slot_key(s.alloc_id, s.slot_id)
                if key in self._slots:
                    continue
                self._slots[key] = s
                await self._queue.put(key)

    async def acquire(self, trajectory_id: str) -> Slot:
        if not self._started:
            raise RuntimeError("SlotPool not started")
        while True:
            key = await asyncio.wait_for(self._queue.get(), timeout=self.cfg.acquire_timeout)
            slot = self._slots.get(key)
            if not slot:
                continue
            try:
                slot.acquire(trajectory_id)
                return slot
            except Exception:
                continue

    async def release(self, slot: Slot, reset_workspace: bool = True):
        if reset_workspace:
            try:
                await self.executor.reset_slot(slot)
            except Exception:
                pass
        slot.release()
        await self._queue.put(self._slot_key(slot.alloc_id, slot.slot_id))

    async def execute_bash(self, slot: Slot, command: str, timeout_s: float) -> ExecutionResult:
        return await self.executor.execute(slot, "bash", {"command": command}, timeout=timeout_s)


# =============================================================================
# Sync wrapper (thread + event loop)
# =============================================================================


class NomadSlotPoolManager:
    """Runs a SlotPool on a dedicated event loop thread for sync callers."""

    def __init__(self, cfg: SlotPoolConfig):
        self.cfg = cfg
        self._pool = SlotPool(cfg)
        self._thread: Optional[threading.Thread] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._started = threading.Event()

    def _run_loop(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self._loop = loop
        loop.run_until_complete(self._pool.start())
        self._started.set()
        loop.run_forever()

    def start(self):
        if self._thread and self._thread.is_alive():
            return
        self._thread = threading.Thread(target=self._run_loop, name="nomad-slotpool", daemon=True)
        self._thread.start()
        self._started.wait(timeout=120)

    def _call(self, coro):
        if not self._loop:
            raise RuntimeError("NomadSlotPoolManager not started")
        fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return fut.result()

    def acquire(self, trajectory_id: str) -> Slot:
        return self._call(self._pool.acquire(trajectory_id))

    def release(self, slot: Slot, reset_workspace: bool = True):
        return self._call(self._pool.release(slot, reset_workspace=reset_workspace))

    def execute_bash(self, slot: Slot, command: str, timeout_s: float) -> ExecutionResult:
        return self._call(self._pool.execute_bash(slot, command, timeout_s=timeout_s))


_global_manager: Optional[NomadSlotPoolManager] = None


def get_global_manager(cfg: SlotPoolConfig) -> NomadSlotPoolManager:
    global _global_manager
    if _global_manager is None:
        _global_manager = NomadSlotPoolManager(cfg)
        _global_manager.start()
    return _global_manager
tools/sandbox/Dockerfile (new file, 13 lines)

@@ -0,0 +1,13 @@
FROM python:3.11-slim

WORKDIR /

# Minimal deps for sandbox_server
RUN pip install --no-cache-dir aiohttp

# Copy sandbox server
COPY tools/sandbox_server.py /sandbox_server.py

EXPOSE 8080

CMD ["python", "/sandbox_server.py", "--port", "8080", "--slots", "10", "--data-dir", "/data"]
tools/sandbox_server.py (new file, 1912 lines)

File diff suppressed because it is too large.
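The suppressed file is the HTTP server that `SandboxExecutor` in `tools/nomad_slotpool.py` talks to. For orientation, a minimal aiohttp server honoring the same `/execute` and `/reset` contract might look like the sketch below; this is inferred from the client code above, not the actual 1912-line implementation (which handles per-slot state, multiple tools, and more):

```python
# Minimal sketch of the sandbox-server contract inferred from SandboxExecutor;
# the real tools/sandbox_server.py is far more featureful.
import argparse
import asyncio
import pathlib
import shutil

from aiohttp import web


async def execute(request: web.Request) -> web.Response:
    req = await request.json()
    # Each slot gets its own workspace directory under --data-dir.
    workspace = pathlib.Path(request.app["data_dir"]) / req["slot_id"]
    workspace.mkdir(parents=True, exist_ok=True)
    proc = await asyncio.create_subprocess_shell(
        req["args"]["command"],
        cwd=workspace,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    try:
        out, _ = await asyncio.wait_for(proc.communicate(), timeout=req.get("timeout", 60))
    except asyncio.TimeoutError:
        proc.kill()
        return web.json_response(
            {"success": False, "output": "", "error": "timeout", "metadata": {"exit_code": -1}}
        )
    return web.json_response({
        "success": proc.returncode == 0,
        "output": out.decode(errors="replace"),
        "error": "",
        "metadata": {"exit_code": proc.returncode},
    })


async def reset(request: web.Request) -> web.Response:
    req = await request.json()
    workspace = pathlib.Path(request.app["data_dir"]) / req["slot_id"]
    shutil.rmtree(workspace, ignore_errors=True)  # wipe the slot workspace
    return web.json_response({"success": True, "output": "", "error": "", "metadata": {}})


if __name__ == "__main__":
    p = argparse.ArgumentParser()
    p.add_argument("--port", type=int, default=8080)
    p.add_argument("--slots", type=int, default=10)  # accepted for CLI parity; unused in this sketch
    p.add_argument("--data-dir", default="/data")
    a = p.parse_args()
    app = web.Application()
    app["data_dir"] = a.data_dir
    app.add_routes([web.post("/execute", execute), web.post("/reset", reset)])
    web.run_app(app, port=a.port)
```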
@@ -1215,6 +1215,16 @@ def _get_env_config() -> Dict[str, Any]:
         "cwd": os.getenv("TERMINAL_CWD", default_cwd),
         "timeout": int(os.getenv("TERMINAL_TIMEOUT", "60")),
         "lifetime_seconds": int(os.getenv("TERMINAL_LIFETIME_SECONDS", "300")),
 
+        # Nomad sandbox backend (slot pool)
+        "nomad_address": os.getenv("TERMINAL_NOMAD_ADDRESS", "http://localhost:4646"),
+        "nomad_job_id": os.getenv("TERMINAL_NOMAD_JOB_ID", "hermes-sandbox"),
+        "nomad_image": os.getenv("TERMINAL_NOMAD_IMAGE", "hermes-sandbox:local"),
+        "nomad_slots": int(os.getenv("TERMINAL_NOMAD_SLOTS", "10")),
+        "nomad_min": int(os.getenv("TERMINAL_NOMAD_MIN", "1")),
+        "nomad_max": int(os.getenv("TERMINAL_NOMAD_MAX", "10")),
+        "nomad_privileged": os.getenv("TERMINAL_NOMAD_PRIVILEGED", "").lower() in ("1", "true", "yes"),
+
         # SSH-specific config
         "ssh_host": os.getenv("TERMINAL_SSH_HOST", ""),
         "ssh_user": os.getenv("TERMINAL_SSH_USER", ""),
@@ -1223,15 +1233,16 @@ def _get_env_config() -> Dict[str, Any]:
     }
 
 
-def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_config: dict = None):
+def _create_environment(env_type: str, image: str, cwd: str, timeout: int, task_id: str = "", ssh_config: dict = None):
     """
     Create an execution environment from mini-swe-agent.
 
     Args:
-        env_type: One of "local", "docker", "singularity", "modal", "ssh"
-        image: Docker/Singularity/Modal image name (ignored for local/ssh)
+        env_type: One of "local", "docker", "singularity", "modal", "ssh", "nomad"
+        image: Docker/Singularity/Modal image name (ignored for local/ssh/nomad)
         cwd: Working directory
         timeout: Default command timeout
+        task_id: Used for per-task sandbox isolation (nomad/modal pooling)
         ssh_config: SSH connection config (for env_type="ssh")
 
     Returns:
@@ -1252,7 +1263,48 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_c
     elif env_type == "modal":
         # Use custom Modal wrapper with sudo support
         return _ModalEnvironment(image=image, cwd=cwd, timeout=timeout)
 
+    elif env_type == "nomad":
+        # Slot-based sandbox backend via local Nomad dev agent.
+        # Requires: pip install "hermes-agent[nomad]" and a running Nomad agent.
+        from tools.nomad_slotpool import SlotPoolConfig, get_global_manager
+
+        cfg = SlotPoolConfig(
+            nomad_address=os.getenv("TERMINAL_NOMAD_ADDRESS", "http://localhost:4646"),
+            job_id=os.getenv("TERMINAL_NOMAD_JOB_ID", "hermes-sandbox"),
+            image=os.getenv("TERMINAL_NOMAD_IMAGE", "hermes-sandbox:local"),
+            driver=os.getenv("TERMINAL_NOMAD_DRIVER", "docker"),
+            raw_exec_port=int(os.getenv("TERMINAL_NOMAD_RAW_EXEC_PORT", "8080")),
+            slots_per_container=int(os.getenv("TERMINAL_NOMAD_SLOTS", "10")),
+            min_containers=int(os.getenv("TERMINAL_NOMAD_MIN", "1")),
+            max_containers=int(os.getenv("TERMINAL_NOMAD_MAX", "10")),
+            privileged=os.getenv("TERMINAL_NOMAD_PRIVILEGED", "").lower() in ("1", "true", "yes"),
+        )
+        manager = get_global_manager(cfg)
+        slot = manager.acquire(task_id or str(uuid.uuid4()))
+
+        class _NomadSlotEnvironment:
+            def __init__(self, _slot):
+                self._slot = _slot
+
+            def execute(self, command: str, cwd: str = "", *, timeout: int | None = None) -> dict:
+                # cwd ignored: sandbox_server executes within slot workspace.
+                timeout_s = float(timeout if timeout is not None else 60)
+                res = manager.execute_bash(self._slot, command, timeout_s=timeout_s)
+                exit_code = int(res.metadata.get("exit_code", 0)) if isinstance(res.metadata, dict) else 0
+                return {"output": res.output, "returncode": exit_code}
+
+            def cleanup(self):
+                try:
+                    manager.release(self._slot, reset_workspace=True)
+                except Exception:
+                    pass
+
+            def stop(self):
+                self.cleanup()
+
+        return _NomadSlotEnvironment(slot)
+
     elif env_type == "ssh":
         if not ssh_config or not ssh_config.get("host") or not ssh_config.get("user"):
             raise ValueError("SSH environment requires ssh_host and ssh_user to be configured")
@@ -1578,6 +1630,7 @@ def terminal_tool(
             image=image,
             cwd=cwd,
             timeout=effective_timeout,
+            task_id=effective_task_id,
             ssh_config=ssh_config
         )
     except ImportError as e: