Compare commits

..

1 Commits

Author SHA1 Message Date
teknium1
02028a6a9e feat: add Anthropic Context Editing API support
Integrate Anthropic's server-side context management (beta) for Claude models.
When enabled, the API automatically clears old tool use/result pairs and
thinking blocks AFTER prompt cache lookup but BEFORE token counting — this
preserves prompt cache prefixes while freeing context space, something
impossible with client-side stripping.

Implementation:
- anthropic_adapter: add context-management-2025-06-27 to beta headers;
  build context_management edits in build_anthropic_kwargs() via extra_body;
  only include clear_thinking edit when reasoning is enabled (API requires it)
- run_agent: pipe context_editing config through AIAgent to the adapter
- cli/gateway: load context_editing config from config.yaml and pass to agent
- config: add context_editing section to DEFAULT_CONFIG with conservative
  defaults (disabled, auto-scale triggers to 60%/10% of context window,
  keep 5 tool uses and 2 thinking turns, exclude memory/skill_manage/todo)

Config (opt-in, add to config.yaml):
  context_editing:
    enabled: true
    trigger_tokens: null       # auto: 60% of context window
    keep_tool_uses: 5
    keep_thinking_turns: 2
    exclude_tools: [memory, skill_manage, todo]
    clear_tool_inputs: false
    clear_at_least_tokens: null  # auto: 10% of context window

Live tested with Anthropic API:
- Single turn with context_management: accepted, response normal
- Multi-turn with tool calls + thinking + context_management: works
- clear_thinking correctly omitted when thinking is disabled
- Config plumbing verified through AIAgent._build_api_kwargs()

Refs: #526, supersedes #528
2026-03-13 03:33:14 -07:00
14 changed files with 291 additions and 458 deletions

View File

@@ -43,6 +43,7 @@ def _supports_adaptive_thinking(model: str) -> bool:
_COMMON_BETAS = [
"interleaved-thinking-2025-05-14",
"fine-grained-tool-streaming-2025-05-14",
"context-management-2025-06-27",
]
# Additional beta headers required for OAuth/subscription auth
@@ -513,6 +514,7 @@ def build_anthropic_kwargs(
max_tokens: Optional[int],
reasoning_config: Optional[Dict[str, Any]],
tool_choice: Optional[str] = None,
context_editing: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Build kwargs for anthropic.messages.create()."""
system, anthropic_messages = convert_messages_to_anthropic(messages)
@@ -562,6 +564,60 @@ def build_anthropic_kwargs(
kwargs["temperature"] = 1
kwargs["max_tokens"] = max(effective_max_tokens, budget + 4096)
# Anthropic Context Editing API — server-side context management.
# Clears old tool use/result pairs and thinking blocks server-side,
# AFTER prompt cache lookup but BEFORE token counting and inference.
# This preserves prompt cache prefixes while freeing context space.
# Passed via extra_body since context_management is a beta parameter.
if context_editing and isinstance(context_editing, dict) and context_editing.get("enabled"):
from agent.model_metadata import get_model_context_length
try:
context_length = get_model_context_length(model)
except Exception:
context_length = 200_000 # Conservative default for Claude
trigger_tokens = context_editing.get("trigger_tokens") or int(context_length * 0.60)
keep_tool_uses = context_editing.get("keep_tool_uses", 5)
keep_thinking_turns = context_editing.get("keep_thinking_turns", 2)
clear_at_least = context_editing.get("clear_at_least_tokens") or int(context_length * 0.10)
exclude_tools = context_editing.get("exclude_tools") or ["memory", "skill_manage", "todo"]
clear_tool_inputs = context_editing.get("clear_tool_inputs", False)
edits = []
# clear_thinking requires thinking to be enabled — only add when
# reasoning is active (i.e. kwargs already has a "thinking" key).
if "thinking" in kwargs:
edits.append({
"type": "clear_thinking_20251015",
"keep": {
"type": "thinking_turns",
"value": keep_thinking_turns,
},
})
edits.append({
"type": "clear_tool_uses_20250919",
"trigger": {
"type": "input_tokens",
"value": trigger_tokens,
},
"keep": {
"type": "tool_uses",
"value": keep_tool_uses,
},
"clear_at_least": {
"type": "input_tokens",
"value": clear_at_least,
},
"exclude_tools": exclude_tools,
"clear_tool_inputs": clear_tool_inputs,
})
kwargs["extra_body"] = {
"context_management": {"edits": edits}
}
return kwargs

View File

@@ -25,6 +25,29 @@ model:
# api_key: "your-key-here" # Uncomment to set here instead of .env
base_url: "https://openrouter.ai/api/v1"
# =============================================================================
# Anthropic Context Editing (Claude-only, optional)
# =============================================================================
# Server-side context management for Claude models. Automatically clears old
# tool call/result pairs and thinking blocks at the API level, AFTER prompt
# cache lookup but BEFORE token counting. This preserves prompt cache prefixes
# while freeing context space — something impossible with client-side stripping.
#
# Only works with direct Anthropic API (provider: anthropic). Disabled by default.
# Anthropic reports ~29% performance improvement with context editing enabled.
#
# context_editing:
# enabled: true # Enable server-side context editing
# trigger_tokens: null # Input token threshold to start clearing (null = auto: 60% of context window)
# keep_tool_uses: 5 # How many recent tool_use/result pairs to preserve
# keep_thinking_turns: 2 # How many recent thinking turns to preserve
# exclude_tools: # Tool calls that are NEVER cleared
# - memory
# - skill_manage
# - todo
# clear_tool_inputs: false # Also clear tool input params (default: false)
# clear_at_least_tokens: null # Minimum tokens to clear per activation (null = auto: 10% of context window)
# =============================================================================
# OpenRouter Provider Routing (only applies when using OpenRouter)
# =============================================================================

13
cli.py
View File

@@ -178,6 +178,15 @@ def load_cli_config() -> Dict[str, Any]:
"threshold": 0.50, # Compress at 50% of model's context limit
"summary_model": "google/gemini-3-flash-preview", # Fast/cheap model for summaries
},
"context_editing": {
"enabled": False,
"trigger_tokens": None, # None = auto (60% of context window)
"keep_tool_uses": 5,
"keep_thinking_turns": 2,
"exclude_tools": ["memory", "skill_manage", "todo"],
"clear_tool_inputs": False,
"clear_at_least_tokens": None, # None = auto (10% of context window)
},
"agent": {
"max_turns": 90, # Default max tool-calling iterations (shared with subagents)
"verbose": False,
@@ -1217,6 +1226,9 @@ class HermesCLI:
CLI_CONFIG["agent"].get("reasoning_effort", "")
)
# Context editing config (Anthropic server-side context management)
self.context_editing = CLI_CONFIG.get("context_editing") or {}
# OpenRouter provider routing preferences
pr = CLI_CONFIG.get("provider_routing", {}) or {}
self._provider_sort = pr.get("sort")
@@ -1503,6 +1515,7 @@ class HermesCLI:
ephemeral_system_prompt=self.system_prompt if self.system_prompt else None,
prefill_messages=self.prefill_messages or None,
reasoning_config=self.reasoning_config,
context_editing=self.context_editing,
providers_allowed=self._providers_only,
providers_ignored=self._providers_ignore,
providers_order=self._providers_order,

View File

@@ -29,7 +29,7 @@ from gateway.session import SessionSource, build_session_key
GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE = (
"Secure secret entry is not supported over messaging. "
"Load this skill in the local CLI to be prompted, or add the key to ~/.hermes/.env manually."
"Run `hermes setup` or update ~/.hermes/.env locally."
)

View File

@@ -228,6 +228,7 @@ class GatewayRunner:
self._prefill_messages = self._load_prefill_messages()
self._ephemeral_system_prompt = self._load_ephemeral_system_prompt()
self._reasoning_config = self._load_reasoning_config()
self._context_editing = self._load_context_editing()
self._show_reasoning = self._load_show_reasoning()
self._provider_routing = self._load_provider_routing()
self._fallback_model = self._load_fallback_model()
@@ -489,6 +490,23 @@ class GatewayRunner:
logger.warning("Unknown reasoning_effort '%s', using default (medium)", effort)
return None
@staticmethod
def _load_context_editing() -> dict:
    """Read the ``context_editing`` section from ~/.hermes/config.yaml.

    Best-effort loader: a missing file, an unparseable YAML document, or
    an unavailable yaml module all yield an empty dict, so the gateway
    simply starts with server-side context editing disabled instead of
    crashing at startup.
    """
    try:
        import yaml
        config_file = _hermes_home / "config.yaml"
        if not config_file.exists():
            return {}
        with open(config_file, encoding="utf-8") as fh:
            parsed = yaml.safe_load(fh) or {}
        return parsed.get("context_editing") or {}
    except Exception:
        # Deliberate best-effort: any failure disables the feature.
        return {}
@staticmethod
def _load_show_reasoning() -> bool:
"""Load show_reasoning toggle from config.yaml display section."""
@@ -1446,11 +1464,6 @@ class GatewayRunner:
response = agent_result.get("final_response", "")
agent_messages = agent_result.get("messages", [])
# If the agent's session_id changed during compression, update
# session_entry so transcript writes below go to the right session.
if agent_result.get("session_id") and agent_result["session_id"] != session_entry.session_id:
session_entry.session_id = agent_result["session_id"]
# Prepend reasoning/thinking if display is enabled
if getattr(self, "_show_reasoning", False) and response:
last_reasoning = agent_result.get("last_reasoning")
@@ -2200,6 +2213,7 @@ class GatewayRunner:
verbose_logging=False,
enabled_toolsets=enabled_toolsets,
reasoning_config=self._reasoning_config,
context_editing=getattr(self, "_context_editing", {}),
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
@@ -3365,6 +3379,7 @@ class GatewayRunner:
ephemeral_system_prompt=combined_ephemeral or None,
prefill_messages=self._prefill_messages or None,
reasoning_config=self._reasoning_config,
context_editing=getattr(self, "_context_editing", {}),
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
@@ -3500,23 +3515,6 @@ class GatewayRunner:
unique_tags.insert(0, "[[audio_as_voice]]")
final_response = final_response + "\n" + "\n".join(unique_tags)
# Sync session_id: the agent may have created a new session during
# mid-run context compression (_compress_context splits sessions).
# If so, update the session store entry so the NEXT message loads
# the compressed transcript, not the stale pre-compression one.
agent = agent_holder[0]
if agent and session_key and hasattr(agent, 'session_id') and agent.session_id != session_id:
logger.info(
"Session split detected: %s%s (compression)",
session_id, agent.session_id,
)
entry = self.session_store._entries.get(session_key)
if entry:
entry.session_id = agent.session_id
self.session_store._save()
effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
return {
"final_response": final_response,
"last_reasoning": result.get("last_reasoning"),
@@ -3525,7 +3523,6 @@ class GatewayRunner:
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
"last_prompt_tokens": _last_prompt_toks,
"session_id": effective_session_id,
}
# Start progress message sender if enabled

View File

@@ -129,6 +129,16 @@ DEFAULT_CONFIG = {
"summary_provider": "auto",
},
"context_editing": {
"enabled": False,
"trigger_tokens": None,
"keep_tool_uses": 5,
"keep_thinking_turns": 2,
"exclude_tools": ["memory", "skill_manage", "todo"],
"clear_tool_inputs": False,
"clear_at_least_tokens": None,
},
# Auxiliary model config — provider:model for each side task.
# Format: provider is the provider name, model is the model slug.
# "auto" for provider = auto-detect best available provider.

View File

@@ -21,7 +21,6 @@ Usage:
"""
import atexit
import concurrent.futures
import copy
import hashlib
import json
@@ -194,14 +193,6 @@ class IterationBudget:
return max(0, self.max_total - self._used)
# Tools that must never run concurrently (interactive / user-facing).
# When any of these appear in a batch, we fall back to sequential execution.
_NEVER_PARALLEL_TOOLS = frozenset({"clarify"})
# Maximum number of concurrent worker threads for parallel tool execution.
_MAX_TOOL_WORKERS = 8
class AIAgent:
"""
AI Agent with tool calling capabilities.
@@ -241,6 +232,7 @@ class AIAgent:
step_callback: callable = None,
max_tokens: int = None,
reasoning_config: Dict[str, Any] = None,
context_editing: Dict[str, Any] = None,
prefill_messages: List[Dict[str, Any]] = None,
platform: str = None,
skip_context_files: bool = False,
@@ -361,6 +353,7 @@ class AIAgent:
# Model response configuration
self.max_tokens = max_tokens # None = use model default
self.reasoning_config = reasoning_config # None = use default (medium for OpenRouter)
self.context_editing = context_editing # Anthropic server-side context management
self.prefill_messages = prefill_messages or [] # Prefilled conversation turns
# Anthropic prompt caching: auto-enabled for Claude models via OpenRouter.
@@ -2669,6 +2662,7 @@ class AIAgent:
tools=self.tools,
max_tokens=self.max_tokens,
reasoning_config=self.reasoning_config,
context_editing=self.context_editing,
)
if self.api_mode == "codex_responses":
@@ -3131,260 +3125,7 @@ class AIAgent:
return compressed, new_system_prompt
def _execute_tool_calls(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None:
    """Execute the assistant's tool calls and append results to *messages*.

    Routing policy: a batch of two or more non-interactive tool calls is
    run concurrently; a single call, or any batch containing an
    interactive tool (e.g. clarify), runs sequentially.
    """
    calls = assistant_message.tool_calls
    has_interactive = any(c.function.name in _NEVER_PARALLEL_TOOLS for c in calls)
    if len(calls) > 1 and not has_interactive:
        # Multiple independent tools → thread-pool execution
        return self._execute_tool_calls_concurrent(
            assistant_message, messages, effective_task_id, api_call_count
        )
    # Single call or interactive tool present → original sequential path
    return self._execute_tool_calls_sequential(
        assistant_message, messages, effective_task_id, api_call_count
    )
def _invoke_tool(self, function_name: str, function_args: dict, effective_task_id: str) -> str:
    """Invoke a single tool and return the result string. No display logic.

    Handles both agent-level tools (todo, memory, etc.) and registry-dispatched
    tools. Used by the concurrent execution path; the sequential path retains
    its own inline invocation for backward-compatible display handling.

    Args:
        function_name: Tool name as reported in the assistant's tool call.
        function_args: Parsed JSON arguments for the call (already a dict).
        effective_task_id: Task id forwarded to registry-dispatched tools.

    Returns:
        The tool's result as a string (often JSON-encoded).
    """
    if function_name == "todo":
        from tools.todo_tool import todo_tool as _todo_tool
        return _todo_tool(
            todos=function_args.get("todos"),
            merge=function_args.get("merge", False),
            store=self._todo_store,
        )
    elif function_name == "session_search":
        # Session search needs the DB; fail gracefully when it's unavailable.
        if not self._session_db:
            return json.dumps({"success": False, "error": "Session database not available."})
        from tools.session_search_tool import session_search as _session_search
        return _session_search(
            query=function_args.get("query", ""),
            role_filter=function_args.get("role_filter"),
            limit=function_args.get("limit", 3),
            db=self._session_db,
            current_session_id=self.session_id,
        )
    elif function_name == "memory":
        target = function_args.get("target", "memory")
        from tools.memory_tool import memory_tool as _memory_tool
        result = _memory_tool(
            action=function_args.get("action"),
            target=target,
            content=function_args.get("content"),
            old_text=function_args.get("old_text"),
            store=self._memory_store,
        )
        # Also send user observations to Honcho when active
        if self._honcho and target == "user" and function_args.get("action") == "add":
            self._honcho_save_user_observation(function_args.get("content", ""))
        return result
    elif function_name == "clarify":
        # Interactive tool — only reaches here via the sequential path's rules.
        from tools.clarify_tool import clarify_tool as _clarify_tool
        return _clarify_tool(
            question=function_args.get("question", ""),
            choices=function_args.get("choices"),
            callback=self.clarify_callback,
        )
    elif function_name == "delegate_task":
        from tools.delegate_tool import delegate_task as _delegate_task
        return _delegate_task(
            goal=function_args.get("goal"),
            context=function_args.get("context"),
            toolsets=function_args.get("toolsets"),
            tasks=function_args.get("tasks"),
            max_iterations=function_args.get("max_iterations"),
            parent_agent=self,
        )
    else:
        # Everything else goes through the shared tool registry dispatcher.
        return handle_function_call(
            function_name, function_args, effective_task_id,
            enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None,
        )
def _execute_tool_calls_concurrent(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None:
    """Execute multiple tool calls concurrently using a thread pool.

    Results are collected in the original tool-call order and appended to
    messages so the API sees them in the expected sequence.

    Args:
        assistant_message: Assistant message whose ``tool_calls`` to run.
        messages: Conversation list receiving one "tool" message per call.
        effective_task_id: Task id forwarded to tool handlers.
        api_call_count: Iterations used so far (drives budget warnings).
    """
    tool_calls = assistant_message.tool_calls
    num_tools = len(tool_calls)
    # ── Pre-flight: interrupt check ──────────────────────────────────
    if self._interrupt_requested:
        print(f"{self.log_prefix}⚡ Interrupt: skipping {num_tools} tool call(s)")
        for tc in tool_calls:
            messages.append({
                "role": "tool",
                "content": f"[Tool execution cancelled — {tc.function.name} was skipped due to user interrupt]",
                "tool_call_id": tc.id,
            })
        return
    # ── Parse args + pre-execution bookkeeping ───────────────────────
    parsed_calls = []  # list of (tool_call, function_name, function_args)
    for tool_call in tool_calls:
        function_name = tool_call.function.name
        # Reset nudge counters
        if function_name == "memory":
            self._turns_since_memory = 0
        elif function_name == "skill_manage":
            self._iters_since_skill = 0
        try:
            function_args = json.loads(tool_call.function.arguments)
        except json.JSONDecodeError:
            function_args = {}
        if not isinstance(function_args, dict):
            function_args = {}
        # Checkpoint for file-mutating tools
        if function_name in ("write_file", "patch") and self._checkpoint_mgr.enabled:
            try:
                file_path = function_args.get("path", "")
                if file_path:
                    work_dir = self._checkpoint_mgr.get_working_dir_for_path(file_path)
                    self._checkpoint_mgr.ensure_checkpoint(work_dir, f"before {function_name}")
            except Exception:
                pass
        parsed_calls.append((tool_call, function_name, function_args))
    # ── Logging / callbacks ──────────────────────────────────────────
    tool_names_str = ", ".join(name for _, name, _ in parsed_calls)
    if not self.quiet_mode:
        print(f" ⚡ Concurrent: {num_tools} tool calls — {tool_names_str}")
        for i, (tc, name, args) in enumerate(parsed_calls, 1):
            args_str = json.dumps(args, ensure_ascii=False)
            args_preview = args_str[:self.log_prefix_chars] + "..." if len(args_str) > self.log_prefix_chars else args_str
            print(f" 📞 Tool {i}: {name}({list(args.keys())}) - {args_preview}")
    for _, name, args in parsed_calls:
        if self.tool_progress_callback:
            try:
                preview = _build_tool_preview(name, args)
                self.tool_progress_callback(name, preview, args)
            except Exception as cb_err:
                logging.debug(f"Tool progress callback error: {cb_err}")
    # ── Concurrent execution ─────────────────────────────────────────
    # Each slot holds (function_name, function_args, function_result, duration, error_flag)
    results = [None] * num_tools

    def _run_tool(index, tool_call, function_name, function_args):
        """Worker function executed in a thread."""
        start = time.time()
        try:
            result = self._invoke_tool(function_name, function_args, effective_task_id)
        except Exception as tool_error:
            result = f"Error executing tool '{function_name}': {tool_error}"
            logger.error("_invoke_tool raised for %s: %s", function_name, tool_error, exc_info=True)
        duration = time.time() - start
        is_error, _ = _detect_tool_failure(function_name, result)
        results[index] = (function_name, function_args, result, duration, is_error)

    # Start spinner for CLI mode
    spinner = None
    if self.quiet_mode:
        face = random.choice(KawaiiSpinner.KAWAII_WAITING)
        spinner = KawaiiSpinner(f"{face} ⚡ running {num_tools} tools concurrently", spinner_type='dots')
        spinner.start()
    try:
        max_workers = min(num_tools, _MAX_TOOL_WORKERS)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for i, (tc, name, args) in enumerate(parsed_calls):
                f = executor.submit(_run_tool, i, tc, name, args)
                futures.append(f)
            # Wait for all to complete (exceptions are captured inside _run_tool)
            concurrent.futures.wait(futures)
    finally:
        if spinner:
            # Build a summary message for the spinner stop
            completed = sum(1 for r in results if r is not None)
            total_dur = sum(r[3] for r in results if r is not None)
            spinner.stop(f"{completed}/{num_tools} tools completed in {total_dur:.1f}s total")
    # ── Post-execution: display per-tool results ─────────────────────
    for i, (tc, name, args) in enumerate(parsed_calls):
        r = results[i]
        if r is None:
            # Shouldn't happen, but safety fallback.
            # FIX: this branch previously left function_name/function_args/
            # is_error unbound, so the logging/display code below raised
            # NameError on the very path meant as a safety net.
            function_name, function_args = name, args
            function_result = f"Error executing tool '{name}': thread did not return a result"
            tool_duration = 0.0
            is_error = True
        else:
            function_name, function_args, function_result, tool_duration, is_error = r
        if is_error:
            result_preview = function_result[:200] if len(function_result) > 200 else function_result
            logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
        if self.verbose_logging:
            result_preview = function_result[:200] if len(function_result) > 200 else function_result
            logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s")
            logging.debug(f"Tool result preview: {result_preview}...")
        # Print cute message per tool
        if self.quiet_mode:
            cute_msg = _get_cute_tool_message_impl(name, args, tool_duration, result=function_result)
            print(f" {cute_msg}")
        else:
            # (was `elif not self.quiet_mode` — tautological; plain else)
            response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
            print(f" ✅ Tool {i+1} completed in {tool_duration:.2f}s - {response_preview}")
        # Truncate oversized results
        MAX_TOOL_RESULT_CHARS = 100_000
        if len(function_result) > MAX_TOOL_RESULT_CHARS:
            original_len = len(function_result)
            function_result = (
                function_result[:MAX_TOOL_RESULT_CHARS]
                + f"\n\n[Truncated: tool response was {original_len:,} chars, "
                f"exceeding the {MAX_TOOL_RESULT_CHARS:,} char limit]"
            )
        # Append tool result message in order
        tool_msg = {
            "role": "tool",
            "content": function_result,
            "tool_call_id": tc.id,
        }
        messages.append(tool_msg)
    # ── Budget pressure injection ────────────────────────────────────
    budget_warning = self._get_budget_warning(api_call_count)
    if budget_warning and messages and messages[-1].get("role") == "tool":
        last_content = messages[-1]["content"]
        try:
            parsed = json.loads(last_content)
            if isinstance(parsed, dict):
                parsed["_budget_warning"] = budget_warning
                messages[-1]["content"] = json.dumps(parsed, ensure_ascii=False)
            else:
                messages[-1]["content"] = last_content + f"\n\n{budget_warning}"
        except (json.JSONDecodeError, TypeError):
            messages[-1]["content"] = last_content + f"\n\n{budget_warning}"
        if not self.quiet_mode:
            remaining = self.max_iterations - api_call_count
            tier = "⚠️ WARNING" if remaining <= self.max_iterations * 0.1 else "💡 CAUTION"
            print(f"{self.log_prefix}{tier}: {remaining} iterations remaining")
def _execute_tool_calls_sequential(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None:
"""Execute tool calls sequentially (original behavior). Used for single calls or interactive tools."""
"""Execute tool calls from the assistant message and append results to messages."""
for i, tool_call in enumerate(assistant_message.tool_calls, 1):
# SAFETY: check interrupt BEFORE starting each tool.
# If the user sent "stop" during a previous tool's execution,

View File

@@ -192,7 +192,7 @@ Generate some audio.
msg = build_skill_invocation_message("/test-skill", "do stuff")
assert msg is not None
assert "local cli" in msg.lower()
assert "hermes setup" in msg.lower()
def test_preserves_remaining_remote_setup_warning(self, tmp_path, monkeypatch):
monkeypatch.setenv("TERMINAL_ENV", "ssh")

View File

@@ -14,7 +14,7 @@ from gateway.platforms.base import (
class TestSecretCaptureGuidance:
def test_gateway_secret_capture_message_points_to_local_setup(self):
message = GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE
assert "local cli" in message.lower()
assert "hermes setup" in message.lower()
assert "~/.hermes/.env" in message

View File

@@ -0,0 +1,158 @@
"""Tests for Anthropic Context Editing API integration."""
import pytest
from agent.anthropic_adapter import build_anthropic_kwargs
class TestContextEditing:
    """Tests for context_management parameter injection via extra_body."""

    def _simple_messages(self):
        # Minimal system+user transcript accepted by build_anthropic_kwargs.
        return [
            {"role": "system", "content": "You are helpful."},
            {"role": "user", "content": "Hello"},
        ]

    def test_disabled_by_default(self):
        """No extra_body/context_management when context_editing is None."""
        kwargs = build_anthropic_kwargs(
            model="claude-sonnet-4-20250514",
            messages=self._simple_messages(),
            tools=None,
            max_tokens=4096,
            reasoning_config=None,
        )
        assert "extra_body" not in kwargs
        assert "context_management" not in kwargs

    def test_disabled_when_false(self):
        """No context_management when enabled is False."""
        kwargs = build_anthropic_kwargs(
            model="claude-sonnet-4-20250514",
            messages=self._simple_messages(),
            tools=None,
            max_tokens=4096,
            reasoning_config=None,
            context_editing={"enabled": False},
        )
        assert "extra_body" not in kwargs

    def test_enabled_adds_context_management(self):
        """context_management is added via extra_body when enabled."""
        kwargs = build_anthropic_kwargs(
            model="claude-sonnet-4-20250514",
            messages=self._simple_messages(),
            tools=None,
            max_tokens=4096,
            reasoning_config=None,
            context_editing={"enabled": True},
        )
        assert "extra_body" in kwargs
        cm = kwargs["extra_body"]["context_management"]
        assert "edits" in cm
        # Without thinking enabled, only tool_uses edit is included
        assert len(cm["edits"]) == 1
        assert cm["edits"][0]["type"] == "clear_tool_uses_20250919"

    def test_thinking_edit_only_when_thinking_enabled(self):
        """clear_thinking is only added when reasoning/thinking is enabled."""
        # Without thinking
        kwargs_no_think = build_anthropic_kwargs(
            model="claude-sonnet-4-20250514",
            messages=self._simple_messages(),
            tools=None,
            max_tokens=4096,
            reasoning_config=None,
            context_editing={"enabled": True},
        )
        edits = kwargs_no_think["extra_body"]["context_management"]["edits"]
        assert all(e["type"] != "clear_thinking_20251015" for e in edits)
        # With thinking enabled
        kwargs_with_think = build_anthropic_kwargs(
            model="claude-sonnet-4-20250514",
            messages=self._simple_messages(),
            tools=None,
            max_tokens=16384,
            reasoning_config={"enabled": True, "effort": "medium"},
            context_editing={"enabled": True},
        )
        edits = kwargs_with_think["extra_body"]["context_management"]["edits"]
        assert len(edits) == 2
        # Edit order matters: thinking edit first, tool_uses edit second.
        assert edits[0]["type"] == "clear_thinking_20251015"
        assert edits[1]["type"] == "clear_tool_uses_20250919"

    def test_custom_values(self):
        """Custom config values are passed through."""
        kwargs = build_anthropic_kwargs(
            model="claude-sonnet-4-20250514",
            messages=self._simple_messages(),
            tools=None,
            max_tokens=16384,
            reasoning_config={"enabled": True, "effort": "medium"},
            context_editing={
                "enabled": True,
                "trigger_tokens": 80000,
                "keep_tool_uses": 10,
                "keep_thinking_turns": 3,
                "clear_at_least_tokens": 20000,
                "exclude_tools": ["memory", "web_search"],
                "clear_tool_inputs": True,
            },
        )
        edits = kwargs["extra_body"]["context_management"]["edits"]
        thinking = edits[0]
        assert thinking["keep"]["value"] == 3
        tools = edits[1]
        assert tools["trigger"]["value"] == 80000
        assert tools["keep"]["value"] == 10
        assert tools["clear_at_least"]["value"] == 20000
        assert tools["exclude_tools"] == ["memory", "web_search"]
        assert tools["clear_tool_inputs"] is True

    def test_default_exclude_tools(self):
        """Default exclude_tools list is memory, skill_manage, todo."""
        kwargs = build_anthropic_kwargs(
            model="claude-sonnet-4-20250514",
            messages=self._simple_messages(),
            tools=None,
            max_tokens=4096,
            reasoning_config=None,
            context_editing={"enabled": True},
        )
        exclude = kwargs["extra_body"]["context_management"]["edits"][0]["exclude_tools"]
        assert "memory" in exclude
        assert "skill_manage" in exclude
        assert "todo" in exclude

    def test_auto_scales_to_context_window(self):
        """Trigger and clear_at_least scale proportionally to context window."""
        kwargs = build_anthropic_kwargs(
            model="claude-sonnet-4-20250514",
            messages=self._simple_messages(),
            tools=None,
            max_tokens=4096,
            reasoning_config=None,
            context_editing={"enabled": True},
        )
        tools_edit = kwargs["extra_body"]["context_management"]["edits"][0]
        trigger = tools_edit["trigger"]["value"]
        clear_at_least = tools_edit["clear_at_least"]["value"]
        # Should be proportional — trigger ~60%, clear_at_least ~10%
        assert trigger > 50000
        assert clear_at_least > 5000
        assert trigger > clear_at_least

    def test_empty_dict_does_nothing(self):
        """Empty config dict does not add context_management."""
        kwargs = build_anthropic_kwargs(
            model="claude-sonnet-4-20250514",
            messages=self._simple_messages(),
            tools=None,
            max_tokens=4096,
            reasoning_config=None,
            context_editing={},
        )
        assert "extra_body" not in kwargs

View File

@@ -702,168 +702,6 @@ class TestExecuteToolCalls:
assert "Truncated" in messages[0]["content"]
class TestConcurrentToolExecution:
"""Tests for _execute_tool_calls_concurrent and dispatch logic."""
def test_single_tool_uses_sequential_path(self, agent):
    """Single tool call should use sequential path, not concurrent."""
    tc = _mock_tool_call(name="web_search", arguments='{"q":"test"}', call_id="c1")
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc])
    messages = []
    # Patch both executors so we only assert dispatch, not execution.
    with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
        with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
            mock_seq.assert_called_once()
            mock_con.assert_not_called()
def test_clarify_forces_sequential(self, agent):
    """Batch containing clarify should use sequential path."""
    # clarify is interactive (in _NEVER_PARALLEL_TOOLS), so even a
    # two-call batch must not go concurrent.
    tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
    tc2 = _mock_tool_call(name="clarify", arguments='{"question":"ok?"}', call_id="c2")
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
    messages = []
    with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
        with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
            mock_seq.assert_called_once()
            mock_con.assert_not_called()
def test_multiple_tools_uses_concurrent_path(self, agent):
    """Multiple non-interactive tools should use concurrent path."""
    tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
    tc2 = _mock_tool_call(name="read_file", arguments='{"path":"x.py"}', call_id="c2")
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
    messages = []
    with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
        with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
            mock_con.assert_called_once()
            mock_seq.assert_not_called()
def test_concurrent_executes_all_tools(self, agent):
    """Concurrent path should execute all tools and append results in order."""
    tc1 = _mock_tool_call(name="web_search", arguments='{"q":"alpha"}', call_id="c1")
    tc2 = _mock_tool_call(name="web_search", arguments='{"q":"beta"}', call_id="c2")
    tc3 = _mock_tool_call(name="web_search", arguments='{"q":"gamma"}', call_id="c3")
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2, tc3])
    messages = []
    call_log = []

    def fake_handle(name, args, task_id, **kwargs):
        # Stand-in for the registry dispatcher; echoes the query back.
        call_log.append(name)
        return json.dumps({"result": args.get("q", "")})

    with patch("run_agent.handle_function_call", side_effect=fake_handle):
        agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
    assert len(messages) == 3
    # Results must be in original order
    assert messages[0]["tool_call_id"] == "c1"
    assert messages[1]["tool_call_id"] == "c2"
    assert messages[2]["tool_call_id"] == "c3"
    # All should be tool messages
    assert all(m["role"] == "tool" for m in messages)
    # Content should contain the query results
    assert "alpha" in messages[0]["content"]
    assert "beta" in messages[1]["content"]
    assert "gamma" in messages[2]["content"]
def test_concurrent_preserves_order_despite_timing(self, agent):
    """Even if tools finish in different order, messages should be in original order."""
    import time as _time
    slow_call = _mock_tool_call(name="web_search", arguments='{"q":"slow"}', call_id="c1")
    fast_call = _mock_tool_call(name="web_search", arguments='{"q":"fast"}', call_id="c2")
    assistant_msg = _mock_assistant_msg(content="", tool_calls=[slow_call, fast_call])
    history = []

    def timed_handler(name, args, task_id, **kwargs):
        query = args.get("q", "")
        if query == "slow":
            # Delay the first tool so it finishes after the second one.
            _time.sleep(0.1)
        return f"result_{query}"

    with patch("run_agent.handle_function_call", side_effect=timed_handler):
        agent._execute_tool_calls_concurrent(assistant_msg, history, "task-1")

    # Original order must be preserved regardless of completion order.
    assert history[0]["tool_call_id"] == "c1"
    assert "result_slow" in history[0]["content"]
    assert history[1]["tool_call_id"] == "c2"
    assert "result_fast" in history[1]["content"]
def test_concurrent_handles_tool_error(self, agent):
    """If one tool raises, others should still complete.

    The failing tool is selected by its *arguments* rather than by a
    call counter: under concurrent execution the worker threads may
    invoke the handler in either order, so "raise on the first
    invocation" could fail tool c2 instead of c1 and make the
    positional assertions below flaky.
    """
    tc1 = _mock_tool_call(name="web_search", arguments='{"mode":"fail"}', call_id="c1")
    tc2 = _mock_tool_call(name="web_search", arguments='{}', call_id="c2")
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
    messages = []

    def fake_handle(name, args, task_id, **kwargs):
        # Deterministically fail only the tool that asked for it.
        if args.get("mode") == "fail":
            raise RuntimeError("boom")
        return "success"

    with patch("run_agent.handle_function_call", side_effect=fake_handle):
        agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")

    assert len(messages) == 2
    # First tool (c1) should carry the error text.
    assert "Error" in messages[0]["content"] or "boom" in messages[0]["content"]
    # Second tool should succeed despite its sibling's failure.
    assert "success" in messages[1]["content"]
def test_concurrent_interrupt_before_start(self, agent):
    """If interrupt is requested before concurrent execution, all tools are skipped."""
    tool_calls = [
        _mock_tool_call(name="web_search", arguments='{}', call_id="c1"),
        _mock_tool_call(name="read_file", arguments='{}', call_id="c2"),
    ]
    assistant_msg = _mock_assistant_msg(content="", tool_calls=tool_calls)
    history = []
    with patch("run_agent._set_interrupt"):
        agent.interrupt()
        agent._execute_tool_calls_concurrent(assistant_msg, history, "task-1")
    # Both tools must still produce a placeholder result message.
    assert len(history) == 2
    for msg in history:
        text = msg["content"].lower()
        assert "cancelled" in text or "skipped" in text
def test_concurrent_truncates_large_results(self, agent):
    """Concurrent path should truncate results over 100k chars."""
    tool_calls = [
        _mock_tool_call(name="web_search", arguments='{}', call_id=cid)
        for cid in ("c1", "c2")
    ]
    assistant_msg = _mock_assistant_msg(content="", tool_calls=tool_calls)
    history = []
    oversized = "x" * 150_000
    with patch("run_agent.handle_function_call", return_value=oversized):
        agent._execute_tool_calls_concurrent(assistant_msg, history, "task-1")
    assert len(history) == 2
    for msg in history:
        # Each stored result must be shorter than the raw payload and flagged.
        assert len(msg["content"]) < 150_000
        assert "Truncated" in msg["content"]
def test_invoke_tool_dispatches_to_handle_function_call(self, agent):
    """_invoke_tool should route regular tools through handle_function_call."""
    with patch("run_agent.handle_function_call", return_value="result") as hfc_mock:
        outcome = agent._invoke_tool("web_search", {"q": "test"}, "task-1")
        # Must forward name/args/task_id plus the agent's enabled tool list.
        hfc_mock.assert_called_once_with(
            "web_search", {"q": "test"}, "task-1",
            enabled_tools=list(agent.valid_tool_names),
        )
    assert outcome == "result"
def test_invoke_tool_handles_agent_level_tools(self, agent):
    """_invoke_tool should handle todo tool directly."""
    with patch("tools.todo_tool.todo_tool", return_value='{"ok":true}') as todo_mock:
        outcome = agent._invoke_tool("todo", {"todos": []}, "task-1")
        todo_mock.assert_called_once()
    assert "ok" in outcome
class TestHandleMaxIterations:
def test_returns_summary(self, agent):
resp = _mock_response(content="Here is a summary of what I did.")

View File

@@ -91,11 +91,8 @@ class TestPreToolCheck:
agent._persist_session = MagicMock()
# Import and call the method
import types
from run_agent import AIAgent
# Bind the real methods to our mock so dispatch works correctly
agent._execute_tool_calls_sequential = types.MethodType(AIAgent._execute_tool_calls_sequential, agent)
agent._execute_tool_calls_concurrent = types.MethodType(AIAgent._execute_tool_calls_concurrent, agent)
# Bind the real method to our mock
AIAgent._execute_tool_calls(agent, assistant_msg, messages, "default")
# All 3 should be skipped

View File

@@ -511,7 +511,7 @@ class TestSkillViewSecureSetupOnLoad:
result = json.loads(raw)
assert result["success"] is True
assert called["value"] is False
assert "local cli" in result["gateway_setup_hint"].lower()
assert "hermes setup" in result["gateway_setup_hint"].lower()
assert result["content"].startswith("---")
@@ -845,7 +845,7 @@ class TestSkillViewPrerequisites:
raw = skill_view("backend-unknown")
result = json.loads(raw)
assert result["success"] is True
assert "local cli" in result["gateway_setup_hint"].lower()
assert "hermes setup" in result["gateway_setup_hint"].lower()
assert result["setup_needed"] is True
@pytest.mark.parametrize(

View File

@@ -394,7 +394,7 @@ def _gateway_setup_hint() -> str:
return GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE
except Exception:
return "Secure secret entry is not available. Load this skill in the local CLI to be prompted, or add the key to ~/.hermes/.env manually."
return "Secure secret entry is not available. Run `hermes setup` or update ~/.hermes/.env locally."
def _build_setup_note(