# Mirror of https://github.com/NousResearch/hermes-agent.git
# Synced 2026-04-28 06:51:16 +08:00 (418 lines, 15 KiB, Python)
"""Google Code Assist API client — project discovery, onboarding, quota.

The Code Assist API powers Google's official gemini-cli. It sits at
``cloudcode-pa.googleapis.com`` and provides:

- Free tier access (generous daily quota) for personal Google accounts
- Paid tier access via GCP projects with billing / Workspace / Standard / Enterprise

This module handles the control-plane dance needed before inference:

1. ``load_code_assist()`` — probe the user's account to learn what tier they're on
   and whether a ``cloudaicompanionProject`` is already assigned.
2. ``onboard_user()`` — if the user hasn't been onboarded yet (new account, fresh
   free tier, etc.), call this with the chosen tier + project id. Supports LRO
   polling for slow provisioning.
3. ``retrieve_user_quota()`` — fetch the ``buckets[]`` array showing remaining
   quota per model, used by the ``/gquota`` slash command.

VPC-SC handling: enterprise accounts under a VPC Service Controls perimeter
will get ``SECURITY_POLICY_VIOLATED`` on ``load_code_assist``. We catch this
and force the account to ``standard-tier`` so the call chain still succeeds.

Derived from opencode-gemini-auth (MIT) and clawdbot/extensions/google. The
request/response shapes are specific to Google's internal Code Assist API,
documented nowhere public — we copy them from the reference implementations.
"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
import logging
|
|||
|
|
import os
|
|||
|
|
import time
|
|||
|
|
import urllib.error
|
|||
|
|
import urllib.parse
|
|||
|
|
import urllib.request
|
|||
|
|
import uuid
|
|||
|
|
from dataclasses import dataclass, field
|
|||
|
|
from typing import Any, Dict, List, Optional
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)


# =============================================================================
# Constants
# =============================================================================

# Production host of the Code Assist API.
CODE_ASSIST_ENDPOINT = "https://cloudcode-pa.googleapis.com"

# Fallback endpoints tried when prod returns an error during project discovery.
# Kept as a list: callers concatenate it with ``[CODE_ASSIST_ENDPOINT]``.
FALLBACK_ENDPOINTS = [
    f"https://{stage}-cloudcode-pa.sandbox.googleapis.com"
    for stage in ("daily", "autopush")
]

# Tier identifiers that Google's API uses
FREE_TIER_ID = "free-tier"
LEGACY_TIER_ID = "legacy-tier"
STANDARD_TIER_ID = "standard-tier"

# Default HTTP headers matching gemini-cli's fingerprint.
# Google may reject unrecognized User-Agents on these internal endpoints.
_GEMINI_CLI_USER_AGENT = "google-api-nodejs-client/9.15.1 (gzip)"
_X_GOOG_API_CLIENT = "gl-node/24.0.0"

# Per-request timeout (seconds) used by the HTTP helper unless overridden.
_DEFAULT_REQUEST_TIMEOUT = 30.0

# Onboarding LRO polling budget: attempts × interval (seconds).
_ONBOARDING_POLL_ATTEMPTS = 12
_ONBOARDING_POLL_INTERVAL_SECONDS = 5.0
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CodeAssistError(RuntimeError):
    """Error raised by this module, tagged with a machine-readable ``code``.

    ``code`` defaults to ``"code_assist_error"``; the human-readable text is
    available through ``str(exc)`` as for any ``RuntimeError``.
    """

    def __init__(self, message: str, *, code: str = "code_assist_error") -> None:
        # Store the tag first; the base class only needs the message.
        self.code = code
        super().__init__(message)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ProjectIdRequiredError(CodeAssistError):
    """Raised when an operation needs a GCP project id and none was supplied.

    Always carries the code ``"code_assist_project_id_required"``.
    """

    def __init__(
        self,
        message: str = "GCP project id required for this tier",
    ) -> None:
        super().__init__(message, code="code_assist_project_id_required")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# HTTP primitive (auth via Bearer token passed per-call)
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
def _build_headers(access_token: str, *, user_agent_model: str = "") -> Dict[str, str]:
    """Assemble the HTTP headers sent with every Code Assist request.

    Args:
        access_token: OAuth access token, sent as a ``Bearer`` credential.
        user_agent_model: Optional model name; when non-empty it is appended
            to the User-Agent as `` model/<name>``.

    Returns:
        A new header dict. ``x-activity-request-id`` is a fresh UUID4 on
        every call.
    """
    agent = (
        f"{_GEMINI_CLI_USER_AGENT} model/{user_agent_model}"
        if user_agent_model
        else _GEMINI_CLI_USER_AGENT
    )
    headers: Dict[str, str] = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": f"Bearer {access_token}",
        "User-Agent": agent,
        "X-Goog-Api-Client": _X_GOOG_API_CLIENT,
        "x-activity-request-id": str(uuid.uuid4()),
    }
    return headers
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _client_metadata() -> Dict[str, str]:
    """Return the client ``metadata`` payload attached to control-plane calls.

    Match Google's gemini-cli exactly — unrecognized metadata may be rejected.
    A new dict is built on every call, so callers are free to mutate it.
    """
    return dict(
        ideType="IDE_UNSPECIFIED",
        platform="PLATFORM_UNSPECIFIED",
        pluginType="GEMINI",
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _post_json(
    url: str,
    body: Dict[str, Any],
    access_token: str,
    *,
    timeout: float = _DEFAULT_REQUEST_TIMEOUT,
    user_agent_model: str = "",
) -> Dict[str, Any]:
    """POST ``body`` as JSON to ``url`` and return the decoded JSON object.

    Every failure is reported as ``CodeAssistError`` so callers (which only
    catch that type) can fall back or retry uniformly.

    Args:
        url: Absolute endpoint URL.
        body: JSON-serialisable request payload.
        access_token: OAuth access token used for the ``Authorization`` header.
        timeout: Socket timeout in seconds passed to ``urlopen``.
        user_agent_model: Optional model name forwarded to ``_build_headers``.

    Returns:
        The parsed response object, or ``{}`` when the response body is empty.

    Raises:
        CodeAssistError: with ``code`` set to
            ``code_assist_vpc_sc`` — HTTP error whose body reports a VPC-SC
            violation;
            ``code_assist_http_<status>`` — any other HTTP error status;
            ``code_assist_network_error`` — connection failure or timeout;
            ``code_assist_invalid_response`` — a success response whose body
            is not a JSON object.
    """
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers=_build_headers(access_token, user_agent_model=user_agent_model),
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            raw = response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        detail = ""
        try:
            detail = exc.read().decode("utf-8", errors="replace")
        except Exception:
            # Best effort only: the error body is purely diagnostic, so a
            # failure to read it must not mask the HTTP error itself.
            pass
        # Special case: VPC-SC violation should be distinguishable
        if _is_vpc_sc_violation(detail):
            raise CodeAssistError(
                f"VPC-SC policy violation: {detail}",
                code="code_assist_vpc_sc",
            ) from exc
        raise CodeAssistError(
            f"Code Assist HTTP {exc.code}: {detail or exc.reason}",
            code=f"code_assist_http_{exc.code}",
        ) from exc
    except OSError as exc:
        # URLError is an OSError subclass, so this still covers it. Catching
        # OSError additionally wraps timeouts/resets raised while waiting for
        # or reading the response, which can surface as a plain TimeoutError /
        # ConnectionError instead of URLError.
        raise CodeAssistError(
            f"Code Assist request failed: {exc}",
            code="code_assist_network_error",
        ) from exc

    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
        raise CodeAssistError(
            f"Code Assist returned a non-JSON response: {raw[:200]!r}",
            code="code_assist_invalid_response",
        ) from exc
    if not isinstance(parsed, dict):
        # Callers use ``.get`` on the result; reject anything but an object.
        raise CodeAssistError(
            f"Code Assist returned unexpected JSON type {type(parsed).__name__}",
            code="code_assist_invalid_response",
        )
    return parsed
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _is_vpc_sc_violation(body: str) -> bool:
    """Detect a VPC Service Controls violation from a response body.

    Args:
        body: Raw text of an error response (may be empty or non-JSON).

    Returns:
        ``True`` when the body reports ``SECURITY_POLICY_VIOLATED``: as a raw
        substring if the body is not JSON, otherwise as the ``reason`` of an
        entry in ``error.details`` or inside ``error.message``.
    """
    marker = "SECURITY_POLICY_VIOLATED"
    if not body:
        return False

    try:
        payload = json.loads(body)
    except (json.JSONDecodeError, ValueError):
        # Not JSON at all — fall back to scanning the raw text.
        return marker in body

    # Walk the nested error structure Google uses
    err = payload.get("error") if isinstance(payload, dict) else None
    if not isinstance(err, dict):
        return False

    entries = err.get("details") or []
    if isinstance(entries, list) and any(
        isinstance(entry, dict) and (entry.get("reason") or "") == marker
        for entry in entries
    ):
        return True

    return marker in str(err.get("message", ""))
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# load_code_assist — discovers current tier + assigned project
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
@dataclass
class CodeAssistProjectInfo:
    """Result from ``load_code_assist``.

    All fields default to "empty" so a bare instance represents an account
    for which nothing was reported.
    """

    # Id of the tier the account is currently on ("" when none is reported).
    current_tier_id: str = ""
    # Google-managed project (free tier)
    cloudaicompanion_project: str = ""
    # Ids of the tiers listed in the response's ``allowedTiers``.
    allowed_tiers: List[str] = field(default_factory=list)
    # The unmodified response payload.
    raw: Dict[str, Any] = field(default_factory=dict)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def load_code_assist(
    access_token: str,
    *,
    project_id: str = "",
    user_agent_model: str = "",
) -> CodeAssistProjectInfo:
    """Call ``POST /v1internal:loadCodeAssist`` with prod → sandbox fallback.

    Endpoints are tried in order (prod first, then ``FALLBACK_ENDPOINTS``);
    the first successful response is parsed and returned.

    Args:
        access_token: OAuth access token for the request.
        project_id: Optional project id; sent as ``metadata.duetProject`` and,
            when non-empty, also as ``cloudaicompanionProject``.
        user_agent_model: Optional model name forwarded to the HTTP helper.

    Returns:
        Whatever tier + project info Google reports. On a VPC-SC violation a
        synthetic ``standard-tier`` result carrying ``project_id`` is returned
        immediately so the chain can continue.

    Raises:
        CodeAssistError: the error from the last endpoint tried, when every
            endpoint failed.
    """
    metadata: Dict[str, Any] = {"duetProject": project_id}
    metadata.update(_client_metadata())
    request_body: Dict[str, Any] = {"metadata": metadata}
    if project_id:
        request_body["cloudaicompanionProject"] = project_id

    failure: Optional[Exception] = None
    for base in (CODE_ASSIST_ENDPOINT, *FALLBACK_ENDPOINTS):
        try:
            payload = _post_json(
                f"{base}/v1internal:loadCodeAssist",
                request_body,
                access_token,
                user_agent_model=user_agent_model,
            )
            return _parse_load_response(payload)
        except CodeAssistError as exc:
            if exc.code != "code_assist_vpc_sc":
                # Ordinary failure: remember it and move on to the next host.
                failure = exc
                logger.warning("loadCodeAssist failed on %s: %s", base, exc)
                continue
            logger.info("VPC-SC violation on %s — defaulting to standard-tier", base)
            return CodeAssistProjectInfo(
                current_tier_id=STANDARD_TIER_ID,
                cloudaicompanion_project=project_id,
            )

    if failure:
        raise failure
    return CodeAssistProjectInfo()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _parse_load_response(resp: Dict[str, Any]) -> CodeAssistProjectInfo:
    """Convert a raw ``loadCodeAssist`` response into ``CodeAssistProjectInfo``.

    Missing or wrongly-typed fields degrade to empty values instead of
    raising.

    Args:
        resp: Decoded JSON object returned by the endpoint.

    Returns:
        Info with the current tier id, the assigned project id, the ids of
        all ``allowedTiers`` entries, and ``resp`` itself under ``raw``.
    """
    current_tier = resp.get("currentTier")
    tier_id = str(current_tier.get("id") or "") if isinstance(current_tier, dict) else ""

    # The project may arrive as a bare id string or as an object carrying
    # ``id``; stringifying the object would produce a dict repr, not an id.
    # NOTE(review): object form inferred from the reference clients — confirm.
    project_field = resp.get("cloudaicompanionProject")
    if isinstance(project_field, dict):
        project_field = project_field.get("id")
    project = str(project_field or "")

    allowed = resp.get("allowedTiers")
    allowed_ids: List[str] = (
        [str(t["id"]) for t in allowed if isinstance(t, dict) and t.get("id")]
        if isinstance(allowed, list)
        else []
    )

    return CodeAssistProjectInfo(
        current_tier_id=tier_id,
        cloudaicompanion_project=project,
        allowed_tiers=allowed_ids,
        raw=resp,
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# onboard_user — provisions a new user on a tier (with LRO polling)
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
def onboard_user(
    access_token: str,
    *,
    tier_id: str,
    project_id: str = "",
    user_agent_model: str = "",
) -> Dict[str, Any]:
    """Call ``POST /v1internal:onboardUser`` to provision the user.

    Args:
        access_token: OAuth access token for the request.
        tier_id: Tier to onboard onto.
        project_id: GCP project id. Required for every tier other than the
            free and legacy tiers; optional for those.
        user_agent_model: Optional model name forwarded to the HTTP helper.

    Returns:
        The operation response. If the first response is not ``done`` and has
        a ``name``, ``/v1internal/<name>`` is polled up to
        ``_ONBOARDING_POLL_ATTEMPTS`` times, sleeping
        ``_ONBOARDING_POLL_INTERVAL_SECONDS`` before each poll
        (default: 12 × 5s = 1 min). The first ``done`` poll response is
        returned; if none arrives, the initial response is returned.

    Raises:
        ProjectIdRequiredError: a paid tier was requested without a project id.
        CodeAssistError: the initial request failed (poll failures are only
            logged and retried).
    """
    needs_project = tier_id not in (FREE_TIER_ID, LEGACY_TIER_ID)
    if needs_project and not project_id:
        raise ProjectIdRequiredError(
            f"Tier {tier_id!r} requires a GCP project id. "
            "Set HERMES_GEMINI_PROJECT_ID or GOOGLE_CLOUD_PROJECT."
        )

    payload: Dict[str, Any] = {"tierId": tier_id, "metadata": _client_metadata()}
    if project_id:
        payload["cloudaicompanionProject"] = project_id

    initial = _post_json(
        f"{CODE_ASSIST_ENDPOINT}/v1internal:onboardUser",
        payload,
        access_token,
        user_agent_model=user_agent_model,
    )

    # Finished synchronously — nothing to poll.
    if initial.get("done"):
        return initial

    operation = initial.get("name", "")
    if not operation:
        # Not done, but no operation handle to poll either.
        return initial

    # Poll the LRO (long-running operation).
    # NOTE(review): polling is a POST with an empty JSON body — confirm the
    # operations endpoint accepts that verb.
    poll_url = f"{CODE_ASSIST_ENDPOINT}/v1internal/{operation}"
    for attempt_no in range(1, _ONBOARDING_POLL_ATTEMPTS + 1):
        time.sleep(_ONBOARDING_POLL_INTERVAL_SECONDS)
        try:
            latest = _post_json(poll_url, {}, access_token, user_agent_model=user_agent_model)
        except CodeAssistError as exc:
            logger.warning("Onboarding poll attempt %d failed: %s", attempt_no, exc)
            continue
        if latest.get("done"):
            return latest

    logger.warning("Onboarding did not complete within %d attempts", _ONBOARDING_POLL_ATTEMPTS)
    return initial
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# retrieve_user_quota — for /gquota
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
@dataclass
class QuotaBucket:
    """One entry of the ``buckets[]`` array from ``retrieveUserQuota``."""

    # Value of the bucket's ``modelId`` (the only required field).
    model_id: str
    # Value of ``tokenType``.
    token_type: str = ""
    # Value of ``remainingFraction``.
    remaining_fraction: float = 0.0
    # Value of ``resetTime``, kept as the string the API returned.
    reset_time_iso: str = ""
    # The unmodified bucket object.
    raw: Dict[str, Any] = field(default_factory=dict)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def retrieve_user_quota(
    access_token: str,
    *,
    project_id: str = "",
    user_agent_model: str = "",
) -> List[QuotaBucket]:
    """Call ``POST /v1internal:retrieveUserQuota`` and parse ``buckets[]``.

    Args:
        access_token: OAuth access token for the request.
        project_id: Optional project id, sent as ``project`` when non-empty.
        user_agent_model: Optional model name forwarded to the HTTP helper.

    Returns:
        One ``QuotaBucket`` per object in the response's ``buckets`` array.
        Non-object entries are skipped; a missing or non-list ``buckets``
        yields an empty list.

    Raises:
        CodeAssistError: the request failed.
    """
    payload: Dict[str, Any] = {"project": project_id} if project_id else {}
    resp = _post_json(
        f"{CODE_ASSIST_ENDPOINT}/v1internal:retrieveUserQuota",
        payload,
        access_token,
        user_agent_model=user_agent_model,
    )

    entries = resp.get("buckets") or []
    if not isinstance(entries, list):
        return []

    return [
        QuotaBucket(
            model_id=str(entry.get("modelId") or ""),
            token_type=str(entry.get("tokenType") or ""),
            remaining_fraction=float(entry.get("remainingFraction") or 0.0),
            reset_time_iso=str(entry.get("resetTime") or ""),
            raw=entry,
        )
        for entry in entries
        if isinstance(entry, dict)
    ]
|
|||
|
|
|
|||
|
|
|
|||
|
|
# =============================================================================
|
|||
|
|
# Project context resolution
|
|||
|
|
# =============================================================================
|
|||
|
|
|
|||
|
|
@dataclass
class ProjectContext:
    """Resolved state for a given OAuth session."""

    # Effective project id sent on requests.
    project_id: str = ""
    # Google-assigned project (free tier).
    managed_project_id: str = ""
    # Tier the session resolved to.
    tier_id: str = ""
    # How the values were obtained: "env", "config", "discovered", "onboarded".
    source: str = ""
|
|||
|
|
|
|||
|
|
|
|||
|
|
def resolve_project_context(
    access_token: str,
    *,
    configured_project_id: str = "",
    env_project_id: str = "",
    user_agent_model: str = "",
) -> ProjectContext:
    """Figure out what project id + tier to use for requests.

    Priority:
      1. If ``configured_project_id`` (checked first) or ``env_project_id``
         is set, use it directly with ``standard-tier`` and short-circuit —
         no network call is made.
      2. Otherwise call ``load_code_assist`` to see what Google says.
      3. If no tier is assigned yet, onboard the user on the free tier and
         take the project id from the onboarding response when discovery
         did not already supply one.

    Args:
        access_token: OAuth access token for discovery / onboarding calls.
        configured_project_id: Project id from configuration.
        env_project_id: Project id from the environment.
        user_agent_model: Optional model name forwarded to the HTTP helper.

    Returns:
        The resolved ``ProjectContext``; ``source`` is one of ``"config"``,
        ``"env"``, ``"discovered"`` or ``"onboarded"``.

    Raises:
        CodeAssistError: discovery or the onboarding request failed.
    """
    # Short-circuit: caller provided a project id (assume paid since they
    # specified one).
    for explicit_id, origin in (
        (configured_project_id, "config"),
        (env_project_id, "env"),
    ):
        if explicit_id:
            return ProjectContext(
                project_id=explicit_id,
                tier_id=STANDARD_TIER_ID,
                source=origin,
            )

    # Discover via loadCodeAssist
    info = load_code_assist(access_token, user_agent_model=user_agent_model)
    effective_project = info.cloudaicompanion_project
    tier = info.current_tier_id

    if tier:
        source = "discovered"
    else:
        # User hasn't been onboarded — provision them on free tier
        onboard_resp = onboard_user(
            access_token,
            tier_id=FREE_TIER_ID,
            project_id="",
            user_agent_model=user_agent_model,
        )
        response_body = onboard_resp.get("response") or {}
        if isinstance(response_body, dict) and not effective_project:
            # The onboarding response may carry the project as an object
            # (``{"id": ..., "name": ...}``) rather than a bare string; take
            # its ``id`` instead of stringifying the whole object.
            project_field = response_body.get("cloudaicompanionProject")
            if isinstance(project_field, dict):
                project_field = project_field.get("id")
            effective_project = str(project_field or "")
        tier = FREE_TIER_ID
        source = "onboarded"

    return ProjectContext(
        project_id=effective_project,
        # Only a free-tier project is Google-managed.
        managed_project_id=effective_project if tier == FREE_TIER_ID else "",
        tier_id=tier,
        source=source,
    )
|