mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-11 12:48:54 +08:00
Compare commits
5 Commits
fix/plugin
...
salvage/bu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
98305119c1 | ||
|
|
020dd832ce | ||
|
|
0c3319cd15 | ||
|
|
a43269e3b1 | ||
|
|
9e2f0be093 |
@@ -1425,6 +1425,23 @@ def resolve_provider_client(
|
||||
|
||||
client = OpenAI(api_key=api_key, base_url=base_url,
|
||||
**({"default_headers": headers} if headers else {}))
|
||||
|
||||
# Copilot GPT-5+ models (except gpt-5-mini) require the Responses
|
||||
# API — they are not accessible via /chat/completions. Wrap the
|
||||
# plain client in CodexAuxiliaryClient so call_llm() transparently
|
||||
# routes through responses.stream().
|
||||
if provider == "copilot" and final_model and not raw_codex:
|
||||
try:
|
||||
from hermes_cli.models import _should_use_copilot_responses_api
|
||||
if _should_use_copilot_responses_api(final_model):
|
||||
logger.debug(
|
||||
"resolve_provider_client: copilot model %s needs "
|
||||
"Responses API — wrapping with CodexAuxiliaryClient",
|
||||
final_model)
|
||||
client = CodexAuxiliaryClient(client, final_model)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
logger.debug("resolve_provider_client: %s (%s)", provider, final_model)
|
||||
return (_to_async_client(client, final_model) if async_mode
|
||||
else (client, final_model))
|
||||
|
||||
49
agent/manual_compression_feedback.py
Normal file
49
agent/manual_compression_feedback.py
Normal file
@@ -0,0 +1,49 @@
|
||||
"""User-facing summaries for manual compression commands."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Sequence
|
||||
|
||||
|
||||
def summarize_manual_compression(
|
||||
before_messages: Sequence[dict[str, Any]],
|
||||
after_messages: Sequence[dict[str, Any]],
|
||||
before_tokens: int,
|
||||
after_tokens: int,
|
||||
) -> dict[str, Any]:
|
||||
"""Return consistent user-facing feedback for manual compression."""
|
||||
before_count = len(before_messages)
|
||||
after_count = len(after_messages)
|
||||
noop = list(after_messages) == list(before_messages)
|
||||
|
||||
if noop:
|
||||
headline = f"No changes from compression: {before_count} messages"
|
||||
if after_tokens == before_tokens:
|
||||
token_line = (
|
||||
f"Rough transcript estimate: ~{before_tokens:,} tokens (unchanged)"
|
||||
)
|
||||
else:
|
||||
token_line = (
|
||||
f"Rough transcript estimate: ~{before_tokens:,} → "
|
||||
f"~{after_tokens:,} tokens"
|
||||
)
|
||||
else:
|
||||
headline = f"Compressed: {before_count} → {after_count} messages"
|
||||
token_line = (
|
||||
f"Rough transcript estimate: ~{before_tokens:,} → "
|
||||
f"~{after_tokens:,} tokens"
|
||||
)
|
||||
|
||||
note = None
|
||||
if not noop and after_count < before_count and after_tokens > before_tokens:
|
||||
note = (
|
||||
"Note: fewer messages can still raise this rough transcript estimate "
|
||||
"when compression rewrites the transcript into denser summaries."
|
||||
)
|
||||
|
||||
return {
|
||||
"noop": noop,
|
||||
"headline": headline,
|
||||
"token_line": token_line,
|
||||
"note": note,
|
||||
}
|
||||
22
cli.py
22
cli.py
@@ -5835,21 +5835,29 @@ class HermesCLI:
|
||||
original_count = len(self.conversation_history)
|
||||
try:
|
||||
from agent.model_metadata import estimate_messages_tokens_rough
|
||||
approx_tokens = estimate_messages_tokens_rough(self.conversation_history)
|
||||
from agent.manual_compression_feedback import summarize_manual_compression
|
||||
original_history = list(self.conversation_history)
|
||||
approx_tokens = estimate_messages_tokens_rough(original_history)
|
||||
print(f"🗜️ Compressing {original_count} messages (~{approx_tokens:,} tokens)...")
|
||||
|
||||
compressed, _new_system = self.agent._compress_context(
|
||||
self.conversation_history,
|
||||
compressed, _ = self.agent._compress_context(
|
||||
original_history,
|
||||
self.agent._cached_system_prompt or "",
|
||||
approx_tokens=approx_tokens,
|
||||
)
|
||||
self.conversation_history = compressed
|
||||
new_count = len(self.conversation_history)
|
||||
new_tokens = estimate_messages_tokens_rough(self.conversation_history)
|
||||
print(
|
||||
f" ✅ Compressed: {original_count} → {new_count} messages "
|
||||
f"(~{approx_tokens:,} → ~{new_tokens:,} tokens)"
|
||||
summary = summarize_manual_compression(
|
||||
original_history,
|
||||
self.conversation_history,
|
||||
approx_tokens,
|
||||
new_tokens,
|
||||
)
|
||||
icon = "🗜️" if summary["noop"] else "✅"
|
||||
print(f" {icon} {summary['headline']}")
|
||||
print(f" {summary['token_line']}")
|
||||
if summary["note"]:
|
||||
print(f" {summary['note']}")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ❌ Compression failed: {e}")
|
||||
|
||||
@@ -5196,6 +5196,7 @@ class GatewayRunner:
|
||||
|
||||
try:
|
||||
from run_agent import AIAgent
|
||||
from agent.manual_compression_feedback import summarize_manual_compression
|
||||
from agent.model_metadata import estimate_messages_tokens_rough
|
||||
|
||||
runtime_kwargs = _resolve_runtime_agent_kwargs()
|
||||
@@ -5223,6 +5224,13 @@ class GatewayRunner:
|
||||
)
|
||||
tmp_agent._print_fn = lambda *a, **kw: None
|
||||
|
||||
compressor = tmp_agent.context_compressor
|
||||
compress_start = compressor.protect_first_n
|
||||
compress_start = compressor._align_boundary_forward(msgs, compress_start)
|
||||
compress_end = compressor._find_tail_cut_by_tokens(msgs, compress_start)
|
||||
if compress_start >= compress_end:
|
||||
return "Nothing to compress yet (the transcript is still all protected context)."
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
compressed, _ = await loop.run_in_executor(
|
||||
None,
|
||||
@@ -5243,13 +5251,17 @@ class GatewayRunner:
|
||||
self.session_store.update_session(
|
||||
session_entry.session_key, last_prompt_tokens=0
|
||||
)
|
||||
new_count = len(compressed)
|
||||
new_tokens = estimate_messages_tokens_rough(compressed)
|
||||
|
||||
return (
|
||||
f"🗜️ Compressed: {original_count} → {new_count} messages\n"
|
||||
f"~{approx_tokens:,} → ~{new_tokens:,} tokens"
|
||||
summary = summarize_manual_compression(
|
||||
msgs,
|
||||
compressed,
|
||||
approx_tokens,
|
||||
new_tokens,
|
||||
)
|
||||
lines = [f"🗜️ {summary['headline']}", summary["token_line"]]
|
||||
if summary["note"]:
|
||||
lines.append(summary["note"])
|
||||
return "\n".join(lines)
|
||||
except Exception as e:
|
||||
logger.warning("Manual compress failed: %s", e)
|
||||
return f"Compression failed: {e}"
|
||||
|
||||
@@ -538,6 +538,8 @@ DEFAULT_CONFIG = {
|
||||
"api_key": "", # API key for delegation.base_url (falls back to OPENAI_API_KEY)
|
||||
"max_iterations": 50, # per-subagent iteration cap (each subagent gets its own budget,
|
||||
# independent of the parent's max_iterations)
|
||||
"reasoning_effort": "", # reasoning effort for subagents: "xhigh", "high", "medium",
|
||||
# "low", "minimal", "none" (empty = inherit parent's level)
|
||||
},
|
||||
|
||||
# Ephemeral prefill messages file — JSON list of {role, content} dicts
|
||||
|
||||
@@ -756,6 +756,69 @@ class TestAuxiliaryPoolAwareness:
|
||||
assert call_kwargs["base_url"] == "https://api.githubcopilot.com"
|
||||
assert call_kwargs["default_headers"]["Editor-Version"]
|
||||
|
||||
def test_copilot_responses_api_model_wrapped_in_codex_client(self, monkeypatch):
|
||||
"""Copilot GPT-5+ models (needing Responses API) are wrapped in CodexAuxiliaryClient."""
|
||||
monkeypatch.delenv("GITHUB_TOKEN", raising=False)
|
||||
monkeypatch.delenv("GH_TOKEN", raising=False)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"hermes_cli.auth.resolve_api_key_provider_credentials",
|
||||
return_value={
|
||||
"provider": "copilot",
|
||||
"api_key": "test-token",
|
||||
"base_url": "https://api.githubcopilot.com",
|
||||
"source": "gh auth token",
|
||||
},
|
||||
),
|
||||
patch("agent.auxiliary_client.OpenAI"),
|
||||
):
|
||||
client, model = resolve_provider_client("copilot", model="gpt-5.4-mini")
|
||||
|
||||
from agent.auxiliary_client import CodexAuxiliaryClient
|
||||
assert isinstance(client, CodexAuxiliaryClient)
|
||||
assert model == "gpt-5.4-mini"
|
||||
|
||||
def test_copilot_chat_completions_model_not_wrapped(self, monkeypatch):
|
||||
"""Copilot models using Chat Completions are returned as plain OpenAI clients."""
|
||||
monkeypatch.delenv("GITHUB_TOKEN", raising=False)
|
||||
monkeypatch.delenv("GH_TOKEN", raising=False)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"hermes_cli.auth.resolve_api_key_provider_credentials",
|
||||
return_value={
|
||||
"provider": "copilot",
|
||||
"api_key": "test-token",
|
||||
"base_url": "https://api.githubcopilot.com",
|
||||
"source": "gh auth token",
|
||||
},
|
||||
),
|
||||
patch("agent.auxiliary_client.OpenAI") as mock_openai,
|
||||
):
|
||||
client, model = resolve_provider_client("copilot", model="gpt-4.1-mini")
|
||||
|
||||
from agent.auxiliary_client import CodexAuxiliaryClient
|
||||
assert not isinstance(client, CodexAuxiliaryClient)
|
||||
assert model == "gpt-4.1-mini"
|
||||
# Should be the raw mock OpenAI client
|
||||
assert client is mock_openai.return_value
|
||||
|
||||
def test_vision_auto_uses_active_provider_as_fallback(self, monkeypatch):
|
||||
"""When no OpenRouter/Nous available, vision auto falls back to active provider."""
|
||||
monkeypatch.setenv("ANTHROPIC_API_KEY", "***")
|
||||
with (
|
||||
patch("agent.auxiliary_client._read_nous_auth", return_value=None),
|
||||
patch("agent.auxiliary_client._read_main_provider", return_value="anthropic"),
|
||||
patch("agent.auxiliary_client._read_main_model", return_value="claude-sonnet-4"),
|
||||
patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
|
||||
patch("agent.anthropic_adapter.resolve_anthropic_token", return_value="***"),
|
||||
):
|
||||
client, model = get_vision_auxiliary_client()
|
||||
|
||||
assert client is not None
|
||||
assert client.__class__.__name__ == "AnthropicAuxiliaryClient"
|
||||
|
||||
def test_vision_auto_prefers_active_provider_over_openrouter(self, monkeypatch):
|
||||
"""Active provider is tried before OpenRouter in vision auto."""
|
||||
monkeypatch.setenv("OPENROUTER_API_KEY", "or-key")
|
||||
|
||||
66
tests/cli/test_manual_compress.py
Normal file
66
tests/cli/test_manual_compress.py
Normal file
@@ -0,0 +1,66 @@
|
||||
"""Tests for CLI manual compression messaging."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from tests.cli.test_cli_init import _make_cli
|
||||
|
||||
|
||||
def _make_history() -> list[dict[str, str]]:
|
||||
return [
|
||||
{"role": "user", "content": "one"},
|
||||
{"role": "assistant", "content": "two"},
|
||||
{"role": "user", "content": "three"},
|
||||
{"role": "assistant", "content": "four"},
|
||||
]
|
||||
|
||||
|
||||
def test_manual_compress_reports_noop_without_success_banner(capsys):
|
||||
shell = _make_cli()
|
||||
history = _make_history()
|
||||
shell.conversation_history = history
|
||||
shell.agent = MagicMock()
|
||||
shell.agent.compression_enabled = True
|
||||
shell.agent._cached_system_prompt = ""
|
||||
shell.agent._compress_context.return_value = (list(history), "")
|
||||
|
||||
def _estimate(messages):
|
||||
assert messages == history
|
||||
return 100
|
||||
|
||||
with patch("agent.model_metadata.estimate_messages_tokens_rough", side_effect=_estimate):
|
||||
shell._manual_compress()
|
||||
|
||||
output = capsys.readouterr().out
|
||||
assert "No changes from compression" in output
|
||||
assert "✅ Compressed" not in output
|
||||
assert "Rough transcript estimate: ~100 tokens (unchanged)" in output
|
||||
|
||||
|
||||
def test_manual_compress_explains_when_token_estimate_rises(capsys):
|
||||
shell = _make_cli()
|
||||
history = _make_history()
|
||||
compressed = [
|
||||
history[0],
|
||||
{"role": "assistant", "content": "Dense summary that still counts as more tokens."},
|
||||
history[-1],
|
||||
]
|
||||
shell.conversation_history = history
|
||||
shell.agent = MagicMock()
|
||||
shell.agent.compression_enabled = True
|
||||
shell.agent._cached_system_prompt = ""
|
||||
shell.agent._compress_context.return_value = (compressed, "")
|
||||
|
||||
def _estimate(messages):
|
||||
if messages == history:
|
||||
return 100
|
||||
if messages == compressed:
|
||||
return 120
|
||||
raise AssertionError(f"unexpected transcript: {messages!r}")
|
||||
|
||||
with patch("agent.model_metadata.estimate_messages_tokens_rough", side_effect=_estimate):
|
||||
shell._manual_compress()
|
||||
|
||||
output = capsys.readouterr().out
|
||||
assert "✅ Compressed: 4 → 3 messages" in output
|
||||
assert "Rough transcript estimate: ~100 → ~120 tokens" in output
|
||||
assert "denser summaries" in output
|
||||
121
tests/gateway/test_compress_command.py
Normal file
121
tests/gateway/test_compress_command.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""Tests for gateway /compress user-facing messaging."""
|
||||
|
||||
from datetime import datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
||||
from gateway.platforms.base import MessageEvent
|
||||
from gateway.session import SessionEntry, SessionSource, build_session_key
|
||||
|
||||
|
||||
def _make_source() -> SessionSource:
|
||||
return SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
user_id="u1",
|
||||
chat_id="c1",
|
||||
user_name="tester",
|
||||
chat_type="dm",
|
||||
)
|
||||
|
||||
|
||||
def _make_event(text: str = "/compress") -> MessageEvent:
|
||||
return MessageEvent(text=text, source=_make_source(), message_id="m1")
|
||||
|
||||
|
||||
def _make_history() -> list[dict[str, str]]:
|
||||
return [
|
||||
{"role": "user", "content": "one"},
|
||||
{"role": "assistant", "content": "two"},
|
||||
{"role": "user", "content": "three"},
|
||||
{"role": "assistant", "content": "four"},
|
||||
]
|
||||
|
||||
|
||||
def _make_runner(history: list[dict[str, str]]):
|
||||
from gateway.run import GatewayRunner
|
||||
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner.config = GatewayConfig(
|
||||
platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")}
|
||||
)
|
||||
session_entry = SessionEntry(
|
||||
session_key=build_session_key(_make_source()),
|
||||
session_id="sess-1",
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_type="dm",
|
||||
)
|
||||
runner.session_store = MagicMock()
|
||||
runner.session_store.get_or_create_session.return_value = session_entry
|
||||
runner.session_store.load_transcript.return_value = history
|
||||
runner.session_store.rewrite_transcript = MagicMock()
|
||||
runner.session_store.update_session = MagicMock()
|
||||
runner.session_store._save = MagicMock()
|
||||
return runner
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_compress_command_reports_noop_without_success_banner():
|
||||
history = _make_history()
|
||||
runner = _make_runner(history)
|
||||
agent_instance = MagicMock()
|
||||
agent_instance.context_compressor.protect_first_n = 0
|
||||
agent_instance.context_compressor._align_boundary_forward.return_value = 0
|
||||
agent_instance.context_compressor._find_tail_cut_by_tokens.return_value = 2
|
||||
agent_instance.session_id = "sess-1"
|
||||
agent_instance._compress_context.return_value = (list(history), "")
|
||||
|
||||
def _estimate(messages):
|
||||
assert messages == history
|
||||
return 100
|
||||
|
||||
with (
|
||||
patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "test-key"}),
|
||||
patch("gateway.run._resolve_gateway_model", return_value="test-model"),
|
||||
patch("run_agent.AIAgent", return_value=agent_instance),
|
||||
patch("agent.model_metadata.estimate_messages_tokens_rough", side_effect=_estimate),
|
||||
):
|
||||
result = await runner._handle_compress_command(_make_event())
|
||||
|
||||
assert "No changes from compression" in result
|
||||
assert "Compressed:" not in result
|
||||
assert "Rough transcript estimate: ~100 tokens (unchanged)" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_compress_command_explains_when_token_estimate_rises():
|
||||
history = _make_history()
|
||||
compressed = [
|
||||
history[0],
|
||||
{"role": "assistant", "content": "Dense summary that still counts as more tokens."},
|
||||
history[-1],
|
||||
]
|
||||
runner = _make_runner(history)
|
||||
agent_instance = MagicMock()
|
||||
agent_instance.context_compressor.protect_first_n = 0
|
||||
agent_instance.context_compressor._align_boundary_forward.return_value = 0
|
||||
agent_instance.context_compressor._find_tail_cut_by_tokens.return_value = 2
|
||||
agent_instance.session_id = "sess-1"
|
||||
agent_instance._compress_context.return_value = (compressed, "")
|
||||
|
||||
def _estimate(messages):
|
||||
if messages == history:
|
||||
return 100
|
||||
if messages == compressed:
|
||||
return 120
|
||||
raise AssertionError(f"unexpected transcript: {messages!r}")
|
||||
|
||||
with (
|
||||
patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "test-key"}),
|
||||
patch("gateway.run._resolve_gateway_model", return_value="test-model"),
|
||||
patch("run_agent.AIAgent", return_value=agent_instance),
|
||||
patch("agent.model_metadata.estimate_messages_tokens_rough", side_effect=_estimate),
|
||||
):
|
||||
result = await runner._handle_compress_command(_make_event())
|
||||
|
||||
assert "Compressed: 4 → 3 messages" in result
|
||||
assert "Rough transcript estimate: ~100 → ~120 tokens" in result
|
||||
assert "denser summaries" in result
|
||||
@@ -1210,5 +1210,73 @@ class TestDelegateHeartbeat(unittest.TestCase):
|
||||
f"Heartbeat should include last_activity_desc: {touch_calls}")
|
||||
|
||||
|
||||
class TestDelegationReasoningEffort(unittest.TestCase):
|
||||
"""Tests for delegation.reasoning_effort config override."""
|
||||
|
||||
@patch("tools.delegate_tool._load_config")
|
||||
@patch("run_agent.AIAgent")
|
||||
def test_inherits_parent_reasoning_when_no_override(self, MockAgent, mock_cfg):
|
||||
"""With no delegation.reasoning_effort, child inherits parent's config."""
|
||||
mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": ""}
|
||||
MockAgent.return_value = MagicMock()
|
||||
parent = _make_mock_parent()
|
||||
parent.reasoning_config = {"enabled": True, "effort": "xhigh"}
|
||||
|
||||
_build_child_agent(
|
||||
task_index=0, goal="test", context=None, toolsets=None,
|
||||
model=None, max_iterations=50, parent_agent=parent,
|
||||
)
|
||||
call_kwargs = MockAgent.call_args[1]
|
||||
self.assertEqual(call_kwargs["reasoning_config"], {"enabled": True, "effort": "xhigh"})
|
||||
|
||||
@patch("tools.delegate_tool._load_config")
|
||||
@patch("run_agent.AIAgent")
|
||||
def test_override_reasoning_effort_from_config(self, MockAgent, mock_cfg):
|
||||
"""delegation.reasoning_effort overrides the parent's level."""
|
||||
mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": "low"}
|
||||
MockAgent.return_value = MagicMock()
|
||||
parent = _make_mock_parent()
|
||||
parent.reasoning_config = {"enabled": True, "effort": "xhigh"}
|
||||
|
||||
_build_child_agent(
|
||||
task_index=0, goal="test", context=None, toolsets=None,
|
||||
model=None, max_iterations=50, parent_agent=parent,
|
||||
)
|
||||
call_kwargs = MockAgent.call_args[1]
|
||||
self.assertEqual(call_kwargs["reasoning_config"], {"enabled": True, "effort": "low"})
|
||||
|
||||
@patch("tools.delegate_tool._load_config")
|
||||
@patch("run_agent.AIAgent")
|
||||
def test_override_reasoning_effort_none_disables(self, MockAgent, mock_cfg):
|
||||
"""delegation.reasoning_effort: 'none' disables thinking for subagents."""
|
||||
mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": "none"}
|
||||
MockAgent.return_value = MagicMock()
|
||||
parent = _make_mock_parent()
|
||||
parent.reasoning_config = {"enabled": True, "effort": "high"}
|
||||
|
||||
_build_child_agent(
|
||||
task_index=0, goal="test", context=None, toolsets=None,
|
||||
model=None, max_iterations=50, parent_agent=parent,
|
||||
)
|
||||
call_kwargs = MockAgent.call_args[1]
|
||||
self.assertEqual(call_kwargs["reasoning_config"], {"enabled": False})
|
||||
|
||||
@patch("tools.delegate_tool._load_config")
|
||||
@patch("run_agent.AIAgent")
|
||||
def test_invalid_reasoning_effort_falls_back_to_parent(self, MockAgent, mock_cfg):
|
||||
"""Invalid delegation.reasoning_effort falls back to parent's config."""
|
||||
mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": "banana"}
|
||||
MockAgent.return_value = MagicMock()
|
||||
parent = _make_mock_parent()
|
||||
parent.reasoning_config = {"enabled": True, "effort": "medium"}
|
||||
|
||||
_build_child_agent(
|
||||
task_index=0, goal="test", context=None, toolsets=None,
|
||||
model=None, max_iterations=50, parent_agent=parent,
|
||||
)
|
||||
call_kwargs = MockAgent.call_args[1]
|
||||
self.assertEqual(call_kwargs["reasoning_config"], {"enabled": True, "effort": "medium"})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
148
tests/tools/test_file_operations_edge_cases.py
Normal file
148
tests/tools/test_file_operations_edge_cases.py
Normal file
@@ -0,0 +1,148 @@
|
||||
"""Tests for edge cases in tools/file_operations.py.
|
||||
|
||||
Covers:
|
||||
- ``_is_likely_binary()`` content-analysis branch (dead-code removal regression guard)
|
||||
- ``_check_lint()`` robustness against file paths containing curly braces
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from tools.file_operations import ShellFileOperations
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# _is_likely_binary edge cases
|
||||
# =========================================================================
|
||||
|
||||
|
||||
class TestIsLikelyBinary:
|
||||
"""Verify content-analysis logic after dead-code removal."""
|
||||
|
||||
@pytest.fixture()
|
||||
def ops(self):
|
||||
return ShellFileOperations.__new__(ShellFileOperations)
|
||||
|
||||
def test_binary_extension_returns_true(self, ops):
|
||||
"""Known binary extensions should short-circuit without content analysis."""
|
||||
assert ops._is_likely_binary("image.png") is True
|
||||
assert ops._is_likely_binary("archive.tar.gz", content_sample="hello") is True
|
||||
|
||||
def test_text_content_returns_false(self, ops):
|
||||
"""Normal printable text should not be classified as binary."""
|
||||
sample = "Hello, world!\nThis is a normal text file.\n"
|
||||
assert ops._is_likely_binary("unknown.xyz", content_sample=sample) is False
|
||||
|
||||
def test_binary_content_returns_true(self, ops):
|
||||
"""Content with >30% non-printable characters should be classified as binary."""
|
||||
# 500 NUL bytes + 500 printable = 50% non-printable → binary
|
||||
# Use .xyz extension (not in BINARY_EXTENSIONS) to ensure content analysis runs
|
||||
sample = "\x00" * 500 + "a" * 500
|
||||
assert ops._is_likely_binary("data.xyz", content_sample=sample) is True
|
||||
|
||||
def test_no_content_sample_returns_false(self, ops):
|
||||
"""When no content sample is provided and extension is unknown → not binary."""
|
||||
assert ops._is_likely_binary("mystery_file") is False
|
||||
|
||||
def test_none_content_sample_returns_false(self, ops):
|
||||
"""Explicit ``None`` content_sample should behave the same as missing."""
|
||||
assert ops._is_likely_binary("mystery_file", content_sample=None) is False
|
||||
|
||||
def test_empty_string_content_sample_returns_false(self, ops):
|
||||
"""Empty string is falsy, so content analysis should be skipped → not binary."""
|
||||
assert ops._is_likely_binary("mystery_file", content_sample="") is False
|
||||
|
||||
def test_threshold_boundary(self, ops):
|
||||
"""Exactly 30% non-printable should NOT trigger binary classification (> 0.30, not >=)."""
|
||||
# 300 NUL bytes + 700 printable = 30.0% → should be False (uses strict >)
|
||||
sample = "\x00" * 300 + "a" * 700
|
||||
assert ops._is_likely_binary("data.xyz", content_sample=sample) is False
|
||||
|
||||
def test_just_above_threshold(self, ops):
|
||||
"""301/1000 = 30.1% non-printable → should be binary."""
|
||||
sample = "\x00" * 301 + "a" * 699
|
||||
assert ops._is_likely_binary("data.xyz", content_sample=sample) is True
|
||||
|
||||
def test_tabs_and_newlines_excluded(self, ops):
|
||||
"""Tabs, carriage returns, and newlines should not count as non-printable."""
|
||||
sample = "\t" * 400 + "\n" * 300 + "\r" * 200 + "a" * 100
|
||||
assert ops._is_likely_binary("file.txt", content_sample=sample) is False
|
||||
|
||||
def test_content_sample_longer_than_1000(self, ops):
|
||||
"""Only the first 1000 characters should be analysed."""
|
||||
# First 1000 chars: 200 NUL + 800 printable = 20% → not binary
|
||||
# Remaining 1000 chars: all NUL → ignored by [:1000] slice
|
||||
sample = "\x00" * 200 + "a" * 800 + "\x00" * 1000
|
||||
assert ops._is_likely_binary("file.xyz", content_sample=sample) is False
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# _check_lint edge cases
|
||||
# =========================================================================
|
||||
|
||||
|
||||
class TestCheckLintBracePaths:
|
||||
"""Verify _check_lint handles file paths with curly braces safely."""
|
||||
|
||||
@pytest.fixture()
|
||||
def ops(self):
|
||||
obj = ShellFileOperations.__new__(ShellFileOperations)
|
||||
obj._command_cache = {}
|
||||
return obj
|
||||
|
||||
def test_normal_path(self, ops):
|
||||
"""Normal path without braces should work as before."""
|
||||
with patch.object(ops, "_has_command", return_value=True), \
|
||||
patch.object(ops, "_exec") as mock_exec:
|
||||
mock_exec.return_value = MagicMock(exit_code=0, stdout="")
|
||||
result = ops._check_lint("/tmp/test_file.py")
|
||||
|
||||
assert result.success is True
|
||||
# Verify the command was built correctly
|
||||
cmd_arg = mock_exec.call_args[0][0]
|
||||
assert "'/tmp/test_file.py'" in cmd_arg
|
||||
|
||||
def test_path_with_curly_braces(self, ops):
|
||||
"""Path containing ``{`` and ``}`` must not raise KeyError/ValueError."""
|
||||
with patch.object(ops, "_has_command", return_value=True), \
|
||||
patch.object(ops, "_exec") as mock_exec:
|
||||
mock_exec.return_value = MagicMock(exit_code=0, stdout="")
|
||||
# This would raise KeyError with .format() but works with .replace()
|
||||
result = ops._check_lint("/tmp/{test}_file.py")
|
||||
|
||||
assert result.success is True
|
||||
cmd_arg = mock_exec.call_args[0][0]
|
||||
assert "{test}" in cmd_arg
|
||||
|
||||
def test_path_with_nested_braces(self, ops):
|
||||
"""Path with complex brace patterns like ``{{var}}`` should be safe."""
|
||||
with patch.object(ops, "_has_command", return_value=True), \
|
||||
patch.object(ops, "_exec") as mock_exec:
|
||||
mock_exec.return_value = MagicMock(exit_code=0, stdout="")
|
||||
result = ops._check_lint("/tmp/{{var}}.py")
|
||||
|
||||
assert result.success is True
|
||||
|
||||
def test_unsupported_extension_skipped(self, ops):
|
||||
"""Extensions without a linter should return a skipped result."""
|
||||
result = ops._check_lint("/tmp/file.unknown_ext")
|
||||
assert result.skipped is True
|
||||
|
||||
def test_missing_linter_skipped(self, ops):
|
||||
"""When the linter binary is not installed, skip gracefully."""
|
||||
with patch.object(ops, "_has_command", return_value=False):
|
||||
result = ops._check_lint("/tmp/test.py")
|
||||
assert result.skipped is True
|
||||
|
||||
def test_lint_failure_returns_output(self, ops):
|
||||
"""When the linter exits non-zero, result should capture output."""
|
||||
with patch.object(ops, "_has_command", return_value=True), \
|
||||
patch.object(ops, "_exec") as mock_exec:
|
||||
mock_exec.return_value = MagicMock(
|
||||
exit_code=1,
|
||||
stdout="SyntaxError: invalid syntax",
|
||||
)
|
||||
result = ops._check_lint("/tmp/bad.py")
|
||||
|
||||
assert result.success is False
|
||||
assert "SyntaxError" in result.output
|
||||
@@ -312,6 +312,25 @@ def _build_child_agent(
|
||||
effective_acp_command = override_acp_command or getattr(parent_agent, "acp_command", None)
|
||||
effective_acp_args = list(override_acp_args if override_acp_args is not None else (getattr(parent_agent, "acp_args", []) or []))
|
||||
|
||||
# Resolve reasoning config: delegation override > parent inherit
|
||||
parent_reasoning = getattr(parent_agent, "reasoning_config", None)
|
||||
child_reasoning = parent_reasoning
|
||||
try:
|
||||
delegation_cfg = _load_config()
|
||||
delegation_effort = str(delegation_cfg.get("reasoning_effort") or "").strip()
|
||||
if delegation_effort:
|
||||
from hermes_constants import parse_reasoning_effort
|
||||
parsed = parse_reasoning_effort(delegation_effort)
|
||||
if parsed is not None:
|
||||
child_reasoning = parsed
|
||||
else:
|
||||
logger.warning(
|
||||
"Unknown delegation.reasoning_effort '%s', inheriting parent level",
|
||||
delegation_effort,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("Could not load delegation reasoning_effort: %s", exc)
|
||||
|
||||
child = AIAgent(
|
||||
base_url=effective_base_url,
|
||||
api_key=effective_api_key,
|
||||
@@ -322,7 +341,7 @@ def _build_child_agent(
|
||||
acp_args=effective_acp_args,
|
||||
max_iterations=max_iterations,
|
||||
max_tokens=getattr(parent_agent, "max_tokens", None),
|
||||
reasoning_config=getattr(parent_agent, "reasoning_config", None),
|
||||
reasoning_config=child_reasoning,
|
||||
prefill_messages=getattr(parent_agent, "prefill_messages", None),
|
||||
enabled_toolsets=child_toolsets,
|
||||
quiet_mode=True,
|
||||
|
||||
@@ -386,9 +386,7 @@ class ShellFileOperations(FileOperations):
|
||||
|
||||
# Content analysis: >30% non-printable chars = binary
|
||||
if content_sample:
|
||||
if not content_sample:
|
||||
return False
|
||||
non_printable = sum(1 for c in content_sample[:1000]
|
||||
non_printable = sum(1 for c in content_sample[:1000]
|
||||
if ord(c) < 32 and c not in '\n\r\t')
|
||||
return non_printable / min(len(content_sample), 1000) > 0.30
|
||||
|
||||
@@ -810,7 +808,7 @@ class ShellFileOperations(FileOperations):
|
||||
return LintResult(skipped=True, message=f"{base_cmd} not available")
|
||||
|
||||
# Run linter
|
||||
cmd = linter_cmd.format(file=self._escape_shell_arg(path))
|
||||
cmd = linter_cmd.replace("{file}", self._escape_shell_arg(path))
|
||||
result = self._exec(cmd, timeout=30)
|
||||
|
||||
return LintResult(
|
||||
|
||||
Reference in New Issue
Block a user