Mirror of https://github.com/NousResearch/hermes-agent.git, synced 2026-04-28 15:01:34 +08:00
Compare commits: fix/docker...add-upstre (14 commits)
Commits in this comparison:

5f9c02bb37
3dbeaea3dc
26d9b5af29
ef8cb9afd2
407a1e24b2
e1e69dfd32
003b6e49df
dab2cfe566
c87bd5dd87
2a67e4fa57
136a64942d
9f74d1f2ec
11ad4173de
92cb77eaa7
.gitignore (vendored, 1 line changed)
@@ -49,3 +49,4 @@ cli-config.yaml
 skills/.hub/
 ignored/
 .worktrees/
+environments/benchmarks/evals/
@@ -18,9 +18,14 @@ Benchmarks (eval-only):
 - benchmarks/terminalbench_2/: Terminal-Bench 2.0 evaluation
 """

-from environments.agent_loop import AgentResult, HermesAgentLoop
-from environments.tool_context import ToolContext
-from environments.hermes_base_env import HermesAgentBaseEnv, HermesAgentEnvConfig
+try:
+    from environments.agent_loop import AgentResult, HermesAgentLoop
+    from environments.tool_context import ToolContext
+    from environments.hermes_base_env import HermesAgentBaseEnv, HermesAgentEnvConfig
+except ImportError:
+    # atroposlib not installed — environments are unavailable but
+    # submodules like tool_call_parsers can still be imported directly.
+    pass

 __all__ = [
     "AgentResult",
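The point of the guard is that a minimal checkout stays importable even without atroposlib. A quick sketch of what the new comment promises, using only names that appear in the diffs below:

# With atroposlib absent, `import environments` still succeeds, and the
# standalone parser registry remains usable on its own:
from environments.tool_call_parsers import get_parser

parser = get_parser("hermes")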
@@ -249,23 +249,62 @@ class HermesAgentLoop:
         reasoning = _extract_reasoning_from_message(assistant_msg)
         reasoning_per_turn.append(reasoning)

-        # Check for tool calls -- standard OpenAI spec
+        # Check for tool calls -- standard OpenAI spec.
+        # Fallback: if response has no structured tool_calls but content
+        # contains raw tool call tags (e.g. <tool_call>), parse them using
+        # hermes-agent's standalone parsers. This handles the case where
+        # ManagedServer's ToolCallTranslator couldn't parse because vLLM
+        # isn't installed.
+        if (
+            not assistant_msg.tool_calls
+            and assistant_msg.content
+            and self.tool_schemas
+            and "<tool_call>" in (assistant_msg.content or "")
+        ):
+            try:
+                from environments.tool_call_parsers import get_parser
+                fallback_parser = get_parser("hermes")
+                parsed_content, parsed_calls = fallback_parser.parse(
+                    assistant_msg.content
+                )
+                if parsed_calls:
+                    assistant_msg.tool_calls = parsed_calls
+                    if parsed_content is not None:
+                        assistant_msg.content = parsed_content
+                    logger.debug(
+                        "Fallback parser extracted %d tool calls from raw content",
+                        len(parsed_calls),
+                    )
+            except Exception:
+                pass  # Fall through to no tool calls

         if assistant_msg.tool_calls:
+            # Normalize tool calls to dicts — they may come as objects
+            # (OpenAI API) or dicts (vLLM ToolCallTranslator).
+            def _tc_to_dict(tc):
+                if isinstance(tc, dict):
+                    return {
+                        "id": tc.get("id", f"call_{uuid.uuid4().hex[:8]}"),
+                        "type": "function",
+                        "function": {
+                            "name": tc.get("function", {}).get("name", tc.get("name", "")),
+                            "arguments": tc.get("function", {}).get("arguments", tc.get("arguments", "{}")),
+                        },
+                    }
+                return {
+                    "id": tc.id,
+                    "type": "function",
+                    "function": {
+                        "name": tc.function.name,
+                        "arguments": tc.function.arguments,
+                    },
+                }

             # Build the assistant message dict for conversation history
             msg_dict: Dict[str, Any] = {
                 "role": "assistant",
                 "content": assistant_msg.content or "",
-                "tool_calls": [
-                    {
-                        "id": tc.id,
-                        "type": "function",
-                        "function": {
-                            "name": tc.function.name,
-                            "arguments": tc.function.arguments,
-                        },
-                    }
-                    for tc in assistant_msg.tool_calls
-                ],
+                "tool_calls": [_tc_to_dict(tc) for tc in assistant_msg.tool_calls],
             }

             # Preserve reasoning_content for multi-turn chat template handling
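For context on the fallback path: hermes-format tool calls travel as JSON inside <tool_call> tags in the raw assistant text. A minimal sketch of the round trip, assuming parse() returns a (content, tool_calls) pair as the unpacking above implies:

from environments.tool_call_parsers import get_parser

raw = (
    "Let me check that directory.\n"
    '<tool_call>\n{"name": "terminal", "arguments": {"command": "ls -la"}}\n</tool_call>'
)
parsed_content, parsed_calls = get_parser("hermes").parse(raw)
# Assumed result: parsed_content keeps the plain text, and parsed_calls
# carries one call naming "terminal" with JSON-encoded arguments.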
@@ -278,8 +317,13 @@ class HermesAgentLoop:

             # Execute each tool call via hermes-agent's dispatch
             for tc in assistant_msg.tool_calls:
-                tool_name = tc.function.name
-                tool_args_raw = tc.function.arguments
+                # Handle both object (OpenAI) and dict (vLLM) formats
+                if isinstance(tc, dict):
+                    tool_name = tc.get("function", {}).get("name", tc.get("name", ""))
+                    tool_args_raw = tc.get("function", {}).get("arguments", tc.get("arguments", "{}"))
+                else:
+                    tool_name = tc.function.name
+                    tool_args_raw = tc.function.arguments

                 # Validate tool name
                 if tool_name not in self.valid_tool_names:
@@ -390,10 +434,11 @@ class HermesAgentLoop:
                     pass

                 # Add tool response to conversation
+                tc_id = tc.get("id", "") if isinstance(tc, dict) else tc.id
                 messages.append(
                     {
                         "role": "tool",
-                        "tool_call_id": tc.id,
+                        "tool_call_id": tc_id,
                         "content": tool_result,
                     }
                 )
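Taken together, the normalization means both transport shapes end up as the same history entry. An illustration with stand-in values (SimpleNamespace only mimics the OpenAI object form for the demo):

from types import SimpleNamespace

as_dict = {
    "id": "call_1",
    "function": {"name": "terminal", "arguments": '{"command": "ls"}'},
}
as_obj = SimpleNamespace(
    id="call_1",
    function=SimpleNamespace(name="terminal", arguments='{"command": "ls"}'),
)
# _tc_to_dict maps either one to:
# {"id": "call_1", "type": "function",
#  "function": {"name": "terminal", "arguments": '{"command": "ls"}'}}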
environments/benchmarks/tblite/local.yaml (new file, 38 lines)
@@ -0,0 +1,38 @@
# OpenThoughts-TBLite Evaluation -- Docker Backend (Local Compute)
#
# Runs tasks in Docker containers on the local machine.
# Sandboxed like Modal but no cloud costs. Good for dev/testing.
#
# Usage:
#   python environments/benchmarks/tblite/tblite_env.py evaluate \
#     --config environments/benchmarks/tblite/local.yaml
#
# # Override concurrency:
#   python environments/benchmarks/tblite/tblite_env.py evaluate \
#     --config environments/benchmarks/tblite/local.yaml \
#     --env.eval_concurrency 4

env:
  enabled_toolsets: ["terminal", "file"]
  max_agent_turns: 60
  max_token_length: 32000
  agent_temperature: 0.8
  terminal_backend: "docker"
  terminal_timeout: 300
  tool_pool_size: 16
  dataset_name: "NousResearch/openthoughts-tblite"
  test_timeout: 600
  task_timeout: 1200
  eval_concurrency: 8  # max 8 tasks at once
  tokenizer_name: "NousResearch/Hermes-3-Llama-3.1-8B"
  use_wandb: false
  wandb_name: "openthoughts-tblite-local"
  ensure_scores_are_not_same: false
  data_dir_to_save_evals: "environments/benchmarks/evals/openthoughts-tblite-local"

openai:
  base_url: "https://openrouter.ai/api/v1"
  model_name: "anthropic/claude-sonnet-4"
  server_type: "openai"
  health_check: false
  # api_key loaded from OPENROUTER_API_KEY in .env
environments/benchmarks/tblite/local_vllm.yaml (new file, 40 lines)
@@ -0,0 +1,40 @@
# OpenThoughts-TBLite Evaluation -- Local vLLM Backend
#
# Runs against a local vLLM server with Docker sandboxes.
#
# Start the vLLM server from the atropos directory:
#   python -m example_trainer.vllm_api_server \
#     --model Qwen/Qwen3-4B-Instruct-2507 \
#     --port 9001 \
#     --gpu-memory-utilization 0.8 \
#     --max-model-len=32000
#
# Then run:
#   python environments/benchmarks/tblite/tblite_env.py evaluate \
#     --config environments/benchmarks/tblite/local_vllm.yaml

env:
  enabled_toolsets: ["terminal", "file"]
  max_agent_turns: 60
  max_token_length: 16000
  agent_temperature: 0.6
  terminal_backend: "docker"
  terminal_timeout: 300
  tool_pool_size: 16
  dataset_name: "NousResearch/openthoughts-tblite"
  test_timeout: 600
  task_timeout: 1200
  eval_concurrency: 8
  tool_call_parser: "hermes"
  system_prompt: "You are an expert terminal agent. You MUST use the provided tools to complete tasks. Use the terminal tool to run shell commands, read_file to read files, write_file to write files, search_files to search, and patch to edit files. Do NOT write out solutions as text - execute them using the tools. Always start by exploring the environment with terminal commands."
  tokenizer_name: "Qwen/Qwen3-4B-Instruct-2507"
  use_wandb: false
  wandb_name: "tblite-qwen3-4b-instruct"
  ensure_scores_are_not_same: false
  data_dir_to_save_evals: "environments/benchmarks/evals/tblite-qwen3-4b-local"

openai:
  base_url: "http://localhost:9001"
  model_name: "Qwen/Qwen3-4B-Instruct-2507"
  server_type: "vllm"
  health_check: false
@@ -118,6 +118,14 @@ class TerminalBench2EvalConfig(HermesAgentEnvConfig):
         "Tasks exceeding this are scored as FAIL. Default 30 minutes.",
     )

+    # --- Eval concurrency ---
+    eval_concurrency: int = Field(
+        default=0,
+        description="Maximum number of tasks to evaluate in parallel. "
+        "0 means unlimited (all tasks run concurrently). "
+        "Set to 8 for local backends to avoid overwhelming the machine.",
+    )
+

 # Tasks that cannot run properly on Modal and are excluded from scoring.
 MODAL_INCOMPATIBLE_TASKS = {
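The field only stores the limit; enforcement lives in _eval_with_timeout further down. The pattern in isolation, as a self-contained sketch (function names here are illustrative, not from the repo):

import asyncio

async def run_limited(items, eval_one, concurrency: int = 0):
    # concurrency == 0 keeps the old behavior: every task starts at once.
    sem = asyncio.Semaphore(concurrency) if concurrency > 0 else None

    async def guarded(item):
        if sem is None:
            return await eval_one(item)
        async with sem:
            return await eval_one(item)

    return await asyncio.gather(*(guarded(i) for i in items))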
@@ -429,8 +437,13 @@ class TerminalBench2EvalEnv(HermesAgentBaseEnv):
                 "error": "no_image",
             }

-        # --- 2. Register per-task Modal image override ---
-        register_task_env_overrides(task_id, {"modal_image": modal_image})
+        # --- 2. Register per-task image override ---
+        # Set both modal_image and docker_image so the task image is used
+        # regardless of which backend is configured.
+        register_task_env_overrides(task_id, {
+            "modal_image": modal_image,
+            "docker_image": modal_image,
+        })
         logger.info(
             "Task %s: registered image override for task_id %s",
             task_name, task_id[:8],
@@ -445,17 +458,37 @@ class TerminalBench2EvalEnv(HermesAgentBaseEnv):
         messages.append({"role": "user", "content": self.format_prompt(eval_item)})

         # --- 4. Run agent loop ---
-        agent = HermesAgentLoop(
-            server=self.server,
-            tool_schemas=tools,
-            valid_tool_names=valid_names,
-            max_turns=self.config.max_agent_turns,
-            task_id=task_id,
-            temperature=self.config.agent_temperature,
-            max_tokens=self.config.max_token_length,
-            extra_body=self.config.extra_body,
-        )
-        result = await agent.run(messages)
+        # Use ManagedServer (Phase 2) for vLLM/SGLang backends to get
+        # token-level tracking via /generate. Falls back to direct
+        # ServerManager (Phase 1) for OpenAI endpoints.
+        if self._use_managed_server():
+            async with self.server.managed_server(
+                tokenizer=self.tokenizer,
+                preserve_think_blocks=bool(self.config.thinking_mode),
+            ) as managed:
+                agent = HermesAgentLoop(
+                    server=managed,
+                    tool_schemas=tools,
+                    valid_tool_names=valid_names,
+                    max_turns=self.config.max_agent_turns,
+                    task_id=task_id,
+                    temperature=self.config.agent_temperature,
+                    max_tokens=self.config.max_token_length,
+                    extra_body=self.config.extra_body,
+                )
+                result = await agent.run(messages)
+        else:
+            agent = HermesAgentLoop(
+                server=self.server,
+                tool_schemas=tools,
+                valid_tool_names=valid_names,
+                max_turns=self.config.max_agent_turns,
+                task_id=task_id,
+                temperature=self.config.agent_temperature,
+                max_tokens=self.config.max_token_length,
+                extra_body=self.config.extra_body,
+            )
+            result = await agent.run(messages)

         # --- 5. Verify -- run test suite in the agent's sandbox ---
         # Skip verification if the agent produced no meaningful output
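An editorial aside: the two branches above differ only in the server argument, so the shared kwargs could be hoisted. A hedged refactor sketch using only names from the diff:

loop_kwargs = dict(
    tool_schemas=tools,
    valid_tool_names=valid_names,
    max_turns=self.config.max_agent_turns,
    task_id=task_id,
    temperature=self.config.agent_temperature,
    max_tokens=self.config.max_token_length,
    extra_body=self.config.extra_body,
)
if self._use_managed_server():
    async with self.server.managed_server(
        tokenizer=self.tokenizer,
        preserve_think_blocks=bool(self.config.thinking_mode),
    ) as managed:
        result = await HermesAgentLoop(server=managed, **loop_kwargs).run(messages)
else:
    result = await HermesAgentLoop(server=self.server, **loop_kwargs).run(messages)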
@@ -655,13 +688,19 @@ class TerminalBench2EvalEnv(HermesAgentBaseEnv):

     async def _eval_with_timeout(self, item: Dict[str, Any]) -> Dict:
         """
-        Wrap rollout_and_score_eval with a per-task wall-clock timeout.
+        Wrap rollout_and_score_eval with a per-task wall-clock timeout
+        and optional concurrency limit via semaphore.

         If the task exceeds task_timeout seconds, it's automatically scored
         as FAIL. This prevents any single task from hanging indefinitely.
         """
         task_name = item.get("task_name", "unknown")
         category = item.get("category", "unknown")

+        # Acquire concurrency semaphore if configured
+        if self._eval_semaphore:
+            await self._eval_semaphore.acquire()
+
+        try:
             return await asyncio.wait_for(
                 self.rollout_and_score_eval(item),
@@ -679,6 +718,9 @@ class TerminalBench2EvalEnv(HermesAgentBaseEnv):
                 }
                 self._save_result(out)
                 return out
+        finally:
+            if self._eval_semaphore:
+                self._eval_semaphore.release()

     async def evaluate(self, *args, **kwargs) -> None:
         """
@@ -696,6 +738,13 @@ class TerminalBench2EvalEnv(HermesAgentBaseEnv):
         """
         start_time = time.time()

+        # Set up concurrency limit if configured
+        if self.config.eval_concurrency > 0:
+            self._eval_semaphore = asyncio.Semaphore(self.config.eval_concurrency)
+            print(f" Eval concurrency: {self.config.eval_concurrency} tasks at a time")
+        else:
+            self._eval_semaphore = None
+
         # Route all logging through tqdm.write() so the progress bar stays
         # pinned at the bottom while log lines scroll above it.
         from tqdm import tqdm
@@ -229,6 +229,12 @@ class HermesAgentBaseEnv(BaseEnv):
         from environments.agent_loop import resize_tool_pool
         resize_tool_pool(config.tool_pool_size)

+        # Set tool_parser on the ServerManager so ManagedServer uses it
+        # for bidirectional tool call translation (raw text ↔ OpenAI tool_calls).
+        if hasattr(self.server, 'tool_parser'):
+            self.server.tool_parser = config.tool_call_parser
+            print(f"🔧 Tool parser: {config.tool_call_parser}")
+
         # Current group's resolved tools (set in collect_trajectories)
         self._current_group_tools: Optional[Tuple[List[Dict], Set[str]]] = None
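The hasattr guard targets ServerManager instances; the same parser can also be supplied at construction time, as the vLLM test near the end of this diff does:

from atroposlib.envs.server_handling.server_manager import (
    ServerManager,
    APIServerConfig,
)

config = APIServerConfig(
    base_url="http://localhost:9001",
    model_name="Qwen/Qwen3-4B-Instruct-2507",
    server_type="vllm",
    health_check=False,
)
sm = ServerManager([config], tool_parser="hermes")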
@@ -466,22 +472,14 @@ class HermesAgentBaseEnv(BaseEnv):
         # Run the agent loop
         result: AgentResult
         if self._use_managed_server():
-            # Phase 2: ManagedServer with parser -- exact tokens + logprobs
-            # Load the tool call parser from registry based on config
-            from environments.tool_call_parsers import get_parser
-            try:
-                tc_parser = get_parser(self.config.tool_call_parser)
-            except KeyError:
-                logger.warning(
-                    "Tool call parser '%s' not found, falling back to 'hermes'",
-                    self.config.tool_call_parser,
-                )
-                tc_parser = get_parser("hermes")
-
+            # Phase 2: ManagedServer with ToolCallTranslator -- exact tokens + logprobs
+            # tool_parser is set on ServerManager in __init__ and passed through
+            # to ManagedServer, which uses ToolCallTranslator for bidirectional
+            # translation between raw text and OpenAI tool_calls.
             try:
                 async with self.server.managed_server(
                     tokenizer=self.tokenizer,
-                    tool_call_parser=tc_parser,
                     preserve_think_blocks=bool(self.config.thinking_mode),
                 ) as managed:
                     agent = HermesAgentLoop(
                         server=managed,
@@ -114,11 +114,27 @@ def _patch_swerex_modal():
         self._worker = _AsyncWorker()
         self._worker.start()

+        # Pre-build a modal.Image with pip fix for Modal's legacy image builder.
+        # Modal requires `python -m pip` to work during image build, but some
+        # task images (e.g., TBLite's broken-python) have intentionally broken pip.
+        # Fix: remove stale pip dist-info and reinstall via ensurepip before Modal
+        # tries to use it. This is a no-op for images where pip already works.
+        import modal as _modal
+        image_spec = self.config.image
+        if isinstance(image_spec, str):
+            image_spec = _modal.Image.from_registry(
+                image_spec,
+                setup_dockerfile_commands=[
+                    "RUN rm -rf /usr/local/lib/python*/site-packages/pip* 2>/dev/null; "
+                    "python -m ensurepip --upgrade --default-pip 2>/dev/null || true",
+                ],
+            )
+
         # Create AND start the deployment entirely on the worker's loop/thread
         # so all gRPC channels and async state are bound to that loop
         async def _create_and_start():
             deployment = ModalDeployment(
-                image=self.config.image,
+                image=image_spec,
                 startup_timeout=self.config.startup_timeout,
                 runtime_timeout=self.config.runtime_timeout,
                 deployment_timeout=self.config.deployment_timeout,
tests/test_agent_loop.py (new file, 486 lines)
@@ -0,0 +1,486 @@
"""
Tests for environments/agent_loop.py — HermesAgentLoop.

Tests the multi-turn agent engine using mocked servers, without needing
real API keys or running servers.
"""

import asyncio
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest.mock import MagicMock

import pytest

# Ensure repo root is importable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

try:
    from environments.agent_loop import (
        AgentResult,
        HermesAgentLoop,
        ToolError,
        _extract_reasoning_from_message,
        resize_tool_pool,
    )
except ImportError:
    pytest.skip("atroposlib not installed", allow_module_level=True)


# ─── Mock server infrastructure ─────────────────────────────────────────


@dataclass
class MockFunction:
    name: str
    arguments: str


@dataclass
class MockToolCall:
    id: str
    function: MockFunction
    type: str = "function"


@dataclass
class MockMessage:
    content: Optional[str]
    role: str = "assistant"
    tool_calls: Optional[List[MockToolCall]] = None
    reasoning_content: Optional[str] = None
    reasoning: Optional[str] = None
    reasoning_details: Optional[list] = None


@dataclass
class MockChoice:
    message: MockMessage
    finish_reason: str = "stop"
    index: int = 0


@dataclass
class MockChatCompletion:
    choices: List[MockChoice]
    id: str = "chatcmpl-mock"
    model: str = "mock-model"


class MockServer:
    """
    Mock server that returns pre-configured responses in sequence.
    Mimics the chat_completion() interface.
    """

    def __init__(self, responses: List[MockChatCompletion]):
        self.responses = responses
        self.call_count = 0
        self.call_history: List[Dict[str, Any]] = []

    async def chat_completion(self, **kwargs) -> MockChatCompletion:
        self.call_history.append(kwargs)
        if self.call_count >= len(self.responses):
            # Return a simple text response if we run out
            return MockChatCompletion(
                choices=[MockChoice(message=MockMessage(content="Done."))]
            )
        resp = self.responses[self.call_count]
        self.call_count += 1
        return resp


def make_text_response(content: str) -> MockChatCompletion:
    """Create a simple text-only response (no tool calls)."""
    return MockChatCompletion(
        choices=[MockChoice(message=MockMessage(content=content))]
    )


def make_tool_response(
    tool_name: str,
    arguments: dict,
    content: str = "",
    tool_call_id: str = "call_001",
) -> MockChatCompletion:
    """Create a response with a single tool call."""
    return MockChatCompletion(
        choices=[
            MockChoice(
                message=MockMessage(
                    content=content,
                    tool_calls=[
                        MockToolCall(
                            id=tool_call_id,
                            function=MockFunction(
                                name=tool_name,
                                arguments=json.dumps(arguments),
                            ),
                        )
                    ],
                ),
                finish_reason="tool_calls",
            )
        ]
    )


# ─── Tests ───────────────────────────────────────────────────────────────


class TestAgentResult:
    def test_defaults(self):
        result = AgentResult(messages=[])
        assert result.messages == []
        assert result.managed_state is None
        assert result.turns_used == 0
        assert result.finished_naturally is False
        assert result.reasoning_per_turn == []
        assert result.tool_errors == []


class TestExtractReasoning:
    def test_reasoning_content_field(self):
        msg = MockMessage(content="hello", reasoning_content="I think...")
        assert _extract_reasoning_from_message(msg) == "I think..."

    def test_reasoning_field(self):
        msg = MockMessage(content="hello", reasoning="Let me consider...")
        assert _extract_reasoning_from_message(msg) == "Let me consider..."

    def test_reasoning_details(self):
        detail = MagicMock()
        detail.text = "Detail reasoning"
        msg = MockMessage(content="hello", reasoning_details=[detail])
        assert _extract_reasoning_from_message(msg) == "Detail reasoning"

    def test_reasoning_details_dict_format(self):
        msg = MockMessage(
            content="hello",
            reasoning_details=[{"text": "Dict reasoning"}],
        )
        assert _extract_reasoning_from_message(msg) == "Dict reasoning"

    def test_no_reasoning(self):
        msg = MockMessage(content="hello")
        assert _extract_reasoning_from_message(msg) is None

    def test_reasoning_content_takes_priority(self):
        msg = MockMessage(
            content="hello",
            reasoning_content="First",
            reasoning="Second",
        )
        assert _extract_reasoning_from_message(msg) == "First"


class TestHermesAgentLoop:
    """Test the agent loop with mock servers."""

    @pytest.fixture
    def basic_tools(self):
        """Minimal tool schema for testing."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "terminal",
                    "description": "Run a command",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "command": {
                                "type": "string",
                                "description": "Command to run",
                            }
                        },
                        "required": ["command"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "read_file",
                    "description": "Read a file",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "path": {"type": "string"},
                        },
                        "required": ["path"],
                    },
                },
            },
        ]

    @pytest.fixture
    def valid_names(self):
        return {"terminal", "read_file", "todo"}

    @pytest.mark.asyncio
    async def test_simple_text_response(self, basic_tools, valid_names):
        """Model responds with text only, no tool calls."""
        server = MockServer([make_text_response("Hello! How can I help?")])
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "Hi"}]
        result = await agent.run(messages)

        assert result.finished_naturally is True
        assert result.turns_used == 1
        assert len(result.messages) >= 2  # user + assistant
        assert result.messages[-1]["role"] == "assistant"
        assert result.messages[-1]["content"] == "Hello! How can I help?"

    @pytest.mark.asyncio
    async def test_tool_call_then_text(self, basic_tools, valid_names):
        """Model calls a tool, then responds with text."""
        server = MockServer([
            make_tool_response("todo", {"todos": [{"id": "1", "content": "test", "status": "pending"}]}),
            make_text_response("I created a todo for you."),
        ])
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "Create a todo"}]
        result = await agent.run(messages)

        assert result.finished_naturally is True
        assert result.turns_used == 2
        # Should have: user, assistant (tool_call), tool (result), assistant (text)
        roles = [m["role"] for m in result.messages]
        assert roles == ["user", "assistant", "tool", "assistant"]

    @pytest.mark.asyncio
    async def test_max_turns_reached(self, basic_tools, valid_names):
        """Model keeps calling tools until max_turns is hit."""
        # Create responses that always call a tool
        responses = [
            make_tool_response("todo", {"todos": [{"id": str(i), "content": f"task {i}", "status": "pending"}]}, tool_call_id=f"call_{i}")
            for i in range(10)
        ]
        server = MockServer(responses)
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=3,
        )
        messages = [{"role": "user", "content": "Keep going"}]
        result = await agent.run(messages)

        assert result.finished_naturally is False
        assert result.turns_used == 3

    @pytest.mark.asyncio
    async def test_unknown_tool_name(self, basic_tools, valid_names):
        """Model calls a tool not in valid_tool_names."""
        server = MockServer([
            make_tool_response("nonexistent_tool", {"arg": "val"}),
            make_text_response("OK, that didn't work."),
        ])
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "Call something weird"}]
        result = await agent.run(messages)

        # Should record a tool error
        assert len(result.tool_errors) >= 1
        assert result.tool_errors[0].tool_name == "nonexistent_tool"

    @pytest.mark.asyncio
    async def test_empty_response(self, basic_tools, valid_names):
        """Server returns empty response."""
        server = MockServer([MockChatCompletion(choices=[])])
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "Hi"}]
        result = await agent.run(messages)

        assert result.finished_naturally is False
        assert result.turns_used == 1

    @pytest.mark.asyncio
    async def test_api_error_handling(self, basic_tools, valid_names):
        """Server raises an exception."""

        class FailingServer:
            async def chat_completion(self, **kwargs):
                raise ConnectionError("Server unreachable")

        agent = HermesAgentLoop(
            server=FailingServer(),
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "Hi"}]
        result = await agent.run(messages)

        assert result.finished_naturally is False
        assert result.turns_used == 1

    @pytest.mark.asyncio
    async def test_tools_passed_to_server(self, basic_tools, valid_names):
        """Verify tools are passed in the chat_completion kwargs."""
        server = MockServer([make_text_response("OK")])
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "Hi"}]
        await agent.run(messages)

        assert len(server.call_history) == 1
        assert "tools" in server.call_history[0]
        assert server.call_history[0]["tools"] == basic_tools

    @pytest.mark.asyncio
    async def test_extra_body_forwarded(self, basic_tools, valid_names):
        """extra_body should be forwarded to server."""
        extra = {"provider": {"ignore": ["DeepInfra"]}}
        server = MockServer([make_text_response("OK")])
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=10,
            extra_body=extra,
        )
        messages = [{"role": "user", "content": "Hi"}]
        await agent.run(messages)

        assert server.call_history[0].get("extra_body") == extra

    @pytest.mark.asyncio
    async def test_managed_state_returned(self, basic_tools, valid_names):
        """If server has get_state(), result should include managed_state."""
        server = MockServer([make_text_response("OK")])
        server.get_state = lambda: {"nodes": [{"test": True}]}

        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "Hi"}]
        result = await agent.run(messages)

        assert result.managed_state is not None
        assert "nodes" in result.managed_state

    @pytest.mark.asyncio
    async def test_no_managed_state_without_get_state(self, basic_tools, valid_names):
        """Regular server without get_state() should return None managed_state."""
        server = MockServer([make_text_response("OK")])
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "Hi"}]
        result = await agent.run(messages)

        assert result.managed_state is None

    @pytest.mark.asyncio
    async def test_memory_tool_blocked(self, basic_tools):
        """Memory tool should return error in RL environments."""
        valid = {"terminal", "read_file", "todo", "memory"}
        server = MockServer([
            make_tool_response("memory", {"action": "add", "target": "user", "content": "test"}),
            make_text_response("Done"),
        ])
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "Remember this"}]
        result = await agent.run(messages)

        # Find the tool response
        tool_msgs = [m for m in result.messages if m["role"] == "tool"]
        assert len(tool_msgs) >= 1
        tool_result = json.loads(tool_msgs[0]["content"])
        assert "error" in tool_result
        assert "not available" in tool_result["error"].lower()

    @pytest.mark.asyncio
    async def test_session_search_blocked(self, basic_tools):
        """session_search should return error in RL environments."""
        valid = {"terminal", "read_file", "todo", "session_search"}
        server = MockServer([
            make_tool_response("session_search", {"query": "test"}),
            make_text_response("Done"),
        ])
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "Search sessions"}]
        result = await agent.run(messages)

        tool_msgs = [m for m in result.messages if m["role"] == "tool"]
        assert len(tool_msgs) >= 1
        tool_result = json.loads(tool_msgs[0]["content"])
        assert "error" in tool_result

    @pytest.mark.asyncio
    async def test_reasoning_content_preserved(self, basic_tools, valid_names):
        """Reasoning content should be extracted and preserved."""
        resp = MockChatCompletion(
            choices=[
                MockChoice(
                    message=MockMessage(
                        content="The answer is 42.",
                        reasoning_content="Let me think about this step by step...",
                    )
                )
            ]
        )
        server = MockServer([resp])
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=basic_tools,
            valid_tool_names=valid_names,
            max_turns=10,
        )
        messages = [{"role": "user", "content": "What is the meaning of life?"}]
        result = await agent.run(messages)

        assert len(result.reasoning_per_turn) == 1
        assert result.reasoning_per_turn[0] == "Let me think about this step by step..."


class TestResizeToolPool:
    def test_resize_works(self):
        """resize_tool_pool should not raise."""
        resize_tool_pool(16)  # Small pool for testing
        resize_tool_pool(128)  # Restore default
tests/test_agent_loop_tool_calling.py (new file, 550 lines)
@@ -0,0 +1,550 @@
"""Integration tests for HermesAgentLoop tool calling.

Tests the full agent loop with real LLM calls via OpenRouter.
Uses stepfun/step-3.5-flash:free by default (zero cost), falls back
to anthropic/claude-sonnet-4 if the free model is unavailable.

These tests verify:
1. Single tool call: model calls a tool, gets result, responds
2. Multi-tool call: model calls multiple tools in one turn
3. Multi-turn: model calls tools across multiple turns
4. Unknown tool rejection: model calling a non-existent tool gets an error
5. Max turns: loop stops when max_turns is reached
6. No tools: model responds without calling any tools
7. Tool error handling: tool execution errors are captured

Run:
    pytest tests/test_agent_loop_tool_calling.py -v
    pytest tests/test_agent_loop_tool_calling.py -v -k "single"  # run one test
"""

import asyncio
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Set
from unittest.mock import patch

import pytest

# Ensure repo root is importable
_repo_root = Path(__file__).resolve().parent.parent
if str(_repo_root) not in sys.path:
    sys.path.insert(0, str(_repo_root))

try:
    from environments.agent_loop import AgentResult, HermesAgentLoop
    from atroposlib.envs.server_handling.openai_server import OpenAIServer  # noqa: F401
except ImportError:
    pytest.skip("atroposlib not installed", allow_module_level=True)


# =========================================================================
# Test infrastructure
# =========================================================================

# Models to try, in order of preference (free first)
_MODELS = [
    "stepfun/step-3.5-flash:free",
    "google/gemini-2.0-flash-001",
    "anthropic/claude-sonnet-4",
]


def _get_api_key():
    key = os.getenv("OPENROUTER_API_KEY", "")
    if not key:
        pytest.skip("OPENROUTER_API_KEY not set")
    return key


def _make_server(model: str = None):
    """Create an OpenAI server for testing."""
    from atroposlib.envs.server_handling.openai_server import OpenAIServer
    from atroposlib.envs.server_handling.server_manager import APIServerConfig

    config = APIServerConfig(
        base_url="https://openrouter.ai/api/v1",
        model_name=model or _MODELS[0],
        server_type="openai",
        api_key=_get_api_key(),
        health_check=False,
    )
    return OpenAIServer(config)


async def _try_models(test_fn):
    """Try running a test with each model until one works."""
    last_error = None
    for model in _MODELS:
        try:
            server = _make_server(model)
            return await test_fn(server, model)
        except Exception as e:
            last_error = e
            if "rate" in str(e).lower() or "limit" in str(e).lower():
                continue  # Rate limited, try next model
            raise  # Real error
    pytest.skip(f"All models failed. Last error: {last_error}")


# =========================================================================
# Fake tools for testing
# =========================================================================

# Simple calculator tool
CALC_TOOL = {
    "type": "function",
    "function": {
        "name": "calculate",
        "description": "Calculate a math expression. Returns the numeric result.",
        "parameters": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "Math expression to evaluate, e.g. '2 + 3'"
                }
            },
            "required": ["expression"],
        },
    },
}

# Weather lookup tool
WEATHER_TOOL = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather for a city. Returns temperature and conditions.",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "City name, e.g. 'Tokyo'"
                }
            },
            "required": ["city"],
        },
    },
}

# Lookup tool (always succeeds)
LOOKUP_TOOL = {
    "type": "function",
    "function": {
        "name": "lookup",
        "description": "Look up a fact. Returns a short answer string.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "What to look up"
                }
            },
            "required": ["query"],
        },
    },
}

# Error tool (always fails)
ERROR_TOOL = {
    "type": "function",
    "function": {
        "name": "failing_tool",
        "description": "A tool that always fails with an error.",
        "parameters": {
            "type": "object",
            "properties": {
                "input": {"type": "string"}
            },
            "required": ["input"],
        },
    },
}


def _fake_tool_handler(tool_name: str, args: Dict[str, Any], **kwargs) -> str:
    """Handle fake tool calls for testing."""
    if tool_name == "calculate":
        expr = args.get("expression", "0")
        try:
            # Safe eval for simple math
            result = eval(expr, {"__builtins__": {}}, {})
            return json.dumps({"result": result})
        except Exception as e:
            return json.dumps({"error": str(e)})

    elif tool_name == "get_weather":
        city = args.get("city", "Unknown")
        # Return canned weather
        return json.dumps({
            "city": city,
            "temperature": 22,
            "conditions": "sunny",
            "humidity": 45,
        })

    elif tool_name == "lookup":
        query = args.get("query", "")
        return json.dumps({"answer": f"The answer to '{query}' is 42."})

    elif tool_name == "failing_tool":
        raise RuntimeError("This tool always fails!")

    return json.dumps({"error": f"Unknown tool: {tool_name}"})


# =========================================================================
# Tests
# =========================================================================

@pytest.mark.asyncio
async def test_single_tool_call():
    """Model should call a single tool, get the result, and respond."""

    async def _run(server, model):
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=[WEATHER_TOOL],
            valid_tool_names={"get_weather"},
            max_turns=5,
            temperature=0.0,
            max_tokens=500,
        )

        messages = [
            {"role": "user", "content": "What's the weather in Tokyo? Use the get_weather tool."},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        assert isinstance(result, AgentResult)
        assert result.turns_used >= 2, f"Expected at least 2 turns (tool call + response), got {result.turns_used}"

        # Verify a tool call happened
        tool_calls_found = False
        for msg in result.messages:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                for tc in msg["tool_calls"]:
                    if tc["function"]["name"] == "get_weather":
                        tool_calls_found = True
                        args = json.loads(tc["function"]["arguments"])
                        assert "city" in args
        assert tool_calls_found, "Model should have called get_weather"

        # Verify tool result is in conversation
        tool_results = [m for m in result.messages if m.get("role") == "tool"]
        assert len(tool_results) >= 1, "Should have at least one tool result"

        # Verify the final response references the weather
        final_msg = result.messages[-1]
        assert final_msg["role"] == "assistant"
        assert final_msg["content"], "Final response should have content"

        return result

    await _try_models(_run)


@pytest.mark.asyncio
async def test_multi_tool_single_turn():
    """Model should call multiple tools in a single turn."""

    async def _run(server, model):
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=[WEATHER_TOOL, CALC_TOOL],
            valid_tool_names={"get_weather", "calculate"},
            max_turns=5,
            temperature=0.0,
            max_tokens=500,
        )

        messages = [
            {"role": "user", "content": (
                "I need two things at once: "
                "1) What's the weather in Paris? Use get_weather. "
                "2) What is 15 * 7? Use calculate. "
                "Call BOTH tools in a single response."
            )},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        # Count distinct tools called
        tools_called = set()
        for msg in result.messages:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                for tc in msg["tool_calls"]:
                    tools_called.add(tc["function"]["name"])

        # At minimum, both tools should have been called (maybe in different turns)
        assert "get_weather" in tools_called, f"get_weather not called. Called: {tools_called}"
        assert "calculate" in tools_called, f"calculate not called. Called: {tools_called}"

        return result

    await _try_models(_run)


@pytest.mark.asyncio
async def test_multi_turn_conversation():
    """Agent should handle multiple turns of tool calls."""

    async def _run(server, model):
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=[LOOKUP_TOOL, CALC_TOOL],
            valid_tool_names={"lookup", "calculate"},
            max_turns=10,
            temperature=0.0,
            max_tokens=500,
        )

        messages = [
            {"role": "user", "content": (
                "First, use the lookup tool to look up 'meaning of life'. "
                "Then use calculate to compute 6 * 7. "
                "Do these in separate tool calls, one at a time."
            )},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        # Should have used both tools
        tools_called = set()
        for msg in result.messages:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                for tc in msg["tool_calls"]:
                    tools_called.add(tc["function"]["name"])

        assert "lookup" in tools_called, f"lookup not called. Called: {tools_called}"
        assert "calculate" in tools_called, f"calculate not called. Called: {tools_called}"

        # Should finish naturally
        assert result.finished_naturally, "Should finish naturally after answering"

        return result

    await _try_models(_run)


@pytest.mark.asyncio
async def test_unknown_tool_rejected():
    """If the model calls a tool not in valid_tool_names, it gets an error."""

    async def _run(server, model):
        # Only allow "calculate" but give schema for both
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=[CALC_TOOL, WEATHER_TOOL],
            valid_tool_names={"calculate"},  # weather NOT allowed
            max_turns=5,
            temperature=0.0,
            max_tokens=500,
        )

        messages = [
            {"role": "user", "content": "What's the weather in London? Use get_weather."},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        # Check if get_weather was called and rejected
        if result.tool_errors:
            weather_errors = [e for e in result.tool_errors if e.tool_name == "get_weather"]
            assert len(weather_errors) > 0, "get_weather should have been rejected"
            assert "Unknown tool" in weather_errors[0].error

        return result

    await _try_models(_run)


@pytest.mark.asyncio
async def test_max_turns_limit():
    """Agent should stop after max_turns even if model keeps calling tools."""

    async def _run(server, model):
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=[LOOKUP_TOOL],
            valid_tool_names={"lookup"},
            max_turns=2,  # Very low limit
            temperature=0.0,
            max_tokens=500,
        )

        messages = [
            {"role": "user", "content": (
                "Keep looking up facts. Look up 'fact 1', then 'fact 2', "
                "then 'fact 3', then 'fact 4'. Do them one at a time."
            )},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        assert result.turns_used <= 2, f"Should stop at max_turns=2, used {result.turns_used}"
        assert not result.finished_naturally, "Should NOT finish naturally (hit max_turns)"

        return result

    await _try_models(_run)


@pytest.mark.asyncio
async def test_no_tools_direct_response():
    """When no tools are useful, model should respond directly."""

    async def _run(server, model):
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=[WEATHER_TOOL],
            valid_tool_names={"get_weather"},
            max_turns=5,
            temperature=0.0,
            max_tokens=200,
        )

        messages = [
            {"role": "user", "content": "What is 2 + 2? Just answer directly, no tools needed."},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        assert result.finished_naturally, "Should finish naturally with a direct response"
        assert result.turns_used == 1, f"Should take exactly 1 turn for a direct answer, took {result.turns_used}"

        final = result.messages[-1]
        assert final["role"] == "assistant"
        assert final["content"], "Should have text content"
        assert "4" in final["content"], "Should contain the answer '4'"

        return result

    await _try_models(_run)


@pytest.mark.asyncio
async def test_tool_error_handling():
    """Tool execution errors should be captured and reported to the model."""

    async def _run(server, model):
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=[ERROR_TOOL],
            valid_tool_names={"failing_tool"},
            max_turns=5,
            temperature=0.0,
            max_tokens=500,
        )

        messages = [
            {"role": "user", "content": "Please call the failing_tool with input 'test'."},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        # The tool error should be recorded
        assert len(result.tool_errors) >= 1, "Should have at least one tool error"
        assert "RuntimeError" in result.tool_errors[0].error or "always fails" in result.tool_errors[0].error

        # The error should be in the conversation as a tool result
        tool_results = [m for m in result.messages if m.get("role") == "tool"]
        assert len(tool_results) >= 1
        error_result = json.loads(tool_results[0]["content"])
        assert "error" in error_result

        return result

    await _try_models(_run)


@pytest.mark.asyncio
async def test_agent_result_structure():
    """Verify the AgentResult has all expected fields populated."""

    async def _run(server, model):
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=[CALC_TOOL],
            valid_tool_names={"calculate"},
            max_turns=5,
            temperature=0.0,
            max_tokens=300,
        )

        messages = [
            {"role": "user", "content": "What is 3 + 4? Use the calculate tool."},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        # Structural checks
        assert isinstance(result, AgentResult)
        assert isinstance(result.messages, list)
        assert len(result.messages) >= 3, "Should have user + assistant(tool) + tool_result + assistant(final)"
        assert isinstance(result.turns_used, int)
        assert result.turns_used > 0
        assert isinstance(result.finished_naturally, bool)
        assert isinstance(result.tool_errors, list)
        assert isinstance(result.reasoning_per_turn, list)

        # Messages should follow OpenAI format
        for msg in result.messages:
            assert "role" in msg, f"Message missing 'role': {msg}"
            assert msg["role"] in ("system", "user", "assistant", "tool"), f"Invalid role: {msg['role']}"

        return result

    await _try_models(_run)


@pytest.mark.asyncio
async def test_conversation_history_preserved():
    """The full conversation history should be in result.messages."""

    async def _run(server, model):
        agent = HermesAgentLoop(
            server=server,
            tool_schemas=[WEATHER_TOOL],
            valid_tool_names={"get_weather"},
            max_turns=5,
            temperature=0.0,
            max_tokens=500,
        )

        messages = [
            {"role": "system", "content": "You are a helpful weather assistant."},
            {"role": "user", "content": "What's the weather in Berlin? Use get_weather."},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        # System message should be preserved
        assert result.messages[0]["role"] == "system"
        assert "weather assistant" in result.messages[0]["content"]

        # User message should be preserved
        assert result.messages[1]["role"] == "user"
        assert "Berlin" in result.messages[1]["content"]

        # Should have assistant + tool + assistant sequence
        roles = [m["role"] for m in result.messages]
        assert "tool" in roles, "Should have tool results in conversation"

        return result

    await _try_models(_run)
tests/test_agent_loop_vllm.py (new file, 359 lines)
@@ -0,0 +1,359 @@
|
||||
"""Integration tests for HermesAgentLoop with a local vLLM server.
|
||||
|
||||
Tests the full Phase 2 flow: ManagedServer + tool calling with a real
|
||||
vLLM backend, producing actual token IDs and logprobs for RL training.
|
||||
|
||||
Requires a running vLLM server. Start one from the atropos directory:
|
||||
|
||||
python -m example_trainer.vllm_api_server \
|
||||
--model Qwen/Qwen3-4B-Thinking-2507 \
|
||||
--port 9001 \
|
||||
--gpu-memory-utilization 0.8 \
|
||||
--max-model-len=32000
|
||||
|
||||
Tests are automatically skipped if the server is not reachable.
|
||||
|
||||
Run:
|
||||
pytest tests/test_agent_loop_vllm.py -v
|
||||
pytest tests/test_agent_loop_vllm.py -v -k "single"
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
# Ensure repo root is importable
|
||||
_repo_root = Path(__file__).resolve().parent.parent
|
||||
if str(_repo_root) not in sys.path:
|
||||
sys.path.insert(0, str(_repo_root))
|
||||
|
||||
try:
|
||||
from environments.agent_loop import AgentResult, HermesAgentLoop
|
||||
except ImportError:
|
||||
pytest.skip("atroposlib not installed", allow_module_level=True)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Configuration
|
||||
# =========================================================================
|
||||
|
||||
VLLM_HOST = "localhost"
|
||||
VLLM_PORT = 9001
|
||||
VLLM_BASE_URL = f"http://{VLLM_HOST}:{VLLM_PORT}"
|
||||
VLLM_MODEL = "Qwen/Qwen3-4B-Thinking-2507"
|
||||
|
||||
|
||||
def _vllm_is_running() -> bool:
|
||||
"""Check if the vLLM server is reachable."""
|
||||
try:
|
||||
r = requests.get(f"{VLLM_BASE_URL}/health", timeout=3)
|
||||
return r.status_code == 200
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
# Skip all tests in this module if vLLM is not running
|
||||
pytestmark = pytest.mark.skipif(
|
||||
not _vllm_is_running(),
|
||||
reason=(
|
||||
f"vLLM server not reachable at {VLLM_BASE_URL}. "
|
||||
"Start it with: python -m example_trainer.vllm_api_server "
|
||||
f"--model {VLLM_MODEL} --port {VLLM_PORT} "
|
||||
"--gpu-memory-utilization 0.8 --max-model-len=32000"
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Server setup
|
||||
# =========================================================================
|
||||
|
||||
def _make_server_manager():
|
||||
"""Create a ServerManager pointing to the local vLLM server."""
|
||||
from atroposlib.envs.server_handling.server_manager import (
|
||||
ServerManager,
|
||||
APIServerConfig,
|
||||
)
|
||||
|
||||
config = APIServerConfig(
|
||||
base_url=VLLM_BASE_URL,
|
||||
model_name=VLLM_MODEL,
|
||||
server_type="vllm",
|
||||
health_check=False,
|
||||
)
|
||||
sm = ServerManager([config], tool_parser="hermes")
|
||||
sm.servers[0].server_healthy = True
|
||||
return sm
|
||||
|
||||
|
||||
def _get_tokenizer():
|
||||
"""Load the tokenizer for the model."""
|
||||
from transformers import AutoTokenizer
|
||||
return AutoTokenizer.from_pretrained(VLLM_MODEL)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Fake tools
|
||||
# =========================================================================
|
||||
|
||||
WEATHER_TOOL = {
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "get_weather",
|
||||
"description": "Get the current weather for a city. Returns temperature and conditions.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"city": {
|
||||
"type": "string",
|
||||
"description": "City name, e.g. 'Tokyo'",
|
||||
}
|
||||
},
|
||||
"required": ["city"],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
CALC_TOOL = {
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "calculate",
|
||||
"description": "Calculate a math expression. Returns the numeric result.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"expression": {
|
||||
"type": "string",
|
||||
"description": "Math expression, e.g. '2 + 3'",
|
||||
}
|
||||
},
|
||||
"required": ["expression"],
|
||||
},
|
||||
},
|
||||
}


def _fake_tool_handler(tool_name: str, args: Dict[str, Any], **kwargs) -> str:
    """Handle fake tool calls for testing."""
    if tool_name == "get_weather":
        city = args.get("city", "Unknown")
        return json.dumps({
            "city": city,
            "temperature": 22,
            "conditions": "sunny",
            "humidity": 45,
        })
    elif tool_name == "calculate":
        expr = args.get("expression", "0")
        try:
            # Restricted eval: no builtins, so only bare arithmetic works.
            result = eval(expr, {"__builtins__": {}}, {})
            return json.dumps({"result": result})
        except Exception as e:
            return json.dumps({"error": str(e)})
    return json.dumps({"error": f"Unknown tool: {tool_name}"})
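
# Illustrative direct invocations (the tests below patch this handler in via
# environments.agent_loop.handle_function_call):
#
#   _fake_tool_handler("calculate", {"expression": "2 + 3"})
#   # -> '{"result": 5}'
#   _fake_tool_handler("get_weather", {"city": "Tokyo"})
#   # -> '{"city": "Tokyo", "temperature": 22, "conditions": "sunny", "humidity": 45}'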


# =========================================================================
# Tests
# =========================================================================

@pytest.mark.asyncio
async def test_vllm_single_tool_call():
    """vLLM model calls a tool, gets result, responds — full Phase 2 flow."""
    sm = _make_server_manager()
    tokenizer = _get_tokenizer()

    async with sm.managed_server(tokenizer=tokenizer) as managed:
        agent = HermesAgentLoop(
            server=managed,
            tool_schemas=[WEATHER_TOOL],
            valid_tool_names={"get_weather"},
            max_turns=5,
            temperature=0.6,
            max_tokens=1000,
        )

        messages = [
            {"role": "user", "content": "What's the weather in Tokyo? Use the get_weather tool."},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        assert isinstance(result, AgentResult)
        assert result.turns_used >= 2, f"Expected at least 2 turns, got {result.turns_used}"

        # Verify tool call happened
        tool_calls_found = False
        for msg in result.messages:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                for tc in msg["tool_calls"]:
                    if tc["function"]["name"] == "get_weather":
                        tool_calls_found = True
                        args = json.loads(tc["function"]["arguments"])
                        assert "city" in args
        assert tool_calls_found, "Model should have called get_weather"

        # Verify tool results in conversation
        tool_results = [m for m in result.messages if m.get("role") == "tool"]
        assert len(tool_results) >= 1


@pytest.mark.asyncio
async def test_vllm_multi_tool_calls():
    """vLLM model calls multiple tools across turns."""
    sm = _make_server_manager()
    tokenizer = _get_tokenizer()

    async with sm.managed_server(tokenizer=tokenizer) as managed:
        agent = HermesAgentLoop(
            server=managed,
            tool_schemas=[WEATHER_TOOL, CALC_TOOL],
            valid_tool_names={"get_weather", "calculate"},
            max_turns=10,
            temperature=0.6,
            max_tokens=1000,
        )

        messages = [
            {"role": "user", "content": (
                "I need two things: "
                "1) What's the weather in Paris? Use get_weather. "
                "2) What is 15 * 7? Use calculate."
            )},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        # Both tools should be called
        tools_called = set()
        for msg in result.messages:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                for tc in msg["tool_calls"]:
                    tools_called.add(tc["function"]["name"])

        assert "get_weather" in tools_called, f"get_weather not called. Called: {tools_called}"
        assert "calculate" in tools_called, f"calculate not called. Called: {tools_called}"


@pytest.mark.asyncio
async def test_vllm_managed_server_produces_nodes():
    """ManagedServer should produce SequenceNodes with tokens and logprobs."""
    sm = _make_server_manager()
    tokenizer = _get_tokenizer()

    async with sm.managed_server(tokenizer=tokenizer) as managed:
        agent = HermesAgentLoop(
            server=managed,
            tool_schemas=[WEATHER_TOOL],
            valid_tool_names={"get_weather"},
            max_turns=5,
            temperature=0.6,
            max_tokens=1000,
        )

        messages = [
            {"role": "user", "content": "What's the weather in Berlin? Use get_weather."},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        # Get the managed state — should have SequenceNodes
        state = managed.get_state()

        assert state is not None, "ManagedServer should return state"
        nodes = state.get("nodes", [])
        assert len(nodes) >= 1, f"Should have at least 1 node, got {len(nodes)}"

        node = nodes[0]
        assert hasattr(node, "tokens"), "Node should have tokens"
        assert hasattr(node, "logprobs"), "Node should have logprobs"
        assert len(node.tokens) > 0, "Tokens should not be empty"
        assert len(node.logprobs) > 0, "Logprobs should not be empty"
        assert len(node.tokens) == len(node.logprobs), (
            f"Tokens ({len(node.tokens)}) and logprobs ({len(node.logprobs)}) should have same length"
        )


@pytest.mark.asyncio
async def test_vllm_no_tools_direct_response():
    """vLLM model should respond directly when no tools are needed."""
    sm = _make_server_manager()
    tokenizer = _get_tokenizer()

    async with sm.managed_server(tokenizer=tokenizer) as managed:
        agent = HermesAgentLoop(
            server=managed,
            tool_schemas=[WEATHER_TOOL],
            valid_tool_names={"get_weather"},
            max_turns=5,
            temperature=0.6,
            max_tokens=500,
        )

        messages = [
            {"role": "user", "content": "What is 2 + 2? Answer directly, no tools."},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        assert result.finished_naturally, "Should finish naturally"
        assert result.turns_used == 1, f"Should take 1 turn, took {result.turns_used}"

        final = result.messages[-1]
        assert final["role"] == "assistant"
        assert final["content"], "Should have content"


@pytest.mark.asyncio
async def test_vllm_thinking_content_extracted():
    """Qwen3-Thinking model should produce reasoning content."""
    sm = _make_server_manager()
    tokenizer = _get_tokenizer()

    async with sm.managed_server(
        tokenizer=tokenizer,
        preserve_think_blocks=True,
    ) as managed:
        agent = HermesAgentLoop(
            server=managed,
            tool_schemas=[CALC_TOOL],
            valid_tool_names={"calculate"},
            max_turns=5,
            temperature=0.6,
            max_tokens=1000,
        )

        messages = [
            {"role": "user", "content": "What is 123 * 456? Use the calculate tool."},
        ]

        with patch("environments.agent_loop.handle_function_call", side_effect=_fake_tool_handler):
            result = await agent.run(messages)

        # Qwen3-Thinking should generate <think> blocks.
        # Check if any content contains thinking markers.
        has_thinking = False
        for msg in result.messages:
            content = msg.get("content", "") or ""
            if "<think>" in content or "</think>" in content:
                has_thinking = True
                break

        # Also check reasoning_per_turn
        has_reasoning = any(r for r in result.reasoning_per_turn if r)

        # At least one of these should be true for a thinking model
        assert has_thinking or has_reasoning, (
            "Qwen3-Thinking should produce <think> blocks or reasoning content"
        )
178
tests/test_managed_server_tool_support.py
Normal file
@@ -0,0 +1,178 @@
"""
Tests for ManagedServer tool_call_parser integration.

Validates that:
1. ManagedServer accepts tool_call_parser parameter (tool_call_support branch)
2. ServerManager.managed_server() passes tool_call_parser through
3. The parser's parse() output is correctly attached to ChatCompletion responses
4. hermes-agent's tool_call_parsers are compatible with ManagedServer's expectations

These tests verify the contract between hermes-agent's environments/ code
and atroposlib's ManagedServer. They detect API incompatibilities early.
"""

import inspect
import sys
from pathlib import Path

import pytest

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

try:
    import atroposlib  # noqa: F401
except ImportError:
    pytest.skip("atroposlib not installed", allow_module_level=True)
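
# The contract exercised below, as a minimal sketch (the input text is
# illustrative, not a fixture from this file):
#
#   from environments.tool_call_parsers import get_parser
#   parser = get_parser("hermes")
#   content, tool_calls = parser.parse(
#       '<tool_call>{"name": "terminal", "arguments": {"command": "ls"}}</tool_call>'
#   )
#   assert tool_calls[0].function.name == "terminal"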


class TestManagedServerAPI:
    """Test that ManagedServer's API matches what hermes-agent expects."""

    def test_managed_server_init_signature(self):
        """ManagedServer should accept tool_call_parser parameter."""
        from atroposlib.envs.server_handling.managed_server import ManagedServer

        sig = inspect.signature(ManagedServer.__init__)
        params = list(sig.parameters.keys())

        # Core params that must exist
        assert "self" in params
        assert "server" in params
        assert "tokenizer" in params
        assert "track_tree" in params

        # tool_call_parser — required for tool_call_support branch.
        # If this fails, atroposlib hasn't been updated to tool_call_support.
        has_tool_parser = "tool_call_parser" in params
        if not has_tool_parser:
            pytest.skip(
                "ManagedServer does not have tool_call_parser param — "
                "baseline atroposlib (pre tool_call_support branch)"
            )

    def test_server_manager_managed_server_signature(self):
        """ServerManager.managed_server() should accept tool_call_parser."""
        from atroposlib.envs.server_handling.server_manager import ServerManager

        sig = inspect.signature(ServerManager.managed_server)
        params = list(sig.parameters.keys())

        assert "self" in params
        assert "tokenizer" in params

        has_tool_parser = "tool_call_parser" in params
        if not has_tool_parser:
            pytest.skip(
                "ServerManager.managed_server() does not have tool_call_parser param — "
                "baseline atroposlib (pre tool_call_support branch)"
            )

    def test_managed_server_chat_template_kwargs(self):
        """ManagedServer should have CHAT_TEMPLATE_KWARGS for forwarding tools/thinking."""
        from atroposlib.envs.server_handling.managed_server import ManagedServer

        if not hasattr(ManagedServer, "CHAT_TEMPLATE_KWARGS"):
            pytest.skip(
                "ManagedServer does not have CHAT_TEMPLATE_KWARGS — "
                "baseline atroposlib (pre tool_call_support branch)"
            )

        kwargs = ManagedServer.CHAT_TEMPLATE_KWARGS
        assert "tools" in kwargs, "tools must be in CHAT_TEMPLATE_KWARGS"

    def test_no_get_logprobs_method(self):
        """get_logprobs should be removed in tool_call_support branch."""
        from atroposlib.envs.server_handling.managed_server import ManagedServer

        # In baseline, get_logprobs exists. In tool_call_support, it's removed.
        # We just note the state — not a hard fail either way.
        has_get_logprobs = hasattr(ManagedServer, "get_logprobs")
        if has_get_logprobs:
            pytest.skip(
                "ManagedServer still has get_logprobs — baseline atroposlib"
            )


class TestParserCompatibility:
    """Test that hermes-agent's parsers match ManagedServer's expectations."""

    def test_parser_parse_returns_correct_format(self):
        """
        ManagedServer expects parser.parse(text) -> (content, tool_calls)
        where tool_calls is a list of objects with .id, .function.name, .function.arguments
        """
        from environments.tool_call_parsers import get_parser

        parser = get_parser("hermes")
        text = '<tool_call>{"name": "terminal", "arguments": {"command": "ls"}}</tool_call>'
        content, tool_calls = parser.parse(text)

        assert tool_calls is not None
        assert len(tool_calls) == 1

        tc = tool_calls[0]
        # ManagedServer accesses these attrs directly
        assert hasattr(tc, "id")
        assert hasattr(tc, "function")
        assert hasattr(tc.function, "name")
        assert hasattr(tc.function, "arguments")

    def test_parser_no_tools_returns_none(self):
        """ManagedServer checks `if parsed_tool_calls:` — None should be falsy."""
        from environments.tool_call_parsers import get_parser

        parser = get_parser("hermes")
        content, tool_calls = parser.parse("Just text, no tools")
        assert tool_calls is None

    def test_parser_content_is_string_or_none(self):
        """ManagedServer uses `parsed_content or ""` — must be str or None."""
        from environments.tool_call_parsers import get_parser

        parser = get_parser("hermes")

        # With tool calls
        text = '<tool_call>{"name": "terminal", "arguments": {"command": "ls"}}</tool_call>'
        content, _ = parser.parse(text)
        assert content is None or isinstance(content, str)

        # Without tool calls
        content2, _ = parser.parse("Just text")
        assert isinstance(content2, str)


class TestBaseEnvCompatibility:
    """Test that hermes_base_env.py's managed_server() call matches the API."""

    def test_hermes_base_env_managed_server_call_pattern(self):
        """
        Verify that hermes_base_env.py passes tool_call_parser to managed_server().
        This is a source-level check — the actual managed_server() call must match.
        """
        import ast

        base_env_path = Path(__file__).parent.parent / "environments" / "hermes_base_env.py"
        source = base_env_path.read_text()
        tree = ast.parse(source)

        # Find the managed_server() call
        found_tool_call_parser_kwarg = False
        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                # Look for self.server.managed_server(...)
                if isinstance(node.func, ast.Attribute) and node.func.attr == "managed_server":
                    for kw in node.keywords:
                        if kw.arg == "tool_call_parser":
                            found_tool_call_parser_kwarg = True

        assert found_tool_call_parser_kwarg, (
            "hermes_base_env.py should pass tool_call_parser= to managed_server()"
        )

    def test_hermes_base_env_uses_get_parser(self):
        """Verify hermes_base_env imports and uses get_parser from tool_call_parsers."""
        base_env_path = Path(__file__).parent.parent / "environments" / "hermes_base_env.py"
        source = base_env_path.read_text()

        assert "from environments.tool_call_parsers import get_parser" in source
        assert "get_parser(" in source
159
tests/test_tool_call_parsers.py
Normal file
@@ -0,0 +1,159 @@
"""
Tests for environments/tool_call_parsers/ — client-side tool call parsers.

These parsers extract structured tool_calls from raw model output text.
Used in Phase 2 (VLLM/generate) where the server returns raw tokens.
"""

import json
import sys
from pathlib import Path

import pytest

# Ensure repo root is importable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

try:
    from environments.tool_call_parsers import (
        ParseResult,  # noqa: F401 (imported to verify it is part of the public API)
        ToolCallParser,
        get_parser,
        list_parsers,
    )
except ImportError:
    pytest.skip("atroposlib not installed", allow_module_level=True)


# ─── Registry tests ─────────────────────────────────────────────────────

class TestParserRegistry:
    def test_list_parsers_returns_nonempty(self):
        parsers = list_parsers()
        assert len(parsers) > 0

    def test_hermes_parser_registered(self):
        parsers = list_parsers()
        assert "hermes" in parsers

    def test_get_parser_returns_instance(self):
        parser = get_parser("hermes")
        assert isinstance(parser, ToolCallParser)

    def test_get_parser_unknown_raises(self):
        with pytest.raises(KeyError):
            get_parser("nonexistent_parser_xyz")

    def test_all_registered_parsers_instantiate(self):
        """Every registered parser should be importable and instantiable."""
        for name in list_parsers():
            parser = get_parser(name)
            assert isinstance(parser, ToolCallParser)
            assert hasattr(parser, "parse")


# ─── Hermes parser tests ────────────────────────────────────────────────

class TestHermesParser:
    @pytest.fixture
    def parser(self):
        return get_parser("hermes")

    def test_no_tool_call(self, parser):
        text = "Hello, I can help you with that."
        content, tool_calls = parser.parse(text)
        assert content == text
        assert tool_calls is None

    def test_single_tool_call(self, parser):
        text = '<tool_call>{"name": "terminal", "arguments": {"command": "ls -la"}}</tool_call>'
        content, tool_calls = parser.parse(text)
        assert tool_calls is not None
        assert len(tool_calls) == 1
        assert tool_calls[0].function.name == "terminal"
        args = json.loads(tool_calls[0].function.arguments)
        assert args["command"] == "ls -la"

    def test_tool_call_with_surrounding_text(self, parser):
        text = 'Let me check that for you.\n<tool_call>{"name": "terminal", "arguments": {"command": "pwd"}}</tool_call>'
        content, tool_calls = parser.parse(text)
        assert tool_calls is not None
        assert len(tool_calls) == 1
        assert tool_calls[0].function.name == "terminal"
        # Content should have the surrounding text
        if content is not None:
            assert "check that" in content or content.strip() != ""

    def test_multiple_tool_calls(self, parser):
        text = (
            '<tool_call>{"name": "terminal", "arguments": {"command": "ls"}}</tool_call>\n'
            '<tool_call>{"name": "read_file", "arguments": {"path": "test.py"}}</tool_call>'
        )
        content, tool_calls = parser.parse(text)
        assert tool_calls is not None
        assert len(tool_calls) == 2
        names = {tc.function.name for tc in tool_calls}
        assert "terminal" in names
        assert "read_file" in names

    def test_tool_call_ids_are_unique(self, parser):
        text = (
            '<tool_call>{"name": "terminal", "arguments": {"command": "ls"}}</tool_call>\n'
            '<tool_call>{"name": "terminal", "arguments": {"command": "pwd"}}</tool_call>'
        )
        _, tool_calls = parser.parse(text)
        assert tool_calls is not None
        ids = [tc.id for tc in tool_calls]
        assert len(ids) == len(set(ids)), "Tool call IDs must be unique"

    def test_empty_string(self, parser):
        content, tool_calls = parser.parse("")
        assert tool_calls is None

    def test_malformed_json_in_tool_call(self, parser):
        text = '<tool_call>not valid json</tool_call>'
        content, tool_calls = parser.parse(text)
        # Should either return None tool_calls or handle gracefully
        # (implementation may vary — some parsers return error tool calls).
        # Not raising an exception is the behavior under test here.

    def test_truncated_tool_call(self, parser):
        """Test handling of unclosed tool_call tag (model truncated mid-generation)."""
        text = '<tool_call>{"name": "terminal", "arguments": {"command": "ls -la"}'
        content, tool_calls = parser.parse(text)
        # Parser should handle truncated output gracefully:
        # either parse it successfully or return None. Not raising an
        # exception is the behavior under test here.


# ─── Parse result contract tests (applies to ALL parsers) ───────────────

class TestParseResultContract:
    """Ensure all parsers conform to the ParseResult contract."""

    @pytest.fixture(params=["hermes"])  # Add more as needed
    def parser(self, request):
        return get_parser(request.param)

    def test_returns_tuple_of_two(self, parser):
        result = parser.parse("hello world")
        assert isinstance(result, tuple)
        assert len(result) == 2

    def test_no_tools_returns_none_tool_calls(self, parser):
        content, tool_calls = parser.parse("Just plain text, no tools.")
        assert tool_calls is None
        assert content is not None

    def test_tool_calls_are_proper_objects(self, parser):
        """When tool calls are found, they should be ChatCompletionMessageToolCall objects."""
        # Use hermes format since that's universal
        text = '<tool_call>{"name": "terminal", "arguments": {"command": "echo hi"}}</tool_call>'
        content, tool_calls = parser.parse(text)
        if tool_calls is not None:
            for tc in tool_calls:
                assert hasattr(tc, "id")
                assert hasattr(tc, "function")
                assert hasattr(tc.function, "name")
                assert hasattr(tc.function, "arguments")
                assert tc.id is not None
                assert isinstance(tc.function.name, str)
                assert isinstance(tc.function.arguments, str)
271
tests/tools/test_modal_sandbox_fixes.py
Normal file
@@ -0,0 +1,271 @@
"""Tests for Modal sandbox infrastructure fixes (TBLite baseline).

Covers the bugs discovered while setting up TBLite evaluation:
1. Tool resolution — terminal + file tools load with minisweagent
2. CWD fix — host paths get replaced with /root for container backends
3. ephemeral_disk version check
4. Tilde ~ replaced with /root for container backends
5. ensurepip fix in patches.py for Modal image builder
6. install_pipx stays True for swerex-remote
7. /home/ added to host prefix check
"""

import os
import sys
from pathlib import Path
from unittest.mock import patch

import pytest

# Ensure repo root is importable
_repo_root = Path(__file__).resolve().parent.parent.parent
if str(_repo_root) not in sys.path:
    sys.path.insert(0, str(_repo_root))

try:
    import tools.terminal_tool  # noqa: F401
    _tt_mod = sys.modules["tools.terminal_tool"]
except ImportError:
    pytest.skip("hermes-agent tools not importable (missing deps)", allow_module_level=True)
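
# The behavior under test, sketched under the assumption that _get_env_config()
# matches the source hunks shown at the end of this compare: container backends
# rewrite host-only working directories to /root, while SSH keeps them.
#
#   TERMINAL_ENV=modal  TERMINAL_CWD=/home/me/repo  ->  config["cwd"] == "/root"
#   TERMINAL_ENV=ssh    TERMINAL_CWD=/home/me/repo  ->  config["cwd"] preserved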


# =========================================================================
# Test 1: Tool resolution includes terminal + file tools
# =========================================================================

class TestToolResolution:
    """Verify get_tool_definitions returns all expected tools for eval."""

    def _has_minisweagent(self):
        try:
            import minisweagent  # noqa: F401
            return True
        except ImportError:
            return False

    def test_terminal_and_file_toolsets_resolve_all_tools(self):
        """enabled_toolsets=['terminal', 'file'] should produce 6 tools."""
        if not self._has_minisweagent():
            pytest.skip("minisweagent not installed (git submodule update --init)")
        from model_tools import get_tool_definitions
        tools = get_tool_definitions(
            enabled_toolsets=["terminal", "file"],
            quiet_mode=True,
        )
        names = {t["function"]["name"] for t in tools}
        expected = {"terminal", "process", "read_file", "write_file", "search_files", "patch"}
        assert expected == names, f"Expected {expected}, got {names}"

    def test_terminal_tool_present(self):
        """The terminal tool must be present (not silently dropped)."""
        if not self._has_minisweagent():
            pytest.skip("minisweagent not installed (git submodule update --init)")
        from model_tools import get_tool_definitions
        tools = get_tool_definitions(
            enabled_toolsets=["terminal", "file"],
            quiet_mode=True,
        )
        names = [t["function"]["name"] for t in tools]
        assert "terminal" in names, (
            f"terminal tool missing! Only got: {names}. "
            "Check that minisweagent is installed (git submodule update --init)."
        )


# =========================================================================
# Tests 2-4: CWD handling for container backends
# =========================================================================

class TestCwdHandling:
    """Verify host paths are sanitized for container backends."""

    def test_home_path_replaced_for_modal(self):
        """TERMINAL_CWD=/home/user/... should be replaced with /root for modal."""
        with patch.dict(os.environ, {
            "TERMINAL_ENV": "modal",
            "TERMINAL_CWD": "/home/dakota/github/hermes-agent",
        }):
            config = _tt_mod._get_env_config()
            assert config["cwd"] == "/root", (
                f"Expected /root, got {config['cwd']}. "
                "/home/ paths should be replaced for modal backend."
            )

    def test_users_path_replaced_for_docker(self):
        """TERMINAL_CWD=/Users/... should be replaced with /root for docker."""
        with patch.dict(os.environ, {
            "TERMINAL_ENV": "docker",
            "TERMINAL_CWD": "/Users/someone/projects",
        }):
            config = _tt_mod._get_env_config()
            assert config["cwd"] == "/root", (
                f"Expected /root, got {config['cwd']}. "
                "/Users/ paths should be replaced for docker backend."
            )

    def test_windows_path_replaced_for_modal(self):
        """TERMINAL_CWD=C:\\Users\\... should be replaced for modal."""
        with patch.dict(os.environ, {
            "TERMINAL_ENV": "modal",
            "TERMINAL_CWD": "C:\\Users\\someone\\projects",
        }):
            config = _tt_mod._get_env_config()
            assert config["cwd"] == "/root"

    def test_default_cwd_is_root_for_container_backends(self):
        """Container backends should default to /root, not ~."""
        for backend in ("modal", "docker", "singularity", "daytona"):
            # Build an environment with the backend set and TERMINAL_CWD
            # removed, so _get_env_config() falls back to its default.
            with patch.dict(os.environ, {"TERMINAL_ENV": backend}, clear=False):
                env = os.environ.copy()
                env.pop("TERMINAL_CWD", None)
                with patch.dict(os.environ, env, clear=True):
                    config = _tt_mod._get_env_config()
                    assert config["cwd"] == "/root", (
                        f"Backend {backend}: expected /root default, got {config['cwd']}"
                    )

    def test_local_backend_uses_getcwd(self):
        """Local backend should use os.getcwd(), not /root."""
        with patch.dict(os.environ, {"TERMINAL_ENV": "local"}, clear=False):
            env = os.environ.copy()
            env.pop("TERMINAL_CWD", None)
            with patch.dict(os.environ, env, clear=True):
                config = _tt_mod._get_env_config()
                assert config["cwd"] == os.getcwd()

    def test_ssh_preserves_home_paths(self):
        """SSH backend should NOT replace /home/ paths (they're valid remotely)."""
        with patch.dict(os.environ, {
            "TERMINAL_ENV": "ssh",
            "TERMINAL_CWD": "/home/remote-user/work",
            "TERMINAL_SSH_HOST": "example.com",
            "TERMINAL_SSH_USER": "user",
        }):
            config = _tt_mod._get_env_config()
            assert config["cwd"] == "/home/remote-user/work", (
                "SSH backend should preserve /home/ paths"
            )


# =========================================================================
# Test 5: ephemeral_disk version check
# =========================================================================

class TestEphemeralDiskCheck:
    """Verify ephemeral_disk is only passed when modal supports it."""

    def test_ephemeral_disk_skipped_when_unsupported(self):
        """If modal.Sandbox.create doesn't have ephemeral_disk param, skip it."""
        import inspect

        # Simulate a modal.Sandbox.create signature WITHOUT ephemeral_disk.
        # Only the parameter names are consulted by the check below.
        mock_params = {
            "args": inspect.Parameter("args", inspect.Parameter.VAR_POSITIONAL),
            "image": inspect.Parameter("image", inspect.Parameter.KEYWORD_ONLY),
            "timeout": inspect.Parameter("timeout", inspect.Parameter.KEYWORD_ONLY),
            "cpu": inspect.Parameter("cpu", inspect.Parameter.KEYWORD_ONLY),
            "memory": inspect.Parameter("memory", inspect.Parameter.KEYWORD_ONLY),
        }

        with patch.dict(os.environ, {"TERMINAL_ENV": "modal"}):
            config = _tt_mod._get_env_config()
            # The config has a container_disk default of 51200
            disk = config.get("container_disk", 51200)
            assert disk > 0, "disk should default to > 0"

            # Simulate the version check logic from terminal_tool.py
            sandbox_kwargs = {}
            if disk > 0:
                try:
                    if "ephemeral_disk" in mock_params:
                        sandbox_kwargs["ephemeral_disk"] = disk
                except Exception:
                    pass

            assert "ephemeral_disk" not in sandbox_kwargs, (
                "ephemeral_disk should not be set when Sandbox.create doesn't support it"
            )
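
# The capability check above generalizes to any optional keyword argument.
# A minimal sketch (hypothetical helper, not part of terminal_tool.py):
#
#   import inspect
#
#   def supports_kwarg(fn, name: str) -> bool:
#       try:
#           return name in inspect.signature(fn).parameters
#       except (TypeError, ValueError):
#           return False
#
#   # e.g. only pass ephemeral_disk when the installed modal supports it:
#   #   if supports_kwarg(modal.Sandbox.create, "ephemeral_disk"):
#   #       sandbox_kwargs["ephemeral_disk"] = disk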


# =========================================================================
# Test 6: ModalEnvironment defaults
# =========================================================================

class TestModalEnvironmentDefaults:
    """Verify ModalEnvironment has correct defaults."""

    def test_default_cwd_is_root(self):
        """ModalEnvironment default cwd should be /root, not ~."""
        from tools.environments.modal import ModalEnvironment
        import inspect
        sig = inspect.signature(ModalEnvironment.__init__)
        cwd_default = sig.parameters["cwd"].default
        assert cwd_default == "/root", (
            f"ModalEnvironment cwd default should be /root, got {cwd_default!r}. "
            "Tilde ~ is not expanded by subprocess.run(cwd=...)."
        )


# =========================================================================
# Test 7: ensurepip fix in patches.py
# =========================================================================

class TestEnsurepipFix:
    """Verify the pip fix is applied in the patched Modal init."""

    def test_patched_init_creates_image_with_setup_commands(self):
        """The patched __init__ should create a modal.Image with the pip fix."""
        try:
            from environments.patches import _patch_swerex_modal
        except ImportError:
            pytest.skip("environments.patches not importable")

        # Check that the patch code references ensurepip
        import inspect
        source = inspect.getsource(_patch_swerex_modal)
        assert "ensurepip" in source, (
            "patches._patch_swerex_modal should include ensurepip fix "
            "for Modal's legacy image builder"
        )
        assert "setup_dockerfile_commands" in source, (
            "patches._patch_swerex_modal should use setup_dockerfile_commands "
            "to fix pip before Modal's bootstrap"
        )

    def test_patched_init_uses_install_pipx_from_config(self):
        """The patched init should respect install_pipx from config."""
        try:
            from environments.patches import _patch_swerex_modal
        except ImportError:
            pytest.skip("environments.patches not importable")

        import inspect
        source = inspect.getsource(_patch_swerex_modal)
        assert "install_pipx" in source, (
            "patches._patch_swerex_modal should pass install_pipx to ModalDeployment"
        )


# =========================================================================
# Test 8: Host prefix list completeness
# =========================================================================

class TestHostPrefixList:
    """Verify the host prefix list catches common host-only paths."""

    def test_all_common_host_prefixes_caught(self):
        """The host prefix check should catch /Users/, /home/, C:\\, and C:/."""
        # Read the actual source to verify the prefixes
        import inspect
        source = inspect.getsource(_tt_mod._get_env_config)
        for prefix in ["/Users/", "/home/", 'C:\\\\"', "C:/"]:
            # Normalize for source comparison
            check = prefix.rstrip('"')
            assert check in source or prefix in source, (
                f"Host prefix {prefix!r} not found in _get_env_config. "
                "Container backends need this to avoid using host paths."
            )

@@ -50,7 +50,7 @@ class ModalEnvironment(BaseEnvironment):
     def __init__(
         self,
         image: str,
-        cwd: str = "~",
+        cwd: str = "/root",
         timeout: int = 60,
         modal_sandbox_kwargs: Optional[Dict[str, Any]] = None,
         persistent_filesystem: bool = True,
@@ -95,6 +95,7 @@ class ModalEnvironment(BaseEnvironment):
             startup_timeout=180.0,
             runtime_timeout=3600.0,
             modal_sandbox_kwargs=sandbox_kwargs,
+            install_pipx=True,  # Required: installs pipx + swe-rex runtime (swerex-remote)
         )

     def execute(self, command: str, cwd: str = "", *,
@@ -415,7 +415,7 @@ def _get_env_config() -> Dict[str, Any]:
     if env_type == "local":
         default_cwd = os.getcwd()
     else:
-        default_cwd = "~"
+        default_cwd = "/root"

     # Read TERMINAL_CWD but sanity-check it for container backends.
     # If the CWD looks like a host-local path that can't exist inside a
@@ -424,7 +424,7 @@ def _get_env_config() -> Dict[str, Any]:
     # SSH is excluded since /home/ paths are valid on remote machines.
     cwd = os.getenv("TERMINAL_CWD", default_cwd)
     if env_type in ("modal", "docker", "singularity", "daytona") and cwd:
-        host_prefixes = ("/Users/", "C:\\", "C:/")
+        host_prefixes = ("/Users/", "/home/", "C:\\", "C:/")
         if any(cwd.startswith(p) for p in host_prefixes) and cwd != default_cwd:
             logger.info("Ignoring TERMINAL_CWD=%r for %s backend "
                         "(host path won't exist in sandbox). Using %r instead.",
@@ -504,7 +504,12 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int,
         if memory > 0:
             sandbox_kwargs["memory"] = memory
         if disk > 0:
-            sandbox_kwargs["ephemeral_disk"] = disk
+            try:
+                import inspect, modal
+                if "ephemeral_disk" in inspect.signature(modal.Sandbox.create).parameters:
+                    sandbox_kwargs["ephemeral_disk"] = disk
+            except Exception:
+                pass

     return _ModalEnvironment(
         image=image, cwd=cwd, timeout=timeout,