Compare commits

...

2 Commits

Author SHA1 Message Date
Teknium
5dbd97bdfb fix(tests): update copilot_acp subprocess.Popen patch paths to acp_adapter.copilot_client
Main added two HOME-handling tests (test_run_prompt_prefers_profile_home_when_available,
test_run_prompt_passes_home_when_parent_env_is_clean) after PR #14424 was written.
These patch 'agent.copilot_acp_client.subprocess.Popen', but the shim module no longer
has 'subprocess' imported. Update patch strings to target the real module location.

Follow-up commit on the salvage PR; kshitijk4poor's original commit is preserved above.
2026-04-26 19:52:17 -07:00
kshitijk4poor
df018121a4 feat: add provider modules + wire transport single-path
Cycle 2 PR 1 (#14418). Introduces providers/ package with ProviderProfile
ABC and auto-discovery registry, then wires ChatCompletionsTransport to
delegate to profiles via a clean single-path method.

Provider profiles (8 providers):
- nvidia: default_max_tokens=16384
- kimi + kimi-cn: OMIT_TEMPERATURE, thinking + top-level reasoning_effort
- openrouter: provider_preferences, full reasoning_config passthrough
- nous: product tags, reasoning with Nous-specific disabled omission
- deepseek: base_url + env_vars
- qwen-oauth: vl_high_resolution extra_body, metadata top-level api_kwargs

Transport integration:
- _build_kwargs_from_profile() replaces the entire legacy flag-based
  assembly when provider_profile param is passed
- Single path: no dual-execution, no overwrites, no legacy fallthrough
- build_api_kwargs_extras() returns (extra_body, top_level) tuple to
  handle Kimi's top-level reasoning_effort vs OpenRouter's extra_body

Auth types: api_key | oauth_device_code | oauth_external | copilot | aws
(expanded from the lossy 'oauth' to match real Hermes auth modes).

64 new tests:
- 30 profile unit tests (registry, all 8 profiles, auth types)
- 19 transport parity tests (pin legacy flag-based behavior)
- 15 profile wiring tests (verify profile path = legacy path)
2026-04-26 19:50:21 -07:00
44 changed files with 3031 additions and 696 deletions

View File

@@ -0,0 +1,646 @@
"""OpenAI-compatible shim that forwards Hermes requests to `copilot --acp`.
This adapter lets Hermes treat the GitHub Copilot ACP server as a chat-style
backend. Each request starts a short-lived ACP session, sends the formatted
conversation as a single prompt, collects text chunks, and converts the result
back into the minimal shape Hermes expects from an OpenAI client.
"""
from __future__ import annotations
import json
import os
import queue
import re
import shlex
import subprocess
import threading
import time
from collections import deque
from pathlib import Path
from types import SimpleNamespace
from typing import Any
from agent.file_safety import get_read_block_error, is_write_denied
from agent.redact import redact_sensitive_text
ACP_MARKER_BASE_URL = "acp://copilot"
_DEFAULT_TIMEOUT_SECONDS = 900.0
_TOOL_CALL_BLOCK_RE = re.compile(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", re.DOTALL)
_TOOL_CALL_JSON_RE = re.compile(r"\{\s*\"id\"\s*:\s*\"[^\"]+\"\s*,\s*\"type\"\s*:\s*\"function\"\s*,\s*\"function\"\s*:\s*\{.*?\}\s*\}", re.DOTALL)
def _resolve_command() -> str:
return (
os.getenv("HERMES_COPILOT_ACP_COMMAND", "").strip()
or os.getenv("COPILOT_CLI_PATH", "").strip()
or "copilot"
)
def _resolve_args() -> list[str]:
raw = os.getenv("HERMES_COPILOT_ACP_ARGS", "").strip()
if not raw:
return ["--acp", "--stdio"]
return shlex.split(raw)
def _resolve_home_dir() -> str:
"""Return a stable HOME for child ACP processes."""
try:
from hermes_constants import get_subprocess_home
profile_home = get_subprocess_home()
if profile_home:
return profile_home
except Exception:
pass
home = os.environ.get("HOME", "").strip()
if home:
return home
expanded = os.path.expanduser("~")
if expanded and expanded != "~":
return expanded
try:
import pwd
resolved = pwd.getpwuid(os.getuid()).pw_dir.strip()
if resolved:
return resolved
except Exception:
pass
# Last resort: /tmp (writable on any POSIX system). Avoids crashing the
# subprocess with no HOME; callers can set HERMES_HOME explicitly if they
# need a different writable dir.
return "/tmp"
def _build_subprocess_env() -> dict[str, str]:
env = os.environ.copy()
env["HOME"] = _resolve_home_dir()
return env
def _jsonrpc_error(message_id: Any, code: int, message: str) -> dict[str, Any]:
return {
"jsonrpc": "2.0",
"id": message_id,
"error": {
"code": code,
"message": message,
},
}
def _permission_denied(message_id: Any) -> dict[str, Any]:
return {
"jsonrpc": "2.0",
"id": message_id,
"result": {
"outcome": {
"outcome": "cancelled",
}
},
}
def _format_messages_as_prompt(
messages: list[dict[str, Any]],
model: str | None = None,
tools: list[dict[str, Any]] | None = None,
tool_choice: Any = None,
) -> str:
sections: list[str] = [
"You are being used as the active ACP agent backend for Hermes.",
"Use ACP capabilities to complete tasks.",
"IMPORTANT: If you take an action with a tool, you MUST output tool calls using <tool_call>{...}</tool_call> blocks with JSON exactly in OpenAI function-call shape.",
"If no tool is needed, answer normally.",
]
if model:
sections.append(f"Hermes requested model hint: {model}")
if isinstance(tools, list) and tools:
tool_specs: list[dict[str, Any]] = []
for t in tools:
if not isinstance(t, dict):
continue
fn = t.get("function") or {}
if not isinstance(fn, dict):
continue
name = fn.get("name")
if not isinstance(name, str) or not name.strip():
continue
tool_specs.append(
{
"name": name.strip(),
"description": fn.get("description", ""),
"parameters": fn.get("parameters", {}),
}
)
if tool_specs:
sections.append(
"Available tools (OpenAI function schema). "
"When using a tool, emit ONLY <tool_call>{...}</tool_call> with one JSON object "
"containing id/type/function{name,arguments}. arguments must be a JSON string.\n"
+ json.dumps(tool_specs, ensure_ascii=False)
)
if tool_choice is not None:
sections.append(f"Tool choice hint: {json.dumps(tool_choice, ensure_ascii=False)}")
transcript: list[str] = []
for message in messages:
if not isinstance(message, dict):
continue
role = str(message.get("role") or "unknown").strip().lower()
if role == "tool":
role = "tool"
elif role not in {"system", "user", "assistant"}:
role = "context"
content = message.get("content")
rendered = _render_message_content(content)
if not rendered:
continue
label = {
"system": "System",
"user": "User",
"assistant": "Assistant",
"tool": "Tool",
"context": "Context",
}.get(role, role.title())
transcript.append(f"{label}:\n{rendered}")
if transcript:
sections.append("Conversation transcript:\n\n" + "\n\n".join(transcript))
sections.append("Continue the conversation from the latest user request.")
return "\n\n".join(section.strip() for section in sections if section and section.strip())
def _render_message_content(content: Any) -> str:
if content is None:
return ""
if isinstance(content, str):
return content.strip()
if isinstance(content, dict):
if "text" in content:
return str(content.get("text") or "").strip()
if "content" in content and isinstance(content.get("content"), str):
return str(content.get("content") or "").strip()
return json.dumps(content, ensure_ascii=True)
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
text = item.get("text")
if isinstance(text, str) and text.strip():
parts.append(text.strip())
return "\n".join(parts).strip()
return str(content).strip()
def _extract_tool_calls_from_text(text: str) -> tuple[list[SimpleNamespace], str]:
if not isinstance(text, str) or not text.strip():
return [], ""
extracted: list[SimpleNamespace] = []
consumed_spans: list[tuple[int, int]] = []
def _try_add_tool_call(raw_json: str) -> None:
try:
obj = json.loads(raw_json)
except Exception:
return
if not isinstance(obj, dict):
return
fn = obj.get("function")
if not isinstance(fn, dict):
return
fn_name = fn.get("name")
if not isinstance(fn_name, str) or not fn_name.strip():
return
fn_args = fn.get("arguments", "{}")
if not isinstance(fn_args, str):
fn_args = json.dumps(fn_args, ensure_ascii=False)
call_id = obj.get("id")
if not isinstance(call_id, str) or not call_id.strip():
call_id = f"acp_call_{len(extracted)+1}"
extracted.append(
SimpleNamespace(
id=call_id,
call_id=call_id,
response_item_id=None,
type="function",
function=SimpleNamespace(name=fn_name.strip(), arguments=fn_args),
)
)
for m in _TOOL_CALL_BLOCK_RE.finditer(text):
raw = m.group(1)
_try_add_tool_call(raw)
consumed_spans.append((m.start(), m.end()))
# Only try bare-JSON fallback when no XML blocks were found.
if not extracted:
for m in _TOOL_CALL_JSON_RE.finditer(text):
raw = m.group(0)
_try_add_tool_call(raw)
consumed_spans.append((m.start(), m.end()))
if not consumed_spans:
return extracted, text.strip()
consumed_spans.sort()
merged: list[tuple[int, int]] = []
for start, end in consumed_spans:
if not merged or start > merged[-1][1]:
merged.append((start, end))
else:
merged[-1] = (merged[-1][0], max(merged[-1][1], end))
parts: list[str] = []
cursor = 0
for start, end in merged:
if cursor < start:
parts.append(text[cursor:start])
cursor = max(cursor, end)
if cursor < len(text):
parts.append(text[cursor:])
cleaned = "\n".join(p.strip() for p in parts if p and p.strip()).strip()
return extracted, cleaned
def _ensure_path_within_cwd(path_text: str, cwd: str) -> Path:
candidate = Path(path_text)
if not candidate.is_absolute():
raise PermissionError("ACP file-system paths must be absolute.")
resolved = candidate.resolve()
root = Path(cwd).resolve()
try:
resolved.relative_to(root)
except ValueError as exc:
raise PermissionError(f"Path '{resolved}' is outside the session cwd '{root}'.") from exc
return resolved
class _ACPChatCompletions:
def __init__(self, client: "CopilotACPClient"):
self._client = client
def create(self, **kwargs: Any) -> Any:
return self._client._create_chat_completion(**kwargs)
class _ACPChatNamespace:
def __init__(self, client: "CopilotACPClient"):
self.completions = _ACPChatCompletions(client)
class CopilotACPClient:
"""Minimal OpenAI-client-compatible facade for Copilot ACP."""
def __init__(
self,
*,
api_key: str | None = None,
base_url: str | None = None,
default_headers: dict[str, str] | None = None,
acp_command: str | None = None,
acp_args: list[str] | None = None,
acp_cwd: str | None = None,
command: str | None = None,
args: list[str] | None = None,
**_: Any,
):
self.api_key = api_key or "copilot-acp"
self.base_url = base_url or ACP_MARKER_BASE_URL
self._default_headers = dict(default_headers or {})
self._acp_command = acp_command or command or _resolve_command()
self._acp_args = list(acp_args or args or _resolve_args())
self._acp_cwd = str(Path(acp_cwd or os.getcwd()).resolve())
self.chat = _ACPChatNamespace(self)
self.is_closed = False
self._active_process: subprocess.Popen[str] | None = None
self._active_process_lock = threading.Lock()
def close(self) -> None:
proc: subprocess.Popen[str] | None
with self._active_process_lock:
proc = self._active_process
self._active_process = None
self.is_closed = True
if proc is None:
return
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
try:
proc.kill()
except Exception:
pass
def _create_chat_completion(
self,
*,
model: str | None = None,
messages: list[dict[str, Any]] | None = None,
timeout: float | None = None,
tools: list[dict[str, Any]] | None = None,
tool_choice: Any = None,
**_: Any,
) -> Any:
prompt_text = _format_messages_as_prompt(
messages or [],
model=model,
tools=tools,
tool_choice=tool_choice,
)
# Normalise timeout: run_agent.py may pass an httpx.Timeout object
# (used natively by the OpenAI SDK) rather than a plain float.
if timeout is None:
_effective_timeout = _DEFAULT_TIMEOUT_SECONDS
elif isinstance(timeout, (int, float)):
_effective_timeout = float(timeout)
else:
# httpx.Timeout or similar — pick the largest component so the
# subprocess has enough wall-clock time for the full response.
_candidates = [
getattr(timeout, attr, None)
for attr in ("read", "write", "connect", "pool", "timeout")
]
_numeric = [float(v) for v in _candidates if isinstance(v, (int, float))]
_effective_timeout = max(_numeric) if _numeric else _DEFAULT_TIMEOUT_SECONDS
response_text, reasoning_text = self._run_prompt(
prompt_text,
timeout_seconds=_effective_timeout,
)
tool_calls, cleaned_text = _extract_tool_calls_from_text(response_text)
usage = SimpleNamespace(
prompt_tokens=0,
completion_tokens=0,
total_tokens=0,
prompt_tokens_details=SimpleNamespace(cached_tokens=0),
)
assistant_message = SimpleNamespace(
content=cleaned_text,
tool_calls=tool_calls,
reasoning=reasoning_text or None,
reasoning_content=reasoning_text or None,
reasoning_details=None,
)
finish_reason = "tool_calls" if tool_calls else "stop"
choice = SimpleNamespace(message=assistant_message, finish_reason=finish_reason)
return SimpleNamespace(
choices=[choice],
usage=usage,
model=model or "copilot-acp",
)
def _run_prompt(self, prompt_text: str, *, timeout_seconds: float) -> tuple[str, str]:
try:
proc = subprocess.Popen(
[self._acp_command] + self._acp_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
cwd=self._acp_cwd,
env=_build_subprocess_env(),
)
except FileNotFoundError as exc:
raise RuntimeError(
f"Could not start Copilot ACP command '{self._acp_command}'. "
"Install GitHub Copilot CLI or set HERMES_COPILOT_ACP_COMMAND/COPILOT_CLI_PATH."
) from exc
if proc.stdin is None or proc.stdout is None:
proc.kill()
raise RuntimeError("Copilot ACP process did not expose stdin/stdout pipes.")
self.is_closed = False
with self._active_process_lock:
self._active_process = proc
inbox: queue.Queue[dict[str, Any]] = queue.Queue()
stderr_tail: deque[str] = deque(maxlen=40)
def _stdout_reader() -> None:
if proc.stdout is None:
return
for line in proc.stdout:
try:
inbox.put(json.loads(line))
except Exception:
inbox.put({"raw": line.rstrip("\n")})
def _stderr_reader() -> None:
if proc.stderr is None:
return
for line in proc.stderr:
stderr_tail.append(line.rstrip("\n"))
out_thread = threading.Thread(target=_stdout_reader, daemon=True)
err_thread = threading.Thread(target=_stderr_reader, daemon=True)
out_thread.start()
err_thread.start()
next_id = 0
def _request(method: str, params: dict[str, Any], *, text_parts: list[str] | None = None, reasoning_parts: list[str] | None = None) -> Any:
nonlocal next_id
next_id += 1
request_id = next_id
payload = {
"jsonrpc": "2.0",
"id": request_id,
"method": method,
"params": params,
}
proc.stdin.write(json.dumps(payload) + "\n")
proc.stdin.flush()
deadline = time.time() + timeout_seconds
while time.time() < deadline:
if proc.poll() is not None:
break
try:
msg = inbox.get(timeout=0.1)
except queue.Empty:
continue
if self._handle_server_message(
msg,
process=proc,
cwd=self._acp_cwd,
text_parts=text_parts,
reasoning_parts=reasoning_parts,
):
continue
if msg.get("id") != request_id:
continue
if "error" in msg:
err = msg.get("error") or {}
raise RuntimeError(
f"Copilot ACP {method} failed: {err.get('message') or err}"
)
return msg.get("result")
stderr_text = "\n".join(stderr_tail).strip()
if proc.poll() is not None and stderr_text:
raise RuntimeError(f"Copilot ACP process exited early: {stderr_text}")
raise TimeoutError(f"Timed out waiting for Copilot ACP response to {method}.")
try:
_request(
"initialize",
{
"protocolVersion": 1,
"clientCapabilities": {
"fs": {
"readTextFile": True,
"writeTextFile": True,
}
},
"clientInfo": {
"name": "hermes-agent",
"title": "Hermes Agent",
"version": "0.0.0",
},
},
)
session = _request(
"session/new",
{
"cwd": self._acp_cwd,
"mcpServers": [],
},
) or {}
session_id = str(session.get("sessionId") or "").strip()
if not session_id:
raise RuntimeError("Copilot ACP did not return a sessionId.")
text_parts: list[str] = []
reasoning_parts: list[str] = []
_request(
"session/prompt",
{
"sessionId": session_id,
"prompt": [
{
"type": "text",
"text": prompt_text,
}
],
},
text_parts=text_parts,
reasoning_parts=reasoning_parts,
)
return "".join(text_parts), "".join(reasoning_parts)
finally:
self.close()
def _handle_server_message(
self,
msg: dict[str, Any],
*,
process: subprocess.Popen[str],
cwd: str,
text_parts: list[str] | None,
reasoning_parts: list[str] | None,
) -> bool:
method = msg.get("method")
if not isinstance(method, str):
return False
if method == "session/update":
params = msg.get("params") or {}
update = params.get("update") or {}
kind = str(update.get("sessionUpdate") or "").strip()
content = update.get("content") or {}
chunk_text = ""
if isinstance(content, dict):
chunk_text = str(content.get("text") or "")
if kind == "agent_message_chunk" and chunk_text and text_parts is not None:
text_parts.append(chunk_text)
elif kind == "agent_thought_chunk" and chunk_text and reasoning_parts is not None:
reasoning_parts.append(chunk_text)
return True
if process.stdin is None:
return True
message_id = msg.get("id")
params = msg.get("params") or {}
if method == "session/request_permission":
response = _permission_denied(message_id)
elif method == "fs/read_text_file":
try:
path = _ensure_path_within_cwd(str(params.get("path") or ""), cwd)
block_error = get_read_block_error(str(path))
if block_error:
raise PermissionError(block_error)
content = path.read_text() if path.exists() else ""
line = params.get("line")
limit = params.get("limit")
if isinstance(line, int) and line > 1:
lines = content.splitlines(keepends=True)
start = line - 1
end = start + limit if isinstance(limit, int) and limit > 0 else None
content = "".join(lines[start:end])
if content:
content = redact_sensitive_text(content)
response = {
"jsonrpc": "2.0",
"id": message_id,
"result": {
"content": content,
},
}
except Exception as exc:
response = _jsonrpc_error(message_id, -32602, str(exc))
elif method == "fs/write_text_file":
try:
path = _ensure_path_within_cwd(str(params.get("path") or ""), cwd)
if is_write_denied(str(path)):
raise PermissionError(
f"Write denied: '{path}' is a protected system/credential file."
)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(str(params.get("content") or ""))
response = {
"jsonrpc": "2.0",
"id": message_id,
"result": None,
}
except Exception as exc:
response = _jsonrpc_error(message_id, -32602, str(exc))
else:
response = _jsonrpc_error(
message_id,
-32601,
f"ACP client method '{method}' is not supported by Hermes yet.",
)
process.stdin.write(json.dumps(response) + "\n")
process.stdin.flush()
return True

View File

@@ -1633,7 +1633,7 @@ def _to_async_client(sync_client, model: str):
except ImportError:
pass
try:
from agent.copilot_acp_client import CopilotACPClient
from acp_adapter.copilot_client import CopilotACPClient
if isinstance(sync_client, CopilotACPClient):
return sync_client, model
except ImportError:
@@ -2040,7 +2040,7 @@ def resolve_provider_client(
"process credentials are incomplete"
)
return None, None
from agent.copilot_acp_client import CopilotACPClient
from acp_adapter.copilot_client import CopilotACPClient
client = CopilotACPClient(
api_key=api_key,

View File

@@ -1,646 +1,8 @@
"""OpenAI-compatible shim that forwards Hermes requests to `copilot --acp`.
"""Backward-compatibility shim.
This adapter lets Hermes treat the GitHub Copilot ACP server as a chat-style
backend. Each request starts a short-lived ACP session, sends the formatted
conversation as a single prompt, collects text chunks, and converts the result
back into the minimal shape Hermes expects from an OpenAI client.
CopilotACPClient has moved to acp_adapter/copilot_client.py.
This module re-exports it so existing callers continue to work.
"""
from acp_adapter.copilot_client import CopilotACPClient # noqa: F401
from __future__ import annotations
import json
import os
import queue
import re
import shlex
import subprocess
import threading
import time
from collections import deque
from pathlib import Path
from types import SimpleNamespace
from typing import Any
from agent.file_safety import get_read_block_error, is_write_denied
from agent.redact import redact_sensitive_text
ACP_MARKER_BASE_URL = "acp://copilot"
_DEFAULT_TIMEOUT_SECONDS = 900.0
_TOOL_CALL_BLOCK_RE = re.compile(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", re.DOTALL)
_TOOL_CALL_JSON_RE = re.compile(r"\{\s*\"id\"\s*:\s*\"[^\"]+\"\s*,\s*\"type\"\s*:\s*\"function\"\s*,\s*\"function\"\s*:\s*\{.*?\}\s*\}", re.DOTALL)
def _resolve_command() -> str:
return (
os.getenv("HERMES_COPILOT_ACP_COMMAND", "").strip()
or os.getenv("COPILOT_CLI_PATH", "").strip()
or "copilot"
)
def _resolve_args() -> list[str]:
raw = os.getenv("HERMES_COPILOT_ACP_ARGS", "").strip()
if not raw:
return ["--acp", "--stdio"]
return shlex.split(raw)
def _resolve_home_dir() -> str:
"""Return a stable HOME for child ACP processes."""
try:
from hermes_constants import get_subprocess_home
profile_home = get_subprocess_home()
if profile_home:
return profile_home
except Exception:
pass
home = os.environ.get("HOME", "").strip()
if home:
return home
expanded = os.path.expanduser("~")
if expanded and expanded != "~":
return expanded
try:
import pwd
resolved = pwd.getpwuid(os.getuid()).pw_dir.strip()
if resolved:
return resolved
except Exception:
pass
# Last resort: /tmp (writable on any POSIX system). Avoids crashing the
# subprocess with no HOME; callers can set HERMES_HOME explicitly if they
# need a different writable dir.
return "/tmp"
def _build_subprocess_env() -> dict[str, str]:
env = os.environ.copy()
env["HOME"] = _resolve_home_dir()
return env
def _jsonrpc_error(message_id: Any, code: int, message: str) -> dict[str, Any]:
return {
"jsonrpc": "2.0",
"id": message_id,
"error": {
"code": code,
"message": message,
},
}
def _permission_denied(message_id: Any) -> dict[str, Any]:
return {
"jsonrpc": "2.0",
"id": message_id,
"result": {
"outcome": {
"outcome": "cancelled",
}
},
}
def _format_messages_as_prompt(
messages: list[dict[str, Any]],
model: str | None = None,
tools: list[dict[str, Any]] | None = None,
tool_choice: Any = None,
) -> str:
sections: list[str] = [
"You are being used as the active ACP agent backend for Hermes.",
"Use ACP capabilities to complete tasks.",
"IMPORTANT: If you take an action with a tool, you MUST output tool calls using <tool_call>{...}</tool_call> blocks with JSON exactly in OpenAI function-call shape.",
"If no tool is needed, answer normally.",
]
if model:
sections.append(f"Hermes requested model hint: {model}")
if isinstance(tools, list) and tools:
tool_specs: list[dict[str, Any]] = []
for t in tools:
if not isinstance(t, dict):
continue
fn = t.get("function") or {}
if not isinstance(fn, dict):
continue
name = fn.get("name")
if not isinstance(name, str) or not name.strip():
continue
tool_specs.append(
{
"name": name.strip(),
"description": fn.get("description", ""),
"parameters": fn.get("parameters", {}),
}
)
if tool_specs:
sections.append(
"Available tools (OpenAI function schema). "
"When using a tool, emit ONLY <tool_call>{...}</tool_call> with one JSON object "
"containing id/type/function{name,arguments}. arguments must be a JSON string.\n"
+ json.dumps(tool_specs, ensure_ascii=False)
)
if tool_choice is not None:
sections.append(f"Tool choice hint: {json.dumps(tool_choice, ensure_ascii=False)}")
transcript: list[str] = []
for message in messages:
if not isinstance(message, dict):
continue
role = str(message.get("role") or "unknown").strip().lower()
if role == "tool":
role = "tool"
elif role not in {"system", "user", "assistant"}:
role = "context"
content = message.get("content")
rendered = _render_message_content(content)
if not rendered:
continue
label = {
"system": "System",
"user": "User",
"assistant": "Assistant",
"tool": "Tool",
"context": "Context",
}.get(role, role.title())
transcript.append(f"{label}:\n{rendered}")
if transcript:
sections.append("Conversation transcript:\n\n" + "\n\n".join(transcript))
sections.append("Continue the conversation from the latest user request.")
return "\n\n".join(section.strip() for section in sections if section and section.strip())
def _render_message_content(content: Any) -> str:
if content is None:
return ""
if isinstance(content, str):
return content.strip()
if isinstance(content, dict):
if "text" in content:
return str(content.get("text") or "").strip()
if "content" in content and isinstance(content.get("content"), str):
return str(content.get("content") or "").strip()
return json.dumps(content, ensure_ascii=True)
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
text = item.get("text")
if isinstance(text, str) and text.strip():
parts.append(text.strip())
return "\n".join(parts).strip()
return str(content).strip()
def _extract_tool_calls_from_text(text: str) -> tuple[list[SimpleNamespace], str]:
if not isinstance(text, str) or not text.strip():
return [], ""
extracted: list[SimpleNamespace] = []
consumed_spans: list[tuple[int, int]] = []
def _try_add_tool_call(raw_json: str) -> None:
try:
obj = json.loads(raw_json)
except Exception:
return
if not isinstance(obj, dict):
return
fn = obj.get("function")
if not isinstance(fn, dict):
return
fn_name = fn.get("name")
if not isinstance(fn_name, str) or not fn_name.strip():
return
fn_args = fn.get("arguments", "{}")
if not isinstance(fn_args, str):
fn_args = json.dumps(fn_args, ensure_ascii=False)
call_id = obj.get("id")
if not isinstance(call_id, str) or not call_id.strip():
call_id = f"acp_call_{len(extracted)+1}"
extracted.append(
SimpleNamespace(
id=call_id,
call_id=call_id,
response_item_id=None,
type="function",
function=SimpleNamespace(name=fn_name.strip(), arguments=fn_args),
)
)
for m in _TOOL_CALL_BLOCK_RE.finditer(text):
raw = m.group(1)
_try_add_tool_call(raw)
consumed_spans.append((m.start(), m.end()))
# Only try bare-JSON fallback when no XML blocks were found.
if not extracted:
for m in _TOOL_CALL_JSON_RE.finditer(text):
raw = m.group(0)
_try_add_tool_call(raw)
consumed_spans.append((m.start(), m.end()))
if not consumed_spans:
return extracted, text.strip()
consumed_spans.sort()
merged: list[tuple[int, int]] = []
for start, end in consumed_spans:
if not merged or start > merged[-1][1]:
merged.append((start, end))
else:
merged[-1] = (merged[-1][0], max(merged[-1][1], end))
parts: list[str] = []
cursor = 0
for start, end in merged:
if cursor < start:
parts.append(text[cursor:start])
cursor = max(cursor, end)
if cursor < len(text):
parts.append(text[cursor:])
cleaned = "\n".join(p.strip() for p in parts if p and p.strip()).strip()
return extracted, cleaned
def _ensure_path_within_cwd(path_text: str, cwd: str) -> Path:
candidate = Path(path_text)
if not candidate.is_absolute():
raise PermissionError("ACP file-system paths must be absolute.")
resolved = candidate.resolve()
root = Path(cwd).resolve()
try:
resolved.relative_to(root)
except ValueError as exc:
raise PermissionError(f"Path '{resolved}' is outside the session cwd '{root}'.") from exc
return resolved
class _ACPChatCompletions:
def __init__(self, client: "CopilotACPClient"):
self._client = client
def create(self, **kwargs: Any) -> Any:
return self._client._create_chat_completion(**kwargs)
class _ACPChatNamespace:
def __init__(self, client: "CopilotACPClient"):
self.completions = _ACPChatCompletions(client)
class CopilotACPClient:
"""Minimal OpenAI-client-compatible facade for Copilot ACP."""
def __init__(
self,
*,
api_key: str | None = None,
base_url: str | None = None,
default_headers: dict[str, str] | None = None,
acp_command: str | None = None,
acp_args: list[str] | None = None,
acp_cwd: str | None = None,
command: str | None = None,
args: list[str] | None = None,
**_: Any,
):
self.api_key = api_key or "copilot-acp"
self.base_url = base_url or ACP_MARKER_BASE_URL
self._default_headers = dict(default_headers or {})
self._acp_command = acp_command or command or _resolve_command()
self._acp_args = list(acp_args or args or _resolve_args())
self._acp_cwd = str(Path(acp_cwd or os.getcwd()).resolve())
self.chat = _ACPChatNamespace(self)
self.is_closed = False
self._active_process: subprocess.Popen[str] | None = None
self._active_process_lock = threading.Lock()
def close(self) -> None:
proc: subprocess.Popen[str] | None
with self._active_process_lock:
proc = self._active_process
self._active_process = None
self.is_closed = True
if proc is None:
return
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
try:
proc.kill()
except Exception:
pass
def _create_chat_completion(
self,
*,
model: str | None = None,
messages: list[dict[str, Any]] | None = None,
timeout: float | None = None,
tools: list[dict[str, Any]] | None = None,
tool_choice: Any = None,
**_: Any,
) -> Any:
prompt_text = _format_messages_as_prompt(
messages or [],
model=model,
tools=tools,
tool_choice=tool_choice,
)
# Normalise timeout: run_agent.py may pass an httpx.Timeout object
# (used natively by the OpenAI SDK) rather than a plain float.
if timeout is None:
_effective_timeout = _DEFAULT_TIMEOUT_SECONDS
elif isinstance(timeout, (int, float)):
_effective_timeout = float(timeout)
else:
# httpx.Timeout or similar — pick the largest component so the
# subprocess has enough wall-clock time for the full response.
_candidates = [
getattr(timeout, attr, None)
for attr in ("read", "write", "connect", "pool", "timeout")
]
_numeric = [float(v) for v in _candidates if isinstance(v, (int, float))]
_effective_timeout = max(_numeric) if _numeric else _DEFAULT_TIMEOUT_SECONDS
response_text, reasoning_text = self._run_prompt(
prompt_text,
timeout_seconds=_effective_timeout,
)
tool_calls, cleaned_text = _extract_tool_calls_from_text(response_text)
usage = SimpleNamespace(
prompt_tokens=0,
completion_tokens=0,
total_tokens=0,
prompt_tokens_details=SimpleNamespace(cached_tokens=0),
)
assistant_message = SimpleNamespace(
content=cleaned_text,
tool_calls=tool_calls,
reasoning=reasoning_text or None,
reasoning_content=reasoning_text or None,
reasoning_details=None,
)
finish_reason = "tool_calls" if tool_calls else "stop"
choice = SimpleNamespace(message=assistant_message, finish_reason=finish_reason)
return SimpleNamespace(
choices=[choice],
usage=usage,
model=model or "copilot-acp",
)
def _run_prompt(self, prompt_text: str, *, timeout_seconds: float) -> tuple[str, str]:
try:
proc = subprocess.Popen(
[self._acp_command] + self._acp_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
cwd=self._acp_cwd,
env=_build_subprocess_env(),
)
except FileNotFoundError as exc:
raise RuntimeError(
f"Could not start Copilot ACP command '{self._acp_command}'. "
"Install GitHub Copilot CLI or set HERMES_COPILOT_ACP_COMMAND/COPILOT_CLI_PATH."
) from exc
if proc.stdin is None or proc.stdout is None:
proc.kill()
raise RuntimeError("Copilot ACP process did not expose stdin/stdout pipes.")
self.is_closed = False
with self._active_process_lock:
self._active_process = proc
inbox: queue.Queue[dict[str, Any]] = queue.Queue()
stderr_tail: deque[str] = deque(maxlen=40)
def _stdout_reader() -> None:
if proc.stdout is None:
return
for line in proc.stdout:
try:
inbox.put(json.loads(line))
except Exception:
inbox.put({"raw": line.rstrip("\n")})
def _stderr_reader() -> None:
if proc.stderr is None:
return
for line in proc.stderr:
stderr_tail.append(line.rstrip("\n"))
out_thread = threading.Thread(target=_stdout_reader, daemon=True)
err_thread = threading.Thread(target=_stderr_reader, daemon=True)
out_thread.start()
err_thread.start()
next_id = 0
def _request(method: str, params: dict[str, Any], *, text_parts: list[str] | None = None, reasoning_parts: list[str] | None = None) -> Any:
nonlocal next_id
next_id += 1
request_id = next_id
payload = {
"jsonrpc": "2.0",
"id": request_id,
"method": method,
"params": params,
}
proc.stdin.write(json.dumps(payload) + "\n")
proc.stdin.flush()
deadline = time.time() + timeout_seconds
while time.time() < deadline:
if proc.poll() is not None:
break
try:
msg = inbox.get(timeout=0.1)
except queue.Empty:
continue
if self._handle_server_message(
msg,
process=proc,
cwd=self._acp_cwd,
text_parts=text_parts,
reasoning_parts=reasoning_parts,
):
continue
if msg.get("id") != request_id:
continue
if "error" in msg:
err = msg.get("error") or {}
raise RuntimeError(
f"Copilot ACP {method} failed: {err.get('message') or err}"
)
return msg.get("result")
stderr_text = "\n".join(stderr_tail).strip()
if proc.poll() is not None and stderr_text:
raise RuntimeError(f"Copilot ACP process exited early: {stderr_text}")
raise TimeoutError(f"Timed out waiting for Copilot ACP response to {method}.")
try:
_request(
"initialize",
{
"protocolVersion": 1,
"clientCapabilities": {
"fs": {
"readTextFile": True,
"writeTextFile": True,
}
},
"clientInfo": {
"name": "hermes-agent",
"title": "Hermes Agent",
"version": "0.0.0",
},
},
)
session = _request(
"session/new",
{
"cwd": self._acp_cwd,
"mcpServers": [],
},
) or {}
session_id = str(session.get("sessionId") or "").strip()
if not session_id:
raise RuntimeError("Copilot ACP did not return a sessionId.")
text_parts: list[str] = []
reasoning_parts: list[str] = []
_request(
"session/prompt",
{
"sessionId": session_id,
"prompt": [
{
"type": "text",
"text": prompt_text,
}
],
},
text_parts=text_parts,
reasoning_parts=reasoning_parts,
)
return "".join(text_parts), "".join(reasoning_parts)
finally:
self.close()
def _handle_server_message(
self,
msg: dict[str, Any],
*,
process: subprocess.Popen[str],
cwd: str,
text_parts: list[str] | None,
reasoning_parts: list[str] | None,
) -> bool:
method = msg.get("method")
if not isinstance(method, str):
return False
if method == "session/update":
params = msg.get("params") or {}
update = params.get("update") or {}
kind = str(update.get("sessionUpdate") or "").strip()
content = update.get("content") or {}
chunk_text = ""
if isinstance(content, dict):
chunk_text = str(content.get("text") or "")
if kind == "agent_message_chunk" and chunk_text and text_parts is not None:
text_parts.append(chunk_text)
elif kind == "agent_thought_chunk" and chunk_text and reasoning_parts is not None:
reasoning_parts.append(chunk_text)
return True
if process.stdin is None:
return True
message_id = msg.get("id")
params = msg.get("params") or {}
if method == "session/request_permission":
response = _permission_denied(message_id)
elif method == "fs/read_text_file":
try:
path = _ensure_path_within_cwd(str(params.get("path") or ""), cwd)
block_error = get_read_block_error(str(path))
if block_error:
raise PermissionError(block_error)
content = path.read_text() if path.exists() else ""
line = params.get("line")
limit = params.get("limit")
if isinstance(line, int) and line > 1:
lines = content.splitlines(keepends=True)
start = line - 1
end = start + limit if isinstance(limit, int) and limit > 0 else None
content = "".join(lines[start:end])
if content:
content = redact_sensitive_text(content)
response = {
"jsonrpc": "2.0",
"id": message_id,
"result": {
"content": content,
},
}
except Exception as exc:
response = _jsonrpc_error(message_id, -32602, str(exc))
elif method == "fs/write_text_file":
try:
path = _ensure_path_within_cwd(str(params.get("path") or ""), cwd)
if is_write_denied(str(path)):
raise PermissionError(
f"Write denied: '{path}' is a protected system/credential file."
)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(str(params.get("content") or ""))
response = {
"jsonrpc": "2.0",
"id": message_id,
"result": None,
}
except Exception as exc:
response = _jsonrpc_error(message_id, -32602, str(exc))
else:
response = _jsonrpc_error(
message_id,
-32601,
f"ACP client method '{method}' is not supported by Hermes yet.",
)
process.stdin.write(json.dumps(response) + "\n")
process.stdin.flush()
return True
__all__ = ["CopilotACPClient"]

View File

@@ -6,7 +6,13 @@ Usage:
result = transport.normalize_response(raw_response)
"""
from agent.transports.types import NormalizedResponse, ToolCall, Usage, build_tool_call, map_finish_reason # noqa: F401
from agent.transports.types import (
NormalizedResponse,
ToolCall,
Usage,
build_tool_call,
map_finish_reason,
) # noqa: F401
_REGISTRY: dict = {}

View File

@@ -10,7 +10,7 @@ reasoning configuration, temperature handling, and extra_body assembly.
"""
import copy
from typing import Any, Dict, List, Optional
from typing import Any
from agent.moonshot_schema import is_moonshot_model, sanitize_moonshot_tools
from agent.prompt_builder import DEVELOPER_ROLE_MODELS
@@ -28,7 +28,9 @@ class ChatCompletionsTransport(ProviderTransport):
def api_mode(self) -> str:
return "chat_completions"
def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> List[Dict[str, Any]]:
def convert_messages(
self, messages: list[dict[str, Any]], **kwargs
) -> list[dict[str, Any]]:
"""Messages are already in OpenAI format — sanitize Codex leaks only.
Strips Codex Responses API fields (``codex_reasoning_items`` /
@@ -45,7 +47,9 @@ class ChatCompletionsTransport(ProviderTransport):
tool_calls = msg.get("tool_calls")
if isinstance(tool_calls, list):
for tc in tool_calls:
if isinstance(tc, dict) and ("call_id" in tc or "response_item_id" in tc):
if isinstance(tc, dict) and (
"call_id" in tc or "response_item_id" in tc
):
needs_sanitize = True
break
if needs_sanitize:
@@ -68,17 +72,17 @@ class ChatCompletionsTransport(ProviderTransport):
tc.pop("response_item_id", None)
return sanitized
def convert_tools(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def convert_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Tools are already in OpenAI format — identity."""
return tools
def build_kwargs(
self,
model: str,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
**params,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Build chat.completions.create() kwargs.
This is the most complex transport method — it handles ~16 providers
@@ -99,7 +103,6 @@ class ChatCompletionsTransport(ProviderTransport):
is_nous: bool
is_qwen_portal: bool
is_github_models: bool
is_nvidia_nim: bool
is_kimi: bool
is_custom_provider: bool
ollama_num_ctx: int | None
@@ -122,7 +125,14 @@ class ChatCompletionsTransport(ProviderTransport):
# Codex sanitization: drop reasoning_items / call_id / response_item_id
sanitized = self.convert_messages(messages)
# Qwen portal prep AFTER codex sanitization. If sanitize already
# ── Provider profile: single-path when present ──────────
_profile = params.get("provider_profile")
if _profile:
return self._build_kwargs_from_profile(
_profile, model, sanitized, tools, params
)
# ── Legacy flag-based path (no profile) ─────────────────
# deepcopied, reuse that copy via the in-place variant to avoid a
# second deepcopy.
is_qwen = params.get("is_qwen_portal", False)
@@ -150,7 +160,7 @@ class ChatCompletionsTransport(ProviderTransport):
sanitized = list(sanitized)
sanitized[0] = {**sanitized[0], "role": "developer"}
api_kwargs: Dict[str, Any] = {
api_kwargs: dict[str, Any] = {
"model": model,
"messages": sanitized,
}
@@ -186,7 +196,6 @@ class ChatCompletionsTransport(ProviderTransport):
ephemeral = params.get("ephemeral_max_output_tokens")
max_tokens = params.get("max_tokens")
anthropic_max_out = params.get("anthropic_max_output")
is_nvidia_nim = params.get("is_nvidia_nim", False)
is_kimi = params.get("is_kimi", False)
reasoning_config = params.get("reasoning_config")
@@ -194,8 +203,6 @@ class ChatCompletionsTransport(ProviderTransport):
api_kwargs.update(max_tokens_fn(ephemeral))
elif max_tokens is not None and max_tokens_fn:
api_kwargs.update(max_tokens_fn(max_tokens))
elif is_nvidia_nim and max_tokens_fn:
api_kwargs.update(max_tokens_fn(16384))
elif is_qwen and max_tokens_fn:
api_kwargs.update(max_tokens_fn(65536))
elif is_kimi and max_tokens_fn:
@@ -220,7 +227,7 @@ class ChatCompletionsTransport(ProviderTransport):
api_kwargs["reasoning_effort"] = _kimi_effort
# extra_body assembly
extra_body: Dict[str, Any] = {}
extra_body: dict[str, Any] = {}
is_openrouter = params.get("is_openrouter", False)
is_nous = params.get("is_nous", False)
@@ -292,6 +299,113 @@ class ChatCompletionsTransport(ProviderTransport):
return api_kwargs
def _build_kwargs_from_profile(self, profile, model, sanitized, tools, params):
"""Build API kwargs using a ProviderProfile — single path, no legacy flags.
This method replaces the entire flag-based kwargs assembly when a
provider_profile is passed. Every quirk comes from the profile object.
"""
from providers.base import OMIT_TEMPERATURE
# Message preprocessing
sanitized = profile.prepare_messages(sanitized)
# Developer role swap — model-name-based, applies to all providers
_model_lower = (model or "").lower()
if (
sanitized
and isinstance(sanitized[0], dict)
and sanitized[0].get("role") == "system"
and any(p in _model_lower for p in DEVELOPER_ROLE_MODELS)
):
sanitized = list(sanitized)
sanitized[0] = {**sanitized[0], "role": "developer"}
api_kwargs: dict[str, Any] = {
"model": model,
"messages": sanitized,
}
# Temperature
if profile.fixed_temperature is OMIT_TEMPERATURE:
pass # Don't include temperature at all
elif profile.fixed_temperature is not None:
api_kwargs["temperature"] = profile.fixed_temperature
else:
# Use caller's temperature if provided
temp = params.get("temperature")
if temp is not None:
api_kwargs["temperature"] = temp
# Timeout
timeout = params.get("timeout")
if timeout is not None:
api_kwargs["timeout"] = timeout
# Tools
if tools:
api_kwargs["tools"] = tools
# max_tokens resolution — priority: ephemeral > user > profile default
max_tokens_fn = params.get("max_tokens_param_fn")
ephemeral = params.get("ephemeral_max_output_tokens")
user_max = params.get("max_tokens")
anthropic_max = params.get("anthropic_max_output")
if ephemeral is not None and max_tokens_fn:
api_kwargs.update(max_tokens_fn(ephemeral))
elif user_max is not None and max_tokens_fn:
api_kwargs.update(max_tokens_fn(user_max))
elif profile.default_max_tokens and max_tokens_fn:
api_kwargs.update(max_tokens_fn(profile.default_max_tokens))
elif anthropic_max is not None:
api_kwargs["max_tokens"] = anthropic_max
# Provider-specific api_kwargs extras (reasoning_effort, metadata, etc.)
reasoning_config = params.get("reasoning_config")
extra_body_from_profile, top_level_from_profile = (
profile.build_api_kwargs_extras(
reasoning_config=reasoning_config,
supports_reasoning=params.get("supports_reasoning", False),
qwen_session_metadata=params.get("qwen_session_metadata"),
)
)
api_kwargs.update(top_level_from_profile)
# extra_body assembly
extra_body: dict[str, Any] = {}
# Profile's extra_body (tags, provider prefs, vl_high_resolution, etc.)
profile_body = profile.build_extra_body(
session_id=params.get("session_id"),
provider_preferences=params.get("provider_preferences"),
)
if profile_body:
extra_body.update(profile_body)
# Profile's reasoning/thinking extra_body entries
if extra_body_from_profile:
extra_body.update(extra_body_from_profile)
# Merge any pre-built extra_body additions from the caller
additions = params.get("extra_body_additions")
if additions:
extra_body.update(additions)
# Request overrides (user config)
overrides = params.get("request_overrides")
if overrides:
for k, v in overrides.items():
if k == "extra_body" and isinstance(v, dict):
extra_body.update(v)
else:
api_kwargs[k] = v
if extra_body:
api_kwargs["extra_body"] = extra_body
return api_kwargs
def normalize_response(self, response: Any, **kwargs) -> NormalizedResponse:
"""Normalize OpenAI ChatCompletion to NormalizedResponse.
@@ -313,7 +427,7 @@ class ChatCompletionsTransport(ProviderTransport):
# Gemini 3 thinking models attach extra_content with
# thought_signature — without replay on the next turn the API
# rejects the request with 400.
tc_provider_data: Dict[str, Any] = {}
tc_provider_data: dict[str, Any] = {}
extra = getattr(tc, "extra_content", None)
if extra is None and hasattr(tc, "model_extra"):
extra = (tc.model_extra or {}).get("extra_content")
@@ -324,12 +438,14 @@ class ChatCompletionsTransport(ProviderTransport):
except Exception:
pass
tc_provider_data["extra_content"] = extra
tool_calls.append(ToolCall(
id=tc.id,
name=tc.function.name,
arguments=tc.function.arguments,
provider_data=tc_provider_data or None,
))
tool_calls.append(
ToolCall(
id=tc.id,
name=tc.function.name,
arguments=tc.function.arguments,
provider_data=tc_provider_data or None,
)
)
usage = None
if hasattr(response, "usage") and response.usage:
@@ -347,7 +463,7 @@ class ChatCompletionsTransport(ProviderTransport):
reasoning = getattr(msg, "reasoning", None)
reasoning_content = getattr(msg, "reasoning_content", None)
provider_data: Dict[str, Any] = {}
provider_data: dict[str, Any] = {}
if reasoning_content:
provider_data["reasoning_content"] = reasoning_content
rd = getattr(msg, "reasoning_details", None)
@@ -373,7 +489,7 @@ class ChatCompletionsTransport(ProviderTransport):
return False
return True
def extract_cache_stats(self, response: Any) -> Optional[Dict[str, int]]:
def extract_cache_stats(self, response: Any) -> dict[str, int] | None:
"""Extract OpenRouter/OpenAI cache stats from prompt_tokens_details."""
usage = getattr(response, "usage", None)
if usage is None:

View File

@@ -12,7 +12,7 @@ from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from typing import Any
@dataclass
@@ -32,10 +32,10 @@ class ToolCall:
* Others: ``None``
"""
id: Optional[str]
id: str | None
name: str
arguments: str # JSON string
provider_data: Optional[Dict[str, Any]] = field(default=None, repr=False)
provider_data: dict[str, Any] | None = field(default=None, repr=False)
# ── Backward compatibility ──────────────────────────────────
# The agent loop reads tc.function.name / tc.function.arguments
@@ -47,17 +47,17 @@ class ToolCall:
return "function"
@property
def function(self) -> "ToolCall":
def function(self) -> ToolCall:
"""Return self so tc.function.name / tc.function.arguments work."""
return self
@property
def call_id(self) -> Optional[str]:
def call_id(self) -> str | None:
"""Codex call_id from provider_data, accessed via getattr by _build_assistant_message."""
return (self.provider_data or {}).get("call_id")
@property
def response_item_id(self) -> Optional[str]:
def response_item_id(self) -> str | None:
"""Codex response_item_id from provider_data."""
return (self.provider_data or {}).get("response_item_id")
@@ -101,18 +101,18 @@ class NormalizedResponse:
* Others: ``None``
"""
content: Optional[str]
tool_calls: Optional[List[ToolCall]]
content: str | None
tool_calls: list[ToolCall] | None
finish_reason: str # "stop", "tool_calls", "length", "content_filter"
reasoning: Optional[str] = None
usage: Optional[Usage] = None
provider_data: Optional[Dict[str, Any]] = field(default=None, repr=False)
reasoning: str | None = None
usage: Usage | None = None
provider_data: dict[str, Any] | None = field(default=None, repr=False)
# ── Backward compatibility ──────────────────────────────────
# The shim _nr_to_assistant_message() mapped these from provider_data.
# These properties let NormalizedResponse pass through directly.
@property
def reasoning_content(self) -> Optional[str]:
def reasoning_content(self) -> str | None:
pd = self.provider_data or {}
return pd.get("reasoning_content")
@@ -136,8 +136,9 @@ class NormalizedResponse:
# Factory helpers
# ---------------------------------------------------------------------------
def build_tool_call(
id: Optional[str],
id: str | None,
name: str,
arguments: Any,
**provider_fields: Any,
@@ -151,7 +152,7 @@ def build_tool_call(
return ToolCall(id=id, name=name, arguments=args_str, provider_data=pd)
def map_finish_reason(reason: Optional[str], mapping: Dict[str, str]) -> str:
def map_finish_reason(reason: str | None, mapping: dict[str, str]) -> str:
"""Translate a provider-specific stop reason to the normalised set.
Falls back to ``"stop"`` for unknown or ``None`` reasons.

299
providers/README.md Normal file
View File

@@ -0,0 +1,299 @@
# providers/
Single source of truth for every inference provider Hermes knows about.
Each provider is declared once here as a `ProviderProfile`. Every other layer —
auth resolution, transport kwargs, model listing, runtime routing — reads from
these profiles instead of maintaining its own parallel data.
---
## Directory layout
```
providers/
├── base.py ProviderProfile dataclass + OMIT_TEMPERATURE sentinel
├── __init__.py Registry: register_provider(), get_provider_profile()
├── README.md This file
├── # Simple providers — just identity + auth + endpoint
├── alibaba.py Alibaba Cloud DashScope
├── arcee.py Arcee AI
├── bedrock.py AWS Bedrock (api_mode=bedrock_converse)
├── deepseek.py DeepSeek
├── huggingface.py Hugging Face Inference API
├── kilocode.py Kilo Code
├── minimax.py MiniMax (international + CN)
├── nvidia.py NVIDIA NIM (default_max_tokens=16384)
├── ollama_cloud.py Ollama Cloud
├── stepfun.py StepFun
├── xiaomi.py Xiaomi MiMo
├── xai.py xAI Grok (api_mode=codex_responses)
├── zai.py Z.AI / GLM
├── # Medium — one or two quirks
├── anthropic.py Native Anthropic (x-api-key header, api_mode=anthropic_messages)
├── copilot.py GitHub Copilot (auth_type=copilot, reasoning per model)
├── copilot_acp.py Copilot ACP subprocess (api_mode=copilot_acp)
├── custom.py Custom/Ollama local (think=false, num_ctx)
├── gemini.py Google Gemini AI Studio + Cloud Code OAuth
├── kimi.py Kimi Coding (OMIT_TEMPERATURE, thinking, dual endpoint)
├── openai_codex.py OpenAI Codex OAuth (api_mode=codex_responses)
├── opencode.py OpenCode Zen + Go (per-model api_mode routing)
├── # Complex — subclasses with multiple overrides
├── nous.py Nous Portal (tags, attribution, reasoning omit-when-disabled)
├── openrouter.py OpenRouter (provider preferences, public model fetch)
├── qwen.py Qwen OAuth (message normalization, cache_control, vl_hires)
└── vercel.py Vercel AI Gateway (attribution headers, reasoning passthrough)
```
---
## ProviderProfile fields
```python
@dataclass
class ProviderProfile:
# Identity
name: str # canonical ID matching hermes_cli/auth.py PROVIDER_REGISTRY
api_mode: str # "chat_completions" | "anthropic_messages" |
# "codex_responses" | "bedrock_converse" | "copilot_acp"
aliases: tuple # alternate names resolved by get_provider_profile()
# Auth & endpoints
env_vars: tuple # env var names holding the API key, in priority order
base_url: str # default inference endpoint
models_url: str # explicit models endpoint; falls back to {base_url}/models
# set when the models catalog lives at a different URL
# (e.g. OpenRouter: public /api/v1/models vs /api/v1 inference)
auth_type: str # "api_key" | "oauth_device_code" | "oauth_external" |
# "copilot" | "aws" | "external_process"
# Client-level quirks
default_headers: dict # extra HTTP headers sent on every request
# Request-level quirks
fixed_temperature: Any # None = use caller's default; OMIT_TEMPERATURE = don't send
default_max_tokens: int|None # inject max_tokens when caller omits it
default_aux_model: str # cheap model for auxiliary tasks (compression, vision, etc.)
# empty string = use main model (default)
```
---
## Hooks (override in a subclass)
| Method | When to override |
|--------|-----------------|
| `prepare_messages(messages)` | Provider needs message pre-processing (Qwen: string → list-of-parts, cache_control) |
| `build_extra_body(*, session_id, **ctx)` | Provider-specific `extra_body` fields (Nous: tags, OpenRouter: provider preferences) |
| `build_api_kwargs_extras(*, reasoning_config, **ctx)` | Returns `(extra_body_additions, top_level_kwargs)` — use when some fields go to `extra_body` and some go top-level (Kimi: `reasoning_effort` top-level; OpenRouter: `reasoning` in extra_body) |
| `fetch_models(*, api_key, timeout)` | Custom model listing (Anthropic: x-api-key header; OpenRouter: public endpoint, no auth; Bedrock/copilot-acp: return None) |
All hooks have safe defaults — only override what differs from the base.
---
## How to add a new provider
### 1. Simple (standard OpenAI-compatible endpoint)
```python
# providers/myprovider.py
from providers import register_provider
from providers.base import ProviderProfile
myprovider = ProviderProfile(
name="myprovider", # must match id in hermes_cli/auth.py PROVIDER_REGISTRY
aliases=("my-provider", "myp"),
api_mode="chat_completions",
env_vars=("MYPROVIDER_API_KEY",),
base_url="https://api.myprovider.com/v1",
auth_type="api_key",
)
register_provider(myprovider)
```
The default `fetch_models()` will call `GET https://api.myprovider.com/v1/models`
with Bearer auth automatically. No override needed for standard `/v1/models`.
### 2. With quirks (subclass)
```python
# providers/myprovider.py
from typing import Any
from providers import register_provider
from providers.base import ProviderProfile
class MyProviderProfile(ProviderProfile):
"""My provider — custom reasoning header."""
def build_api_kwargs_extras(
self,
*,
reasoning_config: dict | None = None,
**ctx: Any,
) -> tuple[dict[str, Any], dict[str, Any]]:
extra_body: dict[str, Any] = {}
if reasoning_config:
extra_body["my_reasoning"] = reasoning_config.get("effort", "medium")
return extra_body, {}
def fetch_models(
self,
*,
api_key: str | None = None,
timeout: float = 8.0,
) -> list[str] | None:
# Override only if your endpoint differs from standard /v1/models
return super().fetch_models(api_key=api_key, timeout=timeout)
myprovider = MyProviderProfile(
name="myprovider",
aliases=("myp",),
env_vars=("MYPROVIDER_API_KEY",),
base_url="https://api.myprovider.com/v1",
)
register_provider(myprovider)
```
### 3. Wire it up
After creating the file, add `name` to the `_PROFILE_ACTIVE_PROVIDERS` set in
`run_agent.py` once you've verified parity against the legacy flag path. Start
with a simple provider (no message prep, no reasoning quirks) and work up.
---
## fetch_models contract
```python
def fetch_models(
self,
*,
api_key: str | None = None,
timeout: float = 8.0,
) -> list[str] | None:
...
```
- Returns `list[str]`: model IDs from the provider's live endpoint.
- Returns `None`: provider doesn't support REST model listing (Bedrock, copilot-acp),
or the request failed. Callers **must** fall back to `_PROVIDER_MODELS` on `None`.
- Never raises — swallow exceptions and return `None`.
- Default implementation: `GET {base_url}/models` with Bearer auth. Works for any
standard OpenAI-compatible provider.
**Override when:**
- Auth header is not `Bearer` (Anthropic: `x-api-key`)
- Endpoint path differs from `/models` AND you can't just set `models_url` (OpenRouter: public endpoint, pass `api_key=None` explicitly)
- Response format differs (extra wrapping, non-standard `id` field)
- Provider has no REST endpoint (Bedrock, copilot-acp → return `None`)
- Filtering needed post-fetch (only tool-capable models, etc.)
Use `models_url` instead of overriding when the only difference is the URL:
```python
# No subclass needed — just set models_url
myprovider = ProviderProfile(
name="myprovider",
base_url="https://api.myprovider.com/v1",
models_url="https://catalog.myprovider.com/models", # different host
)
```
---
## Debugging
### Check if a provider resolves
```python
from providers import get_provider_profile
p = get_provider_profile("myprovider")
print(p) # ProviderProfile(name='myprovider', ...)
print(p.base_url)
print(p.api_mode)
```
### Check all registered providers
```python
from providers import _REGISTRY
print(list(_REGISTRY.keys()))
```
### Test live model fetch
```python
import os
from providers import get_provider_profile
p = get_provider_profile("myprovider")
key = os.getenv("MYPROVIDER_API_KEY")
models = p.fetch_models(api_key=key, timeout=5.0)
print(models) # list of model IDs, or None on failure
```
### Test alias resolution
```python
from providers import get_provider_profile
# All of these should return the same profile
assert get_provider_profile("openrouter").name == "openrouter"
assert get_provider_profile("or").name == "openrouter"
```
### Run the provider test suite
```bash
# From the repo root
source venv/bin/activate
python -m pytest tests/providers/ -v
```
### Check ruff + ty compliance
```bash
source venv/bin/activate
ruff format providers/*.py
ruff check providers/*.py --select UP,E,F,I,W
ty check providers/*.py
```
---
## Common mistakes
**Wrong `name`** — must exactly match the `id` field in `hermes_cli/auth.py`
`PROVIDER_REGISTRY`. If they diverge, `get_provider_profile()` will return a
profile that doesn't match what `runtime_provider.py` resolves.
**Wrong `env_vars`** — check `hermes_cli/auth.py` `api_key_env_vars` for the
exact env var names. Mismatch means WS2 auth migration will miss the key.
**Wrong `base_url`** — check `hermes_cli/auth.py` `inference_base_url` constant.
Several providers have version suffixes or non-obvious paths
(`stepfun: /step_plan/v1`, `opencode-go: /zen/go/v1`).
**Skipping `api_mode`** — defaults to `chat_completions`. Providers that use
`anthropic_messages`, `codex_responses`, `bedrock_converse`, or `copilot_acp`
must set it explicitly.
**Forgetting `register_provider()`** — auto-discovery runs `pkgutil.iter_modules`
over the package and imports each module, but only if `register_provider()` is
called at module level. Without it the profile is never in `_REGISTRY`.
**`fetch_models` returning the wrong shape** — must return `list[str]` (plain
model IDs), not `list[tuple]` or `list[dict]`. Callers expect plain strings.
**`build_api_kwargs_extras` wrong tuple** — must return `(extra_body_dict,
top_level_dict)`. Returning a flat dict or swapping the order silently sends
fields to the wrong place.

61
providers/__init__.py Normal file
View File

@@ -0,0 +1,61 @@
"""Provider module registry.
Auto-discovers ProviderProfile instances from providers/*.py modules.
Each module should define a module-level PROVIDER or PROVIDERS list.
Usage:
from providers import get_provider_profile
profile = get_provider_profile("nvidia") # returns ProviderProfile or None
profile = get_provider_profile("kimi") # checks name + aliases
"""
from __future__ import annotations
from providers.base import OMIT_TEMPERATURE, ProviderProfile # noqa: F401
_REGISTRY: dict[str, ProviderProfile] = {}
_ALIASES: dict[str, str] = {}
_discovered = False
def register_provider(profile: ProviderProfile) -> None:
"""Register a provider profile by name and aliases."""
_REGISTRY[profile.name] = profile
for alias in profile.aliases:
_ALIASES[alias] = profile.name
def get_provider_profile(name: str) -> ProviderProfile | None:
"""Look up a provider profile by name or alias.
Returns None if the provider has no profile (falls back to generic).
"""
if not _discovered:
_discover_providers()
canonical = _ALIASES.get(name, name)
return _REGISTRY.get(canonical)
def _discover_providers() -> None:
"""Import all provider modules to trigger registration."""
global _discovered
if _discovered:
return
_discovered = True
import importlib
import pkgutil
import providers as _pkg
for _importer, modname, _ispkg in pkgutil.iter_modules(_pkg.__path__):
if modname.startswith("_") or modname == "base":
continue
try:
importlib.import_module(f"providers.{modname}")
except ImportError as e:
import logging
logging.getLogger(__name__).warning(
"Failed to import provider module %s: %s", modname, e
)

13
providers/alibaba.py Normal file
View File

@@ -0,0 +1,13 @@
"""Alibaba Cloud DashScope provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
alibaba = ProviderProfile(
name="alibaba",
aliases=("dashscope", "alibaba-cloud", "qwen-dashscope"),
env_vars=("ALIBABA_API_KEY", "DASHSCOPE_API_KEY"),
base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1",
)
register_provider(alibaba)

52
providers/anthropic.py Normal file
View File

@@ -0,0 +1,52 @@
"""Native Anthropic provider profile."""
import json
import logging
import urllib.request
from providers import register_provider
from providers.base import ProviderProfile
logger = logging.getLogger(__name__)
class AnthropicProfile(ProviderProfile):
"""Native Anthropic — uses x-api-key header, not Bearer."""
def fetch_models(
self,
*,
api_key: str | None = None,
timeout: float = 8.0,
) -> list[str] | None:
"""Anthropic uses x-api-key header and anthropic-version."""
if not api_key:
return None
try:
req = urllib.request.Request("https://api.anthropic.com/v1/models")
req.add_header("x-api-key", api_key)
req.add_header("anthropic-version", "2023-06-01")
req.add_header("Accept", "application/json")
with urllib.request.urlopen(req, timeout=timeout) as resp:
data = json.loads(resp.read().decode())
return [
m["id"]
for m in data.get("data", [])
if isinstance(m, dict) and "id" in m
]
except Exception as exc:
logger.debug("fetch_models(anthropic): %s", exc)
return None
anthropic = AnthropicProfile(
name="anthropic",
aliases=("claude", "claude-oauth"),
api_mode="anthropic_messages",
env_vars=("ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN", "CLAUDE_CODE_OAUTH_TOKEN"),
base_url="https://api.anthropic.com",
auth_type="api_key",
default_aux_model="claude-haiku-4-5-20251001",
)
register_provider(anthropic)

13
providers/arcee.py Normal file
View File

@@ -0,0 +1,13 @@
"""Arcee AI provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
arcee = ProviderProfile(
name="arcee",
aliases=("arcee-ai", "arceeai"),
env_vars=("ARCEE_API_KEY",),
base_url="https://api.arcee.ai/api/v1",
)
register_provider(arcee)

138
providers/base.py Normal file
View File

@@ -0,0 +1,138 @@
"""Provider profile base class.
A ProviderProfile declares everything about an inference provider in one place:
auth, endpoints, client quirks, request-time quirks. The transport reads this
instead of receiving 20+ boolean flags.
Provider profiles are DECLARATIVE — they describe the provider's behavior.
They do NOT own client construction, credential rotation, or streaming.
Those stay on AIAgent.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger(__name__)
# Sentinel for "omit temperature entirely" (Kimi: server manages it)
OMIT_TEMPERATURE = object()
@dataclass
class ProviderProfile:
"""Base provider profile — subclass or instantiate with overrides."""
# ── Identity ─────────────────────────────────────────────
name: str
api_mode: str = "chat_completions"
aliases: tuple = ()
# ── Auth & endpoints ─────────────────────────────────────
env_vars: tuple = ()
base_url: str = ""
models_url: str = "" # explicit models endpoint; falls back to {base_url}/models
auth_type: str = "api_key" # api_key|oauth_device_code|oauth_external|copilot|aws
# ── Client-level quirks (set once at client construction) ─
default_headers: dict[str, str] = field(default_factory=dict)
# ── Request-level quirks ─────────────────────────────────
# Temperature: None = use caller's default, OMIT_TEMPERATURE = don't send
fixed_temperature: Any = None
default_max_tokens: int | None = None
default_aux_model: str = (
"" # cheap model for auxiliary tasks (compression, vision, etc.)
)
# empty = use main model
# ── Hooks (override in subclass for complex providers) ───
def prepare_messages(self, messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Provider-specific message preprocessing.
Called AFTER codex field sanitization, BEFORE developer role swap.
Default: pass-through.
"""
return messages
def build_extra_body(
self, *, session_id: str | None = None, **context: Any
) -> dict[str, Any]:
"""Provider-specific extra_body fields.
Merged into the API kwargs extra_body. Default: empty dict.
"""
return {}
def build_api_kwargs_extras(
self,
*,
reasoning_config: dict | None = None,
**context: Any,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Provider-specific kwargs split between extra_body and top-level api_kwargs.
Returns (extra_body_additions, top_level_kwargs).
The transport merges extra_body_additions into extra_body, and
top_level_kwargs directly into api_kwargs.
This split exists because some providers put reasoning config in
extra_body (OpenRouter: extra_body.reasoning) while others put it
as top-level api_kwargs (Kimi: api_kwargs.reasoning_effort).
Default: ({}, {}).
"""
return {}, {}
def fetch_models(
self,
*,
api_key: str | None = None,
timeout: float = 8.0,
) -> list[str] | None:
"""Fetch the live model list from the provider's models endpoint.
Returns a list of model ID strings, or None if the fetch failed or
the provider does not support live model listing.
Resolution order for the endpoint URL:
1. self.models_url (explicit override — use when the models
endpoint differs from the inference base URL, e.g. OpenRouter
exposes a public catalog at /api/v1/models while inference is
at /api/v1)
2. self.base_url + "/models" (standard OpenAI-compat fallback)
The default implementation sends Bearer auth when api_key is given
and forwards self.default_headers. Override to customise auth, path,
response shape, or to return None for providers with no REST catalog.
Callers must always fall back to the static _PROVIDER_MODELS list
when this returns None.
"""
url = (self.models_url or "").strip()
if not url:
if not self.base_url:
return None
url = self.base_url.rstrip("/") + "/models"
import json
import urllib.request
req = urllib.request.Request(url)
if api_key:
req.add_header("Authorization", f"Bearer {api_key}")
req.add_header("Accept", "application/json")
for k, v in self.default_headers.items():
req.add_header(k, v)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
data = json.loads(resp.read().decode())
items = data if isinstance(data, list) else data.get("data", [])
return [m["id"] for m in items if isinstance(m, dict) and "id" in m]
except Exception as exc:
logger.debug("fetch_models(%s): %s", self.name, exc)
return None

29
providers/bedrock.py Normal file
View File

@@ -0,0 +1,29 @@
"""AWS Bedrock provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
class BedrockProfile(ProviderProfile):
"""AWS Bedrock — no REST /v1/models endpoint; uses AWS SDK."""
def fetch_models(
self,
*,
api_key: str | None = None,
timeout: float = 8.0,
) -> list[str] | None:
"""Bedrock model listing requires AWS SDK, not a REST call."""
return None
bedrock = BedrockProfile(
name="bedrock",
aliases=("aws", "aws-bedrock", "amazon-bedrock", "amazon"),
api_mode="bedrock_converse",
env_vars=(), # AWS SDK credentials — not env vars
base_url="https://bedrock-runtime.us-east-1.amazonaws.com",
auth_type="aws",
)
register_provider(bedrock)

55
providers/copilot.py Normal file
View File

@@ -0,0 +1,55 @@
"""Copilot / GitHub Models provider profile.
Copilot uses per-model api_mode routing:
- GPT-5+ / Codex models → codex_responses
- Claude models → anthropic_messages
- Everything else → chat_completions (this profile covers that subset)
Key quirks for the chat_completions subset:
- Editor attribution headers (via copilot_default_headers())
- GitHub Models reasoning extra_body (model-catalog gated)
"""
from typing import Any
from providers import register_provider
from providers.base import ProviderProfile
class CopilotProfile(ProviderProfile):
"""GitHub Copilot / GitHub Models — editor headers + reasoning."""
def build_api_kwargs_extras(
self,
*,
model: str | None = None,
reasoning_config: dict | None = None,
supports_reasoning: bool = False,
**ctx,
) -> tuple[dict[str, Any], dict[str, Any]]:
extra_body: dict[str, Any] = {}
if supports_reasoning and model:
try:
from hermes_cli.models import github_model_reasoning_efforts
supported_efforts = github_model_reasoning_efforts(model)
if supported_efforts and reasoning_config:
effort = reasoning_config.get("effort", "medium")
if effort in supported_efforts:
extra_body["reasoning"] = {"effort": effort}
elif supported_efforts:
extra_body["reasoning"] = {"effort": "medium"}
except Exception:
pass
return extra_body, {}
copilot = CopilotProfile(
name="copilot",
aliases=("github-copilot", "github-models"),
env_vars=("COPILOT_API_KEY",),
base_url="https://api.githubcopilot.com",
auth_type="copilot",
)
register_provider(copilot)

34
providers/copilot_acp.py Normal file
View File

@@ -0,0 +1,34 @@
"""GitHub Copilot ACP provider profile.
copilot-acp uses an external ACP subprocess — NOT the standard
transport. api_mode="copilot_acp" is handled separately in run_agent.py.
The profile captures auth + endpoint metadata for registry migration.
"""
from providers import register_provider
from providers.base import ProviderProfile
class CopilotACPProfile(ProviderProfile):
"""GitHub Copilot ACP — external process, no REST models endpoint."""
def fetch_models(
self,
*,
api_key: str | None = None,
timeout: float = 8.0,
) -> list[str] | None:
"""Model listing is handled by the ACP subprocess."""
return None
copilot_acp = CopilotACPProfile(
name="copilot-acp",
aliases=("github-copilot-acp", "copilot-acp-agent"),
api_mode="copilot_acp",
env_vars=(), # Managed by ACP subprocess
base_url="acp://copilot", # ACP internal scheme
auth_type="external_process",
)
register_provider(copilot_acp)

61
providers/custom.py Normal file
View File

@@ -0,0 +1,61 @@
"""Custom / Ollama (local) provider profile.
Covers any endpoint registered as provider="custom", including local
Ollama instances. Key quirks:
- ollama_num_ctx → extra_body.options.num_ctx (local context window)
- reasoning_config disabled → extra_body.think = False
"""
from typing import Any
from providers import register_provider
from providers.base import ProviderProfile
class CustomProfile(ProviderProfile):
"""Custom/Ollama local provider — think=false and num_ctx support."""
def build_api_kwargs_extras(
self,
*,
reasoning_config: dict | None = None,
ollama_num_ctx: int | None = None,
**ctx: Any,
) -> tuple[dict[str, Any], dict[str, Any]]:
extra_body: dict[str, Any] = {}
# Ollama context window
if ollama_num_ctx:
options = extra_body.get("options", {})
options["num_ctx"] = ollama_num_ctx
extra_body["options"] = options
# Disable thinking when reasoning is turned off
if reasoning_config and isinstance(reasoning_config, dict):
_effort = (reasoning_config.get("effort") or "").strip().lower()
_enabled = reasoning_config.get("enabled", True)
if _effort == "none" or _enabled is False:
extra_body["think"] = False
return extra_body, {}
def fetch_models(
self,
*,
api_key: str | None = None,
timeout: float = 8.0,
) -> list[str] | None:
"""Custom/Ollama: base_url is user-configured; fetch if set."""
if not self.base_url:
return None
return super().fetch_models(api_key=api_key, timeout=timeout)
custom = CustomProfile(
name="custom",
aliases=("ollama", "local"),
env_vars=(), # No fixed key — custom endpoint
base_url="", # User-configured
)
register_provider(custom)

13
providers/deepseek.py Normal file
View File

@@ -0,0 +1,13 @@
"""DeepSeek provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
deepseek = ProviderProfile(
name="deepseek",
aliases=("deepseek-chat",),
env_vars=("DEEPSEEK_API_KEY",),
base_url="https://api.deepseek.com/v1",
)
register_provider(deepseek)

34
providers/gemini.py Normal file
View File

@@ -0,0 +1,34 @@
"""Google Gemini provider profiles.
gemini: Google AI Studio (API key) — uses GeminiNativeClient
google-gemini-cli: Google Cloud Code Assist (OAuth) — uses GeminiCloudCodeClient
Both report api_mode="chat_completions" but use custom native clients
that bypass the standard OpenAI transport. The profile captures auth
and endpoint metadata for auth.py / runtime_provider.py migration.
"""
from providers import register_provider
from providers.base import ProviderProfile
gemini = ProviderProfile(
name="gemini",
aliases=("google", "google-gemini", "google-ai-studio"),
api_mode="chat_completions",
env_vars=("GOOGLE_API_KEY", "GEMINI_API_KEY"),
base_url="https://generativelanguage.googleapis.com/v1beta",
auth_type="api_key",
default_aux_model="gemini-3-flash-preview",
)
google_gemini_cli = ProviderProfile(
name="google-gemini-cli",
aliases=("gemini-cli", "gemini-oauth"),
api_mode="chat_completions",
env_vars=(), # OAuth — no API key
base_url="cloudcode-pa://google", # Cloud Code Assist internal scheme
auth_type="oauth_external",
)
register_provider(gemini)
register_provider(google_gemini_cli)

13
providers/huggingface.py Normal file
View File

@@ -0,0 +1,13 @@
"""Hugging Face provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
huggingface = ProviderProfile(
name="huggingface",
aliases=("hf", "hugging-face"),
env_vars=("HF_TOKEN",),
base_url="https://router.huggingface.co/v1",
)
register_provider(huggingface)

14
providers/kilocode.py Normal file
View File

@@ -0,0 +1,14 @@
"""Kilo Code provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
kilocode = ProviderProfile(
name="kilocode",
aliases=("kilo-code", "kilo"),
env_vars=("KILOCODE_API_KEY",),
base_url="https://api.kilo.ai/api/gateway",
default_aux_model="google/gemini-3-flash-preview",
)
register_provider(kilocode)

71
providers/kimi.py Normal file
View File

@@ -0,0 +1,71 @@
"""Kimi / Moonshot provider profiles.
Kimi has dual endpoints:
- sk-kimi-* keys → api.kimi.com/coding (Anthropic Messages API)
- legacy keys → api.moonshot.ai/v1 (OpenAI chat completions)
This module covers the chat_completions path (/v1 endpoint).
"""
from typing import Any
from providers import register_provider
from providers.base import OMIT_TEMPERATURE, ProviderProfile
class KimiProfile(ProviderProfile):
"""Kimi/Moonshot — temperature omitted, thinking + reasoning_effort."""
def build_api_kwargs_extras(
self, *, reasoning_config: dict | None = None, **context
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Kimi uses extra_body.thinking + top-level reasoning_effort."""
extra_body = {}
top_level = {}
if not reasoning_config or not isinstance(reasoning_config, dict):
# No config → thinking enabled, default effort
extra_body["thinking"] = {"type": "enabled"}
top_level["reasoning_effort"] = "medium"
return extra_body, top_level
enabled = reasoning_config.get("enabled", True)
if enabled is False:
extra_body["thinking"] = {"type": "disabled"}
return extra_body, top_level
# Enabled
extra_body["thinking"] = {"type": "enabled"}
effort = (reasoning_config.get("effort") or "").strip().lower()
if effort in ("low", "medium", "high"):
top_level["reasoning_effort"] = effort
else:
top_level["reasoning_effort"] = "medium"
return extra_body, top_level
kimi = KimiProfile(
name="kimi-coding",
aliases=("kimi", "moonshot"),
env_vars=("KIMI_API_KEY", "MOONSHOT_API_KEY"),
base_url="https://api.moonshot.ai/v1",
fixed_temperature=OMIT_TEMPERATURE,
default_max_tokens=32000,
default_headers={"User-Agent": "hermes-agent/1.0"},
default_aux_model="kimi-k2-turbo-preview",
)
kimi_cn = KimiProfile(
name="kimi-coding-cn",
aliases=(),
env_vars=("KIMI_CN_API_KEY",),
base_url="https://api.moonshot.cn/v1",
fixed_temperature=OMIT_TEMPERATURE,
default_max_tokens=32000,
default_headers={"User-Agent": "hermes-agent/1.0"},
default_aux_model="kimi-k2-turbo-preview",
)
register_provider(kimi)
register_provider(kimi_cn)

29
providers/minimax.py Normal file
View File

@@ -0,0 +1,29 @@
"""MiniMax provider profiles (international + China).
Both use anthropic_messages api_mode — their inference_base_url
ends with /anthropic which triggers auto-detection to anthropic_messages.
"""
from providers import register_provider
from providers.base import ProviderProfile
minimax = ProviderProfile(
name="minimax",
aliases=("mini-max",),
api_mode="anthropic_messages",
env_vars=("MINIMAX_API_KEY",),
base_url="https://api.minimax.io/anthropic",
auth_type="api_key",
)
minimax_cn = ProviderProfile(
name="minimax-cn",
aliases=("minimax-china", "minimax_cn"),
api_mode="anthropic_messages",
env_vars=("MINIMAX_CN_API_KEY",),
base_url="https://api.minimaxi.com/anthropic",
auth_type="api_key",
)
register_provider(minimax)
register_provider(minimax_cn)

46
providers/nous.py Normal file
View File

@@ -0,0 +1,46 @@
"""Nous Portal provider profile."""
from typing import Any
from providers import register_provider
from providers.base import ProviderProfile
class NousProfile(ProviderProfile):
"""Nous Portal — product tags, reasoning with Nous-specific omission."""
def build_extra_body(
self, *, session_id: str | None = None, **context
) -> dict[str, Any]:
return {"tags": ["product=hermes-agent"]}
def build_api_kwargs_extras(
self,
*,
reasoning_config: dict | None = None,
supports_reasoning: bool = False,
**context,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Nous: passes full reasoning_config, but OMITS when disabled."""
extra_body = {}
if supports_reasoning:
if reasoning_config is not None:
rc = dict(reasoning_config)
if rc.get("enabled") is False:
pass # Nous omits reasoning when disabled
else:
extra_body["reasoning"] = rc
else:
extra_body["reasoning"] = {"enabled": True, "effort": "medium"}
return extra_body, {}
nous = NousProfile(
name="nous",
aliases=("nous-portal", "nousresearch"),
env_vars=("NOUS_API_KEY",),
base_url="https://inference-api.nousresearch.com/v1",
auth_type="oauth_device_code",
)
register_provider(nous)

14
providers/nvidia.py Normal file
View File

@@ -0,0 +1,14 @@
"""NVIDIA NIM provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
nvidia = ProviderProfile(
name="nvidia",
aliases=("nvidia-nim",),
env_vars=("NVIDIA_API_KEY",),
base_url="https://integrate.api.nvidia.com/v1",
default_max_tokens=16384,
)
register_provider(nvidia)

13
providers/ollama_cloud.py Normal file
View File

@@ -0,0 +1,13 @@
"""Ollama Cloud provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
ollama_cloud = ProviderProfile(
name="ollama-cloud",
aliases=("ollama_cloud",),
env_vars=("OLLAMA_CLOUD_API_KEY",),
base_url="https://ollama.com/v1",
)
register_provider(ollama_cloud)

15
providers/openai_codex.py Normal file
View File

@@ -0,0 +1,15 @@
"""OpenAI Codex (Responses API) provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
openai_codex = ProviderProfile(
name="openai-codex",
aliases=("codex", "openai_codex"),
api_mode="codex_responses",
env_vars=(), # OAuth external — no API key
base_url="https://chatgpt.com/backend-api/codex",
auth_type="oauth_external",
)
register_provider(openai_codex)

30
providers/opencode.py Normal file
View File

@@ -0,0 +1,30 @@
"""OpenCode provider profiles (Zen + Go).
Both use per-model api_mode routing:
- OpenCode Zen: Claude → anthropic_messages, GPT-5/Codex → codex_responses,
everything else → chat_completions (this profile)
- OpenCode Go: MiniMax → anthropic_messages, GLM/Kimi → chat_completions
(this profile)
"""
from providers import register_provider
from providers.base import ProviderProfile
opencode_zen = ProviderProfile(
name="opencode-zen",
aliases=("opencode", "opencode_zen", "zen"),
env_vars=("OPENCODE_ZEN_API_KEY",),
base_url="https://opencode.ai/zen/v1",
default_aux_model="gemini-3-flash",
)
opencode_go = ProviderProfile(
name="opencode-go",
aliases=("opencode_go",),
env_vars=("OPENCODE_GO_API_KEY",),
base_url="https://opencode.ai/zen/go/v1",
default_aux_model="glm-5",
)
register_provider(opencode_zen)
register_provider(opencode_go)

70
providers/openrouter.py Normal file
View File

@@ -0,0 +1,70 @@
"""OpenRouter provider profile."""
import logging
from typing import Any
from providers import register_provider
from providers.base import ProviderProfile
logger = logging.getLogger(__name__)
_CACHE: list[str] | None = None
class OpenRouterProfile(ProviderProfile):
"""OpenRouter aggregator — provider preferences, reasoning config passthrough."""
def fetch_models(
self,
*,
api_key: str | None = None,
timeout: float = 8.0,
) -> list[str] | None:
"""Fetch from public OpenRouter catalog — no auth, filter by tool support."""
global _CACHE # noqa: PLW0603
if _CACHE is not None:
return _CACHE
try:
result = super().fetch_models(api_key=None, timeout=timeout)
if result is not None:
_CACHE = result
return result
except Exception as exc:
logger.debug("fetch_models(openrouter): %s", exc)
return None
def build_extra_body(
self, *, session_id: str | None = None, **context: Any
) -> dict[str, Any]:
body: dict[str, Any] = {}
prefs = context.get("provider_preferences")
if prefs:
body["provider"] = prefs
return body
def build_api_kwargs_extras(
self,
*,
reasoning_config: dict | None = None,
supports_reasoning: bool = False,
**context: Any,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""OpenRouter passes the full reasoning_config dict as extra_body.reasoning."""
extra_body: dict[str, Any] = {}
if supports_reasoning:
if reasoning_config is not None:
extra_body["reasoning"] = dict(reasoning_config)
else:
extra_body["reasoning"] = {"enabled": True, "effort": "medium"}
return extra_body, {}
openrouter = OpenRouterProfile(
name="openrouter",
aliases=("or",),
env_vars=("OPENROUTER_API_KEY",),
base_url="https://openrouter.ai/api/v1",
models_url="https://openrouter.ai/api/v1/models", # public, no auth needed
)
register_provider(openrouter)

82
providers/qwen.py Normal file
View File

@@ -0,0 +1,82 @@
"""Qwen Portal provider profile."""
import copy
from typing import Any
from providers import register_provider
from providers.base import ProviderProfile
class QwenProfile(ProviderProfile):
"""Qwen Portal — message normalization, vl_high_resolution, metadata top-level."""
def prepare_messages(self, messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Normalize content to list-of-dicts format.
Inject cache_control on system message.
Matches the behavior of run_agent.py:_qwen_prepare_chat_messages().
"""
prepared = copy.deepcopy(messages)
if not prepared:
return prepared
for msg in prepared:
if not isinstance(msg, dict):
continue
content = msg.get("content")
if isinstance(content, str):
msg["content"] = [{"type": "text", "text": content}]
elif isinstance(content, list):
normalized_parts = []
for part in content:
if isinstance(part, str):
normalized_parts.append({"type": "text", "text": part})
elif isinstance(part, dict):
normalized_parts.append(part)
if normalized_parts:
msg["content"] = normalized_parts
# Inject cache_control on the last part of the system message.
for msg in prepared:
if isinstance(msg, dict) and msg.get("role") == "system":
content = msg.get("content")
if (
isinstance(content, list)
and content
and isinstance(content[-1], dict)
):
content[-1]["cache_control"] = {"type": "ephemeral"}
break
return prepared
def build_extra_body(
self, *, session_id: str | None = None, **context
) -> dict[str, Any]:
return {"vl_high_resolution_images": True}
def build_api_kwargs_extras(
self,
*,
reasoning_config: dict | None = None,
qwen_session_metadata: dict | None = None,
**context,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Qwen metadata goes to top-level api_kwargs, not extra_body."""
top_level = {}
if qwen_session_metadata:
top_level["metadata"] = qwen_session_metadata
return {}, top_level
qwen = QwenProfile(
name="qwen-oauth",
aliases=("qwen", "qwen-portal"),
env_vars=("QWEN_API_KEY",),
base_url="https://portal.qwen.ai/v1",
auth_type="oauth_external",
default_max_tokens=65536,
)
register_provider(qwen)

13
providers/stepfun.py Normal file
View File

@@ -0,0 +1,13 @@
"""StepFun provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
stepfun = ProviderProfile(
name="stepfun",
aliases=("step", "stepfun-coding-plan"),
env_vars=("STEPFUN_API_KEY",),
base_url="https://api.stepfun.ai/step_plan/v1",
)
register_provider(stepfun)

43
providers/vercel.py Normal file
View File

@@ -0,0 +1,43 @@
"""Vercel AI Gateway provider profile.
AI Gateway routes to multiple backends. Hermes sends attribution
headers and full reasoning config passthrough.
"""
from typing import Any
from providers import register_provider
from providers.base import ProviderProfile
class VercelAIGatewayProfile(ProviderProfile):
"""Vercel AI Gateway — attribution headers + reasoning passthrough."""
def build_api_kwargs_extras(
self,
*,
reasoning_config: dict | None = None,
supports_reasoning: bool = True,
**ctx: Any,
) -> tuple[dict[str, Any], dict[str, Any]]:
extra_body: dict[str, Any] = {}
if supports_reasoning and reasoning_config is not None:
extra_body["reasoning"] = dict(reasoning_config)
elif supports_reasoning:
extra_body["reasoning"] = {"enabled": True, "effort": "medium"}
return extra_body, {}
vercel = VercelAIGatewayProfile(
name="ai-gateway",
aliases=("vercel", "vercel-ai-gateway", "ai_gateway", "aigateway"),
env_vars=("AI_GATEWAY_API_KEY",),
base_url="https://ai-gateway.vercel.sh/v1",
default_headers={
"HTTP-Referer": "https://hermes-agent.nousresearch.com",
"X-Title": "Hermes Agent",
},
default_aux_model="google/gemini-3-flash",
)
register_provider(vercel)

15
providers/xai.py Normal file
View File

@@ -0,0 +1,15 @@
"""xAI (Grok) provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
xai = ProviderProfile(
name="xai",
aliases=("grok", "x-ai"),
api_mode="codex_responses",
env_vars=("XAI_API_KEY",),
base_url="https://api.x.ai/v1",
auth_type="api_key",
)
register_provider(xai)

13
providers/xiaomi.py Normal file
View File

@@ -0,0 +1,13 @@
"""Xiaomi MiMo provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
xiaomi = ProviderProfile(
name="xiaomi",
aliases=("mimo", "xiaomi-mimo"),
env_vars=("XIAOMI_API_KEY",),
base_url="https://api.xiaomimimo.com/v1",
)
register_provider(xiaomi)

14
providers/zai.py Normal file
View File

@@ -0,0 +1,14 @@
"""ZAI / GLM provider profile."""
from providers import register_provider
from providers.base import ProviderProfile
zai = ProviderProfile(
name="zai",
aliases=("glm", "z-ai", "z.ai", "zhipu"),
env_vars=("ZAI_API_KEY",),
base_url="https://api.z.ai/api/paas/v4",
default_aux_model="glm-4.5-flash",
)
register_provider(zai)

View File

@@ -137,7 +137,7 @@ py-modules = ["run_agent", "model_tools", "toolsets", "batch_runner", "trajector
hermes_cli = ["web_dist/**/*"]
[tool.setuptools.packages.find]
include = ["agent", "agent.*", "tools", "tools.*", "hermes_cli", "gateway", "gateway.*", "tui_gateway", "tui_gateway.*", "cron", "acp_adapter", "plugins", "plugins.*"]
include = ["agent", "agent.*", "tools", "tools.*", "hermes_cli", "gateway", "gateway.*", "tui_gateway", "tui_gateway.*", "cron", "acp_adapter", "plugins", "plugins.*", "providers", "providers.*"]
[tool.pytest.ini_options]
testpaths = ["tests"]

View File

@@ -59,6 +59,14 @@ from hermes_cli.timeouts import (
_hermes_home = get_hermes_home()
_project_env = Path(__file__).parent / '.env'
_loaded_env_paths = load_hermes_dotenv(hermes_home=_hermes_home, project_env=_project_env)
# Providers whose kwargs are built via ProviderProfile instead of legacy flags.
# Extended incrementally as parity is verified. See providers/ package.
_PROFILE_ACTIVE_PROVIDERS: frozenset[str] = frozenset({
"nvidia", "nvidia-nim",
"deepseek", "deepseek-chat",
})
if _loaded_env_paths:
for _env_path in _loaded_env_paths:
logger.info("Loaded environment variables from %s", _env_path)
@@ -4963,7 +4971,7 @@ class AIAgent:
_validate_proxy_env_urls()
_validate_base_url(client_kwargs.get("base_url"))
if self.provider == "copilot-acp" or str(client_kwargs.get("base_url", "")).startswith("acp://copilot"):
from agent.copilot_acp_client import CopilotACPClient
from acp_adapter.copilot_client import CopilotACPClient
client = CopilotACPClient(**client_kwargs)
logger.info(
@@ -7527,7 +7535,36 @@ class AIAgent:
# ── chat_completions (default) ─────────────────────────────────────
_ct = self._get_transport()
# Provider detection flags
# ── Provider profile path ────────────────────────────────────────
# Activated incrementally per provider as parity is verified.
# Each provider here has parity tests proving identical output.
# _PROFILE_ACTIVE_PROVIDERS is defined at module level.
if self.provider in _PROFILE_ACTIVE_PROVIDERS:
try:
from providers import get_provider_profile
_profile = get_provider_profile(self.provider)
except Exception:
_profile = None
if _profile:
_ephemeral_out = getattr(self, "_ephemeral_max_output_tokens", None)
if _ephemeral_out is not None:
self._ephemeral_max_output_tokens = None
return _ct.build_kwargs(
model=self.model,
messages=api_messages,
tools=self.tools,
timeout=self._resolved_api_call_timeout(),
max_tokens=self.max_tokens,
ephemeral_max_output_tokens=_ephemeral_out,
max_tokens_param_fn=self._max_tokens_param,
reasoning_config=self.reasoning_config,
request_overrides=self.request_overrides,
session_id=getattr(self, "session_id", None),
provider_profile=_profile,
ollama_num_ctx=self._ollama_num_ctx,
)
# ── Legacy flag path (providers without active profiles) ─────────
_is_qwen = self._is_qwen_portal()
_is_or = self._is_openrouter_url()
_is_gh = (
@@ -7535,7 +7572,6 @@ class AIAgent:
or base_url_host_matches(self._base_url_lower, "api.githubcopilot.com")
)
_is_nous = "nousresearch" in self._base_url_lower
_is_nvidia = "integrate.api.nvidia.com" in self._base_url_lower
_is_kimi = (
base_url_host_matches(self.base_url, "api.kimi.com")
or base_url_host_matches(self.base_url, "moonshot.ai")
@@ -7607,7 +7643,6 @@ class AIAgent:
is_nous=_is_nous,
is_qwen_portal=_is_qwen,
is_github_models=_is_gh,
is_nvidia_nim=_is_nvidia,
is_kimi=_is_kimi,
is_custom_provider=self.provider == "custom",
ollama_num_ctx=self._ollama_num_ctx,

View File

@@ -10,7 +10,7 @@ import unittest
from pathlib import Path
from unittest.mock import patch
from agent.copilot_acp_client import CopilotACPClient
from acp_adapter.copilot_client import CopilotACPClient
class _FakeProcess:
@@ -100,7 +100,7 @@ class CopilotACPClientSafetyTests(unittest.TestCase):
target = home / ".ssh" / "id_rsa"
target.parent.mkdir(parents=True, exist_ok=True)
with patch("agent.copilot_acp_client.is_write_denied", return_value=True, create=True):
with patch("acp_adapter.copilot_client.is_write_denied", return_value=True, create=True):
response = self._dispatch(
{
"jsonrpc": "2.0",
@@ -181,7 +181,7 @@ def test_run_prompt_prefers_profile_home_when_available(monkeypatch, tmp_path):
captured = {}
client = _make_home_client(tmp_path)
with _patch("agent.copilot_acp_client.subprocess.Popen", side_effect=_fake_popen_capture(captured)):
with _patch("acp_adapter.copilot_client.subprocess.Popen", side_effect=_fake_popen_capture(captured)):
with pytest.raises(RuntimeError, match="Could not start Copilot ACP command"):
client._run_prompt("hello", timeout_seconds=1)
@@ -195,7 +195,7 @@ def test_run_prompt_passes_home_when_parent_env_is_clean(monkeypatch, tmp_path):
captured = {}
client = _make_home_client(tmp_path)
with _patch("agent.copilot_acp_client.subprocess.Popen", side_effect=_fake_popen_capture(captured)):
with _patch("acp_adapter.copilot_client.subprocess.Popen", side_effect=_fake_popen_capture(captured)):
with pytest.raises(RuntimeError, match="Could not start Copilot ACP command"):
client._run_prompt("hello", timeout_seconds=1)

View File

@@ -142,13 +142,17 @@ class TestChatCompletionsBuildKwargs:
assert kw["max_tokens"] == 2048
def test_nvidia_default_max_tokens(self, transport):
"""NVIDIA max_tokens=16384 is now set via ProviderProfile, not legacy flag."""
from providers import get_provider_profile
profile = get_provider_profile("nvidia")
msgs = [{"role": "user", "content": "Hi"}]
kw = transport.build_kwargs(
model="glm-4.7", messages=msgs,
is_nvidia_nim=True,
model="nvidia/llama-3.1-405b-instruct",
messages=msgs,
max_tokens_param_fn=lambda n: {"max_tokens": n},
provider_profile=profile,
)
# NVIDIA default: 16384
assert kw["max_tokens"] == 16384
def test_qwen_default_max_tokens(self, transport):

View File

View File

@@ -0,0 +1,118 @@
"""E2E tests: verify _build_kwargs_from_profile produces correct output.
These tests call _build_kwargs_from_profile on the transport directly,
without importing run_agent (which would cause xdist worker contamination).
"""
import pytest
from agent.transports.chat_completions import ChatCompletionsTransport
from providers import get_provider_profile
@pytest.fixture
def transport():
return ChatCompletionsTransport()
def _msgs():
return [{"role": "user", "content": "hi"}]
class TestNvidiaProfileWiring:
def test_nvidia_gets_default_max_tokens(self, transport):
profile = get_provider_profile("nvidia")
kwargs = transport.build_kwargs(
model="nvidia/llama-3.1-nemotron-70b-instruct",
messages=_msgs(),
tools=None,
provider_profile=profile,
max_tokens=None,
max_tokens_param_fn=lambda x: {"max_tokens": x} if x else {},
timeout=300,
reasoning_config=None,
request_overrides=None,
session_id="test",
ollama_num_ctx=None,
)
# NVIDIA profile sets default_max_tokens=16384
assert kwargs.get("max_tokens") == 16384
def test_nvidia_nim_alias(self, transport):
profile = get_provider_profile("nvidia-nim")
assert profile is not None
assert profile.name == "nvidia"
assert profile.default_max_tokens == 16384
def test_nvidia_model_passed(self, transport):
profile = get_provider_profile("nvidia")
kwargs = transport.build_kwargs(
model="nvidia/test-model",
messages=_msgs(),
tools=None,
provider_profile=profile,
max_tokens=None,
max_tokens_param_fn=lambda x: {"max_tokens": x} if x else {},
timeout=300,
reasoning_config=None,
request_overrides=None,
session_id="test",
ollama_num_ctx=None,
)
assert kwargs["model"] == "nvidia/test-model"
def test_nvidia_messages_passed(self, transport):
profile = get_provider_profile("nvidia")
msgs = _msgs()
kwargs = transport.build_kwargs(
model="nvidia/test",
messages=msgs,
tools=None,
provider_profile=profile,
max_tokens=None,
max_tokens_param_fn=lambda x: {"max_tokens": x} if x else {},
timeout=300,
reasoning_config=None,
request_overrides=None,
session_id="test",
ollama_num_ctx=None,
)
assert kwargs["messages"] == msgs
class TestDeepSeekProfileWiring:
def test_deepseek_no_forced_max_tokens(self, transport):
profile = get_provider_profile("deepseek")
kwargs = transport.build_kwargs(
model="deepseek-chat",
messages=_msgs(),
tools=None,
provider_profile=profile,
max_tokens=None,
max_tokens_param_fn=lambda x: {"max_tokens": x} if x else {},
timeout=300,
reasoning_config=None,
request_overrides=None,
session_id="test",
ollama_num_ctx=None,
)
# DeepSeek has no default_max_tokens
assert kwargs["model"] == "deepseek-chat"
assert kwargs.get("max_tokens") is None or "max_tokens" not in kwargs
def test_deepseek_messages_passed(self, transport):
profile = get_provider_profile("deepseek")
msgs = _msgs()
kwargs = transport.build_kwargs(
model="deepseek-chat",
messages=msgs,
tools=None,
provider_profile=profile,
max_tokens=None,
max_tokens_param_fn=lambda x: {"max_tokens": x} if x else {},
timeout=300,
reasoning_config=None,
request_overrides=None,
session_id="test",
ollama_num_ctx=None,
)
assert kwargs["messages"] == msgs

View File

@@ -0,0 +1,290 @@
"""Profile-path parity tests: verify profile path produces identical output to legacy flags.
Each test calls build_kwargs twice — once with legacy flags, once with provider_profile —
and asserts the output is identical. This catches any behavioral drift between the two paths.
"""
import pytest
from agent.transports.chat_completions import ChatCompletionsTransport
from providers import get_provider_profile
@pytest.fixture
def transport():
return ChatCompletionsTransport()
def _msgs():
return [{"role": "user", "content": "hello"}]
def _max_tokens_fn(n):
return {"max_completion_tokens": n}
class TestNvidiaProfileParity:
def test_max_tokens_match(self, transport):
"""NVIDIA profile sets max_tokens=16384; legacy flag is removed."""
profile = transport.build_kwargs(
model="nvidia/nemotron", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("nvidia"),
max_tokens_param_fn=_max_tokens_fn,
)
assert profile["max_completion_tokens"] == 16384
class TestKimiProfileParity:
def test_temperature_omitted(self, transport):
legacy = transport.build_kwargs(
model="kimi-k2", messages=_msgs(), tools=None,
is_kimi=True, omit_temperature=True,
)
profile = transport.build_kwargs(
model="kimi-k2", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("kimi"),
)
assert "temperature" not in legacy
assert "temperature" not in profile
def test_max_tokens(self, transport):
legacy = transport.build_kwargs(
model="kimi-k2", messages=_msgs(), tools=None,
is_kimi=True, max_tokens_param_fn=_max_tokens_fn,
)
profile = transport.build_kwargs(
model="kimi-k2", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("kimi"),
max_tokens_param_fn=_max_tokens_fn,
)
assert profile["max_completion_tokens"] == legacy["max_completion_tokens"] == 32000
def test_thinking_enabled(self, transport):
rc = {"enabled": True, "effort": "high"}
legacy = transport.build_kwargs(
model="kimi-k2", messages=_msgs(), tools=None,
is_kimi=True, reasoning_config=rc,
)
profile = transport.build_kwargs(
model="kimi-k2", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("kimi"),
reasoning_config=rc,
)
assert profile["extra_body"]["thinking"] == legacy["extra_body"]["thinking"]
assert profile["reasoning_effort"] == legacy["reasoning_effort"] == "high"
def test_thinking_disabled(self, transport):
rc = {"enabled": False}
legacy = transport.build_kwargs(
model="kimi-k2", messages=_msgs(), tools=None,
is_kimi=True, reasoning_config=rc,
)
profile = transport.build_kwargs(
model="kimi-k2", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("kimi"),
reasoning_config=rc,
)
assert profile["extra_body"]["thinking"] == legacy["extra_body"]["thinking"]
assert profile["extra_body"]["thinking"]["type"] == "disabled"
assert "reasoning_effort" not in profile
assert "reasoning_effort" not in legacy
def test_reasoning_effort_default(self, transport):
rc = {"enabled": True}
legacy = transport.build_kwargs(
model="kimi-k2", messages=_msgs(), tools=None,
is_kimi=True, reasoning_config=rc,
)
profile = transport.build_kwargs(
model="kimi-k2", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("kimi"),
reasoning_config=rc,
)
assert profile["reasoning_effort"] == legacy["reasoning_effort"] == "medium"
class TestOpenRouterProfileParity:
def test_provider_preferences(self, transport):
prefs = {"allow": ["anthropic"]}
legacy = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6", messages=_msgs(), tools=None,
is_openrouter=True, provider_preferences=prefs,
)
profile = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("openrouter"),
provider_preferences=prefs,
)
assert profile["extra_body"]["provider"] == legacy["extra_body"]["provider"]
def test_reasoning_full_config(self, transport):
rc = {"enabled": True, "effort": "high"}
legacy = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6", messages=_msgs(), tools=None,
is_openrouter=True, supports_reasoning=True, reasoning_config=rc,
)
profile = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("openrouter"),
supports_reasoning=True, reasoning_config=rc,
)
assert profile["extra_body"]["reasoning"] == legacy["extra_body"]["reasoning"]
def test_default_reasoning(self, transport):
legacy = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6", messages=_msgs(), tools=None,
is_openrouter=True, supports_reasoning=True,
)
profile = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("openrouter"),
supports_reasoning=True,
)
assert profile["extra_body"]["reasoning"] == legacy["extra_body"]["reasoning"]
class TestNousProfileParity:
def test_tags(self, transport):
legacy = transport.build_kwargs(
model="hermes-3", messages=_msgs(), tools=None, is_nous=True,
)
profile = transport.build_kwargs(
model="hermes-3", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("nous"),
)
assert profile["extra_body"]["tags"] == legacy["extra_body"]["tags"]
def test_reasoning_omitted_when_disabled(self, transport):
rc = {"enabled": False}
legacy = transport.build_kwargs(
model="hermes-3", messages=_msgs(), tools=None,
is_nous=True, supports_reasoning=True, reasoning_config=rc,
)
profile = transport.build_kwargs(
model="hermes-3", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("nous"),
supports_reasoning=True, reasoning_config=rc,
)
assert "reasoning" not in legacy.get("extra_body", {})
assert "reasoning" not in profile.get("extra_body", {})
class TestQwenProfileParity:
def test_max_tokens(self, transport):
legacy = transport.build_kwargs(
model="qwen3.5", messages=_msgs(), tools=None,
is_qwen_portal=True, max_tokens_param_fn=_max_tokens_fn,
)
profile = transport.build_kwargs(
model="qwen3.5", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("qwen"),
max_tokens_param_fn=_max_tokens_fn,
)
assert profile["max_completion_tokens"] == legacy["max_completion_tokens"] == 65536
def test_vl_high_resolution(self, transport):
legacy = transport.build_kwargs(
model="qwen3.5", messages=_msgs(), tools=None, is_qwen_portal=True,
)
profile = transport.build_kwargs(
model="qwen3.5", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("qwen"),
)
assert profile["extra_body"]["vl_high_resolution_images"] == legacy["extra_body"]["vl_high_resolution_images"]
def test_metadata_top_level(self, transport):
meta = {"sessionId": "s123", "promptId": "p456"}
legacy = transport.build_kwargs(
model="qwen3.5", messages=_msgs(), tools=None,
is_qwen_portal=True, qwen_session_metadata=meta,
)
profile = transport.build_kwargs(
model="qwen3.5", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("qwen"),
qwen_session_metadata=meta,
)
assert profile["metadata"] == legacy["metadata"] == meta
assert "metadata" not in profile.get("extra_body", {})
def test_message_preprocessing(self, transport):
"""Qwen profile normalizes string content to list-of-parts."""
msgs = [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "hello"},
]
profile = transport.build_kwargs(
model="qwen3.5", messages=msgs, tools=None,
provider_profile=get_provider_profile("qwen"),
)
out_msgs = profile["messages"]
# System message content normalized + cache_control injected
assert isinstance(out_msgs[0]["content"], list)
assert out_msgs[0]["content"][0]["type"] == "text"
assert "cache_control" in out_msgs[0]["content"][-1]
# User message content normalized
assert isinstance(out_msgs[1]["content"], list)
assert out_msgs[1]["content"][0] == {"type": "text", "text": "hello"}
class TestDeveloperRoleParity:
"""Developer role swap must work on BOTH legacy and profile paths."""
def test_legacy_path_swaps_for_gpt5(self, transport):
msgs = [{"role": "system", "content": "Be helpful"}, {"role": "user", "content": "hi"}]
kw = transport.build_kwargs(
model="gpt-5.4", messages=msgs, tools=None,
)
assert kw["messages"][0]["role"] == "developer"
def test_profile_path_swaps_for_gpt5(self, transport):
msgs = [{"role": "system", "content": "Be helpful"}, {"role": "user", "content": "hi"}]
kw = transport.build_kwargs(
model="gpt-5.4", messages=msgs, tools=None,
provider_profile=get_provider_profile("openrouter"),
)
assert kw["messages"][0]["role"] == "developer"
def test_profile_path_no_swap_for_claude(self, transport):
msgs = [{"role": "system", "content": "Be helpful"}, {"role": "user", "content": "hi"}]
kw = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6", messages=msgs, tools=None,
provider_profile=get_provider_profile("openrouter"),
)
assert kw["messages"][0]["role"] == "system"
class TestRequestOverridesParity:
"""request_overrides with extra_body must merge identically on both paths."""
def test_extra_body_override_legacy(self, transport):
kw = transport.build_kwargs(
model="gpt-5.4", messages=_msgs(), tools=None,
is_openrouter=True,
request_overrides={"extra_body": {"custom_key": "custom_val"}},
)
assert kw["extra_body"]["custom_key"] == "custom_val"
def test_extra_body_override_profile(self, transport):
kw = transport.build_kwargs(
model="gpt-5.4", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("openrouter"),
request_overrides={"extra_body": {"custom_key": "custom_val"}},
)
assert kw["extra_body"]["custom_key"] == "custom_val"
def test_extra_body_override_merges_with_provider_body(self, transport):
"""Override extra_body merges WITH provider extra_body, not replaces."""
kw = transport.build_kwargs(
model="hermes-3", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("nous"),
request_overrides={"extra_body": {"custom": True}},
)
assert kw["extra_body"]["tags"] == ["product=hermes-agent"] # from profile
assert kw["extra_body"]["custom"] is True # from override
def test_top_level_override(self, transport):
kw = transport.build_kwargs(
model="gpt-5.4", messages=_msgs(), tools=None,
provider_profile=get_provider_profile("openrouter"),
request_overrides={"top_p": 0.9},
)
assert kw["top_p"] == 0.9

View File

@@ -0,0 +1,203 @@
"""Tests for the provider module registry and profiles."""
import pytest
from providers import get_provider_profile, _REGISTRY
from providers.base import ProviderProfile, OMIT_TEMPERATURE
class TestRegistry:
def test_discovery_populates_registry(self):
p = get_provider_profile("nvidia")
assert p is not None
assert p.name == "nvidia"
def test_alias_lookup(self):
assert get_provider_profile("kimi").name == "kimi-coding"
assert get_provider_profile("moonshot").name == "kimi-coding"
assert get_provider_profile("kimi-coding-cn").name == "kimi-coding-cn"
assert get_provider_profile("or").name == "openrouter"
assert get_provider_profile("nous-portal").name == "nous"
assert get_provider_profile("qwen").name == "qwen-oauth"
assert get_provider_profile("qwen-portal").name == "qwen-oauth"
def test_unknown_provider_returns_none(self):
assert get_provider_profile("nonexistent-provider") is None
def test_all_providers_have_name(self):
get_provider_profile("nvidia") # trigger discovery
for name, profile in _REGISTRY.items():
assert profile.name == name
class TestNvidiaProfile:
def test_max_tokens(self):
p = get_provider_profile("nvidia")
assert p.default_max_tokens == 16384
def test_no_special_temperature(self):
p = get_provider_profile("nvidia")
assert p.fixed_temperature is None
def test_base_url(self):
p = get_provider_profile("nvidia")
assert "nvidia.com" in p.base_url
class TestKimiProfile:
def test_temperature_omit(self):
p = get_provider_profile("kimi")
assert p.fixed_temperature is OMIT_TEMPERATURE
def test_max_tokens(self):
p = get_provider_profile("kimi")
assert p.default_max_tokens == 32000
def test_cn_separate_profile(self):
p = get_provider_profile("kimi-coding-cn")
assert p.name == "kimi-coding-cn"
assert p.env_vars == ("KIMI_CN_API_KEY",)
assert "moonshot.cn" in p.base_url
def test_cn_not_alias_of_kimi(self):
kimi = get_provider_profile("kimi-coding")
cn = get_provider_profile("kimi-coding-cn")
assert kimi is not cn
assert kimi.base_url != cn.base_url
def test_thinking_enabled(self):
p = get_provider_profile("kimi")
eb, tl = p.build_api_kwargs_extras(reasoning_config={"enabled": True, "effort": "high"})
assert eb["thinking"] == {"type": "enabled"}
assert tl["reasoning_effort"] == "high"
def test_thinking_disabled(self):
p = get_provider_profile("kimi")
eb, tl = p.build_api_kwargs_extras(reasoning_config={"enabled": False})
assert eb["thinking"] == {"type": "disabled"}
assert "reasoning_effort" not in tl
def test_reasoning_effort_default(self):
p = get_provider_profile("kimi")
eb, tl = p.build_api_kwargs_extras(reasoning_config={"enabled": True})
assert tl["reasoning_effort"] == "medium"
def test_no_config_defaults(self):
p = get_provider_profile("kimi")
eb, tl = p.build_api_kwargs_extras(reasoning_config=None)
assert eb["thinking"] == {"type": "enabled"}
assert tl["reasoning_effort"] == "medium"
class TestOpenRouterProfile:
def test_extra_body_with_prefs(self):
p = get_provider_profile("openrouter")
body = p.build_extra_body(provider_preferences={"allow": ["anthropic"]})
assert body["provider"] == {"allow": ["anthropic"]}
def test_extra_body_no_prefs(self):
p = get_provider_profile("openrouter")
body = p.build_extra_body()
assert body == {}
def test_reasoning_full_config(self):
p = get_provider_profile("openrouter")
eb, _ = p.build_api_kwargs_extras(
reasoning_config={"enabled": True, "effort": "high"},
supports_reasoning=True,
)
assert eb["reasoning"] == {"enabled": True, "effort": "high"}
def test_reasoning_disabled_still_passes(self):
"""OpenRouter passes disabled reasoning through (unlike Nous)."""
p = get_provider_profile("openrouter")
eb, _ = p.build_api_kwargs_extras(
reasoning_config={"enabled": False},
supports_reasoning=True,
)
assert eb["reasoning"] == {"enabled": False}
def test_default_reasoning(self):
p = get_provider_profile("openrouter")
eb, _ = p.build_api_kwargs_extras(supports_reasoning=True)
assert eb["reasoning"] == {"enabled": True, "effort": "medium"}
class TestNousProfile:
def test_tags(self):
p = get_provider_profile("nous")
body = p.build_extra_body()
assert body["tags"] == ["product=hermes-agent"]
def test_auth_type(self):
p = get_provider_profile("nous")
assert p.auth_type == "oauth_device_code"
def test_reasoning_enabled(self):
p = get_provider_profile("nous")
eb, _ = p.build_api_kwargs_extras(
reasoning_config={"enabled": True, "effort": "medium"},
supports_reasoning=True,
)
assert eb["reasoning"] == {"enabled": True, "effort": "medium"}
def test_reasoning_omitted_when_disabled(self):
p = get_provider_profile("nous")
eb, _ = p.build_api_kwargs_extras(
reasoning_config={"enabled": False},
supports_reasoning=True,
)
assert "reasoning" not in eb
class TestQwenProfile:
def test_max_tokens(self):
p = get_provider_profile("qwen-oauth")
assert p.default_max_tokens == 65536
def test_auth_type(self):
p = get_provider_profile("qwen-oauth")
assert p.auth_type == "oauth_external"
def test_extra_body_vl(self):
p = get_provider_profile("qwen-oauth")
body = p.build_extra_body()
assert body["vl_high_resolution_images"] is True
def test_prepare_messages_normalizes_content(self):
p = get_provider_profile("qwen-oauth")
msgs = [
{"role": "system", "content": "Be helpful"},
{"role": "user", "content": "hello"},
]
result = p.prepare_messages(msgs)
# System message: content normalized to list, cache_control on last part
assert isinstance(result[0]["content"], list)
assert result[0]["content"][-1].get("cache_control") == {"type": "ephemeral"}
assert result[0]["content"][-1]["text"] == "Be helpful"
# User message: content normalized to list
assert isinstance(result[1]["content"], list)
assert result[1]["content"][0]["text"] == "hello"
def test_metadata_top_level(self):
p = get_provider_profile("qwen-oauth")
meta = {"sessionId": "s123", "promptId": "p456"}
eb, tl = p.build_api_kwargs_extras(qwen_session_metadata=meta)
assert tl["metadata"] == meta
assert "metadata" not in eb
class TestBaseProfile:
def test_prepare_messages_passthrough(self):
p = ProviderProfile(name="test")
msgs = [{"role": "user", "content": "hi"}]
assert p.prepare_messages(msgs) is msgs
def test_build_extra_body_empty(self):
p = ProviderProfile(name="test")
assert p.build_extra_body() == {}
def test_build_api_kwargs_extras_empty(self):
p = ProviderProfile(name="test")
eb, tl = p.build_api_kwargs_extras()
assert eb == {}
assert tl == {}

View File

@@ -0,0 +1,257 @@
"""Parity tests: pin the exact current transport behavior per provider.
These tests document the flag-based contract between run_agent.py and
ChatCompletionsTransport.build_kwargs(). When the next PR wires profiles
to replace flags, every assertion here must still pass — any failure is
a behavioral regression.
"""
import pytest
from agent.transports.chat_completions import ChatCompletionsTransport
@pytest.fixture
def transport():
return ChatCompletionsTransport()
def _simple_messages():
return [{"role": "user", "content": "hello"}]
def _max_tokens_fn(n):
return {"max_completion_tokens": n}
class TestNvidiaParity:
"""NVIDIA NIM: default max_tokens=16384."""
def test_default_max_tokens(self, transport):
"""NVIDIA default max_tokens=16384 comes from profile, not legacy is_nvidia_nim flag."""
from providers import get_provider_profile
profile = get_provider_profile("nvidia")
kw = transport.build_kwargs(
model="nvidia/llama-3.1-nemotron-70b-instruct",
messages=_simple_messages(),
tools=None,
max_tokens_param_fn=_max_tokens_fn,
provider_profile=profile,
)
assert kw["max_completion_tokens"] == 16384
def test_user_max_tokens_overrides(self, transport):
from providers import get_provider_profile
profile = get_provider_profile("nvidia")
kw = transport.build_kwargs(
model="nvidia/llama-3.1-nemotron-70b-instruct",
messages=_simple_messages(),
tools=None,
max_tokens=4096,
max_tokens_param_fn=_max_tokens_fn,
provider_profile=profile,
)
assert kw["max_completion_tokens"] == 4096 # user overrides default
class TestKimiParity:
"""Kimi: OMIT temperature, max_tokens=32000, thinking + reasoning_effort."""
def test_temperature_omitted(self, transport):
kw = transport.build_kwargs(
model="kimi-k2",
messages=_simple_messages(),
tools=None,
is_kimi=True,
omit_temperature=True,
)
assert "temperature" not in kw
def test_default_max_tokens(self, transport):
kw = transport.build_kwargs(
model="kimi-k2",
messages=_simple_messages(),
tools=None,
is_kimi=True,
max_tokens_param_fn=_max_tokens_fn,
)
assert kw["max_completion_tokens"] == 32000
def test_thinking_enabled(self, transport):
kw = transport.build_kwargs(
model="kimi-k2",
messages=_simple_messages(),
tools=None,
is_kimi=True,
reasoning_config={"enabled": True, "effort": "high"},
)
assert kw["extra_body"]["thinking"] == {"type": "enabled"}
def test_thinking_disabled(self, transport):
kw = transport.build_kwargs(
model="kimi-k2",
messages=_simple_messages(),
tools=None,
is_kimi=True,
reasoning_config={"enabled": False},
)
assert kw["extra_body"]["thinking"] == {"type": "disabled"}
def test_reasoning_effort_top_level(self, transport):
"""Kimi reasoning_effort is a TOP-LEVEL api_kwargs key, NOT in extra_body."""
kw = transport.build_kwargs(
model="kimi-k2",
messages=_simple_messages(),
tools=None,
is_kimi=True,
reasoning_config={"enabled": True, "effort": "high"},
)
assert kw.get("reasoning_effort") == "high"
assert "reasoning_effort" not in kw.get("extra_body", {})
def test_reasoning_effort_default_medium(self, transport):
kw = transport.build_kwargs(
model="kimi-k2",
messages=_simple_messages(),
tools=None,
is_kimi=True,
reasoning_config={"enabled": True},
)
assert kw.get("reasoning_effort") == "medium"
class TestOpenRouterParity:
"""OpenRouter: provider preferences, reasoning in extra_body."""
def test_provider_preferences(self, transport):
prefs = {"allow": ["anthropic"], "sort": "price"}
kw = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6",
messages=_simple_messages(),
tools=None,
is_openrouter=True,
provider_preferences=prefs,
)
assert kw["extra_body"]["provider"] == prefs
def test_reasoning_passes_full_config(self, transport):
"""OpenRouter passes the FULL reasoning_config dict, not just effort."""
rc = {"enabled": True, "effort": "high"}
kw = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6",
messages=_simple_messages(),
tools=None,
is_openrouter=True,
supports_reasoning=True,
reasoning_config=rc,
)
assert kw["extra_body"]["reasoning"] == rc
def test_default_reasoning_when_no_config(self, transport):
"""When supports_reasoning=True but no config, adds default."""
kw = transport.build_kwargs(
model="anthropic/claude-sonnet-4.6",
messages=_simple_messages(),
tools=None,
is_openrouter=True,
supports_reasoning=True,
)
assert kw["extra_body"]["reasoning"] == {"enabled": True, "effort": "medium"}
class TestNousParity:
"""Nous: product tags, reasoning, omit when disabled."""
def test_tags(self, transport):
kw = transport.build_kwargs(
model="hermes-3-llama-3.1-405b",
messages=_simple_messages(),
tools=None,
is_nous=True,
)
assert kw["extra_body"]["tags"] == ["product=hermes-agent"]
def test_reasoning_omitted_when_disabled(self, transport):
"""Nous special case: reasoning omitted entirely when disabled."""
kw = transport.build_kwargs(
model="hermes-3-llama-3.1-405b",
messages=_simple_messages(),
tools=None,
is_nous=True,
supports_reasoning=True,
reasoning_config={"enabled": False},
)
assert "reasoning" not in kw.get("extra_body", {})
def test_reasoning_enabled(self, transport):
rc = {"enabled": True, "effort": "high"}
kw = transport.build_kwargs(
model="hermes-3-llama-3.1-405b",
messages=_simple_messages(),
tools=None,
is_nous=True,
supports_reasoning=True,
reasoning_config=rc,
)
assert kw["extra_body"]["reasoning"] == rc
class TestQwenParity:
"""Qwen: max_tokens=65536, vl_high_resolution, metadata top-level."""
def test_default_max_tokens(self, transport):
kw = transport.build_kwargs(
model="qwen3.5-plus",
messages=_simple_messages(),
tools=None,
is_qwen_portal=True,
max_tokens_param_fn=_max_tokens_fn,
)
assert kw["max_completion_tokens"] == 65536
def test_vl_high_resolution(self, transport):
kw = transport.build_kwargs(
model="qwen3.5-plus",
messages=_simple_messages(),
tools=None,
is_qwen_portal=True,
)
assert kw["extra_body"]["vl_high_resolution_images"] is True
def test_metadata_top_level(self, transport):
"""Qwen metadata goes to top-level api_kwargs, NOT extra_body."""
meta = {"sessionId": "s123", "promptId": "p456"}
kw = transport.build_kwargs(
model="qwen3.5-plus",
messages=_simple_messages(),
tools=None,
is_qwen_portal=True,
qwen_session_metadata=meta,
)
assert kw["metadata"] == meta
assert "metadata" not in kw.get("extra_body", {})
class TestCustomOllamaParity:
"""Custom/Ollama: num_ctx, think=false."""
def test_ollama_num_ctx(self, transport):
kw = transport.build_kwargs(
model="llama3.1",
messages=_simple_messages(),
tools=None,
is_custom_provider=True,
ollama_num_ctx=131072,
)
assert kw["extra_body"]["options"]["num_ctx"] == 131072
def test_think_false_when_disabled(self, transport):
kw = transport.build_kwargs(
model="qwen3:72b",
messages=_simple_messages(),
tools=None,
is_custom_provider=True,
reasoning_config={"enabled": False, "effort": "none"},
)
assert kw["extra_body"]["think"] is False

View File

@@ -3825,7 +3825,7 @@ def test_aiagent_uses_copilot_acp_client():
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI") as mock_openai,
patch("agent.copilot_acp_client.CopilotACPClient") as mock_acp_client,
patch("acp_adapter.copilot_client.CopilotACPClient") as mock_acp_client,
):
acp_client = MagicMock()
mock_acp_client.return_value = acp_client