Mirror of https://github.com/NousResearch/hermes-agent.git (synced 2026-04-29 07:21:37 +08:00)

Compare commits: fix/plugin...hermes/gws (4 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 92971403fd | |
| | f8a835e476 | |
| | be58335be0 | |
| | 818e72ea28 | |
@@ -1,6 +1,6 @@
 ---
 name: google-workspace
-description: Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration via Python. Uses OAuth2 with automatic token refresh. No external binaries needed — runs entirely with Google's Python client libraries in the Hermes venv.
+description: Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration for Hermes. Uses Hermes-managed OAuth2 setup, prefers the Google Workspace CLI (`gws`) when available for broader API coverage, and falls back to the Python client libraries otherwise.
 version: 1.0.0
 author: Nous Research
 license: MIT
@@ -13,7 +13,7 @@ metadata:
 
 # Google Workspace
 
-Gmail, Calendar, Drive, Contacts, Sheets, and Docs — all through Python scripts in this skill. No external binaries to install.
+Gmail, Calendar, Drive, Contacts, Sheets, and Docs — through Hermes-managed OAuth and a thin CLI wrapper. When `gws` is installed, the skill uses it as the execution backend for broader Google Workspace coverage; otherwise it falls back to the bundled Python client implementation.
 
 ## References
 
@@ -22,7 +22,7 @@ Gmail, Calendar, Drive, Contacts, Sheets, and Docs — all through Python script
 ## Scripts
 
 - `scripts/setup.py` — OAuth2 setup (run once to authorize)
-- `scripts/google_api.py` — API wrapper CLI (agent uses this for all operations)
+- `scripts/google_api.py` — compatibility wrapper CLI. It prefers `gws` for operations when available, while preserving Hermes' existing JSON output contract.
 
 ## First-Time Setup
 
@@ -55,8 +55,15 @@ Calendar/Drive/Sheets/Docs?"**
 Passwords) and takes 2 minutes to set up. No Google Cloud project needed.
 Load the himalaya skill and follow its setup instructions.
 
-- **Calendar, Drive, Sheets, Docs (or email + these)** → Continue with this
-  skill's OAuth setup below.
+- **Email + Calendar** → Continue with this skill, but use
+  `--services email,calendar` during auth so the consent screen only asks for
+  the scopes they actually need.
+
+- **Calendar/Drive/Sheets/Docs only** → Continue with this skill and use a
+  narrower `--services` set like `calendar,drive,sheets,docs`.
+
+- **Full Workspace access** → Continue with this skill and use the default
+  `all` service set.
 
 **Question 2: "Does your Google account use Advanced Protection (hardware
 security keys required to sign in)? If you're not sure, you probably don't
@@ -72,13 +79,23 @@ Tell the user:
 
 > You need a Google Cloud OAuth client. This is a one-time setup:
 >
-> 1. Go to https://console.cloud.google.com/apis/credentials
-> 2. Create a project (or use an existing one)
-> 3. Click "Enable APIs" and enable: Gmail API, Google Calendar API,
->    Google Drive API, Google Sheets API, Google Docs API, People API
-> 4. Go to Credentials → Create Credentials → OAuth 2.0 Client ID
-> 5. Application type: "Desktop app" → Create
-> 6. Click "Download JSON" and tell me the file path
+> 1. Create or select a project:
+>    https://console.cloud.google.com/projectselector2/home/dashboard
+> 2. Enable the required APIs from the API Library:
+>    https://console.cloud.google.com/apis/library
+>    Enable: Gmail API, Google Calendar API, Google Drive API,
+>    Google Sheets API, Google Docs API, People API
+> 3. Create the OAuth client here:
+>    https://console.cloud.google.com/apis/credentials
+>    Credentials → Create Credentials → OAuth 2.0 Client ID
+> 4. Application type: "Desktop app" → Create
+> 5. If the app is still in Testing, add the user's Google account as a test user here:
+>    https://console.cloud.google.com/auth/audience
+>    Audience → Test users → Add users
+> 6. Download the JSON file and tell me the file path
+>
+> Important Hermes CLI note: if the file path starts with `/`, do NOT send only the bare path as its own message in the CLI, because it can be mistaken for a slash command. Send it in a sentence instead, like:
+> `The JSON file path is: /home/user/Downloads/client_secret_....json`
 
 Once they provide the path:
 
@@ -86,18 +103,29 @@ Once they provide the path:
 $GSETUP --client-secret /path/to/client_secret.json
 ```
 
+If they paste the raw client ID / client secret values instead of a file path,
+write a valid Desktop OAuth JSON file for them yourself, save it somewhere
+explicit (for example `~/Downloads/hermes-google-client-secret.json`), then run
+`--client-secret` against that file.
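The raw-values fallback added above can be sketched as follows. This is a hedged sketch, not part of the skill: `write_desktop_client_secret` is a hypothetical helper name, and the `installed` layout matches Google's documented Desktop-app client-secret format.

```python
import json
from pathlib import Path


def write_desktop_client_secret(client_id: str, client_secret: str, out_path: str) -> Path:
    """Write a minimal Desktop OAuth client file that --client-secret can accept."""
    payload = {
        "installed": {
            "client_id": client_id,
            "client_secret": client_secret,
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "redirect_uris": ["http://localhost"],
        }
    }
    path = Path(out_path).expanduser()
    path.write_text(json.dumps(payload, indent=2))
    return path
```

The `installed` key (rather than `web`) is what marks the client as a Desktop app to Google's OAuth libraries.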
 
 ### Step 3: Get authorization URL
 
+Use the service set chosen in Step 1. Examples:
+
 ```bash
-$GSETUP --auth-url
+$GSETUP --auth-url --services email,calendar --format json
+$GSETUP --auth-url --services calendar,drive,sheets,docs --format json
+$GSETUP --auth-url --services all --format json
 ```
 
-This prints a URL. **Send the URL to the user** and tell them:
+This returns JSON with an `auth_url` field and also saves the exact URL to
+`~/.hermes/google_oauth_last_url.txt`.
 
-> Open this link in your browser, sign in with your Google account, and
-> authorize access. After authorizing, you'll be redirected to a page that
-> may show an error — that's expected. Copy the ENTIRE URL from your
-> browser's address bar and paste it back to me.
+Agent rules for this step:
+- Extract the `auth_url` field and send that exact URL to the user as a single line.
+- Tell the user that the browser will likely fail on `http://localhost:1` after approval, and that this is expected.
+- Tell them to copy the ENTIRE redirected URL from the browser address bar.
+- If the user gets `Error 403: access_denied`, send them directly to `https://console.cloud.google.com/auth/audience` to add themselves as a test user.
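The extraction rule can be sketched agent-side like this. A minimal sketch assuming only the JSON shape and fallback file named above; `extract_auth_url` is a hypothetical helper name.

```python
import json
from pathlib import Path


def extract_auth_url(setup_stdout: str,
                     fallback_file: str = "~/.hermes/google_oauth_last_url.txt") -> str:
    """Return auth_url from the setup script's JSON output, else the saved URL."""
    try:
        return json.loads(setup_stdout)["auth_url"]
    except (json.JSONDecodeError, KeyError):
        # The setup script also saves the exact URL to a file on disk.
        return Path(fallback_file).expanduser().read_text().strip()
```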
 
 ### Step 4: Exchange the code
 
@@ -107,9 +135,14 @@ pending OAuth session locally so `--auth-code` can complete the PKCE exchange
 later, even on headless systems:
 
 ```bash
-$GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED"
+$GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED" --format json
 ```
 
+If `--auth-code` fails because the code expired, was already used, or came from
+an older browser tab, it now returns a `fresh_auth_url` field. In that case,
+immediately send the new URL to the user and have them retry with the newest
+browser redirect only.
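The retry rule above can be sketched agent-side as follows, assuming `fresh_auth_url` appears in the `--format json` output only on failure; `next_action` is a hypothetical helper name.

```python
import json


def next_action(exchange_stdout: str) -> tuple[str, str]:
    """Return ("done", "") on success, else ("retry", fresh_auth_url)."""
    data = json.loads(exchange_stdout)
    if "fresh_auth_url" in data:
        # Expired or reused code: restart the user at the fresh URL.
        return ("retry", data["fresh_auth_url"])
    return ("done", "")
```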
 
 ### Step 5: Verify
 
 ```bash
@@ -122,6 +155,7 @@ Should print `AUTHENTICATED`. Setup is complete — token refreshes automatically
 
 - Token is stored at `~/.hermes/google_token.json` and auto-refreshes.
+- Pending OAuth session state/verifier are stored temporarily at `~/.hermes/google_oauth_pending.json` until exchange completes.
+- If `gws` is installed, `google_api.py` points it at the same `~/.hermes/google_token.json` credentials file. Users do not need to run a separate `gws auth login` flow.
 - To revoke: `$GSETUP --revoke`
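The notes above can be checked locally with a sketch like this. It assumes the google-auth authorized-user JSON layout (`expiry`, `scopes`, `refresh_token` fields); `token_status` is a hypothetical helper, not part of the skill.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def token_status(token_path: str = "~/.hermes/google_token.json") -> dict:
    """Summarize the stored token: scopes, refresh capability, expiry."""
    path = Path(token_path).expanduser()
    if not path.exists():
        return {"authenticated": False}
    data = json.loads(path.read_text())
    expired = None
    expiry = data.get("expiry")  # ISO-8601 string in authorized-user token files
    if expiry:
        when = datetime.fromisoformat(expiry.replace("Z", "+00:00"))
        expired = when <= datetime.now(timezone.utc)
    return {
        "authenticated": True,
        "scopes": data.get("scopes", []),
        "has_refresh_token": bool(data.get("refresh_token")),
        "expired": expired,
    }
```

An expired token with `has_refresh_token: true` is fine: the wrapper refreshes it on the next call.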
 
 ## Usage
 
scripts/google_api.py

@@ -1,8 +1,9 @@
 #!/usr/bin/env python3
 """Google Workspace API CLI for Hermes Agent.
 
-A thin CLI wrapper around Google's Python client libraries.
-Authenticates using the token stored by setup.py.
+Uses the Google Workspace CLI (`gws`) when available, but preserves the
+existing Hermes-facing JSON contract and falls back to the Python client
+libraries if `gws` is not installed.
 
 Usage:
     python google_api.py gmail search "is:unread" [--max 10]
@@ -23,6 +24,8 @@ import argparse
 import base64
 import json
 import os
+import shutil
+import subprocess
 import sys
 from datetime import datetime, timedelta, timezone
 from email.mime.text import MIMEText
@@ -30,6 +33,7 @@ from pathlib import Path
 
 HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
 TOKEN_PATH = HERMES_HOME / "google_token.json"
+CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json"
 
 SCOPES = [
     "https://www.googleapis.com/auth/gmail.readonly",
@@ -43,17 +47,128 @@ SCOPES = [
 ]
 
 
-def get_credentials():
-    """Load and refresh credentials from token file."""
+def _ensure_authenticated():
     if not TOKEN_PATH.exists():
         print("Not authenticated. Run the setup script first:", file=sys.stderr)
         print(f"  python {Path(__file__).parent / 'setup.py'}", file=sys.stderr)
         sys.exit(1)
+
+
+def _stored_token_scopes() -> list[str]:
+    try:
+        data = json.loads(TOKEN_PATH.read_text())
+    except Exception:
+        return list(SCOPES)
+    scopes = data.get("scopes")
+    if isinstance(scopes, list) and scopes:
+        return scopes
+    return list(SCOPES)
+
+
+def _gws_binary() -> str | None:
+    override = os.getenv("HERMES_GWS_BIN")
+    if override:
+        return override
+    return shutil.which("gws")
+
+
+def _gws_env() -> dict[str, str]:
+    env = os.environ.copy()
+    creds = get_credentials()
+    env["GOOGLE_WORKSPACE_CLI_TOKEN"] = creds.token
+    return env
+
+
+def _run_gws(parts: list[str], *, params: dict | None = None, body: dict | None = None):
+    binary = _gws_binary()
+    if not binary:
+        raise RuntimeError("gws not installed")
+
+    _ensure_authenticated()
+
+    cmd = [binary, *parts]
+    if params is not None:
+        cmd.extend(["--params", json.dumps(params)])
+    if body is not None:
+        cmd.extend(["--json", json.dumps(body)])
+
+    result = subprocess.run(
+        cmd,
+        capture_output=True,
+        text=True,
+        env=_gws_env(),
+    )
+    if result.returncode != 0:
+        err = result.stderr.strip() or result.stdout.strip() or "Unknown gws error"
+        print(err, file=sys.stderr)
+        sys.exit(result.returncode or 1)
+
+    stdout = result.stdout.strip()
+    if not stdout:
+        return {}
+
+    try:
+        return json.loads(stdout)
+    except json.JSONDecodeError:
+        print("ERROR: Unexpected non-JSON output from gws:", file=sys.stderr)
+        print(stdout, file=sys.stderr)
+        sys.exit(1)
+
+
+def _headers_dict(msg: dict) -> dict[str, str]:
+    return {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
+
+
+def _extract_message_body(msg: dict) -> str:
+    body = ""
+    payload = msg.get("payload", {})
+    if payload.get("body", {}).get("data"):
+        body = base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8", errors="replace")
+    elif payload.get("parts"):
+        for part in payload["parts"]:
+            if part.get("mimeType") == "text/plain" and part.get("body", {}).get("data"):
+                body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
+                break
+        if not body:
+            for part in payload["parts"]:
+                if part.get("mimeType") == "text/html" and part.get("body", {}).get("data"):
+                    body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
+                    break
+    return body
+
+
+def _extract_doc_text(doc: dict) -> str:
+    text_parts = []
+    for element in doc.get("body", {}).get("content", []):
+        paragraph = element.get("paragraph", {})
+        for pe in paragraph.get("elements", []):
+            text_run = pe.get("textRun", {})
+            if text_run.get("content"):
+                text_parts.append(text_run["content"])
+    return "".join(text_parts)
+
+
+def _datetime_with_timezone(value: str) -> str:
+    if not value:
+        return value
+    if "T" not in value:
+        return value
+    if value.endswith("Z"):
+        return value
+    tail = value[10:]
+    if "+" in tail or "-" in tail:
+        return value
+    return value + "Z"
+
+
+def get_credentials():
+    """Load and refresh credentials from token file."""
+    _ensure_authenticated()
 
     from google.oauth2.credentials import Credentials
     from google.auth.transport.requests import Request
 
-    creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
+    creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), _stored_token_scopes())
     if creds.expired and creds.refresh_token:
         creds.refresh(Request())
         TOKEN_PATH.write_text(creds.to_json())
@@ -65,6 +180,7 @@ def get_credentials():
 
 def build_service(api, version):
     from googleapiclient.discovery import build
+
     return build(api, version, credentials=get_credentials())
 
 
@@ -72,7 +188,41 @@ def build_service(api, version):
 # Gmail
 # =========================================================================
 
 
 def gmail_search(args):
+    if _gws_binary():
+        results = _run_gws(
+            ["gmail", "users", "messages", "list"],
+            params={"userId": "me", "q": args.query, "maxResults": args.max},
+        )
+        messages = results.get("messages", [])
+        output = []
+        for msg_meta in messages:
+            msg = _run_gws(
+                ["gmail", "users", "messages", "get"],
+                params={
+                    "userId": "me",
+                    "id": msg_meta["id"],
+                    "format": "metadata",
+                    "metadataHeaders": ["From", "To", "Subject", "Date"],
+                },
+            )
+            headers = _headers_dict(msg)
+            output.append(
+                {
+                    "id": msg["id"],
+                    "threadId": msg["threadId"],
+                    "from": headers.get("From", ""),
+                    "to": headers.get("To", ""),
+                    "subject": headers.get("Subject", ""),
+                    "date": headers.get("Date", ""),
+                    "snippet": msg.get("snippet", ""),
+                    "labels": msg.get("labelIds", []),
+                }
+            )
+        print(json.dumps(output, indent=2, ensure_ascii=False))
+        return
+
     service = build_service("gmail", "v1")
     results = service.users().messages().list(
         userId="me", q=args.query, maxResults=args.max
@@ -88,7 +238,7 @@ def gmail_search(args):
             userId="me", id=msg_meta["id"], format="metadata",
             metadataHeaders=["From", "To", "Subject", "Date"],
         ).execute()
-        headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
+        headers = _headers_dict(msg)
         output.append({
             "id": msg["id"],
             "threadId": msg["threadId"],
@@ -102,30 +252,33 @@ def gmail_search(args):
     print(json.dumps(output, indent=2, ensure_ascii=False))
 
 
 def gmail_get(args):
+    if _gws_binary():
+        msg = _run_gws(
+            ["gmail", "users", "messages", "get"],
+            params={"userId": "me", "id": args.message_id, "format": "full"},
+        )
+        headers = _headers_dict(msg)
+        result = {
+            "id": msg["id"],
+            "threadId": msg["threadId"],
+            "from": headers.get("From", ""),
+            "to": headers.get("To", ""),
+            "subject": headers.get("Subject", ""),
+            "date": headers.get("Date", ""),
+            "labels": msg.get("labelIds", []),
+            "body": _extract_message_body(msg),
+        }
+        print(json.dumps(result, indent=2, ensure_ascii=False))
+        return
+
     service = build_service("gmail", "v1")
     msg = service.users().messages().get(
         userId="me", id=args.message_id, format="full"
     ).execute()
 
-    headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
-
-    # Extract body text
-    body = ""
-    payload = msg.get("payload", {})
-    if payload.get("body", {}).get("data"):
-        body = base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8", errors="replace")
-    elif payload.get("parts"):
-        for part in payload["parts"]:
-            if part.get("mimeType") == "text/plain" and part.get("body", {}).get("data"):
-                body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
-                break
-        if not body:
-            for part in payload["parts"]:
-                if part.get("mimeType") == "text/html" and part.get("body", {}).get("data"):
-                    body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
-                    break
-
+    headers = _headers_dict(msg)
     result = {
         "id": msg["id"],
         "threadId": msg["threadId"],
@@ -134,12 +287,33 @@ def gmail_get(args):
         "subject": headers.get("Subject", ""),
         "date": headers.get("Date", ""),
         "labels": msg.get("labelIds", []),
-        "body": body,
+        "body": _extract_message_body(msg),
     }
     print(json.dumps(result, indent=2, ensure_ascii=False))
 
 
 def gmail_send(args):
+    if _gws_binary():
+        message = MIMEText(args.body, "html" if args.html else "plain")
+        message["to"] = args.to
+        message["subject"] = args.subject
+        if args.cc:
+            message["cc"] = args.cc
+
+        raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
+        body = {"raw": raw}
+        if args.thread_id:
+            body["threadId"] = args.thread_id
+
+        result = _run_gws(
+            ["gmail", "users", "messages", "send"],
+            params={"userId": "me"},
+            body=body,
+        )
+        print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
+        return
+
     service = build_service("gmail", "v1")
     message = MIMEText(args.body, "html" if args.html else "plain")
     message["to"] = args.to
@@ -157,14 +331,46 @@ def gmail_send(args):
     print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
 
 
 def gmail_reply(args):
+    if _gws_binary():
+        original = _run_gws(
+            ["gmail", "users", "messages", "get"],
+            params={
+                "userId": "me",
+                "id": args.message_id,
+                "format": "metadata",
+                "metadataHeaders": ["From", "Subject", "Message-ID"],
+            },
+        )
+        headers = _headers_dict(original)
+
+        subject = headers.get("Subject", "")
+        if not subject.startswith("Re:"):
+            subject = f"Re: {subject}"
+
+        message = MIMEText(args.body)
+        message["to"] = headers.get("From", "")
+        message["subject"] = subject
+        if headers.get("Message-ID"):
+            message["In-Reply-To"] = headers["Message-ID"]
+            message["References"] = headers["Message-ID"]
+
+        raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
+        result = _run_gws(
+            ["gmail", "users", "messages", "send"],
+            params={"userId": "me"},
+            body={"raw": raw, "threadId": original["threadId"]},
+        )
+        print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
+        return
+
     service = build_service("gmail", "v1")
     # Fetch original to get thread ID and headers
     original = service.users().messages().get(
         userId="me", id=args.message_id, format="metadata",
         metadataHeaders=["From", "Subject", "Message-ID"],
     ).execute()
-    headers = {h["name"]: h["value"] for h in original.get("payload", {}).get("headers", [])}
+    headers = _headers_dict(original)
 
     subject = headers.get("Subject", "")
     if not subject.startswith("Re:"):
@@ -184,20 +390,38 @@ def gmail_reply(args):
     print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
 
 
 def gmail_labels(args):
+    if _gws_binary():
+        results = _run_gws(["gmail", "users", "labels", "list"], params={"userId": "me"})
+        labels = [{"id": l["id"], "name": l["name"], "type": l.get("type", "")} for l in results.get("labels", [])]
+        print(json.dumps(labels, indent=2))
+        return
+
     service = build_service("gmail", "v1")
     results = service.users().labels().list(userId="me").execute()
     labels = [{"id": l["id"], "name": l["name"], "type": l.get("type", "")} for l in results.get("labels", [])]
     print(json.dumps(labels, indent=2))
 
 
 def gmail_modify(args):
-    service = build_service("gmail", "v1")
     body = {}
     if args.add_labels:
         body["addLabelIds"] = args.add_labels.split(",")
     if args.remove_labels:
         body["removeLabelIds"] = args.remove_labels.split(",")
 
+    if _gws_binary():
+        result = _run_gws(
+            ["gmail", "users", "messages", "modify"],
+            params={"userId": "me", "id": args.message_id},
+            body=body,
+        )
+        print(json.dumps({"id": result["id"], "labels": result.get("labelIds", [])}, indent=2))
+        return
+
+    service = build_service("gmail", "v1")
     result = service.users().messages().modify(userId="me", id=args.message_id, body=body).execute()
     print(json.dumps({"id": result["id"], "labels": result.get("labelIds", [])}, indent=2))
@@ -206,17 +430,40 @@ def gmail_modify(args):
 # Calendar
 # =========================================================================
 
 
 def calendar_list(args):
-    service = build_service("calendar", "v3")
     now = datetime.now(timezone.utc)
-    time_min = args.start or now.isoformat()
-    time_max = args.end or (now + timedelta(days=7)).isoformat()
+    time_min = _datetime_with_timezone(args.start or now.isoformat())
+    time_max = _datetime_with_timezone(args.end or (now + timedelta(days=7)).isoformat())
 
-    # Ensure timezone info
-    for val in [time_min, time_max]:
-        if "T" in val and "Z" not in val and "+" not in val and "-" not in val[11:]:
-            val += "Z"
+    if _gws_binary():
+        results = _run_gws(
+            ["calendar", "events", "list"],
+            params={
+                "calendarId": args.calendar,
+                "timeMin": time_min,
+                "timeMax": time_max,
+                "maxResults": args.max,
+                "singleEvents": True,
+                "orderBy": "startTime",
+            },
+        )
+        events = []
+        for e in results.get("items", []):
+            events.append({
+                "id": e["id"],
+                "summary": e.get("summary", "(no title)"),
+                "start": e.get("start", {}).get("dateTime", e.get("start", {}).get("date", "")),
+                "end": e.get("end", {}).get("dateTime", e.get("end", {}).get("date", "")),
+                "location": e.get("location", ""),
+                "description": e.get("description", ""),
+                "status": e.get("status", ""),
+                "htmlLink": e.get("htmlLink", ""),
+            })
+        print(json.dumps(events, indent=2, ensure_ascii=False))
+        return
+
+    service = build_service("calendar", "v3")
     results = service.events().list(
         calendarId=args.calendar, timeMin=time_min, timeMax=time_max,
         maxResults=args.max, singleEvents=True, orderBy="startTime",
@@ -237,8 +484,8 @@ def calendar_list(args):
     print(json.dumps(events, indent=2, ensure_ascii=False))
 
 
 def calendar_create(args):
-    service = build_service("calendar", "v3")
     event = {
         "summary": args.summary,
         "start": {"dateTime": args.start},
@@ -249,8 +496,23 @@ def calendar_create(args):
     if args.description:
         event["description"] = args.description
     if args.attendees:
-        event["attendees"] = [{"email": e.strip()} for e in args.attendees.split(",")]
+        event["attendees"] = [{"email": e.strip()} for e in args.attendees.split(",") if e.strip()]
 
+    if _gws_binary():
+        result = _run_gws(
+            ["calendar", "events", "insert"],
+            params={"calendarId": args.calendar},
+            body=event,
+        )
+        print(json.dumps({
+            "status": "created",
+            "id": result["id"],
+            "summary": result.get("summary", ""),
+            "htmlLink": result.get("htmlLink", ""),
+        }, indent=2))
+        return
+
+    service = build_service("calendar", "v3")
     result = service.events().insert(calendarId=args.calendar, body=event).execute()
     print(json.dumps({
         "status": "created",
@@ -260,7 +522,13 @@ def calendar_create(args):
     }, indent=2))
 
 
 def calendar_delete(args):
+    if _gws_binary():
+        _run_gws(["calendar", "events", "delete"], params={"calendarId": args.calendar, "eventId": args.event_id})
+        print(json.dumps({"status": "deleted", "eventId": args.event_id}))
+        return
+
     service = build_service("calendar", "v3")
     service.events().delete(calendarId=args.calendar, eventId=args.event_id).execute()
     print(json.dumps({"status": "deleted", "eventId": args.event_id}))
@@ -270,9 +538,22 @@ def calendar_delete(args):
 # Drive
 # =========================================================================
 
 
 def drive_search(args):
+    query = args.query if args.raw_query else f"fullText contains '{args.query}'"
+    if _gws_binary():
+        results = _run_gws(
+            ["drive", "files", "list"],
+            params={
+                "q": query,
+                "pageSize": args.max,
+                "fields": "files(id, name, mimeType, modifiedTime, webViewLink)",
+            },
+        )
+        print(json.dumps(results.get("files", []), indent=2, ensure_ascii=False))
+        return
+
     service = build_service("drive", "v3")
-    query = f"fullText contains '{args.query}'" if not args.raw_query else args.query
     results = service.files().list(
         q=query, pageSize=args.max, fields="files(id, name, mimeType, modifiedTime, webViewLink)",
     ).execute()
@@ -284,7 +565,30 @@ def drive_search(args):
 # Contacts
 # =========================================================================
 
 
 def contacts_list(args):
+    if _gws_binary():
+        results = _run_gws(
+            ["people", "people", "connections", "list"],
+            params={
+                "resourceName": "people/me",
+                "pageSize": args.max,
+                "personFields": "names,emailAddresses,phoneNumbers",
+            },
+        )
+        contacts = []
+        for person in results.get("connections", []):
+            names = person.get("names", [{}])
+            emails = person.get("emailAddresses", [])
+            phones = person.get("phoneNumbers", [])
+            contacts.append({
+                "name": names[0].get("displayName", "") if names else "",
+                "emails": [e.get("value", "") for e in emails],
+                "phones": [p.get("value", "") for p in phones],
+            })
+        print(json.dumps(contacts, indent=2, ensure_ascii=False))
+        return
+
     service = build_service("people", "v1")
     results = service.people().connections().list(
         resourceName="people/me",
@@ -308,7 +612,16 @@ def contacts_list(args):
 # Sheets
 # =========================================================================
 
 
 def sheets_get(args):
+    if _gws_binary():
+        result = _run_gws(
+            ["sheets", "spreadsheets", "values", "get"],
+            params={"spreadsheetId": args.sheet_id, "range": args.range},
+        )
+        print(json.dumps(result.get("values", []), indent=2, ensure_ascii=False))
+        return
+
     service = build_service("sheets", "v4")
     result = service.spreadsheets().values().get(
         spreadsheetId=args.sheet_id, range=args.range,
@@ -316,10 +629,25 @@ def sheets_get(args):
     print(json.dumps(result.get("values", []), indent=2, ensure_ascii=False))
 
 
 def sheets_update(args):
-    service = build_service("sheets", "v4")
     values = json.loads(args.values)
     body = {"values": values}
 
+    if _gws_binary():
+        result = _run_gws(
+            ["sheets", "spreadsheets", "values", "update"],
+            params={
+                "spreadsheetId": args.sheet_id,
+                "range": args.range,
+                "valueInputOption": "USER_ENTERED",
+            },
+            body=body,
+        )
+        print(json.dumps({"updatedCells": result.get("updatedCells", 0), "updatedRange": result.get("updatedRange", "")}, indent=2))
+        return
+
+    service = build_service("sheets", "v4")
     result = service.spreadsheets().values().update(
         spreadsheetId=args.sheet_id, range=args.range,
         valueInputOption="USER_ENTERED", body=body,
@@ -327,10 +655,26 @@ def sheets_update(args):
     print(json.dumps({"updatedCells": result.get("updatedCells", 0), "updatedRange": result.get("updatedRange", "")}, indent=2))
 
 
 def sheets_append(args):
-    service = build_service("sheets", "v4")
     values = json.loads(args.values)
     body = {"values": values}
 
+    if _gws_binary():
+        result = _run_gws(
+            ["sheets", "spreadsheets", "values", "append"],
+            params={
+                "spreadsheetId": args.sheet_id,
+                "range": args.range,
+                "valueInputOption": "USER_ENTERED",
+                "insertDataOption": "INSERT_ROWS",
+            },
+            body=body,
+        )
+        print(json.dumps({"updatedCells": result.get("updates", {}).get("updatedCells", 0)}, indent=2))
+        return
+
+    service = build_service("sheets", "v4")
     result = service.spreadsheets().values().append(
         spreadsheetId=args.sheet_id, range=args.range,
         valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body,
@@ -342,21 +686,24 @@ def sheets_append(args):
 # Docs
 # =========================================================================
 
 
 def docs_get(args):
+    if _gws_binary():
+        doc = _run_gws(["docs", "documents", "get"], params={"documentId": args.doc_id})
+        result = {
+            "title": doc.get("title", ""),
+            "documentId": doc.get("documentId", ""),
+            "body": _extract_doc_text(doc),
+        }
+        print(json.dumps(result, indent=2, ensure_ascii=False))
+        return
+
     service = build_service("docs", "v1")
     doc = service.documents().get(documentId=args.doc_id).execute()
-    # Extract plain text from the document structure
-    text_parts = []
-    for element in doc.get("body", {}).get("content", []):
-        paragraph = element.get("paragraph", {})
-        for pe in paragraph.get("elements", []):
-            text_run = pe.get("textRun", {})
-            if text_run.get("content"):
-                text_parts.append(text_run["content"])
     result = {
         "title": doc.get("title", ""),
         "documentId": doc.get("documentId", ""),
-        "body": "".join(text_parts),
+        "body": _extract_doc_text(doc),
     }
     print(json.dumps(result, indent=2, ensure_ascii=False))
 
 
@@ -365,6 +712,7 @@ def docs_get(args):
 # CLI parser
 # =========================================================================
 
 
 def main():
     parser = argparse.ArgumentParser(description="Google Workspace API for Hermes Agent")
     sub = parser.add_subparsers(dest="service", required=True)
@@ -21,28 +21,50 @@ Agent workflow:
 6. Run --check to verify. Done.
 """
 
+from __future__ import annotations
+
 import argparse
 import json
 import os
 import subprocess
 import sys
 from pathlib import Path
+from typing import Iterable
 
 HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
 TOKEN_PATH = HERMES_HOME / "google_token.json"
 CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json"
 PENDING_AUTH_PATH = HERMES_HOME / "google_oauth_pending.json"
+LAST_AUTH_URL_PATH = HERMES_HOME / "google_oauth_last_url.txt"
 
-SCOPES = [
-    "https://www.googleapis.com/auth/gmail.readonly",
-    "https://www.googleapis.com/auth/gmail.send",
-    "https://www.googleapis.com/auth/gmail.modify",
-    "https://www.googleapis.com/auth/calendar",
-    "https://www.googleapis.com/auth/drive.readonly",
-    "https://www.googleapis.com/auth/contacts.readonly",
-    "https://www.googleapis.com/auth/spreadsheets",
-    "https://www.googleapis.com/auth/documents.readonly",
-]
+SERVICE_SCOPE_GROUPS = {
+    "email": [
+        "https://www.googleapis.com/auth/gmail.readonly",
+        "https://www.googleapis.com/auth/gmail.send",
+        "https://www.googleapis.com/auth/gmail.modify",
+    ],
+    "calendar": ["https://www.googleapis.com/auth/calendar"],
+    "drive": ["https://www.googleapis.com/auth/drive.readonly"],
+    "contacts": ["https://www.googleapis.com/auth/contacts.readonly"],
+    "sheets": ["https://www.googleapis.com/auth/spreadsheets"],
+    "docs": ["https://www.googleapis.com/auth/documents.readonly"],
+}
+SERVICE_ALIASES = {
+    "all": "all",
+    "email": "email",
+    "gmail": "email",
+    "mail": "email",
+    "calendar": "calendar",
+    "cal": "calendar",
+    "drive": "drive",
+    "contacts": "contacts",
+    "people": "contacts",
+    "sheets": "sheets",
+    "docs": "docs",
+    "documents": "docs",
+}
+DEFAULT_SERVICES = ["email", "calendar", "drive", "contacts", "sheets", "docs"]
+ALL_SCOPES = [scope for service in DEFAULT_SERVICES for scope in SERVICE_SCOPE_GROUPS[service]]
 
 REQUIRED_PACKAGES = ["google-api-python-client", "google-auth-oauthlib", "google-auth-httplib2"]
@@ -50,6 +72,10 @@ REQUIRED_PACKAGES = ["google-api-python-client", "google-auth-oauthlib", "google
 # Google deprecated OOB, so we use a localhost redirect and tell the user to
 # copy the code from the browser's URL bar (or the page body).
 REDIRECT_URI = "http://localhost:1"
+AUDIENCE_URL = "https://console.cloud.google.com/auth/audience"
+PROJECT_SELECTOR_URL = "https://console.cloud.google.com/projectselector2/home/dashboard"
+API_LIBRARY_URL = "https://console.cloud.google.com/apis/library"
+CREDENTIALS_URL = "https://console.cloud.google.com/apis/credentials"
 
 
 def install_deps():
@@ -86,18 +112,98 @@ def _ensure_deps():
     sys.exit(1)
 
 
-def check_auth():
+def _dedupe(items: Iterable[str]) -> list[str]:
+    seen = set()
+    result = []
+    for item in items:
+        if item not in seen:
+            seen.add(item)
+            result.append(item)
+    return result
+
+
+def _resolve_services(services_text: str | None) -> tuple[list[str], list[str]]:
+    """Resolve a comma/space separated service list to canonical service names + scopes."""
+    text = (services_text or "all").strip().lower()
+    if not text or text == "all":
+        services = list(DEFAULT_SERVICES)
+        return services, list(ALL_SCOPES)
+
+    raw_parts = [part.strip() for part in text.replace(" ", ",").split(",") if part.strip()]
+    canonical = []
+    unknown = []
+    for part in raw_parts:
+        alias = SERVICE_ALIASES.get(part)
+        if alias == "all":
+            return list(DEFAULT_SERVICES), list(ALL_SCOPES)
+        if not alias:
+            unknown.append(part)
+            continue
+        canonical.append(alias)
+
+    if unknown:
+        print(
+            "ERROR: Unknown Google service(s): "
+            + ", ".join(sorted(set(unknown)))
+            + ". Supported values: all, email, calendar, drive, contacts, sheets, docs."
+        )
+        sys.exit(1)
+
+    canonical = _dedupe(canonical)
+    scopes = [scope for service in canonical for scope in SERVICE_SCOPE_GROUPS[service]]
+    return canonical, scopes
+
+
+def _stored_token_scopes() -> list[str] | None:
+    if not TOKEN_PATH.exists():
+        return None
+    try:
+        data = json.loads(TOKEN_PATH.read_text())
+    except Exception:
+        return None
+    scopes = data.get("scopes")
+    if isinstance(scopes, list) and scopes:
+        return scopes
+    return None
+
+
+def _credentials_scopes(services_text: str | None = None) -> list[str]:
+    requested_scopes = None
+    if services_text:
+        _, requested_scopes = _resolve_services(services_text)
+
+    stored_scopes = _stored_token_scopes()
+    if stored_scopes:
+        if requested_scopes:
+            missing = [scope for scope in requested_scopes if scope not in stored_scopes]
+            if missing:
+                print("TOKEN_MISSING_SCOPES: Stored token does not include the requested services.")
+                print("Missing scopes:")
+                for scope in missing:
+                    print(f" - {scope}")
+                print("Re-run setup with a fresh auth URL for the services you need.")
+                return []
+        return stored_scopes
+
+    return requested_scopes or list(ALL_SCOPES)
+
+
+def check_auth(services_text: str | None = None):
     """Check if stored credentials are valid. Prints status, exits 0 or 1."""
     if not TOKEN_PATH.exists():
         print(f"NOT_AUTHENTICATED: No token at {TOKEN_PATH}")
         return False
 
+    scopes = _credentials_scopes(services_text)
+    if not scopes:
+        return False
+
     _ensure_deps()
     from google.oauth2.credentials import Credentials
     from google.auth.transport.requests import Request
 
     try:
-        creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
+        creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), scopes)
     except Exception as e:
         print(f"TOKEN_CORRUPT: {e}")
         return False
@@ -135,14 +241,14 @@ def store_client_secret(path: str):
 
     if "installed" not in data and "web" not in data:
         print("ERROR: Not a Google OAuth client secret file (missing 'installed' key).")
-        print("Download the correct file from: https://console.cloud.google.com/apis/credentials")
+        print(f"Download the correct file from: {CREDENTIALS_URL}")
         sys.exit(1)
 
     CLIENT_SECRET_PATH.write_text(json.dumps(data, indent=2))
     print(f"OK: Client secret saved to {CLIENT_SECRET_PATH}")
 
 
-def _save_pending_auth(*, state: str, code_verifier: str):
+def _save_pending_auth(*, state: str, code_verifier: str, scopes: list[str], services: list[str], auth_url: str):
     """Persist the OAuth session bits needed for a later token exchange."""
     PENDING_AUTH_PATH.write_text(
         json.dumps(
@@ -150,10 +256,14 @@ def _save_pending_auth(*, state: str, code_verifier: str):
                 "state": state,
                 "code_verifier": code_verifier,
                 "redirect_uri": REDIRECT_URI,
+                "scopes": scopes,
+                "services": services,
+                "auth_url": auth_url,
             },
             indent=2,
         )
     )
+    LAST_AUTH_URL_PATH.write_text(auth_url)
 
 
 def _load_pending_auth() -> dict:
@@ -174,6 +284,8 @@ def _load_pending_auth() -> dict:
|
||||
print("Run --auth-url again to start a fresh OAuth session.")
|
||||
sys.exit(1)
|
||||
|
||||
data.setdefault("scopes", list(ALL_SCOPES))
|
||||
data.setdefault("services", list(DEFAULT_SERVICES))
|
||||
return data
|
||||
|
||||
|
||||
@@ -188,37 +300,96 @@ def _extract_code_and_state(code_or_url: str) -> tuple[str, str | None]:
     params = parse_qs(parsed.query)
     if "code" not in params:
         print("ERROR: No 'code' parameter found in URL.")
         print("When the browser lands on the localhost error page, copy the FULL address bar URL.")
         sys.exit(1)
 
     state = params.get("state", [None])[0]
     return params["code"][0], state
 
 
-def get_auth_url():
+def _build_flow(scopes: list[str], *, state: str | None = None, code_verifier: str | None = None, autogenerate_code_verifier: bool = False):
+    _ensure_deps()
+    from google_auth_oauthlib.flow import Flow
+
+    return Flow.from_client_secrets_file(
+        str(CLIENT_SECRET_PATH),
+        scopes=scopes,
+        redirect_uri=REDIRECT_URI,
+        state=state,
+        code_verifier=code_verifier,
+        autogenerate_code_verifier=autogenerate_code_verifier,
+    )
+
+
+def _create_auth_session(services_text: str | None, *, output_format: str = "plain", emit_output: bool = True) -> str:
+    services, scopes = _resolve_services(services_text)
+    flow = _build_flow(scopes, autogenerate_code_verifier=True)
+    auth_url, state = flow.authorization_url(access_type="offline", prompt="consent")
+    _save_pending_auth(
+        state=state,
+        code_verifier=flow.code_verifier,
+        scopes=scopes,
+        services=services,
+        auth_url=auth_url,
+    )
+
+    if emit_output:
+        if output_format == "json":
+            print(
+                json.dumps(
+                    {
+                        "success": True,
+                        "auth_url": auth_url,
+                        "auth_url_file": str(LAST_AUTH_URL_PATH),
+                        "services": services,
+                        "scopes": scopes,
+                        "project_selector_url": PROJECT_SELECTOR_URL,
+                        "api_library_url": API_LIBRARY_URL,
+                        "credentials_url": CREDENTIALS_URL,
+                        "audience_url": AUDIENCE_URL,
+                        "instructions": [
+                            "Open auth_url in your browser.",
+                            "If the browser lands on a localhost error page, that is expected.",
+                            "Copy the FULL redirected URL from the address bar and pass it to --auth-code.",
+                        ],
+                    },
+                    indent=2,
+                )
+            )
+        else:
+            print(auth_url)
+    return auth_url
+
+
+def get_auth_url(services_text: str | None = None, *, output_format: str = "plain"):
     """Print the OAuth authorization URL. User visits this in a browser."""
     if not CLIENT_SECRET_PATH.exists():
         print("ERROR: No client secret stored. Run --client-secret first.")
         sys.exit(1)
 
-    _ensure_deps()
-    from google_auth_oauthlib.flow import Flow
-
-    flow = Flow.from_client_secrets_file(
-        str(CLIENT_SECRET_PATH),
-        scopes=SCOPES,
-        redirect_uri=REDIRECT_URI,
-        autogenerate_code_verifier=True,
-    )
-    auth_url, state = flow.authorization_url(
-        access_type="offline",
-        prompt="consent",
-    )
-    _save_pending_auth(state=state, code_verifier=flow.code_verifier)
-    # Print just the URL so the agent can extract it cleanly
-    print(auth_url)
+    _create_auth_session(services_text, output_format=output_format)
 
 
-def exchange_auth_code(code: str):
+def _print_recovery_url(auth_url: str, output_format: str):
+    if output_format == "json":
+        print(
+            json.dumps(
+                {
+                    "success": False,
+                    "fresh_auth_url": auth_url,
+                    "auth_url_file": str(LAST_AUTH_URL_PATH),
+                    "audience_url": AUDIENCE_URL,
+                },
+                indent=2,
+            )
+        )
+    else:
+        print("A fresh auth URL has been generated. Use this exact URL:")
+        print(auth_url)
+        print(f"If Google blocks access, add your account as a test user here: {AUDIENCE_URL}")
+
+
+def exchange_auth_code(code: str, *, output_format: str = "plain"):
     """Exchange the authorization code for a token and save it."""
     if not CLIENT_SECRET_PATH.exists():
         print("ERROR: No client secret stored. Run --client-secret first.")
@@ -227,16 +398,18 @@ def exchange_auth_code(code: str):
     pending_auth = _load_pending_auth()
     code, returned_state = _extract_code_and_state(code)
     if returned_state and returned_state != pending_auth["state"]:
-        print("ERROR: OAuth state mismatch. Run --auth-url again to start a fresh session.")
+        auth_url = _create_auth_session(
+            ",".join(pending_auth.get("services", DEFAULT_SERVICES)),
+            output_format=output_format,
+            emit_output=False,
+        )
+        if output_format != "json":
+            print("ERROR: OAuth state mismatch. Your browser redirect came from an older auth session.")
+        _print_recovery_url(auth_url, output_format)
         sys.exit(1)
 
-    _ensure_deps()
-    from google_auth_oauthlib.flow import Flow
-
-    flow = Flow.from_client_secrets_file(
-        str(CLIENT_SECRET_PATH),
-        scopes=SCOPES,
-        redirect_uri=pending_auth.get("redirect_uri", REDIRECT_URI),
+    flow = _build_flow(
+        pending_auth.get("scopes", list(ALL_SCOPES)),
         state=pending_auth["state"],
         code_verifier=pending_auth["code_verifier"],
     )
@@ -244,14 +417,33 @@ def exchange_auth_code(code: str):
     try:
         flow.fetch_token(code=code)
     except Exception as e:
-        print(f"ERROR: Token exchange failed: {e}")
-        print("The code may have expired. Run --auth-url to get a fresh URL.")
+        auth_url = _create_auth_session(
+            ",".join(pending_auth.get("services", DEFAULT_SERVICES)),
+            output_format=output_format,
+            emit_output=False,
+        )
+        if output_format != "json":
+            print(f"ERROR: Token exchange failed: {e}")
+            print("The code may have expired or already been used.")
+        _print_recovery_url(auth_url, output_format)
         sys.exit(1)
 
     creds = flow.credentials
     TOKEN_PATH.write_text(creds.to_json())
     PENDING_AUTH_PATH.unlink(missing_ok=True)
-    print(f"OK: Authenticated. Token saved to {TOKEN_PATH}")
+    if output_format == "json":
+        print(
+            json.dumps(
+                {
+                    "success": True,
+                    "token_path": str(TOKEN_PATH),
+                    "services": pending_auth.get("services", DEFAULT_SERVICES),
+                },
+                indent=2,
+            )
+        )
+    else:
+        print(f"OK: Authenticated. Token saved to {TOKEN_PATH}")
 
 
 def revoke():
@@ -260,16 +452,19 @@ def revoke():
         print("No token to revoke.")
         return
 
+    scopes = _stored_token_scopes() or list(ALL_SCOPES)
+
     _ensure_deps()
     from google.oauth2.credentials import Credentials
     from google.auth.transport.requests import Request
 
     try:
-        creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
+        creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), scopes)
         if creds.expired and creds.refresh_token:
             creds.refresh(Request())
 
         import urllib.request
 
         urllib.request.urlopen(
             urllib.request.Request(
                 f"https://oauth2.googleapis.com/revoke?token={creds.token}",
@@ -283,11 +478,23 @@ def revoke():
 
     TOKEN_PATH.unlink(missing_ok=True)
     PENDING_AUTH_PATH.unlink(missing_ok=True)
+    LAST_AUTH_URL_PATH.unlink(missing_ok=True)
     print(f"Deleted {TOKEN_PATH}")
 
 
 def main():
     parser = argparse.ArgumentParser(description="Google Workspace OAuth setup for Hermes")
+    parser.add_argument(
+        "--services",
+        default="all",
+        help="Comma-separated services to authorize: all, email, calendar, drive, contacts, sheets, docs",
+    )
+    parser.add_argument(
+        "--format",
+        choices=["plain", "json"],
+        default="plain",
+        help="Output format. Use json for agent-friendly parsing.",
+    )
     group = parser.add_mutually_exclusive_group(required=True)
     group.add_argument("--check", action="store_true", help="Check if auth is valid (exit 0=yes, 1=no)")
     group.add_argument("--client-secret", metavar="PATH", help="Store OAuth client_secret.json")
@@ -298,16 +505,20 @@ def main():
     args = parser.parse_args()
 
     if args.check:
-        sys.exit(0 if check_auth() else 1)
-    elif args.client_secret:
+        sys.exit(0 if check_auth(args.services) else 1)
+    if args.client_secret:
         store_client_secret(args.client_secret)
-    elif args.auth_url:
-        get_auth_url()
-    elif args.auth_code:
-        exchange_auth_code(args.auth_code)
-    elif args.revoke:
+        return
+    if args.auth_url:
+        get_auth_url(args.services, output_format=args.format)
+        return
+    if args.auth_code:
+        exchange_auth_code(args.auth_code, output_format=args.format)
+        return
+    if args.revoke:
         revoke()
-    elif args.install_deps:
+        return
+    if args.install_deps:
         sys.exit(0 if install_deps() else 1)
tests/skills/test_google_api_cli.py (new file, 203 lines)
@@ -0,0 +1,203 @@
"""Tests for the Google Workspace skill CLI wrapper.
|
||||
|
||||
These focus on the hybrid backend: prefer the Google Workspace CLI (`gws`) when
|
||||
available, while preserving the existing Hermes-facing JSON contract.
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
from argparse import Namespace
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
SCRIPT_PATH = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "skills/productivity/google-workspace/scripts/google_api.py"
|
||||
)
|
||||
|
||||
|
||||
def _load_module():
|
||||
spec = importlib.util.spec_from_file_location("google_workspace_api_test", SCRIPT_PATH)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
assert spec.loader is not None
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def _completed(stdout: str = "", stderr: str = "", returncode: int = 0):
|
||||
return SimpleNamespace(stdout=stdout, stderr=stderr, returncode=returncode)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def google_api_module(tmp_path, monkeypatch):
|
||||
module = _load_module()
|
||||
token_path = tmp_path / "google_token.json"
|
||||
token_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"token": "access-token",
|
||||
"refresh_token": "refresh-token",
|
||||
"token_uri": "https://oauth2.googleapis.com/token",
|
||||
"client_id": "client-id",
|
||||
"client_secret": "client-secret",
|
||||
}
|
||||
)
|
||||
)
|
||||
monkeypatch.setattr(module, "TOKEN_PATH", token_path)
|
||||
monkeypatch.setattr(module, "_gws_binary", lambda: "/usr/bin/gws", raising=False)
|
||||
monkeypatch.setattr(module, "get_credentials", lambda: SimpleNamespace(token="access-token"))
|
||||
monkeypatch.setattr(
|
||||
module,
|
||||
"build_service",
|
||||
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("legacy backend should not be used")),
|
||||
)
|
||||
return module
|
||||
|
||||
|
||||
def test_gmail_search_uses_gws_and_normalizes_results(google_api_module, monkeypatch, capsys):
|
||||
calls = []
|
||||
|
||||
def fake_run(cmd, capture_output, text, env):
|
||||
calls.append({"cmd": cmd, "env": env})
|
||||
if cmd[1:4] == ["gmail", "users", "messages"] and cmd[4] == "list":
|
||||
assert json.loads(cmd[6]) == {"userId": "me", "q": "is:unread", "maxResults": 5}
|
||||
return _completed(
|
||||
json.dumps({"messages": [{"id": "msg-1", "threadId": "thread-1"}]})
|
||||
)
|
||||
if cmd[1:4] == ["gmail", "users", "messages"] and cmd[4] == "get":
|
||||
params = json.loads(cmd[6])
|
||||
assert params["id"] == "msg-1"
|
||||
return _completed(
|
||||
json.dumps(
|
||||
{
|
||||
"id": "msg-1",
|
||||
"threadId": "thread-1",
|
||||
"payload": {
|
||||
"headers": [
|
||||
{"name": "From", "value": "alice@example.com"},
|
||||
{"name": "To", "value": "bob@example.com"},
|
||||
{"name": "Subject", "value": "Hello"},
|
||||
{"name": "Date", "value": "Sat, 15 Mar 2026 10:00:00 +0000"},
|
||||
]
|
||||
},
|
||||
"snippet": "preview",
|
||||
"labelIds": ["UNREAD"],
|
||||
}
|
||||
)
|
||||
)
|
||||
raise AssertionError(f"Unexpected command: {cmd}")
|
||||
|
||||
monkeypatch.setattr(google_api_module.subprocess, "run", fake_run)
|
||||
|
||||
google_api_module.gmail_search(Namespace(query="is:unread", max=5))
|
||||
|
||||
out = json.loads(capsys.readouterr().out)
|
||||
assert out == [
|
||||
{
|
||||
"id": "msg-1",
|
||||
"threadId": "thread-1",
|
||||
"from": "alice@example.com",
|
||||
"to": "bob@example.com",
|
||||
"subject": "Hello",
|
||||
"date": "Sat, 15 Mar 2026 10:00:00 +0000",
|
||||
"snippet": "preview",
|
||||
"labels": ["UNREAD"],
|
||||
}
|
||||
]
|
||||
assert calls[0]["env"]["GOOGLE_WORKSPACE_CLI_TOKEN"] == "access-token"
|
||||
|
||||
|
||||
def test_calendar_create_uses_gws_insert_and_normalizes_result(google_api_module, monkeypatch, capsys):
|
||||
def fake_run(cmd, capture_output, text, env):
|
||||
assert cmd[:5] == ["/usr/bin/gws", "calendar", "events", "insert", "--params"]
|
||||
assert json.loads(cmd[5]) == {"calendarId": "primary"}
|
||||
body = json.loads(cmd[7])
|
||||
assert body == {
|
||||
"summary": "Standup",
|
||||
"start": {"dateTime": "2026-03-15T09:00:00Z"},
|
||||
"end": {"dateTime": "2026-03-15T09:30:00Z"},
|
||||
"location": "Room 1",
|
||||
"description": "Daily sync",
|
||||
"attendees": [{"email": "alice@example.com"}, {"email": "bob@example.com"}],
|
||||
}
|
||||
return _completed(json.dumps({"id": "evt-1", "summary": "Standup", "htmlLink": "https://calendar/event"}))
|
||||
|
||||
monkeypatch.setattr(google_api_module.subprocess, "run", fake_run)
|
||||
|
||||
google_api_module.calendar_create(
|
||||
Namespace(
|
||||
summary="Standup",
|
||||
start="2026-03-15T09:00:00Z",
|
||||
end="2026-03-15T09:30:00Z",
|
||||
location="Room 1",
|
||||
description="Daily sync",
|
||||
attendees="alice@example.com,bob@example.com",
|
||||
calendar="primary",
|
||||
)
|
||||
)
|
||||
|
||||
assert json.loads(capsys.readouterr().out) == {
|
||||
"status": "created",
|
||||
"id": "evt-1",
|
||||
"summary": "Standup",
|
||||
"htmlLink": "https://calendar/event",
|
||||
}
|
||||
|
||||
|
||||
def test_sheets_append_uses_gws_and_returns_updated_cells(google_api_module, monkeypatch, capsys):
|
||||
def fake_run(cmd, capture_output, text, env):
|
||||
assert cmd[:6] == ["/usr/bin/gws", "sheets", "spreadsheets", "values", "append", "--params"]
|
||||
assert json.loads(cmd[6]) == {
|
||||
"spreadsheetId": "sheet-123",
|
||||
"range": "Sheet1!A:C",
|
||||
"valueInputOption": "USER_ENTERED",
|
||||
"insertDataOption": "INSERT_ROWS",
|
||||
}
|
||||
assert json.loads(cmd[8]) == {"values": [["a", "b", "c"]]}
|
||||
return _completed(json.dumps({"updates": {"updatedCells": 3}}))
|
||||
|
||||
monkeypatch.setattr(google_api_module.subprocess, "run", fake_run)
|
||||
|
||||
google_api_module.sheets_append(
|
||||
Namespace(sheet_id="sheet-123", range="Sheet1!A:C", values='[["a", "b", "c"]]')
|
||||
)
|
||||
|
||||
assert json.loads(capsys.readouterr().out) == {"updatedCells": 3}
|
||||
|
||||
|
||||
def test_docs_get_uses_gws_and_extracts_plain_text(google_api_module, monkeypatch, capsys):
|
||||
def fake_run(cmd, capture_output, text, env):
|
||||
assert cmd[:6] == ["/usr/bin/gws", "docs", "documents", "get", "--params", '{"documentId": "doc-123"}']
|
||||
return _completed(
|
||||
json.dumps(
|
||||
{
|
||||
"title": "Doc Title",
|
||||
"documentId": "doc-123",
|
||||
"body": {
|
||||
"content": [
|
||||
{
|
||||
"paragraph": {
|
||||
"elements": [
|
||||
{"textRun": {"content": "Hello "}},
|
||||
{"textRun": {"content": "world"}},
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
monkeypatch.setattr(google_api_module.subprocess, "run", fake_run)
|
||||
|
||||
google_api_module.docs_get(Namespace(doc_id="doc-123"))
|
||||
|
||||
assert json.loads(capsys.readouterr().out) == {
|
||||
"title": "Doc Title",
|
||||
"documentId": "doc-123",
|
||||
"body": "Hello world",
|
||||
}
|
||||
@@ -4,6 +4,8 @@ These tests cover the headless/manual auth-code flow where the browser step and
 code exchange happen in separate process invocations.
 """
 
+from __future__ import annotations
+
 import importlib.util
 import json
 import sys
@@ -110,6 +112,7 @@ def setup_module(monkeypatch, tmp_path):
     monkeypatch.setattr(module, "CLIENT_SECRET_PATH", tmp_path / "google_client_secret.json")
     monkeypatch.setattr(module, "TOKEN_PATH", tmp_path / "google_token.json")
     monkeypatch.setattr(module, "PENDING_AUTH_PATH", tmp_path / "google_oauth_pending.json", raising=False)
+    monkeypatch.setattr(module, "LAST_AUTH_URL_PATH", tmp_path / "google_oauth_last_url.txt", raising=False)
 
     client_secret = {
         "installed": {
@@ -123,16 +126,38 @@ def setup_module(monkeypatch, tmp_path):
     return module
 
 
-class TestGetAuthUrl:
-    def test_persists_state_and_code_verifier_for_later_exchange(self, setup_module, capsys):
-        setup_module.get_auth_url()
+class TestResolveServices:
+    def test_reduces_to_requested_services(self, setup_module):
+        services, scopes = setup_module._resolve_services("email,calendar")
+        assert services == ["email", "calendar"]
+        assert scopes == [
+            "https://www.googleapis.com/auth/gmail.readonly",
+            "https://www.googleapis.com/auth/gmail.send",
+            "https://www.googleapis.com/auth/gmail.modify",
+            "https://www.googleapis.com/auth/calendar",
+        ]
 
-        out = capsys.readouterr().out.strip()
-        assert out == "https://auth.example/authorize?state=generated-state"
+
+class TestGetAuthUrl:
+    def test_persists_state_verifier_scopes_and_last_url(self, setup_module, capsys):
+        setup_module.get_auth_url("email,calendar", output_format="json")
+
+        out = json.loads(capsys.readouterr().out)
+        assert out["success"] is True
+        assert out["auth_url"] == "https://auth.example/authorize?state=generated-state"
+        assert out["services"] == ["email", "calendar"]
+        assert Path(out["auth_url_file"]).read_text() == out["auth_url"]
 
         saved = json.loads(setup_module.PENDING_AUTH_PATH.read_text())
         assert saved["state"] == "generated-state"
         assert saved["code_verifier"] == "generated-code-verifier"
+        assert saved["services"] == ["email", "calendar"]
+        assert saved["scopes"] == [
+            "https://www.googleapis.com/auth/gmail.readonly",
+            "https://www.googleapis.com/auth/gmail.send",
+            "https://www.googleapis.com/auth/gmail.modify",
+            "https://www.googleapis.com/auth/calendar",
+        ]
 
         flow = FakeFlow.created[-1]
         assert flow.autogenerate_code_verifier is True
@@ -142,7 +167,14 @@ class TestGetAuthUrl:
 class TestExchangeAuthCode:
     def test_reuses_saved_pkce_material_for_plain_code(self, setup_module):
         setup_module.PENDING_AUTH_PATH.write_text(
-            json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"})
+            json.dumps(
+                {
+                    "state": "saved-state",
+                    "code_verifier": "saved-verifier",
+                    "services": ["email", "calendar"],
+                    "scopes": ["scope-a", "scope-b"],
+                }
+            )
         )
 
         setup_module.exchange_auth_code("4/test-auth-code")
@@ -150,13 +182,21 @@ class TestExchangeAuthCode:
         flow = FakeFlow.created[-1]
         assert flow.state == "saved-state"
         assert flow.code_verifier == "saved-verifier"
+        assert flow.scopes == ["scope-a", "scope-b"]
         assert flow.fetch_token_calls == [{"code": "4/test-auth-code"}]
         assert json.loads(setup_module.TOKEN_PATH.read_text())["token"] == "access-token"
         assert not setup_module.PENDING_AUTH_PATH.exists()
 
     def test_extracts_code_from_redirect_url_and_checks_state(self, setup_module):
         setup_module.PENDING_AUTH_PATH.write_text(
-            json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"})
+            json.dumps(
+                {
+                    "state": "saved-state",
+                    "code_verifier": "saved-verifier",
+                    "services": ["email"],
+                    "scopes": ["scope-a"],
+                }
+            )
         )
 
         setup_module.exchange_auth_code(
@@ -166,19 +206,34 @@ class TestExchangeAuthCode:
         flow = FakeFlow.created[-1]
         assert flow.fetch_token_calls == [{"code": "4/extracted-code"}]
 
-    def test_rejects_state_mismatch(self, setup_module, capsys):
+    def test_state_mismatch_regenerates_fresh_url(self, setup_module, capsys):
         setup_module.PENDING_AUTH_PATH.write_text(
-            json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"})
+            json.dumps(
+                {
+                    "state": "saved-state",
+                    "code_verifier": "saved-verifier",
+                    "services": ["email", "calendar"],
+                    "scopes": ["scope-a", "scope-b"],
+                }
+            )
         )
+        FakeFlow.default_state = "replacement-state"
+        FakeFlow.default_verifier = "replacement-verifier"
 
         with pytest.raises(SystemExit):
             setup_module.exchange_auth_code(
-                "http://localhost:1/?code=4/extracted-code&state=wrong-state"
+                "http://localhost:1/?code=4/extracted-code&state=wrong-state",
+                output_format="json",
             )
 
-        out = capsys.readouterr().out
-        assert "state mismatch" in out.lower()
-        assert not setup_module.TOKEN_PATH.exists()
+        out = json.loads(capsys.readouterr().out)
+        assert out["success"] is False
+        assert out["fresh_auth_url"] == "https://auth.example/authorize?state=replacement-state"
+
+        saved = json.loads(setup_module.PENDING_AUTH_PATH.read_text())
+        assert saved["state"] == "replacement-state"
+        assert saved["code_verifier"] == "replacement-verifier"
+        assert saved["services"] == ["email", "calendar"]
 
     def test_requires_pending_auth_session(self, setup_module, capsys):
         with pytest.raises(SystemExit):
@@ -188,16 +243,42 @@ class TestExchangeAuthCode:
         assert "run --auth-url first" in out.lower()
         assert not setup_module.TOKEN_PATH.exists()
 
-    def test_keeps_pending_auth_session_when_exchange_fails(self, setup_module, capsys):
+    def test_failed_exchange_regenerates_fresh_url(self, setup_module, capsys):
         setup_module.PENDING_AUTH_PATH.write_text(
-            json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"})
+            json.dumps(
+                {
+                    "state": "saved-state",
+                    "code_verifier": "saved-verifier",
+                    "services": ["email"],
+                    "scopes": ["scope-a"],
+                }
+            )
         )
+        FakeFlow.default_state = "replacement-state"
+        FakeFlow.default_verifier = "replacement-verifier"
         FakeFlow.fetch_error = Exception("invalid_grant: Missing code verifier")
 
         with pytest.raises(SystemExit):
-            setup_module.exchange_auth_code("4/test-auth-code")
+            setup_module.exchange_auth_code("4/test-auth-code", output_format="json")
 
-        out = capsys.readouterr().out
-        assert "token exchange failed" in out.lower()
+        out = json.loads(capsys.readouterr().out)
+        assert out["success"] is False
+        assert out["fresh_auth_url"] == "https://auth.example/authorize?state=replacement-state"
         assert setup_module.PENDING_AUTH_PATH.exists()
         assert not setup_module.TOKEN_PATH.exists()
+
+    def test_check_auth_rejects_missing_requested_scopes(self, setup_module, capsys):
+        setup_module.TOKEN_PATH.write_text(
+            json.dumps(
+                {
+                    "token": "access-token",
+                    "refresh_token": "refresh-token",
+                    "scopes": ["https://www.googleapis.com/auth/gmail.readonly"],
+                }
+            )
+        )
+
+        ok = setup_module.check_auth("email,calendar")
+        out = capsys.readouterr().out
+        assert ok is False
+        assert "missing scopes" in out.lower()
@@ -204,7 +204,7 @@ Skills for document creation, presentations, spreadsheets, and other productivit
 
 | Skill | Description | Path |
 |-------|-------------|------|
-| `google-workspace` | Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration via Python. Uses OAuth2 with automatic token refresh. No external binaries needed — runs entirely with Google's Python client libraries in the Hermes venv. | `productivity/google-workspace` |
+| `google-workspace` | Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration for Hermes. Uses Hermes-managed OAuth2 setup, prefers the Google Workspace CLI (`gws`) when available for broader API coverage, and falls back to the Python client libraries otherwise. | `productivity/google-workspace` |
 | `nano-pdf` | Edit PDFs with natural-language instructions using the nano-pdf CLI. Modify text, fix typos, update titles, and make content changes to specific pages without manual editing. | `productivity/nano-pdf` |
 | `notion` | Notion API for creating and managing pages, databases, and blocks via curl. Search, create, update, and query Notion workspaces directly from the terminal. | `productivity/notion` |
 | `ocr-and-documents` | Extract text from PDFs and scanned documents. Use web_extract for remote URLs, pymupdf for local text-based PDFs, marker-pdf for OCR/scanned docs. For DOCX use python-docx, for PPTX see the powerpoint skill. | `productivity/ocr-and-documents` |
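For readers skimming this compare view: the new tests pin down the `gws` invocation contract, namely `gws <service> <resource> <method> --params <json> [--body <json>]` with the OAuth access token passed through the `GOOGLE_WORKSPACE_CLI_TOKEN` environment variable. A minimal sketch of how such a command could be assembled and run follows; the helper names `build_gws_command` and `_run_gws` here are illustrative only, since the actual `_run_gws` implementation in `google_api.py` is not visible in this diff.

```python
import json
import os
import shutil
import subprocess


def build_gws_command(binary, path, params=None, body=None):
    """Assemble a gws argv: binary, API path segments, --params JSON, optional --body JSON."""
    cmd = [binary, *path, "--params", json.dumps(params or {})]
    if body is not None:
        cmd += ["--body", json.dumps(body)]
    return cmd


def _run_gws(path, params=None, body=None, token=None):
    """Run gws (if installed on PATH) and parse its JSON stdout. Illustrative sketch."""
    binary = shutil.which("gws")
    if binary is None:
        raise RuntimeError("gws CLI not found on PATH")
    env = {**os.environ}
    if token:
        # Matches the env var the tests assert on.
        env["GOOGLE_WORKSPACE_CLI_TOKEN"] = token
    result = subprocess.run(
        build_gws_command(binary, path, params, body),
        capture_output=True, text=True, env=env,
    )
    if result.returncode != 0:
        raise RuntimeError(result.stderr.strip())
    return json.loads(result.stdout)
```

For example, `build_gws_command("/usr/bin/gws", ["docs", "documents", "get"], {"documentId": "doc-123"})` produces exactly the argv that `test_docs_get_uses_gws_and_extracts_plain_text` asserts.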