Compare commits

...

2 Commits

Author SHA1 Message Date
Teknium
840efe485d fix: cleanup Qwen OAuth provider gaps
- Add HERMES_QWEN_BASE_URL to OPTIONAL_ENV_VARS in config.py (was missing
  despite being referenced in code)
- Remove redundant qwen-oauth entry from _API_KEY_PROVIDER_AUX_MODELS
  (non-aggregator providers use their main model for aux tasks automatically)
2026-04-08 13:39:57 -07:00
kshitijk4poor
1a43ea0ee7 feat(qwen): add Qwen OAuth provider with portal request support
Based on #6079 by @tunamitom with critical fixes and comprehensive tests.

Changes from #6079:
- Fix: sanitization overwrite bug — Qwen message prep now runs AFTER codex
  field sanitization, not before (was silently discarding Qwen transforms)
- Fix: missing try/except AuthError in runtime_provider.py — stale Qwen
  credentials now fall through to next provider on auto-detect
- Fix: 'qwen' alias conflict — bare 'qwen' stays mapped to 'alibaba'
  (DashScope); use 'qwen-portal' or 'qwen-cli' for the OAuth provider
- Fix: hardcoded ['coder-model'] replaced with live API fetch + curated
  fallback list (qwen3-coder-plus, qwen3-coder)
- Fix: extract _is_qwen_portal() helper + _qwen_portal_headers() to replace
  5 inline 'portal.qwen.ai' string checks and share headers between init
  and credential swap
- Fix: add Qwen branch to _apply_client_headers_for_base_url for mid-session
  credential swaps
- Fix: remove suspicious TypeError catch blocks around _prompt_provider_choice
- Fix: handle bare string items in content lists (were silently dropped)
- Fix: remove redundant dict() copies after deepcopy in message prep
- Revert: unrelated ai-gateway test mock removal and model_switch.py comment deletion

New tests (30 test functions):
- _qwen_cli_auth_path, _read_qwen_cli_tokens (success + 3 error paths)
- _save_qwen_cli_tokens (roundtrip, parent creation, permissions)
- _qwen_access_token_is_expiring (5 edge cases: fresh, expired, within skew,
  None, non-numeric)
- _refresh_qwen_cli_tokens (success, preserve old refresh, 4 error paths,
  default expires_in, disk persistence)
- resolve_qwen_runtime_credentials (fresh, auto-refresh, force-refresh,
  missing token, env override)
- get_qwen_auth_status (logged in, not logged in)
- Runtime provider resolution (direct, pool entry, alias)
- _build_api_kwargs (metadata, vl_high_resolution_images, message formatting,
  max_tokens suppression)
2026-04-08 13:38:16 -07:00
16 changed files with 962 additions and 4 deletions

View File

@@ -81,6 +81,14 @@
# HF_TOKEN=
# OPENCODE_GO_BASE_URL=https://opencode.ai/zen/go/v1 # Override default base URL
# =============================================================================
# LLM PROVIDER (Qwen OAuth)
# =============================================================================
# Qwen OAuth reuses your local Qwen CLI login (qwen auth qwen-oauth).
# No API key needed — credentials come from ~/.qwen/oauth_creds.json.
# Optional base URL override:
# HERMES_QWEN_BASE_URL=https://portal.qwen.ai/v1
# =============================================================================
# TOOL API KEYS
# =============================================================================

View File

@@ -26,12 +26,14 @@ _PROVIDER_PREFIXES: frozenset[str] = frozenset({
"openrouter", "nous", "openai-codex", "copilot", "copilot-acp",
"gemini", "zai", "kimi-coding", "minimax", "minimax-cn", "anthropic", "deepseek",
"opencode-zen", "opencode-go", "ai-gateway", "kilocode", "alibaba",
"qwen-oauth",
"custom", "local",
# Common aliases
"google", "google-gemini", "google-ai-studio",
"glm", "z-ai", "z.ai", "zhipu", "github", "github-copilot",
"github-models", "kimi", "moonshot", "claude", "deep-seek",
"opencode", "zen", "go", "vercel", "kilo", "dashscope", "aliyun", "qwen",
"qwen-portal",
})
@@ -187,6 +189,7 @@ _URL_TO_PROVIDER: Dict[str, str] = {
"api.minimax": "minimax",
"dashscope.aliyuncs.com": "alibaba",
"dashscope-intl.aliyuncs.com": "alibaba",
"portal.qwen.ai": "qwen-oauth",
"openrouter.ai": "openrouter",
"generativelanguage.googleapis.com": "gemini",
"inference-api.nousresearch.com": "nous",

View File

@@ -153,6 +153,7 @@ PROVIDER_TO_MODELS_DEV: Dict[str, str] = {
"minimax-cn": "minimax-cn",
"deepseek": "deepseek",
"alibaba": "alibaba",
"qwen-oauth": "alibaba",
"copilot": "github-copilot",
"ai-gateway": "vercel",
"opencode-zen": "opencode",

View File

@@ -67,12 +67,16 @@ DEFAULT_AGENT_KEY_MIN_TTL_SECONDS = 30 * 60 # 30 minutes
ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 # refresh 2 min before expiry
DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS = 1 # poll at most every 1s
DEFAULT_CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
DEFAULT_QWEN_BASE_URL = "https://portal.qwen.ai/v1"
DEFAULT_GITHUB_MODELS_BASE_URL = "https://api.githubcopilot.com"
DEFAULT_COPILOT_ACP_BASE_URL = "acp://copilot"
DEFAULT_GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai"
CODEX_OAUTH_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
CODEX_OAUTH_TOKEN_URL = "https://auth.openai.com/oauth/token"
CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
QWEN_OAUTH_CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56"
QWEN_OAUTH_TOKEN_URL = "https://chat.qwen.ai/api/v1/oauth2/token"
QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
# =============================================================================
@@ -112,6 +116,12 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
auth_type="oauth_external",
inference_base_url=DEFAULT_CODEX_BASE_URL,
),
"qwen-oauth": ProviderConfig(
id="qwen-oauth",
name="Qwen OAuth",
auth_type="oauth_external",
inference_base_url=DEFAULT_QWEN_BASE_URL,
),
"copilot": ProviderConfig(
id="copilot",
name="GitHub Copilot",
@@ -817,6 +827,7 @@ def resolve_provider(
"github-copilot-acp": "copilot-acp", "copilot-acp-agent": "copilot-acp",
"aigateway": "ai-gateway", "vercel": "ai-gateway", "vercel-ai-gateway": "ai-gateway",
"opencode": "opencode-zen", "zen": "opencode-zen",
"qwen-portal": "qwen-oauth", "qwen-cli": "qwen-oauth", "qwen-oauth": "qwen-oauth",
"hf": "huggingface", "hugging-face": "huggingface", "huggingface-hub": "huggingface",
"go": "opencode-go", "opencode-go-sub": "opencode-go",
"kilo": "kilocode", "kilo-code": "kilocode", "kilo-gateway": "kilocode",
@@ -946,6 +957,176 @@ def _codex_access_token_is_expiring(access_token: Any, skew_seconds: int) -> boo
return float(exp) <= (time.time() + max(0, int(skew_seconds)))
def _qwen_cli_auth_path() -> Path:
return Path.home() / ".qwen" / "oauth_creds.json"
def _read_qwen_cli_tokens() -> Dict[str, Any]:
auth_path = _qwen_cli_auth_path()
if not auth_path.exists():
raise AuthError(
"Qwen CLI credentials not found. Run 'qwen auth qwen-oauth' first.",
provider="qwen-oauth",
code="qwen_auth_missing",
)
try:
data = json.loads(auth_path.read_text(encoding="utf-8"))
except Exception as exc:
raise AuthError(
f"Failed to read Qwen CLI credentials from {auth_path}: {exc}",
provider="qwen-oauth",
code="qwen_auth_read_failed",
) from exc
if not isinstance(data, dict):
raise AuthError(
f"Invalid Qwen CLI credentials in {auth_path}.",
provider="qwen-oauth",
code="qwen_auth_invalid",
)
return data
def _save_qwen_cli_tokens(tokens: Dict[str, Any]) -> Path:
auth_path = _qwen_cli_auth_path()
auth_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = auth_path.with_suffix(".tmp")
tmp_path.write_text(json.dumps(tokens, indent=2, sort_keys=True) + "\n", encoding="utf-8")
os.chmod(tmp_path, stat.S_IRUSR | stat.S_IWUSR)
tmp_path.replace(auth_path)
return auth_path
def _qwen_access_token_is_expiring(expiry_date_ms: Any, skew_seconds: int = QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS) -> bool:
try:
expiry_ms = int(expiry_date_ms)
except Exception:
return True
return (time.time() + max(0, int(skew_seconds))) * 1000 >= expiry_ms
def _refresh_qwen_cli_tokens(tokens: Dict[str, Any], timeout_seconds: float = 20.0) -> Dict[str, Any]:
refresh_token = str(tokens.get("refresh_token", "") or "").strip()
if not refresh_token:
raise AuthError(
"Qwen OAuth refresh token missing. Re-run 'qwen auth qwen-oauth'.",
provider="qwen-oauth",
code="qwen_refresh_token_missing",
)
try:
response = httpx.post(
QWEN_OAUTH_TOKEN_URL,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": QWEN_OAUTH_CLIENT_ID,
},
timeout=timeout_seconds,
)
except Exception as exc:
raise AuthError(
f"Qwen OAuth refresh failed: {exc}",
provider="qwen-oauth",
code="qwen_refresh_failed",
) from exc
if response.status_code >= 400:
body = response.text.strip()
raise AuthError(
"Qwen OAuth refresh failed. Re-run 'qwen auth qwen-oauth'."
+ (f" Response: {body}" if body else ""),
provider="qwen-oauth",
code="qwen_refresh_failed",
)
try:
payload = response.json()
except Exception as exc:
raise AuthError(
f"Qwen OAuth refresh returned invalid JSON: {exc}",
provider="qwen-oauth",
code="qwen_refresh_invalid_json",
) from exc
if not isinstance(payload, dict) or not str(payload.get("access_token", "") or "").strip():
raise AuthError(
"Qwen OAuth refresh response missing access_token.",
provider="qwen-oauth",
code="qwen_refresh_invalid_response",
)
expires_in = payload.get("expires_in")
try:
expires_in_seconds = int(expires_in)
except Exception:
expires_in_seconds = 6 * 60 * 60
refreshed = {
"access_token": str(payload.get("access_token", "") or "").strip(),
"refresh_token": str(payload.get("refresh_token", refresh_token) or refresh_token).strip(),
"token_type": str(payload.get("token_type", tokens.get("token_type", "Bearer")) or "Bearer").strip() or "Bearer",
"resource_url": str(payload.get("resource_url", tokens.get("resource_url", "portal.qwen.ai")) or "portal.qwen.ai").strip(),
"expiry_date": int(time.time() * 1000) + max(1, expires_in_seconds) * 1000,
}
_save_qwen_cli_tokens(refreshed)
return refreshed
def resolve_qwen_runtime_credentials(
*,
force_refresh: bool = False,
refresh_if_expiring: bool = True,
refresh_skew_seconds: int = QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
) -> Dict[str, Any]:
tokens = _read_qwen_cli_tokens()
access_token = str(tokens.get("access_token", "") or "").strip()
should_refresh = bool(force_refresh)
if not should_refresh and refresh_if_expiring:
should_refresh = _qwen_access_token_is_expiring(tokens.get("expiry_date"), refresh_skew_seconds)
if should_refresh:
tokens = _refresh_qwen_cli_tokens(tokens)
access_token = str(tokens.get("access_token", "") or "").strip()
if not access_token:
raise AuthError(
"Qwen OAuth access token missing. Re-run 'qwen auth qwen-oauth'.",
provider="qwen-oauth",
code="qwen_access_token_missing",
)
base_url = os.getenv("HERMES_QWEN_BASE_URL", "").strip().rstrip("/") or DEFAULT_QWEN_BASE_URL
return {
"provider": "qwen-oauth",
"base_url": base_url,
"api_key": access_token,
"source": "qwen-cli",
"expires_at_ms": tokens.get("expiry_date"),
"auth_file": str(_qwen_cli_auth_path()),
}
def get_qwen_auth_status() -> Dict[str, Any]:
auth_path = _qwen_cli_auth_path()
try:
creds = resolve_qwen_runtime_credentials(refresh_if_expiring=False)
return {
"logged_in": True,
"auth_file": str(auth_path),
"source": creds.get("source"),
"api_key": creds.get("api_key"),
"expires_at_ms": creds.get("expires_at_ms"),
}
except AuthError as exc:
return {
"logged_in": False,
"auth_file": str(auth_path),
"error": str(exc),
}
# =============================================================================
# SSH / remote session detection
# =============================================================================
@@ -2072,6 +2253,8 @@ def get_auth_status(provider_id: Optional[str] = None) -> Dict[str, Any]:
return get_nous_auth_status()
if target == "openai-codex":
return get_codex_auth_status()
if target == "qwen-oauth":
return get_qwen_auth_status()
if target == "copilot-acp":
return get_external_process_provider_status(target)
# API-key providers

View File

@@ -32,7 +32,7 @@ from hermes_constants import OPENROUTER_BASE_URL
# Providers that support OAuth login in addition to API keys.
_OAUTH_CAPABLE_PROVIDERS = {"anthropic", "nous", "openai-codex"}
_OAUTH_CAPABLE_PROVIDERS = {"anthropic", "nous", "openai-codex", "qwen-oauth"}
def _get_custom_provider_names() -> list:
@@ -147,7 +147,7 @@ def auth_add_command(args) -> None:
if provider.startswith(CUSTOM_POOL_PREFIX):
requested_type = AUTH_TYPE_API_KEY
else:
requested_type = AUTH_TYPE_OAUTH if provider in {"anthropic", "nous", "openai-codex"} else AUTH_TYPE_API_KEY
requested_type = AUTH_TYPE_OAUTH if provider in {"anthropic", "nous", "openai-codex", "qwen-oauth"} else AUTH_TYPE_API_KEY
pool = load_pool(provider)
@@ -250,6 +250,26 @@ def auth_add_command(args) -> None:
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
if provider == "qwen-oauth":
creds = auth_mod.resolve_qwen_runtime_credentials(refresh_if_expiring=False)
label = (getattr(args, "label", None) or "").strip() or label_from_token(
creds["api_key"],
_oauth_default_label(provider, len(pool.entries()) + 1),
)
entry = PooledCredential(
provider=provider,
id=uuid.uuid4().hex[:6],
label=label,
auth_type=AUTH_TYPE_OAUTH,
priority=0,
source=f"{SOURCE_MANUAL}:qwen_cli",
access_token=creds["api_key"],
base_url=creds.get("base_url"),
)
pool.add_entry(entry)
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
raise SystemExit(f"`hermes auth add {provider}` is not implemented for auth type {requested_type} yet.")

View File

@@ -724,6 +724,14 @@ OPTIONAL_ENV_VARS = {
"category": "provider",
"advanced": True,
},
"HERMES_QWEN_BASE_URL": {
"description": "Qwen Portal base URL override (default: https://portal.qwen.ai/v1)",
"prompt": "Qwen Portal base URL (leave empty for default)",
"url": None,
"password": False,
"category": "provider",
"advanced": True,
},
"OPENCODE_ZEN_API_KEY": {
"description": "OpenCode Zen API key (pay-as-you-go access to curated models)",
"prompt": "OpenCode Zen API key",

View File

@@ -918,6 +918,7 @@ def select_provider_and_model(args=None):
"openrouter": "OpenRouter",
"nous": "Nous Portal",
"openai-codex": "OpenAI Codex",
"qwen-oauth": "Qwen OAuth",
"copilot-acp": "GitHub Copilot ACP",
"copilot": "GitHub Copilot",
"anthropic": "Anthropic",
@@ -947,6 +948,7 @@ def select_provider_and_model(args=None):
("openrouter", "OpenRouter (100+ models, pay-per-use)"),
("anthropic", "Anthropic (Claude models — API key or Claude Code)"),
("openai-codex", "OpenAI Codex"),
("qwen-oauth", "Qwen OAuth (reuses local Qwen CLI login)"),
("copilot", "GitHub Copilot (uses GITHUB_TOKEN or gh auth token)"),
("huggingface", "Hugging Face Inference Providers (20+ open models)"),
]
@@ -1043,6 +1045,8 @@ def select_provider_and_model(args=None):
_model_flow_nous(config, current_model, args=args)
elif selected_provider == "openai-codex":
_model_flow_openai_codex(config, current_model)
elif selected_provider == "qwen-oauth":
_model_flow_qwen_oauth(config, current_model)
elif selected_provider == "copilot-acp":
_model_flow_copilot_acp(config, current_model)
elif selected_provider == "copilot":
@@ -1359,6 +1363,56 @@ def _model_flow_openai_codex(config, current_model=""):
_DEFAULT_QWEN_PORTAL_MODELS = [
"qwen3-coder-plus",
"qwen3-coder",
]
def _model_flow_qwen_oauth(_config, current_model=""):
"""Qwen OAuth provider: reuse local Qwen CLI login, then pick model."""
from hermes_cli.auth import (
get_qwen_auth_status,
resolve_qwen_runtime_credentials,
_prompt_model_selection,
_save_model_choice,
_update_config_for_provider,
DEFAULT_QWEN_BASE_URL,
)
from hermes_cli.models import fetch_api_models
status = get_qwen_auth_status()
if not status.get("logged_in"):
print("Not logged into Qwen CLI OAuth.")
print("Run: qwen auth qwen-oauth")
auth_file = status.get("auth_file")
if auth_file:
print(f"Expected credentials file: {auth_file}")
if status.get("error"):
print(f"Error: {status.get('error')}")
return
# Try live model discovery, fall back to curated list.
models = None
try:
creds = resolve_qwen_runtime_credentials(refresh_if_expiring=True)
models = fetch_api_models(creds["api_key"], creds["base_url"])
except Exception:
pass
if not models:
models = list(_DEFAULT_QWEN_PORTAL_MODELS)
default = current_model or (models[0] if models else "qwen3-coder-plus")
selected = _prompt_model_selection(models, current_model=default)
if selected:
_save_model_choice(selected)
_update_config_for_provider("qwen-oauth", DEFAULT_QWEN_BASE_URL)
print(f"Default model set to: {selected} (via Qwen OAuth)")
else:
print("No change.")
def _model_flow_custom(config):
"""Custom endpoint: collect URL, API key, and model name.

View File

@@ -84,6 +84,7 @@ _PASSTHROUGH_PROVIDERS: frozenset[str] = frozenset({
"minimax",
"minimax-cn",
"alibaba",
"qwen-oauth",
"huggingface",
"openai-codex",
"custom",

View File

@@ -483,6 +483,7 @@ _PROVIDER_LABELS = {
"ai-gateway": "AI Gateway",
"kilocode": "Kilo Code",
"alibaba": "Alibaba Cloud (DashScope)",
"qwen-oauth": "Qwen OAuth (Portal)",
"huggingface": "Hugging Face",
"custom": "Custom endpoint",
}
@@ -522,6 +523,7 @@ _PROVIDER_ALIASES = {
"aliyun": "alibaba",
"qwen": "alibaba",
"alibaba-cloud": "alibaba",
"qwen-portal": "qwen-oauth",
"hf": "huggingface",
"hugging-face": "huggingface",
"huggingface-hub": "huggingface",
@@ -767,6 +769,7 @@ def list_available_providers() -> list[dict[str, str]]:
"openrouter", "nous", "openai-codex", "copilot", "copilot-acp",
"gemini", "huggingface",
"zai", "kimi-coding", "minimax", "minimax-cn", "kilocode", "anthropic", "alibaba",
"qwen-oauth",
"opencode-zen", "opencode-go",
"ai-gateway", "deepseek", "custom",
]

View File

@@ -58,6 +58,12 @@ HERMES_OVERLAYS: Dict[str, HermesOverlay] = {
auth_type="oauth_external",
base_url_override="https://chatgpt.com/backend-api/codex",
),
"qwen-oauth": HermesOverlay(
transport="openai_chat",
auth_type="oauth_external",
base_url_override="https://portal.qwen.ai/v1",
base_url_env_var="HERMES_QWEN_BASE_URL",
),
"copilot-acp": HermesOverlay(
transport="codex_responses",
auth_type="external_process",

View File

@@ -14,11 +14,13 @@ from agent.credential_pool import CredentialPool, PooledCredential, get_custom_p
from hermes_cli.auth import (
AuthError,
DEFAULT_CODEX_BASE_URL,
DEFAULT_QWEN_BASE_URL,
PROVIDER_REGISTRY,
format_auth_error,
resolve_provider,
resolve_nous_runtime_credentials,
resolve_codex_runtime_credentials,
resolve_qwen_runtime_credentials,
resolve_api_key_provider_credentials,
resolve_external_process_provider_credentials,
has_usable_secret,
@@ -148,6 +150,9 @@ def _resolve_runtime_from_pool_entry(
if provider == "openai-codex":
api_mode = "codex_responses"
base_url = base_url or DEFAULT_CODEX_BASE_URL
elif provider == "qwen-oauth":
api_mode = "chat_completions"
base_url = base_url or DEFAULT_QWEN_BASE_URL
elif provider == "anthropic":
api_mode = "anthropic_messages"
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
@@ -691,6 +696,24 @@ def resolve_runtime_provider(
logger.info("Auto-detected Codex provider but credentials failed; "
"falling through to next provider.")
if provider == "qwen-oauth":
try:
creds = resolve_qwen_runtime_credentials()
return {
"provider": "qwen-oauth",
"api_mode": "chat_completions",
"base_url": creds.get("base_url", "").rstrip("/"),
"api_key": creds.get("api_key", ""),
"source": creds.get("source", "qwen-cli"),
"expires_at_ms": creds.get("expires_at_ms"),
"requested_provider": requested_provider,
}
except AuthError:
if requested_provider != "auto":
raise
logger.info("Qwen OAuth credentials failed; "
"falling through to next provider.")
if provider == "copilot-acp":
creds = resolve_external_process_provider_credentials(provider)
return {

View File

@@ -153,12 +153,14 @@ def show_status(args):
print(color("◆ Auth Providers", Colors.CYAN, Colors.BOLD))
try:
from hermes_cli.auth import get_nous_auth_status, get_codex_auth_status
from hermes_cli.auth import get_nous_auth_status, get_codex_auth_status, get_qwen_auth_status
nous_status = get_nous_auth_status()
codex_status = get_codex_auth_status()
qwen_status = get_qwen_auth_status()
except Exception:
nous_status = {}
codex_status = {}
qwen_status = {}
nous_logged_in = bool(nous_status.get("logged_in"))
print(
@@ -189,6 +191,21 @@ def show_status(args):
if codex_status.get("error") and not codex_logged_in:
print(f" Error: {codex_status.get('error')}")
qwen_logged_in = bool(qwen_status.get("logged_in"))
print(
f" {'Qwen OAuth':<12} {check_mark(qwen_logged_in)} "
f"{'logged in' if qwen_logged_in else 'not logged in (run: qwen auth qwen-oauth)'}"
)
qwen_auth_file = qwen_status.get("auth_file")
if qwen_auth_file:
print(f" Auth file: {qwen_auth_file}")
qwen_exp = qwen_status.get("expires_at_ms")
if qwen_exp:
from datetime import datetime, timezone
print(f" Access exp: {datetime.fromtimestamp(int(qwen_exp) / 1000, tz=timezone.utc).isoformat()}")
if qwen_status.get("error") and not qwen_logged_in:
print(f" Error: {qwen_status.get('error')}")
# =========================================================================
# Nous Subscription Features
# =========================================================================

View File

@@ -413,6 +413,27 @@ def _strip_budget_warnings_from_history(messages: list) -> None:
# =========================================================================
# =========================================================================
# Qwen Portal headers — mimics QwenCode CLI for portal.qwen.ai compatibility.
# Extracted as a module-level helper so both __init__ and
# _apply_client_headers_for_base_url can share it.
# =========================================================================
_QWEN_CODE_VERSION = "0.14.1"
def _qwen_portal_headers() -> dict:
"""Return default HTTP headers required by Qwen Portal API."""
import platform as _plat
_ua = f"QwenCode/{_QWEN_CODE_VERSION} ({_plat.system().lower()}; {_plat.machine()})"
return {
"User-Agent": _ua,
"X-DashScope-CacheControl": "enable",
"X-DashScope-UserAgent": _ua,
"X-DashScope-AuthType": "qwen-oauth",
}
class AIAgent:
"""
AI Agent with tool calling capabilities.
@@ -756,6 +777,8 @@ class AIAgent:
client_kwargs["default_headers"] = {
"User-Agent": "KimiCLI/1.3",
}
elif "portal.qwen.ai" in effective_base.lower():
client_kwargs["default_headers"] = _qwen_portal_headers()
else:
# No explicit creds — use the centralized provider router
from agent.auxiliary_client import resolve_provider_client
@@ -4080,6 +4103,8 @@ class AIAgent:
self._client_kwargs["default_headers"] = copilot_default_headers()
elif "api.kimi.com" in normalized:
self._client_kwargs["default_headers"] = {"User-Agent": "KimiCLI/1.3"}
elif "portal.qwen.ai" in normalized:
self._client_kwargs["default_headers"] = _qwen_portal_headers()
else:
self._client_kwargs.pop("default_headers", None)
@@ -5226,6 +5251,71 @@ class AIAgent:
base = (getattr(self, "base_url", "") or "").lower()
return "dashscope" in base or "aliyuncs" in base or "opencode.ai/zen/go" in base
def _is_qwen_portal(self) -> bool:
"""Return True when the base URL targets Qwen Portal."""
return "portal.qwen.ai" in self._base_url_lower
def _qwen_prepare_chat_messages(self, api_messages: list) -> list:
prepared = copy.deepcopy(api_messages)
if not prepared:
return prepared
for msg in prepared:
if not isinstance(msg, dict):
continue
content = msg.get("content")
if isinstance(content, str):
msg["content"] = [{"type": "text", "text": content}]
elif isinstance(content, list):
# Normalize: convert bare strings to text dicts, keep dicts as-is.
# deepcopy already created independent copies, no need for dict().
normalized_parts = []
for part in content:
if isinstance(part, str):
normalized_parts.append({"type": "text", "text": part})
elif isinstance(part, dict):
normalized_parts.append(part)
if normalized_parts:
msg["content"] = normalized_parts
# Inject cache_control on the last part of the system message.
for msg in prepared:
if isinstance(msg, dict) and msg.get("role") == "system":
content = msg.get("content")
if isinstance(content, list) and content and isinstance(content[-1], dict):
content[-1]["cache_control"] = {"type": "ephemeral"}
break
return prepared
def _qwen_prepare_chat_messages_inplace(self, messages: list) -> None:
"""In-place variant — mutates an already-copied message list."""
if not messages:
return
for msg in messages:
if not isinstance(msg, dict):
continue
content = msg.get("content")
if isinstance(content, str):
msg["content"] = [{"type": "text", "text": content}]
elif isinstance(content, list):
normalized_parts = []
for part in content:
if isinstance(part, str):
normalized_parts.append({"type": "text", "text": part})
elif isinstance(part, dict):
normalized_parts.append(part)
if normalized_parts:
msg["content"] = normalized_parts
for msg in messages:
if isinstance(msg, dict) and msg.get("role") == "system":
content = msg.get("content")
if isinstance(content, list) and content and isinstance(content[-1], dict):
content[-1]["cache_control"] = {"type": "ephemeral"}
break
def _build_api_kwargs(self, api_messages: list) -> dict:
"""Build the keyword arguments dict for the active API mode."""
if self.api_mode == "anthropic_messages":
@@ -5337,6 +5427,17 @@ class AIAgent:
tool_call.pop("call_id", None)
tool_call.pop("response_item_id", None)
# Qwen portal: normalize content to list-of-dicts, inject cache_control.
# Must run AFTER codex sanitization so we transform the final messages.
# If sanitization already deepcopied, reuse that copy (in-place).
if self._is_qwen_portal():
if sanitized_messages is api_messages:
# No sanitization was done — we need our own copy.
sanitized_messages = self._qwen_prepare_chat_messages(sanitized_messages)
else:
# Already a deepcopy — transform in place to avoid a second deepcopy.
self._qwen_prepare_chat_messages_inplace(sanitized_messages)
# GPT-5 and Codex models respond better to 'developer' than 'system'
# for instruction-following. Swap the role at the API boundary so
# internal message representation stays uniform ("system").
@@ -5369,11 +5470,17 @@ class AIAgent:
"messages": sanitized_messages,
"timeout": float(os.getenv("HERMES_API_TIMEOUT", 1800.0)),
}
if self._is_qwen_portal():
api_kwargs["metadata"] = {
"sessionId": self.session_id or "hermes",
"promptId": str(uuid.uuid4()),
}
if self.tools:
api_kwargs["tools"] = self.tools
if self.max_tokens is not None:
api_kwargs.update(self._max_tokens_param(self.max_tokens))
if not self._is_qwen_portal():
api_kwargs.update(self._max_tokens_param(self.max_tokens))
elif self._is_openrouter_url() and "claude" in (self.model or "").lower():
# OpenRouter translates requests to Anthropic's Messages API,
# which requires max_tokens as a mandatory field. When we omit
@@ -5438,6 +5545,9 @@ class AIAgent:
options["num_ctx"] = self._ollama_num_ctx
extra_body["options"] = options
if self._is_qwen_portal():
extra_body["vl_high_resolution_images"] = True
if extra_body:
api_kwargs["extra_body"] = extra_body

View File

@@ -0,0 +1,399 @@
"""Tests for Qwen OAuth provider authentication (hermes_cli/auth.py).
Covers: _qwen_cli_auth_path, _read_qwen_cli_tokens, _save_qwen_cli_tokens,
_qwen_access_token_is_expiring, _refresh_qwen_cli_tokens,
resolve_qwen_runtime_credentials, get_qwen_auth_status.
"""
import json
import os
import stat
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from hermes_cli.auth import (
AuthError,
DEFAULT_QWEN_BASE_URL,
QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
_qwen_cli_auth_path,
_read_qwen_cli_tokens,
_save_qwen_cli_tokens,
_qwen_access_token_is_expiring,
_refresh_qwen_cli_tokens,
resolve_qwen_runtime_credentials,
get_qwen_auth_status,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_qwen_tokens(
access_token="test-access-token",
refresh_token="test-refresh-token",
expiry_date=None,
**extra,
):
"""Create a minimal Qwen CLI OAuth credential dict."""
if expiry_date is None:
# 1 hour from now in milliseconds
expiry_date = int((time.time() + 3600) * 1000)
data = {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer",
"expiry_date": expiry_date,
"resource_url": "portal.qwen.ai",
}
data.update(extra)
return data
def _write_qwen_creds(tmp_path, tokens=None):
"""Write tokens to the Qwen CLI credentials file and return the path."""
qwen_dir = tmp_path / ".qwen"
qwen_dir.mkdir(parents=True, exist_ok=True)
creds_path = qwen_dir / "oauth_creds.json"
if tokens is None:
tokens = _make_qwen_tokens()
creds_path.write_text(json.dumps(tokens), encoding="utf-8")
return creds_path
@pytest.fixture()
def qwen_env(tmp_path, monkeypatch):
"""Redirect _qwen_cli_auth_path to tmp_path/.qwen/oauth_creds.json."""
creds_path = tmp_path / ".qwen" / "oauth_creds.json"
monkeypatch.setattr(
"hermes_cli.auth._qwen_cli_auth_path", lambda: creds_path
)
return tmp_path
# ---------------------------------------------------------------------------
# _qwen_cli_auth_path
# ---------------------------------------------------------------------------
def test_qwen_cli_auth_path_returns_expected_location():
path = _qwen_cli_auth_path()
assert path == Path.home() / ".qwen" / "oauth_creds.json"
# ---------------------------------------------------------------------------
# _read_qwen_cli_tokens
# ---------------------------------------------------------------------------
def test_read_qwen_cli_tokens_success(qwen_env):
tokens = _make_qwen_tokens(access_token="my-access")
_write_qwen_creds(qwen_env, tokens)
result = _read_qwen_cli_tokens()
assert result["access_token"] == "my-access"
assert result["refresh_token"] == "test-refresh-token"
def test_read_qwen_cli_tokens_missing_file(qwen_env):
with pytest.raises(AuthError) as exc:
_read_qwen_cli_tokens()
assert exc.value.code == "qwen_auth_missing"
def test_read_qwen_cli_tokens_invalid_json(qwen_env):
creds_path = qwen_env / ".qwen" / "oauth_creds.json"
creds_path.parent.mkdir(parents=True, exist_ok=True)
creds_path.write_text("not json{{{", encoding="utf-8")
with pytest.raises(AuthError) as exc:
_read_qwen_cli_tokens()
assert exc.value.code == "qwen_auth_read_failed"
def test_read_qwen_cli_tokens_non_dict(qwen_env):
creds_path = qwen_env / ".qwen" / "oauth_creds.json"
creds_path.parent.mkdir(parents=True, exist_ok=True)
creds_path.write_text(json.dumps(["a", "b"]), encoding="utf-8")
with pytest.raises(AuthError) as exc:
_read_qwen_cli_tokens()
assert exc.value.code == "qwen_auth_invalid"
# ---------------------------------------------------------------------------
# _save_qwen_cli_tokens
# ---------------------------------------------------------------------------
def test_save_qwen_cli_tokens_roundtrip(qwen_env):
tokens = _make_qwen_tokens(access_token="saved-token")
saved_path = _save_qwen_cli_tokens(tokens)
assert saved_path.exists()
loaded = json.loads(saved_path.read_text(encoding="utf-8"))
assert loaded["access_token"] == "saved-token"
def test_save_qwen_cli_tokens_creates_parent(qwen_env):
tokens = _make_qwen_tokens()
saved_path = _save_qwen_cli_tokens(tokens)
assert saved_path.parent.exists()
def test_save_qwen_cli_tokens_permissions(qwen_env):
tokens = _make_qwen_tokens()
saved_path = _save_qwen_cli_tokens(tokens)
mode = saved_path.stat().st_mode
assert mode & stat.S_IRUSR # owner read
assert mode & stat.S_IWUSR # owner write
assert not (mode & stat.S_IRGRP) # no group read
assert not (mode & stat.S_IROTH) # no other read
# ---------------------------------------------------------------------------
# _qwen_access_token_is_expiring
# ---------------------------------------------------------------------------
def test_expiring_token_not_expired():
# 1 hour from now in milliseconds
future_ms = int((time.time() + 3600) * 1000)
assert not _qwen_access_token_is_expiring(future_ms)
def test_expiring_token_already_expired():
# 1 hour ago in milliseconds
past_ms = int((time.time() - 3600) * 1000)
assert _qwen_access_token_is_expiring(past_ms)
def test_expiring_token_within_skew():
# Just inside the default skew window
near_ms = int((time.time() + QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS - 5) * 1000)
assert _qwen_access_token_is_expiring(near_ms)
def test_expiring_token_none_returns_true():
assert _qwen_access_token_is_expiring(None)
def test_expiring_token_non_numeric_returns_true():
assert _qwen_access_token_is_expiring("not-a-number")
# ---------------------------------------------------------------------------
# _refresh_qwen_cli_tokens
# ---------------------------------------------------------------------------
def test_refresh_qwen_cli_tokens_success(qwen_env):
tokens = _make_qwen_tokens(refresh_token="old-refresh")
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {
"access_token": "new-access",
"refresh_token": "new-refresh",
"expires_in": 7200,
}
with patch("hermes_cli.auth.httpx") as mock_httpx:
mock_httpx.post.return_value = resp
result = _refresh_qwen_cli_tokens(tokens)
assert result["access_token"] == "new-access"
assert result["refresh_token"] == "new-refresh"
assert "expiry_date" in result
def test_refresh_qwen_cli_tokens_preserves_old_refresh_if_not_in_response(qwen_env):
tokens = _make_qwen_tokens(refresh_token="keep-me")
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {
"access_token": "new-access",
# No refresh_token in response — should keep old one
"expires_in": 3600,
}
with patch("hermes_cli.auth.httpx") as mock_httpx:
mock_httpx.post.return_value = resp
result = _refresh_qwen_cli_tokens(tokens)
assert result["refresh_token"] == "keep-me"
def test_refresh_qwen_cli_tokens_missing_refresh_token():
tokens = {"access_token": "at", "refresh_token": ""}
with pytest.raises(AuthError) as exc:
_refresh_qwen_cli_tokens(tokens)
assert exc.value.code == "qwen_refresh_token_missing"
def test_refresh_qwen_cli_tokens_http_error(qwen_env):
tokens = _make_qwen_tokens()
resp = MagicMock()
resp.status_code = 401
resp.text = "unauthorized"
with patch("hermes_cli.auth.httpx") as mock_httpx:
mock_httpx.post.return_value = resp
with pytest.raises(AuthError) as exc:
_refresh_qwen_cli_tokens(tokens)
assert exc.value.code == "qwen_refresh_failed"
def test_refresh_qwen_cli_tokens_network_error(qwen_env):
tokens = _make_qwen_tokens()
with patch("hermes_cli.auth.httpx") as mock_httpx:
mock_httpx.post.side_effect = ConnectionError("timeout")
with pytest.raises(AuthError) as exc:
_refresh_qwen_cli_tokens(tokens)
assert exc.value.code == "qwen_refresh_failed"
def test_refresh_qwen_cli_tokens_invalid_json_response(qwen_env):
tokens = _make_qwen_tokens()
resp = MagicMock()
resp.status_code = 200
resp.json.side_effect = ValueError("bad json")
with patch("hermes_cli.auth.httpx") as mock_httpx:
mock_httpx.post.return_value = resp
with pytest.raises(AuthError) as exc:
_refresh_qwen_cli_tokens(tokens)
assert exc.value.code == "qwen_refresh_invalid_json"
def test_refresh_qwen_cli_tokens_missing_access_token_in_response(qwen_env):
tokens = _make_qwen_tokens()
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"something": "but no access_token"}
with patch("hermes_cli.auth.httpx") as mock_httpx:
mock_httpx.post.return_value = resp
with pytest.raises(AuthError) as exc:
_refresh_qwen_cli_tokens(tokens)
assert exc.value.code == "qwen_refresh_invalid_response"
def test_refresh_qwen_cli_tokens_default_expires_in(qwen_env):
"""When expires_in is missing, default to 6 hours."""
tokens = _make_qwen_tokens()
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"access_token": "new"}
with patch("hermes_cli.auth.httpx") as mock_httpx:
mock_httpx.post.return_value = resp
result = _refresh_qwen_cli_tokens(tokens)
# Verify expiry_date is roughly now + 6h (within 60s tolerance)
expected_ms = int(time.time() * 1000) + 6 * 60 * 60 * 1000
assert abs(result["expiry_date"] - expected_ms) < 60_000
def test_refresh_qwen_cli_tokens_saves_to_disk(qwen_env):
tokens = _make_qwen_tokens()
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {
"access_token": "disk-check",
"expires_in": 3600,
}
with patch("hermes_cli.auth.httpx") as mock_httpx:
mock_httpx.post.return_value = resp
_refresh_qwen_cli_tokens(tokens)
# Verify it was persisted
creds_path = qwen_env / ".qwen" / "oauth_creds.json"
assert creds_path.exists()
saved = json.loads(creds_path.read_text(encoding="utf-8"))
assert saved["access_token"] == "disk-check"
# ---------------------------------------------------------------------------
# resolve_qwen_runtime_credentials
# ---------------------------------------------------------------------------
def test_resolve_qwen_runtime_credentials_fresh_token(qwen_env):
tokens = _make_qwen_tokens(access_token="fresh-at")
_write_qwen_creds(qwen_env, tokens)
creds = resolve_qwen_runtime_credentials(refresh_if_expiring=False)
assert creds["provider"] == "qwen-oauth"
assert creds["api_key"] == "fresh-at"
assert creds["base_url"] == DEFAULT_QWEN_BASE_URL
assert creds["source"] == "qwen-cli"
def test_resolve_qwen_runtime_credentials_triggers_refresh(qwen_env):
# Write an expired token
expired_ms = int((time.time() - 3600) * 1000)
tokens = _make_qwen_tokens(access_token="old", expiry_date=expired_ms)
_write_qwen_creds(qwen_env, tokens)
refreshed = _make_qwen_tokens(access_token="refreshed-at")
with patch(
"hermes_cli.auth._refresh_qwen_cli_tokens", return_value=refreshed
) as mock_refresh:
creds = resolve_qwen_runtime_credentials()
mock_refresh.assert_called_once()
assert creds["api_key"] == "refreshed-at"
def test_resolve_qwen_runtime_credentials_force_refresh(qwen_env):
tokens = _make_qwen_tokens(access_token="old-at")
_write_qwen_creds(qwen_env, tokens)
refreshed = _make_qwen_tokens(access_token="force-refreshed")
with patch(
"hermes_cli.auth._refresh_qwen_cli_tokens", return_value=refreshed
) as mock_refresh:
creds = resolve_qwen_runtime_credentials(force_refresh=True)
mock_refresh.assert_called_once()
assert creds["api_key"] == "force-refreshed"
def test_resolve_qwen_runtime_credentials_missing_access_token(qwen_env):
tokens = _make_qwen_tokens(access_token="")
_write_qwen_creds(qwen_env, tokens)
with pytest.raises(AuthError) as exc:
resolve_qwen_runtime_credentials(refresh_if_expiring=False)
assert exc.value.code == "qwen_access_token_missing"
def test_resolve_qwen_runtime_credentials_base_url_env_override(qwen_env, monkeypatch):
tokens = _make_qwen_tokens(access_token="at")
_write_qwen_creds(qwen_env, tokens)
monkeypatch.setenv("HERMES_QWEN_BASE_URL", "https://custom.qwen.ai/v1")
creds = resolve_qwen_runtime_credentials(refresh_if_expiring=False)
assert creds["base_url"] == "https://custom.qwen.ai/v1"
# ---------------------------------------------------------------------------
# get_qwen_auth_status
# ---------------------------------------------------------------------------
def test_get_qwen_auth_status_logged_in(qwen_env):
tokens = _make_qwen_tokens(access_token="status-at")
_write_qwen_creds(qwen_env, tokens)
status = get_qwen_auth_status()
assert status["logged_in"] is True
assert status["api_key"] == "status-at"
def test_get_qwen_auth_status_not_logged_in(qwen_env):
# No credentials file
status = get_qwen_auth_status()
assert status["logged_in"] is False
assert "error" in status

View File

@@ -143,6 +143,82 @@ def test_resolve_runtime_provider_codex(monkeypatch):
assert resolved["requested_provider"] == "openai-codex"
def test_resolve_runtime_provider_qwen_oauth(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "qwen-oauth")
monkeypatch.setattr(
rp,
"resolve_qwen_runtime_credentials",
lambda: {
"provider": "qwen-oauth",
"base_url": "https://portal.qwen.ai/v1",
"api_key": "qwen-token",
"source": "qwen-cli",
"expires_at_ms": 1775640710946,
},
)
resolved = rp.resolve_runtime_provider(requested="qwen-oauth")
assert resolved["provider"] == "qwen-oauth"
assert resolved["api_mode"] == "chat_completions"
assert resolved["base_url"] == "https://portal.qwen.ai/v1"
assert resolved["api_key"] == "qwen-token"
assert resolved["requested_provider"] == "qwen-oauth"
def test_resolve_runtime_provider_uses_qwen_pool_entry(monkeypatch):
class _Entry:
access_token = "pool-qwen-token"
source = "manual:qwen_cli"
base_url = "https://portal.qwen.ai/v1"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "qwen-oauth")
monkeypatch.setattr(rp, "load_pool", lambda provider: _Pool())
monkeypatch.setattr(rp, "_get_model_config", lambda: {"provider": "qwen-oauth", "default": "coder-model"})
resolved = rp.resolve_runtime_provider(requested="qwen-oauth")
assert resolved["provider"] == "qwen-oauth"
assert resolved["api_mode"] == "chat_completions"
assert resolved["base_url"] == "https://portal.qwen.ai/v1"
assert resolved["api_key"] == "pool-qwen-token"
assert resolved["source"] == "manual:qwen_cli"
def test_resolve_provider_alias_qwen(monkeypatch):
monkeypatch.setattr(rp.auth_mod, "_load_auth_store", lambda: {})
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
assert rp.resolve_provider("qwen-portal") == "qwen-oauth"
assert rp.resolve_provider("qwen-cli") == "qwen-oauth"
def test_qwen_oauth_auto_fallthrough_on_auth_failure(monkeypatch):
"""When requested_provider is 'auto' and Qwen creds fail, fall through."""
from hermes_cli.auth import AuthError
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "qwen-oauth")
monkeypatch.setattr(
rp,
"resolve_qwen_runtime_credentials",
lambda **kw: (_ for _ in ()).throw(AuthError("stale", provider="qwen-oauth", code="qwen_auth_missing")),
)
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setenv("OPENROUTER_API_KEY", "test-or-key")
# Should NOT raise — falls through to OpenRouter
resolved = rp.resolve_runtime_provider(requested="auto")
# The fallthrough means it won't be qwen-oauth
assert resolved["provider"] != "qwen-oauth"
def test_resolve_runtime_provider_ai_gateway(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "ai-gateway")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})

View File

@@ -872,6 +872,52 @@ class TestBuildApiKwargs:
kwargs = agent._build_api_kwargs(messages)
assert kwargs["max_tokens"] == 4096
def test_qwen_portal_formats_messages_and_metadata(self, agent):
agent.base_url = "https://portal.qwen.ai/v1"
agent._base_url_lower = agent.base_url.lower()
agent.session_id = "sess-123"
messages = [
{"role": "system", "content": "You are helpful"},
{"role": "assistant", "content": "Got it"},
{"role": "user", "content": "hi"},
]
kwargs = agent._build_api_kwargs(messages)
assert kwargs["metadata"]["sessionId"] == "sess-123"
assert kwargs["extra_body"]["vl_high_resolution_images"] is True
assert isinstance(kwargs["messages"][0]["content"], list)
assert kwargs["messages"][0]["content"][0]["cache_control"] == {"type": "ephemeral"}
assert kwargs["messages"][2]["content"][0]["text"] == "hi"
def test_qwen_portal_normalizes_bare_string_content_parts(self, agent):
agent.base_url = "https://portal.qwen.ai/v1"
agent._base_url_lower = agent.base_url.lower()
messages = [
{"role": "system", "content": [{"type": "text", "text": "system"}]},
{"role": "user", "content": ["hello", {"type": "text", "text": "world"}]},
]
kwargs = agent._build_api_kwargs(messages)
user_content = kwargs["messages"][1]["content"]
assert user_content[0] == {"type": "text", "text": "hello"}
assert user_content[1] == {"type": "text", "text": "world"}
def test_qwen_portal_no_system_message(self, agent):
agent.base_url = "https://portal.qwen.ai/v1"
agent._base_url_lower = agent.base_url.lower()
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
# Should not crash even without a system message
assert kwargs["messages"][0]["content"][0]["text"] == "hi"
assert "cache_control" not in kwargs["messages"][0]["content"][0]
def test_qwen_portal_omits_max_tokens(self, agent):
agent.base_url = "https://portal.qwen.ai/v1"
agent._base_url_lower = agent.base_url.lower()
agent.max_tokens = 4096
messages = [{"role": "system", "content": "sys"}, {"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "max_tokens" not in kwargs
assert "max_completion_tokens" not in kwargs
class TestBuildAssistantMessage:
def test_basic_message(self, agent):