mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 23:11:37 +08:00
Compare commits
5 Commits
skill/gith
...
feat/acp-m
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9d74c817b | ||
|
|
e644f6b069 | ||
|
|
c100a486de | ||
|
|
62479afd88 | ||
|
|
77796d3184 |
@@ -22,6 +22,9 @@ from acp.schema import (
|
||||
InitializeResponse,
|
||||
ListSessionsResponse,
|
||||
LoadSessionResponse,
|
||||
McpServerHttp,
|
||||
McpServerSse,
|
||||
McpServerStdio,
|
||||
NewSessionResponse,
|
||||
PromptResponse,
|
||||
ResumeSessionResponse,
|
||||
@@ -93,6 +96,71 @@ class HermesACPAgent(acp.Agent):
|
||||
self._conn = conn
|
||||
logger.info("ACP client connected")
|
||||
|
||||
async def _register_session_mcp_servers(
|
||||
self,
|
||||
state: SessionState,
|
||||
mcp_servers: list[McpServerStdio | McpServerHttp | McpServerSse] | None,
|
||||
) -> None:
|
||||
"""Register ACP-provided MCP servers and refresh the agent tool surface."""
|
||||
if not mcp_servers:
|
||||
return
|
||||
|
||||
try:
|
||||
from tools.mcp_tool import register_mcp_servers
|
||||
|
||||
config_map: dict[str, dict] = {}
|
||||
for server in mcp_servers:
|
||||
name = server.name
|
||||
if isinstance(server, McpServerStdio):
|
||||
config = {
|
||||
"command": server.command,
|
||||
"args": list(server.args),
|
||||
"env": {item.name: item.value for item in server.env},
|
||||
}
|
||||
else:
|
||||
config = {
|
||||
"url": server.url,
|
||||
"headers": {item.name: item.value for item in server.headers},
|
||||
}
|
||||
config_map[name] = config
|
||||
|
||||
await asyncio.to_thread(register_mcp_servers, config_map)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Session %s: failed to register ACP MCP servers",
|
||||
state.session_id,
|
||||
exc_info=True,
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
from model_tools import get_tool_definitions
|
||||
|
||||
enabled_toolsets = getattr(state.agent, "enabled_toolsets", None) or ["hermes-acp"]
|
||||
disabled_toolsets = getattr(state.agent, "disabled_toolsets", None)
|
||||
state.agent.tools = get_tool_definitions(
|
||||
enabled_toolsets=enabled_toolsets,
|
||||
disabled_toolsets=disabled_toolsets,
|
||||
quiet_mode=True,
|
||||
)
|
||||
state.agent.valid_tool_names = {
|
||||
tool["function"]["name"] for tool in state.agent.tools or []
|
||||
}
|
||||
invalidate = getattr(state.agent, "_invalidate_system_prompt", None)
|
||||
if callable(invalidate):
|
||||
invalidate()
|
||||
logger.info(
|
||||
"Session %s: refreshed tool surface after ACP MCP registration (%d tools)",
|
||||
state.session_id,
|
||||
len(state.agent.tools or []),
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Session %s: failed to refresh tool surface after ACP MCP registration",
|
||||
state.session_id,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
# ---- ACP lifecycle ------------------------------------------------------
|
||||
|
||||
async def initialize(
|
||||
@@ -149,6 +217,7 @@ class HermesACPAgent(acp.Agent):
|
||||
**kwargs: Any,
|
||||
) -> NewSessionResponse:
|
||||
state = self.session_manager.create_session(cwd=cwd)
|
||||
await self._register_session_mcp_servers(state, mcp_servers)
|
||||
logger.info("New session %s (cwd=%s)", state.session_id, cwd)
|
||||
return NewSessionResponse(session_id=state.session_id)
|
||||
|
||||
@@ -163,6 +232,7 @@ class HermesACPAgent(acp.Agent):
|
||||
if state is None:
|
||||
logger.warning("load_session: session %s not found", session_id)
|
||||
return None
|
||||
await self._register_session_mcp_servers(state, mcp_servers)
|
||||
logger.info("Loaded session %s", session_id)
|
||||
return LoadSessionResponse()
|
||||
|
||||
@@ -177,6 +247,7 @@ class HermesACPAgent(acp.Agent):
|
||||
if state is None:
|
||||
logger.warning("resume_session: session %s not found, creating new", session_id)
|
||||
state = self.session_manager.create_session(cwd=cwd)
|
||||
await self._register_session_mcp_servers(state, mcp_servers)
|
||||
logger.info("Resumed session %s", state.session_id)
|
||||
return ResumeSessionResponse()
|
||||
|
||||
@@ -200,6 +271,8 @@ class HermesACPAgent(acp.Agent):
|
||||
) -> ForkSessionResponse:
|
||||
state = self.session_manager.fork_session(session_id, cwd=cwd)
|
||||
new_id = state.session_id if state else ""
|
||||
if state is not None:
|
||||
await self._register_session_mcp_servers(state, mcp_servers)
|
||||
logger.info("Forked session %s -> %s", session_id, new_id)
|
||||
return ForkSessionResponse(session_id=new_id)
|
||||
|
||||
|
||||
@@ -5574,15 +5574,25 @@ class GatewayRunner:
|
||||
_loop_for_step = asyncio.get_event_loop()
|
||||
_hooks_ref = self.hooks
|
||||
|
||||
def _step_callback_sync(iteration: int, tool_names: list) -> None:
|
||||
def _step_callback_sync(iteration: int, prev_tools: list) -> None:
|
||||
try:
|
||||
# prev_tools may be list[str] or list[dict] with "name"/"result"
|
||||
# keys. Normalise to keep "tool_names" backward-compatible for
|
||||
# user-authored hooks that do ', '.join(tool_names)'.
|
||||
_names: list[str] = []
|
||||
for _t in (prev_tools or []):
|
||||
if isinstance(_t, dict):
|
||||
_names.append(_t.get("name") or "")
|
||||
else:
|
||||
_names.append(str(_t))
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
_hooks_ref.emit("agent:step", {
|
||||
"platform": source.platform.value if source.platform else "",
|
||||
"user_id": source.user_id,
|
||||
"session_id": session_id,
|
||||
"iteration": iteration,
|
||||
"tool_names": tool_names,
|
||||
"tool_names": _names,
|
||||
"tools": prev_tools,
|
||||
}),
|
||||
_loop_for_step,
|
||||
)
|
||||
|
||||
15
run_agent.py
15
run_agent.py
@@ -6680,10 +6680,21 @@ class AIAgent:
|
||||
if self.step_callback is not None:
|
||||
try:
|
||||
prev_tools = []
|
||||
for _m in reversed(messages):
|
||||
for _idx, _m in enumerate(reversed(messages)):
|
||||
if _m.get("role") == "assistant" and _m.get("tool_calls"):
|
||||
_fwd_start = len(messages) - _idx
|
||||
_results_by_id = {}
|
||||
for _tm in messages[_fwd_start:]:
|
||||
if _tm.get("role") != "tool":
|
||||
break
|
||||
_tcid = _tm.get("tool_call_id")
|
||||
if _tcid:
|
||||
_results_by_id[_tcid] = _tm.get("content", "")
|
||||
prev_tools = [
|
||||
tc["function"]["name"]
|
||||
{
|
||||
"name": tc["function"]["name"],
|
||||
"result": _results_by_id.get(tc.get("id")),
|
||||
}
|
||||
for tc in _m["tool_calls"]
|
||||
if isinstance(tc, dict)
|
||||
]
|
||||
|
||||
@@ -205,6 +205,47 @@ class TestStepCallback:
|
||||
assert "read_file" not in tool_call_ids
|
||||
mock_rcts.assert_called_once()
|
||||
|
||||
def test_result_passed_to_build_tool_complete(self, mock_conn, event_loop_fixture):
|
||||
"""Tool result from prev_tools dict is forwarded to build_tool_complete."""
|
||||
from collections import deque
|
||||
|
||||
tool_call_ids = {"terminal": deque(["tc-xyz789"])}
|
||||
loop = event_loop_fixture
|
||||
|
||||
cb = make_step_cb(mock_conn, "session-1", loop, tool_call_ids)
|
||||
|
||||
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts, \
|
||||
patch("acp_adapter.events.build_tool_complete") as mock_btc:
|
||||
future = MagicMock(spec=Future)
|
||||
future.result.return_value = None
|
||||
mock_rcts.return_value = future
|
||||
|
||||
# Provide a result string in the tool info dict
|
||||
cb(1, [{"name": "terminal", "result": '{"output": "hello"}'}])
|
||||
|
||||
mock_btc.assert_called_once_with(
|
||||
"tc-xyz789", "terminal", result='{"output": "hello"}'
|
||||
)
|
||||
|
||||
def test_none_result_passed_through(self, mock_conn, event_loop_fixture):
|
||||
"""When result is None (e.g. first iteration), None is passed through."""
|
||||
from collections import deque
|
||||
|
||||
tool_call_ids = {"web_search": deque(["tc-aaa"])}
|
||||
loop = event_loop_fixture
|
||||
|
||||
cb = make_step_cb(mock_conn, "session-1", loop, tool_call_ids)
|
||||
|
||||
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts, \
|
||||
patch("acp_adapter.events.build_tool_complete") as mock_btc:
|
||||
future = MagicMock(spec=Future)
|
||||
future.result.return_value = None
|
||||
mock_rcts.return_value = future
|
||||
|
||||
cb(1, [{"name": "web_search", "result": None}])
|
||||
|
||||
mock_btc.assert_called_once_with("tc-aaa", "web_search", result=None)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Message callback
|
||||
|
||||
349
tests/acp/test_mcp_e2e.py
Normal file
349
tests/acp/test_mcp_e2e.py
Normal file
@@ -0,0 +1,349 @@
|
||||
"""End-to-end tests for ACP MCP server registration and tool-result reporting.
|
||||
|
||||
Exercises the full flow through the ACP server layer:
|
||||
new_session(mcpServers) → MCP tools registered → prompt() →
|
||||
tool_progress_callback (ToolCallStart) →
|
||||
step_callback with results (ToolCallUpdate with rawOutput) →
|
||||
session_update events arrive at the mock client
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from collections import deque
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
import acp
|
||||
from acp.schema import (
|
||||
EnvVariable,
|
||||
HttpHeader,
|
||||
McpServerHttp,
|
||||
McpServerStdio,
|
||||
NewSessionResponse,
|
||||
PromptResponse,
|
||||
TextContentBlock,
|
||||
ToolCallProgress,
|
||||
ToolCallStart,
|
||||
)
|
||||
|
||||
from acp_adapter.server import HermesACPAgent
|
||||
from acp_adapter.session import SessionManager
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mock_manager():
|
||||
return SessionManager(agent_factory=lambda: MagicMock(name="MockAIAgent"))
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def acp_agent(mock_manager):
|
||||
return HermesACPAgent(session_manager=mock_manager)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# E2E: MCP registration → prompt → tool events
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMcpRegistrationE2E:
|
||||
"""Full flow: session with MCP servers → prompt with tool calls → ACP events."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_with_mcp_servers_registers_tools(self, acp_agent, mock_manager):
|
||||
"""new_session with mcpServers converts them to Hermes config and registers."""
|
||||
servers = [
|
||||
McpServerStdio(
|
||||
name="test-fs",
|
||||
command="/usr/bin/mcp-fs",
|
||||
args=["--root", "/tmp"],
|
||||
env=[EnvVariable(name="DEBUG", value="1")],
|
||||
),
|
||||
McpServerHttp(
|
||||
name="test-api",
|
||||
url="https://api.example.com/mcp",
|
||||
headers=[HttpHeader(name="Authorization", value="Bearer tok123")],
|
||||
),
|
||||
]
|
||||
|
||||
registered_configs = {}
|
||||
|
||||
def mock_register(config_map):
|
||||
registered_configs.update(config_map)
|
||||
return ["mcp_test_fs_read", "mcp_test_fs_write", "mcp_test_api_search"]
|
||||
|
||||
fake_tools = [
|
||||
{"function": {"name": "mcp_test_fs_read"}},
|
||||
{"function": {"name": "mcp_test_fs_write"}},
|
||||
{"function": {"name": "mcp_test_api_search"}},
|
||||
{"function": {"name": "terminal"}},
|
||||
]
|
||||
|
||||
with patch("tools.mcp_tool.register_mcp_servers", side_effect=mock_register), \
|
||||
patch("model_tools.get_tool_definitions", return_value=fake_tools):
|
||||
resp = await acp_agent.new_session(cwd="/tmp", mcp_servers=servers)
|
||||
|
||||
assert isinstance(resp, NewSessionResponse)
|
||||
state = mock_manager.get_session(resp.session_id)
|
||||
|
||||
# Verify stdio server was converted correctly
|
||||
assert "test-fs" in registered_configs
|
||||
fs_cfg = registered_configs["test-fs"]
|
||||
assert fs_cfg["command"] == "/usr/bin/mcp-fs"
|
||||
assert fs_cfg["args"] == ["--root", "/tmp"]
|
||||
assert fs_cfg["env"] == {"DEBUG": "1"}
|
||||
|
||||
# Verify HTTP server was converted correctly
|
||||
assert "test-api" in registered_configs
|
||||
api_cfg = registered_configs["test-api"]
|
||||
assert api_cfg["url"] == "https://api.example.com/mcp"
|
||||
assert api_cfg["headers"] == {"Authorization": "Bearer tok123"}
|
||||
|
||||
# Verify agent tool surface was refreshed
|
||||
assert state.agent.tools == fake_tools
|
||||
assert state.agent.valid_tool_names == {
|
||||
"mcp_test_fs_read", "mcp_test_fs_write", "mcp_test_api_search", "terminal"
|
||||
}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prompt_with_tool_calls_emits_acp_events(self, acp_agent, mock_manager):
|
||||
"""Prompt → agent fires callbacks → ACP ToolCallStart + ToolCallUpdate events."""
|
||||
resp = await acp_agent.new_session(cwd="/tmp")
|
||||
session_id = resp.session_id
|
||||
state = mock_manager.get_session(session_id)
|
||||
|
||||
# Wire up a mock ACP client connection
|
||||
mock_conn = MagicMock(spec=acp.Client)
|
||||
mock_conn.session_update = AsyncMock()
|
||||
mock_conn.request_permission = AsyncMock()
|
||||
acp_agent._conn = mock_conn
|
||||
|
||||
def mock_run_conversation(user_message, conversation_history=None, task_id=None):
|
||||
"""Simulate an agent turn that calls terminal, gets a result, then responds."""
|
||||
agent = state.agent
|
||||
|
||||
# 1) Agent fires tool_progress_callback (ToolCallStart)
|
||||
if agent.tool_progress_callback:
|
||||
agent.tool_progress_callback(
|
||||
"terminal", "$ echo hello", {"command": "echo hello"}
|
||||
)
|
||||
|
||||
# 2) Agent fires step_callback with tool results (ToolCallUpdate)
|
||||
if agent.step_callback:
|
||||
agent.step_callback(1, [
|
||||
{"name": "terminal", "result": '{"output": "hello\\n", "exit_code": 0}'}
|
||||
])
|
||||
|
||||
return {
|
||||
"final_response": "The command output 'hello'.",
|
||||
"messages": [
|
||||
{"role": "user", "content": user_message},
|
||||
{"role": "assistant", "content": "The command output 'hello'."},
|
||||
],
|
||||
}
|
||||
|
||||
state.agent.run_conversation = mock_run_conversation
|
||||
|
||||
prompt = [TextContentBlock(type="text", text="run echo hello")]
|
||||
resp = await acp_agent.prompt(prompt=prompt, session_id=session_id)
|
||||
|
||||
assert isinstance(resp, PromptResponse)
|
||||
assert resp.stop_reason == "end_turn"
|
||||
|
||||
# Collect all session_update calls
|
||||
updates = []
|
||||
for call in mock_conn.session_update.call_args_list:
|
||||
# session_update(session_id, update) — grab the update
|
||||
update_arg = call[1].get("update") or call[0][1]
|
||||
updates.append(update_arg)
|
||||
|
||||
# Find tool_call (start) and tool_call_update (completion) events
|
||||
starts = [u for u in updates if getattr(u, "session_update", None) == "tool_call"]
|
||||
completions = [u for u in updates if getattr(u, "session_update", None) == "tool_call_update"]
|
||||
|
||||
# Should have at least one ToolCallStart for "terminal"
|
||||
assert len(starts) >= 1, f"Expected ToolCallStart, got updates: {[getattr(u, 'session_update', '?') for u in updates]}"
|
||||
start_event = starts[0]
|
||||
assert isinstance(start_event, ToolCallStart)
|
||||
assert start_event.title.startswith("terminal:")
|
||||
|
||||
# Should have at least one ToolCallUpdate (completion) with rawOutput
|
||||
assert len(completions) >= 1, f"Expected ToolCallUpdate, got updates: {[getattr(u, 'session_update', '?') for u in updates]}"
|
||||
complete_event = completions[0]
|
||||
assert isinstance(complete_event, ToolCallProgress)
|
||||
assert complete_event.status == "completed"
|
||||
# rawOutput should contain the tool result string
|
||||
assert complete_event.raw_output is not None
|
||||
assert "hello" in str(complete_event.raw_output)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prompt_tool_results_paired_by_call_id(self, acp_agent, mock_manager):
|
||||
"""The ToolCallUpdate's toolCallId must match the ToolCallStart's."""
|
||||
resp = await acp_agent.new_session(cwd="/tmp")
|
||||
session_id = resp.session_id
|
||||
state = mock_manager.get_session(session_id)
|
||||
|
||||
mock_conn = MagicMock(spec=acp.Client)
|
||||
mock_conn.session_update = AsyncMock()
|
||||
mock_conn.request_permission = AsyncMock()
|
||||
acp_agent._conn = mock_conn
|
||||
|
||||
def mock_run(user_message, conversation_history=None, task_id=None):
|
||||
agent = state.agent
|
||||
# Fire two tool calls
|
||||
if agent.tool_progress_callback:
|
||||
agent.tool_progress_callback("read_file", "read: /etc/hosts", {"path": "/etc/hosts"})
|
||||
agent.tool_progress_callback("web_search", "web search: test", {"query": "test"})
|
||||
|
||||
if agent.step_callback:
|
||||
agent.step_callback(1, [
|
||||
{"name": "read_file", "result": '{"content": "127.0.0.1 localhost"}'},
|
||||
{"name": "web_search", "result": '{"data": {"web": []}}'},
|
||||
])
|
||||
|
||||
return {"final_response": "Done.", "messages": []}
|
||||
|
||||
state.agent.run_conversation = mock_run
|
||||
|
||||
prompt = [TextContentBlock(type="text", text="test")]
|
||||
await acp_agent.prompt(prompt=prompt, session_id=session_id)
|
||||
|
||||
updates = []
|
||||
for call in mock_conn.session_update.call_args_list:
|
||||
update_arg = call[1].get("update") or call[0][1]
|
||||
updates.append(update_arg)
|
||||
|
||||
starts = [u for u in updates if getattr(u, "session_update", None) == "tool_call"]
|
||||
completions = [u for u in updates if getattr(u, "session_update", None) == "tool_call_update"]
|
||||
|
||||
assert len(starts) == 2, f"Expected 2 starts, got {len(starts)}"
|
||||
assert len(completions) == 2, f"Expected 2 completions, got {len(completions)}"
|
||||
|
||||
# Each completion's toolCallId must match a start's toolCallId
|
||||
start_ids = {s.tool_call_id for s in starts}
|
||||
completion_ids = {c.tool_call_id for c in completions}
|
||||
assert start_ids == completion_ids, (
|
||||
f"IDs must match: starts={start_ids}, completions={completion_ids}"
|
||||
)
|
||||
|
||||
|
||||
class TestMcpSanitizationE2E:
|
||||
"""Verify server names with special chars work end-to-end."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_slashed_server_name_registers_cleanly(self, acp_agent, mock_manager):
|
||||
"""Server name 'ai.exa/exa' should not crash — tools get sanitized names."""
|
||||
servers = [
|
||||
McpServerHttp(
|
||||
name="ai.exa/exa",
|
||||
url="https://exa.ai/mcp",
|
||||
headers=[],
|
||||
),
|
||||
]
|
||||
|
||||
registered_configs = {}
|
||||
def mock_register(config_map):
|
||||
registered_configs.update(config_map)
|
||||
return ["mcp_ai_exa_exa_search"]
|
||||
|
||||
fake_tools = [{"function": {"name": "mcp_ai_exa_exa_search"}}]
|
||||
|
||||
with patch("tools.mcp_tool.register_mcp_servers", side_effect=mock_register), \
|
||||
patch("model_tools.get_tool_definitions", return_value=fake_tools):
|
||||
resp = await acp_agent.new_session(cwd="/tmp", mcp_servers=servers)
|
||||
|
||||
state = mock_manager.get_session(resp.session_id)
|
||||
|
||||
# Raw server name preserved as config key
|
||||
assert "ai.exa/exa" in registered_configs
|
||||
# Agent tools refreshed with sanitized name
|
||||
assert "mcp_ai_exa_exa_search" in state.agent.valid_tool_names
|
||||
|
||||
|
||||
class TestSessionLifecycleMcpE2E:
|
||||
"""Verify MCP servers are registered on all session lifecycle methods."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_load_session_registers_mcp(self, acp_agent, mock_manager):
|
||||
"""load_session re-registers MCP servers (spec says agents may not retain them)."""
|
||||
# Create a session first
|
||||
create_resp = await acp_agent.new_session(cwd="/tmp")
|
||||
sid = create_resp.session_id
|
||||
|
||||
servers = [
|
||||
McpServerStdio(name="srv", command="/bin/test", args=[], env=[]),
|
||||
]
|
||||
|
||||
registered = {}
|
||||
def mock_register(config_map):
|
||||
registered.update(config_map)
|
||||
return []
|
||||
|
||||
state = mock_manager.get_session(sid)
|
||||
state.agent.enabled_toolsets = ["hermes-acp"]
|
||||
state.agent.disabled_toolsets = None
|
||||
state.agent.tools = []
|
||||
state.agent.valid_tool_names = set()
|
||||
|
||||
with patch("tools.mcp_tool.register_mcp_servers", side_effect=mock_register), \
|
||||
patch("model_tools.get_tool_definitions", return_value=[]):
|
||||
await acp_agent.load_session(cwd="/tmp", session_id=sid, mcp_servers=servers)
|
||||
|
||||
assert "srv" in registered
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_resume_session_registers_mcp(self, acp_agent, mock_manager):
|
||||
"""resume_session re-registers MCP servers."""
|
||||
create_resp = await acp_agent.new_session(cwd="/tmp")
|
||||
sid = create_resp.session_id
|
||||
|
||||
servers = [
|
||||
McpServerStdio(name="srv2", command="/bin/test2", args=[], env=[]),
|
||||
]
|
||||
|
||||
registered = {}
|
||||
def mock_register(config_map):
|
||||
registered.update(config_map)
|
||||
return []
|
||||
|
||||
state = mock_manager.get_session(sid)
|
||||
state.agent.enabled_toolsets = ["hermes-acp"]
|
||||
state.agent.disabled_toolsets = None
|
||||
state.agent.tools = []
|
||||
state.agent.valid_tool_names = set()
|
||||
|
||||
with patch("tools.mcp_tool.register_mcp_servers", side_effect=mock_register), \
|
||||
patch("model_tools.get_tool_definitions", return_value=[]):
|
||||
await acp_agent.resume_session(cwd="/tmp", session_id=sid, mcp_servers=servers)
|
||||
|
||||
assert "srv2" in registered
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fork_session_registers_mcp(self, acp_agent, mock_manager):
|
||||
"""fork_session registers MCP servers on the new forked session."""
|
||||
create_resp = await acp_agent.new_session(cwd="/tmp")
|
||||
sid = create_resp.session_id
|
||||
|
||||
servers = [
|
||||
McpServerHttp(name="api", url="https://api.test/mcp", headers=[]),
|
||||
]
|
||||
|
||||
registered = {}
|
||||
def mock_register(config_map):
|
||||
registered.update(config_map)
|
||||
return []
|
||||
|
||||
# Need to set up the forked session's agent too
|
||||
with patch("tools.mcp_tool.register_mcp_servers", side_effect=mock_register), \
|
||||
patch("model_tools.get_tool_definitions", return_value=[]):
|
||||
fork_resp = await acp_agent.fork_session(
|
||||
cwd="/tmp", session_id=sid, mcp_servers=servers
|
||||
)
|
||||
|
||||
assert fork_resp.session_id != ""
|
||||
assert "api" in registered
|
||||
@@ -505,3 +505,179 @@ class TestSlashCommands:
|
||||
assert state.agent.provider == "anthropic"
|
||||
assert state.agent.base_url == "https://anthropic.example/v1"
|
||||
assert runtime_calls[-1] == "anthropic"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _register_session_mcp_servers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRegisterSessionMcpServers:
|
||||
"""Tests for ACP MCP server registration in session lifecycle."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_noop_when_no_servers(self, agent, mock_manager):
|
||||
"""No-op when mcp_servers is None or empty."""
|
||||
state = mock_manager.create_session(cwd="/tmp")
|
||||
# Should not raise
|
||||
await agent._register_session_mcp_servers(state, None)
|
||||
await agent._register_session_mcp_servers(state, [])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_registers_stdio_servers(self, agent, mock_manager):
|
||||
"""McpServerStdio servers are converted and passed to register_mcp_servers."""
|
||||
from acp.schema import McpServerStdio, EnvVariable
|
||||
|
||||
state = mock_manager.create_session(cwd="/tmp")
|
||||
# Give the mock agent the attributes _register_session_mcp_servers reads
|
||||
state.agent.enabled_toolsets = ["hermes-acp"]
|
||||
state.agent.disabled_toolsets = None
|
||||
state.agent.tools = []
|
||||
state.agent.valid_tool_names = set()
|
||||
|
||||
server = McpServerStdio(
|
||||
name="test-server",
|
||||
command="/usr/bin/test",
|
||||
args=["--flag"],
|
||||
env=[EnvVariable(name="KEY", value="val")],
|
||||
)
|
||||
|
||||
registered_config = {}
|
||||
def capture_register(config_map):
|
||||
registered_config.update(config_map)
|
||||
return ["mcp_test_server_tool1"]
|
||||
|
||||
with patch("tools.mcp_tool.register_mcp_servers", side_effect=capture_register), \
|
||||
patch("model_tools.get_tool_definitions", return_value=[]):
|
||||
await agent._register_session_mcp_servers(state, [server])
|
||||
|
||||
assert "test-server" in registered_config
|
||||
cfg = registered_config["test-server"]
|
||||
assert cfg["command"] == "/usr/bin/test"
|
||||
assert cfg["args"] == ["--flag"]
|
||||
assert cfg["env"] == {"KEY": "val"}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_registers_http_servers(self, agent, mock_manager):
|
||||
"""McpServerHttp servers are converted correctly."""
|
||||
from acp.schema import McpServerHttp, HttpHeader
|
||||
|
||||
state = mock_manager.create_session(cwd="/tmp")
|
||||
state.agent.enabled_toolsets = ["hermes-acp"]
|
||||
state.agent.disabled_toolsets = None
|
||||
state.agent.tools = []
|
||||
state.agent.valid_tool_names = set()
|
||||
|
||||
server = McpServerHttp(
|
||||
name="http-server",
|
||||
url="https://api.example.com/mcp",
|
||||
headers=[HttpHeader(name="Authorization", value="Bearer tok")],
|
||||
)
|
||||
|
||||
registered_config = {}
|
||||
def capture_register(config_map):
|
||||
registered_config.update(config_map)
|
||||
return []
|
||||
|
||||
with patch("tools.mcp_tool.register_mcp_servers", side_effect=capture_register), \
|
||||
patch("model_tools.get_tool_definitions", return_value=[]):
|
||||
await agent._register_session_mcp_servers(state, [server])
|
||||
|
||||
assert "http-server" in registered_config
|
||||
cfg = registered_config["http-server"]
|
||||
assert cfg["url"] == "https://api.example.com/mcp"
|
||||
assert cfg["headers"] == {"Authorization": "Bearer tok"}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_refreshes_agent_tool_surface(self, agent, mock_manager):
|
||||
"""After MCP registration, agent.tools and valid_tool_names are refreshed."""
|
||||
from acp.schema import McpServerStdio
|
||||
|
||||
state = mock_manager.create_session(cwd="/tmp")
|
||||
state.agent.enabled_toolsets = ["hermes-acp"]
|
||||
state.agent.disabled_toolsets = None
|
||||
state.agent.tools = []
|
||||
state.agent.valid_tool_names = set()
|
||||
state.agent._cached_system_prompt = "old prompt"
|
||||
|
||||
server = McpServerStdio(
|
||||
name="srv",
|
||||
command="/bin/test",
|
||||
args=[],
|
||||
env=[],
|
||||
)
|
||||
|
||||
fake_tools = [
|
||||
{"function": {"name": "mcp_srv_search"}},
|
||||
{"function": {"name": "terminal"}},
|
||||
]
|
||||
|
||||
with patch("tools.mcp_tool.register_mcp_servers", return_value=["mcp_srv_search"]), \
|
||||
patch("model_tools.get_tool_definitions", return_value=fake_tools):
|
||||
await agent._register_session_mcp_servers(state, [server])
|
||||
|
||||
assert state.agent.tools == fake_tools
|
||||
assert state.agent.valid_tool_names == {"mcp_srv_search", "terminal"}
|
||||
# _invalidate_system_prompt should have been called
|
||||
state.agent._invalidate_system_prompt.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_failure_logs_warning(self, agent, mock_manager):
|
||||
"""If register_mcp_servers raises, warning is logged but no crash."""
|
||||
from acp.schema import McpServerStdio
|
||||
|
||||
state = mock_manager.create_session(cwd="/tmp")
|
||||
server = McpServerStdio(
|
||||
name="bad",
|
||||
command="/nonexistent",
|
||||
args=[],
|
||||
env=[],
|
||||
)
|
||||
|
||||
with patch("tools.mcp_tool.register_mcp_servers", side_effect=RuntimeError("boom")):
|
||||
# Should not raise
|
||||
await agent._register_session_mcp_servers(state, [server])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_new_session_calls_register(self, agent, mock_manager):
|
||||
"""new_session passes mcp_servers to _register_session_mcp_servers."""
|
||||
with patch.object(agent, "_register_session_mcp_servers", new_callable=AsyncMock) as mock_reg:
|
||||
resp = await agent.new_session(cwd="/tmp", mcp_servers=["fake"])
|
||||
assert resp is not None
|
||||
mock_reg.assert_called_once()
|
||||
# Second arg should be the mcp_servers list
|
||||
assert mock_reg.call_args[0][1] == ["fake"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_load_session_calls_register(self, agent, mock_manager):
|
||||
"""load_session passes mcp_servers to _register_session_mcp_servers."""
|
||||
# Create a session first so load can find it
|
||||
state = mock_manager.create_session(cwd="/tmp")
|
||||
sid = state.session_id
|
||||
|
||||
with patch.object(agent, "_register_session_mcp_servers", new_callable=AsyncMock) as mock_reg:
|
||||
resp = await agent.load_session(cwd="/tmp", session_id=sid, mcp_servers=["fake"])
|
||||
assert resp is not None
|
||||
mock_reg.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_resume_session_calls_register(self, agent, mock_manager):
|
||||
"""resume_session passes mcp_servers to _register_session_mcp_servers."""
|
||||
state = mock_manager.create_session(cwd="/tmp")
|
||||
sid = state.session_id
|
||||
|
||||
with patch.object(agent, "_register_session_mcp_servers", new_callable=AsyncMock) as mock_reg:
|
||||
resp = await agent.resume_session(cwd="/tmp", session_id=sid, mcp_servers=["fake"])
|
||||
assert resp is not None
|
||||
mock_reg.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fork_session_calls_register(self, agent, mock_manager):
|
||||
"""fork_session passes mcp_servers to _register_session_mcp_servers."""
|
||||
state = mock_manager.create_session(cwd="/tmp")
|
||||
sid = state.session_id
|
||||
|
||||
with patch.object(agent, "_register_session_mcp_servers", new_callable=AsyncMock) as mock_reg:
|
||||
resp = await agent.fork_session(cwd="/tmp", session_id=sid, mcp_servers=["fake"])
|
||||
assert resp is not None
|
||||
mock_reg.assert_called_once()
|
||||
|
||||
133
tests/gateway/test_step_callback_compat.py
Normal file
133
tests/gateway/test_step_callback_compat.py
Normal file
@@ -0,0 +1,133 @@
|
||||
"""Tests for step_callback backward compatibility.
|
||||
|
||||
Verifies that the gateway's step_callback normalization keeps
|
||||
``tool_names`` as a list of strings for backward-compatible hooks,
|
||||
while also providing the enriched ``tools`` list with results.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestStepCallbackNormalization:
|
||||
"""The gateway's _step_callback_sync normalizes prev_tools from run_agent."""
|
||||
|
||||
def _extract_step_callback(self):
|
||||
"""Build a minimal _step_callback_sync using the same logic as gateway/run.py.
|
||||
|
||||
We replicate the closure so we can test normalisation in isolation
|
||||
without spinning up the full gateway.
|
||||
"""
|
||||
captured_events = []
|
||||
|
||||
class FakeHooks:
|
||||
async def emit(self, event_type, data):
|
||||
captured_events.append((event_type, data))
|
||||
|
||||
hooks_ref = FakeHooks()
|
||||
loop = asyncio.new_event_loop()
|
||||
|
||||
def _step_callback_sync(iteration: int, prev_tools: list) -> None:
|
||||
_names: list[str] = []
|
||||
for _t in (prev_tools or []):
|
||||
if isinstance(_t, dict):
|
||||
_names.append(_t.get("name") or "")
|
||||
else:
|
||||
_names.append(str(_t))
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
hooks_ref.emit("agent:step", {
|
||||
"iteration": iteration,
|
||||
"tool_names": _names,
|
||||
"tools": prev_tools,
|
||||
}),
|
||||
loop,
|
||||
)
|
||||
|
||||
return _step_callback_sync, captured_events, loop
|
||||
|
||||
def test_dict_prev_tools_produce_string_tool_names(self):
|
||||
"""When prev_tools is list[dict], tool_names should be list[str]."""
|
||||
cb, events, loop = self._extract_step_callback()
|
||||
|
||||
# Simulate the enriched format from run_agent.py
|
||||
prev_tools = [
|
||||
{"name": "terminal", "result": '{"output": "hello"}'},
|
||||
{"name": "read_file", "result": '{"content": "..."}'},
|
||||
]
|
||||
|
||||
try:
|
||||
loop.run_until_complete(asyncio.sleep(0)) # prime the loop
|
||||
import threading
|
||||
t = threading.Thread(target=cb, args=(1, prev_tools))
|
||||
t.start()
|
||||
t.join(timeout=2)
|
||||
loop.run_until_complete(asyncio.sleep(0.1))
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
assert len(events) == 1
|
||||
_, data = events[0]
|
||||
# tool_names must be strings for backward compat
|
||||
assert data["tool_names"] == ["terminal", "read_file"]
|
||||
assert all(isinstance(n, str) for n in data["tool_names"])
|
||||
# tools should be the enriched dicts
|
||||
assert data["tools"] == prev_tools
|
||||
|
||||
def test_string_prev_tools_still_work(self):
|
||||
"""When prev_tools is list[str] (legacy), tool_names should pass through."""
|
||||
cb, events, loop = self._extract_step_callback()
|
||||
|
||||
prev_tools = ["terminal", "read_file"]
|
||||
|
||||
try:
|
||||
loop.run_until_complete(asyncio.sleep(0))
|
||||
import threading
|
||||
t = threading.Thread(target=cb, args=(2, prev_tools))
|
||||
t.start()
|
||||
t.join(timeout=2)
|
||||
loop.run_until_complete(asyncio.sleep(0.1))
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
assert len(events) == 1
|
||||
_, data = events[0]
|
||||
assert data["tool_names"] == ["terminal", "read_file"]
|
||||
|
||||
def test_empty_prev_tools(self):
|
||||
"""Empty or None prev_tools should produce empty tool_names."""
|
||||
cb, events, loop = self._extract_step_callback()
|
||||
|
||||
try:
|
||||
loop.run_until_complete(asyncio.sleep(0))
|
||||
import threading
|
||||
t = threading.Thread(target=cb, args=(1, []))
|
||||
t.start()
|
||||
t.join(timeout=2)
|
||||
loop.run_until_complete(asyncio.sleep(0.1))
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
assert len(events) == 1
|
||||
_, data = events[0]
|
||||
assert data["tool_names"] == []
|
||||
|
||||
def test_joinable_for_hook_example(self):
|
||||
"""The documented hook example: ', '.join(tool_names) should work."""
|
||||
# This is the exact pattern from the docs
|
||||
prev_tools = [
|
||||
{"name": "terminal", "result": "ok"},
|
||||
{"name": "web_search", "result": None},
|
||||
]
|
||||
|
||||
_names = []
|
||||
for _t in prev_tools:
|
||||
if isinstance(_t, dict):
|
||||
_names.append(_t.get("name") or "")
|
||||
else:
|
||||
_names.append(str(_t))
|
||||
|
||||
# This must not raise — documented hook pattern
|
||||
result = ", ".join(_names)
|
||||
assert result == "terminal, web_search"
|
||||
@@ -2900,3 +2900,164 @@ class TestMCPBuiltinCollisionGuard:
|
||||
assert mock_registry.get_toolset_for_tool("mcp_srv_do_thing") == "mcp-srv"
|
||||
|
||||
_servers.pop("srv", None)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# sanitize_mcp_name_component
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSanitizeMcpNameComponent:
|
||||
"""Verify sanitize_mcp_name_component handles all edge cases."""
|
||||
|
||||
def test_hyphens_replaced(self):
|
||||
from tools.mcp_tool import sanitize_mcp_name_component
|
||||
assert sanitize_mcp_name_component("my-server") == "my_server"
|
||||
|
||||
def test_dots_replaced(self):
|
||||
from tools.mcp_tool import sanitize_mcp_name_component
|
||||
assert sanitize_mcp_name_component("ai.exa") == "ai_exa"
|
||||
|
||||
def test_slashes_replaced(self):
|
||||
from tools.mcp_tool import sanitize_mcp_name_component
|
||||
assert sanitize_mcp_name_component("ai.exa/exa") == "ai_exa_exa"
|
||||
|
||||
def test_mixed_special_characters(self):
|
||||
from tools.mcp_tool import sanitize_mcp_name_component
|
||||
assert sanitize_mcp_name_component("@scope/my-pkg.v2") == "_scope_my_pkg_v2"
|
||||
|
||||
def test_alphanumeric_and_underscores_preserved(self):
|
||||
from tools.mcp_tool import sanitize_mcp_name_component
|
||||
assert sanitize_mcp_name_component("my_server_123") == "my_server_123"
|
||||
|
||||
def test_empty_string(self):
|
||||
from tools.mcp_tool import sanitize_mcp_name_component
|
||||
assert sanitize_mcp_name_component("") == ""
|
||||
|
||||
def test_none_returns_empty(self):
|
||||
from tools.mcp_tool import sanitize_mcp_name_component
|
||||
assert sanitize_mcp_name_component(None) == ""
|
||||
|
||||
def test_slash_in_convert_mcp_schema(self):
|
||||
"""Server names with slashes produce valid tool names via _convert_mcp_schema."""
|
||||
from tools.mcp_tool import _convert_mcp_schema
|
||||
|
||||
mcp_tool = _make_mcp_tool(name="search")
|
||||
schema = _convert_mcp_schema("ai.exa/exa", mcp_tool)
|
||||
assert schema["name"] == "mcp_ai_exa_exa_search"
|
||||
# Must match Anthropic's pattern: ^[a-zA-Z0-9_-]{1,128}$
|
||||
import re
|
||||
assert re.match(r"^[a-zA-Z0-9_-]{1,128}$", schema["name"])
|
||||
|
||||
def test_slash_in_build_utility_schemas(self):
|
||||
"""Server names with slashes produce valid utility tool names."""
|
||||
from tools.mcp_tool import _build_utility_schemas
|
||||
|
||||
schemas = _build_utility_schemas("ai.exa/exa")
|
||||
for s in schemas:
|
||||
name = s["schema"]["name"]
|
||||
assert "/" not in name
|
||||
assert "." not in name
|
||||
|
||||
def test_slash_in_sync_mcp_toolsets(self):
|
||||
"""_sync_mcp_toolsets uses sanitize consistently with _convert_mcp_schema."""
|
||||
from tools.mcp_tool import sanitize_mcp_name_component
|
||||
|
||||
# Verify the prefix generation matches what _convert_mcp_schema produces
|
||||
server_name = "ai.exa/exa"
|
||||
safe_prefix = f"mcp_{sanitize_mcp_name_component(server_name)}_"
|
||||
assert safe_prefix == "mcp_ai_exa_exa_"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# register_mcp_servers public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRegisterMcpServers:
|
||||
"""Verify the new register_mcp_servers() public API."""
|
||||
|
||||
def test_empty_servers_returns_empty(self):
|
||||
from tools.mcp_tool import register_mcp_servers
|
||||
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", True):
|
||||
result = register_mcp_servers({})
|
||||
assert result == []
|
||||
|
||||
def test_mcp_not_available_returns_empty(self):
|
||||
from tools.mcp_tool import register_mcp_servers
|
||||
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", False):
|
||||
result = register_mcp_servers({"srv": {"command": "test"}})
|
||||
assert result == []
|
||||
|
||||
def test_skips_already_connected_servers(self):
|
||||
from tools.mcp_tool import register_mcp_servers, _servers
|
||||
|
||||
mock_server = _make_mock_server("existing")
|
||||
_servers["existing"] = mock_server
|
||||
|
||||
try:
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
|
||||
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp_existing_tool"]):
|
||||
result = register_mcp_servers({"existing": {"command": "test"}})
|
||||
assert result == ["mcp_existing_tool"]
|
||||
finally:
|
||||
_servers.pop("existing", None)
|
||||
|
||||
def test_skips_disabled_servers(self):
|
||||
from tools.mcp_tool import register_mcp_servers, _servers
|
||||
|
||||
try:
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
|
||||
patch("tools.mcp_tool._existing_tool_names", return_value=[]):
|
||||
result = register_mcp_servers({"srv": {"command": "test", "enabled": False}})
|
||||
assert result == []
|
||||
finally:
|
||||
_servers.pop("srv", None)
|
||||
|
||||
def test_connects_new_servers(self):
|
||||
from tools.mcp_tool import register_mcp_servers, _servers, _ensure_mcp_loop
|
||||
|
||||
fake_config = {"my_server": {"command": "npx", "args": ["test"]}}
|
||||
|
||||
async def fake_register(name, cfg):
|
||||
server = _make_mock_server(name)
|
||||
server._registered_tool_names = ["mcp_my_server_tool1"]
|
||||
_servers[name] = server
|
||||
return ["mcp_my_server_tool1"]
|
||||
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
|
||||
patch("tools.mcp_tool._discover_and_register_server", side_effect=fake_register), \
|
||||
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp_my_server_tool1"]):
|
||||
_ensure_mcp_loop()
|
||||
result = register_mcp_servers(fake_config)
|
||||
|
||||
assert "mcp_my_server_tool1" in result
|
||||
_servers.pop("my_server", None)
|
||||
|
||||
def test_logs_summary_on_success(self):
|
||||
from tools.mcp_tool import register_mcp_servers, _servers, _ensure_mcp_loop
|
||||
|
||||
fake_config = {"srv": {"command": "npx", "args": ["test"]}}
|
||||
|
||||
async def fake_register(name, cfg):
|
||||
server = _make_mock_server(name)
|
||||
server._registered_tool_names = ["mcp_srv_t1", "mcp_srv_t2"]
|
||||
_servers[name] = server
|
||||
return ["mcp_srv_t1", "mcp_srv_t2"]
|
||||
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
|
||||
patch("tools.mcp_tool._discover_and_register_server", side_effect=fake_register), \
|
||||
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp_srv_t1", "mcp_srv_t2"]):
|
||||
_ensure_mcp_loop()
|
||||
|
||||
with patch("tools.mcp_tool.logger") as mock_logger:
|
||||
register_mcp_servers(fake_config)
|
||||
|
||||
info_calls = [str(c) for c in mock_logger.info.call_args_list]
|
||||
assert any("2 tool(s)" in c and "1 server(s)" in c for c in info_calls), (
|
||||
f"Summary should report 2 tools from 1 server, got: {info_calls}"
|
||||
)
|
||||
|
||||
_servers.pop("srv", None)
|
||||
|
||||
@@ -1406,6 +1406,17 @@ def _normalize_mcp_input_schema(schema: dict | None) -> dict:
|
||||
return schema
|
||||
|
||||
|
||||
def sanitize_mcp_name_component(value: str) -> str:
|
||||
"""Return an MCP name component safe for tool and prefix generation.
|
||||
|
||||
Preserves Hermes's historical behavior of converting hyphens to
|
||||
underscores, and also replaces any other character outside
|
||||
``[A-Za-z0-9_]`` with ``_`` so generated tool names are compatible with
|
||||
provider validation rules.
|
||||
"""
|
||||
return re.sub(r"[^A-Za-z0-9_]", "_", str(value or ""))
|
||||
|
||||
|
||||
def _convert_mcp_schema(server_name: str, mcp_tool) -> dict:
|
||||
"""Convert an MCP tool listing to the Hermes registry schema format.
|
||||
|
||||
@@ -1417,9 +1428,8 @@ def _convert_mcp_schema(server_name: str, mcp_tool) -> dict:
|
||||
Returns:
|
||||
A dict suitable for ``registry.register(schema=...)``.
|
||||
"""
|
||||
# Sanitize: replace hyphens and dots with underscores for LLM API compatibility
|
||||
safe_tool_name = mcp_tool.name.replace("-", "_").replace(".", "_")
|
||||
safe_server_name = server_name.replace("-", "_").replace(".", "_")
|
||||
safe_tool_name = sanitize_mcp_name_component(mcp_tool.name)
|
||||
safe_server_name = sanitize_mcp_name_component(server_name)
|
||||
prefixed_name = f"mcp_{safe_server_name}_{safe_tool_name}"
|
||||
return {
|
||||
"name": prefixed_name,
|
||||
@@ -1449,7 +1459,7 @@ def _sync_mcp_toolsets(server_names: Optional[List[str]] = None) -> None:
|
||||
all_mcp_tools: List[str] = []
|
||||
|
||||
for server_name in server_names:
|
||||
safe_prefix = f"mcp_{server_name.replace('-', '_').replace('.', '_')}_"
|
||||
safe_prefix = f"mcp_{sanitize_mcp_name_component(server_name)}_"
|
||||
server_tools = sorted(
|
||||
t for t in existing if t.startswith(safe_prefix)
|
||||
)
|
||||
@@ -1485,7 +1495,7 @@ def _build_utility_schemas(server_name: str) -> List[dict]:
|
||||
Returns a list of (schema, handler_factory_name) tuples encoded as dicts
|
||||
with keys: schema, handler_key.
|
||||
"""
|
||||
safe_name = server_name.replace("-", "_").replace(".", "_")
|
||||
safe_name = sanitize_mcp_name_component(server_name)
|
||||
return [
|
||||
{
|
||||
"schema": {
|
||||
@@ -1772,6 +1782,86 @@ async def _discover_and_register_server(name: str, config: dict) -> List[str]:
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def register_mcp_servers(servers: Dict[str, dict]) -> List[str]:
|
||||
"""Connect to explicit MCP servers and register their tools.
|
||||
|
||||
Idempotent for already-connected server names. Servers with
|
||||
``enabled: false`` are skipped without disconnecting existing sessions.
|
||||
|
||||
Args:
|
||||
servers: Mapping of ``{server_name: server_config}``.
|
||||
|
||||
Returns:
|
||||
List of all currently registered MCP tool names.
|
||||
"""
|
||||
if not _MCP_AVAILABLE:
|
||||
logger.debug("MCP SDK not available -- skipping explicit MCP registration")
|
||||
return []
|
||||
|
||||
if not servers:
|
||||
logger.debug("No explicit MCP servers provided")
|
||||
return []
|
||||
|
||||
# Only attempt servers that aren't already connected and are enabled
|
||||
# (enabled: false skips the server entirely without removing its config)
|
||||
with _lock:
|
||||
new_servers = {
|
||||
k: v
|
||||
for k, v in servers.items()
|
||||
if k not in _servers and _parse_boolish(v.get("enabled", True), default=True)
|
||||
}
|
||||
|
||||
if not new_servers:
|
||||
_sync_mcp_toolsets(list(servers.keys()))
|
||||
return _existing_tool_names()
|
||||
|
||||
# Start the background event loop for MCP connections
|
||||
_ensure_mcp_loop()
|
||||
|
||||
async def _discover_one(name: str, cfg: dict) -> List[str]:
|
||||
"""Connect to a single server and return its registered tool names."""
|
||||
return await _discover_and_register_server(name, cfg)
|
||||
|
||||
async def _discover_all():
|
||||
server_names = list(new_servers.keys())
|
||||
# Connect to all servers in PARALLEL
|
||||
results = await asyncio.gather(
|
||||
*(_discover_one(name, cfg) for name, cfg in new_servers.items()),
|
||||
return_exceptions=True,
|
||||
)
|
||||
for name, result in zip(server_names, results):
|
||||
if isinstance(result, Exception):
|
||||
command = new_servers.get(name, {}).get("command")
|
||||
logger.warning(
|
||||
"Failed to connect to MCP server '%s'%s: %s",
|
||||
name,
|
||||
f" (command={command})" if command else "",
|
||||
_format_connect_error(result),
|
||||
)
|
||||
|
||||
# Per-server timeouts are handled inside _discover_and_register_server.
|
||||
# The outer timeout is generous: 120s total for parallel discovery.
|
||||
_run_on_mcp_loop(_discover_all(), timeout=120)
|
||||
|
||||
_sync_mcp_toolsets(list(servers.keys()))
|
||||
|
||||
# Log a summary so ACP callers get visibility into what was registered.
|
||||
with _lock:
|
||||
connected = [n for n in new_servers if n in _servers]
|
||||
new_tool_count = sum(
|
||||
len(getattr(_servers[n], "_registered_tool_names", []))
|
||||
for n in connected
|
||||
)
|
||||
failed = len(new_servers) - len(connected)
|
||||
if new_tool_count or failed:
|
||||
summary = f"MCP: registered {new_tool_count} tool(s) from {len(connected)} server(s)"
|
||||
if failed:
|
||||
summary += f" ({failed} failed)"
|
||||
logger.info(summary)
|
||||
|
||||
return _existing_tool_names()
|
||||
|
||||
|
||||
def discover_mcp_tools() -> List[str]:
|
||||
"""Entry point: load config, connect to MCP servers, register tools.
|
||||
|
||||
@@ -1793,69 +1883,32 @@ def discover_mcp_tools() -> List[str]:
|
||||
logger.debug("No MCP servers configured")
|
||||
return []
|
||||
|
||||
# Only attempt servers that aren't already connected and are enabled
|
||||
# (enabled: false skips the server entirely without removing its config)
|
||||
with _lock:
|
||||
new_servers = {
|
||||
k: v
|
||||
for k, v in servers.items()
|
||||
if k not in _servers and _parse_boolish(v.get("enabled", True), default=True)
|
||||
}
|
||||
new_server_names = [
|
||||
name
|
||||
for name, cfg in servers.items()
|
||||
if name not in _servers and _parse_boolish(cfg.get("enabled", True), default=True)
|
||||
]
|
||||
|
||||
if not new_servers:
|
||||
_sync_mcp_toolsets(list(servers.keys()))
|
||||
return _existing_tool_names()
|
||||
tool_names = register_mcp_servers(servers)
|
||||
if not new_server_names:
|
||||
return tool_names
|
||||
|
||||
# Start the background event loop for MCP connections
|
||||
_ensure_mcp_loop()
|
||||
|
||||
all_tools: List[str] = []
|
||||
failed_count = 0
|
||||
|
||||
async def _discover_one(name: str, cfg: dict) -> List[str]:
|
||||
"""Connect to a single server and return its registered tool names."""
|
||||
return await _discover_and_register_server(name, cfg)
|
||||
|
||||
async def _discover_all():
|
||||
nonlocal failed_count
|
||||
server_names = list(new_servers.keys())
|
||||
# Connect to all servers in PARALLEL
|
||||
results = await asyncio.gather(
|
||||
*(_discover_one(name, cfg) for name, cfg in new_servers.items()),
|
||||
return_exceptions=True,
|
||||
with _lock:
|
||||
connected_server_names = [name for name in new_server_names if name in _servers]
|
||||
new_tool_count = sum(
|
||||
len(getattr(_servers[name], "_registered_tool_names", []))
|
||||
for name in connected_server_names
|
||||
)
|
||||
for name, result in zip(server_names, results):
|
||||
if isinstance(result, Exception):
|
||||
failed_count += 1
|
||||
command = new_servers.get(name, {}).get("command")
|
||||
logger.warning(
|
||||
"Failed to connect to MCP server '%s'%s: %s",
|
||||
name,
|
||||
f" (command={command})" if command else "",
|
||||
_format_connect_error(result),
|
||||
)
|
||||
elif isinstance(result, list):
|
||||
all_tools.extend(result)
|
||||
else:
|
||||
failed_count += 1
|
||||
|
||||
# Per-server timeouts are handled inside _discover_and_register_server.
|
||||
# The outer timeout is generous: 120s total for parallel discovery.
|
||||
_run_on_mcp_loop(_discover_all(), timeout=120)
|
||||
|
||||
_sync_mcp_toolsets(list(servers.keys()))
|
||||
|
||||
# Print summary
|
||||
total_servers = len(new_servers)
|
||||
ok_servers = total_servers - failed_count
|
||||
if all_tools or failed_count:
|
||||
summary = f" MCP: {len(all_tools)} tool(s) from {ok_servers} server(s)"
|
||||
failed_count = len(new_server_names) - len(connected_server_names)
|
||||
if new_tool_count or failed_count:
|
||||
summary = f" MCP: {new_tool_count} tool(s) from {len(connected_server_names)} server(s)"
|
||||
if failed_count:
|
||||
summary += f" ({failed_count} failed)"
|
||||
logger.info(summary)
|
||||
|
||||
# Return ALL registered tools (existing + newly discovered)
|
||||
return _existing_tool_names()
|
||||
return tool_names
|
||||
|
||||
|
||||
def get_mcp_status() -> List[dict]:
|
||||
|
||||
Reference in New Issue
Block a user