mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-30 16:01:49 +08:00
Mechanical cleanup across 43 files — removes 46 unused imports (F401) and 14 unused local variables (F841) detected by `ruff check --select F401,F841`. Net: -49 lines. Also fixes a latent NameError in rl_cli.py where `get_hermes_home()` was called at module line 32 before its import at line 65 — the module never imported successfully on main. The ruff audit surfaced this because it correctly saw the symbol as imported-but-unused (the call happened before the import ran); the fix moves the import to the top of the file alongside other stdlib imports. One `# noqa: F401` kept in hermes_cli/status.py for `subprocess`: tests monkeypatch `hermes_cli.status.subprocess` as a regression guard that systemctl isn't called on Termux, so the name must exist at module scope even though the module body doesn't reference it. Docstring explains the reason. Also fixes an invalid `# noqa:` directive in gateway/platforms/discord.py:308 that lacked a rule code. Co-authored-by: teknium1 <teknium@users.noreply.github.com>
294 lines
10 KiB
Python
294 lines
10 KiB
Python
"""
|
|
DingTalk Device Flow authorization.
|
|
|
|
Implements the same 3-step registration flow as dingtalk-openclaw-connector:
|
|
1. POST /app/registration/init → get nonce
|
|
2. POST /app/registration/begin → get device_code + verification_uri_complete
|
|
3. POST /app/registration/poll → poll until SUCCESS → get client_id + client_secret
|
|
|
|
The verification_uri_complete is rendered as a QR code in the terminal so the
|
|
user can scan it with DingTalk to authorize, yielding AppKey + AppSecret
|
|
automatically.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import logging
|
|
from typing import Optional, Tuple
|
|
|
|
import requests
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ── Configuration ──────────────────────────────────────────────────────────

# Root of the registration API. Can be overridden through the environment;
# trailing slashes are dropped so endpoint paths can be appended directly.
REGISTRATION_BASE_URL = os.getenv(
    "DINGTALK_REGISTRATION_BASE_URL",
    "https://oapi.dingtalk.com",
).rstrip("/")

# Value sent as ``source`` in the init request; also overridable via the
# environment.
REGISTRATION_SOURCE = os.getenv("DINGTALK_REGISTRATION_SOURCE", "openClaw")
|
|
|
|
|
|
# ── API helpers ────────────────────────────────────────────────────────────
|
|
|
|
class RegistrationError(Exception):
    """Signal that a step of the DingTalk registration flow did not succeed.

    Raised for failed registration API calls as well as for responses that
    lack required data.
    """
|
|
|
|
|
|
def _api_post(path: str, payload: dict) -> dict:
    """POST *payload* as JSON to the registration API and return the decoded body.

    Args:
        path: Endpoint path, appended verbatim to ``REGISTRATION_BASE_URL``.
        payload: Request body, sent as JSON.

    Returns:
        The decoded JSON object; its ``errcode`` is always ``0``.

    Raises:
        RegistrationError: On a transport or HTTP error, when the body is not
            a JSON object, or when the API reports a non-zero (or missing)
            ``errcode``.
    """
    url = f"{REGISTRATION_BASE_URL}{path}"
    try:
        resp = requests.post(url, json=payload, timeout=15)
        resp.raise_for_status()
        data = resp.json()
    except requests.RequestException as exc:
        raise RegistrationError(f"Network error calling {url}: {exc}") from exc
    except ValueError as exc:
        # Some ``requests`` releases report an undecodable body as a plain
        # ValueError subclass rather than a RequestException; callers only
        # catch RegistrationError, so translate it here.
        raise RegistrationError(f"Invalid JSON response from {url}: {exc}") from exc

    # A JSON array/string/number has no ``.get``; reject it explicitly instead
    # of letting an AttributeError escape.
    if not isinstance(data, dict):
        raise RegistrationError(
            f"API error [{path}]: unexpected response type {type(data).__name__}"
        )

    # A missing errcode is treated as a failure (-1), not as success.
    errcode = data.get("errcode", -1)
    if errcode != 0:
        errmsg = data.get("errmsg", "unknown error")
        raise RegistrationError(f"API error [{path}]: {errmsg} (errcode={errcode})")
    return data
|
|
|
|
|
|
# ── Core flow ──────────────────────────────────────────────────────────────
|
|
|
|
def begin_registration() -> dict:
    """Start a device-flow registration (the ``init`` and ``begin`` calls).

    Returns:
        A dict with keys ``device_code``, ``verification_uri_complete``,
        ``expires_in`` and ``interval``. ``interval`` is never below 2.

    Raises:
        RegistrationError: If an API call fails, or if ``nonce``,
            ``device_code`` or ``verification_uri_complete`` is absent,
            empty or null in the response.
    """

    def _text(body: dict, key: str) -> str:
        # JSON null must count as "missing"; str(None) would yield "None"
        # and slip past the emptiness checks below.
        value = body.get(key)
        return "" if value is None else str(value).strip()

    def _as_int(body: dict, key: str, default: int) -> int:
        # A null or non-numeric value falls back to the same default that a
        # missing key gets, instead of raising TypeError/ValueError.
        try:
            return int(body.get(key, default))
        except (TypeError, ValueError, OverflowError):
            return default

    # Step 1: init → nonce
    init_data = _api_post("/app/registration/init", {"source": REGISTRATION_SOURCE})
    nonce = _text(init_data, "nonce")
    if not nonce:
        raise RegistrationError("init response missing nonce")

    # Step 2: begin → device_code, verification_uri_complete
    begin_data = _api_post("/app/registration/begin", {"nonce": nonce})
    device_code = _text(begin_data, "device_code")
    verification_uri_complete = _text(begin_data, "verification_uri_complete")
    if not device_code:
        raise RegistrationError("begin response missing device_code")
    if not verification_uri_complete:
        raise RegistrationError("begin response missing verification_uri_complete")

    return {
        "device_code": device_code,
        "verification_uri_complete": verification_uri_complete,
        "expires_in": _as_int(begin_data, "expires_in", 7200),
        # Enforce a floor of 2 on the polling interval.
        "interval": max(_as_int(begin_data, "interval", 3), 2),
    }
|
|
|
|
|
|
def poll_registration(device_code: str) -> dict:
    """Poll the registration status once.

    Args:
        device_code: The code obtained from the ``begin`` call.

    Returns:
        A dict with keys ``status``, ``client_id``, ``client_secret`` and
        ``fail_reason``. ``status`` is one of ``WAITING``, ``SUCCESS``,
        ``FAIL``, ``EXPIRED`` or ``UNKNOWN`` (anything unrecognised). The
        other three values are stripped strings, or ``None`` when the field
        is absent, empty or null.

    Raises:
        RegistrationError: Propagated from the underlying API call.
    """
    data = _api_post("/app/registration/poll", {"device_code": device_code})

    def _optional_text(key: str) -> Optional[str]:
        # JSON null must map to None; str(None) would produce the truthy
        # string "None" and be mistaken for a real credential or reason.
        value = data.get(key)
        if value is None:
            return None
        return str(value).strip() or None

    status = (_optional_text("status") or "").upper()
    if status not in ("WAITING", "SUCCESS", "FAIL", "EXPIRED"):
        status = "UNKNOWN"

    return {
        "status": status,
        "client_id": _optional_text("client_id"),
        "client_secret": _optional_text("client_secret"),
        "fail_reason": _optional_text("fail_reason"),
    }
|
|
|
|
|
|
def wait_for_registration_success(
    device_code: str,
    interval: int = 3,
    expires_in: int = 7200,
    on_waiting: Optional[callable] = None,
) -> Tuple[str, str]:
    """Poll until the registration succeeds, fails for good, or times out.

    Args:
        device_code: The code obtained from the ``begin`` call.
        interval: Seconds to sleep before each poll.
        expires_in: Overall time budget in seconds.
        on_waiting: Optional zero-argument callback, invoked after every poll
            that reports ``WAITING``.

    Returns:
        ``(client_id, client_secret)``.

    Raises:
        RegistrationError: When the budget runs out, when polling keeps
            failing (by exception or non-waiting status) for the whole grace
            window, or when ``SUCCESS`` arrives without credentials.
    """
    grace_seconds = 120  # 2 minutes for transient errors
    grace_started_at = 0.0  # 0 means "no failure streak in progress"
    give_up_at = time.monotonic() + expires_in

    def _still_in_grace() -> bool:
        # Open the grace window on the first failure of a streak, then report
        # whether that window is still running.
        nonlocal grace_started_at
        if grace_started_at == 0:
            grace_started_at = time.monotonic()
        return time.monotonic() - grace_started_at < grace_seconds

    while time.monotonic() < give_up_at:
        time.sleep(interval)

        try:
            outcome = poll_registration(device_code)
        except RegistrationError:
            if _still_in_grace():
                continue
            raise

        state = outcome["status"]

        if state == "SUCCESS":
            cid = outcome["client_id"]
            csecret = outcome["client_secret"]
            if cid and csecret:
                return cid, csecret
            raise RegistrationError("authorization succeeded but credentials are missing")

        if state == "WAITING":
            # A healthy poll ends any failure streak.
            grace_started_at = 0
            if on_waiting:
                on_waiting()
            continue

        # FAIL / EXPIRED / UNKNOWN
        if _still_in_grace():
            continue
        reason = outcome.get("fail_reason") or state
        raise RegistrationError(f"authorization failed: {reason}")

    raise RegistrationError("authorization timed out, please retry")
|
|
|
|
|
|
# ── QR code rendering ─────────────────────────────────────────────────────
|
|
|
|
def _ensure_qrcode_installed() -> bool:
    """Make sure the ``qrcode`` package is importable, installing it if needed.

    Returns:
        True when ``qrcode`` can be imported (already present, or installed
        by one of the attempts below); False when every attempt failed.
    """
    try:
        import qrcode  # noqa: F401
        return True
    except ImportError:
        pass

    import importlib
    import subprocess

    # Try uv first (Hermes convention), then pip
    install_commands = (
        [sys.executable, "-m", "uv", "pip", "install", "qrcode"],
        [sys.executable, "-m", "pip", "install", "-q", "qrcode"],
    )
    for cmd in install_commands:
        try:
            subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing or non-executable interpreter path
            # (FileNotFoundError, PermissionError, ...); try the next installer.
            continue

        # The package appeared after the interpreter started and after a
        # failed import, so the import system's caches may be stale.
        importlib.invalidate_caches()
        try:
            import qrcode  # noqa: F401,F811
        except ImportError:
            continue
        return True
    return False
|
|
|
|
|
|
def render_qr_to_terminal(url: str) -> bool:
    """Print *url* to stdout as a compact QR code.

    Two matrix rows are packed into each text line by using Unicode
    half-block characters.

    Returns:
        True once the code has been printed, False when ``qrcode`` cannot be
        imported.
    """
    try:
        import qrcode
    except ImportError:
        return False

    code = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=1,
        border=1,
    )
    code.add_data(url)
    code.make(fit=True)

    # Glyph for each (upper module set, lower module set) combination.
    glyph_for = {
        (True, True): "\u2588",  # █
        (True, False): "\u2580",  # ▀
        (False, True): "\u2584",  # ▄
        (False, False): " ",
    }

    grid = code.get_matrix()
    height = len(grid)
    text_rows: list[str] = []
    for upper_idx in range(0, height, 2):
        upper = grid[upper_idx]
        # With an odd number of rows the final row has no partner below it.
        lower = grid[upper_idx + 1] if upper_idx + 1 < height else None
        cells = []
        for col, upper_cell in enumerate(upper):
            lower_cell = lower[col] if lower is not None else False
            cells.append(glyph_for[(bool(upper_cell), bool(lower_cell))])
        text_rows.append(" " + "".join(cells))

    print("\n".join(text_rows))
    return True
|
|
|
|
|
|
# ── High-level entry point for the setup wizard ───────────────────────────
|
|
|
|
def dingtalk_qr_auth() -> Optional[Tuple[str, str]]:
    """Run the interactive QR-code device-flow authorization.

    Starts a registration, shows the verification link (also as a terminal
    QR code when the ``qrcode`` package can be imported or installed) and
    then blocks, polling until the authorization finishes.

    Returns:
        ``(client_id, client_secret)`` on success, or ``None`` when the
        registration could not be started or the authorization failed or
        timed out. Failures are reported through ``print_error``; only
        ``RegistrationError`` is handled here.
    """
    from hermes_cli.setup import print_info, print_success, print_warning, print_error

    print()
    print_info(" Initializing DingTalk device authorization...")
    print_info(" Note: the scan page is branded 'OpenClaw' — DingTalk's")
    print_info(" ecosystem onboarding bridge. Safe to use.")

    try:
        reg = begin_registration()
    except RegistrationError as exc:
        print_error(f" Authorization init failed: {exc}")
        return None

    url = reg["verification_uri_complete"]

    # Ensure qrcode library is available (auto-install if missing)
    if not _ensure_qrcode_installed():
        print_warning(" qrcode library install failed, will show link only.")

    print()
    print_info(" Please scan the QR code below with DingTalk to authorize:")
    print()

    if not render_qr_to_terminal(url):
        print_warning(" QR code render failed, please open the link below to authorize:")

    print()
    print_info(f" Or open this link manually: {url}")
    print()
    # NOTE(review): the "2 hours" text is fixed; the real deadline is
    # reg["expires_in"] seconds and may differ if the server says otherwise.
    print_info(" Waiting for QR scan authorization... (timeout: 2 hours)")

    dot_count = 0

    def _on_waiting():
        # Emit one progress dot for every 10 polls that report WAITING.
        nonlocal dot_count
        dot_count += 1
        if dot_count % 10 == 0:
            sys.stdout.write(".")
            sys.stdout.flush()

    try:
        client_id, client_secret = wait_for_registration_success(
            device_code=reg["device_code"],
            interval=reg["interval"],
            expires_in=reg["expires_in"],
            on_waiting=_on_waiting,
        )
    except RegistrationError as exc:
        print()
        print_error(f" Authorization failed: {exc}")
        return None

    # Show only a short prefix of the secret. A secret of 8 characters or
    # fewer is masked completely rather than echoed in full.
    shown_prefix = client_secret[:8] if len(client_secret) > 8 else ""
    masked_tail = "*" * (len(client_secret) - len(shown_prefix))

    print()
    print_success(" QR scan authorization successful!")
    print_success(f" Client ID: {client_id}")
    print_success(f" Client Secret: {shown_prefix}{masked_tail}")

    return client_id, client_secret
|