mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 06:51:16 +08:00
feat(acp): register client-provided MCP servers as agent tools
ACP clients pass MCP server definitions in session/new, load_session,
resume_session, and fork_session. Previously these were accepted but
silently ignored — the agent never connected to them.
This wires the mcp_servers parameter into the existing MCP registration
pipeline (tools/mcp_tool.py) so client-provided servers are connected,
their tools discovered, and the agent's tool surface refreshed before
the first prompt.
Changes:
tools/mcp_tool.py:
- Extract sanitize_mcp_name_component() to replace all non-[A-Za-z0-9_]
characters (fixes crash when server names contain / or other chars
that violate provider tool-name validation rules)
- Use it in _convert_mcp_schema, _sync_mcp_toolsets, _build_utility_schemas
- Extract register_mcp_servers(servers: dict) as a public API that takes
an explicit {name: config} map. discover_mcp_tools() becomes a thin
wrapper that loads config.yaml and calls register_mcp_servers()
acp_adapter/server.py:
- Add _register_session_mcp_servers() which converts ACP McpServerStdio /
McpServerHttp / McpServerSse objects to Hermes MCP config dicts,
registers them via asyncio.to_thread (avoids blocking the ACP event
loop), then rebuilds agent.tools, valid_tool_names, and invalidates
the cached system prompt
- Call it from new_session, load_session, resume_session, fork_session
Tested with Eden (theproxycompany.com) as ACP client — 5 MCP servers
(HTTP + stdio) registered successfully, 110 tools available to the agent.
This commit is contained in:
@@ -22,6 +22,9 @@ from acp.schema import (
|
|||||||
InitializeResponse,
|
InitializeResponse,
|
||||||
ListSessionsResponse,
|
ListSessionsResponse,
|
||||||
LoadSessionResponse,
|
LoadSessionResponse,
|
||||||
|
McpServerHttp,
|
||||||
|
McpServerSse,
|
||||||
|
McpServerStdio,
|
||||||
NewSessionResponse,
|
NewSessionResponse,
|
||||||
PromptResponse,
|
PromptResponse,
|
||||||
ResumeSessionResponse,
|
ResumeSessionResponse,
|
||||||
@@ -93,6 +96,71 @@ class HermesACPAgent(acp.Agent):
|
|||||||
self._conn = conn
|
self._conn = conn
|
||||||
logger.info("ACP client connected")
|
logger.info("ACP client connected")
|
||||||
|
|
||||||
|
async def _register_session_mcp_servers(
|
||||||
|
self,
|
||||||
|
state: SessionState,
|
||||||
|
mcp_servers: list[McpServerStdio | McpServerHttp | McpServerSse] | None,
|
||||||
|
) -> None:
|
||||||
|
"""Register ACP-provided MCP servers and refresh the agent tool surface."""
|
||||||
|
if not mcp_servers:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
from tools.mcp_tool import register_mcp_servers, sanitize_mcp_name_component
|
||||||
|
|
||||||
|
config_map: dict[str, dict] = {}
|
||||||
|
for server in mcp_servers:
|
||||||
|
name = sanitize_mcp_name_component(server.name)
|
||||||
|
if isinstance(server, McpServerStdio):
|
||||||
|
config = {
|
||||||
|
"command": server.command,
|
||||||
|
"args": list(server.args),
|
||||||
|
"env": {item.name: item.value for item in server.env},
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
config = {
|
||||||
|
"url": server.url,
|
||||||
|
"headers": {item.name: item.value for item in server.headers},
|
||||||
|
}
|
||||||
|
config_map[name] = config
|
||||||
|
|
||||||
|
await asyncio.to_thread(register_mcp_servers, config_map)
|
||||||
|
except Exception:
|
||||||
|
logger.warning(
|
||||||
|
"Session %s: failed to register ACP MCP servers",
|
||||||
|
state.session_id,
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
from model_tools import get_tool_definitions
|
||||||
|
|
||||||
|
enabled_toolsets = getattr(state.agent, "enabled_toolsets", None) or ["hermes-acp"]
|
||||||
|
disabled_toolsets = getattr(state.agent, "disabled_toolsets", None)
|
||||||
|
state.agent.tools = get_tool_definitions(
|
||||||
|
enabled_toolsets=enabled_toolsets,
|
||||||
|
disabled_toolsets=disabled_toolsets,
|
||||||
|
quiet_mode=True,
|
||||||
|
)
|
||||||
|
state.agent.valid_tool_names = {
|
||||||
|
tool["function"]["name"] for tool in state.agent.tools or []
|
||||||
|
}
|
||||||
|
invalidate = getattr(state.agent, "_invalidate_system_prompt", None)
|
||||||
|
if callable(invalidate):
|
||||||
|
invalidate()
|
||||||
|
logger.info(
|
||||||
|
"Session %s: refreshed tool surface after ACP MCP registration (%d tools)",
|
||||||
|
state.session_id,
|
||||||
|
len(state.agent.tools or []),
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.warning(
|
||||||
|
"Session %s: failed to refresh tool surface after ACP MCP registration",
|
||||||
|
state.session_id,
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
|
||||||
# ---- ACP lifecycle ------------------------------------------------------
|
# ---- ACP lifecycle ------------------------------------------------------
|
||||||
|
|
||||||
async def initialize(
|
async def initialize(
|
||||||
@@ -149,6 +217,7 @@ class HermesACPAgent(acp.Agent):
|
|||||||
**kwargs: Any,
|
**kwargs: Any,
|
||||||
) -> NewSessionResponse:
|
) -> NewSessionResponse:
|
||||||
state = self.session_manager.create_session(cwd=cwd)
|
state = self.session_manager.create_session(cwd=cwd)
|
||||||
|
await self._register_session_mcp_servers(state, mcp_servers)
|
||||||
logger.info("New session %s (cwd=%s)", state.session_id, cwd)
|
logger.info("New session %s (cwd=%s)", state.session_id, cwd)
|
||||||
return NewSessionResponse(session_id=state.session_id)
|
return NewSessionResponse(session_id=state.session_id)
|
||||||
|
|
||||||
@@ -163,6 +232,7 @@ class HermesACPAgent(acp.Agent):
|
|||||||
if state is None:
|
if state is None:
|
||||||
logger.warning("load_session: session %s not found", session_id)
|
logger.warning("load_session: session %s not found", session_id)
|
||||||
return None
|
return None
|
||||||
|
await self._register_session_mcp_servers(state, mcp_servers)
|
||||||
logger.info("Loaded session %s", session_id)
|
logger.info("Loaded session %s", session_id)
|
||||||
return LoadSessionResponse()
|
return LoadSessionResponse()
|
||||||
|
|
||||||
@@ -177,6 +247,7 @@ class HermesACPAgent(acp.Agent):
|
|||||||
if state is None:
|
if state is None:
|
||||||
logger.warning("resume_session: session %s not found, creating new", session_id)
|
logger.warning("resume_session: session %s not found, creating new", session_id)
|
||||||
state = self.session_manager.create_session(cwd=cwd)
|
state = self.session_manager.create_session(cwd=cwd)
|
||||||
|
await self._register_session_mcp_servers(state, mcp_servers)
|
||||||
logger.info("Resumed session %s", state.session_id)
|
logger.info("Resumed session %s", state.session_id)
|
||||||
return ResumeSessionResponse()
|
return ResumeSessionResponse()
|
||||||
|
|
||||||
@@ -200,6 +271,8 @@ class HermesACPAgent(acp.Agent):
|
|||||||
) -> ForkSessionResponse:
|
) -> ForkSessionResponse:
|
||||||
state = self.session_manager.fork_session(session_id, cwd=cwd)
|
state = self.session_manager.fork_session(session_id, cwd=cwd)
|
||||||
new_id = state.session_id if state else ""
|
new_id = state.session_id if state else ""
|
||||||
|
if state is not None:
|
||||||
|
await self._register_session_mcp_servers(state, mcp_servers)
|
||||||
logger.info("Forked session %s -> %s", session_id, new_id)
|
logger.info("Forked session %s -> %s", session_id, new_id)
|
||||||
return ForkSessionResponse(session_id=new_id)
|
return ForkSessionResponse(session_id=new_id)
|
||||||
|
|
||||||
|
|||||||
@@ -1406,6 +1406,17 @@ def _normalize_mcp_input_schema(schema: dict | None) -> dict:
|
|||||||
return schema
|
return schema
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_mcp_name_component(value: str) -> str:
|
||||||
|
"""Return an MCP name component safe for tool and prefix generation.
|
||||||
|
|
||||||
|
Preserves Hermes's historical behavior of converting hyphens to
|
||||||
|
underscores, and also replaces any other character outside
|
||||||
|
``[A-Za-z0-9_]`` with ``_`` so generated tool names are compatible with
|
||||||
|
provider validation rules.
|
||||||
|
"""
|
||||||
|
return re.sub(r"[^A-Za-z0-9_]", "_", str(value or ""))
|
||||||
|
|
||||||
|
|
||||||
def _convert_mcp_schema(server_name: str, mcp_tool) -> dict:
|
def _convert_mcp_schema(server_name: str, mcp_tool) -> dict:
|
||||||
"""Convert an MCP tool listing to the Hermes registry schema format.
|
"""Convert an MCP tool listing to the Hermes registry schema format.
|
||||||
|
|
||||||
@@ -1417,9 +1428,8 @@ def _convert_mcp_schema(server_name: str, mcp_tool) -> dict:
|
|||||||
Returns:
|
Returns:
|
||||||
A dict suitable for ``registry.register(schema=...)``.
|
A dict suitable for ``registry.register(schema=...)``.
|
||||||
"""
|
"""
|
||||||
# Sanitize: replace hyphens and dots with underscores for LLM API compatibility
|
safe_tool_name = sanitize_mcp_name_component(mcp_tool.name)
|
||||||
safe_tool_name = mcp_tool.name.replace("-", "_").replace(".", "_")
|
safe_server_name = sanitize_mcp_name_component(server_name)
|
||||||
safe_server_name = server_name.replace("-", "_").replace(".", "_")
|
|
||||||
prefixed_name = f"mcp_{safe_server_name}_{safe_tool_name}"
|
prefixed_name = f"mcp_{safe_server_name}_{safe_tool_name}"
|
||||||
return {
|
return {
|
||||||
"name": prefixed_name,
|
"name": prefixed_name,
|
||||||
@@ -1449,7 +1459,7 @@ def _sync_mcp_toolsets(server_names: Optional[List[str]] = None) -> None:
|
|||||||
all_mcp_tools: List[str] = []
|
all_mcp_tools: List[str] = []
|
||||||
|
|
||||||
for server_name in server_names:
|
for server_name in server_names:
|
||||||
safe_prefix = f"mcp_{server_name.replace('-', '_').replace('.', '_')}_"
|
safe_prefix = f"mcp_{sanitize_mcp_name_component(server_name)}_"
|
||||||
server_tools = sorted(
|
server_tools = sorted(
|
||||||
t for t in existing if t.startswith(safe_prefix)
|
t for t in existing if t.startswith(safe_prefix)
|
||||||
)
|
)
|
||||||
@@ -1485,7 +1495,7 @@ def _build_utility_schemas(server_name: str) -> List[dict]:
|
|||||||
Returns a list of (schema, handler_factory_name) tuples encoded as dicts
|
Returns a list of (schema, handler_factory_name) tuples encoded as dicts
|
||||||
with keys: schema, handler_key.
|
with keys: schema, handler_key.
|
||||||
"""
|
"""
|
||||||
safe_name = server_name.replace("-", "_").replace(".", "_")
|
safe_name = sanitize_mcp_name_component(server_name)
|
||||||
return [
|
return [
|
||||||
{
|
{
|
||||||
"schema": {
|
"schema": {
|
||||||
@@ -1772,6 +1782,72 @@ async def _discover_and_register_server(name: str, config: dict) -> List[str]:
|
|||||||
# Public API
|
# Public API
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def register_mcp_servers(servers: Dict[str, dict]) -> List[str]:
|
||||||
|
"""Connect to explicit MCP servers and register their tools.
|
||||||
|
|
||||||
|
Idempotent for already-connected server names. Servers with
|
||||||
|
``enabled: false`` are skipped without disconnecting existing sessions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
servers: Mapping of ``{server_name: server_config}``.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of all currently registered MCP tool names.
|
||||||
|
"""
|
||||||
|
if not _MCP_AVAILABLE:
|
||||||
|
logger.debug("MCP SDK not available -- skipping explicit MCP registration")
|
||||||
|
return []
|
||||||
|
|
||||||
|
if not servers:
|
||||||
|
logger.debug("No explicit MCP servers provided")
|
||||||
|
return []
|
||||||
|
|
||||||
|
# Only attempt servers that aren't already connected and are enabled
|
||||||
|
# (enabled: false skips the server entirely without removing its config)
|
||||||
|
with _lock:
|
||||||
|
new_servers = {
|
||||||
|
k: v
|
||||||
|
for k, v in servers.items()
|
||||||
|
if k not in _servers and _parse_boolish(v.get("enabled", True), default=True)
|
||||||
|
}
|
||||||
|
|
||||||
|
if not new_servers:
|
||||||
|
_sync_mcp_toolsets(list(servers.keys()))
|
||||||
|
return _existing_tool_names()
|
||||||
|
|
||||||
|
# Start the background event loop for MCP connections
|
||||||
|
_ensure_mcp_loop()
|
||||||
|
|
||||||
|
async def _discover_one(name: str, cfg: dict) -> List[str]:
|
||||||
|
"""Connect to a single server and return its registered tool names."""
|
||||||
|
return await _discover_and_register_server(name, cfg)
|
||||||
|
|
||||||
|
async def _discover_all():
|
||||||
|
server_names = list(new_servers.keys())
|
||||||
|
# Connect to all servers in PARALLEL
|
||||||
|
results = await asyncio.gather(
|
||||||
|
*(_discover_one(name, cfg) for name, cfg in new_servers.items()),
|
||||||
|
return_exceptions=True,
|
||||||
|
)
|
||||||
|
for name, result in zip(server_names, results):
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
command = new_servers.get(name, {}).get("command")
|
||||||
|
logger.warning(
|
||||||
|
"Failed to connect to MCP server '%s'%s: %s",
|
||||||
|
name,
|
||||||
|
f" (command={command})" if command else "",
|
||||||
|
_format_connect_error(result),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Per-server timeouts are handled inside _discover_and_register_server.
|
||||||
|
# The outer timeout is generous: 120s total for parallel discovery.
|
||||||
|
_run_on_mcp_loop(_discover_all(), timeout=120)
|
||||||
|
|
||||||
|
_sync_mcp_toolsets(list(servers.keys()))
|
||||||
|
|
||||||
|
return _existing_tool_names()
|
||||||
|
|
||||||
|
|
||||||
def discover_mcp_tools() -> List[str]:
|
def discover_mcp_tools() -> List[str]:
|
||||||
"""Entry point: load config, connect to MCP servers, register tools.
|
"""Entry point: load config, connect to MCP servers, register tools.
|
||||||
|
|
||||||
@@ -1793,69 +1869,32 @@ def discover_mcp_tools() -> List[str]:
|
|||||||
logger.debug("No MCP servers configured")
|
logger.debug("No MCP servers configured")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Only attempt servers that aren't already connected and are enabled
|
|
||||||
# (enabled: false skips the server entirely without removing its config)
|
|
||||||
with _lock:
|
with _lock:
|
||||||
new_servers = {
|
new_server_names = [
|
||||||
k: v
|
name
|
||||||
for k, v in servers.items()
|
for name, cfg in servers.items()
|
||||||
if k not in _servers and _parse_boolish(v.get("enabled", True), default=True)
|
if name not in _servers and _parse_boolish(cfg.get("enabled", True), default=True)
|
||||||
}
|
]
|
||||||
|
|
||||||
if not new_servers:
|
tool_names = register_mcp_servers(servers)
|
||||||
_sync_mcp_toolsets(list(servers.keys()))
|
if not new_server_names:
|
||||||
return _existing_tool_names()
|
return tool_names
|
||||||
|
|
||||||
# Start the background event loop for MCP connections
|
with _lock:
|
||||||
_ensure_mcp_loop()
|
connected_server_names = [name for name in new_server_names if name in _servers]
|
||||||
|
new_tool_count = sum(
|
||||||
all_tools: List[str] = []
|
len(getattr(_servers[name], "_registered_tool_names", []))
|
||||||
failed_count = 0
|
for name in connected_server_names
|
||||||
|
|
||||||
async def _discover_one(name: str, cfg: dict) -> List[str]:
|
|
||||||
"""Connect to a single server and return its registered tool names."""
|
|
||||||
return await _discover_and_register_server(name, cfg)
|
|
||||||
|
|
||||||
async def _discover_all():
|
|
||||||
nonlocal failed_count
|
|
||||||
server_names = list(new_servers.keys())
|
|
||||||
# Connect to all servers in PARALLEL
|
|
||||||
results = await asyncio.gather(
|
|
||||||
*(_discover_one(name, cfg) for name, cfg in new_servers.items()),
|
|
||||||
return_exceptions=True,
|
|
||||||
)
|
)
|
||||||
for name, result in zip(server_names, results):
|
|
||||||
if isinstance(result, Exception):
|
|
||||||
failed_count += 1
|
|
||||||
command = new_servers.get(name, {}).get("command")
|
|
||||||
logger.warning(
|
|
||||||
"Failed to connect to MCP server '%s'%s: %s",
|
|
||||||
name,
|
|
||||||
f" (command={command})" if command else "",
|
|
||||||
_format_connect_error(result),
|
|
||||||
)
|
|
||||||
elif isinstance(result, list):
|
|
||||||
all_tools.extend(result)
|
|
||||||
else:
|
|
||||||
failed_count += 1
|
|
||||||
|
|
||||||
# Per-server timeouts are handled inside _discover_and_register_server.
|
failed_count = len(new_server_names) - len(connected_server_names)
|
||||||
# The outer timeout is generous: 120s total for parallel discovery.
|
if new_tool_count or failed_count:
|
||||||
_run_on_mcp_loop(_discover_all(), timeout=120)
|
summary = f" MCP: {new_tool_count} tool(s) from {len(connected_server_names)} server(s)"
|
||||||
|
|
||||||
_sync_mcp_toolsets(list(servers.keys()))
|
|
||||||
|
|
||||||
# Print summary
|
|
||||||
total_servers = len(new_servers)
|
|
||||||
ok_servers = total_servers - failed_count
|
|
||||||
if all_tools or failed_count:
|
|
||||||
summary = f" MCP: {len(all_tools)} tool(s) from {ok_servers} server(s)"
|
|
||||||
if failed_count:
|
if failed_count:
|
||||||
summary += f" ({failed_count} failed)"
|
summary += f" ({failed_count} failed)"
|
||||||
logger.info(summary)
|
logger.info(summary)
|
||||||
|
|
||||||
# Return ALL registered tools (existing + newly discovered)
|
return tool_names
|
||||||
return _existing_tool_names()
|
|
||||||
|
|
||||||
|
|
||||||
def get_mcp_status() -> List[dict]:
|
def get_mcp_status() -> List[dict]:
|
||||||
|
|||||||
Reference in New Issue
Block a user