diff --git a/atropos/api/tool_executor_server.py b/atropos/api/tool_executor_server.py index 95deed9331..21a98c1b16 100644 --- a/atropos/api/tool_executor_server.py +++ b/atropos/api/tool_executor_server.py @@ -19,11 +19,7 @@ from fastapi import FastAPI, Header, HTTPException, status from pydantic import BaseModel, Field from ..slots import SlotPool, SlotPoolConfig -from ..tools import BashTool, ImageGenerateTool, ReadFileTool, ToolRegistry, WriteFileTool -from ..tools.mixture_of_agents_tool import MixtureOfAgentsTool -from ..tools.terminal_tool import TerminalTool -from ..tools.vision_tools import VisionAnalyzeTool -from ..tools.web_tools import WebCrawlTool, WebExtractTool, WebSearchTool +from ..tools import ToolRegistry, build_tool_registry from ..tools.base import ( ArtifactArchiveRequestPayload, ArtifactArchiveResponsePayload, @@ -120,17 +116,12 @@ def _check_auth(cfg: ToolExecutorServerConfig, authorization: Optional[str]) -> async def _startup() -> None: cfg = ToolExecutorServerConfig.from_env() - tools = ToolRegistry() - tools.register(BashTool()) - tools.register(TerminalTool()) - tools.register(ReadFileTool()) - tools.register(WriteFileTool()) - tools.register(ImageGenerateTool()) - tools.register(WebSearchTool()) - tools.register(WebExtractTool()) - tools.register(WebCrawlTool()) - tools.register(VisionAnalyzeTool()) - tools.register(MixtureOfAgentsTool()) + # Default to Atropos "full" tool surface: sandbox + external (if tool_server_url provided). + tools: ToolRegistry = build_tool_registry( + enabled_toolsets=["full"], + disabled_toolsets=None, + tool_server_url=cfg.tool_server_url, + ) pool = SlotPool( SlotPoolConfig( diff --git a/atropos/api/tool_server.py b/atropos/api/tool_server.py index 9357886b8e..053eee6df5 100644 --- a/atropos/api/tool_server.py +++ b/atropos/api/tool_server.py @@ -19,12 +19,8 @@ from pathlib import Path from fastapi import FastAPI, Header, HTTPException, status from pydantic import BaseModel, Field -from ..tools import ToolRegistry +from ..tools import ToolRegistry, build_tool_registry from ..tools.base import ToolResultPayload, ToolServerExecuteRequest -from ..tools.image_generation_tool import ImageGenerateTool -from ..tools.mixture_of_agents_tool import MixtureOfAgentsTool -from ..tools.vision_tools import VisionAnalyzeTool -from ..tools.web_tools import WebCrawlTool, WebExtractTool, WebSearchTool class ToolServerConfig(BaseModel): @@ -63,21 +59,13 @@ async def root() -> Dict[str, str]: async def _startup() -> None: cfg = ToolServerConfig.from_env() - tools = ToolRegistry() - for tool in [ - ImageGenerateTool(), - WebSearchTool(), - WebExtractTool(), - WebCrawlTool(), - VisionAnalyzeTool(), - MixtureOfAgentsTool(), - ]: - ok, reason = tool.is_available() - if ok: - tools.register(tool) - else: - # Keep startup resilient when optional deps/keys are missing. - print(f"[ToolServer] Skipping tool '{tool.name}': {reason}") + # External-only registry. It will only include tools that are enabled by toolsets and + # whose Hermes requirements/keys are satisfied in this process. + tools: ToolRegistry = build_tool_registry( + enabled_toolsets=["all"], + disabled_toolsets=["terminal", "sandbox", "filesystem", "terminal_stateful", "default"], + tool_server_url="enabled", + ) app.state.cfg = cfg app.state.tools = tools @@ -129,13 +117,16 @@ async def execute_tool( async with sem: try: kwargs = dict(req.tool.arguments) - # Some external tools need access to the trajectory/workspace context (e.g. fetching sandbox artifacts). 
- if req.trajectory_id and "trajectory_id" in inspect.signature(tool.execute).parameters: + sig = inspect.signature(tool.execute).parameters + # Some tools can benefit from extra context. + if req.trajectory_id and "trajectory_id" in sig: kwargs["trajectory_id"] = req.trajectory_id - if req.slot_id and "slot_id" in inspect.signature(tool.execute).parameters: + if req.slot_id and "slot_id" in sig: kwargs["slot_id"] = req.slot_id - if req.container_addr and "container_addr" in inspect.signature(tool.execute).parameters: + if req.container_addr and "container_addr" in sig: kwargs["container_addr"] = req.container_addr + if "task_id" in sig: + kwargs["task_id"] = req.trajectory_id result = await tool.execute(**kwargs) except Exception as e: return ToolResultPayload( diff --git a/atropos/envs/agent_env.py b/atropos/envs/agent_env.py index 1c4bbcfd7f..0e88a22237 100644 --- a/atropos/envs/agent_env.py +++ b/atropos/envs/agent_env.py @@ -20,15 +20,7 @@ from atroposlib.envs.base import APIServerConfig, BaseEnv, BaseEnvConfig, Item, from ..agent import AgentConfig, AtroposAgent from ..slots import SlotPool, SlotPoolConfig -from ..tools import BashTool, ReadFileTool, ToolRegistry, WriteFileTool -from ..tools.image_generation_tool import ImageGenerateTool -from ..tools.mixture_of_agents_tool import MixtureOfAgentsTool -from ..tools.terminal_tool import TerminalTool -from ..tools.terminal_stateful_tool import TerminalStatefulTool -from ..tools.tmux_tool import TmuxTool -from ..tools.toolsets import resolve_multiple_toolsets -from ..tools.vision_tools import VisionAnalyzeTool -from ..tools.web_tools import WebCrawlTool, WebExtractTool, WebSearchTool +from ..tools import ToolRegistry, build_tool_registry from ..tools.tool_executor import ToolExecutor, ToolExecutorConfig @@ -110,42 +102,11 @@ class AgentEnv(BaseEnv, ABC, Generic[AgentEnvConfigT]): self._tool_server_inprocess: bool = False def build_tools(self) -> ToolRegistry: - available_tools = [ - BashTool(), - TerminalTool(), - TerminalStatefulTool(), - TmuxTool(), - ReadFileTool(), - WriteFileTool(), - ImageGenerateTool(), - WebSearchTool(), - WebExtractTool(), - WebCrawlTool(), - VisionAnalyzeTool(), - MixtureOfAgentsTool(), - ] - - tool_by_name = {t.name: t for t in available_tools} - - enabled_toolsets = self.config.enabled_toolsets or ["default"] - selected = set(resolve_multiple_toolsets(enabled_toolsets)) - if self.config.disabled_toolsets: - selected -= set(resolve_multiple_toolsets(self.config.disabled_toolsets)) - - tools = ToolRegistry() - for name in sorted(selected): - tool = tool_by_name.get(name) - if tool is None: - continue - # External tools require a ToolServer URL; avoid advertising broken tools. - if tool.schema.external and not self.config.tool_server_url: - continue - ok, _reason = tool.is_available() - if not ok: - continue - tools.register(tool) - - return tools + return build_tool_registry( + enabled_toolsets=self.config.enabled_toolsets or ["default"], + disabled_toolsets=self.config.disabled_toolsets or None, + tool_server_url=self.config.tool_server_url, + ) @abstractmethod def build_task(self, item: Item) -> str: diff --git a/atropos/tools/__init__.py b/atropos/tools/__init__.py index 1ba666159c..4e6f1101bc 100644 --- a/atropos/tools/__init__.py +++ b/atropos/tools/__init__.py @@ -5,14 +5,10 @@ Provides base Tool class and common tool implementations. 
""" from .base import Tool, ToolCall, ToolRegistry, ToolResult, ToolSchema -from .basic_tools import BashTool, ReadFileTool, WriteFileTool -from .image_generation_tool import ImageGenerateTool -from .mixture_of_agents_tool import MixtureOfAgentsTool -from .terminal_tool import TerminalTool +from .build_registry import build_tool_registry +from .sandbox_stubs import BashTool, ReadFileTool, TerminalTool, WriteFileTool from .terminal_stateful_tool import TerminalStatefulTool from .tmux_tool import TmuxTool -from .vision_tools import VisionAnalyzeTool -from .web_tools import WebCrawlTool, WebExtractTool, WebSearchTool __all__ = [ "Tool", @@ -23,13 +19,8 @@ __all__ = [ "BashTool", "ReadFileTool", "WriteFileTool", - "ImageGenerateTool", "TerminalTool", "TerminalStatefulTool", "TmuxTool", - "WebSearchTool", - "WebExtractTool", - "WebCrawlTool", - "VisionAnalyzeTool", - "MixtureOfAgentsTool", + "build_tool_registry", ] diff --git a/atropos/tools/basic_tools.py b/atropos/tools/basic_tools.py deleted file mode 100644 index 0efd625efa..0000000000 --- a/atropos/tools/basic_tools.py +++ /dev/null @@ -1,283 +0,0 @@ -""" -Basic tool implementations for atropos-agent. - -These tools provide simple sandbox operations: -- BashTool: Execute shell commands -- ReadFileTool: Read file contents -- WriteFileTool: Write content to files - -For PoC, these run via subprocess in the local environment. -Production usage should use proper sandbox isolation. -""" - -import asyncio -import os -from pathlib import Path -from typing import Optional - -from .base import Tool, ToolResult, ToolSchema - - -class BashTool(Tool): - """ - Execute bash commands in a sandboxed environment. - - TODO: Nomad slot execution - """ - - def __init__( - self, - working_dir: Optional[str] = None, - timeout: float = 30.0, - max_output_size: int = 10000, - ): - self.working_dir = working_dir or os.getcwd() - self.timeout = timeout - self.max_output_size = max_output_size - - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="bash", - description="Execute a bash command and return stdout/stderr. Use for running shell commands, scripts, or system operations.", - parameters={ - "command": { - "type": "string", - "description": "The bash command to execute", - }, - }, - required=["command"], - ) - - async def execute(self, command: str) -> ToolResult: - """Execute a bash command.""" - try: - process = await asyncio.create_subprocess_shell( - command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=self.working_dir, - ) - - try: - stdout, stderr = await asyncio.wait_for( - process.communicate(), - timeout=self.timeout, - ) - except asyncio.TimeoutError: - process.kill() - await process.wait() - return ToolResult( - success=False, - error=f"Command timed out after {self.timeout}s", - metadata={"exit_code": -1, "timeout": True}, - ) - - stdout_str = stdout.decode("utf-8", errors="replace") - stderr_str = stderr.decode("utf-8", errors="replace") - - # Truncate if too long - if len(stdout_str) > self.max_output_size: - stdout_str = stdout_str[:self.max_output_size] + "\n... (output truncated)" - if len(stderr_str) > self.max_output_size: - stderr_str = stderr_str[:self.max_output_size] + "\n... 
(output truncated)" - - exit_code = process.returncode - success = exit_code == 0 - - output = stdout_str - if stderr_str: - output = f"{stdout_str}\n[stderr]\n{stderr_str}" if stdout_str else stderr_str - - return ToolResult( - success=success, - output=output.strip(), - error="" if success else f"Exit code: {exit_code}", - metadata={"exit_code": exit_code}, - ) - - except Exception as e: - return ToolResult( - success=False, - error=f"Failed to execute command: {str(e)}", - ) - - -class ReadFileTool(Tool): - """Read the contents of a file.""" - - def __init__( - self, - working_dir: Optional[str] = None, - max_file_size: int = 100000, - ): - self.working_dir = Path(working_dir) if working_dir else Path.cwd() - self.max_file_size = max_file_size - - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="read_file", - description="Read the contents of a file at the given path.", - parameters={ - "path": { - "type": "string", - "description": "Path to the file (relative to working directory)", - }, - }, - required=["path"], - ) - - async def execute(self, path: str) -> ToolResult: - """Read a file's contents.""" - try: - file_path = self.working_dir / path - - # Security: prevent path traversal outside working dir - resolved = file_path.resolve() - if not str(resolved).startswith(str(self.working_dir.resolve())): - return ToolResult( - success=False, - error="Access denied: path outside working directory", - ) - - if not resolved.exists(): - return ToolResult( - success=False, - error=f"File not found: {path}", - ) - - if not resolved.is_file(): - return ToolResult( - success=False, - error=f"Not a file: {path}", - ) - - size = resolved.stat().st_size - if size > self.max_file_size: - return ToolResult( - success=False, - error=f"File too large: {size} bytes (max {self.max_file_size})", - ) - - content = resolved.read_text(encoding="utf-8", errors="replace") - - return ToolResult( - success=True, - output=content, - metadata={"path": str(resolved), "size": size}, - ) - - except Exception as e: - return ToolResult( - success=False, - error=f"Failed to read file: {str(e)}", - ) - - -class WriteFileTool(Tool): - """Write content to a file.""" - - def __init__( - self, - working_dir: Optional[str] = None, - max_file_size: int = 100000, - ): - self.working_dir = Path(working_dir) if working_dir else Path.cwd() - self.max_file_size = max_file_size - - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="write_file", - description="Write content to a file at the given path. 
Creates parent directories if needed.", - parameters={ - "path": { - "type": "string", - "description": "Path to the file (relative to working directory)", - }, - "content": { - "type": "string", - "description": "Content to write to the file", - }, - }, - required=["path", "content"], - ) - - async def execute(self, path: str, content: str) -> ToolResult: - """Write content to a file.""" - try: - if len(content) > self.max_file_size: - return ToolResult( - success=False, - error=f"Content too large: {len(content)} bytes (max {self.max_file_size})", - ) - - file_path = self.working_dir / path - - # Security: prevent path traversal outside working dir - resolved = file_path.resolve() - if not str(resolved).startswith(str(self.working_dir.resolve())): - return ToolResult( - success=False, - error="Access denied: path outside working directory", - ) - - # Create parent directories - resolved.parent.mkdir(parents=True, exist_ok=True) - - resolved.write_text(content, encoding="utf-8") - - return ToolResult( - success=True, - output=f"Successfully wrote {len(content)} bytes to {path}", - metadata={"path": str(resolved), "size": len(content)}, - ) - - except Exception as e: - return ToolResult( - success=False, - error=f"Failed to write file: {str(e)}", - ) - -class FireCrawl(Tool): - """Perform a web crawl using FireCrawl tool.""" - - def __init__( - self, - working_dir: Optional[str] = None, - timeout: float = 60.0, - ): - self.working_dir = working_dir or os.getcwd() - self.timeout = timeout - - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="firecrawl", - description="Perform a web crawl starting from a given URL using FireCrawl.", - parameters={ - "start_url": { - "type": "string", - "description": "The starting URL for the web crawl", - }, - "max_pages": { - "type": "integer", - "description": "Maximum number of pages to crawl", - }, - }, - required=["start_url"], - ) - - async def execute(self, start_url: str, max_pages: int = 100) -> ToolResult: - """Execute a web crawl using FireCrawl.""" - try: - command = f"firecrawl --start-url {start_url} --max-pages {max_pages}" - bash_tool = BashTool(working_dir=self.working_dir, timeout=self.timeout) - result = await bash_tool.execute(command) - return result - except Exception as e: - return ToolResult( - success=False, - error=f"Failed to execute FireCrawl: {str(e)}", - ) \ No newline at end of file diff --git a/atropos/tools/build_registry.py b/atropos/tools/build_registry.py new file mode 100644 index 0000000000..c26553be72 --- /dev/null +++ b/atropos/tools/build_registry.py @@ -0,0 +1,64 @@ +""" +Unified tool registry builder for Hermes-Agent Atropos integration. + +This composes: +- sandbox tool stubs (terminal/bash/read_file/write_file + stateful terminal/tmux) +- Hermes external tools (web/vision/image/moa/skills/browser), executed via ToolServer + +ToolExecutor only needs the schema + `external` routing bit; ToolServer executes +the external tools via Hermes' existing implementations. 
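+
+Minimal usage sketch (illustrative only: the absolute import path and ToolServer URL
+below are assumptions -- in-repo callers use relative imports, and the URL normally
+comes from env/config):
+
+    from atropos.tools import build_tool_registry
+
+    # Sandbox-only registry: without a ToolServer URL, external tools are omitted.
+    local_registry = build_tool_registry(enabled_toolsets=["default"])
+
+    # Sandbox tools plus Hermes external tools, routed through a running ToolServer.
+    full_registry = build_tool_registry(
+        enabled_toolsets=["full"],
+        disabled_toolsets=None,
+        tool_server_url="http://localhost:8000",  # assumed address
+    )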
+""" + +from __future__ import annotations + +from typing import List, Optional + +from .base import ToolRegistry +from .hermes_external_tools import build_external_tools +from .sandbox_stubs import BashTool, ReadFileTool, TerminalTool, WriteFileTool +from .terminal_stateful_tool import TerminalStatefulTool +from .tmux_tool import TmuxTool +from .toolset_resolver import resolve_multiple_toolsets + + +def build_tool_registry( + *, + enabled_toolsets: Optional[List[str]] = None, + disabled_toolsets: Optional[List[str]] = None, + tool_server_url: Optional[str] = None, +) -> ToolRegistry: + """ + Build a ToolRegistry for AgentEnv / ToolExecutor / ToolServer. + + If `tool_server_url` is not provided, external tools will be omitted so we do + not advertise tools that cannot execute. + """ + enabled_toolsets = enabled_toolsets or ["default"] + + # Resolve tool names using Hermes toolsets plus Atropos additions. + selected = set(resolve_multiple_toolsets(enabled_toolsets)) + if disabled_toolsets: + selected -= set(resolve_multiple_toolsets(disabled_toolsets)) + + reg = ToolRegistry() + + # Always register sandbox tools if selected. + sandbox_by_name = { + "terminal": TerminalTool(), + "bash": BashTool(), + "read_file": ReadFileTool(), + "write_file": WriteFileTool(), + "terminal_stateful": TerminalStatefulTool(), + "tmux": TmuxTool(), + } + for name, tool in sandbox_by_name.items(): + if name in selected: + reg.register(tool) + + # External tools: only include when ToolServer is configured. + if tool_server_url: + for tool in build_external_tools(selected_tool_names=selected): + if tool.name in selected: + reg.register(tool) + + return reg diff --git a/atropos/tools/hermes_external_tools.py b/atropos/tools/hermes_external_tools.py new file mode 100644 index 0000000000..3172913863 --- /dev/null +++ b/atropos/tools/hermes_external_tools.py @@ -0,0 +1,90 @@ +""" +Hermes external tool adapter for Atropos ToolServer. + +These tools reuse Hermes-Agent's existing tool runner (`model_tools.handle_function_call`) +so we don't duplicate external tool implementations. + +Important: +- These are marked `external=True` and should be executed ONLY by ToolServer. +- We run `handle_function_call` in a worker thread because the Hermes implementation + uses `asyncio.run()` internally for some async tools (web_extract, vision, MoA, etc). +""" + +from __future__ import annotations + +import asyncio +import json +from typing import Any, Dict, List, Optional + +import model_tools + +from .base import Tool, ToolResult, ToolSchema + + +def _schema_from_openai_tool_dict(tool: Dict[str, Any], *, external: bool) -> ToolSchema: + fn = tool.get("function") or {} + name = str(fn.get("name") or "") + description = str(fn.get("description") or "") + params = fn.get("parameters") or {} + properties = params.get("properties") or {} + required = params.get("required") or [] + if not isinstance(required, list): + required = [] + return ToolSchema( + name=name, + description=description, + parameters=dict(properties), + required=[str(x) for x in required if isinstance(x, (str, int))], + external=external, + ) + + +class HermesExternalTool(Tool): + def __init__(self, schema: ToolSchema): + self._schema = schema + + @property + def schema(self) -> ToolSchema: + return self._schema + + async def execute(self, task_id: Optional[str] = None, **kwargs: Any) -> ToolResult: + # `model_tools.handle_function_call` returns a JSON string (success or error). + # Run in a thread because some Hermes tool handlers call `asyncio.run()`. 
+ raw = await asyncio.to_thread(model_tools.handle_function_call, self.name, kwargs, task_id) + + try: + parsed = json.loads(raw) + except Exception: + # Keep as plain string. + return ToolResult(success=True, output=str(raw)) + + if isinstance(parsed, dict) and parsed.get("error"): + return ToolResult(success=False, error=str(parsed.get("error")), output="") + + return ToolResult(success=True, output=json.dumps(parsed, ensure_ascii=False)) + + +def build_external_tools( + *, + selected_tool_names: Optional[set[str]] = None, +) -> List[HermesExternalTool]: + """ + Build external tool wrappers from Hermes tool declarations. + + Filters out sandbox-oriented tools (e.g. `terminal`) since those should run + inside the sandbox via ToolExecutor. + """ + # IMPORTANT: Hermes' `model_tools.get_tool_definitions()` only understands Hermes toolsets. + # Atropos envs add extra toolsets (filesystem/sandbox/stateful). To avoid noisy "Unknown toolset" + # prints and accidental filtering, we fetch ALL Hermes tool definitions here and filter by name. + tools = model_tools.get_tool_definitions(enabled_toolsets=None, disabled_toolsets=None, quiet_mode=True) + + wrappers: List[HermesExternalTool] = [] + for t in tools: + schema = _schema_from_openai_tool_dict(t, external=True) + if schema.name in {"terminal"}: + continue + if selected_tool_names is not None and schema.name not in selected_tool_names: + continue + wrappers.append(HermesExternalTool(schema)) + return wrappers diff --git a/atropos/tools/image_generation_tool.py b/atropos/tools/image_generation_tool.py deleted file mode 100644 index d24a264dc2..0000000000 --- a/atropos/tools/image_generation_tool.py +++ /dev/null @@ -1,129 +0,0 @@ -""" -Image generation tool (external). - -This is intentionally minimal for the Phase 4.6 "external tool demo": -- executed via ToolServer (no secrets in sandboxes) -- by default returns a tiny inline PNG data URL (no network required) -- can optionally proxy to an OpenAI-compatible images endpoint (e.g. 
a local service) -""" - -from __future__ import annotations - -import base64 -import json -import os -from dataclasses import dataclass -from typing import Literal, Optional - -import httpx - -from .base import Tool, ToolResult, ToolSchema - - -def _tiny_png_data_url() -> str: - # 1x1 transparent PNG - png_bytes = base64.b64decode( - "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMB/6X0kQAAAABJRU5ErkJggg==" - ) - return f"data:image/png;base64,{base64.b64encode(png_bytes).decode('ascii')}" - - -@dataclass -class ImageGenerateConfig: - backend: Literal["inline", "openai"] = "inline" - base_url: Optional[str] = None - model: Optional[str] = None - timeout_s: float = 120.0 - - @classmethod - def from_env(cls) -> "ImageGenerateConfig": - backend = (os.getenv("IMAGE_GENERATE_BACKEND") or "inline").strip().lower() - if backend not in {"inline", "openai"}: - backend = "inline" - - base_url = os.getenv("IMAGE_GENERATE_BASE_URL") or os.getenv("OLLAMA_BASE_URL") - model = os.getenv("IMAGE_GENERATE_MODEL") or os.getenv("OLLAMA_IMAGE_MODEL") - - timeout_s = float(os.getenv("IMAGE_GENERATE_TIMEOUT_S", "120.0")) - return cls( - backend=backend, # type: ignore[arg-type] - base_url=base_url, - model=model, - timeout_s=timeout_s, - ) - - -class ImageGenerateTool(Tool): - def __init__(self, config: Optional[ImageGenerateConfig] = None) -> None: - self._config = config or ImageGenerateConfig.from_env() - - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="image_generate", - description=( - "Generate an image from a text prompt. Returns a JSON object containing an image URL " - "(often a data URL) plus metadata." - ), - parameters={ - "prompt": {"type": "string", "description": "The image prompt. Be detailed."}, - "aspect_ratio": { - "type": "string", - "enum": ["landscape", "square", "portrait"], - "description": "Desired aspect ratio.", - "default": "landscape", - }, - }, - required=["prompt"], - external=True, - ) - - def is_available(self) -> tuple[bool, str | None]: - # Availability of this external tool is primarily governed by whether a ToolServer - # is configured/enabled. The tool itself will return a clear error if an upstream - # image endpoint is required but not configured. - return True, None - - async def execute(self, prompt: str, aspect_ratio: str = "landscape") -> ToolResult: # noqa: ARG002 - cfg = self._config - - if cfg.backend == "inline": - payload = { - "url": _tiny_png_data_url(), - "backend": "inline", - "width": 1, - "height": 1, - } - return ToolResult(success=True, output=json.dumps(payload)) - - # OpenAI-compatible images generation endpoint. - # Expected: POST {base_url}/v1/images/generations with OpenAI-style body. 
- base_url = cfg.base_url or "" - base_url = base_url.rstrip("/") - if not base_url.endswith("/v1"): - base_url = f"{base_url}/v1" - url = f"{base_url}/images/generations" - - body = { - "prompt": prompt, - "n": 1, - "response_format": "url", - } - if cfg.model: - body["model"] = cfg.model - - try: - async with httpx.AsyncClient(timeout=cfg.timeout_s) as client: - resp = await client.post(url, json=body) - resp.raise_for_status() - data = resp.json() - except Exception as e: - return ToolResult(success=False, error=f"image_generate upstream call failed: {e}") - - try: - image_url = data["data"][0]["url"] - except Exception: - return ToolResult(success=False, error=f"Unexpected image response format: {data!r}") - - payload = {"url": image_url, "backend": "openai"} - return ToolResult(success=True, output=json.dumps(payload)) diff --git a/atropos/tools/mixture_of_agents_tool.py b/atropos/tools/mixture_of_agents_tool.py deleted file mode 100644 index 6ff25326eb..0000000000 --- a/atropos/tools/mixture_of_agents_tool.py +++ /dev/null @@ -1,649 +0,0 @@ -#!/usr/bin/env python3 -""" -Mixture-of-Agents Tool Module - -This module implements the Mixture-of-Agents (MoA) methodology that leverages -the collective strengths of multiple LLMs through a layered architecture to -achieve state-of-the-art performance on complex reasoning tasks. - -Based on the research paper: "Mixture-of-Agents Enhances Large Language Model Capabilities" -by Junlin Wang et al. (arXiv:2406.04692v1) - -Key Features: -- Multi-layer LLM collaboration for enhanced reasoning -- Parallel processing of reference models for efficiency -- Intelligent aggregation and synthesis of diverse responses -- Specialized for extremely difficult problems requiring intense reasoning -- Optimized for coding, mathematics, and complex analytical tasks - -Available Tool: -- mixture_of_agents_tool: Process complex queries using multiple frontier models - -Architecture: -1. Reference models generate diverse initial responses in parallel -2. Aggregator model synthesizes responses into a high-quality output -3. Multiple layers can be used for iterative refinement (future enhancement) - -Models Used (via OpenRouter): -- Reference Models: claude-opus-4, gemini-2.5-pro, gpt-4.1, deepseek-r1 -- Aggregator Model: claude-opus-4 (highest capability for synthesis) - -Configuration: - To customize the MoA setup, modify the configuration constants at the top of this file: - - REFERENCE_MODELS: List of models for generating diverse initial responses - - AGGREGATOR_MODEL: Model used to synthesize the final response - - REFERENCE_TEMPERATURE/AGGREGATOR_TEMPERATURE: Sampling temperatures - - MIN_SUCCESSFUL_REFERENCES: Minimum successful models needed to proceed - -Usage: - from mixture_of_agents_tool import mixture_of_agents_tool - import asyncio - - # Process a complex query - result = await mixture_of_agents_tool( - user_prompt="Solve this complex mathematical proof..." 
- ) -""" - -import json -import os -import asyncio -import uuid -import datetime -from pathlib import Path -from typing import Dict, Any, List, Optional -from openai import AsyncOpenAI - -# Initialize OpenRouter API client for MoA processing -openrouter_client = AsyncOpenAI( - api_key=os.getenv("OPENROUTER_API_KEY"), - base_url="https://openrouter.ai/api/v1" -) - -# Configuration for MoA processing -# Reference models - these generate diverse initial responses in parallel (OpenRouter slugs) -REFERENCE_MODELS = [ - "anthropic/claude-opus-4.5", - "google/gemini-3-pro-preview", - "openai/gpt-5.2-pro", - "deepseek/deepseek-v3.2" -] - -# Aggregator model - synthesizes reference responses into final output -AGGREGATOR_MODEL = "anthropic/claude-opus-4.5" # Use highest capability model for aggregation - -# Temperature settings optimized for MoA performance -REFERENCE_TEMPERATURE = 0.6 # Balanced creativity for diverse perspectives -AGGREGATOR_TEMPERATURE = 0.4 # Focused synthesis for consistency - -# Failure handling configuration -MIN_SUCCESSFUL_REFERENCES = 1 # Minimum successful reference models needed to proceed - -# System prompt for the aggregator model (from the research paper) -AGGREGATOR_SYSTEM_PROMPT = """You have been provided with a set of responses from various open-source models to the latest user query. Your task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. Your response should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability. - -Responses from models:""" - -# Debug mode configuration -DEBUG_MODE = os.getenv("MOA_TOOLS_DEBUG", "false").lower() == "true" -DEBUG_SESSION_ID = str(uuid.uuid4()) -DEBUG_LOG_PATH = Path("./logs") -DEBUG_DATA = { - "session_id": DEBUG_SESSION_ID, - "start_time": datetime.datetime.now().isoformat(), - "debug_enabled": DEBUG_MODE, - "tool_calls": [] -} if DEBUG_MODE else None - -# Create logs directory if debug mode is enabled -if DEBUG_MODE: - DEBUG_LOG_PATH.mkdir(exist_ok=True) - print(f"๐Ÿ› MoA debug mode enabled - Session ID: {DEBUG_SESSION_ID}") - - -def _log_debug_call(tool_name: str, call_data: Dict[str, Any]) -> None: - """ - Log a debug call entry to the global debug data structure. - - Args: - tool_name (str): Name of the tool being called - call_data (Dict[str, Any]): Data about the call including parameters and results - """ - if not DEBUG_MODE or not DEBUG_DATA: - return - - call_entry = { - "timestamp": datetime.datetime.now().isoformat(), - "tool_name": tool_name, - **call_data - } - - DEBUG_DATA["tool_calls"].append(call_entry) - - -def _save_debug_log() -> None: - """ - Save the current debug data to a JSON file in the logs directory. 
- """ - if not DEBUG_MODE or not DEBUG_DATA: - return - - try: - debug_filename = f"moa_tools_debug_{DEBUG_SESSION_ID}.json" - debug_filepath = DEBUG_LOG_PATH / debug_filename - - # Update end time - DEBUG_DATA["end_time"] = datetime.datetime.now().isoformat() - DEBUG_DATA["total_calls"] = len(DEBUG_DATA["tool_calls"]) - - with open(debug_filepath, 'w', encoding='utf-8') as f: - json.dump(DEBUG_DATA, f, indent=2, ensure_ascii=False) - - print(f"๐Ÿ› MoA debug log saved: {debug_filepath}") - - except Exception as e: - print(f"โŒ Error saving MoA debug log: {str(e)}") - - -def _construct_aggregator_prompt(system_prompt: str, responses: List[str]) -> str: - """ - Construct the final system prompt for the aggregator including all model responses. - - Args: - system_prompt (str): Base system prompt for aggregation - responses (List[str]): List of responses from reference models - - Returns: - str: Complete system prompt with enumerated responses - """ - response_text = "\n".join([f"{i+1}. {response}" for i, response in enumerate(responses)]) - return f"{system_prompt}\n\n{response_text}" - - -async def _run_reference_model_safe( - model: str, - user_prompt: str, - temperature: float = REFERENCE_TEMPERATURE, - max_tokens: int = 32000, - max_retries: int = 6 -) -> tuple[str, str, bool]: - """ - Run a single reference model with retry logic and graceful failure handling. - - Args: - model (str): Model identifier to use - user_prompt (str): The user's query - temperature (float): Sampling temperature for response generation - max_tokens (int): Maximum tokens in response - max_retries (int): Maximum number of retry attempts - - Returns: - tuple[str, str, bool]: (model_name, response_content_or_error, success_flag) - """ - for attempt in range(max_retries): - try: - print(f"๐Ÿค– Querying {model} (attempt {attempt + 1}/{max_retries})") - - # Build parameters for the API call - api_params = { - "model": model, - "messages": [{"role": "user", "content": user_prompt}], - "extra_body": { - "reasoning": { - "enabled": True, - "effort": "xhigh" - } - } - } - - # GPT models (especially gpt-4o-mini) don't support custom temperature values - # Only include temperature for non-GPT models - if not model.lower().startswith('gpt-'): - api_params["temperature"] = temperature - - response = await openrouter_client.chat.completions.create(**api_params) - - content = response.choices[0].message.content.strip() - print(f"โœ… {model} responded ({len(content)} characters)") - return model, content, True - - except Exception as e: - error_str = str(e) - # Log more detailed error information for debugging - if "invalid" in error_str.lower(): - print(f"โš ๏ธ {model} invalid request error (attempt {attempt + 1}): {error_str}") - elif "rate" in error_str.lower() or "limit" in error_str.lower(): - print(f"โš ๏ธ {model} rate limit error (attempt {attempt + 1}): {error_str}") - else: - print(f"โš ๏ธ {model} unknown error (attempt {attempt + 1}): {error_str}") - - if attempt < max_retries - 1: - # Exponential backoff for rate limiting: 2s, 4s, 8s, 16s, 32s, 60s - sleep_time = min(2 ** (attempt + 1), 60) - print(f" Retrying in {sleep_time}s...") - await asyncio.sleep(sleep_time) - else: - error_msg = f"{model} failed after {max_retries} attempts: {error_str}" - print(f"โŒ {error_msg}") - return model, error_msg, False - - -async def _run_aggregator_model( - system_prompt: str, - user_prompt: str, - temperature: float = AGGREGATOR_TEMPERATURE, - max_tokens: int = None -) -> str: - """ - Run the aggregator model to synthesize 
the final response. - - Args: - system_prompt (str): System prompt with all reference responses - user_prompt (str): Original user query - temperature (float): Focused temperature for consistent aggregation - max_tokens (int): Maximum tokens in final response - - Returns: - str: Synthesized final response - """ - print(f"๐Ÿง  Running aggregator model: {AGGREGATOR_MODEL}") - - # Build parameters for the API call - api_params = { - "model": AGGREGATOR_MODEL, - "messages": [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt} - ], - "extra_body": { - "reasoning": { - "enabled": True, - "effort": "xhigh" - } - } - } - - # GPT models (especially gpt-4o-mini) don't support custom temperature values - # Only include temperature for non-GPT models - if not AGGREGATOR_MODEL.lower().startswith('gpt-'): - api_params["temperature"] = temperature - - response = await openrouter_client.chat.completions.create(**api_params) - - content = response.choices[0].message.content.strip() - print(f"โœ… Aggregation complete ({len(content)} characters)") - return content - - -async def mixture_of_agents_tool( - user_prompt: str, - reference_models: Optional[List[str]] = None, - aggregator_model: Optional[str] = None -) -> str: - """ - Process a complex query using the Mixture-of-Agents methodology. - - This tool leverages multiple frontier language models to collaboratively solve - extremely difficult problems requiring intense reasoning. It's particularly - effective for: - - Complex mathematical proofs and calculations - - Advanced coding problems and algorithm design - - Multi-step analytical reasoning tasks - - Problems requiring diverse domain expertise - - Tasks where single models show limitations - - The MoA approach uses a fixed 2-layer architecture: - 1. Layer 1: Multiple reference models generate diverse responses in parallel (temp=0.6) - 2. Layer 2: Aggregator model synthesizes the best elements into final response (temp=0.4) - - Args: - user_prompt (str): The complex query or problem to solve - reference_models (Optional[List[str]]): Custom reference models to use - aggregator_model (Optional[str]): Custom aggregator model to use - - Returns: - str: JSON string containing the MoA results with the following structure: - { - "success": bool, - "response": str, - "models_used": { - "reference_models": List[str], - "aggregator_model": str - }, - "processing_time": float - } - - Raises: - Exception: If MoA processing fails or API key is not set - """ - start_time = datetime.datetime.now() - - debug_call_data = { - "parameters": { - "user_prompt": user_prompt[:200] + "..." if len(user_prompt) > 200 else user_prompt, - "reference_models": reference_models or REFERENCE_MODELS, - "aggregator_model": aggregator_model or AGGREGATOR_MODEL, - "reference_temperature": REFERENCE_TEMPERATURE, - "aggregator_temperature": AGGREGATOR_TEMPERATURE, - "min_successful_references": MIN_SUCCESSFUL_REFERENCES - }, - "error": None, - "success": False, - "reference_responses_count": 0, - "failed_models_count": 0, - "failed_models": [], - "final_response_length": 0, - "processing_time_seconds": 0, - "models_used": {} - } - - try: - print(f"๐Ÿš€ Starting Mixture-of-Agents processing...") - print(f"๐Ÿ“ Query: {user_prompt[:100]}{'...' 
if len(user_prompt) > 100 else ''}") - - # Validate API key availability - if not os.getenv("OPENROUTER_API_KEY"): - raise ValueError("OPENROUTER_API_KEY environment variable not set") - - # Use provided models or defaults - ref_models = reference_models or REFERENCE_MODELS - agg_model = aggregator_model or AGGREGATOR_MODEL - - print(f"๐Ÿ”„ Using {len(ref_models)} reference models in 2-layer MoA architecture") - - # Layer 1: Generate diverse responses from reference models (with failure handling) - print("๐Ÿ“ก Layer 1: Generating reference responses...") - model_results = await asyncio.gather(*[ - _run_reference_model_safe(model, user_prompt, REFERENCE_TEMPERATURE) - for model in ref_models - ]) - - # Separate successful and failed responses - successful_responses = [] - failed_models = [] - - for model_name, content, success in model_results: - if success: - successful_responses.append(content) - else: - failed_models.append(model_name) - - successful_count = len(successful_responses) - failed_count = len(failed_models) - - print(f"๐Ÿ“Š Reference model results: {successful_count} successful, {failed_count} failed") - - if failed_models: - print(f"โš ๏ธ Failed models: {', '.join(failed_models)}") - - # Check if we have enough successful responses to proceed - if successful_count < MIN_SUCCESSFUL_REFERENCES: - raise ValueError(f"Insufficient successful reference models ({successful_count}/{len(ref_models)}). Need at least {MIN_SUCCESSFUL_REFERENCES} successful responses.") - - debug_call_data["reference_responses_count"] = successful_count - debug_call_data["failed_models_count"] = failed_count - debug_call_data["failed_models"] = failed_models - - # Layer 2: Aggregate responses using the aggregator model - print("๐Ÿง  Layer 2: Synthesizing final response...") - aggregator_system_prompt = _construct_aggregator_prompt( - AGGREGATOR_SYSTEM_PROMPT, - successful_responses - ) - - final_response = await _run_aggregator_model( - aggregator_system_prompt, - user_prompt, - AGGREGATOR_TEMPERATURE - ) - - # Calculate processing time - end_time = datetime.datetime.now() - processing_time = (end_time - start_time).total_seconds() - - print(f"โœ… MoA processing completed in {processing_time:.2f} seconds") - - # Prepare successful response (only final aggregated result, minimal fields) - result = { - "success": True, - "response": final_response, - "models_used": { - "reference_models": ref_models, - "aggregator_model": agg_model - } - } - - debug_call_data["success"] = True - debug_call_data["final_response_length"] = len(final_response) - debug_call_data["processing_time_seconds"] = processing_time - debug_call_data["models_used"] = result["models_used"] - - # Log debug information - _log_debug_call("mixture_of_agents_tool", debug_call_data) - _save_debug_log() - - return json.dumps(result, indent=2, ensure_ascii=False) - - except Exception as e: - error_msg = f"Error in MoA processing: {str(e)}" - print(f"โŒ {error_msg}") - - # Calculate processing time even for errors - end_time = datetime.datetime.now() - processing_time = (end_time - start_time).total_seconds() - - # Prepare error response (minimal fields) - result = { - "success": False, - "response": "MoA processing failed. 
Please try again or use a single model for this query.", - "models_used": { - "reference_models": reference_models or REFERENCE_MODELS, - "aggregator_model": aggregator_model or AGGREGATOR_MODEL - }, - "error": error_msg - } - - debug_call_data["error"] = error_msg - debug_call_data["processing_time_seconds"] = processing_time - _log_debug_call("mixture_of_agents_tool", debug_call_data) - _save_debug_log() - - return json.dumps(result, indent=2, ensure_ascii=False) - - -def check_openrouter_api_key() -> bool: - """ - Check if the OpenRouter API key is available in environment variables. - - Returns: - bool: True if API key is set, False otherwise - """ - return bool(os.getenv("OPENROUTER_API_KEY")) - - -def check_moa_requirements() -> bool: - """ - Check if all requirements for MoA tools are met. - - Returns: - bool: True if requirements are met, False otherwise - """ - return check_openrouter_api_key() - - -def get_debug_session_info() -> Dict[str, Any]: - """ - Get information about the current debug session. - - Returns: - Dict[str, Any]: Dictionary containing debug session information - """ - if not DEBUG_MODE or not DEBUG_DATA: - return { - "enabled": False, - "session_id": None, - "log_path": None, - "total_calls": 0 - } - - return { - "enabled": True, - "session_id": DEBUG_SESSION_ID, - "log_path": str(DEBUG_LOG_PATH / f"moa_tools_debug_{DEBUG_SESSION_ID}.json"), - "total_calls": len(DEBUG_DATA["tool_calls"]) - } - - -def get_available_models() -> Dict[str, List[str]]: - """ - Get information about available models for MoA processing. - - Returns: - Dict[str, List[str]]: Dictionary with reference and aggregator models - """ - return { - "reference_models": REFERENCE_MODELS, - "aggregator_models": [AGGREGATOR_MODEL], - "supported_models": REFERENCE_MODELS + [AGGREGATOR_MODEL] - } - - -def get_moa_configuration() -> Dict[str, Any]: - """ - Get the current MoA configuration settings. - - Returns: - Dict[str, Any]: Dictionary containing all configuration parameters - """ - return { - "reference_models": REFERENCE_MODELS, - "aggregator_model": AGGREGATOR_MODEL, - "reference_temperature": REFERENCE_TEMPERATURE, - "aggregator_temperature": AGGREGATOR_TEMPERATURE, - "min_successful_references": MIN_SUCCESSFUL_REFERENCES, - "total_reference_models": len(REFERENCE_MODELS), - "failure_tolerance": f"{len(REFERENCE_MODELS) - MIN_SUCCESSFUL_REFERENCES}/{len(REFERENCE_MODELS)} models can fail" - } - - -# ============================================================================= -# Atropos-Agent Tool wrapper (Hermes compatibility) -# ============================================================================= - -from .base import Tool, ToolResult, ToolSchema # noqa: E402 - - -def _tool_result_from_json(output: str) -> ToolResult: - try: - data = json.loads(output) - except Exception: - return ToolResult(success=True, output=output) - - if isinstance(data, dict) and "success" in data: - ok = bool(data.get("success")) - if not ok: - err = data.get("error") or "Mixture-of-Agents failed" - return ToolResult(success=False, output=output, error=str(err)) - return ToolResult(success=True, output=output) - - return ToolResult(success=True, output=output) - - -class MixtureOfAgentsTool(Tool): - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="mixture_of_agents", - description=( - "Process extremely difficult problems requiring intense reasoning using a Mixture-of-Agents." 
- ), - parameters={ - "user_prompt": { - "type": "string", - "description": "The complex query or problem to solve using multiple models.", - } - }, - required=["user_prompt"], - external=True, - ) - - def is_available(self) -> tuple[bool, str | None]: - if not os.getenv("OPENROUTER_API_KEY"): - return False, "OPENROUTER_API_KEY not set" - return True, None - - async def execute(self, user_prompt: str) -> ToolResult: - output = await mixture_of_agents_tool(user_prompt=user_prompt) - return _tool_result_from_json(output) - - -if __name__ == "__main__": - """ - Simple test/demo when run directly - """ - print("๐Ÿค– Mixture-of-Agents Tool Module") - print("=" * 50) - - # Check if API key is available - api_available = check_openrouter_api_key() - - if not api_available: - print("โŒ OPENROUTER_API_KEY environment variable not set") - print("Please set your API key: export OPENROUTER_API_KEY='your-key-here'") - print("Get API key at: https://openrouter.ai/") - exit(1) - else: - print("โœ… OpenRouter API key found") - - print("๐Ÿ› ๏ธ MoA tools ready for use!") - - # Show current configuration - config = get_moa_configuration() - print(f"\nโš™๏ธ Current Configuration:") - print(f" ๐Ÿค– Reference models ({len(config['reference_models'])}): {', '.join(config['reference_models'])}") - print(f" ๐Ÿง  Aggregator model: {config['aggregator_model']}") - print(f" ๐ŸŒก๏ธ Reference temperature: {config['reference_temperature']}") - print(f" ๐ŸŒก๏ธ Aggregator temperature: {config['aggregator_temperature']}") - print(f" ๐Ÿ›ก๏ธ Failure tolerance: {config['failure_tolerance']}") - print(f" ๐Ÿ“Š Minimum successful models: {config['min_successful_references']}") - - # Show debug mode status - if DEBUG_MODE: - print(f"\n๐Ÿ› Debug mode ENABLED - Session ID: {DEBUG_SESSION_ID}") - print(f" Debug logs will be saved to: ./logs/moa_tools_debug_{DEBUG_SESSION_ID}.json") - else: - print("\n๐Ÿ› Debug mode disabled (set MOA_TOOLS_DEBUG=true to enable)") - - print("\nBasic usage:") - print(" from mixture_of_agents_tool import mixture_of_agents_tool") - print(" import asyncio") - print("") - print(" async def main():") - print(" result = await mixture_of_agents_tool(") - print(" user_prompt='Solve this complex mathematical proof...'") - print(" )") - print(" print(result)") - print(" asyncio.run(main())") - - print("\nBest use cases:") - print(" - Complex mathematical proofs and calculations") - print(" - Advanced coding problems and algorithm design") - print(" - Multi-step analytical reasoning tasks") - print(" - Problems requiring diverse domain expertise") - print(" - Tasks where single models show limitations") - - print("\nPerformance characteristics:") - print(" - Higher latency due to multiple model calls") - print(" - Significantly improved quality for complex tasks") - print(" - Parallel processing for efficiency") - print(f" - Optimized temperatures: {REFERENCE_TEMPERATURE} for reference models, {AGGREGATOR_TEMPERATURE} for aggregation") - print(" - Token-efficient: only returns final aggregated response") - print(" - Resilient: continues with partial model failures") - print(f" - Configurable: easy to modify models and settings at top of file") - print(" - State-of-the-art results on challenging benchmarks") - - print("\nDebug mode:") - print(" # Enable debug logging") - print(" export MOA_TOOLS_DEBUG=true") - print(" # Debug logs capture all MoA processing steps and metrics") - print(" # Logs saved to: ./logs/moa_tools_debug_UUID.json") diff --git a/atropos/tools/sandbox_stubs.py 
b/atropos/tools/sandbox_stubs.py new file mode 100644 index 0000000000..ad4f0ceeee --- /dev/null +++ b/atropos/tools/sandbox_stubs.py @@ -0,0 +1,98 @@ +""" +Sandbox tool stubs for Atropos ToolExecutor. + +These tools are executed inside the sandbox containers via: +ToolExecutor -> SlotPool -> sandbox_server.py + +They intentionally do NOT execute anything on the host process. If they are +called directly (outside ToolExecutor), they return a clear error. +""" + +from __future__ import annotations + +from typing import Optional + +from .base import Tool, ToolResult, ToolSchema + + +class TerminalTool(Tool): + @property + def schema(self) -> ToolSchema: + return ToolSchema( + name="terminal", + description=( + "Execute a command inside the sandbox slot workspace and return stdout/stderr. " + "Filesystem persists within a trajectory slot. Background processes are not supported " + "in stateless mode." + ), + parameters={ + "command": {"type": "string", "description": "The command to execute"}, + "timeout": { + "type": "integer", + "description": "Command timeout in seconds (optional).", + "minimum": 1, + }, + "background": { + "type": "boolean", + "description": "Not supported in sandbox terminal (always false).", + "default": False, + }, + }, + required=["command"], + external=False, + ) + + async def execute(self, **_kwargs) -> ToolResult: + return ToolResult( + success=False, + error="terminal must be executed via ToolExecutor inside the sandbox", + ) + + +class BashTool(Tool): + @property + def schema(self) -> ToolSchema: + return ToolSchema( + name="bash", + description="Execute a bash command inside the sandbox slot workspace.", + parameters={"command": {"type": "string", "description": "The bash command to execute"}}, + required=["command"], + external=False, + ) + + async def execute(self, **_kwargs) -> ToolResult: + return ToolResult(success=False, error="bash must be executed via ToolExecutor inside the sandbox") + + +class ReadFileTool(Tool): + @property + def schema(self) -> ToolSchema: + return ToolSchema( + name="read_file", + description="Read a file from the sandbox slot workspace.", + parameters={"path": {"type": "string", "description": "Path to the file"}}, + required=["path"], + external=False, + ) + + async def execute(self, **_kwargs) -> ToolResult: + return ToolResult(success=False, error="read_file must be executed via ToolExecutor inside the sandbox") + + +class WriteFileTool(Tool): + @property + def schema(self) -> ToolSchema: + return ToolSchema( + name="write_file", + description="Write a file into the sandbox slot workspace.", + parameters={ + "path": {"type": "string", "description": "Path to the file"}, + "content": {"type": "string", "description": "File content"}, + }, + required=["path", "content"], + external=False, + ) + + async def execute(self, **_kwargs) -> ToolResult: + return ToolResult(success=False, error="write_file must be executed via ToolExecutor inside the sandbox") + diff --git a/atropos/tools/terminal_hecate.py b/atropos/tools/terminal_hecate.py deleted file mode 100644 index fc25eec6d1..0000000000 --- a/atropos/tools/terminal_hecate.py +++ /dev/null @@ -1,437 +0,0 @@ -#!/usr/bin/env python3 -""" -Terminal Hecate Tool Module - -A terminal tool that executes commands on MorphCloud/Hecate VMs. -Uses E2B-style cloud VMs for execution with automatic lifecycle management. 
- -Features: -- Direct SSH command execution on cloud VMs -- Background task support -- VM lifecycle management with TTL -- Automatic cleanup after inactivity - -Usage: - from terminal_hecate import terminal_hecate_tool - - # Execute a simple command - result = terminal_hecate_tool("ls -la") - - # Execute in background - result = terminal_hecate_tool("python server.py", background=True) -""" - -import json -import os -import time -import threading -import atexit -from typing import Optional, Dict, Any - -# Tool description for LLM -TERMINAL_HECATE_DESCRIPTION = """Execute commands on a secure cloud Linux VM environment (Hecate/MorphCloud). - -**Environment:** -- Minimal Debian-based OS with internet access -- Automatic VM lifecycle management (creates on-demand, reuses, cleans up) -- Filesystem is persisted between tool calls but environment variables, venvs, etc are reset. - -**Command Execution:** -- Simple commands: Just provide the 'command' parameter -- Background processes: Set 'background': True for servers/long-running tasks -- Command timeout: Optional 'timeout' parameter in seconds - -**Examples:** -- Run command: `{"command": "ls -la"}` -- Background task: `{"command": "source path/to/my/venv/bin/activate && python server.py", "background": True}` -- With timeout: `{"command": "long_task.sh", "timeout": 300}` - -**Best Practices:** -- Run servers/long processes in background -- Monitor disk usage for large tasks -- Install whatever tools you need with sudo apt-get -- Do not be afraid to run pip with --break-system-packages - -**Things to avoid** -- Do NOT use interactive tools such as tmux, vim, nano, python repl - you will get stuck. Even git sometimes becomes interactive if the output is large. If you're not sure pipe to cat. -""" - -# Global state for VM lifecycle management -_active_instances: Dict[str, Any] = {} -_last_activity: Dict[str, float] = {} -_instance_lock = threading.Lock() -_cleanup_thread = None -_cleanup_running = False - - -def _cleanup_inactive_vms(vm_lifetime_seconds: int = 300): - """Clean up VMs that have been inactive for longer than vm_lifetime_seconds.""" - global _active_instances, _last_activity - - current_time = time.time() - tasks_to_cleanup = [] - - with _instance_lock: - for task_id, last_time in list(_last_activity.items()): - if current_time - last_time > vm_lifetime_seconds: - tasks_to_cleanup.append(task_id) - - for task_id in tasks_to_cleanup: - try: - if task_id in _active_instances: - instance = _active_instances[task_id] - if hasattr(instance, 'terminate'): - instance.terminate() - elif hasattr(instance, 'stop'): - instance.stop() - elif hasattr(instance, 'delete'): - instance.delete() - - del _active_instances[task_id] - print(f"[VM Cleanup] Terminated inactive VM for task: {task_id}") - - if task_id in _last_activity: - del _last_activity[task_id] - - except Exception as e: - # 404 errors are benign - VM already cleaned up by TTL - error_str = str(e) - if "404" in error_str or "InstanceNotFoundError" in error_str or "not found" in error_str.lower(): - print(f"[VM Cleanup] VM for task {task_id} already cleaned up (likely TTL expiration)") - else: - print(f"[VM Cleanup] Error cleaning up VM for task {task_id}: {e}") - - # Always remove from tracking dicts to prevent infinite retry loops - if task_id in _active_instances: - del _active_instances[task_id] - if task_id in _last_activity: - del _last_activity[task_id] - - -def _cleanup_thread_worker(): - """Background thread worker that periodically cleans up inactive VMs.""" - global 
_cleanup_running - - while _cleanup_running: - try: - vm_lifetime = int(os.getenv("HECATE_VM_LIFETIME_SECONDS", "300")) - _cleanup_inactive_vms(vm_lifetime) - except Exception as e: - print(f"[VM Cleanup] Error in cleanup thread: {e}") - - for _ in range(60): - if not _cleanup_running: - break - time.sleep(1) - - -def _start_cleanup_thread(): - """Start the background cleanup thread if not already running.""" - global _cleanup_thread, _cleanup_running - - with _instance_lock: - if _cleanup_thread is None or not _cleanup_thread.is_alive(): - _cleanup_running = True - _cleanup_thread = threading.Thread(target=_cleanup_thread_worker, daemon=True) - _cleanup_thread.start() - - -def _stop_cleanup_thread(): - """Stop the background cleanup thread.""" - global _cleanup_running - _cleanup_running = False - if _cleanup_thread is not None: - _cleanup_thread.join(timeout=5) - - -def cleanup_vm(task_id: str): - """Manually clean up a specific VM by task_id.""" - global _active_instances, _last_activity - - with _instance_lock: - try: - if task_id in _active_instances: - instance = _active_instances[task_id] - if hasattr(instance, 'terminate'): - instance.terminate() - elif hasattr(instance, 'stop'): - instance.stop() - elif hasattr(instance, 'delete'): - instance.delete() - - del _active_instances[task_id] - print(f"[VM Cleanup] Manually terminated VM for task: {task_id}") - - if task_id in _last_activity: - del _last_activity[task_id] - - except Exception as e: - # 404 errors are benign - VM already cleaned up by TTL - error_str = str(e) - if "404" in error_str or "InstanceNotFoundError" in error_str or "not found" in error_str.lower(): - print(f"[VM Cleanup] VM for task {task_id} already cleaned up (likely TTL expiration)") - else: - print(f"[VM Cleanup] Error manually cleaning up VM for task {task_id}: {e}") - - -atexit.register(_stop_cleanup_thread) - - -def _execute_command(instance, command: str, timeout: Optional[int] = None) -> Dict[str, Any]: - """ - Execute a command on the VM instance using instance.exec() for proper stderr capture. - - Args: - instance: MorphVM instance - command: Command to execute - timeout: Optional timeout in seconds (Note: exec() may not support timeout directly) - - Returns: - dict with stdout, stderr, returncode - """ - try: - # Use instance.exec() which properly captures both stdout and stderr - # (unlike ssh.run() which doesn't capture stderr correctly) - result = instance.exec(command) - - # Debug logging only for verbose mode or unusual cases - # Note: Non-zero exit codes are normal (model's command failed) - not a tool error - if result.exit_code != 0 and not result.stdout and not result.stderr: - # Only log if we got absolutely no output - might indicate an issue - print(f"โš ๏ธ Command returned exit={result.exit_code} with no output") - - return { - "stdout": result.stdout or "", - "stderr": result.stderr or "", - "returncode": result.exit_code - } - - except Exception as e: - # Check if it's a timeout - error_str = str(e).lower() - if "timeout" in error_str: - return { - "stdout": "", - "stderr": f"Command timed out after {timeout or 120} seconds", - "returncode": 124 - } - - return { - "stdout": "", - "stderr": f"Command execution failed: {str(e)}", - "returncode": -1 - } - - -def terminal_hecate_tool( - command: str, - background: bool = False, - timeout: Optional[int] = None, - task_id: Optional[str] = None -) -> str: - """ - Execute a command on a MorphCloud/Hecate VM without session persistence. 
- - Args: - command: The command to execute - background: Whether to run in background (default: False) - timeout: Command timeout in seconds (default: 120) - task_id: Unique identifier for VM isolation (optional) - - Returns: - str: JSON string with output, exit_code, and error fields - - Examples: - # Execute a simple command - >>> result = terminal_hecate_tool(command="ls -la /tmp") - - # Run a background task - >>> result = terminal_hecate_tool(command="python server.py", background=True) - - # With custom timeout - >>> result = terminal_hecate_tool(command="long_task.sh", timeout=300) - """ - global _active_instances, _last_activity - - try: - # Import required modules - try: - from morphcloud.api import MorphCloudClient - except ImportError as import_error: - return json.dumps({ - "output": "", - "exit_code": -1, - "error": f"Terminal tool disabled: {import_error}", - "status": "disabled" - }, ensure_ascii=False) - - # Get configuration - vm_ttl_seconds = int(os.getenv("HECATE_VM_TTL_SECONDS", "1200")) - snapshot_id = os.getenv("HECATE_DEFAULT_SNAPSHOT_ID", "snapshot_defv9tjg") - - # Check API key - morph_api_key = os.getenv("MORPH_API_KEY") - if not morph_api_key: - return json.dumps({ - "output": "", - "exit_code": -1, - "error": "MORPH_API_KEY environment variable not set", - "status": "disabled" - }, ensure_ascii=False) - - # Use task_id for VM isolation - effective_task_id = task_id or "default" - - # Start cleanup thread - _start_cleanup_thread() - - # Get or create VM instance - with _instance_lock: - if effective_task_id not in _active_instances: - morph_client = MorphCloudClient(api_key=morph_api_key) - _active_instances[effective_task_id] = morph_client.instances.start( - snapshot_id=snapshot_id, - ttl_seconds=vm_ttl_seconds, - ttl_action="stop" - ) - - # Update last activity time - _last_activity[effective_task_id] = time.time() - instance = _active_instances[effective_task_id] - - # Wait for instance to be ready - instance.wait_until_ready() - - # Prepare command for execution - if background: - # Run in background with nohup and redirect output - exec_command = f"nohup {command} > /tmp/bg_output.log 2>&1 &" - result = _execute_command(instance, exec_command, timeout=10) - - # For background tasks, return immediately with info - if result["returncode"] == 0: - return json.dumps({ - "output": "Background task started successfully", - "exit_code": 0, - "error": None - }, ensure_ascii=False) - else: - # Include stderr in output but don't set error (command failure, not tool failure) - bg_output = result["stdout"] - if result["stderr"]: - bg_output = f"{bg_output}\n{result['stderr']}" if bg_output else result["stderr"] - return json.dumps({ - "output": bg_output, - "exit_code": result["returncode"], - "error": None # Only set for actual tool failures - }, ensure_ascii=False) - else: - # Run foreground command with retry logic for transient failures - max_retries = 3 - retry_count = 0 - result = None - - while retry_count <= max_retries: - result = _execute_command(instance, command, timeout=timeout) - - # Check if we should retry (only for transient errors, not normal results) - stdout = result.get("stdout", "") - stderr = result.get("stderr", "") - returncode = result.get("returncode", 0) - - should_retry = False - retry_reason = "" - - # NOTE: Empty output with exit_code=0 is NORMAL for many commands: - # - File writes: cat > file, echo > file - # - Directory ops: mkdir, cd - # - Silent installs: pip install --quiet - # So we do NOT retry on exit_code=0, even with empty 
output. - - # Only retry on special error codes that suggest transient/infra issues - if not stdout and not stderr and returncode in [-1, 124]: - should_retry = True - retry_reason = f"transient error (code {returncode})" - - if should_retry and retry_count < max_retries: - retry_count += 1 - wait_time = 2 ** retry_count # Exponential backoff: 2s, 4s, 8s - print(f"โš ๏ธ Terminal: {retry_reason}, retrying in {wait_time}s (attempt {retry_count}/{max_retries})") - time.sleep(wait_time) - continue - - # Got a result (success or normal command failure) - exit retry loop - break - - # Combine stdout and stderr for output - output = result["stdout"] - if result["stderr"] and result["returncode"] != 0: - output = f"{output}\n{result['stderr']}" if output else result["stderr"] - - # Truncate output if too long (max 50,000 chars to avoid context explosion) - MAX_OUTPUT_CHARS = 50000 - if len(output) > MAX_OUTPUT_CHARS: - truncated_notice = f"\n\n... [OUTPUT TRUNCATED - showing last {MAX_OUTPUT_CHARS} chars of {len(output)} total] ..." - output = truncated_notice + output[-MAX_OUTPUT_CHARS:] - - # NOTE: error is only set for FUNCTIONAL tool failures (VM issues, timeouts, etc.) - # Non-zero exit codes from the model's commands are NOT tool failures - - # the model can self-correct. The exit_code field tells the model if the command succeeded. - # Retries that eventually succeed also don't count as failures. - return json.dumps({ - "output": output.strip(), - "exit_code": result["returncode"], - "error": None # Only set for actual tool failures, not command failures - }, ensure_ascii=False) - - except Exception as e: - return json.dumps({ - "output": "", - "exit_code": -1, - "error": f"Failed to execute command: {str(e)}", - "status": "error" - }, ensure_ascii=False) - - -def check_hecate_requirements() -> bool: - """Check if all requirements for the Hecate terminal tool are met.""" - required_vars = ["MORPH_API_KEY"] - missing_required = [var for var in required_vars if not os.getenv(var)] - - if missing_required: - print(f"Missing required environment variables: {', '.join(missing_required)}") - return False - - try: - from morphcloud.api import MorphCloudClient - return True - except Exception as e: - print(f"MorphCloud not available: {e}") - return False - - -if __name__ == "__main__": - """Simple test when run directly.""" - print("Terminal Hecate Tool Module (MorphCloud/E2B)") - print("=" * 40) - - if not check_hecate_requirements(): - print("Requirements not met. 
Please check the messages above.") - exit(1) - - print("All requirements met!") - print("\nAvailable Tool:") - print(" - terminal_hecate_tool: Execute commands on cloud VMs") - - print("\nUsage Examples:") - print(" # Execute a command") - print(" result = terminal_hecate_tool(command='ls -la')") - print(" ") - print(" # Run a background task") - print(" result = terminal_hecate_tool(command='python server.py', background=True)") - - print("\nEnvironment Variables:") - print(f" MORPH_API_KEY: {'Set' if os.getenv('MORPH_API_KEY') else 'Not set'}") - print(f" HECATE_VM_TTL_SECONDS: {os.getenv('HECATE_VM_TTL_SECONDS', '1200')} (default: 1200 / 20 minutes)") - print(f" HECATE_VM_LIFETIME_SECONDS: {os.getenv('HECATE_VM_LIFETIME_SECONDS', '300')} (default: 300 / 5 minutes)") - print(f" HECATE_DEFAULT_SNAPSHOT_ID: {os.getenv('HECATE_DEFAULT_SNAPSHOT_ID', 'snapshot_defv9tjg')}") diff --git a/atropos/tools/terminal_stateful_tool.py b/atropos/tools/terminal_stateful_tool.py index 0e5786ca82..8136982311 100644 --- a/atropos/tools/terminal_stateful_tool.py +++ b/atropos/tools/terminal_stateful_tool.py @@ -11,7 +11,6 @@ from __future__ import annotations from typing import Optional from .base import Tool, ToolResult, ToolSchema -from .basic_tools import BashTool class TerminalStatefulTool(Tool): @@ -39,7 +38,8 @@ class TerminalStatefulTool(Tool): return True, None async def execute(self, command: str, timeout: Optional[int] = None) -> ToolResult: - # Fallback direct execution (not stateful) when used outside ToolExecutor. - bash = BashTool(timeout=float(timeout) if timeout else 30.0) - return await bash.execute(command=command) - + _ = (command, timeout) + return ToolResult( + success=False, + error="terminal_stateful must be executed via ToolExecutor inside the sandbox", + ) diff --git a/atropos/tools/terminal_tool.py b/atropos/tools/terminal_tool.py deleted file mode 100644 index a865a274be..0000000000 --- a/atropos/tools/terminal_tool.py +++ /dev/null @@ -1,493 +0,0 @@ -#!/usr/bin/env python3 -""" -Terminal Tool Module (mini-swe-agent backend) - -A terminal tool that executes commands using mini-swe-agent's execution environments. -Supports local execution, Docker containers, and Modal cloud sandboxes. - -Environment Selection (via TERMINAL_ENV environment variable): -- "local": Execute directly on the host machine (default, fastest) -- "docker": Execute in Docker containers (isolated, requires Docker) -- "modal": Execute in Modal cloud sandboxes (scalable, requires Modal account) - -Features: -- Multiple execution backends (local, docker, modal) -- Background task support -- VM/container lifecycle management -- Automatic cleanup after inactivity - -Usage: - from terminal_tool import terminal_tool - - # Execute a simple command - result = terminal_tool("ls -la") - - # Execute in background - result = terminal_tool("python server.py", background=True) -""" - -import json -import os -import sys -import time -import threading -import atexit -from pathlib import Path -from typing import Optional, Dict, Any - -# Add mini-swe-agent to path if not installed -mini_swe_path = Path(__file__).parent.parent / "mini-swe-agent" / "src" -if mini_swe_path.exists(): - sys.path.insert(0, str(mini_swe_path)) - -# Tool description for LLM -TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux environment. 
- -**Environment:** -- Isolated execution environment (local, Docker, or Modal cloud based on configuration) -- Filesystem persists between tool calls within the same task -- Internet access available - -**Command Execution:** -- Simple commands: Just provide the 'command' parameter -- Background processes: Set 'background': True for servers/long-running tasks -- Command timeout: Optional 'timeout' parameter in seconds - -**Examples:** -- Run command: `{"command": "ls -la"}` -- Background task: `{"command": "source venv/bin/activate && python server.py", "background": True}` -- With timeout: `{"command": "long_task.sh", "timeout": 300}` - -**Best Practices:** -- Run servers/long processes in background -- Monitor disk usage for large tasks -- Install whatever tools you need with apt-get or pip -- Do not be afraid to run pip with --break-system-packages - -**Things to avoid:** -- Do NOT use interactive tools such as tmux, vim, nano, python repl - you will get stuck. -- Even git sometimes becomes interactive if the output is large. If you're not sure, pipe to cat. -""" - -# Global state for environment lifecycle management -_active_environments: Dict[str, Any] = {} -_last_activity: Dict[str, float] = {} -_env_lock = threading.Lock() -_cleanup_thread = None -_cleanup_running = False - -# Configuration from environment variables -def _get_env_config() -> Dict[str, Any]: - """Get terminal environment configuration from environment variables.""" - return { - "env_type": os.getenv("TERMINAL_ENV", "local"), # local, docker, or modal - "docker_image": os.getenv("TERMINAL_DOCKER_IMAGE", "python:3.11-slim"), - "modal_image": os.getenv("TERMINAL_MODAL_IMAGE", "python:3.11-slim"), - "cwd": os.getenv("TERMINAL_CWD", "/tmp"), - "timeout": int(os.getenv("TERMINAL_TIMEOUT", "60")), - "lifetime_seconds": int(os.getenv("TERMINAL_LIFETIME_SECONDS", "300")), - } - - -def _create_environment(env_type: str, image: str, cwd: str, timeout: int): - """ - Create an execution environment from mini-swe-agent. - - Args: - env_type: One of "local", "docker", "modal" - image: Docker/Modal image name (ignored for local) - cwd: Working directory - timeout: Default command timeout - - Returns: - Environment instance with execute() method - """ - if env_type == "local": - from minisweagent.environments.local import LocalEnvironment - return LocalEnvironment(cwd=cwd, timeout=timeout) - - elif env_type == "docker": - from minisweagent.environments.docker import DockerEnvironment - return DockerEnvironment(image=image, cwd=cwd, timeout=timeout) - - elif env_type == "modal": - from minisweagent.environments.extra.swerex_modal import SwerexModalEnvironment - return SwerexModalEnvironment(image=image, cwd=cwd, timeout=timeout) - - else: - raise ValueError(f"Unknown environment type: {env_type}. 
Use 'local', 'docker', or 'modal'") - - -def _cleanup_inactive_envs(lifetime_seconds: int = 300): - """Clean up environments that have been inactive for longer than lifetime_seconds.""" - global _active_environments, _last_activity - - current_time = time.time() - tasks_to_cleanup = [] - - with _env_lock: - for task_id, last_time in list(_last_activity.items()): - if current_time - last_time > lifetime_seconds: - tasks_to_cleanup.append(task_id) - - for task_id in tasks_to_cleanup: - try: - if task_id in _active_environments: - env = _active_environments[task_id] - # Try various cleanup methods - if hasattr(env, 'cleanup'): - env.cleanup() - elif hasattr(env, 'stop'): - env.stop() - elif hasattr(env, 'terminate'): - env.terminate() - - del _active_environments[task_id] - print(f"[Terminal Cleanup] Cleaned up inactive environment for task: {task_id}") - - if task_id in _last_activity: - del _last_activity[task_id] - - except Exception as e: - error_str = str(e) - if "404" in error_str or "not found" in error_str.lower(): - print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up") - else: - print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}") - - # Always remove from tracking dicts - if task_id in _active_environments: - del _active_environments[task_id] - if task_id in _last_activity: - del _last_activity[task_id] - - -def _cleanup_thread_worker(): - """Background thread worker that periodically cleans up inactive environments.""" - global _cleanup_running - - while _cleanup_running: - try: - config = _get_env_config() - _cleanup_inactive_envs(config["lifetime_seconds"]) - except Exception as e: - print(f"[Terminal Cleanup] Error in cleanup thread: {e}") - - for _ in range(60): - if not _cleanup_running: - break - time.sleep(1) - - -def _start_cleanup_thread(): - """Start the background cleanup thread if not already running.""" - global _cleanup_thread, _cleanup_running - - with _env_lock: - if _cleanup_thread is None or not _cleanup_thread.is_alive(): - _cleanup_running = True - _cleanup_thread = threading.Thread(target=_cleanup_thread_worker, daemon=True) - _cleanup_thread.start() - - -def _stop_cleanup_thread(): - """Stop the background cleanup thread.""" - global _cleanup_running - _cleanup_running = False - if _cleanup_thread is not None: - _cleanup_thread.join(timeout=5) - - -def cleanup_vm(task_id: str): - """Manually clean up a specific environment by task_id.""" - global _active_environments, _last_activity - - with _env_lock: - try: - if task_id in _active_environments: - env = _active_environments[task_id] - if hasattr(env, 'cleanup'): - env.cleanup() - elif hasattr(env, 'stop'): - env.stop() - elif hasattr(env, 'terminate'): - env.terminate() - - del _active_environments[task_id] - print(f"[Terminal Cleanup] Manually cleaned up environment for task: {task_id}") - - if task_id in _last_activity: - del _last_activity[task_id] - - except Exception as e: - error_str = str(e) - if "404" in error_str or "not found" in error_str.lower(): - print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up") - else: - print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}") - - -atexit.register(_stop_cleanup_thread) - - -def terminal_tool( - command: str, - background: bool = False, - timeout: Optional[int] = None, - task_id: Optional[str] = None -) -> str: - """ - Execute a command using mini-swe-agent's execution environments. 
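The backend is selected from environment variables rather than function arguments; a small sketch, with placeholder command and task_id values:

    import json
    import os

    from terminal_tool import terminal_tool

    os.environ["TERMINAL_ENV"] = "docker"                      # "local" (default) / "docker" / "modal"
    os.environ["TERMINAL_DOCKER_IMAGE"] = "python:3.11-slim"   # only used by the docker backend

    result = json.loads(terminal_tool(command="python --version", task_id="demo"))
    print(result["exit_code"], result["output"])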
- - Args: - command: The command to execute - background: Whether to run in background (default: False) - timeout: Command timeout in seconds (default: from config) - task_id: Unique identifier for environment isolation (optional) - - Returns: - str: JSON string with output, exit_code, and error fields - - Examples: - # Execute a simple command - >>> result = terminal_tool(command="ls -la /tmp") - - # Run a background task - >>> result = terminal_tool(command="python server.py", background=True) - - # With custom timeout - >>> result = terminal_tool(command="long_task.sh", timeout=300) - """ - global _active_environments, _last_activity - - try: - # Get configuration - config = _get_env_config() - env_type = config["env_type"] - - # Select image based on env type - if env_type == "docker": - image = config["docker_image"] - elif env_type == "modal": - image = config["modal_image"] - else: - image = "" - - cwd = config["cwd"] - default_timeout = config["timeout"] - effective_timeout = timeout or default_timeout - - # Use task_id for environment isolation - effective_task_id = task_id or "default" - - # Start cleanup thread - _start_cleanup_thread() - - # Get or create environment - with _env_lock: - if effective_task_id not in _active_environments: - try: - _active_environments[effective_task_id] = _create_environment( - env_type=env_type, - image=image, - cwd=cwd, - timeout=effective_timeout - ) - except ImportError as e: - return json.dumps({ - "output": "", - "exit_code": -1, - "error": f"Terminal tool disabled: mini-swe-agent not available ({e})", - "status": "disabled" - }, ensure_ascii=False) - - # Update last activity time - _last_activity[effective_task_id] = time.time() - env = _active_environments[effective_task_id] - - # Prepare command for execution - if background: - # Run in background with nohup and redirect output - exec_command = f"nohup {command} > /tmp/bg_output.log 2>&1 &" - try: - result = env.execute(exec_command, timeout=10) - return json.dumps({ - "output": "Background task started successfully", - "exit_code": 0, - "error": None - }, ensure_ascii=False) - except Exception as e: - return json.dumps({ - "output": "", - "exit_code": -1, - "error": f"Failed to start background task: {str(e)}" - }, ensure_ascii=False) - else: - # Run foreground command with retry logic - max_retries = 3 - retry_count = 0 - result = None - - while retry_count <= max_retries: - try: - result = env.execute(command, timeout=effective_timeout) - except Exception as e: - error_str = str(e).lower() - if "timeout" in error_str: - return json.dumps({ - "output": "", - "exit_code": 124, - "error": f"Command timed out after {effective_timeout} seconds" - }, ensure_ascii=False) - - # Retry on transient errors - if retry_count < max_retries: - retry_count += 1 - wait_time = 2 ** retry_count - print(f"โš ๏ธ Terminal: execution error, retrying in {wait_time}s (attempt {retry_count}/{max_retries})") - time.sleep(wait_time) - continue - - return json.dumps({ - "output": "", - "exit_code": -1, - "error": f"Command execution failed: {str(e)}" - }, ensure_ascii=False) - - # Got a result - break - - # Extract output - output = result.get("output", "") - returncode = result.get("returncode", 0) - - # Truncate output if too long - MAX_OUTPUT_CHARS = 50000 - if len(output) > MAX_OUTPUT_CHARS: - truncated_notice = f"\n\n... [OUTPUT TRUNCATED - showing last {MAX_OUTPUT_CHARS} chars of {len(output)} total] ..." 
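# Keep the tail of the output: for long-running commands the final lines
# (errors, tracebacks, test summaries) are usually the most useful to surface.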
- output = truncated_notice + output[-MAX_OUTPUT_CHARS:] - - return json.dumps({ - "output": output.strip() if output else "", - "exit_code": returncode, - "error": None - }, ensure_ascii=False) - - except Exception as e: - return json.dumps({ - "output": "", - "exit_code": -1, - "error": f"Failed to execute command: {str(e)}", - "status": "error" - }, ensure_ascii=False) - - -def check_terminal_requirements() -> bool: - """Check if all requirements for the terminal tool are met.""" - config = _get_env_config() - env_type = config["env_type"] - - try: - if env_type == "local": - from minisweagent.environments.local import LocalEnvironment - return True - elif env_type == "docker": - from minisweagent.environments.docker import DockerEnvironment - # Check if docker is available - import subprocess - result = subprocess.run(["docker", "version"], capture_output=True, timeout=5) - return result.returncode == 0 - elif env_type == "modal": - from minisweagent.environments.extra.swerex_modal import SwerexModalEnvironment - # Check for modal token - return os.getenv("MODAL_TOKEN_ID") is not None or Path.home().joinpath(".modal.toml").exists() - else: - return False - except Exception as e: - print(f"Terminal requirements check failed: {e}") - return False - - -# ============================================================================= -# Atropos-Agent Tool wrapper (Hermes compatibility) -# ============================================================================= - -from .base import Tool, ToolResult, ToolSchema # noqa: E402 -from .basic_tools import BashTool # noqa: E402 - - -class TerminalTool(Tool): - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="terminal", - description=TERMINAL_TOOL_DESCRIPTION, - parameters={ - "command": {"type": "string", "description": "The command to execute"}, - "background": { - "type": "boolean", - "description": "Run the command in the background (not supported in sandbox).", - "default": False, - }, - "timeout": { - "type": "integer", - "description": "Command timeout in seconds (optional).", - "minimum": 1, - }, - }, - required=["command"], - ) - - def is_available(self) -> tuple[bool, str | None]: - # The canonical terminal tool in atropos-agent is sandboxed; availability is governed - # by whether the environment includes this tool and its own network policy. - return True, None - - async def execute( - self, - command: str, - background: bool = False, - timeout: Optional[int] = None, - ) -> ToolResult: - if background: - return ToolResult(success=False, error="background execution is not supported in sandboxed terminal") - bash = BashTool(timeout=float(timeout) if timeout else 30.0) - return await bash.execute(command=command) - - -if __name__ == "__main__": - """Simple test when run directly.""" - print("Terminal Tool Module (mini-swe-agent backend)") - print("=" * 50) - - config = _get_env_config() - print(f"\nCurrent Configuration:") - print(f" Environment type: {config['env_type']}") - print(f" Docker image: {config['docker_image']}") - print(f" Modal image: {config['modal_image']}") - print(f" Working directory: {config['cwd']}") - print(f" Default timeout: {config['timeout']}s") - print(f" Lifetime: {config['lifetime_seconds']}s") - - if not check_terminal_requirements(): - print("\nโŒ Requirements not met. 
Please check the messages above.") - exit(1) - - print("\nโœ… All requirements met!") - print("\nAvailable Tool:") - print(" - terminal_tool: Execute commands using mini-swe-agent environments") - - print("\nUsage Examples:") - print(" # Execute a command") - print(" result = terminal_tool(command='ls -la')") - print(" ") - print(" # Run a background task") - print(" result = terminal_tool(command='python server.py', background=True)") - - print("\nEnvironment Variables:") - print(f" TERMINAL_ENV: {os.getenv('TERMINAL_ENV', 'local')} (local/docker/modal)") - print(f" TERMINAL_DOCKER_IMAGE: {os.getenv('TERMINAL_DOCKER_IMAGE', 'python:3.11-slim')}") - print(f" TERMINAL_MODAL_IMAGE: {os.getenv('TERMINAL_MODAL_IMAGE', 'python:3.11-slim')}") - print(f" TERMINAL_CWD: {os.getenv('TERMINAL_CWD', '/tmp')}") - print(f" TERMINAL_TIMEOUT: {os.getenv('TERMINAL_TIMEOUT', '60')}") - print(f" TERMINAL_LIFETIME_SECONDS: {os.getenv('TERMINAL_LIFETIME_SECONDS', '300')}") diff --git a/atropos/tools/toolset_distributions.py b/atropos/tools/toolset_distributions.py deleted file mode 100644 index 885649e1ab..0000000000 --- a/atropos/tools/toolset_distributions.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -Toolset distributions (Hermes-Agent inspired). - -Distributions are optional helpers for data generation runs where you want -probabilistic inclusion of toolsets per trajectory/item. -""" - -from __future__ import annotations - -import random -from typing import Dict, List, Optional, TypedDict - -from .toolsets import validate_toolset - - -class DistributionDef(TypedDict): - description: str - toolsets: Dict[str, int] - - -DISTRIBUTIONS: Dict[str, DistributionDef] = { - "default": { - "description": "All common sandbox tools.", - "toolsets": {"sandbox": 100}, - }, - "code_agent_plus_image": { - "description": "Sandbox tools with optional image generation.", - "toolsets": {"sandbox": 100, "image_gen": 30}, - }, - "sandbox_only": { - "description": "Only sandbox tools (terminal + filesystem).", - "toolsets": {"sandbox": 100}, - }, -} - - -def get_distribution(name: str) -> Optional[DistributionDef]: - return DISTRIBUTIONS.get(name) - - -def list_distributions() -> Dict[str, DistributionDef]: - return DISTRIBUTIONS.copy() - - -def validate_distribution(name: str) -> bool: - return name in DISTRIBUTIONS - - -def sample_toolsets_from_distribution(distribution_name: str) -> List[str]: - dist = get_distribution(distribution_name) - if not dist: - raise ValueError(f"Unknown distribution: {distribution_name}") - - selected: List[str] = [] - for toolset_name, probability in dist["toolsets"].items(): - if not validate_toolset(toolset_name): - continue - if random.random() * 100 < probability: - selected.append(toolset_name) - - # Ensure at least one toolset if the distribution isn't empty. - if not selected and dist["toolsets"]: - highest = max(dist["toolsets"].items(), key=lambda x: x[1])[0] - if validate_toolset(highest): - selected.append(highest) - - return selected - diff --git a/atropos/tools/toolset_resolver.py b/atropos/tools/toolset_resolver.py new file mode 100644 index 0000000000..9a450cc2ca --- /dev/null +++ b/atropos/tools/toolset_resolver.py @@ -0,0 +1,88 @@ +""" +Toolset resolution for Hermes-Agent Atropos integration. + +We primarily reuse Hermes-Agent toolsets (`toolsets.py`), but Atropos training/envs +need a few extra sandbox-oriented toolsets that Hermes doesn't expose by default +(e.g. filesystem + stateful terminal). 
+""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Set + +import toolsets as hermes_toolsets + + +ATROPOS_TOOLSETS: Dict[str, Dict[str, Any]] = { + "filesystem": { + "description": "Read/write files in the sandbox workspace.", + "tools": ["read_file", "write_file"], + "includes": [], + }, + "terminal_stateful": { + "description": "Stateful terminal execution (tmux/TUI support) inside the sandbox.", + "tools": ["terminal_stateful", "tmux"], + "includes": [], + }, + "sandbox": { + "description": "Sandbox tools (terminal + filesystem).", + "tools": [], + "includes": ["terminal", "filesystem"], + }, + "default": { + "description": "Default toolset for Atropos AgentEnv tasks.", + "tools": [], + "includes": ["sandbox"], + }, + "full": { + "description": "All Hermes tools plus Atropos sandbox additions.", + "tools": [], + "includes": ["all", "filesystem", "sandbox", "terminal_stateful"], + }, +} + + +def validate_toolset(name: str) -> bool: + if name in {"all", "*"}: + return True + return hermes_toolsets.validate_toolset(name) or name in ATROPOS_TOOLSETS + + +def resolve_toolset(name: str, visited: Optional[Set[str]] = None) -> List[str]: + if visited is None: + visited = set() + + if name in {"all", "*"}: + # Union Hermes + Atropos toolsets. + all_tools: Set[str] = set() + for tname in hermes_toolsets.get_toolset_names(): + all_tools.update(resolve_toolset(tname, visited=set())) + for tname, spec in ATROPOS_TOOLSETS.items(): + # Avoid recursion: some Atropos toolsets (e.g. "full") include "all". + if tname == "full" or "all" in (spec.get("includes") or []): + continue + all_tools.update(resolve_toolset(tname, visited=set())) + return sorted(all_tools) + + if name in ATROPOS_TOOLSETS: + if name in visited: + return [] + visited.add(name) + spec = ATROPOS_TOOLSETS[name] + tools: Set[str] = set(spec.get("tools", [])) + for inc in spec.get("includes", []): + tools.update(resolve_toolset(inc, visited=set(visited))) + return sorted(tools) + + # Fall back to Hermes toolsets. + # IMPORTANT: do not pre-add `name` to `visited` here; Hermes' resolver uses + # `visited` for its own cycle detection and will treat the presence of `name` + # as a circular dependency. + return sorted(hermes_toolsets.resolve_toolset(name, visited=set(visited))) + + +def resolve_multiple_toolsets(names: List[str]) -> List[str]: + tools: Set[str] = set() + for name in names: + tools.update(resolve_toolset(name, visited=set())) + return sorted(tools) diff --git a/atropos/tools/toolsets.py b/atropos/tools/toolsets.py deleted file mode 100644 index ddd4df0586..0000000000 --- a/atropos/tools/toolsets.py +++ /dev/null @@ -1,157 +0,0 @@ -""" -Toolsets (Hermes-Agent inspired). - -Toolsets are named groups of tools with optional composition (includes). -They are used to decide which tools are advertised to the model and/or enabled -for a particular environment run. - -This module is intentionally lightweight and dependency-free. -""" - -from __future__ import annotations - -from typing import Any, Dict, List, Optional, Set, TypedDict - - -class ToolsetDef(TypedDict): - description: str - tools: List[str] - includes: List[str] - - -TOOLSETS: Dict[str, ToolsetDef] = { - # Primitive building blocks - "filesystem": { - "description": "Read/write files in the current workspace.", - "tools": ["read_file", "write_file"], - "includes": [], - }, - "terminal": { - "description": "Terminal/command execution tools.", - # Prefer `terminal` for Hermes compatibility; keep `bash` as a legacy alias. 
- "tools": ["terminal", "bash"], - "includes": [], - }, - "terminal_stateful": { - "description": "Stateful terminal execution (enables persistent background processes like tmux).", - "tools": ["terminal_stateful", "tmux"], - "includes": [], - }, - "sandbox": { - "description": "Standard sandbox tools (terminal + filesystem).", - "tools": [], - "includes": ["terminal", "filesystem"], - }, - # External tools (executed via ToolServer) - "web": { - "description": "Web research and content extraction tools (external).", - "tools": ["web_search", "web_extract", "web_crawl"], - "includes": [], - }, - "vision": { - "description": "Vision/image analysis tools (external).", - "tools": ["vision_analyze"], - "includes": [], - }, - "image_gen": { - "description": "Image generation tools (external).", - "tools": ["image_generate"], - "includes": [], - }, - "moa": { - "description": "Advanced reasoning tools (Mixture-of-Agents, external).", - "tools": ["mixture_of_agents"], - "includes": [], - }, - # Convenience presets - "default": { - "description": "Default toolset for code-agent tasks.", - "tools": [], - "includes": ["sandbox"], - }, - "debugging": { - "description": "Debugging toolkit (terminal + web).", - "tools": [], - "includes": ["sandbox", "web"], - }, - "research": { - "description": "Research toolkit (web + vision + reasoning).", - "tools": [], - "includes": ["web", "vision", "moa"], - }, - "safe": { - "description": "Safe toolkit without terminal access.", - "tools": [], - "includes": ["web", "vision", "image_gen", "moa"], - }, - "full": { - "description": "All common tools (sandbox + external).", - "tools": [], - "includes": ["sandbox", "web", "vision", "image_gen", "moa"], - }, -} - - -def get_toolset(name: str) -> Optional[ToolsetDef]: - return TOOLSETS.get(name) - - -def get_toolset_names() -> List[str]: - return list(TOOLSETS.keys()) - - -def validate_toolset(name: str) -> bool: - return name in {"all", "*"} or name in TOOLSETS - - -def resolve_toolset(name: str, visited: Optional[Set[str]] = None) -> List[str]: - """ - Recursively resolve a toolset to a list of tool names. - - Includes are expanded depth-first with cycle protection. - """ - if visited is None: - visited = set() - - if name in {"all", "*"}: - all_tools: Set[str] = set() - for toolset_name in get_toolset_names(): - all_tools.update(resolve_toolset(toolset_name, visited=set())) - return sorted(all_tools) - - if name in visited: - # Cycle: return empty to avoid infinite recursion. 
- return [] - - visited.add(name) - toolset = TOOLSETS.get(name) - if toolset is None: - return [] - - tools: Set[str] = set(toolset.get("tools", [])) - for included in toolset.get("includes", []): - tools.update(resolve_toolset(included, visited=set(visited))) - return sorted(tools) - - -def resolve_multiple_toolsets(toolset_names: List[str]) -> List[str]: - tools: Set[str] = set() - for name in toolset_names: - tools.update(resolve_toolset(name)) - return sorted(tools) - - -def get_toolset_info(name: str) -> Optional[Dict[str, Any]]: - toolset = get_toolset(name) - if toolset is None: - return None - resolved = resolve_toolset(name) - return { - "name": name, - "description": toolset["description"], - "direct_tools": toolset["tools"], - "includes": toolset["includes"], - "resolved_tools": resolved, - "tool_count": len(resolved), - "is_composite": bool(toolset["includes"]), - } diff --git a/atropos/tools/vision_tools.py b/atropos/tools/vision_tools.py deleted file mode 100644 index 1adc0667a8..0000000000 --- a/atropos/tools/vision_tools.py +++ /dev/null @@ -1,553 +0,0 @@ -#!/usr/bin/env python3 -""" -Vision Tools Module - -This module provides vision analysis tools that work with image URLs. -Uses Gemini 3 Flash Preview via OpenRouter API for intelligent image understanding. - -Available tools: -- vision_analyze_tool: Analyze images from URLs with custom prompts - -Features: -- Downloads images from URLs and converts to base64 for API compatibility -- Comprehensive image description -- Context-aware analysis based on user queries -- Automatic temporary file cleanup -- Proper error handling and validation -- Debug logging support - -Usage: - from vision_tools import vision_analyze_tool - import asyncio - - # Analyze an image - result = await vision_analyze_tool( - image_url="https://example.com/image.jpg", - user_prompt="What architectural style is this building?" - ) -""" - -import json -import os -import asyncio -import uuid -import datetime -import base64 -from pathlib import Path -from typing import Dict, Any, Optional -from openai import AsyncOpenAI -import httpx # Use httpx for async HTTP requests - -# Initialize OpenRouter API client for vision processing -openrouter_client = AsyncOpenAI( - api_key=os.getenv("OPENROUTER_API_KEY"), - base_url="https://openrouter.ai/api/v1" -) - -# Configuration for vision processing -DEFAULT_VISION_MODEL = "google/gemini-3-flash-preview" - -# Debug mode configuration -DEBUG_MODE = os.getenv("VISION_TOOLS_DEBUG", "false").lower() == "true" -DEBUG_SESSION_ID = str(uuid.uuid4()) -DEBUG_LOG_PATH = Path("./logs") -DEBUG_DATA = { - "session_id": DEBUG_SESSION_ID, - "start_time": datetime.datetime.now().isoformat(), - "debug_enabled": DEBUG_MODE, - "tool_calls": [] -} if DEBUG_MODE else None - -# Create logs directory if debug mode is enabled -if DEBUG_MODE: - DEBUG_LOG_PATH.mkdir(exist_ok=True) - print(f"๐Ÿ› Vision debug mode enabled - Session ID: {DEBUG_SESSION_ID}") - - -def _log_debug_call(tool_name: str, call_data: Dict[str, Any]) -> None: - """ - Log a debug call entry to the global debug data structure. 
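Each recorded entry is the caller-supplied call data plus a timestamp and the tool name, appended to DEBUG_DATA["tool_calls"] only when VISION_TOOLS_DEBUG=true. With illustrative values:

    _log_debug_call("vision_analyze_tool", {"success": True, "analysis_length": 812})
    # appends {"timestamp": "...", "tool_name": "vision_analyze_tool",
    #          "success": True, "analysis_length": 812}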
- - Args: - tool_name (str): Name of the tool being called - call_data (Dict[str, Any]): Data about the call including parameters and results - """ - if not DEBUG_MODE or not DEBUG_DATA: - return - - call_entry = { - "timestamp": datetime.datetime.now().isoformat(), - "tool_name": tool_name, - **call_data - } - - DEBUG_DATA["tool_calls"].append(call_entry) - - -def _save_debug_log() -> None: - """ - Save the current debug data to a JSON file in the logs directory. - """ - if not DEBUG_MODE or not DEBUG_DATA: - return - - try: - debug_filename = f"vision_tools_debug_{DEBUG_SESSION_ID}.json" - debug_filepath = DEBUG_LOG_PATH / debug_filename - - # Update end time - DEBUG_DATA["end_time"] = datetime.datetime.now().isoformat() - DEBUG_DATA["total_calls"] = len(DEBUG_DATA["tool_calls"]) - - with open(debug_filepath, 'w', encoding='utf-8') as f: - json.dump(DEBUG_DATA, f, indent=2, ensure_ascii=False) - - print(f"๐Ÿ› Vision debug log saved: {debug_filepath}") - - except Exception as e: - print(f"โŒ Error saving vision debug log: {str(e)}") - - -def _validate_image_url(url: str) -> bool: - """ - Basic validation of image URL format. - - Args: - url (str): The URL to validate - - Returns: - bool: True if URL appears to be valid, False otherwise - """ - if not url or not isinstance(url, str): - return False - - # Check if it's a valid URL format - if not (url.startswith('http://') or url.startswith('https://')): - return False - - # Check for common image extensions (optional, as URLs may not have extensions) - image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg'] - - return True # Allow all HTTP/HTTPS URLs for flexibility - - -async def _download_image(image_url: str, destination: Path, max_retries: int = 3) -> Path: - """ - Download an image from a URL to a local destination (async) with retry logic. - - Args: - image_url (str): The URL of the image to download - destination (Path): The path where the image should be saved - max_retries (int): Maximum number of retry attempts (default: 3) - - Returns: - Path: The path to the downloaded image - - Raises: - Exception: If download fails after all retries - """ - import asyncio - - # Create parent directories if they don't exist - destination.parent.mkdir(parents=True, exist_ok=True) - - last_error = None - for attempt in range(max_retries): - try: - # Download the image with appropriate headers using async httpx - async with httpx.AsyncClient(timeout=30.0) as client: - response = await client.get( - image_url, - headers={"User-Agent": "hermes-agent-vision/1.0"}, - ) - response.raise_for_status() - - # Save the image content - destination.write_bytes(response.content) - - return destination - except Exception as e: - last_error = e - if attempt < max_retries - 1: - wait_time = 2 ** (attempt + 1) # 2s, 4s, 8s - print(f"โš ๏ธ Image download failed (attempt {attempt + 1}/{max_retries}): {str(e)[:50]}") - print(f" Retrying in {wait_time}s...") - await asyncio.sleep(wait_time) - else: - print(f"โŒ Image download failed after {max_retries} attempts: {str(e)[:100]}") - - raise last_error - - -def _determine_mime_type(image_path: Path) -> str: - """ - Determine the MIME type of an image based on its file extension. 
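For example:

    from pathlib import Path

    _determine_mime_type(Path("diagram.webp"))  # -> 'image/webp'
    _determine_mime_type(Path("snapshot"))      # no/unknown extension -> 'image/jpeg' fallback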
- - Args: - image_path (Path): Path to the image file - - Returns: - str: The MIME type (defaults to image/jpeg if unknown) - """ - extension = image_path.suffix.lower() - mime_types = { - '.jpg': 'image/jpeg', - '.jpeg': 'image/jpeg', - '.png': 'image/png', - '.gif': 'image/gif', - '.bmp': 'image/bmp', - '.webp': 'image/webp', - '.svg': 'image/svg+xml' - } - return mime_types.get(extension, 'image/jpeg') - - -def _image_to_base64_data_url(image_path: Path, mime_type: Optional[str] = None) -> str: - """ - Convert an image file to a base64-encoded data URL. - - Args: - image_path (Path): Path to the image file - mime_type (Optional[str]): MIME type of the image (auto-detected if None) - - Returns: - str: Base64-encoded data URL (e.g., "data:image/jpeg;base64,...") - """ - # Read the image as bytes - data = image_path.read_bytes() - - # Encode to base64 - encoded = base64.b64encode(data).decode("ascii") - - # Determine MIME type - mime = mime_type or _determine_mime_type(image_path) - - # Create data URL - data_url = f"data:{mime};base64,{encoded}" - - return data_url - - -async def vision_analyze_tool( - image_url: str, - user_prompt: str, - model: str = DEFAULT_VISION_MODEL -) -> str: - """ - Analyze an image from a URL using vision AI. - - This tool downloads images from URLs, converts them to base64, and processes - them using Gemini 3 Flash Preview via OpenRouter API. The image is downloaded to a - temporary location and automatically cleaned up after processing. - - The user_prompt parameter is expected to be pre-formatted by the calling - function (typically model_tools.py) to include both full description - requests and specific questions. - - Args: - image_url (str): The URL of the image to analyze (must be http:// or https://) - user_prompt (str): The pre-formatted prompt for the vision model - model (str): The vision model to use (default: google/gemini-3-flash-preview) - - Returns: - str: JSON string containing the analysis results with the following structure: - { - "success": bool, - "analysis": str (defaults to error message if None) - } - - Raises: - Exception: If download fails, analysis fails, or API key is not set - - Note: - - Temporary images are stored in ./temp_vision_images/ - - Images are automatically deleted after processing - - Supports common image formats (JPEG, PNG, GIF, WebP, etc.) - """ - debug_call_data = { - "parameters": { - "image_url": image_url, - "user_prompt": user_prompt[:200] + "..." if len(user_prompt) > 200 else user_prompt, - "model": model - }, - "error": None, - "success": False, - "analysis_length": 0, - "model_used": model, - "image_size_bytes": 0 - } - - temp_image_path = None - - try: - print(f"๐Ÿ” Analyzing image from URL: {image_url[:60]}{'...' if len(image_url) > 60 else ''}", flush=True) - print(f"๐Ÿ“ User prompt: {user_prompt[:100]}{'...' if len(user_prompt) > 100 else ''}", flush=True) - - # Validate image URL - if not _validate_image_url(image_url): - raise ValueError("Invalid image URL format. 
Must start with http:// or https://") - - # Check API key availability - if not os.getenv("OPENROUTER_API_KEY"): - raise ValueError("OPENROUTER_API_KEY environment variable not set") - - # Download the image to a temporary location - print(f"โฌ‡๏ธ Downloading image from URL...", flush=True) - temp_dir = Path("./temp_vision_images") - temp_image_path = temp_dir / f"temp_image_{uuid.uuid4()}.jpg" - - await _download_image(image_url, temp_image_path) - - # Get image file size for logging - image_size_bytes = temp_image_path.stat().st_size - image_size_kb = image_size_bytes / 1024 - print(f"โœ… Image downloaded successfully ({image_size_kb:.1f} KB)", flush=True) - - # Convert image to base64 data URL - print(f"๐Ÿ”„ Converting image to base64...", flush=True) - image_data_url = _image_to_base64_data_url(temp_image_path) - # Calculate size in KB for better readability - data_size_kb = len(image_data_url) / 1024 - print(f"โœ… Image converted to base64 ({data_size_kb:.1f} KB)", flush=True) - - debug_call_data["image_size_bytes"] = image_size_bytes - - # Use the prompt as provided (model_tools.py now handles full description formatting) - comprehensive_prompt = user_prompt - - # Prepare the message with base64-encoded image - messages = [ - { - "role": "user", - "content": [ - { - "type": "text", - "text": comprehensive_prompt - }, - { - "type": "image_url", - "image_url": { - "url": image_data_url - } - } - ] - } - ] - - print(f"๐Ÿง  Processing image with {model}...", flush=True) - - # Call the vision API with reasoning enabled - response = await openrouter_client.chat.completions.create( - model=model, - messages=messages, - temperature=0.1, # Low temperature for consistent analysis - max_tokens=2000, # Generous limit for detailed analysis - extra_body={ - "reasoning": { - "enabled": True, - "effort": "xhigh" - } - } - ) - - # Extract the analysis - analysis = response.choices[0].message.content.strip() - analysis_length = len(analysis) - - print(f"โœ… Image analysis completed ({analysis_length} characters)", flush=True) - - # Prepare successful response - result = { - "success": True, - "analysis": analysis or "There was a problem with the request and the image could not be analyzed." - } - - debug_call_data["success"] = True - debug_call_data["analysis_length"] = analysis_length - - # Log debug information - _log_debug_call("vision_analyze_tool", debug_call_data) - _save_debug_log() - - return json.dumps(result, indent=2, ensure_ascii=False) - - except Exception as e: - error_msg = f"Error analyzing image: {str(e)}" - print(f"โŒ {error_msg}", flush=True) - - # Prepare error response - result = { - "success": False, - "analysis": "There was a problem with the request and the image could not be analyzed." - } - - debug_call_data["error"] = error_msg - _log_debug_call("vision_analyze_tool", debug_call_data) - _save_debug_log() - - return json.dumps(result, indent=2, ensure_ascii=False) - - finally: - # Clean up temporary image file - if temp_image_path and temp_image_path.exists(): - try: - temp_image_path.unlink() - print(f"๐Ÿงน Cleaned up temporary image file", flush=True) - except Exception as cleanup_error: - print(f"โš ๏ธ Warning: Could not delete temporary file: {cleanup_error}", flush=True) - - -def check_openrouter_api_key() -> bool: - """ - Check if the OpenRouter API key is available in environment variables. 
- - Returns: - bool: True if API key is set, False otherwise - """ - return bool(os.getenv("OPENROUTER_API_KEY")) - - -def check_vision_requirements() -> bool: - """ - Check if all requirements for vision tools are met. - - Returns: - bool: True if requirements are met, False otherwise - """ - return check_openrouter_api_key() - - -def get_debug_session_info() -> Dict[str, Any]: - """ - Get information about the current debug session. - - Returns: - Dict[str, Any]: Dictionary containing debug session information - """ - if not DEBUG_MODE or not DEBUG_DATA: - return { - "enabled": False, - "session_id": None, - "log_path": None, - "total_calls": 0 - } - - return { - "enabled": True, - "session_id": DEBUG_SESSION_ID, - "log_path": str(DEBUG_LOG_PATH / f"vision_tools_debug_{DEBUG_SESSION_ID}.json"), - "total_calls": len(DEBUG_DATA["tool_calls"]) - } - - -# ============================================================================= -# Atropos-Agent Tool wrapper (Hermes compatibility) -# ============================================================================= - -from .base import Tool, ToolResult, ToolSchema # noqa: E402 - - -def _tool_result_from_json(output: str) -> ToolResult: - try: - data = json.loads(output) - except Exception: - return ToolResult(success=True, output=output) - - if isinstance(data, dict) and "success" in data: - ok = bool(data.get("success")) - if not ok: - err = data.get("error") or "Vision analysis failed" - return ToolResult(success=False, output=output, error=str(err)) - return ToolResult(success=True, output=output) - - return ToolResult(success=True, output=output) - - -class VisionAnalyzeTool(Tool): - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="vision_analyze", - description=( - "Analyze images from URLs using AI vision. Provides a comprehensive description and answers a question." 
- ), - parameters={ - "image_url": { - "type": "string", - "description": "The URL of the image to analyze (must be publicly accessible HTTP/HTTPS URL)", - }, - "question": { - "type": "string", - "description": "A specific question about the image content to answer.", - }, - }, - required=["image_url", "question"], - external=True, - ) - - def is_available(self) -> tuple[bool, str | None]: - if not os.getenv("OPENROUTER_API_KEY"): - return False, "OPENROUTER_API_KEY not set" - return True, None - - async def execute(self, image_url: str, question: str) -> ToolResult: - full_prompt = ( - "Fully describe and explain everything about this image, then answer the following question:\n\n" - f"{question}" - ) - output = await vision_analyze_tool(image_url=image_url, user_prompt=full_prompt, model=DEFAULT_VISION_MODEL) - return _tool_result_from_json(output) - - -if __name__ == "__main__": - """ - Simple test/demo when run directly - """ - print("๐Ÿ‘๏ธ Vision Tools Module") - print("=" * 40) - - # Check if API key is available - api_available = check_openrouter_api_key() - - if not api_available: - print("โŒ OPENROUTER_API_KEY environment variable not set") - print("Please set your API key: export OPENROUTER_API_KEY='your-key-here'") - print("Get API key at: https://openrouter.ai/") - exit(1) - else: - print("โœ… OpenRouter API key found") - - print("๐Ÿ› ๏ธ Vision tools ready for use!") - print(f"๐Ÿง  Using model: {DEFAULT_VISION_MODEL}") - - # Show debug mode status - if DEBUG_MODE: - print(f"๐Ÿ› Debug mode ENABLED - Session ID: {DEBUG_SESSION_ID}") - print(f" Debug logs will be saved to: ./logs/vision_tools_debug_{DEBUG_SESSION_ID}.json") - else: - print("๐Ÿ› Debug mode disabled (set VISION_TOOLS_DEBUG=true to enable)") - - print("\nBasic usage:") - print(" from vision_tools import vision_analyze_tool") - print(" import asyncio") - print("") - print(" async def main():") - print(" result = await vision_analyze_tool(") - print(" image_url='https://example.com/image.jpg',") - print(" user_prompt='What do you see in this image?'") - print(" )") - print(" print(result)") - print(" asyncio.run(main())") - - print("\nExample prompts:") - print(" - 'What architectural style is this building?'") - print(" - 'Describe the emotions and mood in this image'") - print(" - 'What text can you read in this image?'") - print(" - 'Identify any safety hazards visible'") - print(" - 'What products or brands are shown?'") - - print("\nDebug mode:") - print(" # Enable debug logging") - print(" export VISION_TOOLS_DEBUG=true") - print(" # Debug logs capture all vision analysis calls and results") - print(" # Logs saved to: ./logs/vision_tools_debug_UUID.json") diff --git a/atropos/tools/web_tools.py b/atropos/tools/web_tools.py deleted file mode 100644 index f0e253afce..0000000000 --- a/atropos/tools/web_tools.py +++ /dev/null @@ -1,1417 +0,0 @@ -#!/usr/bin/env python3 -""" -Standalone Web Tools Module - -This module provides generic web tools that work with multiple backend providers. -Currently uses Firecrawl as the backend, and the interface makes it easy to swap -providers without changing the function signatures. 
- -Available tools: -- web_search_tool: Search the web for information -- web_extract_tool: Extract content from specific web pages -- web_crawl_tool: Crawl websites with specific instructions - -Backend compatibility: -- Firecrawl: https://docs.firecrawl.dev/introduction - -LLM Processing: -- Uses OpenRouter API with Gemini 3 Flash Preview for intelligent content extraction -- Extracts key excerpts and creates markdown summaries to reduce token usage - -Debug Mode: -- Set WEB_TOOLS_DEBUG=true to enable detailed logging -- Creates web_tools_debug_UUID.json in ./logs directory -- Captures all tool calls, results, and compression metrics - -Usage: - from web_tools import web_search_tool, web_extract_tool, web_crawl_tool - - # Search the web - results = web_search_tool("Python machine learning libraries", limit=3) - - # Extract content from URLs - content = web_extract_tool(["https://example.com"], format="markdown") - - # Crawl a website - crawl_data = web_crawl_tool("example.com", "Find contact information") -""" - -#TODO: Search Capabilities over the scraped pages -#TODO: Store the pages in something -#TODO: Tool to see what pages are available/saved to search over - -import json -import os -import re -import asyncio -import uuid -import datetime -from pathlib import Path -from typing import List, Dict, Any, Optional -from openai import AsyncOpenAI - -try: - from firecrawl import Firecrawl # type: ignore -except Exception: # pragma: no cover - optional dependency - Firecrawl = None # type: ignore[assignment] - -_firecrawl_client = None -_summarizer_client: Optional[AsyncOpenAI] = None - - -def _get_firecrawl_client(): - global _firecrawl_client - - if _firecrawl_client is not None: - return _firecrawl_client - - if Firecrawl is None: - raise RuntimeError("firecrawl package not installed") - - api_key = os.getenv("FIRECRAWL_API_KEY") - if not api_key: - raise RuntimeError("FIRECRAWL_API_KEY environment variable not set") - - _firecrawl_client = Firecrawl(api_key=api_key) - return _firecrawl_client - - -def _get_summarizer_client() -> AsyncOpenAI: - global _summarizer_client - - if _summarizer_client is not None: - return _summarizer_client - - _summarizer_client = AsyncOpenAI( - api_key=os.getenv("OPENROUTER_API_KEY"), - base_url="https://openrouter.ai/api/v1", - ) - return _summarizer_client - -# Configuration for LLM processing -DEFAULT_SUMMARIZER_MODEL = "google/gemini-3-flash-preview" -DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION = 5000 - -# Debug mode configuration -DEBUG_MODE = os.getenv("WEB_TOOLS_DEBUG", "false").lower() == "true" -DEBUG_SESSION_ID = str(uuid.uuid4()) -DEBUG_LOG_PATH = Path("./logs") -DEBUG_DATA = { - "session_id": DEBUG_SESSION_ID, - "start_time": datetime.datetime.now().isoformat(), - "debug_enabled": DEBUG_MODE, - "tool_calls": [] -} if DEBUG_MODE else None - -# Create logs directory if debug mode is enabled -if DEBUG_MODE: - DEBUG_LOG_PATH.mkdir(exist_ok=True) - print(f"๐Ÿ› Debug mode enabled - Session ID: {DEBUG_SESSION_ID}") - - -def _log_debug_call(tool_name: str, call_data: Dict[str, Any]) -> None: - """ - Log a debug call entry to the global debug data structure. 
- - Args: - tool_name (str): Name of the tool being called - call_data (Dict[str, Any]): Data about the call including parameters and results - """ - if not DEBUG_MODE or not DEBUG_DATA: - return - - call_entry = { - "timestamp": datetime.datetime.now().isoformat(), - "tool_name": tool_name, - **call_data - } - - DEBUG_DATA["tool_calls"].append(call_entry) - - -def _save_debug_log() -> None: - """ - Save the current debug data to a JSON file in the logs directory. - """ - if not DEBUG_MODE or not DEBUG_DATA: - return - - try: - debug_filename = f"web_tools_debug_{DEBUG_SESSION_ID}.json" - debug_filepath = DEBUG_LOG_PATH / debug_filename - - # Update end time - DEBUG_DATA["end_time"] = datetime.datetime.now().isoformat() - DEBUG_DATA["total_calls"] = len(DEBUG_DATA["tool_calls"]) - - with open(debug_filepath, 'w', encoding='utf-8') as f: - json.dump(DEBUG_DATA, f, indent=2, ensure_ascii=False) - - print(f"๐Ÿ› Debug log saved: {debug_filepath}") - - except Exception as e: - print(f"โŒ Error saving debug log: {str(e)}") - - -async def process_content_with_llm( - content: str, - url: str = "", - title: str = "", - model: str = DEFAULT_SUMMARIZER_MODEL, - min_length: int = DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION -) -> Optional[str]: - """ - Process web content using LLM to create intelligent summaries with key excerpts. - - This function uses Gemini 3 Flash Preview (or specified model) via OpenRouter API - to intelligently extract key information and create markdown summaries, - significantly reducing token usage while preserving all important information. - - For very large content (>500k chars), uses chunked processing with synthesis. - For extremely large content (>2M chars), refuses to process entirely. - - Args: - content (str): The raw content to process - url (str): The source URL (for context, optional) - title (str): The page title (for context, optional) - model (str): The model to use for processing (default: google/gemini-3-flash-preview) - min_length (int): Minimum content length to trigger processing (default: 5000) - - Returns: - Optional[str]: Processed markdown content, or None if content too short or processing fails - """ - # Size thresholds - MAX_CONTENT_SIZE = 2_000_000 # 2M chars - refuse entirely above this - CHUNK_THRESHOLD = 500_000 # 500k chars - use chunked processing above this - CHUNK_SIZE = 100_000 # 100k chars per chunk - MAX_OUTPUT_SIZE = 5000 # Hard cap on final output size - - try: - content_len = len(content) - - # Refuse if content is absurdly large - if content_len > MAX_CONTENT_SIZE: - size_mb = content_len / 1_000_000 - print(f"๐Ÿšซ Content too large ({size_mb:.1f}MB > 2MB limit). Refusing to process.") - return f"[Content too large to process: {size_mb:.1f}MB. Try using web_crawl with specific extraction instructions, or search for a more focused source.]" - - # Skip processing if content is too short - if content_len < min_length: - print(f"๐Ÿ“ Content too short ({content_len} < {min_length} chars), skipping LLM processing") - return None - - # If we don't have an OpenRouter key, skip processing rather than failing. 
- if not os.getenv("OPENROUTER_API_KEY"): - print("๐Ÿ”‘ OPENROUTER_API_KEY not set; skipping LLM processing") - return None - - # Create context information - context_info = [] - if title: - context_info.append(f"Title: {title}") - if url: - context_info.append(f"Source: {url}") - context_str = "\n".join(context_info) + "\n\n" if context_info else "" - - # Check if we need chunked processing - if content_len > CHUNK_THRESHOLD: - print(f"๐Ÿ“ฆ Content large ({content_len:,} chars). Using chunked processing...") - return await _process_large_content_chunked( - content, context_str, model, CHUNK_SIZE, MAX_OUTPUT_SIZE - ) - - # Standard single-pass processing for normal content - print(f"๐Ÿง  Processing content with LLM ({content_len} characters)") - - processed_content = await _call_summarizer_llm(content, context_str, model) - - if processed_content: - # Enforce output cap - if len(processed_content) > MAX_OUTPUT_SIZE: - processed_content = processed_content[:MAX_OUTPUT_SIZE] + "\n\n[... summary truncated for context management ...]" - - # Log compression metrics - processed_length = len(processed_content) - compression_ratio = processed_length / content_len if content_len > 0 else 1.0 - print(f"โœ… Content processed: {content_len} โ†’ {processed_length} chars ({compression_ratio:.1%})") - - return processed_content - - except Exception as e: - print(f"โŒ Error processing content with LLM: {str(e)}") - return f"[Failed to process content: {str(e)[:100]}. Content size: {len(content):,} chars]" - - -async def _call_summarizer_llm( - content: str, - context_str: str, - model: str, - max_tokens: int = 4000, - is_chunk: bool = False, - chunk_info: str = "" -) -> Optional[str]: - """ - Make a single LLM call to summarize content. - - Args: - content: The content to summarize - context_str: Context information (title, URL) - model: Model to use - max_tokens: Maximum output tokens - is_chunk: Whether this is a chunk of a larger document - chunk_info: Information about chunk position (e.g., "Chunk 2/5") - - Returns: - Summarized content or None on failure - """ - if is_chunk: - # Chunk-specific prompt - aware that this is partial content - system_prompt = """You are an expert content analyst processing a SECTION of a larger document. Your job is to extract and summarize the key information from THIS SECTION ONLY. - -Important guidelines for chunk processing: -1. Do NOT write introductions or conclusions - this is a partial document -2. Focus on extracting ALL key facts, figures, data points, and insights from this section -3. Preserve important quotes, code snippets, and specific details verbatim -4. Use bullet points and structured formatting for easy synthesis later -5. Note any references to other sections (e.g., "as mentioned earlier", "see below") without trying to resolve them - -Your output will be combined with summaries of other sections, so focus on thorough extraction rather than narrative flow.""" - - user_prompt = f"""Extract key information from this SECTION of a larger document: - -{context_str}{chunk_info} - -SECTION CONTENT: -{content} - -Extract all important information from this section in a structured format. Focus on facts, data, insights, and key details. Do not add introductions or conclusions.""" - - else: - # Standard full-document prompt - system_prompt = """You are an expert content analyst. Your job is to process web content and create a comprehensive yet concise summary that preserves all important information while dramatically reducing bulk. 
- -Create a well-structured markdown summary that includes: -1. Key excerpts (quotes, code snippets, important facts) in their original format -2. Comprehensive summary of all other important information -3. Proper markdown formatting with headers, bullets, and emphasis - -Your goal is to preserve ALL important information while reducing length. Never lose key facts, figures, insights, or actionable information. Make it scannable and well-organized.""" - - user_prompt = f"""Please process this web content and create a comprehensive markdown summary: - -{context_str}CONTENT TO PROCESS: -{content} - -Create a markdown summary that captures all key information in a well-organized, scannable format. Include important quotes and code snippets in their original formatting. Focus on actionable information, specific details, and unique insights.""" - - # Call the LLM with retry logic - max_retries = 6 - retry_delay = 2 - last_error = None - - for attempt in range(max_retries): - try: - response = await _get_summarizer_client().chat.completions.create( - model=model, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt} - ], - temperature=0.1, - max_tokens=max_tokens, - extra_body={ - "reasoning": { - "enabled": True, - "effort": "xhigh" - } - } - ) - return response.choices[0].message.content.strip() - except Exception as api_error: - last_error = api_error - if attempt < max_retries - 1: - print(f"โš ๏ธ LLM API call failed (attempt {attempt + 1}/{max_retries}): {str(api_error)[:100]}") - print(f" Retrying in {retry_delay}s...") - await asyncio.sleep(retry_delay) - retry_delay = min(retry_delay * 2, 60) - else: - raise last_error - - return None - - -async def _process_large_content_chunked( - content: str, - context_str: str, - model: str, - chunk_size: int, - max_output_size: int -) -> Optional[str]: - """ - Process large content by chunking, summarizing each chunk in parallel, - then synthesizing the summaries. 
- - Args: - content: The large content to process - context_str: Context information - model: Model to use - chunk_size: Size of each chunk in characters - max_output_size: Maximum final output size - - Returns: - Synthesized summary or None on failure - """ - # Split content into chunks - chunks = [] - for i in range(0, len(content), chunk_size): - chunk = content[i:i + chunk_size] - chunks.append(chunk) - - print(f" ๐Ÿ“ฆ Split into {len(chunks)} chunks of ~{chunk_size:,} chars each") - - # Summarize each chunk in parallel - async def summarize_chunk(chunk_idx: int, chunk_content: str) -> tuple[int, Optional[str]]: - """Summarize a single chunk.""" - try: - chunk_info = f"[Processing chunk {chunk_idx + 1} of {len(chunks)}]" - summary = await _call_summarizer_llm( - chunk_content, - context_str, - model, - max_tokens=2000, - is_chunk=True, - chunk_info=chunk_info - ) - if summary: - print(f" โœ… Chunk {chunk_idx + 1}/{len(chunks)} summarized: {len(chunk_content):,} โ†’ {len(summary):,} chars") - return chunk_idx, summary - except Exception as e: - print(f" โš ๏ธ Chunk {chunk_idx + 1}/{len(chunks)} failed: {str(e)[:50]}") - return chunk_idx, None - - # Run all chunk summarizations in parallel - tasks = [summarize_chunk(i, chunk) for i, chunk in enumerate(chunks)] - results = await asyncio.gather(*tasks) - - # Collect successful summaries in order - summaries = [] - for chunk_idx, summary in sorted(results, key=lambda x: x[0]): - if summary: - summaries.append(f"## Section {chunk_idx + 1}\n{summary}") - - if not summaries: - print(f" โŒ All chunk summarizations failed") - return "[Failed to process large content: all chunk summarizations failed]" - - print(f" ๐Ÿ“Š Got {len(summaries)}/{len(chunks)} chunk summaries") - - # If only one chunk succeeded, just return it (with cap) - if len(summaries) == 1: - result = summaries[0] - if len(result) > max_output_size: - result = result[:max_output_size] + "\n\n[... truncated ...]" - return result - - # Synthesize the summaries into a final summary - print(f" ๐Ÿ”— Synthesizing {len(summaries)} summaries...") - - combined_summaries = "\n\n---\n\n".join(summaries) - - synthesis_prompt = f"""You have been given summaries of different sections of a large document. -Synthesize these into ONE cohesive, comprehensive summary that: -1. Removes redundancy between sections -2. Preserves all key facts, figures, and actionable information -3. Is well-organized with clear structure -4. Is under {max_output_size} characters - -{context_str}SECTION SUMMARIES: -{combined_summaries} - -Create a single, unified markdown summary.""" - - try: - response = await _get_summarizer_client().chat.completions.create( - model=model, - messages=[ - {"role": "system", "content": "You synthesize multiple summaries into one cohesive, comprehensive summary. Be thorough but concise."}, - {"role": "user", "content": synthesis_prompt} - ], - temperature=0.1, - max_tokens=4000, - extra_body={ - "reasoning": { - "enabled": True, - "effort": "xhigh" - } - } - ) - final_summary = response.choices[0].message.content.strip() - - # Enforce hard cap - if len(final_summary) > max_output_size: - final_summary = final_summary[:max_output_size] + "\n\n[... 
summary truncated for context management ...]" - - original_len = len(content) - final_len = len(final_summary) - compression = final_len / original_len if original_len > 0 else 1.0 - - print(f" โœ… Synthesis complete: {original_len:,} โ†’ {final_len:,} chars ({compression:.2%})") - return final_summary - - except Exception as e: - print(f" โš ๏ธ Synthesis failed: {str(e)[:100]}") - # Fall back to concatenated summaries with truncation - fallback = "\n\n".join(summaries) - if len(fallback) > max_output_size: - fallback = fallback[:max_output_size] + "\n\n[... truncated due to synthesis failure ...]" - return fallback - - -def clean_base64_images(text: str) -> str: - """ - Remove base64 encoded images from text to reduce token count and clutter. - - This function finds and removes base64 encoded images in various formats: - - (data:image/png;base64,...) - - (data:image/jpeg;base64,...) - - (data:image/svg+xml;base64,...) - - data:image/[type];base64,... (without parentheses) - - Args: - text: The text content to clean - - Returns: - Cleaned text with base64 images replaced with placeholders - """ - # Pattern to match base64 encoded images wrapped in parentheses - # Matches: (data:image/[type];base64,[base64-string]) - base64_with_parens_pattern = r'\(data:image/[^;]+;base64,[A-Za-z0-9+/=]+\)' - - # Pattern to match base64 encoded images without parentheses - # Matches: data:image/[type];base64,[base64-string] - base64_pattern = r'data:image/[^;]+;base64,[A-Za-z0-9+/=]+' - - # Replace parentheses-wrapped images first - cleaned_text = re.sub(base64_with_parens_pattern, '[BASE64_IMAGE_REMOVED]', text) - - # Then replace any remaining non-parentheses images - cleaned_text = re.sub(base64_pattern, '[BASE64_IMAGE_REMOVED]', cleaned_text) - - return cleaned_text - - -def web_search_tool(query: str, limit: int = 5) -> str: - """ - Search the web for information using available search API backend. - - This function provides a generic interface for web search that can work - with multiple backends. Currently uses Firecrawl. - - Note: This function returns search result metadata only (URLs, titles, descriptions). - Use web_extract_tool to get full content from specific URLs. - - Args: - query (str): The search query to look up - limit (int): Maximum number of results to return (default: 5) - - Returns: - str: JSON string containing search results with the following structure: - { - "success": bool, - "data": { - "web": [ - { - "title": str, - "url": str, - "description": str, - "position": int - }, - ... 
- ] - } - } - - Raises: - Exception: If search fails or API key is not set - """ - debug_call_data = { - "parameters": { - "query": query, - "limit": limit - }, - "error": None, - "results_count": 0, - "original_response_size": 0, - "final_response_size": 0 - } - - try: - print(f"๐Ÿ” Searching the web for: '{query}' (limit: {limit})") - - # Use Firecrawl's v2 search functionality WITHOUT scraping - # We only want search result metadata, not scraped content - # Docs: https://docs.firecrawl.dev/features/search - response = _get_firecrawl_client().search( - query=query, - limit=limit - ) - - # The response is a SearchData object with web, news, and images attributes - # When not scraping, the results are directly in these attributes - web_results = [] - - # Check if response has web attribute (SearchData object) - if hasattr(response, 'web'): - # Response is a SearchData object with web attribute - if response.web: - # Convert each SearchResultWeb object to dict - for result in response.web: - if hasattr(result, 'model_dump'): - # Pydantic model - use model_dump - web_results.append(result.model_dump()) - elif hasattr(result, '__dict__'): - # Regular object - use __dict__ - web_results.append(result.__dict__) - elif isinstance(result, dict): - # Already a dict - web_results.append(result) - elif hasattr(response, 'model_dump'): - # Response has model_dump method - use it to get dict - response_dict = response.model_dump() - if 'web' in response_dict and response_dict['web']: - web_results = response_dict['web'] - elif isinstance(response, dict): - # Response is already a dictionary - if 'web' in response and response['web']: - web_results = response['web'] - - results_count = len(web_results) - print(f"โœ… Found {results_count} search results") - - # Build response with just search metadata (URLs, titles, descriptions) - response_data = { - "success": True, - "data": { - "web": web_results - } - } - - # Capture debug information - debug_call_data["results_count"] = results_count - - # Convert to JSON - result_json = json.dumps(response_data, indent=2, ensure_ascii=False) - - debug_call_data["final_response_size"] = len(result_json) - - # Log debug information - _log_debug_call("web_search_tool", debug_call_data) - _save_debug_log() - - return result_json - - except Exception as e: - error_msg = f"Error searching web: {str(e)}" - print(f"โŒ {error_msg}") - - debug_call_data["error"] = error_msg - _log_debug_call("web_search_tool", debug_call_data) - _save_debug_log() - - return json.dumps({"error": error_msg}, ensure_ascii=False) - - -async def web_extract_tool( - urls: List[str], - format: str = None, - use_llm_processing: bool = True, - model: str = DEFAULT_SUMMARIZER_MODEL, - min_length: int = DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION -) -> str: - """ - Extract content from specific web pages using available extraction API backend. - - This function provides a generic interface for web content extraction that - can work with multiple backends. Currently uses Firecrawl. - - Args: - urls (List[str]): List of URLs to extract content from - format (str): Desired output format ("markdown" or "html", optional) - use_llm_processing (bool): Whether to process content with LLM for summarization (default: True) - model (str): The model to use for LLM processing (default: google/gemini-3-flash-preview) - min_length (int): Minimum content length to trigger LLM processing (default: 5000) - - Returns: - str: JSON string containing extracted content. 
If LLM processing is enabled and successful, - the 'content' field will contain the processed markdown summary instead of raw content. - - Raises: - Exception: If extraction fails or API key is not set - """ - debug_call_data = { - "parameters": { - "urls": urls, - "format": format, - "use_llm_processing": use_llm_processing, - "model": model, - "min_length": min_length - }, - "error": None, - "pages_extracted": 0, - "pages_processed_with_llm": 0, - "original_response_size": 0, - "final_response_size": 0, - "compression_metrics": [], - "processing_applied": [] - } - - try: - print(f"๐Ÿ“„ Extracting content from {len(urls)} URL(s)") - - # Determine requested formats for Firecrawl v2 - formats: List[str] = [] - if format == "markdown": - formats = ["markdown"] - elif format == "html": - formats = ["html"] - else: - # Default: request markdown for LLM-readiness and include html as backup - formats = ["markdown", "html"] - - # Always use individual scraping for simplicity and reliability - # Batch scraping adds complexity without much benefit for small numbers of URLs - results: List[Dict[str, Any]] = [] - - for url in urls: - try: - print(f" ๐Ÿ“„ Scraping: {url}") - scrape_result = _get_firecrawl_client().scrape( - url=url, - formats=formats - ) - - # Process the result - properly handle object serialization - metadata = {} - title = "" - content_markdown = None - content_html = None - - # Extract data from the scrape result - if hasattr(scrape_result, 'model_dump'): - # Pydantic model - use model_dump to get dict - result_dict = scrape_result.model_dump() - content_markdown = result_dict.get('markdown') - content_html = result_dict.get('html') - metadata = result_dict.get('metadata', {}) - elif hasattr(scrape_result, '__dict__'): - # Regular object with attributes - content_markdown = getattr(scrape_result, 'markdown', None) - content_html = getattr(scrape_result, 'html', None) - - # Handle metadata - convert to dict if it's an object - metadata_obj = getattr(scrape_result, 'metadata', {}) - if hasattr(metadata_obj, 'model_dump'): - metadata = metadata_obj.model_dump() - elif hasattr(metadata_obj, '__dict__'): - metadata = metadata_obj.__dict__ - elif isinstance(metadata_obj, dict): - metadata = metadata_obj - else: - metadata = {} - elif isinstance(scrape_result, dict): - # Already a dictionary - content_markdown = scrape_result.get('markdown') - content_html = scrape_result.get('html') - metadata = scrape_result.get('metadata', {}) - - # Ensure metadata is a dict (not an object) - if not isinstance(metadata, dict): - if hasattr(metadata, 'model_dump'): - metadata = metadata.model_dump() - elif hasattr(metadata, '__dict__'): - metadata = metadata.__dict__ - else: - metadata = {} - - # Get title from metadata - title = metadata.get("title", "") - - # Choose content based on requested format - chosen_content = content_markdown if (format == "markdown" or (format is None and content_markdown)) else content_html or content_markdown or "" - - results.append({ - "url": metadata.get("sourceURL", url), - "title": title, - "content": chosen_content, - "raw_content": chosen_content, - "metadata": metadata # Now guaranteed to be a dict - }) - - except Exception as scrape_err: - print(f" โŒ Error scraping {url}: {str(scrape_err)}") - results.append({ - "url": url, - "title": "", - "content": "", - "raw_content": "", - "error": str(scrape_err) - }) - - response = {"results": results} - - pages_extracted = len(response.get('results', [])) - print(f"โœ… Extracted content from {pages_extracted} pages") - 
- debug_call_data["pages_extracted"] = pages_extracted - debug_call_data["original_response_size"] = len(json.dumps(response)) - - # Process each result with LLM if enabled - if use_llm_processing and os.getenv("OPENROUTER_API_KEY"): - print("๐Ÿง  Processing extracted content with LLM (parallel)...") - debug_call_data["processing_applied"].append("llm_processing") - - # Prepare tasks for parallel processing - async def process_single_result(result): - """Process a single result with LLM and return updated result with metrics.""" - url = result.get('url', 'Unknown URL') - title = result.get('title', '') - raw_content = result.get('raw_content', '') or result.get('content', '') - - if not raw_content: - return result, None, "no_content" - - original_size = len(raw_content) - - # Process content with LLM - processed = await process_content_with_llm( - raw_content, url, title, model, min_length - ) - - if processed: - processed_size = len(processed) - compression_ratio = processed_size / original_size if original_size > 0 else 1.0 - - # Update result with processed content - result['content'] = processed - result['raw_content'] = raw_content - - metrics = { - "url": url, - "original_size": original_size, - "processed_size": processed_size, - "compression_ratio": compression_ratio, - "model_used": model - } - return result, metrics, "processed" - else: - metrics = { - "url": url, - "original_size": original_size, - "processed_size": original_size, - "compression_ratio": 1.0, - "model_used": None, - "reason": "content_too_short" - } - return result, metrics, "too_short" - - # Run all LLM processing in parallel - results_list = response.get('results', []) - tasks = [process_single_result(result) for result in results_list] - processed_results = await asyncio.gather(*tasks) - - # Collect metrics and print results - for result, metrics, status in processed_results: - url = result.get('url', 'Unknown URL') - if status == "processed": - debug_call_data["compression_metrics"].append(metrics) - debug_call_data["pages_processed_with_llm"] += 1 - print(f" ๐Ÿ“ {url} (processed)") - elif status == "too_short": - debug_call_data["compression_metrics"].append(metrics) - print(f" ๐Ÿ“ {url} (no processing - content too short)") - else: - print(f" โš ๏ธ {url} (no content to process)") - else: - if use_llm_processing and not os.getenv("OPENROUTER_API_KEY"): - print("โš ๏ธ LLM processing requested but OPENROUTER_API_KEY not set, returning raw content") - debug_call_data["processing_applied"].append("llm_processing_unavailable") - - # Print summary of extracted pages for debugging (original behavior) - for result in response.get('results', []): - url = result.get('url', 'Unknown URL') - content_length = len(result.get('raw_content', '')) - print(f" ๐Ÿ“ {url} ({content_length} characters)") - - # Trim output to minimal fields per entry: title, content, error - trimmed_results = [ - { - "title": r.get("title", ""), - "content": r.get("content", ""), - "error": r.get("error"), - } - for r in response.get("results", []) - ] - trimmed_response = {"results": trimmed_results} - - if trimmed_response.get("results") == []: - result_json = json.dumps({"error": "Content was inaccessible or not found"}, ensure_ascii=False) - - cleaned_result = clean_base64_images(result_json) - - else: - result_json = json.dumps(trimmed_response, indent=2, ensure_ascii=False) - - cleaned_result = clean_base64_images(result_json) - - debug_call_data["final_response_size"] = len(cleaned_result) - 
debug_call_data["processing_applied"].append("base64_image_removal") - - # Log debug information - _log_debug_call("web_extract_tool", debug_call_data) - _save_debug_log() - - return cleaned_result - - except Exception as e: - error_msg = f"Error extracting content: {str(e)}" - print(f"โŒ {error_msg}") - - debug_call_data["error"] = error_msg - _log_debug_call("web_extract_tool", debug_call_data) - _save_debug_log() - - return json.dumps({"error": error_msg}, ensure_ascii=False) - - -async def web_crawl_tool( - url: str, - instructions: str = None, - depth: str = "basic", - use_llm_processing: bool = True, - model: str = DEFAULT_SUMMARIZER_MODEL, - min_length: int = DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION -) -> str: - """ - Crawl a website with specific instructions using available crawling API backend. - - This function provides a generic interface for web crawling that can work - with multiple backends. Currently uses Firecrawl. - - Args: - url (str): The base URL to crawl (can include or exclude https://) - instructions (str): Instructions for what to crawl/extract using LLM intelligence (optional) - depth (str): Depth of extraction ("basic" or "advanced", default: "basic") - use_llm_processing (bool): Whether to process content with LLM for summarization (default: True) - model (str): The model to use for LLM processing (default: google/gemini-3-flash-preview) - min_length (int): Minimum content length to trigger LLM processing (default: 5000) - - Returns: - str: JSON string containing crawled content. If LLM processing is enabled and successful, - the 'content' field will contain the processed markdown summary instead of raw content. - Each page is processed individually. - - Raises: - Exception: If crawling fails or API key is not set - """ - debug_call_data = { - "parameters": { - "url": url, - "instructions": instructions, - "depth": depth, - "use_llm_processing": use_llm_processing, - "model": model, - "min_length": min_length - }, - "error": None, - "pages_crawled": 0, - "pages_processed_with_llm": 0, - "original_response_size": 0, - "final_response_size": 0, - "compression_metrics": [], - "processing_applied": [] - } - - try: - # Ensure URL has protocol - if not url.startswith(('http://', 'https://')): - url = f'https://{url}' - print(f" ๐Ÿ“ Added https:// prefix to URL: {url}") - - instructions_text = f" with instructions: '{instructions}'" if instructions else "" - print(f"๐Ÿ•ท๏ธ Crawling {url}{instructions_text}") - - # Use Firecrawl's v2 crawl functionality - # Docs: https://docs.firecrawl.dev/features/crawl - # The crawl() method automatically waits for completion and returns all data - - # Build crawl parameters - keep it simple - crawl_params = { - "limit": 20, # Limit number of pages to crawl - "scrape_options": { - "formats": ["markdown"] # Just markdown for simplicity - } - } - - # Note: The 'prompt' parameter is not documented for crawl - # Instructions are typically used with the Extract endpoint, not Crawl - if instructions: - print(f" โ„น๏ธ Note: Instructions parameter ignored (not supported in crawl API)") - - # Use the crawl method which waits for completion automatically - try: - crawl_result = _get_firecrawl_client().crawl( - url=url, - **crawl_params - ) - except Exception as e: - print(f" โŒ Crawl API call failed: {e}") - raise - - pages: List[Dict[str, Any]] = [] - - # Process crawl results - the crawl method returns a CrawlJob object with data attribute - data_list = [] - - # The crawl_result is a CrawlJob object with a 'data' attribute containing list of 
Document objects - if hasattr(crawl_result, 'data'): - data_list = crawl_result.data if crawl_result.data else [] - print(f" ๐Ÿ“Š Status: {getattr(crawl_result, 'status', 'unknown')}") - print(f" ๐Ÿ“„ Retrieved {len(data_list)} pages") - - # Debug: Check other attributes if no data - if not data_list: - print(f" ๐Ÿ” Debug - CrawlJob attributes: {[attr for attr in dir(crawl_result) if not attr.startswith('_')]}") - print(f" ๐Ÿ” Debug - Status: {getattr(crawl_result, 'status', 'N/A')}") - print(f" ๐Ÿ” Debug - Total: {getattr(crawl_result, 'total', 'N/A')}") - print(f" ๐Ÿ” Debug - Completed: {getattr(crawl_result, 'completed', 'N/A')}") - - elif isinstance(crawl_result, dict) and 'data' in crawl_result: - data_list = crawl_result.get("data", []) - else: - print(" โš ๏ธ Unexpected crawl result type") - print(f" ๐Ÿ” Debug - Result type: {type(crawl_result)}") - if hasattr(crawl_result, '__dict__'): - print(f" ๐Ÿ” Debug - Result attributes: {list(crawl_result.__dict__.keys())}") - - for item in data_list: - # Process each crawled page - properly handle object serialization - page_url = "Unknown URL" - title = "" - content_markdown = None - content_html = None - metadata = {} - - # Extract data from the item - if hasattr(item, 'model_dump'): - # Pydantic model - use model_dump to get dict - item_dict = item.model_dump() - content_markdown = item_dict.get('markdown') - content_html = item_dict.get('html') - metadata = item_dict.get('metadata', {}) - elif hasattr(item, '__dict__'): - # Regular object with attributes - content_markdown = getattr(item, 'markdown', None) - content_html = getattr(item, 'html', None) - - # Handle metadata - convert to dict if it's an object - metadata_obj = getattr(item, 'metadata', {}) - if hasattr(metadata_obj, 'model_dump'): - metadata = metadata_obj.model_dump() - elif hasattr(metadata_obj, '__dict__'): - metadata = metadata_obj.__dict__ - elif isinstance(metadata_obj, dict): - metadata = metadata_obj - else: - metadata = {} - elif isinstance(item, dict): - # Already a dictionary - content_markdown = item.get('markdown') - content_html = item.get('html') - metadata = item.get('metadata', {}) - - # Ensure metadata is a dict (not an object) - if not isinstance(metadata, dict): - if hasattr(metadata, 'model_dump'): - metadata = metadata.model_dump() - elif hasattr(metadata, '__dict__'): - metadata = metadata.__dict__ - else: - metadata = {} - - # Extract URL and title from metadata - page_url = metadata.get("sourceURL", metadata.get("url", "Unknown URL")) - title = metadata.get("title", "") - - # Choose content (prefer markdown) - content = content_markdown or content_html or "" - - pages.append({ - "url": page_url, - "title": title, - "content": content, - "raw_content": content, - "metadata": metadata # Now guaranteed to be a dict - }) - - response = {"results": pages} - - pages_crawled = len(response.get('results', [])) - print(f"โœ… Crawled {pages_crawled} pages") - - debug_call_data["pages_crawled"] = pages_crawled - debug_call_data["original_response_size"] = len(json.dumps(response)) - - # Process each result with LLM if enabled - if use_llm_processing and os.getenv("OPENROUTER_API_KEY"): - print("๐Ÿง  Processing crawled content with LLM (parallel)...") - debug_call_data["processing_applied"].append("llm_processing") - - # Prepare tasks for parallel processing - async def process_single_crawl_result(result): - """Process a single crawl result with LLM and return updated result with metrics.""" - page_url = result.get('url', 'Unknown URL') - title = 
result.get('title', '') - content = result.get('content', '') - - if not content: - return result, None, "no_content" - - original_size = len(content) - - # Process content with LLM - processed = await process_content_with_llm( - content, page_url, title, model, min_length - ) - - if processed: - processed_size = len(processed) - compression_ratio = processed_size / original_size if original_size > 0 else 1.0 - - # Update result with processed content - result['raw_content'] = content - result['content'] = processed - - metrics = { - "url": page_url, - "original_size": original_size, - "processed_size": processed_size, - "compression_ratio": compression_ratio, - "model_used": model - } - return result, metrics, "processed" - else: - metrics = { - "url": page_url, - "original_size": original_size, - "processed_size": original_size, - "compression_ratio": 1.0, - "model_used": None, - "reason": "content_too_short" - } - return result, metrics, "too_short" - - # Run all LLM processing in parallel - results_list = response.get('results', []) - tasks = [process_single_crawl_result(result) for result in results_list] - processed_results = await asyncio.gather(*tasks) - - # Collect metrics and print results - for result, metrics, status in processed_results: - page_url = result.get('url', 'Unknown URL') - if status == "processed": - debug_call_data["compression_metrics"].append(metrics) - debug_call_data["pages_processed_with_llm"] += 1 - print(f" ๐ŸŒ {page_url} (processed)") - elif status == "too_short": - debug_call_data["compression_metrics"].append(metrics) - print(f" ๐ŸŒ {page_url} (no processing - content too short)") - else: - print(f" โš ๏ธ {page_url} (no content to process)") - else: - if use_llm_processing and not os.getenv("OPENROUTER_API_KEY"): - print("โš ๏ธ LLM processing requested but OPENROUTER_API_KEY not set, returning raw content") - debug_call_data["processing_applied"].append("llm_processing_unavailable") - - # Print summary of crawled pages for debugging (original behavior) - for result in response.get('results', []): - page_url = result.get('url', 'Unknown URL') - content_length = len(result.get('content', '')) - print(f" ๐ŸŒ {page_url} ({content_length} characters)") - - # Trim output to minimal fields per entry: title, content, error - trimmed_results = [ - { - "title": r.get("title", ""), - "content": r.get("content", ""), - "error": r.get("error") - } - for r in response.get("results", []) - ] - trimmed_response = {"results": trimmed_results} - - result_json = json.dumps(trimmed_response, indent=2, ensure_ascii=False) - # Clean base64 images from crawled content - cleaned_result = clean_base64_images(result_json) - - debug_call_data["final_response_size"] = len(cleaned_result) - debug_call_data["processing_applied"].append("base64_image_removal") - - # Log debug information - _log_debug_call("web_crawl_tool", debug_call_data) - _save_debug_log() - - return cleaned_result - - except Exception as e: - error_msg = f"Error crawling website: {str(e)}" - print(f"โŒ {error_msg}") - - debug_call_data["error"] = error_msg - _log_debug_call("web_crawl_tool", debug_call_data) - _save_debug_log() - - return json.dumps({"error": error_msg}, ensure_ascii=False) - - -# Convenience function to check if API key is available -def check_firecrawl_api_key() -> bool: - """ - Check if the Firecrawl API key is available in environment variables. 
- - Returns: - bool: True if API key is set, False otherwise - """ - return bool(os.getenv("FIRECRAWL_API_KEY")) and Firecrawl is not None - - -def check_nous_api_key() -> bool: - """ - Check if the Nous Research API key is available in environment variables. - - Returns: - bool: True if API key is set, False otherwise - """ - return bool(os.getenv("OPENROUTER_API_KEY")) - - -def get_debug_session_info() -> Dict[str, Any]: - """ - Get information about the current debug session. - - Returns: - Dict[str, Any]: Dictionary containing debug session information: - - enabled: Whether debug mode is enabled - - session_id: Current session UUID (if enabled) - - log_path: Path where debug logs are saved (if enabled) - - total_calls: Number of tool calls logged so far (if enabled) - """ - if not DEBUG_MODE or not DEBUG_DATA: - return { - "enabled": False, - "session_id": None, - "log_path": None, - "total_calls": 0 - } - - return { - "enabled": True, - "session_id": DEBUG_SESSION_ID, - "log_path": str(DEBUG_LOG_PATH / f"web_tools_debug_{DEBUG_SESSION_ID}.json"), - "total_calls": len(DEBUG_DATA["tool_calls"]) - } - - -# ============================================================================= -# Atropos-Agent Tool wrappers (Hermes compatibility) -# ============================================================================= - -from .base import Tool, ToolResult, ToolSchema # noqa: E402 - - -def _tool_result_from_json(output: str) -> ToolResult: - try: - data = json.loads(output) - except Exception: - return ToolResult(success=True, output=output) - - if isinstance(data, dict): - if data.get("success") is False: - err = data.get("error") or data.get("message") or "Tool failed" - return ToolResult(success=False, output=output, error=str(err)) - if "error" in data and data.get("error"): - return ToolResult(success=False, output=output, error=str(data["error"])) - - return ToolResult(success=True, output=output) - - -class WebSearchTool(Tool): - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="web_search", - description=( - "Search the web for information on any topic. Returns up to 5 relevant results with titles and URLs." - ), - parameters={ - "query": {"type": "string", "description": "The search query to look up on the web"}, - }, - required=["query"], - external=True, - ) - - def is_available(self) -> tuple[bool, str | None]: - if Firecrawl is None: - return False, "firecrawl package not installed" - if not os.getenv("FIRECRAWL_API_KEY"): - return False, "FIRECRAWL_API_KEY not set" - return True, None - - async def execute(self, query: str) -> ToolResult: - output = web_search_tool(query, limit=5) - return _tool_result_from_json(output) - - -class WebExtractTool(Tool): - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="web_extract", - description=( - "Extract and read the full content from specific web page URLs. Returns excerpts and key points." 
- ), - parameters={ - "urls": { - "type": "array", - "items": {"type": "string"}, - "description": "List of URLs to extract content from (max 5 URLs per call)", - "maxItems": 5, - }, - }, - required=["urls"], - external=True, - ) - - def is_available(self) -> tuple[bool, str | None]: - if Firecrawl is None: - return False, "firecrawl package not installed" - if not os.getenv("FIRECRAWL_API_KEY"): - return False, "FIRECRAWL_API_KEY not set" - return True, None - - async def execute(self, urls: List[str]) -> ToolResult: - safe_urls = urls[:5] if isinstance(urls, list) else [] - output = await web_extract_tool(safe_urls, format="markdown", use_llm_processing=True) - return _tool_result_from_json(output) - - -class WebCrawlTool(Tool): - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="web_crawl", - description="Crawl a website and extract relevant content across pages.", - parameters={ - "url": {"type": "string", "description": "The base URL to crawl (can include or exclude https://)"}, - "instructions": { - "type": "string", - "description": "Specific instructions for what to crawl/extract (optional).", - }, - }, - required=["url"], - external=True, - ) - - def is_available(self) -> tuple[bool, str | None]: - if Firecrawl is None: - return False, "firecrawl package not installed" - if not os.getenv("FIRECRAWL_API_KEY"): - return False, "FIRECRAWL_API_KEY not set" - return True, None - - async def execute(self, url: str, instructions: Optional[str] = None) -> ToolResult: - output = await web_crawl_tool(url, instructions=instructions, depth="basic", use_llm_processing=True) - return _tool_result_from_json(output) - - -if __name__ == "__main__": - """ - Simple test/demo when run directly - """ - print("๐ŸŒ Standalone Web Tools Module") - print("=" * 40) - - # Check if API keys are available - firecrawl_available = check_firecrawl_api_key() - nous_available = check_nous_api_key() - - if not firecrawl_available: - print("โŒ FIRECRAWL_API_KEY environment variable not set") - print("Please set your API key: export FIRECRAWL_API_KEY='your-key-here'") - print("Get API key at: https://firecrawl.dev/") - else: - print("โœ… Firecrawl API key found") - - if not nous_available: - print("โŒ OPENROUTER_API_KEY environment variable not set") - print("Please set your API key: export OPENROUTER_API_KEY='your-key-here'") - print("Get API key at: https://inference-api.nousresearch.com/") - print("โš ๏ธ Without Nous API key, LLM content processing will be disabled") - else: - print("โœ… Nous Research API key found") - - if not firecrawl_available: - exit(1) - - print("๐Ÿ› ๏ธ Web tools ready for use!") - - if nous_available: - print("๐Ÿง  LLM content processing available with Gemini 3 Flash Preview via OpenRouter") - print(f" Default min length for processing: {DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION} chars") - - # Show debug mode status - if DEBUG_MODE: - print(f"๐Ÿ› Debug mode ENABLED - Session ID: {DEBUG_SESSION_ID}") - print(f" Debug logs will be saved to: ./logs/web_tools_debug_{DEBUG_SESSION_ID}.json") - else: - print("๐Ÿ› Debug mode disabled (set WEB_TOOLS_DEBUG=true to enable)") - - print("\nBasic usage:") - print(" from web_tools import web_search_tool, web_extract_tool, web_crawl_tool") - print(" import asyncio") - print("") - print(" # Search (synchronous)") - print(" results = web_search_tool('Python tutorials')") - print("") - print(" # Extract and crawl (asynchronous)") - print(" async def main():") - print(" content = await web_extract_tool(['https://example.com'])") - 
print(" crawl_data = await web_crawl_tool('example.com', 'Find docs')") - print(" asyncio.run(main())") - - if nous_available: - print("\nLLM-enhanced usage:") - print(" # Content automatically processed for pages >5000 chars (default)") - print(" content = await web_extract_tool(['https://python.org/about/'])") - print("") - print(" # Customize processing parameters") - print(" crawl_data = await web_crawl_tool(") - print(" 'docs.python.org',") - print(" 'Find key concepts',") - print(" model='google/gemini-3-flash-preview',") - print(" min_length=3000") - print(" )") - print("") - print(" # Disable LLM processing") - print(" raw_content = await web_extract_tool(['https://example.com'], use_llm_processing=False)") - - print("\nDebug mode:") - print(" # Enable debug logging") - print(" export WEB_TOOLS_DEBUG=true") - print(" # Debug logs capture:") - print(" # - All tool calls with parameters") - print(" # - Original API responses") - print(" # - LLM compression metrics") - print(" # - Final processed results") - print(" # Logs saved to: ./logs/web_tools_debug_UUID.json") - - print(f"\n๐Ÿ“ Run 'python test_web_tools_llm.py' to test LLM processing capabilities")