mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 15:01:34 +08:00
Compare commits
9 Commits
codex-port
...
endless-te
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dff5481e58 | ||
|
|
fe17b5ff08 | ||
|
|
6fdb38ed29 | ||
|
|
b7e713b101 | ||
|
|
0e694b954a | ||
|
|
c12e46cf13 | ||
|
|
b93ad43191 | ||
|
|
f1c2f8a414 | ||
|
|
9139eeaa60 |
@@ -13,6 +13,7 @@ Core layers:
|
||||
Concrete environments:
|
||||
- terminal_test_env/: Simple file-creation tasks for testing the stack
|
||||
- hermes_swe_env/: SWE-bench style tasks with Modal sandboxes
|
||||
- endless_terminals/: Terminal tasks from HuggingFace dataset with Apptainer containers
|
||||
|
||||
Benchmarks (eval-only):
|
||||
- benchmarks/terminalbench_2/: Terminal-Bench 2.0 evaluation
|
||||
|
||||
5
environments/endless_terminals/__init__.py
Normal file
5
environments/endless_terminals/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Endless Terminals Environment - Terminal task training from HuggingFace dataset."""
|
||||
|
||||
from .endless_terminals_env import EndlessTerminalsEnv, EndlessTerminalsEnvConfig
|
||||
|
||||
__all__ = ["EndlessTerminalsEnv", "EndlessTerminalsEnvConfig"]
|
||||
69
environments/endless_terminals/default.yaml
Normal file
69
environments/endless_terminals/default.yaml
Normal file
@@ -0,0 +1,69 @@
|
||||
# Endless Terminals Environment -- Default Configuration
#
# Trains agents on terminal tasks from the Endless Terminals HuggingFace dataset.
# Uses hermes-agent backends (modal/docker/local) with per-task Docker images.
# Tests run in the same sandbox the agent used (no separate containers needed).
#
# Dataset: https://huggingface.co/datasets/obiwan96/endless-terminals-train
#
# Prerequisites:
#   1. Download dataset: huggingface-cli download obiwan96/endless-terminals-train \
#        --repo-type dataset --local-dir ~/endless-terminals-data \
#        --local-dir-use-symlinks False
#   2. Set TASKS_BASE_DIR environment variable or configure tasks_base_dir below
#   3. For modal backend: Configure Modal CLI (modal token set)
#   4. For docker backend: Install Docker
#
# Usage:
#   python environments/endless_terminals/endless_terminals_env.py process \
#     --config environments/endless_terminals/default.yaml

env:
  # Toolsets exposed to the agent loop.
  enabled_toolsets: ["terminal", "file"]

  # Agent configuration
  max_agent_turns: 32
  max_token_length: 4096
  agent_temperature: 1.0

  # Terminal backend
  terminal_backend: "local"  # Change to "modal" or "docker" for cloud isolation

  # Dataset settings
  use_dataset: true
  # NOTE(review): the header above references obiwan96/endless-terminals-train --
  # confirm which HuggingFace repo id is actually intended here.
  dataset_name: "obiwan96/endless-terminals"
  dataset_split: "train"
  dataset_cache_dir: "~/.cache/huggingface/datasets"
  tasks_base_dir: ""  # Set to directory containing task_* folders (e.g., ~/endless-terminals-data)

  # Test execution (seconds allowed for pytest inside the sandbox)
  test_timeout_s: 60

  # Training configuration
  group_size: 8
  total_steps: 10000
  steps_per_eval: 500

  # Periodic evaluation: how many holdout tasks to run, and what fraction
  # of the dataset is held out from training.
  num_eval_tasks: 10
  eval_split_ratio: 0.1

  # Logging
  use_wandb: true
  wandb_name: "endless-terminals"

  # System prompt
  system_prompt: >
    You are a skilled Linux system administrator and programmer.
    You have access to a terminal and file tools to complete system administration
    and programming tasks. Use the tools effectively to solve the given task,
    and verify your solution works correctly before finishing.

openai:
  base_url: "https://openrouter.ai/api/v1"
  model_name: "anthropic/claude-sonnet-4.5"
  server_type: "openai"
  api_key: ""  # Loaded from OPENROUTER_API_KEY env var
  health_check: false
  timeout: 30  # 30 second timeout per request
  max_retries: 2  # Only retry twice
||||
921
environments/endless_terminals/endless_terminals_env.py
Normal file
921
environments/endless_terminals/endless_terminals_env.py
Normal file
@@ -0,0 +1,921 @@
|
||||
"""
|
||||
Endless Terminals Environment for Hermes-Agent + Atropos RL.
|
||||
|
||||
Loads pre-generated terminal tasks from HuggingFace dataset and scores
|
||||
agent performance using test execution in the agent's sandbox.
|
||||
|
||||
Uses hermes-agent backends (modal, docker, local) with per-task Docker images
|
||||
extracted from container.def files. Tests run in the same sandbox the agent
|
||||
used, following the Terminal Bench 2 pattern.
|
||||
|
||||
Dataset: https://huggingface.co/datasets/obiwan96/endless-terminals-train
|
||||
|
||||
Run:
|
||||
python environments/endless_terminals/endless_terminals_env.py process \
|
||||
--config environments/endless_terminals/default.yaml
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
# Ensure hermes-agent root is on path
|
||||
_repo_root = Path(__file__).resolve().parent.parent.parent
|
||||
if str(_repo_root) not in sys.path:
|
||||
sys.path.insert(0, str(_repo_root))
|
||||
|
||||
from atroposlib.envs.base import ScoredDataGroup, ScoredDataItem
|
||||
from atroposlib.type_definitions import Item
|
||||
|
||||
from environments.hermes_base_env import HermesAgentBaseEnv, HermesAgentEnvConfig
|
||||
from environments.agent_loop import AgentResult
|
||||
from environments.tool_context import ToolContext
|
||||
from tools.terminal_tool import (
|
||||
register_task_env_overrides,
|
||||
clear_task_env_overrides,
|
||||
cleanup_vm,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Add endless-terminals to path for imports
|
||||
ENDLESS_TERMINALS_PATH = os.getenv(
|
||||
"ENDLESS_TERMINALS_PATH",
|
||||
str(Path.home() / "Desktop" / "Projects" / "endless-terminals")
|
||||
)
|
||||
sys.path.insert(0, ENDLESS_TERMINALS_PATH)
|
||||
|
||||
|
||||
class EndlessTerminalsEnvConfig(HermesAgentEnvConfig):
    """Configuration for Endless Terminals environment.

    Extends HermesAgentEnvConfig with dataset-loading, test-execution,
    Docker-image-fallback, and eval-holdout settings. Values are normally
    supplied via default.yaml; the Field defaults below apply otherwise.
    """

    # Dataset settings
    use_dataset: bool = Field(
        default=True,
        description="Load tasks from HuggingFace dataset (recommended). If False, generate procedurally."
    )
    dataset_name: str = Field(
        default="obiwan96/endless-terminals-train",
        description="HuggingFace dataset name"
    )
    dataset_split: str = Field(
        default="train",
        description="Dataset split to use"
    )
    dataset_cache_dir: str = Field(
        default="~/.cache/huggingface/datasets",
        description="HuggingFace datasets cache directory"
    )
    tasks_base_dir: str = Field(
        default="",
        description="Base directory containing task_* folders. If empty, uses paths from dataset."
    )

    # Test execution
    test_timeout_s: int = Field(default=60, description="Test execution timeout (seconds)")

    # Docker image fallback (used when a task's container.def is missing or unparsable)
    default_docker_image: str = Field(
        default="ubuntu:22.04",
        description="Default Docker image if container.def parsing fails"
    )

    # Agent defaults
    max_agent_turns: int = Field(default=32, description="Max turns for agent (increased for long traces)")

    # Evaluation settings
    num_eval_tasks: int = Field(
        default=10,
        description="Number of tasks to run during periodic evaluation"
    )
    eval_split_ratio: float = Field(
        default=0.1,
        description="Fraction of dataset to hold out for evaluation (0.0-1.0)"
    )
|
||||
|
||||
|
||||
class EndlessTerminalsEnv(HermesAgentBaseEnv):
|
||||
"""
|
||||
Endless Terminals environment using pre-generated HuggingFace dataset.
|
||||
|
||||
Loads terminal tasks from dataset, runs agent with terminal tools,
|
||||
and scores by executing tests in the agent's sandbox using ToolContext.
|
||||
"""
|
||||
|
||||
name = "endless_terminals_env"
|
||||
env_config_cls = EndlessTerminalsEnvConfig
|
||||
|
||||
@classmethod
def config_init(cls) -> Tuple[EndlessTerminalsEnvConfig, List["APIServerConfig"]]:
    """Build fallback defaults for this environment.

    Used when no config file is provided; when --config is passed the
    YAML is loaded through a different path and this may never run.

    Returns:
        A (env_config, server_configs) pair.
    """
    from atroposlib.envs.server_handling.server_manager import APIServerConfig

    env_config = EndlessTerminalsEnvConfig(
        enabled_toolsets=["terminal", "file"],
        max_agent_turns=32,
        terminal_backend="local",
        use_dataset=True,
        tasks_base_dir="",
        group_size=1,
        total_steps=1,
        use_wandb=False,
    )

    # Single OpenRouter-backed server; API key comes from the environment.
    default_server = APIServerConfig(
        base_url="https://openrouter.ai/api/v1",
        model_name="anthropic/claude-sonnet-4.5",
        server_type="openai",
        api_key=os.getenv("OPENROUTER_API_KEY", ""),
        health_check=False,
    )

    return env_config, [default_server]
|
||||
|
||||
def __init__(self, *args, **kwargs):
    """Initialize dataset bookkeeping and the wandb metrics buffer."""
    super().__init__(*args, **kwargs)

    # Dataset state; populated by setup() / _split_dataset().
    self._dataset = None
    self._train_dataset = None
    self._eval_dataset = None
    self._dataset_indices = []
    self._current_index = 0

    # Per-task metric dicts accumulated for wandb logging.
    self._metrics_buffer = []

    # Debug aid: log the model name each configured server will use.
    if hasattr(self, 'server') and hasattr(self.server, 'servers'):
        for idx, srv in enumerate(self.server.servers):
            logger.debug(f"Server {idx}: model_name={getattr(srv.config, 'model_name', 'NONE')}")
|
||||
|
||||
async def setup(self):
    """Load the task dataset.

    Three modes, checked in order:
      1. ``use_dataset=False``: procedural generation (not implemented yet).
      2. ``tasks_base_dir`` set: scan that directory for ``task_*`` folders
         and synthesize minimal dataset items pointing at them.
      3. Otherwise: download ``dataset_name`` from HuggingFace.

    In modes 2 and 3 the loaded dataset is split into train/eval holdouts
    via _split_dataset().

    Raises:
        RuntimeError: if tasks_base_dir does not exist or contains no
            task_* directories.
    """
    if not self.config.use_dataset:
        logger.info("Using procedural task generation (not implemented yet)")
        return

    # If tasks_base_dir is set, load from local directory instead of HuggingFace
    if self.config.tasks_base_dir:
        tasks_base = Path(os.path.expanduser(self.config.tasks_base_dir))

        # Resolve to absolute path if relative
        if not tasks_base.is_absolute():
            tasks_base = Path.cwd() / tasks_base
        tasks_base = tasks_base.resolve()

        if not tasks_base.exists():
            raise RuntimeError(f"tasks_base_dir not found: {tasks_base}")

        logger.info(f"Loading tasks from local directory: {tasks_base}")

        # Find all task_* directories
        task_dirs = sorted(tasks_base.glob("task_*"))
        logger.info(f"Found {len(task_dirs)} task directories")

        if not task_dirs:
            # Debug: show what's actually in the directory before failing
            all_items = list(tasks_base.iterdir())
            logger.warning(f"Directory contains {len(all_items)} items:")
            for entry in all_items[:10]:
                logger.warning(f" - {entry.name} ({'dir' if entry.is_dir() else 'file'})")
            raise RuntimeError(f"No task_* directories found in {tasks_base}")

        # Synthesize minimal dataset items (just the directory paths);
        # get_next_item() reads extra_info.task_dir to locate each task.
        self._dataset = [
            {
                "description": f"Task from {task_dir.name}",
                "extra_info": {"task_dir": str(task_dir)},
            }
            for task_dir in task_dirs
        ]

        logger.info(f"Loaded {len(self._dataset)} tasks from local directory")

        self._split_dataset()
        return

    # Otherwise, load from HuggingFace
    logger.info(f"Loading dataset from HuggingFace: {self.config.dataset_name}")

    try:
        from datasets import load_dataset

        # load_dataset blocks on network + disk, so run it in the default
        # executor. Use get_running_loop(): get_event_loop() is deprecated
        # inside coroutines and may bind the wrong loop.
        self._dataset = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: load_dataset(
                self.config.dataset_name,
                split=self.config.dataset_split,
                cache_dir=os.path.expanduser(self.config.dataset_cache_dir)
            )
        )

        logger.info(f"Loaded {len(self._dataset)} tasks from HuggingFace")

        self._split_dataset()

    except Exception as e:
        logger.error(f"ERROR loading dataset: {e}")
        raise
|
||||
|
||||
def _split_dataset(self):
    """Shuffle the dataset and carve off an eval holdout.

    The holdout size is ``eval_split_ratio`` of the dataset; the rest
    becomes the training set. Also initializes the shuffled training
    index cursor used by get_next_item().

    Raises:
        RuntimeError: if the dataset is missing or empty.
    """
    if self._dataset is None or len(self._dataset) == 0:
        raise RuntimeError("Cannot split empty dataset")

    total_size = len(self._dataset)
    eval_size = int(total_size * self.config.eval_split_ratio)
    train_size = total_size - eval_size

    shuffled = list(range(total_size))
    random.shuffle(shuffled)
    train_part, eval_part = shuffled[:train_size], shuffled[train_size:]

    # Plain lists (local-directory mode) are sliced by comprehension;
    # HuggingFace Dataset objects use .select().
    if isinstance(self._dataset, list):
        self._train_dataset = [self._dataset[j] for j in train_part]
        self._eval_dataset = [self._dataset[j] for j in eval_part]
    else:
        self._train_dataset = self._dataset.select(train_part)
        self._eval_dataset = self._dataset.select(eval_part)

    self._dataset_indices = list(range(len(self._train_dataset)))
    random.shuffle(self._dataset_indices)
    self._current_index = 0

    logger.info(
        f"Split dataset: {len(self._train_dataset)} train, "
        f"{len(self._eval_dataset)} eval "
        f"(ratio={self.config.eval_split_ratio:.1%})"
    )
|
||||
|
||||
async def get_next_item(self) -> Item:
    """Return the next valid training task as an Item dict.

    Walks the shuffled training indices (reshuffling at each epoch
    boundary) and skips entries whose task directory or test file
    cannot be located on disk.

    Returns:
        Dict with task_id / task_name / description / task_dir /
        final_test / docker_image / dataset_index keys.

    Raises:
        RuntimeError: if the dataset was never loaded, or no valid task
            is found after one full pass over the training set.
    """
    if self._train_dataset is None:
        raise RuntimeError("Dataset not loaded. Call setup() first.")

    # Iterate rather than recurse: a recursive retry (the previous
    # implementation) can exhaust the interpreter recursion limit when
    # many consecutive tasks are invalid. One full pass bounds the search.
    for _ in range(len(self._dataset_indices)):
        idx = self._dataset_indices[self._current_index]
        task = self._train_dataset[idx]

        # Advance cursor with wraparound; reshuffle at epoch end.
        self._current_index += 1
        if self._current_index >= len(self._dataset_indices):
            random.shuffle(self._dataset_indices)
            self._current_index = 0
            logger.info("Reshuffled dataset (completed one epoch)")

        task_dir_path = self._resolve_task_dir(task)
        if task_dir_path is None:
            continue

        # Look for test file in tests/ subdirectory first, then at root
        final_test = task_dir_path / "tests" / "test_final_state.py"
        if not final_test.exists():
            final_test = task_dir_path / "test_final_state.py"
        if not final_test.exists():
            logger.warning(f"Missing test file in {task_dir_path} (checked tests/ and root)")
            continue

        # Parse container.def to extract the Docker image.
        # Check environment/ subdirectory first, then root.
        container_def = task_dir_path / "environment" / "container.def"
        if not container_def.exists():
            container_def = task_dir_path / "container.def"
        docker_image = self._parse_docker_image_from_def(container_def)

        description = self._load_task_description(task, task_dir_path)

        return {
            "task_id": f"{task_dir_path.name}",
            "task_name": task_dir_path.name,
            "description": description,
            "task_dir": str(task_dir_path),
            "final_test": str(final_test),
            "docker_image": docker_image,
            "dataset_index": idx,
        }

    raise RuntimeError("No valid task found after scanning the full training set")

def _resolve_task_dir(self, task) -> Optional[Path]:
    """Locate a task's on-disk directory, or None if it cannot be found.

    Reads extra_info.task_dir (falling back to reward_spec.ground_truth),
    and when tasks_base_dir is configured and the recorded path is stale,
    rebuilds the path as tasks_base_dir/<task name>.
    """
    task_dir = task.get("extra_info", {}).get("task_dir")
    if not task_dir:
        task_dir = task.get("reward_spec", {}).get("ground_truth")
    if not task_dir:
        logger.error("No task directory path found in dataset item")
        return None

    task_dir_path = Path(task_dir)
    # If tasks_base_dir is configured and the path doesn't exist, reconstruct it
    if self.config.tasks_base_dir and not task_dir_path.exists():
        task_dir_path = Path(os.path.expanduser(self.config.tasks_base_dir)) / Path(task_dir).name

    if not task_dir_path.exists():
        logger.warning(f"Task dir not found: {task_dir_path}")
        logger.warning("Hint: Set tasks_base_dir to directory containing task_* folders")
        return None
    return task_dir_path

def _load_task_description(self, task, task_dir_path: Path) -> str:
    """Load a task description: dataset field, instruction.md, task.json,
    then a generic fallback, in that order."""
    description = task.get("description", "")

    # First try instruction.md
    instruction_md = task_dir_path / "instruction.md"
    if not description and instruction_md.exists():
        try:
            description = instruction_md.read_text().strip()
        except Exception as e:
            logger.warning(f"Failed to load instruction.md for {task_dir_path.name}: {e}")

    # Fallback to task.json in environment/
    if not description:
        task_json = task_dir_path / "environment" / "task.json"
        if task_json.exists():
            try:
                import json
                task_data = json.loads(task_json.read_text())
                description = task_data.get("description", "") or task_data.get("instruction", "")
            except Exception as e:
                logger.warning(f"Failed to load task.json for {task_dir_path.name}: {e}")

    return description or f"Complete the task in {task_dir_path.name}"
|
||||
|
||||
def format_prompt(self, item: Item) -> str:
|
||||
"""Return the task description for the agent."""
|
||||
return str(item.get("description", ""))
|
||||
|
||||
def _parse_docker_image_from_def(self, container_def_path: Path) -> str:
    """Extract the base Docker image from an Apptainer definition file.

    Apptainer .def files typically begin with::

        Bootstrap: docker
        From: ubuntu:22.04

    Returns the image named on the (case-insensitive) "From:" line, or
    config.default_docker_image when the file is missing or unparsable.
    """
    if container_def_path.exists():
        try:
            text = container_def_path.read_text()
            # First "From: <image>" line wins, matched case-insensitively.
            found = re.search(r'^From:\s*(.+)$', text, re.MULTILINE | re.IGNORECASE)
            if found is not None:
                image = found.group(1).strip()
                logger.info(f"Extracted Docker image from container.def: {image}")
                return image
        except Exception as e:
            logger.warning(f"Failed to parse {container_def_path}: {e}")

        logger.warning(f"Could not extract image from {container_def_path}, using default")
        return self.config.default_docker_image

    logger.warning(f"container.def not found at {container_def_path}, using default image")
    return self.config.default_docker_image
|
||||
|
||||
async def collect_trajectory(
    self, item: Item
) -> Tuple[Optional[ScoredDataItem], List[Item]]:
    """
    Override to register per-task Docker image before running the agent.

    Follows Terminal Bench 2 pattern: register_task_env_overrides() tells
    the hermes-agent terminal backend to use a specific Docker image for
    this task_id.

    This is a copy of HermesAgentBaseEnv.collect_trajectory with Docker
    image registration added after task_id generation.

    Returns:
        (scored_group, backlog) on success; (None, []) when the agent
        produced no output or no trajectories were collected.
    """
    import uuid
    from environments.agent_loop import HermesAgentLoop

    # Fresh task_id per trajectory; keys the sandbox, the env overrides,
    # and the ToolContext used for scoring.
    task_id = str(uuid.uuid4())
    task_name = item.get("task_name", "unknown")
    docker_image = item.get("docker_image", self.config.default_docker_image)

    logger.debug(f"collect_trajectory START for {task_name}")

    # Register Docker image override for this task_id
    logger.debug(f"Registering Docker image: {docker_image}")
    register_task_env_overrides(task_id, {"modal_image": docker_image})
    logger.info(
        f"Task {task_name}: registered Docker image {docker_image} for task_id {task_id[:8]}"
    )
    logger.debug("Docker image registered")

    try:
        # Get group-level tools (resolved once in collect_trajectories)
        logger.debug("Resolving tools...")
        if self._current_group_tools is None:
            tools, valid_names = self._resolve_tools_for_group()
        else:
            tools, valid_names = self._current_group_tools
        logger.debug(f"Tools resolved: {len(tools)} tools")

        # Build initial messages
        logger.debug("Building initial messages...")
        messages: List[Dict[str, Any]] = []
        if self.config.system_prompt:
            messages.append({"role": "system", "content": self.config.system_prompt})
        messages.append({"role": "user", "content": self.format_prompt(item)})
        logger.debug("Messages built, starting agent loop...")

        # Run the agent loop
        result: AgentResult
        managed_state: Optional[Dict[str, Any]] = None

        if self._use_managed_server():
            # Phase 2: ManagedServer with parser
            from environments.tool_call_parsers import get_parser
            try:
                tc_parser = get_parser(self.config.tool_call_parser)
            except KeyError:
                logger.warning(
                    "Tool call parser '%s' not found, falling back to 'hermes'",
                    self.config.tool_call_parser,
                )
                tc_parser = get_parser("hermes")

            try:
                async with self.server.managed_server(
                    tokenizer=self.tokenizer,
                    tool_call_parser=tc_parser,
                ) as managed:
                    agent = HermesAgentLoop(
                        server=managed,
                        tool_schemas=tools,
                        valid_tool_names=valid_names,
                        max_turns=self.config.max_agent_turns,
                        task_id=task_id,
                        temperature=self.config.agent_temperature,
                        max_tokens=self.config.max_token_length,
                        extra_body=self.config.extra_body,
                    )
                    result = await agent.run(messages)

                    # Get state directly from managed server while still in context
                    managed_state = managed.get_state()
            except NotImplementedError:
                # DummyManagedServer not allowed
                logger.warning("ManagedServer not available. Falling back to direct server mode.")
                agent = HermesAgentLoop(
                    server=self.server,
                    tool_schemas=tools,
                    valid_tool_names=valid_names,
                    max_turns=self.config.max_agent_turns,
                    task_id=task_id,
                    temperature=self.config.agent_temperature,
                    max_tokens=self.config.max_token_length,
                    extra_body=self.config.extra_body,
                )
                result = await agent.run(messages)
        else:
            # Phase 1: OpenAI server
            agent = HermesAgentLoop(
                server=self.server,
                tool_schemas=tools,
                valid_tool_names=valid_names,
                max_turns=self.config.max_agent_turns,
                task_id=task_id,
                temperature=self.config.agent_temperature,
                max_tokens=self.config.max_token_length,
                extra_body=self.config.extra_body,
            )
            result = await agent.run(messages)

        # Skip reward computation if agent produced no output
        only_system_and_user = all(
            msg.get("role") in ("system", "user") for msg in result.messages
        )
        if result.turns_used == 0 or only_system_and_user:
            logger.warning(
                "Agent loop produced no output (turns=%d). Skipping trajectory.",
                result.turns_used,
            )
            # Return None to skip this trajectory (likely an API failure)
            return None, []
        else:
            # Compute reward using ToolContext (tests run in the same
            # sandbox the agent used; ctx is always cleaned up).
            ctx = ToolContext(task_id)
            try:
                reward = await self.compute_reward(item, result, ctx)
            except Exception as e:
                logger.error("compute_reward failed: %s", e)
                reward = 0.0
            finally:
                ctx.cleanup()

            # Track metrics for wandb logging
            task_metrics = {
                "test_passed": 1.0 if reward > 0.5 else 0.0,
                "reward": reward,
                "turns_used": result.turns_used,
                "finished_naturally": result.finished_naturally,
                "docker_image": docker_image,
                "num_tool_errors": len(result.tool_errors),
            }

            # Include detailed tool errors if any occurred
            if result.tool_errors:
                task_metrics["tool_errors"] = [
                    {
                        "turn": err.turn,
                        "tool": err.tool_name,
                        "error": err.error[:200],  # truncated for log size
                    }
                    for err in result.tool_errors
                ]

            self._metrics_buffer.append(task_metrics)

            # ============================================================================
            # Build ScoredDataGroup from ManagedServer state
            # ============================================================================
            # Phase 2: Extract pre-computed data from SequenceNodes
            # We may have multiple trajectories in the nodes due to how interesting
            # agents can be, so iterate through all nodes and return multiple sequences.
            #
            # Each SequenceNode contains:
            #   - tokens: Full unmasked token sequence [1, 2, 3, ..., N]
            #   - masked_tokens: Training format [-100, -100, ..., -100, actual, actual, ...]
            #   - logprobs: Training format [1.0, 1.0, ..., 1.0, -0.5, -0.3, ...]
            #   - full_text: Complete text (prompt + all completions)
            #
            # Phase 1: Create placeholder tokens for OpenAI-style servers
            # ============================================================================
            nodes = (managed_state or {}).get("nodes", []) if managed_state else []

            # Create ScoredDataGroup with lists for multiple trajectories
            scored_group = ScoredDataGroup()
            scored_group["tokens"] = []
            scored_group["masks"] = []
            scored_group["scores"] = []
            scored_group["messages"] = []
            scored_group["inference_logprobs"] = []

            if nodes:
                # Phase 2: iterate through all nodes (may have multiple trajectories)
                for i, node in enumerate(nodes):
                    scored_group["tokens"].append(node.tokens)
                    scored_group["masks"].append(node.masked_tokens)
                    scored_group["scores"].append(reward)
                    scored_group["messages"].append(result.messages)

                    if hasattr(node, "logprobs") and node.logprobs:
                        scored_group["inference_logprobs"].append(node.logprobs)
                    else:
                        # Placeholder logprobs if not available
                        scored_group["inference_logprobs"].append([1.0] * len(node.tokens))

                    logger.debug(f"Added trajectory {i+1}/{len(nodes)} with {len(node.tokens)} tokens")

            else:
                # Phase 1: create placeholder tokens for OpenAI-style servers
                full_text = "\n".join(
                    msg.get("content", "") for msg in result.messages if msg.get("content")
                )
                if self.tokenizer:
                    tokens = self.tokenizer.encode(full_text, add_special_tokens=True)
                else:
                    # No tokenizer: synthesize a rough, capped token count
                    tokens = list(range(min(len(full_text) // 4, 128)))

                scored_group["tokens"].append(tokens)
                scored_group["masks"].append([-100] + tokens[1:])
                scored_group["scores"].append(reward)
                scored_group["messages"].append(result.messages)
                scored_group["inference_logprobs"].append([1.0] * len(tokens))

            # Return None if no trajectories collected
            if len(scored_group["tokens"]) == 0:
                return None, []

            logger.debug(f"Returning ScoredDataGroup with {len(scored_group['tokens'])} trajectories")
            return scored_group, []

    finally:
        # Clean up task overrides and sandbox regardless of outcome
        clear_task_env_overrides(task_id)
        try:
            cleanup_vm(task_id)
        except Exception as e:
            logger.debug(f"VM cleanup for {task_id[:8]}: {e}")
|
||||
|
||||
async def compute_reward(
    self,
    item: Item,
    result: AgentResult,
    ctx: ToolContext
) -> float:
    """
    Run final tests in the agent's sandbox and return binary reward.

    Uses ToolContext to execute pytest in the SAME sandbox the agent used,
    following the Terminal Bench 2 verification pattern. No separate
    Apptainer execution needed.

    Args:
        item: Task item (uses task_name and final_test keys).
        result: Agent run result (unused here; part of the interface).
        ctx: ToolContext scoped to the agent's sandbox.

    Returns:
        1.0 if tests pass, 0.0 otherwise (including any execution error).
    """
    task_name = item.get("task_name", "unknown")
    final_test_path = Path(item.get("final_test", ""))

    if not final_test_path.exists():
        logger.error(f"Task {task_name}: test file not found at {final_test_path}")
        return 0.0

    logger.info(f"Task {task_name}: running tests in sandbox...")

    try:
        # Run tests in a thread to avoid blocking the event loop.
        # get_running_loop() (not the deprecated get_event_loop()) --
        # this coroutine always executes inside a running loop.
        loop = asyncio.get_running_loop()
        reward = await loop.run_in_executor(
            None,
            self._run_tests_in_sandbox,
            final_test_path,
            ctx,
            task_name,
        )

        status = "PASS" if reward == 1.0 else "FAIL"
        logger.info(f"Task {task_name}: {status} (reward={reward})")
        return reward

    except Exception as e:
        logger.error(f"Task {task_name}: test execution failed: {e}", exc_info=True)
        return 0.0
|
||||
|
||||
def _run_tests_in_sandbox(
    self,
    test_file_path: Path,
    ctx: ToolContext,
    task_name: str,
) -> float:
    """
    Upload the final-state test to the sandbox and run pytest there.

    Runs in a worker thread (via run_in_executor) because ToolContext
    calls are synchronous and would otherwise block the event loop.

    Args:
        test_file_path: Local path to test_final_state.py
        ctx: ToolContext scoped to the agent's sandbox
        task_name: For logging

    Returns:
        1.0 if tests pass, 0.0 otherwise
    """
    try:
        # Copy the test file into the sandbox workspace.
        ctx.write_file("/workspace/test_final_state.py", test_file_path.read_text())
        logger.debug(f"Task {task_name}: uploaded test file to /workspace/test_final_state.py")

        # Execute pytest inside the sandbox and inspect the result.
        run = ctx.terminal(
            "cd /workspace && python -m pytest -q test_final_state.py",
            timeout=self.config.test_timeout_s,
        )

        code = run.get("exit_code", -1)
        if code == 0:
            logger.debug(f"Task {task_name}: tests passed")
            return 1.0

        # Failure: log the tail of the output for debugging.
        out = run.get("output", "")
        preview = out[-500:] if out else "(no output)"
        logger.info(
            f"Task {task_name}: tests failed (exit_code={code})\n{preview}"
        )
        return 0.0

    except Exception as e:
        logger.error(f"Task {task_name}: error running tests: {e}")
        return 0.0
|
||||
|
||||
async def evaluate(self):
    """
    Periodic evaluation on the held-out eval set.

    Samples up to ``config.num_eval_tasks`` tasks from the eval split
    (never seen during training), runs the agent on each task in a fresh
    sandbox, and scores the result with ``compute_reward``.

    Returns:
        Dict of aggregated ``eval/*`` metrics for wandb logging, or an
        empty dict when the eval set is missing/empty or no task completed.
    """
    if self._eval_dataset is None:
        logger.warning("Cannot evaluate: eval dataset not loaded")
        return {}

    if len(self._eval_dataset) == 0:
        logger.warning("Eval dataset is empty")
        return {}

    # Use min of num_eval_tasks and actual eval set size
    num_tasks = min(self.config.num_eval_tasks, len(self._eval_dataset))
    logger.info(f"Starting evaluation on {num_tasks} held-out tasks...")

    eval_metrics = {
        "rewards": [],
        "passes": [],
        "turns": [],
        "natural_finishes": [],
    }

    # Lazy imports stay function-local (matching the file's style) but are
    # hoisted out of the per-task loop so they execute once per evaluation
    # instead of once per task.
    import random
    import uuid

    from model_tools import (
        cleanup_vm,
        clear_task_env_overrides,
        get_tool_definitions,
        register_task_env_overrides,
    )
    from environments.agent_loop import HermesAgentLoop
    from environments.tool_context import ToolContext

    # Sample task indices from the holdout set without replacement.
    eval_indices = random.sample(range(len(self._eval_dataset)), num_tasks)

    for idx in eval_indices:
        task = self._eval_dataset[idx]

        # Resolve the task directory using the same logic as get_next_item:
        # prefer extra_info.task_dir, fall back to reward_spec.ground_truth.
        task_dir = task.get("extra_info", {}).get("task_dir")
        if not task_dir:
            task_dir = task.get("reward_spec", {}).get("ground_truth")
        if not task_dir:
            continue

        # Remap the recorded path onto tasks_base_dir when it does not exist
        # locally (dataset downloaded to a different location than recorded).
        task_dir_path = Path(task_dir)
        if self.config.tasks_base_dir and not task_dir_path.exists():
            task_name = Path(task_dir).name
            task_dir_path = Path(os.path.expanduser(self.config.tasks_base_dir)) / task_name

        if not task_dir_path.exists():
            continue

        # Find the final-state test file (tests/ subdir first, then task root).
        final_test = task_dir_path / "tests" / "test_final_state.py"
        if not final_test.exists():
            final_test = task_dir_path / "test_final_state.py"
        if not final_test.exists():
            continue

        # Parse the Docker image from the container definition file.
        container_def = task_dir_path / "environment" / "container.def"
        if not container_def.exists():
            container_def = task_dir_path / "container.def"
        docker_image = self._parse_docker_image_from_def(container_def)

        # Load the task description, falling back to instruction.md on disk.
        description = task.get("description", "")
        instruction_md = task_dir_path / "instruction.md"
        if not description and instruction_md.exists():
            try:
                description = instruction_md.read_text().strip()
            except Exception:
                pass  # best-effort: proceed with an empty description

        item = {
            "description": description,
            "final_test": str(final_test),
            "docker_image": docker_image,
        }

        # Generate the sandbox id *before* the try block so the cleanup in
        # `finally` can never raise NameError if setup fails on its first
        # statement (the original assigned task_id inside the try).
        task_id = str(uuid.uuid4())

        try:
            # Register the task environment so the sandbox uses this task's image.
            register_task_env_overrides(task_id, {"modal_image": docker_image})

            # Build the conversation seed for the agent.
            messages = [
                {"role": "system", "content": self.config.system_prompt},
                {"role": "user", "content": description or "Complete the task."},
            ]

            # Resolve the tool schemas the agent is allowed to call.
            tools = get_tool_definitions(self.config.enabled_toolsets)
            valid_names = {t["function"]["name"] for t in tools}

            # Run the agent loop against the sandbox.
            agent = HermesAgentLoop(
                server=self.server,
                tool_schemas=tools,
                valid_tool_names=valid_names,
                max_turns=self.config.max_agent_turns,
                task_id=task_id,
                temperature=self.config.agent_temperature,
                max_tokens=self.config.max_token_length,
                extra_body=self.config.extra_body,
            )
            result = await agent.run(messages)

            # Compute reward in the same sandbox the agent used.
            ctx = ToolContext(task_id)
            try:
                reward = await self.compute_reward(item, result, ctx)
            except Exception as e:
                logger.warning(f"Eval reward computation failed: {e}")
                reward = 0.0
            finally:
                ctx.cleanup()

            # Track metrics for this task.
            eval_metrics["rewards"].append(reward)
            eval_metrics["passes"].append(1.0 if reward > 0.5 else 0.0)
            eval_metrics["turns"].append(result.turns_used)
            eval_metrics["natural_finishes"].append(1.0 if result.finished_naturally else 0.0)

        except Exception as e:
            logger.error(f"Eval task failed: {e}")
            continue
        finally:
            # Always release the per-task env overrides and sandbox.
            clear_task_env_overrides(task_id)
            cleanup_vm(task_id)

    # Aggregate metrics
    if not eval_metrics["rewards"]:
        logger.warning("No eval tasks completed successfully")
        return {}

    aggregated = {
        "eval/pass_rate": sum(eval_metrics["passes"]) / len(eval_metrics["passes"]),
        "eval/avg_reward": sum(eval_metrics["rewards"]) / len(eval_metrics["rewards"]),
        "eval/avg_turns": sum(eval_metrics["turns"]) / len(eval_metrics["turns"]),
        "eval/natural_finish_rate": sum(eval_metrics["natural_finishes"]) / len(eval_metrics["natural_finishes"]),
        "eval/num_tasks": len(eval_metrics["rewards"]),
    }

    logger.info(f"Evaluation complete: pass_rate={aggregated['eval/pass_rate']:.2%}, avg_turns={aggregated['eval/avg_turns']:.1f}")
    return aggregated
|
||||
|
||||
async def wandb_log(self, wandb_metrics: Optional[Dict] = None):
    """Log Endless Terminals specific metrics to wandb.

    Aggregates everything accumulated in ``self._metrics_buffer`` since the
    last call (test pass rate, turn statistics, natural-finish rate, tool
    errors, docker-image variety), merges it into ``wandb_metrics``, clears
    the buffer, and delegates to the parent class for the actual logging.

    Args:
        wandb_metrics: Optional dict of metrics to extend in place; a new
            dict is created when None.
    """
    # Function-local lazy import, consistent with the rest of the file.
    from collections import Counter

    if wandb_metrics is None:
        wandb_metrics = {}

    # Aggregate metrics from buffer
    if self._metrics_buffer:
        num_tasks = len(self._metrics_buffer)

        # Test pass rate
        test_passes = [m["test_passed"] for m in self._metrics_buffer]
        wandb_metrics["endless_terminals/test_pass_rate"] = sum(test_passes) / num_tasks
        wandb_metrics["endless_terminals/num_tests_passed"] = sum(test_passes)
        wandb_metrics["endless_terminals/num_tests_total"] = num_tasks

        # Turns used statistics
        turns = [m["turns_used"] for m in self._metrics_buffer]
        wandb_metrics["endless_terminals/avg_turns_used"] = sum(turns) / len(turns)
        wandb_metrics["endless_terminals/max_turns_used"] = max(turns)
        wandb_metrics["endless_terminals/min_turns_used"] = min(turns)

        # Natural finish rate (did model stop on its own vs hitting max turns)
        natural_finishes = [1.0 if m["finished_naturally"] else 0.0 for m in self._metrics_buffer]
        wandb_metrics["endless_terminals/natural_finish_rate"] = sum(natural_finishes) / len(natural_finishes)

        # Tool error statistics
        total_tool_errors = sum(m["num_tool_errors"] for m in self._metrics_buffer)
        wandb_metrics["endless_terminals/total_tool_errors"] = total_tool_errors
        wandb_metrics["endless_terminals/avg_tool_errors_per_task"] = total_tool_errors / num_tasks

        # Docker image distribution (count unique images used)
        unique_images = {m["docker_image"] for m in self._metrics_buffer}
        wandb_metrics["endless_terminals/num_unique_docker_images"] = len(unique_images)

        # Log the top 3 error-prone tools. Counter replaces the original
        # hand-rolled dict tally, and the unused `enumerate` index is gone.
        error_counts = Counter(
            err["tool"]
            for m in self._metrics_buffer
            for err in m.get("tool_errors", [])
        )
        for tool, count in error_counts.most_common(3):
            wandb_metrics[f"endless_terminals/errors_by_tool/{tool}"] = count

        # Clear buffer after logging so metrics are per-interval.
        self._metrics_buffer = []

    await super().wandb_log(wandb_metrics)
|
||||
|
||||
|
||||
# Script entry point: run the environment's built-in command-line interface.
if __name__ == "__main__":
    EndlessTerminalsEnv.cli()
|
||||
Reference in New Issue
Block a user