Mirror of https://github.com/NousResearch/hermes-agent.git, synced 2026-04-29 07:21:37 +08:00.
Compare commits
3 Commits
fix/plugin
...
feat/nativ
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fba9e02474 | ||
|
|
7aa1d37598 | ||
|
|
cc6d295503 |
@@ -775,6 +775,11 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
|
||||
if model is None:
|
||||
continue # skip provider if we don't know a valid aux model
|
||||
logger.debug("Auxiliary text client: %s (%s) via pool", pconfig.name, model)
|
||||
if provider_id == "gemini":
|
||||
from agent.gemini_native_adapter import GeminiNativeClient, is_native_gemini_base_url
|
||||
|
||||
if is_native_gemini_base_url(base_url):
|
||||
return GeminiNativeClient(api_key=api_key, base_url=base_url), model
|
||||
extra = {}
|
||||
if "api.kimi.com" in base_url.lower():
|
||||
extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"}
|
||||
@@ -796,6 +801,11 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
|
||||
if model is None:
|
||||
continue # skip provider if we don't know a valid aux model
|
||||
logger.debug("Auxiliary text client: %s (%s)", pconfig.name, model)
|
||||
if provider_id == "gemini":
|
||||
from agent.gemini_native_adapter import GeminiNativeClient, is_native_gemini_base_url
|
||||
|
||||
if is_native_gemini_base_url(base_url):
|
||||
return GeminiNativeClient(api_key=api_key, base_url=base_url), model
|
||||
extra = {}
|
||||
if "api.kimi.com" in base_url.lower():
|
||||
extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"}
|
||||
@@ -1298,6 +1308,13 @@ def _resolve_auto(main_runtime: Optional[Dict[str, Any]] = None) -> Tuple[Option
|
||||
resolved_provider = "custom"
|
||||
explicit_base_url = runtime_base_url
|
||||
explicit_api_key = runtime_api_key or None
|
||||
elif runtime_base_url and main_provider == "gemini":
|
||||
# Preserve explicit Gemini endpoint selection from the live runtime.
|
||||
# If the main agent is intentionally pinned to Google's OpenAI-
|
||||
# compatible route (or another explicit Gemini endpoint), auxiliary
|
||||
# tasks must honor that instead of silently switching transports.
|
||||
explicit_base_url = runtime_base_url
|
||||
explicit_api_key = runtime_api_key or None
|
||||
client, resolved = resolve_provider_client(
|
||||
resolved_provider,
|
||||
main_model,
|
||||
@@ -1348,6 +1365,13 @@ def _to_async_client(sync_client, model: str):
|
||||
return AsyncCodexAuxiliaryClient(sync_client), model
|
||||
if isinstance(sync_client, AnthropicAuxiliaryClient):
|
||||
return AsyncAnthropicAuxiliaryClient(sync_client), model
|
||||
try:
|
||||
from agent.gemini_native_adapter import GeminiNativeClient, AsyncGeminiNativeClient
|
||||
|
||||
if isinstance(sync_client, GeminiNativeClient):
|
||||
return AsyncGeminiNativeClient(sync_client), model
|
||||
except ImportError:
|
||||
pass
|
||||
try:
|
||||
from agent.copilot_acp_client import CopilotACPClient
|
||||
if isinstance(sync_client, CopilotACPClient):
|
||||
@@ -1623,7 +1647,7 @@ def resolve_provider_client(
|
||||
return (_to_async_client(client, final_model) if async_mode else (client, final_model))
|
||||
|
||||
creds = resolve_api_key_provider_credentials(provider)
|
||||
api_key = str(creds.get("api_key", "")).strip()
|
||||
api_key = str((explicit_api_key or "")).strip() or str(creds.get("api_key", "")).strip()
|
||||
if not api_key:
|
||||
tried_sources = list(pconfig.api_key_env_vars)
|
||||
if provider == "copilot":
|
||||
@@ -1633,13 +1657,21 @@ def resolve_provider_client(
|
||||
provider, ", ".join(tried_sources))
|
||||
return None, None
|
||||
|
||||
base_url = _to_openai_base_url(
|
||||
str(creds.get("base_url", "")).strip().rstrip("/") or pconfig.inference_base_url
|
||||
)
|
||||
configured_base = str(creds.get("base_url", "")).strip().rstrip("/") or pconfig.inference_base_url
|
||||
base_url = _to_openai_base_url((explicit_base_url or configured_base).strip().rstrip("/"))
|
||||
|
||||
default_model = _API_KEY_PROVIDER_AUX_MODELS.get(provider, "")
|
||||
final_model = _normalize_resolved_model(model or default_model, provider)
|
||||
|
||||
if provider == "gemini":
|
||||
from agent.gemini_native_adapter import GeminiNativeClient, is_native_gemini_base_url
|
||||
|
||||
if is_native_gemini_base_url(base_url):
|
||||
client = GeminiNativeClient(api_key=api_key, base_url=base_url)
|
||||
logger.debug("resolve_provider_client: %s (%s)", provider, final_model)
|
||||
return (_to_async_client(client, final_model) if async_mode
|
||||
else (client, final_model))
|
||||
|
||||
# Provider-specific headers
|
||||
headers = {}
|
||||
if "api.kimi.com" in base_url.lower():
|
||||
|
||||
867
agent/gemini_native_adapter.py
Normal file
867
agent/gemini_native_adapter.py
Normal file
@@ -0,0 +1,867 @@
|
||||
"""OpenAI-compatible facade over Google AI Studio's native Gemini API.
|
||||
|
||||
Hermes keeps ``api_mode='chat_completions'`` for the ``gemini`` provider so the
|
||||
main agent loop can keep using its existing OpenAI-shaped message flow.
|
||||
This adapter is the transport shim that converts those OpenAI-style
|
||||
``messages[]`` / ``tools[]`` requests into Gemini's native
|
||||
``models/{model}:generateContent`` schema and converts the responses back.
|
||||
|
||||
Why this exists
|
||||
---------------
|
||||
Google's OpenAI-compatible endpoint has been brittle for Hermes's multi-turn
|
||||
agent/tool loop (auth churn, tool-call replay quirks, thought-signature
|
||||
requirements). The native Gemini API is the canonical path and avoids the
|
||||
OpenAI-compat layer entirely.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
from types import SimpleNamespace
|
||||
from typing import Any, Dict, Iterator, List, Optional
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
|
||||
|
||||
|
||||
def is_native_gemini_base_url(base_url: str) -> bool:
    """Tell whether *base_url* points at Gemini's native REST API.

    Google's OpenAI-compatible route lives under an ``/openai`` suffix on the
    same host, so an endpoint is "native" exactly when it is on
    ``generativelanguage.googleapis.com`` and does NOT end in ``/openai``.
    Empty or non-Google URLs are never native.
    """
    candidate = str(base_url or "").strip().rstrip("/").lower()
    on_google_host = "generativelanguage.googleapis.com" in candidate
    return bool(candidate) and on_google_host and not candidate.endswith("/openai")
|
||||
|
||||
|
||||
class GeminiAPIError(Exception):
    """Error raised for Gemini native-API failures.

    Mirrors the attribute surface Hermes' retry/error-classification code
    expects (``code``, ``status_code``, ``retry_after``), so callers can
    handle it like other provider errors.
    """

    def __init__(
        self,
        message: str,
        *,
        code: str = "gemini_api_error",
        status_code: Optional[int] = None,
        response: Optional[httpx.Response] = None,
        retry_after: Optional[float] = None,
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__(message)
        # Stable machine-readable identifier (e.g. "gemini_rate_limited").
        self.code = code
        # HTTP status of the failed call, when one was made.
        self.status_code = status_code
        # Raw httpx response, kept for callers that inspect headers or body.
        self.response = response
        # Server-suggested backoff in seconds, if any was provided.
        self.retry_after = retry_after
        # Parsed google.rpc error details; never None.
        self.details = {} if details is None else details
|
||||
|
||||
|
||||
def _coerce_content_to_text(content: Any) -> str:
|
||||
if content is None:
|
||||
return ""
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, list):
|
||||
pieces: List[str] = []
|
||||
for part in content:
|
||||
if isinstance(part, str):
|
||||
pieces.append(part)
|
||||
elif isinstance(part, dict) and part.get("type") == "text":
|
||||
text = part.get("text")
|
||||
if isinstance(text, str):
|
||||
pieces.append(text)
|
||||
return "\n".join(pieces)
|
||||
return str(content)
|
||||
|
||||
|
||||
def _extract_multimodal_parts(content: Any) -> List[Dict[str, Any]]:
|
||||
if not isinstance(content, list):
|
||||
text = _coerce_content_to_text(content)
|
||||
return [{"text": text}] if text else []
|
||||
|
||||
parts: List[Dict[str, Any]] = []
|
||||
for item in content:
|
||||
if isinstance(item, str):
|
||||
parts.append({"text": item})
|
||||
continue
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
ptype = item.get("type")
|
||||
if ptype == "text":
|
||||
text = item.get("text")
|
||||
if isinstance(text, str) and text:
|
||||
parts.append({"text": text})
|
||||
elif ptype == "image_url":
|
||||
url = ((item.get("image_url") or {}).get("url") or "")
|
||||
if not isinstance(url, str) or not url.startswith("data:"):
|
||||
continue
|
||||
try:
|
||||
header, encoded = url.split(",", 1)
|
||||
mime = header.split(":", 1)[1].split(";", 1)[0]
|
||||
raw = base64.b64decode(encoded)
|
||||
except Exception:
|
||||
continue
|
||||
parts.append(
|
||||
{
|
||||
"inlineData": {
|
||||
"mimeType": mime,
|
||||
"data": base64.b64encode(raw).decode("ascii"),
|
||||
}
|
||||
}
|
||||
)
|
||||
return parts
|
||||
|
||||
|
||||
def _tool_call_extra_signature(tool_call: Dict[str, Any]) -> Optional[str]:
|
||||
extra = tool_call.get("extra_content") or {}
|
||||
if not isinstance(extra, dict):
|
||||
return None
|
||||
google = extra.get("google") or extra.get("thought_signature")
|
||||
if isinstance(google, dict):
|
||||
sig = google.get("thought_signature") or google.get("thoughtSignature")
|
||||
return str(sig) if isinstance(sig, str) and sig else None
|
||||
if isinstance(google, str) and google:
|
||||
return google
|
||||
return None
|
||||
|
||||
|
||||
def _translate_tool_call_to_gemini(tool_call: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one OpenAI assistant tool call into a Gemini functionCall part.

    Arguments are decoded from their JSON-string form; undecodable or
    non-object payloads are preserved under ``_raw`` / ``_value`` so the
    information is never silently dropped.  A recorded thought signature is
    re-attached so Gemini can replay multi-turn tool exchanges.
    """
    function_spec = tool_call.get("function") or {}
    raw_arguments = function_spec.get("arguments", "")

    decoded: Any
    if isinstance(raw_arguments, str) and raw_arguments:
        try:
            decoded = json.loads(raw_arguments)
        except json.JSONDecodeError:
            decoded = {"_raw": raw_arguments}
    else:
        decoded = {}
    if not isinstance(decoded, dict):
        decoded = {"_value": decoded}

    part: Dict[str, Any] = {
        "functionCall": {
            "name": str(function_spec.get("name") or ""),
            "args": decoded,
        }
    }
    signature = _tool_call_extra_signature(tool_call)
    if signature:
        part["thoughtSignature"] = signature
    return part
|
||||
|
||||
|
||||
def _translate_tool_result_to_gemini(
    message: Dict[str, Any],
    tool_name_by_call_id: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Convert an OpenAI ``tool`` message into a Gemini functionResponse part.

    The function name is recovered, in order of preference, from the message
    itself, the call-id -> name map built while walking the conversation, the
    raw call id, and finally the placeholder ``"tool"``.  JSON-looking content
    is parsed so structured tool output survives the round trip; everything
    else is wrapped as ``{"output": <text>}``.
    """
    lookup = tool_name_by_call_id or {}
    call_id = str(message.get("tool_call_id") or "")
    resolved_name = str(
        message.get("name") or lookup.get(call_id) or call_id or "tool"
    )

    text = _coerce_content_to_text(message.get("content"))
    structured: Any = None
    # Only attempt a parse when the content plausibly is a JSON document.
    if text.strip().startswith(("{", "[")):
        try:
            structured = json.loads(text)
        except json.JSONDecodeError:
            structured = None
    payload = structured if isinstance(structured, dict) else {"output": text}
    return {
        "functionResponse": {
            "name": resolved_name,
            "response": payload,
        }
    }
|
||||
|
||||
|
||||
def _build_gemini_contents(messages: List[Dict[str, Any]]) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """Convert an OpenAI-style message list into Gemini ``contents``.

    Returns ``(contents, system_instruction)``: system messages are pulled
    out and merged into a single systemInstruction block, tool results become
    user-role functionResponse parts, and assistant tool calls become model
    functionCall parts.
    """
    system_text_parts: List[str] = []
    contents: List[Dict[str, Any]] = []
    # Gemini functionResponse parts are keyed by function *name*, while
    # OpenAI tool messages carry only the call id — record the mapping as
    # assistant tool calls are encountered so later results can resolve it.
    tool_name_by_call_id: Dict[str, str] = {}

    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = str(msg.get("role") or "user")

        if role == "system":
            # Collected separately; Gemini takes system text out-of-band.
            system_text_parts.append(_coerce_content_to_text(msg.get("content")))
            continue

        if role in {"tool", "function"}:
            # Tool output goes back to the model as a user-role part.
            contents.append(
                {
                    "role": "user",
                    "parts": [
                        _translate_tool_result_to_gemini(
                            msg,
                            tool_name_by_call_id=tool_name_by_call_id,
                        )
                    ],
                }
            )
            continue

        gemini_role = "model" if role == "assistant" else "user"
        parts: List[Dict[str, Any]] = []

        content_parts = _extract_multimodal_parts(msg.get("content"))
        parts.extend(content_parts)

        tool_calls = msg.get("tool_calls") or []
        if isinstance(tool_calls, list):
            for tool_call in tool_calls:
                if isinstance(tool_call, dict):
                    tool_call_id = str(tool_call.get("id") or tool_call.get("call_id") or "")
                    tool_name = str(((tool_call.get("function") or {}).get("name") or ""))
                    if tool_call_id and tool_name:
                        tool_name_by_call_id[tool_call_id] = tool_name
                    parts.append(_translate_tool_call_to_gemini(tool_call))

        # Messages that produced no parts (e.g. empty content) are dropped.
        if parts:
            contents.append({"role": gemini_role, "parts": parts})

    system_instruction = None
    joined_system = "\n".join(part for part in system_text_parts if part).strip()
    if joined_system:
        system_instruction = {"parts": [{"text": joined_system}]}
    return contents, system_instruction
|
||||
|
||||
|
||||
def _translate_tools_to_gemini(tools: Any) -> List[Dict[str, Any]]:
|
||||
if not isinstance(tools, list):
|
||||
return []
|
||||
declarations: List[Dict[str, Any]] = []
|
||||
for tool in tools:
|
||||
if not isinstance(tool, dict):
|
||||
continue
|
||||
fn = tool.get("function") or {}
|
||||
if not isinstance(fn, dict):
|
||||
continue
|
||||
name = fn.get("name")
|
||||
if not isinstance(name, str) or not name:
|
||||
continue
|
||||
decl: Dict[str, Any] = {"name": name}
|
||||
description = fn.get("description")
|
||||
if isinstance(description, str) and description:
|
||||
decl["description"] = description
|
||||
parameters = fn.get("parameters")
|
||||
if isinstance(parameters, dict):
|
||||
decl["parameters"] = parameters
|
||||
declarations.append(decl)
|
||||
return [{"functionDeclarations": declarations}] if declarations else []
|
||||
|
||||
|
||||
def _translate_tool_choice_to_gemini(tool_choice: Any) -> Optional[Dict[str, Any]]:
|
||||
if tool_choice is None:
|
||||
return None
|
||||
if isinstance(tool_choice, str):
|
||||
if tool_choice == "auto":
|
||||
return {"functionCallingConfig": {"mode": "AUTO"}}
|
||||
if tool_choice == "required":
|
||||
return {"functionCallingConfig": {"mode": "ANY"}}
|
||||
if tool_choice == "none":
|
||||
return {"functionCallingConfig": {"mode": "NONE"}}
|
||||
if isinstance(tool_choice, dict):
|
||||
fn = tool_choice.get("function") or {}
|
||||
name = fn.get("name")
|
||||
if isinstance(name, str) and name:
|
||||
return {"functionCallingConfig": {"mode": "ANY", "allowedFunctionNames": [name]}}
|
||||
return None
|
||||
|
||||
|
||||
def _normalize_thinking_config(config: Any) -> Optional[Dict[str, Any]]:
|
||||
if not isinstance(config, dict) or not config:
|
||||
return None
|
||||
budget = config.get("thinkingBudget", config.get("thinking_budget"))
|
||||
include = config.get("includeThoughts", config.get("include_thoughts"))
|
||||
level = config.get("thinkingLevel", config.get("thinking_level"))
|
||||
normalized: Dict[str, Any] = {}
|
||||
if isinstance(budget, (int, float)):
|
||||
normalized["thinkingBudget"] = int(budget)
|
||||
if isinstance(include, bool):
|
||||
normalized["includeThoughts"] = include
|
||||
if isinstance(level, str) and level.strip():
|
||||
normalized["thinkingLevel"] = level.strip().lower()
|
||||
return normalized or None
|
||||
|
||||
|
||||
def build_gemini_request(
    *,
    messages: List[Dict[str, Any]],
    tools: Any = None,
    tool_choice: Any = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    top_p: Optional[float] = None,
    stop: Any = None,
    thinking_config: Any = None,
) -> Dict[str, Any]:
    """Assemble the JSON body for a ``models/{model}:generateContent`` call.

    Translates OpenAI-style messages/tools/sampling knobs into Gemini's
    request schema; empty sections are omitted so the payload stays minimal.
    """
    contents, system_instruction = _build_gemini_contents(messages)
    body: Dict[str, Any] = {"contents": contents}
    if system_instruction:
        body["systemInstruction"] = system_instruction

    translated_tools = _translate_tools_to_gemini(tools)
    if translated_tools:
        body["tools"] = translated_tools

    translated_choice = _translate_tool_choice_to_gemini(tool_choice)
    if translated_choice:
        body["toolConfig"] = translated_choice

    gen_cfg: Dict[str, Any] = {}
    if temperature is not None:
        gen_cfg["temperature"] = temperature
    if max_tokens is not None:
        gen_cfg["maxOutputTokens"] = max_tokens
    if top_p is not None:
        gen_cfg["topP"] = top_p
    if stop:
        # OpenAI allows a bare string or a list; Gemini wants a list.
        gen_cfg["stopSequences"] = stop if isinstance(stop, list) else [str(stop)]
    thinking = _normalize_thinking_config(thinking_config)
    if thinking:
        gen_cfg["thinkingConfig"] = thinking
    if gen_cfg:
        body["generationConfig"] = gen_cfg

    return body
|
||||
|
||||
|
||||
def _map_gemini_finish_reason(reason: str) -> str:
|
||||
mapping = {
|
||||
"STOP": "stop",
|
||||
"MAX_TOKENS": "length",
|
||||
"SAFETY": "content_filter",
|
||||
"RECITATION": "content_filter",
|
||||
"OTHER": "stop",
|
||||
}
|
||||
return mapping.get(str(reason or "").upper(), "stop")
|
||||
|
||||
|
||||
def _tool_call_extra_from_part(part: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||||
sig = part.get("thoughtSignature")
|
||||
if isinstance(sig, str) and sig:
|
||||
return {"google": {"thought_signature": sig}}
|
||||
return None
|
||||
|
||||
|
||||
def _empty_response(model: str) -> SimpleNamespace:
|
||||
message = SimpleNamespace(
|
||||
role="assistant",
|
||||
content="",
|
||||
tool_calls=None,
|
||||
reasoning=None,
|
||||
reasoning_content=None,
|
||||
reasoning_details=None,
|
||||
)
|
||||
choice = SimpleNamespace(index=0, message=message, finish_reason="stop")
|
||||
usage = SimpleNamespace(
|
||||
prompt_tokens=0,
|
||||
completion_tokens=0,
|
||||
total_tokens=0,
|
||||
prompt_tokens_details=SimpleNamespace(cached_tokens=0),
|
||||
)
|
||||
return SimpleNamespace(
|
||||
id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
|
||||
object="chat.completion",
|
||||
created=int(time.time()),
|
||||
model=model,
|
||||
choices=[choice],
|
||||
usage=usage,
|
||||
)
|
||||
|
||||
|
||||
def translate_gemini_response(resp: Dict[str, Any], model: str) -> SimpleNamespace:
    """Convert a native generateContent response into an OpenAI-shaped object.

    Only the first candidate is used.  Thought parts become ``reasoning`` /
    ``reasoning_content``, plain text parts are joined into ``content``, and
    functionCall parts become OpenAI tool calls (with any thought signature
    preserved under ``extra_content`` so it can be replayed later).
    """
    candidates = resp.get("candidates") or []
    if not isinstance(candidates, list) or not candidates:
        # No candidates at all — hand back a well-formed empty completion.
        return _empty_response(model)

    cand = candidates[0] if isinstance(candidates[0], dict) else {}
    content_obj = cand.get("content") if isinstance(cand, dict) else {}
    parts = content_obj.get("parts") if isinstance(content_obj, dict) else []

    text_pieces: List[str] = []
    reasoning_pieces: List[str] = []
    tool_calls: List[SimpleNamespace] = []

    for index, part in enumerate(parts or []):
        if not isinstance(part, dict):
            continue
        if part.get("thought") is True and isinstance(part.get("text"), str):
            # "thought" parts are model reasoning, not user-visible content.
            reasoning_pieces.append(part["text"])
            continue
        if isinstance(part.get("text"), str):
            text_pieces.append(part["text"])
            continue
        fc = part.get("functionCall")
        if isinstance(fc, dict) and fc.get("name"):
            try:
                args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
            except (TypeError, ValueError):
                # Unserializable args: degrade to an empty-arguments call.
                args_str = "{}"
            tool_call = SimpleNamespace(
                id=f"call_{uuid.uuid4().hex[:12]}",
                type="function",
                index=index,
                function=SimpleNamespace(name=str(fc["name"]), arguments=args_str),
            )
            extra_content = _tool_call_extra_from_part(part)
            if extra_content:
                tool_call.extra_content = extra_content
            tool_calls.append(tool_call)

    # OpenAI semantics: any tool call overrides the native finish reason.
    finish_reason = "tool_calls" if tool_calls else _map_gemini_finish_reason(str(cand.get("finishReason") or ""))
    usage_meta = resp.get("usageMetadata") or {}
    usage = SimpleNamespace(
        prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
        completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
        total_tokens=int(usage_meta.get("totalTokenCount") or 0),
        prompt_tokens_details=SimpleNamespace(
            cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0),
        ),
    )
    reasoning = "".join(reasoning_pieces) or None
    message = SimpleNamespace(
        role="assistant",
        # None (not "") when no text parts, matching OpenAI tool-call turns.
        content="".join(text_pieces) if text_pieces else None,
        tool_calls=tool_calls or None,
        reasoning=reasoning,
        reasoning_content=reasoning,
        reasoning_details=None,
    )
    choice = SimpleNamespace(index=0, message=message, finish_reason=finish_reason)
    return SimpleNamespace(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[choice],
        usage=usage,
    )
|
||||
|
||||
|
||||
class _GeminiStreamChunk(SimpleNamespace):
    """Marker subclass for streaming chunks.

    Keeps SimpleNamespace's attribute-style access while letting callers
    distinguish stream chunks from other namespace objects by type.
    """
    pass
|
||||
|
||||
|
||||
def _make_stream_chunk(
    *,
    model: str,
    content: str = "",
    tool_call_delta: Optional[Dict[str, Any]] = None,
    finish_reason: Optional[str] = None,
    reasoning: str = "",
) -> _GeminiStreamChunk:
    """Create one OpenAI-style streaming chunk carrying a single payload.

    Exactly one of *content*, *tool_call_delta* or *reasoning* is normally
    set per chunk; *finish_reason* may ride on an otherwise-empty chunk.
    Empty strings are emitted as ``None`` to match OpenAI delta semantics.
    """
    delta = SimpleNamespace(
        role="assistant",
        content=content or None,
        tool_calls=None,
        reasoning=reasoning or None,
        reasoning_content=reasoning or None,
    )
    if tool_call_delta is not None:
        call = SimpleNamespace(
            index=tool_call_delta.get("index", 0),
            # Mint an id when the caller did not assign one.
            id=tool_call_delta.get("id") or f"call_{uuid.uuid4().hex[:12]}",
            type="function",
            function=SimpleNamespace(
                name=tool_call_delta.get("name") or "",
                arguments=tool_call_delta.get("arguments") or "",
            ),
        )
        extra = tool_call_delta.get("extra_content")
        if isinstance(extra, dict):
            call.extra_content = extra
        delta.tool_calls = [call]
    return _GeminiStreamChunk(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion.chunk",
        created=int(time.time()),
        model=model,
        choices=[SimpleNamespace(index=0, delta=delta, finish_reason=finish_reason)],
        usage=None,
    )
|
||||
|
||||
|
||||
def _iter_sse_events(response: httpx.Response) -> Iterator[Dict[str, Any]]:
    """Yield parsed JSON payloads from an SSE response, one per event.

    Implements just enough of the SSE wire format for Gemini streaming:
    ``data:`` lines are accumulated until a blank line terminates the event,
    comment lines (leading ``:``) are ignored, and a ``[DONE]`` sentinel ends
    the stream.  Non-JSON events are logged at debug level and skipped.
    """
    buffer = ""
    data_lines: List[str] = []

    def _flush_event() -> Iterator[Dict[str, Any]]:
        # Turn the accumulated data: lines into zero or one payloads.
        nonlocal data_lines
        if not data_lines:
            return iter(())
        data = "\n".join(data_lines)
        data_lines = []
        if data == "[DONE]":
            # Translated to a private done flag for the outer loop to act on.
            return iter(({"__done__": True},))
        try:
            payload = json.loads(data)
        except json.JSONDecodeError:
            logger.debug("Non-JSON Gemini SSE event: %s", data[:200])
            return iter(())
        if isinstance(payload, dict):
            return iter((payload,))
        return iter(())

    for chunk in response.iter_text():
        if not chunk:
            continue
        buffer += chunk
        # Process complete lines only; a partial trailing line stays buffered
        # until the next chunk arrives.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            line = line.rstrip("\r")
            if line == "":
                # Blank line terminates the current event.
                for payload in _flush_event():
                    if payload.get("__done__"):
                        return
                    yield payload
                continue
            if line.startswith(":"):
                # SSE comment / keep-alive line.
                continue
            if line.startswith("data:"):
                value = line[5:]
                # Per the SSE spec, a single leading space is stripped.
                if value.startswith(" "):
                    value = value[1:]
                data_lines.append(value)

    # Flush a trailing event that was not terminated by a blank line.
    for payload in _flush_event():
        if payload.get("__done__"):
            return
        yield payload
|
||||
|
||||
|
||||
def translate_stream_event(event: Dict[str, Any], model: str, tool_call_indices: Dict[str, Dict[str, Any]]) -> List[_GeminiStreamChunk]:
    """Convert one streamed generateContent event into OpenAI-style chunks.

    *tool_call_indices* is caller-owned state threaded across the events of a
    single stream: it assigns each distinct function call a stable index/id
    and remembers the arguments emitted so far, so repeated snapshots of the
    same call are converted into incremental argument deltas.
    """
    candidates = event.get("candidates") or []
    if not candidates:
        return []
    cand = candidates[0] if isinstance(candidates[0], dict) else {}
    parts = ((cand.get("content") or {}).get("parts") or []) if isinstance(cand, dict) else []
    chunks: List[_GeminiStreamChunk] = []

    for part_index, part in enumerate(parts):
        if not isinstance(part, dict):
            continue
        if part.get("thought") is True and isinstance(part.get("text"), str):
            # Reasoning text is surfaced through the reasoning delta fields.
            chunks.append(_make_stream_chunk(model=model, reasoning=part["text"]))
            continue
        if isinstance(part.get("text"), str) and part["text"]:
            chunks.append(_make_stream_chunk(model=model, content=part["text"]))
        fc = part.get("functionCall")
        if isinstance(fc, dict) and fc.get("name"):
            name = str(fc["name"])
            try:
                # sort_keys so successive snapshots of the same call compare
                # (and prefix-match) deterministically below.
                args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False, sort_keys=True)
            except (TypeError, ValueError):
                args_str = "{}"
            thought_signature = part.get("thoughtSignature") if isinstance(part.get("thoughtSignature"), str) else ""
            # Identity of a call within the stream: position + name + signature.
            call_key = json.dumps(
                {
                    "part_index": part_index,
                    "name": name,
                    "thought_signature": thought_signature,
                },
                sort_keys=True,
            )
            slot = tool_call_indices.get(call_key)
            if slot is None:
                slot = {
                    "index": len(tool_call_indices),
                    "id": f"call_{uuid.uuid4().hex[:12]}",
                    "last_arguments": "",
                }
                tool_call_indices[call_key] = slot
            emitted_arguments = args_str
            last_arguments = str(slot.get("last_arguments") or "")
            if last_arguments:
                if args_str == last_arguments:
                    # Exact repeat — emit nothing new.
                    emitted_arguments = ""
                elif args_str.startswith(last_arguments):
                    # Grown snapshot — emit only the appended suffix.
                    emitted_arguments = args_str[len(last_arguments):]
            slot["last_arguments"] = args_str
            chunks.append(
                _make_stream_chunk(
                    model=model,
                    tool_call_delta={
                        "index": slot["index"],
                        "id": slot["id"],
                        "name": name,
                        "arguments": emitted_arguments,
                        "extra_content": _tool_call_extra_from_part(part),
                    },
                )
            )

    finish_reason_raw = str(cand.get("finishReason") or "")
    if finish_reason_raw:
        # Any recorded tool call forces the OpenAI "tool_calls" finish reason.
        mapped = "tool_calls" if tool_call_indices else _map_gemini_finish_reason(finish_reason_raw)
        chunks.append(_make_stream_chunk(model=model, finish_reason=mapped))
    return chunks
|
||||
|
||||
|
||||
def gemini_http_error(response: httpx.Response) -> GeminiAPIError:
    """Build a classified GeminiAPIError from a non-200 HTTP response.

    Parses the google.rpc error envelope (status, message, ErrorInfo reason
    and metadata) when present, honors a numeric Retry-After header, and
    assigns a stable error code for the statuses treated specially here
    (401, 404, 429).
    """
    status = response.status_code
    body_text = ""
    body_json: Dict[str, Any] = {}
    try:
        body_text = response.text
    except Exception:
        # Reading the body can fail (e.g. closed/streaming responses); fall
        # back to an empty body rather than masking the HTTP error itself.
        body_text = ""
    if body_text:
        try:
            parsed = json.loads(body_text)
            if isinstance(parsed, dict):
                body_json = parsed
        except (ValueError, TypeError):
            body_json = {}

    err_obj = body_json.get("error") if isinstance(body_json, dict) else None
    if not isinstance(err_obj, dict):
        err_obj = {}
    err_status = str(err_obj.get("status") or "").strip()
    err_message = str(err_obj.get("message") or "").strip()
    details_list = err_obj.get("details") if isinstance(err_obj.get("details"), list) else []

    reason = ""
    retry_after: Optional[float] = None
    metadata: Dict[str, Any] = {}
    for detail in details_list:
        if not isinstance(detail, dict):
            continue
        type_url = str(detail.get("@type") or "")
        # Only the first google.rpc.ErrorInfo entry is used.
        if not reason and type_url.endswith("/google.rpc.ErrorInfo"):
            reason_value = detail.get("reason")
            if isinstance(reason_value, str):
                reason = reason_value
            md = detail.get("metadata")
            if isinstance(md, dict):
                metadata = md
    header_retry = response.headers.get("Retry-After") or response.headers.get("retry-after")
    if header_retry:
        try:
            retry_after = float(header_retry)
        except (TypeError, ValueError):
            # HTTP-date style Retry-After values are ignored.
            retry_after = None

    code = f"gemini_http_{status}"
    if status == 401:
        code = "gemini_unauthorized"
    elif status == 429:
        code = "gemini_rate_limited"
    elif status == 404:
        code = "gemini_model_not_found"

    if err_message:
        message = f"Gemini HTTP {status} ({err_status or 'error'}): {err_message}"
    else:
        message = f"Gemini returned HTTP {status}: {body_text[:500]}"

    return GeminiAPIError(
        message,
        code=code,
        status_code=status,
        response=response,
        retry_after=retry_after,
        details={
            "status": err_status,
            "reason": reason,
            "metadata": metadata,
            "message": err_message,
        },
    )
|
||||
|
||||
|
||||
class _GeminiChatCompletions:
    """Sync ``client.chat.completions`` facade mirroring the OpenAI SDK."""

    def __init__(self, client: "GeminiNativeClient"):
        self._client = client

    def create(self, **kwargs: Any) -> Any:
        # Delegates to the owning client; accepts OpenAI create() kwargs.
        return self._client._create_chat_completion(**kwargs)
|
||||
|
||||
|
||||
class _AsyncGeminiChatCompletions:
    """Async ``client.chat.completions`` facade mirroring the OpenAI SDK."""

    def __init__(self, client: "AsyncGeminiNativeClient"):
        self._client = client

    async def create(self, **kwargs: Any) -> Any:
        # Delegates to the owning async client; accepts OpenAI create() kwargs.
        return await self._client._create_chat_completion(**kwargs)
|
||||
|
||||
|
||||
class _GeminiChatNamespace:
    """Sync ``client.chat`` namespace so callers can use the OpenAI shape."""

    def __init__(self, client: "GeminiNativeClient"):
        self.completions = _GeminiChatCompletions(client)
|
||||
|
||||
|
||||
class _AsyncGeminiChatNamespace:
    """Async ``client.chat`` namespace so callers can use the OpenAI shape."""

    def __init__(self, client: "AsyncGeminiNativeClient"):
        self.completions = _AsyncGeminiChatCompletions(client)
|
||||
|
||||
|
||||
class GeminiNativeClient:
|
||||
"""Minimal OpenAI-SDK-compatible facade over Gemini's native REST API."""
|
||||
|
||||
    def __init__(
        self,
        *,
        api_key: str,
        base_url: Optional[str] = None,
        default_headers: Optional[Dict[str, str]] = None,
        timeout: Any = None,
        http_client: Optional[httpx.Client] = None,
        **_: Any,
    ) -> None:
        """Create a client bound to one API key and base URL.

        Unknown keyword arguments are accepted and ignored so this class can
        be constructed with the same kwargs as ``openai.OpenAI``.
        """
        self.api_key = api_key
        normalized_base = (base_url or DEFAULT_GEMINI_BASE_URL).rstrip("/")
        # Strip a trailing /openai so an OpenAI-compat URL still lands on the
        # native REST API this client speaks.
        if normalized_base.endswith("/openai"):
            normalized_base = normalized_base[: -len("/openai")]
        self.base_url = normalized_base
        self._default_headers = dict(default_headers or {})
        # OpenAI-SDK-shaped entry point: client.chat.completions.create(...)
        self.chat = _GeminiChatNamespace(self)
        self.is_closed = False
        # Generous read timeout: generateContent calls can run for minutes.
        self._http = http_client or httpx.Client(
            timeout=timeout or httpx.Timeout(connect=15.0, read=600.0, write=30.0, pool=30.0)
        )
|
||||
|
||||
    def close(self) -> None:
        """Mark the client closed and release the underlying HTTP client."""
        self.is_closed = True
        try:
            self._http.close()
        except Exception:
            # Best-effort cleanup; a failed close must not propagate.
            pass
|
||||
|
||||
    def __enter__(self):
        """Support ``with GeminiNativeClient(...) as client:`` usage."""
        return self
|
||||
|
||||
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always close on context exit; exceptions propagate (returns None).
        self.close()
|
||||
|
||||
    def _headers(self) -> Dict[str, str]:
        """Build per-request headers; caller-supplied defaults win on clash."""
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            # Native Gemini auth header (not Authorization: Bearer).
            "x-goog-api-key": self.api_key,
            "User-Agent": "hermes-agent (gemini-native)",
        }
        headers.update(self._default_headers)
        return headers
|
||||
|
||||
    @staticmethod
    def _advance_stream_iterator(iterator: Iterator[_GeminiStreamChunk]) -> tuple[bool, Optional[_GeminiStreamChunk]]:
        """Pull one chunk from *iterator*; return ``(exhausted, chunk)``.

        Wrapping ``next()`` keeps StopIteration out of generator frames,
        where it would otherwise surface as a RuntimeError (PEP 479).
        """
        try:
            return False, next(iterator)
        except StopIteration:
            return True, None
|
||||
|
||||
def _create_chat_completion(
|
||||
self,
|
||||
*,
|
||||
model: str = "gemini-2.5-flash",
|
||||
messages: Optional[List[Dict[str, Any]]] = None,
|
||||
stream: bool = False,
|
||||
tools: Any = None,
|
||||
tool_choice: Any = None,
|
||||
temperature: Optional[float] = None,
|
||||
max_tokens: Optional[int] = None,
|
||||
top_p: Optional[float] = None,
|
||||
stop: Any = None,
|
||||
extra_body: Optional[Dict[str, Any]] = None,
|
||||
timeout: Any = None,
|
||||
**_: Any,
|
||||
) -> Any:
|
||||
thinking_config = None
|
||||
if isinstance(extra_body, dict):
|
||||
thinking_config = extra_body.get("thinking_config") or extra_body.get("thinkingConfig")
|
||||
|
||||
request = build_gemini_request(
|
||||
messages=messages or [],
|
||||
tools=tools,
|
||||
tool_choice=tool_choice,
|
||||
temperature=temperature,
|
||||
max_tokens=max_tokens,
|
||||
top_p=top_p,
|
||||
stop=stop,
|
||||
thinking_config=thinking_config,
|
||||
)
|
||||
|
||||
if stream:
|
||||
return self._stream_completion(model=model, request=request, timeout=timeout)
|
||||
|
||||
url = f"{self.base_url}/models/{model}:generateContent"
|
||||
response = self._http.post(url, json=request, headers=self._headers(), timeout=timeout)
|
||||
if response.status_code != 200:
|
||||
raise gemini_http_error(response)
|
||||
try:
|
||||
payload = response.json()
|
||||
except ValueError as exc:
|
||||
raise GeminiAPIError(
|
||||
f"Invalid JSON from Gemini native API: {exc}",
|
||||
code="gemini_invalid_json",
|
||||
status_code=response.status_code,
|
||||
response=response,
|
||||
) from exc
|
||||
return translate_gemini_response(payload, model=model)
|
||||
|
||||
def _stream_completion(self, *, model: str, request: Dict[str, Any], timeout: Any = None) -> Iterator[_GeminiStreamChunk]:
|
||||
url = f"{self.base_url}/models/{model}:streamGenerateContent?alt=sse"
|
||||
stream_headers = dict(self._headers())
|
||||
stream_headers["Accept"] = "text/event-stream"
|
||||
|
||||
def _generator() -> Iterator[_GeminiStreamChunk]:
|
||||
try:
|
||||
with self._http.stream("POST", url, json=request, headers=stream_headers, timeout=timeout) as response:
|
||||
if response.status_code != 200:
|
||||
response.read()
|
||||
raise gemini_http_error(response)
|
||||
tool_call_indices: Dict[str, Dict[str, Any]] = {}
|
||||
for event in _iter_sse_events(response):
|
||||
for chunk in translate_stream_event(event, model, tool_call_indices):
|
||||
yield chunk
|
||||
except httpx.HTTPError as exc:
|
||||
raise GeminiAPIError(
|
||||
f"Gemini streaming request failed: {exc}",
|
||||
code="gemini_stream_error",
|
||||
) from exc
|
||||
|
||||
return _generator()
|
||||
|
||||
|
||||
class AsyncGeminiNativeClient:
|
||||
"""Async wrapper used by auxiliary_client for native Gemini calls."""
|
||||
|
||||
def __init__(self, sync_client: GeminiNativeClient):
|
||||
self._sync = sync_client
|
||||
self.api_key = sync_client.api_key
|
||||
self.base_url = sync_client.base_url
|
||||
self.chat = _AsyncGeminiChatNamespace(self)
|
||||
|
||||
async def _create_chat_completion(self, **kwargs: Any) -> Any:
|
||||
stream = bool(kwargs.get("stream"))
|
||||
result = await asyncio.to_thread(self._sync.chat.completions.create, **kwargs)
|
||||
if not stream:
|
||||
return result
|
||||
|
||||
async def _async_stream() -> Any:
|
||||
while True:
|
||||
done, chunk = await asyncio.to_thread(self._sync._advance_stream_iterator, result)
|
||||
if done:
|
||||
break
|
||||
yield chunk
|
||||
|
||||
return _async_stream()
|
||||
|
||||
async def close(self) -> None:
|
||||
await asyncio.to_thread(self._sync.close)
|
||||
@@ -151,7 +151,7 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
|
||||
id="gemini",
|
||||
name="Google AI Studio",
|
||||
auth_type="api_key",
|
||||
inference_base_url="https://generativelanguage.googleapis.com/v1beta/openai",
|
||||
inference_base_url="https://generativelanguage.googleapis.com/v1beta",
|
||||
api_key_env_vars=("GOOGLE_API_KEY", "GEMINI_API_KEY"),
|
||||
base_url_env_var="GEMINI_BASE_URL",
|
||||
),
|
||||
|
||||
@@ -552,7 +552,7 @@ CANONICAL_PROVIDERS: list[ProviderEntry] = [
|
||||
ProviderEntry("copilot", "GitHub Copilot", "GitHub Copilot (uses GITHUB_TOKEN or gh auth token)"),
|
||||
ProviderEntry("copilot-acp", "GitHub Copilot ACP", "GitHub Copilot ACP (spawns `copilot --acp --stdio`)"),
|
||||
ProviderEntry("huggingface", "Hugging Face", "Hugging Face Inference Providers (20+ open models)"),
|
||||
ProviderEntry("gemini", "Google AI Studio", "Google AI Studio (Gemini models — OpenAI-compatible endpoint)"),
|
||||
ProviderEntry("gemini", "Google AI Studio", "Google AI Studio (Gemini models — native Gemini API)"),
|
||||
ProviderEntry("google-gemini-cli", "Google Gemini (OAuth)", "Google Gemini via OAuth + Code Assist (free tier supported; no API key needed)"),
|
||||
ProviderEntry("deepseek", "DeepSeek", "DeepSeek (DeepSeek-V3, R1, coder — direct API)"),
|
||||
ProviderEntry("xai", "xAI", "xAI (Grok models — direct API)"),
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
Add a first-class `gemini` provider that authenticates via Google OAuth, using the standard Gemini API (not Cloud Code Assist). Users who have a Google AI subscription or Gemini API access can authenticate through the browser without needing to manually copy API keys.
|
||||
|
||||
## Architecture Decision
|
||||
- **Path A (chosen):** Standard Gemini API at `generativelanguage.googleapis.com/v1beta/openai/`
|
||||
- **Path A (chosen):** Standard Gemini API at `generativelanguage.googleapis.com/v1beta`
|
||||
- **NOT Path B:** Cloud Code Assist (`cloudcode-pa.googleapis.com`) — rate-limited free tier, internal API, account ban risk
|
||||
- Standard `chat_completions` api_mode via OpenAI SDK — no new api_mode needed
|
||||
- Our own OAuth credentials — NOT sharing tokens with Gemini CLI
|
||||
@@ -32,9 +32,9 @@ Add a first-class `gemini` provider that authenticates via Google OAuth, using t
|
||||
- File locking for concurrent access (multiple agent sessions)
|
||||
|
||||
## API Integration
|
||||
- Base URL: `https://generativelanguage.googleapis.com/v1beta/openai/`
|
||||
- Auth: `Authorization: Bearer <access_token>` (passed as `api_key` to OpenAI SDK)
|
||||
- api_mode: `chat_completions` (standard)
|
||||
- Base URL: `https://generativelanguage.googleapis.com/v1beta`
|
||||
- Auth: native Gemini API authentication handled by the provider adapter
|
||||
- api_mode: `chat_completions` (standard facade over native transport)
|
||||
- Models: gemini-2.5-pro, gemini-2.5-flash, gemini-2.0-flash, etc.
|
||||
|
||||
## Files to Create/Modify
|
||||
|
||||
60
run_agent.py
60
run_agent.py
@@ -4653,6 +4653,25 @@ class AIAgent:
|
||||
return bool(getattr(http_client, "is_closed", False))
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _build_keepalive_http_client() -> Any:
|
||||
try:
|
||||
import httpx as _httpx
|
||||
import socket as _socket
|
||||
|
||||
_sock_opts = [(_socket.SOL_SOCKET, _socket.SO_KEEPALIVE, 1)]
|
||||
if hasattr(_socket, "TCP_KEEPIDLE"):
|
||||
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPIDLE, 30))
|
||||
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPINTVL, 10))
|
||||
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPCNT, 3))
|
||||
elif hasattr(_socket, "TCP_KEEPALIVE"):
|
||||
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPALIVE, 30))
|
||||
return _httpx.Client(
|
||||
transport=_httpx.HTTPTransport(socket_options=_sock_opts),
|
||||
)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _create_openai_client(self, client_kwargs: dict, *, reason: str, shared: bool) -> Any:
|
||||
from agent.auxiliary_client import _validate_base_url, _validate_proxy_env_urls
|
||||
# Treat client_kwargs as read-only. Callers pass self._client_kwargs (or shallow
|
||||
@@ -4693,6 +4712,27 @@ class AIAgent:
|
||||
self._client_log_context(),
|
||||
)
|
||||
return client
|
||||
if self.provider == "gemini":
|
||||
from agent.gemini_native_adapter import GeminiNativeClient, is_native_gemini_base_url
|
||||
|
||||
base_url = str(client_kwargs.get("base_url", "") or "")
|
||||
if is_native_gemini_base_url(base_url):
|
||||
safe_kwargs = {
|
||||
k: v for k, v in client_kwargs.items()
|
||||
if k in {"api_key", "base_url", "default_headers", "timeout", "http_client"}
|
||||
}
|
||||
if "http_client" not in safe_kwargs:
|
||||
keepalive_http = self._build_keepalive_http_client()
|
||||
if keepalive_http is not None:
|
||||
safe_kwargs["http_client"] = keepalive_http
|
||||
client = GeminiNativeClient(**safe_kwargs)
|
||||
logger.info(
|
||||
"Gemini native client created (%s, shared=%s) %s",
|
||||
reason,
|
||||
shared,
|
||||
self._client_log_context(),
|
||||
)
|
||||
return client
|
||||
# Inject TCP keepalives so the kernel detects dead provider connections
|
||||
# instead of letting them sit silently in CLOSE-WAIT (#10324). Without
|
||||
# this, a peer that drops mid-stream leaves the socket in a state where
|
||||
@@ -4711,23 +4751,9 @@ class AIAgent:
|
||||
# Tests in ``tests/run_agent/test_create_openai_client_reuse.py`` and
|
||||
# ``tests/run_agent/test_sequential_chats_live.py`` pin this invariant.
|
||||
if "http_client" not in client_kwargs:
|
||||
try:
|
||||
import httpx as _httpx
|
||||
import socket as _socket
|
||||
_sock_opts = [(_socket.SOL_SOCKET, _socket.SO_KEEPALIVE, 1)]
|
||||
if hasattr(_socket, "TCP_KEEPIDLE"):
|
||||
# Linux
|
||||
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPIDLE, 30))
|
||||
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPINTVL, 10))
|
||||
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPCNT, 3))
|
||||
elif hasattr(_socket, "TCP_KEEPALIVE"):
|
||||
# macOS (uses TCP_KEEPALIVE instead of TCP_KEEPIDLE)
|
||||
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPALIVE, 30))
|
||||
client_kwargs["http_client"] = _httpx.Client(
|
||||
transport=_httpx.HTTPTransport(socket_options=_sock_opts),
|
||||
)
|
||||
except Exception:
|
||||
pass # Fall through to default transport if socket opts fail
|
||||
keepalive_http = self._build_keepalive_http_client()
|
||||
if keepalive_http is not None:
|
||||
client_kwargs["http_client"] = keepalive_http
|
||||
client = OpenAI(**client_kwargs)
|
||||
logger.info(
|
||||
"OpenAI client created (%s, shared=%s) %s",
|
||||
|
||||
@@ -909,6 +909,24 @@ class TestStaleBaseUrlWarning:
|
||||
assert not any("OPENAI_BASE_URL is set" in rec.message for rec in caplog.records), \
|
||||
"Should NOT warn when OPENAI_BASE_URL is not set"
|
||||
|
||||
def test_gemini_main_runtime_preserves_explicit_base_url(self, monkeypatch):
|
||||
monkeypatch.setenv("GOOGLE_API_KEY", "AIza_ENV")
|
||||
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_native, \
|
||||
patch("agent.auxiliary_client.OpenAI") as mock_openai:
|
||||
mock_openai.return_value = MagicMock(base_url="https://generativelanguage.googleapis.com/v1beta/openai")
|
||||
client, model = _resolve_auto(main_runtime={
|
||||
"provider": "gemini",
|
||||
"model": "gemini-2.5-flash",
|
||||
"base_url": "https://generativelanguage.googleapis.com/v1beta/openai",
|
||||
"api_key": "AIza_RUNTIME",
|
||||
"api_mode": "chat_completions",
|
||||
})
|
||||
|
||||
mock_native.assert_not_called()
|
||||
mock_openai.assert_called_once()
|
||||
assert model == "gemini-2.5-flash"
|
||||
assert str(getattr(client, "base_url", "")) == "https://generativelanguage.googleapis.com/v1beta/openai"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Anthropic-compatible image block conversion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
289
tests/agent/test_gemini_native_adapter.py
Normal file
289
tests/agent/test_gemini_native_adapter.py
Normal file
@@ -0,0 +1,289 @@
|
||||
"""Tests for the native Google AI Studio Gemini adapter."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class DummyResponse:
|
||||
def __init__(self, status_code=200, payload=None, headers=None, text=None):
|
||||
self.status_code = status_code
|
||||
self._payload = payload or {}
|
||||
self.headers = headers or {}
|
||||
self.text = text if text is not None else json.dumps(self._payload)
|
||||
|
||||
def json(self):
|
||||
return self._payload
|
||||
|
||||
|
||||
def test_build_native_request_preserves_thought_signature_on_tool_replay():
|
||||
from agent.gemini_native_adapter import build_gemini_request
|
||||
|
||||
request = build_gemini_request(
|
||||
messages=[
|
||||
{"role": "system", "content": "Be helpful."},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": "call_1",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "get_weather",
|
||||
"arguments": '{"city": "Paris"}',
|
||||
},
|
||||
"extra_content": {
|
||||
"google": {"thought_signature": "sig-123"}
|
||||
},
|
||||
}
|
||||
],
|
||||
},
|
||||
],
|
||||
tools=[],
|
||||
tool_choice=None,
|
||||
)
|
||||
|
||||
parts = request["contents"][0]["parts"]
|
||||
assert parts[0]["functionCall"]["name"] == "get_weather"
|
||||
assert parts[0]["thoughtSignature"] == "sig-123"
|
||||
|
||||
|
||||
def test_build_native_request_uses_original_function_name_for_tool_result():
|
||||
from agent.gemini_native_adapter import build_gemini_request
|
||||
|
||||
request = build_gemini_request(
|
||||
messages=[
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": "call_1",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "get_weather",
|
||||
"arguments": '{"city": "Paris"}',
|
||||
},
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"role": "tool",
|
||||
"tool_call_id": "call_1",
|
||||
"content": '{"forecast": "sunny"}',
|
||||
},
|
||||
],
|
||||
tools=[],
|
||||
tool_choice=None,
|
||||
)
|
||||
|
||||
tool_response = request["contents"][1]["parts"][0]["functionResponse"]
|
||||
assert tool_response["name"] == "get_weather"
|
||||
|
||||
|
||||
def test_translate_native_response_surfaces_reasoning_and_tool_calls():
|
||||
from agent.gemini_native_adapter import translate_gemini_response
|
||||
|
||||
payload = {
|
||||
"candidates": [
|
||||
{
|
||||
"content": {
|
||||
"parts": [
|
||||
{"thought": True, "text": "thinking..."},
|
||||
{"functionCall": {"name": "search", "args": {"q": "hermes"}}},
|
||||
]
|
||||
},
|
||||
"finishReason": "STOP",
|
||||
}
|
||||
],
|
||||
"usageMetadata": {
|
||||
"promptTokenCount": 10,
|
||||
"candidatesTokenCount": 5,
|
||||
"totalTokenCount": 15,
|
||||
},
|
||||
}
|
||||
|
||||
response = translate_gemini_response(payload, model="gemini-2.5-flash")
|
||||
choice = response.choices[0]
|
||||
assert choice.finish_reason == "tool_calls"
|
||||
assert choice.message.reasoning == "thinking..."
|
||||
assert choice.message.tool_calls[0].function.name == "search"
|
||||
assert json.loads(choice.message.tool_calls[0].function.arguments) == {"q": "hermes"}
|
||||
|
||||
|
||||
def test_native_client_uses_x_goog_api_key_and_native_models_endpoint(monkeypatch):
|
||||
from agent.gemini_native_adapter import GeminiNativeClient
|
||||
|
||||
recorded = {}
|
||||
|
||||
class DummyHTTP:
|
||||
def post(self, url, json=None, headers=None, timeout=None):
|
||||
recorded["url"] = url
|
||||
recorded["json"] = json
|
||||
recorded["headers"] = headers
|
||||
return DummyResponse(
|
||||
payload={
|
||||
"candidates": [
|
||||
{
|
||||
"content": {"parts": [{"text": "hello"}]},
|
||||
"finishReason": "STOP",
|
||||
}
|
||||
],
|
||||
"usageMetadata": {
|
||||
"promptTokenCount": 1,
|
||||
"candidatesTokenCount": 1,
|
||||
"totalTokenCount": 2,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
def close(self):
|
||||
return None
|
||||
|
||||
monkeypatch.setattr("agent.gemini_native_adapter.httpx.Client", lambda *a, **k: DummyHTTP())
|
||||
|
||||
client = GeminiNativeClient(api_key="AIza-test", base_url="https://generativelanguage.googleapis.com/v1beta")
|
||||
response = client.chat.completions.create(
|
||||
model="gemini-2.5-flash",
|
||||
messages=[{"role": "user", "content": "Hello"}],
|
||||
)
|
||||
|
||||
assert recorded["url"] == "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent"
|
||||
assert recorded["headers"]["x-goog-api-key"] == "AIza-test"
|
||||
assert "Authorization" not in recorded["headers"]
|
||||
assert response.choices[0].message.content == "hello"
|
||||
|
||||
|
||||
def test_native_http_error_keeps_status_and_retry_after():
|
||||
from agent.gemini_native_adapter import gemini_http_error
|
||||
|
||||
response = DummyResponse(
|
||||
status_code=429,
|
||||
headers={"Retry-After": "17"},
|
||||
payload={
|
||||
"error": {
|
||||
"code": 429,
|
||||
"message": "quota exhausted",
|
||||
"status": "RESOURCE_EXHAUSTED",
|
||||
"details": [
|
||||
{
|
||||
"@type": "type.googleapis.com/google.rpc.ErrorInfo",
|
||||
"reason": "RESOURCE_EXHAUSTED",
|
||||
"metadata": {"service": "generativelanguage.googleapis.com"},
|
||||
}
|
||||
],
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
err = gemini_http_error(response)
|
||||
assert getattr(err, "status_code", None) == 429
|
||||
assert getattr(err, "retry_after", None) == 17.0
|
||||
assert "quota exhausted" in str(err)
|
||||
|
||||
|
||||
def test_iter_sse_events_supports_multiline_data_frames():
|
||||
from agent.gemini_native_adapter import _iter_sse_events
|
||||
|
||||
class DummyStreamResponse:
|
||||
def iter_text(self):
|
||||
yield 'data: {"candidates":\n'
|
||||
yield 'data: [{"content":{"parts":[{"text":"hello"}]},"finishReason":"STOP"}]}\n\n'
|
||||
yield 'data: [DONE]\n\n'
|
||||
|
||||
events = list(_iter_sse_events(DummyStreamResponse()))
|
||||
assert len(events) == 1
|
||||
assert events[0]["candidates"][0]["content"]["parts"][0]["text"] == "hello"
|
||||
|
||||
|
||||
def test_native_client_accepts_injected_http_client():
|
||||
from agent.gemini_native_adapter import GeminiNativeClient
|
||||
|
||||
injected = SimpleNamespace(close=lambda: None)
|
||||
client = GeminiNativeClient(api_key="AIza-test", http_client=injected)
|
||||
assert client._http is injected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_native_client_streams_without_requiring_async_iterator_from_sync_client():
|
||||
from agent.gemini_native_adapter import AsyncGeminiNativeClient
|
||||
|
||||
chunk = SimpleNamespace(choices=[SimpleNamespace(delta=SimpleNamespace(content="hi"), finish_reason=None)])
|
||||
sync_stream = iter([chunk])
|
||||
|
||||
def _advance(iterator):
|
||||
try:
|
||||
return False, next(iterator)
|
||||
except StopIteration:
|
||||
return True, None
|
||||
|
||||
sync_client = SimpleNamespace(
|
||||
api_key="AIza-test",
|
||||
base_url="https://generativelanguage.googleapis.com/v1beta",
|
||||
chat=SimpleNamespace(completions=SimpleNamespace(create=lambda **kwargs: sync_stream)),
|
||||
_advance_stream_iterator=_advance,
|
||||
close=lambda: None,
|
||||
)
|
||||
|
||||
async_client = AsyncGeminiNativeClient(sync_client)
|
||||
stream = await async_client.chat.completions.create(stream=True)
|
||||
collected = []
|
||||
async for item in stream:
|
||||
collected.append(item)
|
||||
assert collected == [chunk]
|
||||
|
||||
|
||||
def test_stream_event_translation_emits_tool_call_delta_with_stable_index():
|
||||
from agent.gemini_native_adapter import translate_stream_event
|
||||
|
||||
tool_call_indices = {}
|
||||
event = {
|
||||
"candidates": [
|
||||
{
|
||||
"content": {
|
||||
"parts": [
|
||||
{"functionCall": {"name": "search", "args": {"q": "abc"}}}
|
||||
]
|
||||
},
|
||||
"finishReason": "STOP",
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
first = translate_stream_event(event, model="gemini-2.5-flash", tool_call_indices=tool_call_indices)
|
||||
second = translate_stream_event(event, model="gemini-2.5-flash", tool_call_indices=tool_call_indices)
|
||||
|
||||
assert first[0].choices[0].delta.tool_calls[0].index == 0
|
||||
assert second[0].choices[0].delta.tool_calls[0].index == 0
|
||||
assert first[0].choices[0].delta.tool_calls[0].id == second[0].choices[0].delta.tool_calls[0].id
|
||||
assert first[0].choices[0].delta.tool_calls[0].function.arguments == '{"q": "abc"}'
|
||||
assert second[0].choices[0].delta.tool_calls[0].function.arguments == ""
|
||||
assert first[-1].choices[0].finish_reason == "tool_calls"
|
||||
|
||||
|
||||
def test_stream_event_translation_keeps_identical_calls_in_distinct_parts():
|
||||
from agent.gemini_native_adapter import translate_stream_event
|
||||
|
||||
event = {
|
||||
"candidates": [
|
||||
{
|
||||
"content": {
|
||||
"parts": [
|
||||
{"functionCall": {"name": "search", "args": {"q": "abc"}}},
|
||||
{"functionCall": {"name": "search", "args": {"q": "abc"}}},
|
||||
]
|
||||
},
|
||||
"finishReason": "STOP",
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
chunks = translate_stream_event(event, model="gemini-2.5-flash", tool_call_indices={})
|
||||
tool_chunks = [chunk for chunk in chunks if chunk.choices[0].delta.tool_calls]
|
||||
assert tool_chunks[0].choices[0].delta.tool_calls[0].index == 0
|
||||
assert tool_chunks[1].choices[0].delta.tool_calls[0].index == 1
|
||||
assert tool_chunks[0].choices[0].delta.tool_calls[0].id != tool_chunks[1].choices[0].delta.tool_calls[0].id
|
||||
@@ -13,7 +13,7 @@ class TestCustomProvidersValidation:
|
||||
issues = validate_config_structure({
|
||||
"custom_providers": {
|
||||
"name": "Generativelanguage.googleapis.com",
|
||||
"base_url": "https://generativelanguage.googleapis.com/v1beta/openai",
|
||||
"base_url": "https://generativelanguage.googleapis.com/v1beta",
|
||||
"api_key": "xxx",
|
||||
"model": "models/gemini-2.5-flash",
|
||||
"rate_limit_delay": 2.0,
|
||||
|
||||
@@ -22,7 +22,7 @@ class TestGeminiProviderRegistry:
|
||||
assert pconfig.id == "gemini"
|
||||
assert pconfig.name == "Google AI Studio"
|
||||
assert pconfig.auth_type == "api_key"
|
||||
assert pconfig.inference_base_url == "https://generativelanguage.googleapis.com/v1beta/openai"
|
||||
assert pconfig.inference_base_url == "https://generativelanguage.googleapis.com/v1beta"
|
||||
|
||||
def test_gemini_env_vars(self):
|
||||
pconfig = PROVIDER_REGISTRY["gemini"]
|
||||
@@ -99,7 +99,7 @@ class TestGeminiCredentials:
|
||||
creds = resolve_api_key_provider_credentials("gemini")
|
||||
assert creds["provider"] == "gemini"
|
||||
assert creds["api_key"] == "google-secret"
|
||||
assert creds["base_url"] == "https://generativelanguage.googleapis.com/v1beta/openai"
|
||||
assert creds["base_url"] == "https://generativelanguage.googleapis.com/v1beta"
|
||||
|
||||
def test_resolve_with_gemini_api_key(self, monkeypatch):
|
||||
monkeypatch.setenv("GEMINI_API_KEY", "gemini-secret")
|
||||
@@ -119,7 +119,7 @@ class TestGeminiCredentials:
|
||||
assert result["provider"] == "gemini"
|
||||
assert result["api_mode"] == "chat_completions"
|
||||
assert result["api_key"] == "google-key"
|
||||
assert result["base_url"] == "https://generativelanguage.googleapis.com/v1beta/openai"
|
||||
assert result["base_url"] == "https://generativelanguage.googleapis.com/v1beta"
|
||||
|
||||
|
||||
# ── Model Catalog ──
|
||||
@@ -193,50 +193,89 @@ class TestGeminiAgentInit:
|
||||
importlib.reload(run_agent)
|
||||
|
||||
def test_gemini_agent_uses_chat_completions(self, monkeypatch):
|
||||
"""Gemini falls through to chat_completions — no special elif needed."""
|
||||
"""Gemini still reports chat_completions even though the transport is native."""
|
||||
monkeypatch.setenv("GOOGLE_API_KEY", "test-key")
|
||||
with patch("run_agent.OpenAI") as mock_openai:
|
||||
mock_openai.return_value = MagicMock()
|
||||
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client:
|
||||
mock_client.return_value = MagicMock()
|
||||
from run_agent import AIAgent
|
||||
agent = AIAgent(
|
||||
model="gemini-2.5-flash",
|
||||
provider="gemini",
|
||||
api_key="test-key",
|
||||
base_url="https://generativelanguage.googleapis.com/v1beta/openai",
|
||||
base_url="https://generativelanguage.googleapis.com/v1beta",
|
||||
)
|
||||
assert agent.api_mode == "chat_completions"
|
||||
assert agent.provider == "gemini"
|
||||
|
||||
def test_gemini_uses_bearer_auth(self, monkeypatch):
|
||||
"""Gemini OpenAI-compatible endpoint should receive the real API key."""
|
||||
def test_gemini_agent_uses_native_client(self, monkeypatch):
|
||||
monkeypatch.setenv("GOOGLE_API_KEY", "AIzaSy_REAL_KEY")
|
||||
real_key = "AIzaSy_REAL_KEY"
|
||||
with patch("run_agent.OpenAI") as mock_openai:
|
||||
mock_openai.return_value = MagicMock()
|
||||
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client, \
|
||||
patch("run_agent.OpenAI") as mock_openai, \
|
||||
patch("run_agent.ContextCompressor") as mock_compressor:
|
||||
mock_client.return_value = MagicMock()
|
||||
mock_compressor.return_value = MagicMock(context_length=1048576, threshold_tokens=524288)
|
||||
from run_agent import AIAgent
|
||||
AIAgent(
|
||||
model="gemini-2.5-flash",
|
||||
provider="gemini",
|
||||
api_key=real_key,
|
||||
api_key="AIzaSy_REAL_KEY",
|
||||
base_url="https://generativelanguage.googleapis.com/v1beta",
|
||||
)
|
||||
assert mock_client.called
|
||||
mock_openai.assert_not_called()
|
||||
|
||||
def test_gemini_custom_base_url_keeps_openai_client(self, monkeypatch):
|
||||
monkeypatch.setenv("GOOGLE_API_KEY", "AIzaSy_REAL_KEY")
|
||||
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client, \
|
||||
patch("run_agent.OpenAI") as mock_openai, \
|
||||
patch("run_agent.ContextCompressor") as mock_compressor:
|
||||
mock_openai.return_value = MagicMock()
|
||||
mock_compressor.return_value = MagicMock(context_length=128000, threshold_tokens=64000)
|
||||
from run_agent import AIAgent
|
||||
AIAgent(
|
||||
model="gemini-2.5-flash",
|
||||
provider="gemini",
|
||||
api_key="AIzaSy_REAL_KEY",
|
||||
base_url="https://proxy.example.com/v1",
|
||||
)
|
||||
mock_openai.assert_called_once()
|
||||
|
||||
def test_gemini_openai_compat_base_url_keeps_openai_client(self, monkeypatch):
|
||||
monkeypatch.setenv("GOOGLE_API_KEY", "AIzaSy_REAL_KEY")
|
||||
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client, \
|
||||
patch("run_agent.OpenAI") as mock_openai, \
|
||||
patch("run_agent.ContextCompressor") as mock_compressor:
|
||||
mock_openai.return_value = MagicMock()
|
||||
mock_compressor.return_value = MagicMock(context_length=1048576, threshold_tokens=524288)
|
||||
from run_agent import AIAgent
|
||||
AIAgent(
|
||||
model="gemini-2.5-flash",
|
||||
provider="gemini",
|
||||
api_key="AIzaSy_REAL_KEY",
|
||||
base_url="https://generativelanguage.googleapis.com/v1beta/openai",
|
||||
)
|
||||
call_kwargs = mock_openai.call_args[1]
|
||||
assert call_kwargs.get("api_key") == real_key
|
||||
headers = call_kwargs.get("default_headers", {})
|
||||
assert "x-goog-api-key" not in headers
|
||||
mock_openai.assert_called_once()
|
||||
|
||||
def test_gemini_resolve_provider_client_auth(self, monkeypatch):
|
||||
"""resolve_provider_client('gemini') should pass the real API key through."""
|
||||
def test_gemini_resolve_provider_client_uses_native_client(self, monkeypatch):
|
||||
"""resolve_provider_client('gemini') should build GeminiNativeClient."""
|
||||
monkeypatch.setenv("GEMINI_API_KEY", "AIzaSy_TEST_KEY")
|
||||
real_key = "AIzaSy_TEST_KEY"
|
||||
with patch("agent.auxiliary_client.OpenAI") as mock_openai:
|
||||
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client, \
|
||||
patch("agent.auxiliary_client.OpenAI") as mock_openai:
|
||||
mock_client.return_value = MagicMock()
|
||||
from agent.auxiliary_client import resolve_provider_client
|
||||
resolve_provider_client("gemini")
|
||||
assert mock_client.called
|
||||
mock_openai.assert_not_called()
|
||||
|
||||
def test_gemini_resolve_provider_client_keeps_openai_for_non_native_base_url(self, monkeypatch):
|
||||
monkeypatch.setenv("GOOGLE_API_KEY", "AIzaSy_TEST_KEY")
|
||||
monkeypatch.setenv("GEMINI_BASE_URL", "https://proxy.example.com/v1")
|
||||
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client, \
|
||||
patch("agent.auxiliary_client.OpenAI") as mock_openai:
|
||||
mock_openai.return_value = MagicMock()
|
||||
from agent.auxiliary_client import resolve_provider_client
|
||||
resolve_provider_client("gemini")
|
||||
call_kwargs = mock_openai.call_args[1]
|
||||
assert call_kwargs.get("api_key") == real_key
|
||||
headers = call_kwargs.get("default_headers", {})
|
||||
assert "x-goog-api-key" not in headers
|
||||
mock_openai.assert_called_once()
|
||||
|
||||
|
||||
# ── models.dev Integration ──
|
||||
|
||||
@@ -38,6 +38,20 @@ You need at least one way to connect to an LLM. Use `hermes model` to switch pro
|
||||
| **Google Gemini (OAuth)** | `hermes model` → "Google Gemini (OAuth)" (provider: `google-gemini-cli`, free tier supported, browser PKCE login) |
|
||||
| **Custom Endpoint** | `hermes model` → choose "Custom endpoint" (saved in `config.yaml`) |
|
||||
|
||||
:::info Google AI Studio transport
|
||||
The built-in `gemini` provider now uses Google's native Gemini API (`https://generativelanguage.googleapis.com/v1beta`) by default.
|
||||
|
||||
If you specifically want Google's OpenAI-compatible route instead, set an explicit base URL:
|
||||
|
||||
```yaml
|
||||
model:
|
||||
provider: gemini
|
||||
base_url: https://generativelanguage.googleapis.com/v1beta/openai
|
||||
```
|
||||
|
||||
That explicit `base_url` is preserved and Hermes will stay on the OpenAI-compatible transport for both the main runtime and auxiliary tasks.
|
||||
:::
|
||||
|
||||
:::tip Model key alias
|
||||
In the `model:` config section, you can use either `default:` or `model:` as the key name for your model ID. Both `model: { default: my-model }` and `model: { model: my-model }` work identically.
|
||||
:::
|
||||
|
||||
@@ -48,7 +48,7 @@ All variables go in `~/.hermes/.env`. You can also set them with `hermes config
|
||||
| `HF_BASE_URL` | Override Hugging Face base URL (default: `https://router.huggingface.co/v1`) |
|
||||
| `GOOGLE_API_KEY` | Google AI Studio API key ([aistudio.google.com/app/apikey](https://aistudio.google.com/app/apikey)) |
|
||||
| `GEMINI_API_KEY` | Alias for `GOOGLE_API_KEY` |
|
||||
| `GEMINI_BASE_URL` | Override Google AI Studio base URL |
|
||||
| `GEMINI_BASE_URL` | Override Google AI Studio base URL (native default: `https://generativelanguage.googleapis.com/v1beta`; set `/v1beta/openai` for Google's OpenAI-compatible route) |
|
||||
| `HERMES_GEMINI_CLIENT_ID` | OAuth client ID for `google-gemini-cli` PKCE login (optional; defaults to Google's public gemini-cli client) |
|
||||
| `HERMES_GEMINI_CLIENT_SECRET` | OAuth client secret for `google-gemini-cli` (optional) |
|
||||
| `HERMES_GEMINI_PROJECT_ID` | GCP project ID for paid Gemini tiers (free tier auto-provisions) |
|
||||
|
||||
Reference in New Issue
Block a user