Compare commits

...

4 Commits

Author SHA1 Message Date
teknium1
92971403fd fix: smooth google workspace auth recovery and gws token handoff
Emit fresh auth URLs on stale or expired browser redirects, scope OAuth to requested services, and drive gws with the live access token so the hybrid backend works without a separate gws login.
2026-03-15 02:34:52 -07:00
teknium1
f8a835e476 fix: harden google workspace oauth setup UX
Reduce auth scopes to the requested services, add JSON-mode auth URL output, regenerate fresh auth URLs on stale/expired code failures, and document the headless copy-paste flow more clearly.
2026-03-14 23:32:45 -07:00
teknium1
be58335be0 docs: smooth google workspace setup instructions
Add direct project/API/audience links and warn that bare absolute file paths can be mistaken for slash commands in the Hermes CLI.
2026-03-14 23:07:28 -07:00
teknium1
818e72ea28 refactor: route google workspace operations through gws when available
Keep Hermes-managed OAuth setup and JSON output stable while preferring the Google Workspace CLI for Gmail, Calendar, Drive, Sheets, Docs, and Contacts operations.
2026-03-14 22:42:41 -07:00
6 changed files with 1016 additions and 139 deletions

View File

@@ -1,6 +1,6 @@
---
name: google-workspace
description: Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration via Python. Uses OAuth2 with automatic token refresh. No external binaries needed — runs entirely with Google's Python client libraries in the Hermes venv.
description: Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration for Hermes. Uses Hermes-managed OAuth2 setup, prefers the Google Workspace CLI (`gws`) when available for broader API coverage, and falls back to the Python client libraries otherwise.
version: 1.0.0
author: Nous Research
license: MIT
@@ -13,7 +13,7 @@ metadata:
# Google Workspace
Gmail, Calendar, Drive, Contacts, Sheets, and Docs — all through Python scripts in this skill. No external binaries to install.
Gmail, Calendar, Drive, Contacts, Sheets, and Docs — through Hermes-managed OAuth and a thin CLI wrapper. When `gws` is installed, the skill uses it as the execution backend for broader Google Workspace coverage; otherwise it falls back to the bundled Python client implementation.
## References
@@ -22,7 +22,7 @@ Gmail, Calendar, Drive, Contacts, Sheets, and Docs — all through Python script
## Scripts
- `scripts/setup.py` — OAuth2 setup (run once to authorize)
- `scripts/google_api.py`API wrapper CLI (agent uses this for all operations)
- `scripts/google_api.py`compatibility wrapper CLI. It prefers `gws` for operations when available, while preserving Hermes' existing JSON output contract.
## First-Time Setup
@@ -55,8 +55,15 @@ Calendar/Drive/Sheets/Docs?"**
Passwords) and takes 2 minutes to set up. No Google Cloud project needed.
Load the himalaya skill and follow its setup instructions.
- **Calendar, Drive, Sheets, Docs (or email + these)** → Continue with this
skill's OAuth setup below.
- **Email + Calendar** → Continue with this skill, but use
`--services email,calendar` during auth so the consent screen only asks for
the scopes they actually need.
- **Calendar/Drive/Sheets/Docs only** → Continue with this skill and use a
narrower `--services` set like `calendar,drive,sheets,docs`.
- **Full Workspace access** → Continue with this skill and use the default
`all` service set.
**Question 2: "Does your Google account use Advanced Protection (hardware
security keys required to sign in)? If you're not sure, you probably don't
@@ -72,13 +79,23 @@ Tell the user:
> You need a Google Cloud OAuth client. This is a one-time setup:
>
> 1. Go to https://console.cloud.google.com/apis/credentials
> 2. Create a project (or use an existing one)
> 3. Click "Enable APIs" and enable: Gmail API, Google Calendar API,
> Google Drive API, Google Sheets API, Google Docs API, People API
> 4. Go to Credentials → Create Credentials → OAuth 2.0 Client ID
> 5. Application type: "Desktop app" → Create
> 6. Click "Download JSON" and tell me the file path
> 1. Create or select a project:
> https://console.cloud.google.com/projectselector2/home/dashboard
> 2. Enable the required APIs from the API Library:
> https://console.cloud.google.com/apis/library
> Enable: Gmail API, Google Calendar API, Google Drive API,
> Google Sheets API, Google Docs API, People API
> 3. Create the OAuth client here:
> https://console.cloud.google.com/apis/credentials
> Credentials → Create Credentials → OAuth 2.0 Client ID
> 4. Application type: "Desktop app" → Create
> 5. If the app is still in Testing, add the user's Google account as a test user here:
> https://console.cloud.google.com/auth/audience
> Audience → Test users → Add users
> 6. Download the JSON file and tell me the file path
>
> Important Hermes CLI note: if the file path starts with `/`, do NOT send only the bare path as its own message in the CLI, because it can be mistaken for a slash command. Send it in a sentence instead, like:
> `The JSON file path is: /home/user/Downloads/client_secret_....json`
Once they provide the path:
@@ -86,18 +103,29 @@ Once they provide the path:
$GSETUP --client-secret /path/to/client_secret.json
```
If they paste the raw client ID / client secret values instead of a file path,
write a valid Desktop OAuth JSON file for them yourself, save it somewhere
explicit (for example `~/Downloads/hermes-google-client-secret.json`), then run
`--client-secret` against that file.
### Step 3: Get authorization URL
Use the service set chosen in Step 1. Examples:
```bash
$GSETUP --auth-url
$GSETUP --auth-url --services email,calendar --format json
$GSETUP --auth-url --services calendar,drive,sheets,docs --format json
$GSETUP --auth-url --services all --format json
```
This prints a URL. **Send the URL to the user** and tell them:
This returns JSON with an `auth_url` field and also saves the exact URL to
`~/.hermes/google_oauth_last_url.txt`.
> Open this link in your browser, sign in with your Google account, and
> authorize access. After authorizing, you'll be redirected to a page that
> may show an error — that's expected. Copy the ENTIRE URL from your
> browser's address bar and paste it back to me.
Agent rules for this step:
- Extract the `auth_url` field and send that exact URL to the user as a single line.
- Tell the user that the browser will likely fail on `http://localhost:1` after approval, and that this is expected.
- Tell them to copy the ENTIRE redirected URL from the browser address bar.
- If the user gets `Error 403: access_denied`, send them directly to `https://console.cloud.google.com/auth/audience` to add themselves as a test user.
### Step 4: Exchange the code
@@ -107,9 +135,14 @@ pending OAuth session locally so `--auth-code` can complete the PKCE exchange
later, even on headless systems:
```bash
$GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED"
$GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED" --format json
```
If `--auth-code` fails because the code expired, was already used, or came from
an older browser tab, it now returns a fresh `fresh_auth_url`. In that case,
immediately send the new URL to the user and have them retry with the newest
browser redirect only.
### Step 5: Verify
```bash
@@ -122,6 +155,7 @@ Should print `AUTHENTICATED`. Setup is complete — token refreshes automaticall
- Token is stored at `~/.hermes/google_token.json` and auto-refreshes.
- Pending OAuth session state/verifier are stored temporarily at `~/.hermes/google_oauth_pending.json` until exchange completes.
- If `gws` is installed, `google_api.py` points it at the same `~/.hermes/google_token.json` credentials file. Users do not need to run a separate `gws auth login` flow.
- To revoke: `$GSETUP --revoke`
## Usage

View File

@@ -1,8 +1,9 @@
#!/usr/bin/env python3
"""Google Workspace API CLI for Hermes Agent.
A thin CLI wrapper around Google's Python client libraries.
Authenticates using the token stored by setup.py.
Uses the Google Workspace CLI (`gws`) when available, but preserves the
existing Hermes-facing JSON contract and falls back to the Python client
libraries if `gws` is not installed.
Usage:
python google_api.py gmail search "is:unread" [--max 10]
@@ -23,6 +24,8 @@ import argparse
import base64
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
@@ -30,6 +33,7 @@ from pathlib import Path
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
TOKEN_PATH = HERMES_HOME / "google_token.json"
CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json"
SCOPES = [
"https://www.googleapis.com/auth/gmail.readonly",
@@ -43,17 +47,128 @@ SCOPES = [
]
def get_credentials():
"""Load and refresh credentials from token file."""
def _ensure_authenticated():
if not TOKEN_PATH.exists():
print("Not authenticated. Run the setup script first:", file=sys.stderr)
print(f" python {Path(__file__).parent / 'setup.py'}", file=sys.stderr)
sys.exit(1)
def _stored_token_scopes() -> list[str]:
try:
data = json.loads(TOKEN_PATH.read_text())
except Exception:
return list(SCOPES)
scopes = data.get("scopes")
if isinstance(scopes, list) and scopes:
return scopes
return list(SCOPES)
def _gws_binary() -> str | None:
override = os.getenv("HERMES_GWS_BIN")
if override:
return override
return shutil.which("gws")
def _gws_env() -> dict[str, str]:
env = os.environ.copy()
creds = get_credentials()
env["GOOGLE_WORKSPACE_CLI_TOKEN"] = creds.token
return env
def _run_gws(parts: list[str], *, params: dict | None = None, body: dict | None = None):
binary = _gws_binary()
if not binary:
raise RuntimeError("gws not installed")
_ensure_authenticated()
cmd = [binary, *parts]
if params is not None:
cmd.extend(["--params", json.dumps(params)])
if body is not None:
cmd.extend(["--json", json.dumps(body)])
result = subprocess.run(
cmd,
capture_output=True,
text=True,
env=_gws_env(),
)
if result.returncode != 0:
err = result.stderr.strip() or result.stdout.strip() or "Unknown gws error"
print(err, file=sys.stderr)
sys.exit(result.returncode or 1)
stdout = result.stdout.strip()
if not stdout:
return {}
try:
return json.loads(stdout)
except json.JSONDecodeError:
print("ERROR: Unexpected non-JSON output from gws:", file=sys.stderr)
print(stdout, file=sys.stderr)
sys.exit(1)
def _headers_dict(msg: dict) -> dict[str, str]:
return {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
def _extract_message_body(msg: dict) -> str:
body = ""
payload = msg.get("payload", {})
if payload.get("body", {}).get("data"):
body = base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8", errors="replace")
elif payload.get("parts"):
for part in payload["parts"]:
if part.get("mimeType") == "text/plain" and part.get("body", {}).get("data"):
body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
break
if not body:
for part in payload["parts"]:
if part.get("mimeType") == "text/html" and part.get("body", {}).get("data"):
body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
break
return body
def _extract_doc_text(doc: dict) -> str:
text_parts = []
for element in doc.get("body", {}).get("content", []):
paragraph = element.get("paragraph", {})
for pe in paragraph.get("elements", []):
text_run = pe.get("textRun", {})
if text_run.get("content"):
text_parts.append(text_run["content"])
return "".join(text_parts)
def _datetime_with_timezone(value: str) -> str:
if not value:
return value
if "T" not in value:
return value
if value.endswith("Z"):
return value
tail = value[10:]
if "+" in tail or "-" in tail:
return value
return value + "Z"
def get_credentials():
"""Load and refresh credentials from token file."""
_ensure_authenticated()
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), _stored_token_scopes())
if creds.expired and creds.refresh_token:
creds.refresh(Request())
TOKEN_PATH.write_text(creds.to_json())
@@ -65,6 +180,7 @@ def get_credentials():
def build_service(api, version):
from googleapiclient.discovery import build
return build(api, version, credentials=get_credentials())
@@ -72,7 +188,41 @@ def build_service(api, version):
# Gmail
# =========================================================================
def gmail_search(args):
if _gws_binary():
results = _run_gws(
["gmail", "users", "messages", "list"],
params={"userId": "me", "q": args.query, "maxResults": args.max},
)
messages = results.get("messages", [])
output = []
for msg_meta in messages:
msg = _run_gws(
["gmail", "users", "messages", "get"],
params={
"userId": "me",
"id": msg_meta["id"],
"format": "metadata",
"metadataHeaders": ["From", "To", "Subject", "Date"],
},
)
headers = _headers_dict(msg)
output.append(
{
"id": msg["id"],
"threadId": msg["threadId"],
"from": headers.get("From", ""),
"to": headers.get("To", ""),
"subject": headers.get("Subject", ""),
"date": headers.get("Date", ""),
"snippet": msg.get("snippet", ""),
"labels": msg.get("labelIds", []),
}
)
print(json.dumps(output, indent=2, ensure_ascii=False))
return
service = build_service("gmail", "v1")
results = service.users().messages().list(
userId="me", q=args.query, maxResults=args.max
@@ -88,7 +238,7 @@ def gmail_search(args):
userId="me", id=msg_meta["id"], format="metadata",
metadataHeaders=["From", "To", "Subject", "Date"],
).execute()
headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
headers = _headers_dict(msg)
output.append({
"id": msg["id"],
"threadId": msg["threadId"],
@@ -102,30 +252,33 @@ def gmail_search(args):
print(json.dumps(output, indent=2, ensure_ascii=False))
def gmail_get(args):
if _gws_binary():
msg = _run_gws(
["gmail", "users", "messages", "get"],
params={"userId": "me", "id": args.message_id, "format": "full"},
)
headers = _headers_dict(msg)
result = {
"id": msg["id"],
"threadId": msg["threadId"],
"from": headers.get("From", ""),
"to": headers.get("To", ""),
"subject": headers.get("Subject", ""),
"date": headers.get("Date", ""),
"labels": msg.get("labelIds", []),
"body": _extract_message_body(msg),
}
print(json.dumps(result, indent=2, ensure_ascii=False))
return
service = build_service("gmail", "v1")
msg = service.users().messages().get(
userId="me", id=args.message_id, format="full"
).execute()
headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
# Extract body text
body = ""
payload = msg.get("payload", {})
if payload.get("body", {}).get("data"):
body = base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8", errors="replace")
elif payload.get("parts"):
for part in payload["parts"]:
if part.get("mimeType") == "text/plain" and part.get("body", {}).get("data"):
body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
break
if not body:
for part in payload["parts"]:
if part.get("mimeType") == "text/html" and part.get("body", {}).get("data"):
body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
break
headers = _headers_dict(msg)
result = {
"id": msg["id"],
"threadId": msg["threadId"],
@@ -134,12 +287,33 @@ def gmail_get(args):
"subject": headers.get("Subject", ""),
"date": headers.get("Date", ""),
"labels": msg.get("labelIds", []),
"body": body,
"body": _extract_message_body(msg),
}
print(json.dumps(result, indent=2, ensure_ascii=False))
def gmail_send(args):
if _gws_binary():
message = MIMEText(args.body, "html" if args.html else "plain")
message["to"] = args.to
message["subject"] = args.subject
if args.cc:
message["cc"] = args.cc
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
body = {"raw": raw}
if args.thread_id:
body["threadId"] = args.thread_id
result = _run_gws(
["gmail", "users", "messages", "send"],
params={"userId": "me"},
body=body,
)
print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
return
service = build_service("gmail", "v1")
message = MIMEText(args.body, "html" if args.html else "plain")
message["to"] = args.to
@@ -157,14 +331,46 @@ def gmail_send(args):
print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
def gmail_reply(args):
if _gws_binary():
original = _run_gws(
["gmail", "users", "messages", "get"],
params={
"userId": "me",
"id": args.message_id,
"format": "metadata",
"metadataHeaders": ["From", "Subject", "Message-ID"],
},
)
headers = _headers_dict(original)
subject = headers.get("Subject", "")
if not subject.startswith("Re:"):
subject = f"Re: {subject}"
message = MIMEText(args.body)
message["to"] = headers.get("From", "")
message["subject"] = subject
if headers.get("Message-ID"):
message["In-Reply-To"] = headers["Message-ID"]
message["References"] = headers["Message-ID"]
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
result = _run_gws(
["gmail", "users", "messages", "send"],
params={"userId": "me"},
body={"raw": raw, "threadId": original["threadId"]},
)
print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
return
service = build_service("gmail", "v1")
# Fetch original to get thread ID and headers
original = service.users().messages().get(
userId="me", id=args.message_id, format="metadata",
metadataHeaders=["From", "Subject", "Message-ID"],
).execute()
headers = {h["name"]: h["value"] for h in original.get("payload", {}).get("headers", [])}
headers = _headers_dict(original)
subject = headers.get("Subject", "")
if not subject.startswith("Re:"):
@@ -184,20 +390,38 @@ def gmail_reply(args):
print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
def gmail_labels(args):
if _gws_binary():
results = _run_gws(["gmail", "users", "labels", "list"], params={"userId": "me"})
labels = [{"id": l["id"], "name": l["name"], "type": l.get("type", "")} for l in results.get("labels", [])]
print(json.dumps(labels, indent=2))
return
service = build_service("gmail", "v1")
results = service.users().labels().list(userId="me").execute()
labels = [{"id": l["id"], "name": l["name"], "type": l.get("type", "")} for l in results.get("labels", [])]
print(json.dumps(labels, indent=2))
def gmail_modify(args):
service = build_service("gmail", "v1")
body = {}
if args.add_labels:
body["addLabelIds"] = args.add_labels.split(",")
if args.remove_labels:
body["removeLabelIds"] = args.remove_labels.split(",")
if _gws_binary():
result = _run_gws(
["gmail", "users", "messages", "modify"],
params={"userId": "me", "id": args.message_id},
body=body,
)
print(json.dumps({"id": result["id"], "labels": result.get("labelIds", [])}, indent=2))
return
service = build_service("gmail", "v1")
result = service.users().messages().modify(userId="me", id=args.message_id, body=body).execute()
print(json.dumps({"id": result["id"], "labels": result.get("labelIds", [])}, indent=2))
@@ -206,17 +430,40 @@ def gmail_modify(args):
# Calendar
# =========================================================================
def calendar_list(args):
service = build_service("calendar", "v3")
now = datetime.now(timezone.utc)
time_min = args.start or now.isoformat()
time_max = args.end or (now + timedelta(days=7)).isoformat()
time_min = _datetime_with_timezone(args.start or now.isoformat())
time_max = _datetime_with_timezone(args.end or (now + timedelta(days=7)).isoformat())
# Ensure timezone info
for val in [time_min, time_max]:
if "T" in val and "Z" not in val and "+" not in val and "-" not in val[11:]:
val += "Z"
if _gws_binary():
results = _run_gws(
["calendar", "events", "list"],
params={
"calendarId": args.calendar,
"timeMin": time_min,
"timeMax": time_max,
"maxResults": args.max,
"singleEvents": True,
"orderBy": "startTime",
},
)
events = []
for e in results.get("items", []):
events.append({
"id": e["id"],
"summary": e.get("summary", "(no title)"),
"start": e.get("start", {}).get("dateTime", e.get("start", {}).get("date", "")),
"end": e.get("end", {}).get("dateTime", e.get("end", {}).get("date", "")),
"location": e.get("location", ""),
"description": e.get("description", ""),
"status": e.get("status", ""),
"htmlLink": e.get("htmlLink", ""),
})
print(json.dumps(events, indent=2, ensure_ascii=False))
return
service = build_service("calendar", "v3")
results = service.events().list(
calendarId=args.calendar, timeMin=time_min, timeMax=time_max,
maxResults=args.max, singleEvents=True, orderBy="startTime",
@@ -237,8 +484,8 @@ def calendar_list(args):
print(json.dumps(events, indent=2, ensure_ascii=False))
def calendar_create(args):
service = build_service("calendar", "v3")
event = {
"summary": args.summary,
"start": {"dateTime": args.start},
@@ -249,8 +496,23 @@ def calendar_create(args):
if args.description:
event["description"] = args.description
if args.attendees:
event["attendees"] = [{"email": e.strip()} for e in args.attendees.split(",")]
event["attendees"] = [{"email": e.strip()} for e in args.attendees.split(",") if e.strip()]
if _gws_binary():
result = _run_gws(
["calendar", "events", "insert"],
params={"calendarId": args.calendar},
body=event,
)
print(json.dumps({
"status": "created",
"id": result["id"],
"summary": result.get("summary", ""),
"htmlLink": result.get("htmlLink", ""),
}, indent=2))
return
service = build_service("calendar", "v3")
result = service.events().insert(calendarId=args.calendar, body=event).execute()
print(json.dumps({
"status": "created",
@@ -260,7 +522,13 @@ def calendar_create(args):
}, indent=2))
def calendar_delete(args):
if _gws_binary():
_run_gws(["calendar", "events", "delete"], params={"calendarId": args.calendar, "eventId": args.event_id})
print(json.dumps({"status": "deleted", "eventId": args.event_id}))
return
service = build_service("calendar", "v3")
service.events().delete(calendarId=args.calendar, eventId=args.event_id).execute()
print(json.dumps({"status": "deleted", "eventId": args.event_id}))
@@ -270,9 +538,22 @@ def calendar_delete(args):
# Drive
# =========================================================================
def drive_search(args):
query = args.query if args.raw_query else f"fullText contains '{args.query}'"
if _gws_binary():
results = _run_gws(
["drive", "files", "list"],
params={
"q": query,
"pageSize": args.max,
"fields": "files(id, name, mimeType, modifiedTime, webViewLink)",
},
)
print(json.dumps(results.get("files", []), indent=2, ensure_ascii=False))
return
service = build_service("drive", "v3")
query = f"fullText contains '{args.query}'" if not args.raw_query else args.query
results = service.files().list(
q=query, pageSize=args.max, fields="files(id, name, mimeType, modifiedTime, webViewLink)",
).execute()
@@ -284,7 +565,30 @@ def drive_search(args):
# Contacts
# =========================================================================
def contacts_list(args):
if _gws_binary():
results = _run_gws(
["people", "people", "connections", "list"],
params={
"resourceName": "people/me",
"pageSize": args.max,
"personFields": "names,emailAddresses,phoneNumbers",
},
)
contacts = []
for person in results.get("connections", []):
names = person.get("names", [{}])
emails = person.get("emailAddresses", [])
phones = person.get("phoneNumbers", [])
contacts.append({
"name": names[0].get("displayName", "") if names else "",
"emails": [e.get("value", "") for e in emails],
"phones": [p.get("value", "") for p in phones],
})
print(json.dumps(contacts, indent=2, ensure_ascii=False))
return
service = build_service("people", "v1")
results = service.people().connections().list(
resourceName="people/me",
@@ -308,7 +612,16 @@ def contacts_list(args):
# Sheets
# =========================================================================
def sheets_get(args):
if _gws_binary():
result = _run_gws(
["sheets", "spreadsheets", "values", "get"],
params={"spreadsheetId": args.sheet_id, "range": args.range},
)
print(json.dumps(result.get("values", []), indent=2, ensure_ascii=False))
return
service = build_service("sheets", "v4")
result = service.spreadsheets().values().get(
spreadsheetId=args.sheet_id, range=args.range,
@@ -316,10 +629,25 @@ def sheets_get(args):
print(json.dumps(result.get("values", []), indent=2, ensure_ascii=False))
def sheets_update(args):
service = build_service("sheets", "v4")
values = json.loads(args.values)
body = {"values": values}
if _gws_binary():
result = _run_gws(
["sheets", "spreadsheets", "values", "update"],
params={
"spreadsheetId": args.sheet_id,
"range": args.range,
"valueInputOption": "USER_ENTERED",
},
body=body,
)
print(json.dumps({"updatedCells": result.get("updatedCells", 0), "updatedRange": result.get("updatedRange", "")}, indent=2))
return
service = build_service("sheets", "v4")
result = service.spreadsheets().values().update(
spreadsheetId=args.sheet_id, range=args.range,
valueInputOption="USER_ENTERED", body=body,
@@ -327,10 +655,26 @@ def sheets_update(args):
print(json.dumps({"updatedCells": result.get("updatedCells", 0), "updatedRange": result.get("updatedRange", "")}, indent=2))
def sheets_append(args):
service = build_service("sheets", "v4")
values = json.loads(args.values)
body = {"values": values}
if _gws_binary():
result = _run_gws(
["sheets", "spreadsheets", "values", "append"],
params={
"spreadsheetId": args.sheet_id,
"range": args.range,
"valueInputOption": "USER_ENTERED",
"insertDataOption": "INSERT_ROWS",
},
body=body,
)
print(json.dumps({"updatedCells": result.get("updates", {}).get("updatedCells", 0)}, indent=2))
return
service = build_service("sheets", "v4")
result = service.spreadsheets().values().append(
spreadsheetId=args.sheet_id, range=args.range,
valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body,
@@ -342,21 +686,24 @@ def sheets_append(args):
# Docs
# =========================================================================
def docs_get(args):
if _gws_binary():
doc = _run_gws(["docs", "documents", "get"], params={"documentId": args.doc_id})
result = {
"title": doc.get("title", ""),
"documentId": doc.get("documentId", ""),
"body": _extract_doc_text(doc),
}
print(json.dumps(result, indent=2, ensure_ascii=False))
return
service = build_service("docs", "v1")
doc = service.documents().get(documentId=args.doc_id).execute()
# Extract plain text from the document structure
text_parts = []
for element in doc.get("body", {}).get("content", []):
paragraph = element.get("paragraph", {})
for pe in paragraph.get("elements", []):
text_run = pe.get("textRun", {})
if text_run.get("content"):
text_parts.append(text_run["content"])
result = {
"title": doc.get("title", ""),
"documentId": doc.get("documentId", ""),
"body": "".join(text_parts),
"body": _extract_doc_text(doc),
}
print(json.dumps(result, indent=2, ensure_ascii=False))
@@ -365,6 +712,7 @@ def docs_get(args):
# CLI parser
# =========================================================================
def main():
parser = argparse.ArgumentParser(description="Google Workspace API for Hermes Agent")
sub = parser.add_subparsers(dest="service", required=True)

View File

@@ -21,28 +21,50 @@ Agent workflow:
6. Run --check to verify. Done.
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Iterable
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
TOKEN_PATH = HERMES_HOME / "google_token.json"
CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json"
PENDING_AUTH_PATH = HERMES_HOME / "google_oauth_pending.json"
LAST_AUTH_URL_PATH = HERMES_HOME / "google_oauth_last_url.txt"
SCOPES = [
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.send",
"https://www.googleapis.com/auth/gmail.modify",
"https://www.googleapis.com/auth/calendar",
"https://www.googleapis.com/auth/drive.readonly",
"https://www.googleapis.com/auth/contacts.readonly",
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/documents.readonly",
]
SERVICE_SCOPE_GROUPS = {
"email": [
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.send",
"https://www.googleapis.com/auth/gmail.modify",
],
"calendar": ["https://www.googleapis.com/auth/calendar"],
"drive": ["https://www.googleapis.com/auth/drive.readonly"],
"contacts": ["https://www.googleapis.com/auth/contacts.readonly"],
"sheets": ["https://www.googleapis.com/auth/spreadsheets"],
"docs": ["https://www.googleapis.com/auth/documents.readonly"],
}
SERVICE_ALIASES = {
"all": "all",
"email": "email",
"gmail": "email",
"mail": "email",
"calendar": "calendar",
"cal": "calendar",
"drive": "drive",
"contacts": "contacts",
"people": "contacts",
"sheets": "sheets",
"docs": "docs",
"documents": "docs",
}
DEFAULT_SERVICES = ["email", "calendar", "drive", "contacts", "sheets", "docs"]
ALL_SCOPES = [scope for service in DEFAULT_SERVICES for scope in SERVICE_SCOPE_GROUPS[service]]
REQUIRED_PACKAGES = ["google-api-python-client", "google-auth-oauthlib", "google-auth-httplib2"]
@@ -50,6 +72,10 @@ REQUIRED_PACKAGES = ["google-api-python-client", "google-auth-oauthlib", "google
# Google deprecated OOB, so we use a localhost redirect and tell the user to
# copy the code from the browser's URL bar (or the page body).
REDIRECT_URI = "http://localhost:1"
AUDIENCE_URL = "https://console.cloud.google.com/auth/audience"
PROJECT_SELECTOR_URL = "https://console.cloud.google.com/projectselector2/home/dashboard"
API_LIBRARY_URL = "https://console.cloud.google.com/apis/library"
CREDENTIALS_URL = "https://console.cloud.google.com/apis/credentials"
def install_deps():
@@ -86,18 +112,98 @@ def _ensure_deps():
sys.exit(1)
def check_auth():
def _dedupe(items: Iterable[str]) -> list[str]:
seen = set()
result = []
for item in items:
if item not in seen:
seen.add(item)
result.append(item)
return result
def _resolve_services(services_text: str | None) -> tuple[list[str], list[str]]:
"""Resolve a comma/space separated service list to canonical service names + scopes."""
text = (services_text or "all").strip().lower()
if not text or text == "all":
services = list(DEFAULT_SERVICES)
return services, list(ALL_SCOPES)
raw_parts = [part.strip() for part in text.replace(" ", ",").split(",") if part.strip()]
canonical = []
unknown = []
for part in raw_parts:
alias = SERVICE_ALIASES.get(part)
if alias == "all":
return list(DEFAULT_SERVICES), list(ALL_SCOPES)
if not alias:
unknown.append(part)
continue
canonical.append(alias)
if unknown:
print(
"ERROR: Unknown Google service(s): "
+ ", ".join(sorted(set(unknown)))
+ ". Supported values: all, email, calendar, drive, contacts, sheets, docs."
)
sys.exit(1)
canonical = _dedupe(canonical)
scopes = [scope for service in canonical for scope in SERVICE_SCOPE_GROUPS[service]]
return canonical, scopes
def _stored_token_scopes() -> list[str] | None:
if not TOKEN_PATH.exists():
return None
try:
data = json.loads(TOKEN_PATH.read_text())
except Exception:
return None
scopes = data.get("scopes")
if isinstance(scopes, list) and scopes:
return scopes
return None
def _credentials_scopes(services_text: str | None = None) -> list[str]:
requested_scopes = None
if services_text:
_, requested_scopes = _resolve_services(services_text)
stored_scopes = _stored_token_scopes()
if stored_scopes:
if requested_scopes:
missing = [scope for scope in requested_scopes if scope not in stored_scopes]
if missing:
print("TOKEN_MISSING_SCOPES: Stored token does not include the requested services.")
print("Missing scopes:")
for scope in missing:
print(f" - {scope}")
print("Re-run setup with a fresh auth URL for the services you need.")
return []
return stored_scopes
return requested_scopes or list(ALL_SCOPES)
def check_auth(services_text: str | None = None):
"""Check if stored credentials are valid. Prints status, exits 0 or 1."""
if not TOKEN_PATH.exists():
print(f"NOT_AUTHENTICATED: No token at {TOKEN_PATH}")
return False
scopes = _credentials_scopes(services_text)
if not scopes:
return False
_ensure_deps()
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
try:
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), scopes)
except Exception as e:
print(f"TOKEN_CORRUPT: {e}")
return False
@@ -135,14 +241,14 @@ def store_client_secret(path: str):
if "installed" not in data and "web" not in data:
print("ERROR: Not a Google OAuth client secret file (missing 'installed' key).")
print("Download the correct file from: https://console.cloud.google.com/apis/credentials")
print(f"Download the correct file from: {CREDENTIALS_URL}")
sys.exit(1)
CLIENT_SECRET_PATH.write_text(json.dumps(data, indent=2))
print(f"OK: Client secret saved to {CLIENT_SECRET_PATH}")
def _save_pending_auth(*, state: str, code_verifier: str):
def _save_pending_auth(*, state: str, code_verifier: str, scopes: list[str], services: list[str], auth_url: str):
"""Persist the OAuth session bits needed for a later token exchange."""
PENDING_AUTH_PATH.write_text(
json.dumps(
@@ -150,10 +256,14 @@ def _save_pending_auth(*, state: str, code_verifier: str):
"state": state,
"code_verifier": code_verifier,
"redirect_uri": REDIRECT_URI,
"scopes": scopes,
"services": services,
"auth_url": auth_url,
},
indent=2,
)
)
LAST_AUTH_URL_PATH.write_text(auth_url)
def _load_pending_auth() -> dict:
@@ -174,6 +284,8 @@ def _load_pending_auth() -> dict:
print("Run --auth-url again to start a fresh OAuth session.")
sys.exit(1)
data.setdefault("scopes", list(ALL_SCOPES))
data.setdefault("services", list(DEFAULT_SERVICES))
return data
@@ -188,37 +300,96 @@ def _extract_code_and_state(code_or_url: str) -> tuple[str, str | None]:
params = parse_qs(parsed.query)
if "code" not in params:
print("ERROR: No 'code' parameter found in URL.")
print("When the browser lands on the localhost error page, copy the FULL address bar URL.")
sys.exit(1)
state = params.get("state", [None])[0]
return params["code"][0], state
def get_auth_url():
def _build_flow(scopes: list[str], *, state: str | None = None, code_verifier: str | None = None, autogenerate_code_verifier: bool = False):
_ensure_deps()
from google_auth_oauthlib.flow import Flow
return Flow.from_client_secrets_file(
str(CLIENT_SECRET_PATH),
scopes=scopes,
redirect_uri=REDIRECT_URI,
state=state,
code_verifier=code_verifier,
autogenerate_code_verifier=autogenerate_code_verifier,
)
def _create_auth_session(services_text: str | None, *, output_format: str = "plain", emit_output: bool = True) -> str:
services, scopes = _resolve_services(services_text)
flow = _build_flow(scopes, autogenerate_code_verifier=True)
auth_url, state = flow.authorization_url(access_type="offline", prompt="consent")
_save_pending_auth(
state=state,
code_verifier=flow.code_verifier,
scopes=scopes,
services=services,
auth_url=auth_url,
)
if emit_output:
if output_format == "json":
print(
json.dumps(
{
"success": True,
"auth_url": auth_url,
"auth_url_file": str(LAST_AUTH_URL_PATH),
"services": services,
"scopes": scopes,
"project_selector_url": PROJECT_SELECTOR_URL,
"api_library_url": API_LIBRARY_URL,
"credentials_url": CREDENTIALS_URL,
"audience_url": AUDIENCE_URL,
"instructions": [
"Open auth_url in your browser.",
"If the browser lands on a localhost error page, that is expected.",
"Copy the FULL redirected URL from the address bar and pass it to --auth-code.",
],
},
indent=2,
)
)
else:
print(auth_url)
return auth_url
def get_auth_url(services_text: str | None = None, *, output_format: str = "plain"):
"""Print the OAuth authorization URL. User visits this in a browser."""
if not CLIENT_SECRET_PATH.exists():
print("ERROR: No client secret stored. Run --client-secret first.")
sys.exit(1)
_ensure_deps()
from google_auth_oauthlib.flow import Flow
flow = Flow.from_client_secrets_file(
str(CLIENT_SECRET_PATH),
scopes=SCOPES,
redirect_uri=REDIRECT_URI,
autogenerate_code_verifier=True,
)
auth_url, state = flow.authorization_url(
access_type="offline",
prompt="consent",
)
_save_pending_auth(state=state, code_verifier=flow.code_verifier)
# Print just the URL so the agent can extract it cleanly
print(auth_url)
_create_auth_session(services_text, output_format=output_format)
def exchange_auth_code(code: str):
def _print_recovery_url(auth_url: str, output_format: str):
if output_format == "json":
print(
json.dumps(
{
"success": False,
"fresh_auth_url": auth_url,
"auth_url_file": str(LAST_AUTH_URL_PATH),
"audience_url": AUDIENCE_URL,
},
indent=2,
)
)
else:
print("A fresh auth URL has been generated. Use this exact URL:")
print(auth_url)
print(f"If Google blocks access, add your account as a test user here: {AUDIENCE_URL}")
def exchange_auth_code(code: str, *, output_format: str = "plain"):
"""Exchange the authorization code for a token and save it."""
if not CLIENT_SECRET_PATH.exists():
print("ERROR: No client secret stored. Run --client-secret first.")
@@ -227,16 +398,18 @@ def exchange_auth_code(code: str):
pending_auth = _load_pending_auth()
code, returned_state = _extract_code_and_state(code)
if returned_state and returned_state != pending_auth["state"]:
print("ERROR: OAuth state mismatch. Run --auth-url again to start a fresh session.")
auth_url = _create_auth_session(
",".join(pending_auth.get("services", DEFAULT_SERVICES)),
output_format=output_format,
emit_output=False,
)
if output_format != "json":
print("ERROR: OAuth state mismatch. Your browser redirect came from an older auth session.")
_print_recovery_url(auth_url, output_format)
sys.exit(1)
_ensure_deps()
from google_auth_oauthlib.flow import Flow
flow = Flow.from_client_secrets_file(
str(CLIENT_SECRET_PATH),
scopes=SCOPES,
redirect_uri=pending_auth.get("redirect_uri", REDIRECT_URI),
flow = _build_flow(
pending_auth.get("scopes", list(ALL_SCOPES)),
state=pending_auth["state"],
code_verifier=pending_auth["code_verifier"],
)
@@ -244,14 +417,33 @@ def exchange_auth_code(code: str):
try:
flow.fetch_token(code=code)
except Exception as e:
print(f"ERROR: Token exchange failed: {e}")
print("The code may have expired. Run --auth-url to get a fresh URL.")
auth_url = _create_auth_session(
",".join(pending_auth.get("services", DEFAULT_SERVICES)),
output_format=output_format,
emit_output=False,
)
if output_format != "json":
print(f"ERROR: Token exchange failed: {e}")
print("The code may have expired or already been used.")
_print_recovery_url(auth_url, output_format)
sys.exit(1)
creds = flow.credentials
TOKEN_PATH.write_text(creds.to_json())
PENDING_AUTH_PATH.unlink(missing_ok=True)
print(f"OK: Authenticated. Token saved to {TOKEN_PATH}")
if output_format == "json":
print(
json.dumps(
{
"success": True,
"token_path": str(TOKEN_PATH),
"services": pending_auth.get("services", DEFAULT_SERVICES),
},
indent=2,
)
)
else:
print(f"OK: Authenticated. Token saved to {TOKEN_PATH}")
def revoke():
@@ -260,16 +452,19 @@ def revoke():
print("No token to revoke.")
return
scopes = _stored_token_scopes() or list(ALL_SCOPES)
_ensure_deps()
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
try:
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), scopes)
if creds.expired and creds.refresh_token:
creds.refresh(Request())
import urllib.request
urllib.request.urlopen(
urllib.request.Request(
f"https://oauth2.googleapis.com/revoke?token={creds.token}",
@@ -283,11 +478,23 @@ def revoke():
TOKEN_PATH.unlink(missing_ok=True)
PENDING_AUTH_PATH.unlink(missing_ok=True)
LAST_AUTH_URL_PATH.unlink(missing_ok=True)
print(f"Deleted {TOKEN_PATH}")
def main():
parser = argparse.ArgumentParser(description="Google Workspace OAuth setup for Hermes")
parser.add_argument(
"--services",
default="all",
help="Comma-separated services to authorize: all, email, calendar, drive, contacts, sheets, docs",
)
parser.add_argument(
"--format",
choices=["plain", "json"],
default="plain",
help="Output format. Use json for agent-friendly parsing.",
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--check", action="store_true", help="Check if auth is valid (exit 0=yes, 1=no)")
group.add_argument("--client-secret", metavar="PATH", help="Store OAuth client_secret.json")
@@ -298,16 +505,20 @@ def main():
args = parser.parse_args()
if args.check:
sys.exit(0 if check_auth() else 1)
elif args.client_secret:
sys.exit(0 if check_auth(args.services) else 1)
if args.client_secret:
store_client_secret(args.client_secret)
elif args.auth_url:
get_auth_url()
elif args.auth_code:
exchange_auth_code(args.auth_code)
elif args.revoke:
return
if args.auth_url:
get_auth_url(args.services, output_format=args.format)
return
if args.auth_code:
exchange_auth_code(args.auth_code, output_format=args.format)
return
if args.revoke:
revoke()
elif args.install_deps:
return
if args.install_deps:
sys.exit(0 if install_deps() else 1)

View File

@@ -0,0 +1,203 @@
"""Tests for the Google Workspace skill CLI wrapper.
These focus on the hybrid backend: prefer the Google Workspace CLI (`gws`) when
available, while preserving the existing Hermes-facing JSON contract.
"""
import importlib.util
import json
from argparse import Namespace
from pathlib import Path
from types import SimpleNamespace
import pytest
SCRIPT_PATH = (
Path(__file__).resolve().parents[2]
/ "skills/productivity/google-workspace/scripts/google_api.py"
)
def _load_module():
spec = importlib.util.spec_from_file_location("google_workspace_api_test", SCRIPT_PATH)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def _completed(stdout: str = "", stderr: str = "", returncode: int = 0):
return SimpleNamespace(stdout=stdout, stderr=stderr, returncode=returncode)
@pytest.fixture
def google_api_module(tmp_path, monkeypatch):
module = _load_module()
token_path = tmp_path / "google_token.json"
token_path.write_text(
json.dumps(
{
"token": "access-token",
"refresh_token": "refresh-token",
"token_uri": "https://oauth2.googleapis.com/token",
"client_id": "client-id",
"client_secret": "client-secret",
}
)
)
monkeypatch.setattr(module, "TOKEN_PATH", token_path)
monkeypatch.setattr(module, "_gws_binary", lambda: "/usr/bin/gws", raising=False)
monkeypatch.setattr(module, "get_credentials", lambda: SimpleNamespace(token="access-token"))
monkeypatch.setattr(
module,
"build_service",
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("legacy backend should not be used")),
)
return module
def test_gmail_search_uses_gws_and_normalizes_results(google_api_module, monkeypatch, capsys):
calls = []
def fake_run(cmd, capture_output, text, env):
calls.append({"cmd": cmd, "env": env})
if cmd[1:4] == ["gmail", "users", "messages"] and cmd[4] == "list":
assert json.loads(cmd[6]) == {"userId": "me", "q": "is:unread", "maxResults": 5}
return _completed(
json.dumps({"messages": [{"id": "msg-1", "threadId": "thread-1"}]})
)
if cmd[1:4] == ["gmail", "users", "messages"] and cmd[4] == "get":
params = json.loads(cmd[6])
assert params["id"] == "msg-1"
return _completed(
json.dumps(
{
"id": "msg-1",
"threadId": "thread-1",
"payload": {
"headers": [
{"name": "From", "value": "alice@example.com"},
{"name": "To", "value": "bob@example.com"},
{"name": "Subject", "value": "Hello"},
{"name": "Date", "value": "Sat, 15 Mar 2026 10:00:00 +0000"},
]
},
"snippet": "preview",
"labelIds": ["UNREAD"],
}
)
)
raise AssertionError(f"Unexpected command: {cmd}")
monkeypatch.setattr(google_api_module.subprocess, "run", fake_run)
google_api_module.gmail_search(Namespace(query="is:unread", max=5))
out = json.loads(capsys.readouterr().out)
assert out == [
{
"id": "msg-1",
"threadId": "thread-1",
"from": "alice@example.com",
"to": "bob@example.com",
"subject": "Hello",
"date": "Sat, 15 Mar 2026 10:00:00 +0000",
"snippet": "preview",
"labels": ["UNREAD"],
}
]
assert calls[0]["env"]["GOOGLE_WORKSPACE_CLI_TOKEN"] == "access-token"
def test_calendar_create_uses_gws_insert_and_normalizes_result(google_api_module, monkeypatch, capsys):
def fake_run(cmd, capture_output, text, env):
assert cmd[:5] == ["/usr/bin/gws", "calendar", "events", "insert", "--params"]
assert json.loads(cmd[5]) == {"calendarId": "primary"}
body = json.loads(cmd[7])
assert body == {
"summary": "Standup",
"start": {"dateTime": "2026-03-15T09:00:00Z"},
"end": {"dateTime": "2026-03-15T09:30:00Z"},
"location": "Room 1",
"description": "Daily sync",
"attendees": [{"email": "alice@example.com"}, {"email": "bob@example.com"}],
}
return _completed(json.dumps({"id": "evt-1", "summary": "Standup", "htmlLink": "https://calendar/event"}))
monkeypatch.setattr(google_api_module.subprocess, "run", fake_run)
google_api_module.calendar_create(
Namespace(
summary="Standup",
start="2026-03-15T09:00:00Z",
end="2026-03-15T09:30:00Z",
location="Room 1",
description="Daily sync",
attendees="alice@example.com,bob@example.com",
calendar="primary",
)
)
assert json.loads(capsys.readouterr().out) == {
"status": "created",
"id": "evt-1",
"summary": "Standup",
"htmlLink": "https://calendar/event",
}
def test_sheets_append_uses_gws_and_returns_updated_cells(google_api_module, monkeypatch, capsys):
def fake_run(cmd, capture_output, text, env):
assert cmd[:6] == ["/usr/bin/gws", "sheets", "spreadsheets", "values", "append", "--params"]
assert json.loads(cmd[6]) == {
"spreadsheetId": "sheet-123",
"range": "Sheet1!A:C",
"valueInputOption": "USER_ENTERED",
"insertDataOption": "INSERT_ROWS",
}
assert json.loads(cmd[8]) == {"values": [["a", "b", "c"]]}
return _completed(json.dumps({"updates": {"updatedCells": 3}}))
monkeypatch.setattr(google_api_module.subprocess, "run", fake_run)
google_api_module.sheets_append(
Namespace(sheet_id="sheet-123", range="Sheet1!A:C", values='[["a", "b", "c"]]')
)
assert json.loads(capsys.readouterr().out) == {"updatedCells": 3}
def test_docs_get_uses_gws_and_extracts_plain_text(google_api_module, monkeypatch, capsys):
def fake_run(cmd, capture_output, text, env):
assert cmd[:6] == ["/usr/bin/gws", "docs", "documents", "get", "--params", '{"documentId": "doc-123"}']
return _completed(
json.dumps(
{
"title": "Doc Title",
"documentId": "doc-123",
"body": {
"content": [
{
"paragraph": {
"elements": [
{"textRun": {"content": "Hello "}},
{"textRun": {"content": "world"}},
]
}
}
]
},
}
)
)
monkeypatch.setattr(google_api_module.subprocess, "run", fake_run)
google_api_module.docs_get(Namespace(doc_id="doc-123"))
assert json.loads(capsys.readouterr().out) == {
"title": "Doc Title",
"documentId": "doc-123",
"body": "Hello world",
}

View File

@@ -4,6 +4,8 @@ These tests cover the headless/manual auth-code flow where the browser step and
code exchange happen in separate process invocations.
"""
from __future__ import annotations
import importlib.util
import json
import sys
@@ -110,6 +112,7 @@ def setup_module(monkeypatch, tmp_path):
monkeypatch.setattr(module, "CLIENT_SECRET_PATH", tmp_path / "google_client_secret.json")
monkeypatch.setattr(module, "TOKEN_PATH", tmp_path / "google_token.json")
monkeypatch.setattr(module, "PENDING_AUTH_PATH", tmp_path / "google_oauth_pending.json", raising=False)
monkeypatch.setattr(module, "LAST_AUTH_URL_PATH", tmp_path / "google_oauth_last_url.txt", raising=False)
client_secret = {
"installed": {
@@ -123,16 +126,38 @@ def setup_module(monkeypatch, tmp_path):
return module
class TestGetAuthUrl:
def test_persists_state_and_code_verifier_for_later_exchange(self, setup_module, capsys):
setup_module.get_auth_url()
class TestResolveServices:
def test_reduces_to_requested_services(self, setup_module):
services, scopes = setup_module._resolve_services("email,calendar")
assert services == ["email", "calendar"]
assert scopes == [
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.send",
"https://www.googleapis.com/auth/gmail.modify",
"https://www.googleapis.com/auth/calendar",
]
out = capsys.readouterr().out.strip()
assert out == "https://auth.example/authorize?state=generated-state"
class TestGetAuthUrl:
def test_persists_state_verifier_scopes_and_last_url(self, setup_module, capsys):
setup_module.get_auth_url("email,calendar", output_format="json")
out = json.loads(capsys.readouterr().out)
assert out["success"] is True
assert out["auth_url"] == "https://auth.example/authorize?state=generated-state"
assert out["services"] == ["email", "calendar"]
assert Path(out["auth_url_file"]).read_text() == out["auth_url"]
saved = json.loads(setup_module.PENDING_AUTH_PATH.read_text())
assert saved["state"] == "generated-state"
assert saved["code_verifier"] == "generated-code-verifier"
assert saved["services"] == ["email", "calendar"]
assert saved["scopes"] == [
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.send",
"https://www.googleapis.com/auth/gmail.modify",
"https://www.googleapis.com/auth/calendar",
]
flow = FakeFlow.created[-1]
assert flow.autogenerate_code_verifier is True
@@ -142,7 +167,14 @@ class TestGetAuthUrl:
class TestExchangeAuthCode:
def test_reuses_saved_pkce_material_for_plain_code(self, setup_module):
setup_module.PENDING_AUTH_PATH.write_text(
json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"})
json.dumps(
{
"state": "saved-state",
"code_verifier": "saved-verifier",
"services": ["email", "calendar"],
"scopes": ["scope-a", "scope-b"],
}
)
)
setup_module.exchange_auth_code("4/test-auth-code")
@@ -150,13 +182,21 @@ class TestExchangeAuthCode:
flow = FakeFlow.created[-1]
assert flow.state == "saved-state"
assert flow.code_verifier == "saved-verifier"
assert flow.scopes == ["scope-a", "scope-b"]
assert flow.fetch_token_calls == [{"code": "4/test-auth-code"}]
assert json.loads(setup_module.TOKEN_PATH.read_text())["token"] == "access-token"
assert not setup_module.PENDING_AUTH_PATH.exists()
def test_extracts_code_from_redirect_url_and_checks_state(self, setup_module):
setup_module.PENDING_AUTH_PATH.write_text(
json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"})
json.dumps(
{
"state": "saved-state",
"code_verifier": "saved-verifier",
"services": ["email"],
"scopes": ["scope-a"],
}
)
)
setup_module.exchange_auth_code(
@@ -166,19 +206,34 @@ class TestExchangeAuthCode:
flow = FakeFlow.created[-1]
assert flow.fetch_token_calls == [{"code": "4/extracted-code"}]
def test_rejects_state_mismatch(self, setup_module, capsys):
def test_state_mismatch_regenerates_fresh_url(self, setup_module, capsys):
setup_module.PENDING_AUTH_PATH.write_text(
json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"})
json.dumps(
{
"state": "saved-state",
"code_verifier": "saved-verifier",
"services": ["email", "calendar"],
"scopes": ["scope-a", "scope-b"],
}
)
)
FakeFlow.default_state = "replacement-state"
FakeFlow.default_verifier = "replacement-verifier"
with pytest.raises(SystemExit):
setup_module.exchange_auth_code(
"http://localhost:1/?code=4/extracted-code&state=wrong-state"
"http://localhost:1/?code=4/extracted-code&state=wrong-state",
output_format="json",
)
out = capsys.readouterr().out
assert "state mismatch" in out.lower()
assert not setup_module.TOKEN_PATH.exists()
out = json.loads(capsys.readouterr().out)
assert out["success"] is False
assert out["fresh_auth_url"] == "https://auth.example/authorize?state=replacement-state"
saved = json.loads(setup_module.PENDING_AUTH_PATH.read_text())
assert saved["state"] == "replacement-state"
assert saved["code_verifier"] == "replacement-verifier"
assert saved["services"] == ["email", "calendar"]
def test_requires_pending_auth_session(self, setup_module, capsys):
with pytest.raises(SystemExit):
@@ -188,16 +243,42 @@ class TestExchangeAuthCode:
assert "run --auth-url first" in out.lower()
assert not setup_module.TOKEN_PATH.exists()
def test_keeps_pending_auth_session_when_exchange_fails(self, setup_module, capsys):
def test_failed_exchange_regenerates_fresh_url(self, setup_module, capsys):
setup_module.PENDING_AUTH_PATH.write_text(
json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"})
json.dumps(
{
"state": "saved-state",
"code_verifier": "saved-verifier",
"services": ["email"],
"scopes": ["scope-a"],
}
)
)
FakeFlow.default_state = "replacement-state"
FakeFlow.default_verifier = "replacement-verifier"
FakeFlow.fetch_error = Exception("invalid_grant: Missing code verifier")
with pytest.raises(SystemExit):
setup_module.exchange_auth_code("4/test-auth-code")
setup_module.exchange_auth_code("4/test-auth-code", output_format="json")
out = capsys.readouterr().out
assert "token exchange failed" in out.lower()
out = json.loads(capsys.readouterr().out)
assert out["success"] is False
assert out["fresh_auth_url"] == "https://auth.example/authorize?state=replacement-state"
assert setup_module.PENDING_AUTH_PATH.exists()
assert not setup_module.TOKEN_PATH.exists()
def test_check_auth_rejects_missing_requested_scopes(self, setup_module, capsys):
setup_module.TOKEN_PATH.write_text(
json.dumps(
{
"token": "access-token",
"refresh_token": "refresh-token",
"scopes": ["https://www.googleapis.com/auth/gmail.readonly"],
}
)
)
ok = setup_module.check_auth("email,calendar")
out = capsys.readouterr().out
assert ok is False
assert "missing scopes" in out.lower()

View File

@@ -204,7 +204,7 @@ Skills for document creation, presentations, spreadsheets, and other productivit
| Skill | Description | Path |
|-------|-------------|------|
| `google-workspace` | Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration via Python. Uses OAuth2 with automatic token refresh. No external binaries needed — runs entirely with Google's Python client libraries in the Hermes venv. | `productivity/google-workspace` |
| `google-workspace` | Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration for Hermes. Uses Hermes-managed OAuth2 setup, prefers the Google Workspace CLI (`gws`) when available for broader API coverage, and falls back to the Python client libraries otherwise. | `productivity/google-workspace` |
| `nano-pdf` | Edit PDFs with natural-language instructions using the nano-pdf CLI. Modify text, fix typos, update titles, and make content changes to specific pages without manual editing. | `productivity/nano-pdf` |
| `notion` | Notion API for creating and managing pages, databases, and blocks via curl. Search, create, update, and query Notion workspaces directly from the terminal. | `productivity/notion` |
| `ocr-and-documents` | Extract text from PDFs and scanned documents. Use web_extract for remote URLs, pymupdf for local text-based PDFs, marker-pdf for OCR/scanned docs. For DOCX use python-docx, for PPTX see the powerpoint skill. | `productivity/ocr-and-documents` |