mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-01 00:11:39 +08:00
Compare commits
4 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a1c67bc5b | ||
|
|
55960330fc | ||
|
|
9e6029bc73 | ||
|
|
d5bdf6a4ac |
@@ -1,7 +1,7 @@
|
|||||||
---
|
---
|
||||||
name: google-workspace
|
name: google-workspace
|
||||||
description: Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration via Python. Uses OAuth2 with automatic token refresh. No external binaries needed — runs entirely with Google's Python client libraries in the Hermes venv.
|
description: Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration via gws CLI (googleworkspace/cli). Uses OAuth2 with automatic token refresh via bridge script. Requires gws binary.
|
||||||
version: 1.0.0
|
version: 2.0.0
|
||||||
author: Nous Research
|
author: Nous Research
|
||||||
license: MIT
|
license: MIT
|
||||||
required_credential_files:
|
required_credential_files:
|
||||||
@@ -11,14 +11,25 @@ required_credential_files:
|
|||||||
description: Google OAuth2 client credentials (downloaded from Google Cloud Console)
|
description: Google OAuth2 client credentials (downloaded from Google Cloud Console)
|
||||||
metadata:
|
metadata:
|
||||||
hermes:
|
hermes:
|
||||||
tags: [Google, Gmail, Calendar, Drive, Sheets, Docs, Contacts, Email, OAuth]
|
tags: [Google, Gmail, Calendar, Drive, Sheets, Docs, Contacts, Email, OAuth, gws]
|
||||||
homepage: https://github.com/NousResearch/hermes-agent
|
homepage: https://github.com/NousResearch/hermes-agent
|
||||||
related_skills: [himalaya]
|
related_skills: [himalaya]
|
||||||
---
|
---
|
||||||
|
|
||||||
# Google Workspace
|
# Google Workspace
|
||||||
|
|
||||||
Gmail, Calendar, Drive, Contacts, Sheets, and Docs — all through Python scripts in this skill. No external binaries to install.
|
Gmail, Calendar, Drive, Contacts, Sheets, and Docs — powered by `gws` (Google's official Rust CLI). The skill provides a backward-compatible Python wrapper that handles OAuth token refresh and delegates to `gws`.
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
```
|
||||||
|
google_api.py → gws_bridge.py → gws CLI
|
||||||
|
(argparse compat) (token refresh) (Google APIs)
|
||||||
|
```
|
||||||
|
|
||||||
|
- `setup.py` handles OAuth2 (headless-compatible, works on CLI/Telegram/Discord)
|
||||||
|
- `gws_bridge.py` refreshes the Hermes token and injects it into `gws` via `GOOGLE_WORKSPACE_CLI_TOKEN`
|
||||||
|
- `google_api.py` provides the same CLI interface as v1 but delegates to `gws`
|
||||||
|
|
||||||
## References
|
## References
|
||||||
|
|
||||||
@@ -27,7 +38,22 @@ Gmail, Calendar, Drive, Contacts, Sheets, and Docs — all through Python script
|
|||||||
## Scripts
|
## Scripts
|
||||||
|
|
||||||
- `scripts/setup.py` — OAuth2 setup (run once to authorize)
|
- `scripts/setup.py` — OAuth2 setup (run once to authorize)
|
||||||
- `scripts/google_api.py` — API wrapper CLI (agent uses this for all operations)
|
- `scripts/gws_bridge.py` — Token refresh bridge to gws CLI
|
||||||
|
- `scripts/google_api.py` — Backward-compatible API wrapper (delegates to gws)
|
||||||
|
|
||||||
|
## Prerequisites
|
||||||
|
|
||||||
|
Install `gws`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cargo install google-workspace-cli
|
||||||
|
# or via npm (recommended, downloads prebuilt binary):
|
||||||
|
npm install -g @googleworkspace/cli
|
||||||
|
# or via Homebrew:
|
||||||
|
brew install googleworkspace-cli
|
||||||
|
```
|
||||||
|
|
||||||
|
Verify: `gws --version`
|
||||||
|
|
||||||
## First-Time Setup
|
## First-Time Setup
|
||||||
|
|
||||||
@@ -56,42 +82,29 @@ If it prints `AUTHENTICATED`, skip to Usage — setup is already done.
|
|||||||
|
|
||||||
### Step 1: Triage — ask the user what they need
|
### Step 1: Triage — ask the user what they need
|
||||||
|
|
||||||
Before starting OAuth setup, ask the user TWO questions:
|
|
||||||
|
|
||||||
**Question 1: "What Google services do you need? Just email, or also
|
**Question 1: "What Google services do you need? Just email, or also
|
||||||
Calendar/Drive/Sheets/Docs?"**
|
Calendar/Drive/Sheets/Docs?"**
|
||||||
|
|
||||||
- **Email only** → They don't need this skill at all. Use the `himalaya` skill
|
- **Email only** → Use the `himalaya` skill instead — simpler setup.
|
||||||
instead — it works with a Gmail App Password (Settings → Security → App
|
- **Calendar, Drive, Sheets, Docs (or email + these)** → Continue below.
|
||||||
Passwords) and takes 2 minutes to set up. No Google Cloud project needed.
|
|
||||||
Load the himalaya skill and follow its setup instructions.
|
|
||||||
|
|
||||||
- **Calendar, Drive, Sheets, Docs (or email + these)** → Continue with this
|
**Partial scopes**: Users can authorize only a subset of services. The setup
|
||||||
skill's OAuth setup below.
|
script accepts partial scopes and warns about missing ones.
|
||||||
|
|
||||||
**Question 2: "Does your Google account use Advanced Protection (hardware
|
**Question 2: "Does your Google account use Advanced Protection?"**
|
||||||
security keys required to sign in)? If you're not sure, you probably don't
|
|
||||||
— it's something you would have explicitly enrolled in."**
|
|
||||||
|
|
||||||
- **No / Not sure** → Normal setup. Continue below.
|
- **No / Not sure** → Normal setup.
|
||||||
- **Yes** → Their Workspace admin must add the OAuth client ID to the org's
|
- **Yes** → Workspace admin must add the OAuth client ID to allowed apps first.
|
||||||
allowed apps list before Step 4 will work. Let them know upfront.
|
|
||||||
|
|
||||||
### Step 2: Create OAuth credentials (one-time, ~5 minutes)
|
### Step 2: Create OAuth credentials (one-time, ~5 minutes)
|
||||||
|
|
||||||
Tell the user:
|
Tell the user:
|
||||||
|
|
||||||
> You need a Google Cloud OAuth client. This is a one-time setup:
|
|
||||||
>
|
|
||||||
> 1. Go to https://console.cloud.google.com/apis/credentials
|
> 1. Go to https://console.cloud.google.com/apis/credentials
|
||||||
> 2. Create a project (or use an existing one)
|
> 2. Create a project (or use an existing one)
|
||||||
> 3. Click "Enable APIs" and enable: Gmail API, Google Calendar API,
|
> 3. Enable the APIs you need (Gmail, Calendar, Drive, Sheets, Docs, People)
|
||||||
> Google Drive API, Google Sheets API, Google Docs API, People API
|
> 4. Credentials → Create Credentials → OAuth 2.0 Client ID → Desktop app
|
||||||
> 4. Go to Credentials → Create Credentials → OAuth 2.0 Client ID
|
> 5. Download JSON and tell me the file path
|
||||||
> 5. Application type: "Desktop app" → Create
|
|
||||||
> 6. Click "Download JSON" and tell me the file path
|
|
||||||
|
|
||||||
Once they provide the path:
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
$GSETUP --client-secret /path/to/client_secret.json
|
$GSETUP --client-secret /path/to/client_secret.json
|
||||||
@@ -103,20 +116,10 @@ $GSETUP --client-secret /path/to/client_secret.json
|
|||||||
$GSETUP --auth-url
|
$GSETUP --auth-url
|
||||||
```
|
```
|
||||||
|
|
||||||
This prints a URL. **Send the URL to the user** and tell them:
|
Send the URL to the user. After authorizing, they paste back the redirect URL or code.
|
||||||
|
|
||||||
> Open this link in your browser, sign in with your Google account, and
|
|
||||||
> authorize access. After authorizing, you'll be redirected to a page that
|
|
||||||
> may show an error — that's expected. Copy the ENTIRE URL from your
|
|
||||||
> browser's address bar and paste it back to me.
|
|
||||||
|
|
||||||
### Step 4: Exchange the code
|
### Step 4: Exchange the code
|
||||||
|
|
||||||
The user will paste back either a URL like `http://localhost:1/?code=4/0A...&scope=...`
|
|
||||||
or just the code string. Either works. The `--auth-url` step stores a temporary
|
|
||||||
pending OAuth session locally so `--auth-code` can complete the PKCE exchange
|
|
||||||
later, even on headless systems:
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
$GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED"
|
$GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED"
|
||||||
```
|
```
|
||||||
@@ -127,18 +130,11 @@ $GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED"
|
|||||||
$GSETUP --check
|
$GSETUP --check
|
||||||
```
|
```
|
||||||
|
|
||||||
Should print `AUTHENTICATED`. Setup is complete — token refreshes automatically from now on.
|
Should print `AUTHENTICATED`. Token refreshes automatically from now on.
|
||||||
|
|
||||||
### Notes
|
|
||||||
|
|
||||||
- Token is stored at `google_token.json` under the active profile's `HERMES_HOME` and auto-refreshes.
|
|
||||||
- Pending OAuth session state/verifier are stored temporarily at `google_oauth_pending.json` under the active profile's `HERMES_HOME` until exchange completes.
|
|
||||||
- Hermes now refuses to overwrite a full Google Workspace token with a narrower re-auth token missing Gmail scopes, so one profile's partial consent cannot silently break email actions later.
|
|
||||||
- To revoke: `$GSETUP --revoke`
|
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
All commands go through the API script. Set `GAPI` as a shorthand:
|
All commands go through the API script:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
HERMES_HOME="${HERMES_HOME:-$HOME/.hermes}"
|
HERMES_HOME="${HERMES_HOME:-$HOME/.hermes}"
|
||||||
@@ -153,40 +149,21 @@ GAPI="$PYTHON_BIN $GWORKSPACE_SKILL_DIR/scripts/google_api.py"
|
|||||||
### Gmail
|
### Gmail
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Search (returns JSON array with id, from, subject, date, snippet)
|
|
||||||
$GAPI gmail search "is:unread" --max 10
|
$GAPI gmail search "is:unread" --max 10
|
||||||
$GAPI gmail search "from:boss@company.com newer_than:1d"
|
|
||||||
$GAPI gmail search "has:attachment filename:pdf newer_than:7d"
|
|
||||||
|
|
||||||
# Read full message (returns JSON with body text)
|
|
||||||
$GAPI gmail get MESSAGE_ID
|
$GAPI gmail get MESSAGE_ID
|
||||||
|
|
||||||
# Send
|
|
||||||
$GAPI gmail send --to user@example.com --subject "Hello" --body "Message text"
|
$GAPI gmail send --to user@example.com --subject "Hello" --body "Message text"
|
||||||
$GAPI gmail send --to user@example.com --subject "Report" --body "<h1>Q4</h1><p>Details...</p>" --html
|
$GAPI gmail send --to user@example.com --subject "Report" --body "<h1>Q4</h1>" --html
|
||||||
|
|
||||||
# Reply (automatically threads and sets In-Reply-To)
|
|
||||||
$GAPI gmail reply MESSAGE_ID --body "Thanks, that works for me."
|
$GAPI gmail reply MESSAGE_ID --body "Thanks, that works for me."
|
||||||
|
|
||||||
# Labels
|
|
||||||
$GAPI gmail labels
|
$GAPI gmail labels
|
||||||
$GAPI gmail modify MESSAGE_ID --add-labels LABEL_ID
|
$GAPI gmail modify MESSAGE_ID --add-labels LABEL_ID
|
||||||
$GAPI gmail modify MESSAGE_ID --remove-labels UNREAD
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Calendar
|
### Calendar
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# List events (defaults to next 7 days)
|
|
||||||
$GAPI calendar list
|
$GAPI calendar list
|
||||||
$GAPI calendar list --start 2026-03-01T00:00:00Z --end 2026-03-07T23:59:59Z
|
$GAPI calendar create --summary "Standup" --start 2026-03-01T10:00:00+01:00 --end 2026-03-01T10:30:00+01:00
|
||||||
|
$GAPI calendar create --summary "Review" --start ... --end ... --attendees "alice@co.com,bob@co.com"
|
||||||
# Create event (ISO 8601 with timezone required)
|
|
||||||
$GAPI calendar create --summary "Team Standup" --start 2026-03-01T10:00:00-06:00 --end 2026-03-01T10:30:00-06:00
|
|
||||||
$GAPI calendar create --summary "Lunch" --start 2026-03-01T12:00:00Z --end 2026-03-01T13:00:00Z --location "Cafe"
|
|
||||||
$GAPI calendar create --summary "Review" --start 2026-03-01T14:00:00Z --end 2026-03-01T15:00:00Z --attendees "alice@co.com,bob@co.com"
|
|
||||||
|
|
||||||
# Delete event
|
|
||||||
$GAPI calendar delete EVENT_ID
|
$GAPI calendar delete EVENT_ID
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -206,13 +183,8 @@ $GAPI contacts list --max 20
|
|||||||
### Sheets
|
### Sheets
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Read
|
|
||||||
$GAPI sheets get SHEET_ID "Sheet1!A1:D10"
|
$GAPI sheets get SHEET_ID "Sheet1!A1:D10"
|
||||||
|
|
||||||
# Write
|
|
||||||
$GAPI sheets update SHEET_ID "Sheet1!A1:B2" --values '[["Name","Score"],["Alice","95"]]'
|
$GAPI sheets update SHEET_ID "Sheet1!A1:B2" --values '[["Name","Score"],["Alice","95"]]'
|
||||||
|
|
||||||
# Append rows
|
|
||||||
$GAPI sheets append SHEET_ID "Sheet1!A:C" --values '[["new","row","data"]]'
|
$GAPI sheets append SHEET_ID "Sheet1!A:C" --values '[["new","row","data"]]'
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -222,37 +194,52 @@ $GAPI sheets append SHEET_ID "Sheet1!A:C" --values '[["new","row","data"]]'
|
|||||||
$GAPI docs get DOC_ID
|
$GAPI docs get DOC_ID
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Direct gws access (advanced)
|
||||||
|
|
||||||
|
For operations not covered by the wrapper, use `gws_bridge.py` directly:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
GBRIDGE="$PYTHON_BIN $GWORKSPACE_SKILL_DIR/scripts/gws_bridge.py"
|
||||||
|
$GBRIDGE calendar +agenda --today --format table
|
||||||
|
$GBRIDGE gmail +triage --labels --format json
|
||||||
|
$GBRIDGE drive +upload ./report.pdf
|
||||||
|
$GBRIDGE sheets +read --spreadsheet SHEET_ID --range "Sheet1!A1:D10"
|
||||||
|
```
|
||||||
|
|
||||||
## Output Format
|
## Output Format
|
||||||
|
|
||||||
All commands return JSON. Parse with `jq` or read directly. Key fields:
|
All commands return JSON via `gws --format json`. Key output shapes:
|
||||||
|
|
||||||
- **Gmail search**: `[{id, threadId, from, to, subject, date, snippet, labels}]`
|
- **Gmail search/triage**: Array of message summaries (sender, subject, date, snippet)
|
||||||
- **Gmail get**: `{id, threadId, from, to, subject, date, labels, body}`
|
- **Gmail get/read**: Message object with headers and body text
|
||||||
- **Gmail send/reply**: `{status: "sent", id, threadId}`
|
- **Gmail send/reply**: Confirmation with message ID
|
||||||
- **Calendar list**: `[{id, summary, start, end, location, description, htmlLink}]`
|
- **Calendar list/agenda**: Array of event objects (summary, start, end, location)
|
||||||
- **Calendar create**: `{status: "created", id, summary, htmlLink}`
|
- **Calendar create**: Confirmation with event ID and htmlLink
|
||||||
- **Drive search**: `[{id, name, mimeType, modifiedTime, webViewLink}]`
|
- **Drive search**: Array of file objects (id, name, mimeType, webViewLink)
|
||||||
- **Contacts list**: `[{name, emails: [...], phones: [...]}]`
|
- **Sheets get/read**: 2D array of cell values
|
||||||
- **Sheets get**: `[[cell, cell, ...], ...]`
|
- **Docs get**: Full document JSON (use `body.content` for text extraction)
|
||||||
|
- **Contacts list**: Array of person objects with names, emails, phones
|
||||||
|
|
||||||
|
Parse output with `jq` or read JSON directly.
|
||||||
|
|
||||||
## Rules
|
## Rules
|
||||||
|
|
||||||
1. **Never send email or create/delete events without confirming with the user first.** Show the draft content and ask for approval.
|
1. **Never send email or create/delete events without confirming with the user first.**
|
||||||
2. **Check auth before first use** — run `setup.py --check`. If it fails, guide the user through setup.
|
2. **Check auth before first use** — run `setup.py --check`.
|
||||||
3. **Use the Gmail search syntax reference** for complex queries — load it with `skill_view("google-workspace", file_path="references/gmail-search-syntax.md")`.
|
3. **Use the Gmail search syntax reference** for complex queries.
|
||||||
4. **Calendar times must include timezone** — always use ISO 8601 with offset (e.g., `2026-03-01T10:00:00-06:00`) or UTC (`Z`).
|
4. **Calendar times must include timezone** — ISO 8601 with offset or UTC.
|
||||||
5. **Respect rate limits** — avoid rapid-fire sequential API calls. Batch reads when possible.
|
5. **Respect rate limits** — avoid rapid-fire sequential API calls.
|
||||||
|
|
||||||
## Troubleshooting
|
## Troubleshooting
|
||||||
|
|
||||||
| Problem | Fix |
|
| Problem | Fix |
|
||||||
|---------|-----|
|
|---------|-----|
|
||||||
| `NOT_AUTHENTICATED` | Run setup Steps 2-5 above |
|
| `NOT_AUTHENTICATED` | Run setup Steps 2-5 |
|
||||||
| `REFRESH_FAILED` | Token revoked or expired — redo Steps 3-5 |
|
| `REFRESH_FAILED` | Token revoked — redo Steps 3-5 |
|
||||||
| `HttpError 403: Insufficient Permission` | Missing API scope — `$GSETUP --revoke` then redo Steps 3-5 |
|
| `gws: command not found` | Install: `npm install -g @googleworkspace/cli` |
|
||||||
| `HttpError 403: Access Not Configured` | API not enabled — user needs to enable it in Google Cloud Console |
|
| `HttpError 403` | Missing scope — `$GSETUP --revoke` then redo Steps 3-5 |
|
||||||
| `ModuleNotFoundError` | Run `$GSETUP --install-deps` |
|
| `HttpError 403: Access Not Configured` | Enable API in Google Cloud Console |
|
||||||
| Advanced Protection blocks auth | Workspace admin must allowlist the OAuth client ID |
|
| Advanced Protection blocks auth | Admin must allowlist the OAuth client ID |
|
||||||
|
|
||||||
## Revoking Access
|
## Revoking Access
|
||||||
|
|
||||||
|
|||||||
@@ -1,16 +1,17 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""Google Workspace API CLI for Hermes Agent.
|
"""Google Workspace API CLI for Hermes Agent.
|
||||||
|
|
||||||
A thin CLI wrapper around Google's Python client libraries.
|
Thin wrapper that delegates to gws (googleworkspace/cli) via gws_bridge.py.
|
||||||
Authenticates using the token stored by setup.py.
|
Maintains the same CLI interface for backward compatibility with Hermes skills.
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
python google_api.py gmail search "is:unread" [--max 10]
|
python google_api.py gmail search "is:unread" [--max 10]
|
||||||
python google_api.py gmail get MESSAGE_ID
|
python google_api.py gmail get MESSAGE_ID
|
||||||
python google_api.py gmail send --to user@example.com --subject "Hi" --body "Hello"
|
python google_api.py gmail send --to user@example.com --subject "Hi" --body "Hello"
|
||||||
python google_api.py gmail reply MESSAGE_ID --body "Thanks"
|
python google_api.py gmail reply MESSAGE_ID --body "Thanks"
|
||||||
python google_api.py calendar list [--from DATE] [--to DATE] [--calendar primary]
|
python google_api.py calendar list [--start DATE] [--end DATE] [--calendar primary]
|
||||||
python google_api.py calendar create --summary "Meeting" --start DATETIME --end DATETIME
|
python google_api.py calendar create --summary "Meeting" --start DATETIME --end DATETIME
|
||||||
|
python google_api.py calendar delete EVENT_ID
|
||||||
python google_api.py drive search "budget report" [--max 10]
|
python google_api.py drive search "budget report" [--max 10]
|
||||||
python google_api.py contacts list [--max 20]
|
python google_api.py contacts list [--max 20]
|
||||||
python google_api.py sheets get SHEET_ID RANGE
|
python google_api.py sheets get SHEET_ID RANGE
|
||||||
@@ -20,386 +21,193 @@ Usage:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import base64
|
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from datetime import datetime, timedelta, timezone
|
|
||||||
from email.mime.text import MIMEText
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
try:
|
BRIDGE = Path(__file__).parent / "gws_bridge.py"
|
||||||
from hermes_constants import display_hermes_home, get_hermes_home
|
PYTHON = sys.executable
|
||||||
except ModuleNotFoundError:
|
|
||||||
HERMES_AGENT_ROOT = Path(__file__).resolve().parents[4]
|
|
||||||
if HERMES_AGENT_ROOT.exists():
|
|
||||||
sys.path.insert(0, str(HERMES_AGENT_ROOT))
|
|
||||||
from hermes_constants import display_hermes_home, get_hermes_home
|
|
||||||
|
|
||||||
HERMES_HOME = get_hermes_home()
|
|
||||||
TOKEN_PATH = HERMES_HOME / "google_token.json"
|
|
||||||
|
|
||||||
SCOPES = [
|
|
||||||
"https://www.googleapis.com/auth/gmail.readonly",
|
|
||||||
"https://www.googleapis.com/auth/gmail.send",
|
|
||||||
"https://www.googleapis.com/auth/gmail.modify",
|
|
||||||
"https://www.googleapis.com/auth/calendar",
|
|
||||||
"https://www.googleapis.com/auth/drive.readonly",
|
|
||||||
"https://www.googleapis.com/auth/contacts.readonly",
|
|
||||||
"https://www.googleapis.com/auth/spreadsheets",
|
|
||||||
"https://www.googleapis.com/auth/documents.readonly",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def _missing_scopes() -> list[str]:
|
def gws(*args: str) -> None:
|
||||||
try:
|
"""Call gws via the bridge and exit with its return code."""
|
||||||
payload = json.loads(TOKEN_PATH.read_text())
|
result = subprocess.run(
|
||||||
except Exception:
|
[PYTHON, str(BRIDGE)] + list(args),
|
||||||
return []
|
env={**os.environ, "HERMES_HOME": os.environ.get("HERMES_HOME", str(Path.home() / ".hermes"))},
|
||||||
raw = payload.get("scopes") or payload.get("scope")
|
)
|
||||||
if not raw:
|
sys.exit(result.returncode)
|
||||||
return []
|
|
||||||
granted = {s.strip() for s in (raw.split() if isinstance(raw, str) else raw) if s.strip()}
|
|
||||||
return sorted(scope for scope in SCOPES if scope not in granted)
|
|
||||||
|
|
||||||
|
|
||||||
def get_credentials():
|
# -- Gmail --
|
||||||
"""Load and refresh credentials from token file."""
|
|
||||||
if not TOKEN_PATH.exists():
|
|
||||||
print("Not authenticated. Run the setup script first:", file=sys.stderr)
|
|
||||||
print(f" python {Path(__file__).parent / 'setup.py'}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
from google.oauth2.credentials import Credentials
|
|
||||||
from google.auth.transport.requests import Request
|
|
||||||
|
|
||||||
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
|
|
||||||
if creds.expired and creds.refresh_token:
|
|
||||||
creds.refresh(Request())
|
|
||||||
TOKEN_PATH.write_text(creds.to_json())
|
|
||||||
if not creds.valid:
|
|
||||||
print("Token is invalid. Re-run setup.", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
missing_scopes = _missing_scopes()
|
|
||||||
if missing_scopes:
|
|
||||||
print(
|
|
||||||
"Token is valid but missing Google Workspace scopes required by this skill.",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
for scope in missing_scopes:
|
|
||||||
print(f" - {scope}", file=sys.stderr)
|
|
||||||
print(
|
|
||||||
f"Re-run setup.py from the active Hermes profile ({display_hermes_home()}) to restore full access.",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
sys.exit(1)
|
|
||||||
return creds
|
|
||||||
|
|
||||||
|
|
||||||
def build_service(api, version):
|
|
||||||
from googleapiclient.discovery import build
|
|
||||||
return build(api, version, credentials=get_credentials())
|
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# Gmail
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def gmail_search(args):
|
def gmail_search(args):
|
||||||
service = build_service("gmail", "v1")
|
cmd = ["gmail", "+triage", "--query", args.query, "--max", str(args.max), "--format", "json"]
|
||||||
results = service.users().messages().list(
|
gws(*cmd)
|
||||||
userId="me", q=args.query, maxResults=args.max
|
|
||||||
).execute()
|
|
||||||
messages = results.get("messages", [])
|
|
||||||
if not messages:
|
|
||||||
print("No messages found.")
|
|
||||||
return
|
|
||||||
|
|
||||||
output = []
|
|
||||||
for msg_meta in messages:
|
|
||||||
msg = service.users().messages().get(
|
|
||||||
userId="me", id=msg_meta["id"], format="metadata",
|
|
||||||
metadataHeaders=["From", "To", "Subject", "Date"],
|
|
||||||
).execute()
|
|
||||||
headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
|
|
||||||
output.append({
|
|
||||||
"id": msg["id"],
|
|
||||||
"threadId": msg["threadId"],
|
|
||||||
"from": headers.get("From", ""),
|
|
||||||
"to": headers.get("To", ""),
|
|
||||||
"subject": headers.get("Subject", ""),
|
|
||||||
"date": headers.get("Date", ""),
|
|
||||||
"snippet": msg.get("snippet", ""),
|
|
||||||
"labels": msg.get("labelIds", []),
|
|
||||||
})
|
|
||||||
print(json.dumps(output, indent=2, ensure_ascii=False))
|
|
||||||
|
|
||||||
|
|
||||||
def gmail_get(args):
|
def gmail_get(args):
|
||||||
service = build_service("gmail", "v1")
|
gws("gmail", "+read", "--id", args.message_id, "--headers", "--format", "json")
|
||||||
msg = service.users().messages().get(
|
|
||||||
userId="me", id=args.message_id, format="full"
|
|
||||||
).execute()
|
|
||||||
|
|
||||||
headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
|
|
||||||
|
|
||||||
# Extract body text
|
|
||||||
body = ""
|
|
||||||
payload = msg.get("payload", {})
|
|
||||||
if payload.get("body", {}).get("data"):
|
|
||||||
body = base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8", errors="replace")
|
|
||||||
elif payload.get("parts"):
|
|
||||||
for part in payload["parts"]:
|
|
||||||
if part.get("mimeType") == "text/plain" and part.get("body", {}).get("data"):
|
|
||||||
body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
|
|
||||||
break
|
|
||||||
if not body:
|
|
||||||
for part in payload["parts"]:
|
|
||||||
if part.get("mimeType") == "text/html" and part.get("body", {}).get("data"):
|
|
||||||
body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
|
|
||||||
break
|
|
||||||
|
|
||||||
result = {
|
|
||||||
"id": msg["id"],
|
|
||||||
"threadId": msg["threadId"],
|
|
||||||
"from": headers.get("From", ""),
|
|
||||||
"to": headers.get("To", ""),
|
|
||||||
"subject": headers.get("Subject", ""),
|
|
||||||
"date": headers.get("Date", ""),
|
|
||||||
"labels": msg.get("labelIds", []),
|
|
||||||
"body": body,
|
|
||||||
}
|
|
||||||
print(json.dumps(result, indent=2, ensure_ascii=False))
|
|
||||||
|
|
||||||
|
|
||||||
def gmail_send(args):
|
def gmail_send(args):
|
||||||
service = build_service("gmail", "v1")
|
cmd = ["gmail", "+send", "--to", args.to, "--subject", args.subject, "--body", args.body, "--format", "json"]
|
||||||
message = MIMEText(args.body, "html" if args.html else "plain")
|
|
||||||
message["to"] = args.to
|
|
||||||
message["subject"] = args.subject
|
|
||||||
if args.cc:
|
if args.cc:
|
||||||
message["cc"] = args.cc
|
cmd += ["--cc", args.cc]
|
||||||
|
if args.html:
|
||||||
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
|
cmd.append("--html")
|
||||||
body = {"raw": raw}
|
gws(*cmd)
|
||||||
|
|
||||||
if args.thread_id:
|
|
||||||
body["threadId"] = args.thread_id
|
|
||||||
|
|
||||||
result = service.users().messages().send(userId="me", body=body).execute()
|
|
||||||
print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
|
|
||||||
|
|
||||||
|
|
||||||
def gmail_reply(args):
|
def gmail_reply(args):
|
||||||
service = build_service("gmail", "v1")
|
gws("gmail", "+reply", "--message-id", args.message_id, "--body", args.body, "--format", "json")
|
||||||
# Fetch original to get thread ID and headers
|
|
||||||
original = service.users().messages().get(
|
|
||||||
userId="me", id=args.message_id, format="metadata",
|
|
||||||
metadataHeaders=["From", "Subject", "Message-ID"],
|
|
||||||
).execute()
|
|
||||||
headers = {h["name"]: h["value"] for h in original.get("payload", {}).get("headers", [])}
|
|
||||||
|
|
||||||
subject = headers.get("Subject", "")
|
|
||||||
if not subject.startswith("Re:"):
|
|
||||||
subject = f"Re: {subject}"
|
|
||||||
|
|
||||||
message = MIMEText(args.body)
|
|
||||||
message["to"] = headers.get("From", "")
|
|
||||||
message["subject"] = subject
|
|
||||||
if headers.get("Message-ID"):
|
|
||||||
message["In-Reply-To"] = headers["Message-ID"]
|
|
||||||
message["References"] = headers["Message-ID"]
|
|
||||||
|
|
||||||
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
|
|
||||||
body = {"raw": raw, "threadId": original["threadId"]}
|
|
||||||
|
|
||||||
result = service.users().messages().send(userId="me", body=body).execute()
|
|
||||||
print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
|
|
||||||
|
|
||||||
|
|
||||||
def gmail_labels(args):
|
def gmail_labels(args):
|
||||||
service = build_service("gmail", "v1")
|
gws("gmail", "users", "labels", "list", "--params", json.dumps({"userId": "me"}), "--format", "json")
|
||||||
results = service.users().labels().list(userId="me").execute()
|
|
||||||
labels = [{"id": l["id"], "name": l["name"], "type": l.get("type", "")} for l in results.get("labels", [])]
|
|
||||||
print(json.dumps(labels, indent=2))
|
|
||||||
|
|
||||||
|
|
||||||
def gmail_modify(args):
|
def gmail_modify(args):
|
||||||
service = build_service("gmail", "v1")
|
|
||||||
body = {}
|
body = {}
|
||||||
if args.add_labels:
|
if args.add_labels:
|
||||||
body["addLabelIds"] = args.add_labels.split(",")
|
body["addLabelIds"] = args.add_labels.split(",")
|
||||||
if args.remove_labels:
|
if args.remove_labels:
|
||||||
body["removeLabelIds"] = args.remove_labels.split(",")
|
body["removeLabelIds"] = args.remove_labels.split(",")
|
||||||
result = service.users().messages().modify(userId="me", id=args.message_id, body=body).execute()
|
gws(
|
||||||
print(json.dumps({"id": result["id"], "labels": result.get("labelIds", [])}, indent=2))
|
"gmail", "users", "messages", "modify",
|
||||||
|
"--params", json.dumps({"userId": "me", "id": args.message_id}),
|
||||||
|
"--json", json.dumps(body),
|
||||||
|
"--format", "json",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# -- Calendar --
|
||||||
# Calendar
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def calendar_list(args):
|
def calendar_list(args):
|
||||||
service = build_service("calendar", "v3")
|
if args.start or args.end:
|
||||||
now = datetime.now(timezone.utc)
|
# Specific date range — use raw Calendar API for precise timeMin/timeMax
|
||||||
time_min = args.start or now.isoformat()
|
from datetime import datetime, timedelta, timezone as tz
|
||||||
time_max = args.end or (now + timedelta(days=7)).isoformat()
|
now = datetime.now(tz.utc)
|
||||||
|
time_min = args.start or now.isoformat()
|
||||||
# Ensure timezone info
|
time_max = args.end or (now + timedelta(days=7)).isoformat()
|
||||||
for val in [time_min, time_max]:
|
gws(
|
||||||
if "T" in val and "Z" not in val and "+" not in val and "-" not in val[11:]:
|
"calendar", "events", "list",
|
||||||
val += "Z"
|
"--params", json.dumps({
|
||||||
|
"calendarId": args.calendar,
|
||||||
results = service.events().list(
|
"timeMin": time_min,
|
||||||
calendarId=args.calendar, timeMin=time_min, timeMax=time_max,
|
"timeMax": time_max,
|
||||||
maxResults=args.max, singleEvents=True, orderBy="startTime",
|
"maxResults": args.max,
|
||||||
).execute()
|
"singleEvents": True,
|
||||||
|
"orderBy": "startTime",
|
||||||
events = []
|
}),
|
||||||
for e in results.get("items", []):
|
"--format", "json",
|
||||||
events.append({
|
)
|
||||||
"id": e["id"],
|
else:
|
||||||
"summary": e.get("summary", "(no title)"),
|
# No date range — use +agenda helper (defaults to 7 days)
|
||||||
"start": e.get("start", {}).get("dateTime", e.get("start", {}).get("date", "")),
|
cmd = ["calendar", "+agenda", "--days", "7", "--format", "json"]
|
||||||
"end": e.get("end", {}).get("dateTime", e.get("end", {}).get("date", "")),
|
if args.calendar != "primary":
|
||||||
"location": e.get("location", ""),
|
cmd += ["--calendar", args.calendar]
|
||||||
"description": e.get("description", ""),
|
gws(*cmd)
|
||||||
"status": e.get("status", ""),
|
|
||||||
"htmlLink": e.get("htmlLink", ""),
|
|
||||||
})
|
|
||||||
print(json.dumps(events, indent=2, ensure_ascii=False))
|
|
||||||
|
|
||||||
|
|
||||||
def calendar_create(args):
|
def calendar_create(args):
|
||||||
service = build_service("calendar", "v3")
|
cmd = [
|
||||||
event = {
|
"calendar", "+insert",
|
||||||
"summary": args.summary,
|
"--summary", args.summary,
|
||||||
"start": {"dateTime": args.start},
|
"--start", args.start,
|
||||||
"end": {"dateTime": args.end},
|
"--end", args.end,
|
||||||
}
|
"--format", "json",
|
||||||
|
]
|
||||||
if args.location:
|
if args.location:
|
||||||
event["location"] = args.location
|
cmd += ["--location", args.location]
|
||||||
if args.description:
|
if args.description:
|
||||||
event["description"] = args.description
|
cmd += ["--description", args.description]
|
||||||
if args.attendees:
|
if args.attendees:
|
||||||
event["attendees"] = [{"email": e.strip()} for e in args.attendees.split(",")]
|
for email in args.attendees.split(","):
|
||||||
|
cmd += ["--attendee", email.strip()]
|
||||||
result = service.events().insert(calendarId=args.calendar, body=event).execute()
|
if args.calendar != "primary":
|
||||||
print(json.dumps({
|
cmd += ["--calendar", args.calendar]
|
||||||
"status": "created",
|
gws(*cmd)
|
||||||
"id": result["id"],
|
|
||||||
"summary": result.get("summary", ""),
|
|
||||||
"htmlLink": result.get("htmlLink", ""),
|
|
||||||
}, indent=2))
|
|
||||||
|
|
||||||
|
|
||||||
def calendar_delete(args):
|
def calendar_delete(args):
|
||||||
service = build_service("calendar", "v3")
|
gws(
|
||||||
service.events().delete(calendarId=args.calendar, eventId=args.event_id).execute()
|
"calendar", "events", "delete",
|
||||||
print(json.dumps({"status": "deleted", "eventId": args.event_id}))
|
"--params", json.dumps({"calendarId": args.calendar, "eventId": args.event_id}),
|
||||||
|
"--format", "json",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# -- Drive --
|
||||||
# Drive
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def drive_search(args):
|
def drive_search(args):
|
||||||
service = build_service("drive", "v3")
|
query = args.query if args.raw_query else f"fullText contains '{args.query}'"
|
||||||
query = f"fullText contains '{args.query}'" if not args.raw_query else args.query
|
gws(
|
||||||
results = service.files().list(
|
"drive", "files", "list",
|
||||||
q=query, pageSize=args.max, fields="files(id, name, mimeType, modifiedTime, webViewLink)",
|
"--params", json.dumps({
|
||||||
).execute()
|
"q": query,
|
||||||
files = results.get("files", [])
|
"pageSize": args.max,
|
||||||
print(json.dumps(files, indent=2, ensure_ascii=False))
|
"fields": "files(id,name,mimeType,modifiedTime,webViewLink)",
|
||||||
|
}),
|
||||||
|
"--format", "json",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# -- Contacts --
|
||||||
# Contacts
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def contacts_list(args):
|
def contacts_list(args):
|
||||||
service = build_service("people", "v1")
|
gws(
|
||||||
results = service.people().connections().list(
|
"people", "people", "connections", "list",
|
||||||
resourceName="people/me",
|
"--params", json.dumps({
|
||||||
pageSize=args.max,
|
"resourceName": "people/me",
|
||||||
personFields="names,emailAddresses,phoneNumbers",
|
"pageSize": args.max,
|
||||||
).execute()
|
"personFields": "names,emailAddresses,phoneNumbers",
|
||||||
contacts = []
|
}),
|
||||||
for person in results.get("connections", []):
|
"--format", "json",
|
||||||
names = person.get("names", [{}])
|
)
|
||||||
emails = person.get("emailAddresses", [])
|
|
||||||
phones = person.get("phoneNumbers", [])
|
|
||||||
contacts.append({
|
|
||||||
"name": names[0].get("displayName", "") if names else "",
|
|
||||||
"emails": [e.get("value", "") for e in emails],
|
|
||||||
"phones": [p.get("value", "") for p in phones],
|
|
||||||
})
|
|
||||||
print(json.dumps(contacts, indent=2, ensure_ascii=False))
|
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# -- Sheets --
|
||||||
# Sheets
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def sheets_get(args):
|
def sheets_get(args):
|
||||||
service = build_service("sheets", "v4")
|
gws(
|
||||||
result = service.spreadsheets().values().get(
|
"sheets", "+read",
|
||||||
spreadsheetId=args.sheet_id, range=args.range,
|
"--spreadsheet", args.sheet_id,
|
||||||
).execute()
|
"--range", args.range,
|
||||||
print(json.dumps(result.get("values", []), indent=2, ensure_ascii=False))
|
"--format", "json",
|
||||||
|
)
|
||||||
|
|
||||||
def sheets_update(args):
|
def sheets_update(args):
|
||||||
service = build_service("sheets", "v4")
|
|
||||||
values = json.loads(args.values)
|
values = json.loads(args.values)
|
||||||
body = {"values": values}
|
gws(
|
||||||
result = service.spreadsheets().values().update(
|
"sheets", "spreadsheets", "values", "update",
|
||||||
spreadsheetId=args.sheet_id, range=args.range,
|
"--params", json.dumps({
|
||||||
valueInputOption="USER_ENTERED", body=body,
|
"spreadsheetId": args.sheet_id,
|
||||||
).execute()
|
"range": args.range,
|
||||||
print(json.dumps({"updatedCells": result.get("updatedCells", 0), "updatedRange": result.get("updatedRange", "")}, indent=2))
|
"valueInputOption": "USER_ENTERED",
|
||||||
|
}),
|
||||||
|
"--json", json.dumps({"values": values}),
|
||||||
|
"--format", "json",
|
||||||
|
)
|
||||||
|
|
||||||
def sheets_append(args):
|
def sheets_append(args):
|
||||||
service = build_service("sheets", "v4")
|
|
||||||
values = json.loads(args.values)
|
values = json.loads(args.values)
|
||||||
body = {"values": values}
|
gws(
|
||||||
result = service.spreadsheets().values().append(
|
"sheets", "+append",
|
||||||
spreadsheetId=args.sheet_id, range=args.range,
|
"--spreadsheet", args.sheet_id,
|
||||||
valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body,
|
"--json-values", json.dumps(values),
|
||||||
).execute()
|
"--format", "json",
|
||||||
print(json.dumps({"updatedCells": result.get("updates", {}).get("updatedCells", 0)}, indent=2))
|
)
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# -- Docs --
|
||||||
# Docs
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def docs_get(args):
|
def docs_get(args):
|
||||||
service = build_service("docs", "v1")
|
gws(
|
||||||
doc = service.documents().get(documentId=args.doc_id).execute()
|
"docs", "documents", "get",
|
||||||
# Extract plain text from the document structure
|
"--params", json.dumps({"documentId": args.doc_id}),
|
||||||
text_parts = []
|
"--format", "json",
|
||||||
for element in doc.get("body", {}).get("content", []):
|
)
|
||||||
paragraph = element.get("paragraph", {})
|
|
||||||
for pe in paragraph.get("elements", []):
|
|
||||||
text_run = pe.get("textRun", {})
|
|
||||||
if text_run.get("content"):
|
|
||||||
text_parts.append(text_run["content"])
|
|
||||||
result = {
|
|
||||||
"title": doc.get("title", ""),
|
|
||||||
"documentId": doc.get("documentId", ""),
|
|
||||||
"body": "".join(text_parts),
|
|
||||||
}
|
|
||||||
print(json.dumps(result, indent=2, ensure_ascii=False))
|
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# -- CLI parser (backward-compatible interface) --
|
||||||
# CLI parser
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description="Google Workspace API for Hermes Agent")
|
parser = argparse.ArgumentParser(description="Google Workspace API for Hermes Agent (gws backend)")
|
||||||
sub = parser.add_subparsers(dest="service", required=True)
|
sub = parser.add_subparsers(dest="service", required=True)
|
||||||
|
|
||||||
# --- Gmail ---
|
# --- Gmail ---
|
||||||
@@ -421,7 +229,7 @@ def main():
|
|||||||
p.add_argument("--body", required=True)
|
p.add_argument("--body", required=True)
|
||||||
p.add_argument("--cc", default="")
|
p.add_argument("--cc", default="")
|
||||||
p.add_argument("--html", action="store_true", help="Send body as HTML")
|
p.add_argument("--html", action="store_true", help="Send body as HTML")
|
||||||
p.add_argument("--thread-id", default="", help="Thread ID for threading")
|
p.add_argument("--thread-id", default="", help="Thread ID (unused with gws, kept for compat)")
|
||||||
p.set_defaults(func=gmail_send)
|
p.set_defaults(func=gmail_send)
|
||||||
|
|
||||||
p = gmail_sub.add_parser("reply")
|
p = gmail_sub.add_parser("reply")
|
||||||
|
|||||||
89
skills/productivity/google-workspace/scripts/gws_bridge.py
Executable file
89
skills/productivity/google-workspace/scripts/gws_bridge.py
Executable file
@@ -0,0 +1,89 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Bridge between Hermes OAuth token and gws CLI.
|
||||||
|
|
||||||
|
Refreshes the token if expired, then executes gws with the valid access token.
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
def get_hermes_home() -> Path:
|
||||||
|
return Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
|
||||||
|
|
||||||
|
|
||||||
|
def get_token_path() -> Path:
|
||||||
|
return get_hermes_home() / "google_token.json"
|
||||||
|
|
||||||
|
|
||||||
|
def refresh_token(token_data: dict) -> dict:
|
||||||
|
"""Refresh the access token using the refresh token."""
|
||||||
|
import urllib.error
|
||||||
|
import urllib.parse
|
||||||
|
import urllib.request
|
||||||
|
|
||||||
|
params = urllib.parse.urlencode({
|
||||||
|
"client_id": token_data["client_id"],
|
||||||
|
"client_secret": token_data["client_secret"],
|
||||||
|
"refresh_token": token_data["refresh_token"],
|
||||||
|
"grant_type": "refresh_token",
|
||||||
|
}).encode()
|
||||||
|
|
||||||
|
req = urllib.request.Request(token_data["token_uri"], data=params)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req) as resp:
|
||||||
|
result = json.loads(resp.read())
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
body = e.read().decode("utf-8", errors="replace")
|
||||||
|
print(f"ERROR: Token refresh failed (HTTP {e.code}): {body}", file=sys.stderr)
|
||||||
|
print("Re-run setup.py to re-authenticate.", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
token_data["token"] = result["access_token"]
|
||||||
|
token_data["expiry"] = datetime.fromtimestamp(
|
||||||
|
datetime.now(timezone.utc).timestamp() + result["expires_in"],
|
||||||
|
tz=timezone.utc,
|
||||||
|
).isoformat()
|
||||||
|
|
||||||
|
get_token_path().write_text(json.dumps(token_data, indent=2))
|
||||||
|
return token_data
|
||||||
|
|
||||||
|
|
||||||
|
def get_valid_token() -> str:
|
||||||
|
"""Return a valid access token, refreshing if needed."""
|
||||||
|
token_path = get_token_path()
|
||||||
|
if not token_path.exists():
|
||||||
|
print("ERROR: No Google token found. Run setup.py --auth-url first.", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
token_data = json.loads(token_path.read_text())
|
||||||
|
|
||||||
|
expiry = token_data.get("expiry", "")
|
||||||
|
if expiry:
|
||||||
|
exp_dt = datetime.fromisoformat(expiry.replace("Z", "+00:00"))
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
if now >= exp_dt:
|
||||||
|
token_data = refresh_token(token_data)
|
||||||
|
|
||||||
|
return token_data["token"]
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Refresh token if needed, then exec gws with remaining args."""
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
print("Usage: gws_bridge.py <gws args...>", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
access_token = get_valid_token()
|
||||||
|
env = os.environ.copy()
|
||||||
|
env["GOOGLE_WORKSPACE_CLI_TOKEN"] = access_token
|
||||||
|
|
||||||
|
result = subprocess.run(["gws"] + sys.argv[1:], env=env)
|
||||||
|
sys.exit(result.returncode)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -23,6 +23,7 @@ Agent workflow:
|
|||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -128,7 +129,11 @@ def check_auth():
|
|||||||
from google.auth.transport.requests import Request
|
from google.auth.transport.requests import Request
|
||||||
|
|
||||||
try:
|
try:
|
||||||
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
|
# Don't pass scopes — user may have authorized only a subset.
|
||||||
|
# Passing scopes forces google-auth to validate them on refresh,
|
||||||
|
# which fails with invalid_scope if the token has fewer scopes
|
||||||
|
# than requested.
|
||||||
|
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"TOKEN_CORRUPT: {e}")
|
print(f"TOKEN_CORRUPT: {e}")
|
||||||
return False
|
return False
|
||||||
@@ -137,8 +142,9 @@ def check_auth():
|
|||||||
if creds.valid:
|
if creds.valid:
|
||||||
missing_scopes = _missing_scopes_from_payload(payload)
|
missing_scopes = _missing_scopes_from_payload(payload)
|
||||||
if missing_scopes:
|
if missing_scopes:
|
||||||
print(f"AUTH_SCOPE_MISMATCH: {_format_missing_scopes(missing_scopes)}")
|
print(f"AUTHENTICATED (partial): Token valid but missing {len(missing_scopes)} scopes:")
|
||||||
return False
|
for s in missing_scopes:
|
||||||
|
print(f" - {s}")
|
||||||
print(f"AUTHENTICATED: Token valid at {TOKEN_PATH}")
|
print(f"AUTHENTICATED: Token valid at {TOKEN_PATH}")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -148,8 +154,9 @@ def check_auth():
|
|||||||
TOKEN_PATH.write_text(creds.to_json())
|
TOKEN_PATH.write_text(creds.to_json())
|
||||||
missing_scopes = _missing_scopes_from_payload(_load_token_payload(TOKEN_PATH))
|
missing_scopes = _missing_scopes_from_payload(_load_token_payload(TOKEN_PATH))
|
||||||
if missing_scopes:
|
if missing_scopes:
|
||||||
print(f"AUTH_SCOPE_MISMATCH: {_format_missing_scopes(missing_scopes)}")
|
print(f"AUTHENTICATED (partial): Token refreshed but missing {len(missing_scopes)} scopes:")
|
||||||
return False
|
for s in missing_scopes:
|
||||||
|
print(f" - {s}")
|
||||||
print(f"AUTHENTICATED: Token refreshed at {TOKEN_PATH}")
|
print(f"AUTHENTICATED: Token refreshed at {TOKEN_PATH}")
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -272,16 +279,33 @@ def exchange_auth_code(code: str):
|
|||||||
|
|
||||||
_ensure_deps()
|
_ensure_deps()
|
||||||
from google_auth_oauthlib.flow import Flow
|
from google_auth_oauthlib.flow import Flow
|
||||||
|
from urllib.parse import parse_qs, urlparse
|
||||||
|
|
||||||
|
# Extract granted scopes from the callback URL if present
|
||||||
|
if returned_state and "scope" in parse_qs(urlparse(code).query if isinstance(code, str) and code.startswith("http") else {}):
|
||||||
|
granted_scopes = parse_qs(urlparse(code).query)["scope"][0].split()
|
||||||
|
else:
|
||||||
|
# Try to extract from code_or_url parameter
|
||||||
|
if isinstance(code, str) and code.startswith("http"):
|
||||||
|
params = parse_qs(urlparse(code).query)
|
||||||
|
if "scope" in params:
|
||||||
|
granted_scopes = params["scope"][0].split()
|
||||||
|
else:
|
||||||
|
granted_scopes = SCOPES
|
||||||
|
else:
|
||||||
|
granted_scopes = SCOPES
|
||||||
|
|
||||||
flow = Flow.from_client_secrets_file(
|
flow = Flow.from_client_secrets_file(
|
||||||
str(CLIENT_SECRET_PATH),
|
str(CLIENT_SECRET_PATH),
|
||||||
scopes=SCOPES,
|
scopes=granted_scopes,
|
||||||
redirect_uri=pending_auth.get("redirect_uri", REDIRECT_URI),
|
redirect_uri=pending_auth.get("redirect_uri", REDIRECT_URI),
|
||||||
state=pending_auth["state"],
|
state=pending_auth["state"],
|
||||||
code_verifier=pending_auth["code_verifier"],
|
code_verifier=pending_auth["code_verifier"],
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Accept partial scopes — user may deselect some permissions in the consent screen
|
||||||
|
os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1"
|
||||||
flow.fetch_token(code=code)
|
flow.fetch_token(code=code)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"ERROR: Token exchange failed: {e}")
|
print(f"ERROR: Token exchange failed: {e}")
|
||||||
@@ -290,11 +314,21 @@ def exchange_auth_code(code: str):
|
|||||||
|
|
||||||
creds = flow.credentials
|
creds = flow.credentials
|
||||||
token_payload = json.loads(creds.to_json())
|
token_payload = json.loads(creds.to_json())
|
||||||
|
|
||||||
|
# Store only the scopes actually granted by the user, not what was requested.
|
||||||
|
# creds.to_json() writes the requested scopes, which causes refresh to fail
|
||||||
|
# with invalid_scope if the user only authorized a subset.
|
||||||
|
actually_granted = list(creds.granted_scopes or []) if hasattr(creds, "granted_scopes") and creds.granted_scopes else []
|
||||||
|
if actually_granted:
|
||||||
|
token_payload["scopes"] = actually_granted
|
||||||
|
elif granted_scopes != SCOPES:
|
||||||
|
# granted_scopes was extracted from the callback URL
|
||||||
|
token_payload["scopes"] = granted_scopes
|
||||||
|
|
||||||
missing_scopes = _missing_scopes_from_payload(token_payload)
|
missing_scopes = _missing_scopes_from_payload(token_payload)
|
||||||
if missing_scopes:
|
if missing_scopes:
|
||||||
print(f"ERROR: Refusing to save incomplete Google Workspace token. {_format_missing_scopes(missing_scopes)}")
|
print(f"WARNING: Token missing some Google Workspace scopes: {', '.join(missing_scopes)}")
|
||||||
print(f"Existing token at {TOKEN_PATH} was left unchanged.")
|
print("Some services may not be available.")
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
TOKEN_PATH.write_text(json.dumps(token_payload, indent=2))
|
TOKEN_PATH.write_text(json.dumps(token_payload, indent=2))
|
||||||
PENDING_AUTH_PATH.unlink(missing_ok=True)
|
PENDING_AUTH_PATH.unlink(missing_ok=True)
|
||||||
|
|||||||
@@ -211,14 +211,15 @@ class TestExchangeAuthCode:
|
|||||||
assert setup_module.PENDING_AUTH_PATH.exists()
|
assert setup_module.PENDING_AUTH_PATH.exists()
|
||||||
assert not setup_module.TOKEN_PATH.exists()
|
assert not setup_module.TOKEN_PATH.exists()
|
||||||
|
|
||||||
def test_refuses_to_overwrite_existing_token_with_narrower_scopes(self, setup_module, capsys):
|
def test_accepts_narrower_scopes_with_warning(self, setup_module, capsys):
|
||||||
|
"""Partial scopes are accepted with a warning (gws migration: v2.0)."""
|
||||||
setup_module.PENDING_AUTH_PATH.write_text(
|
setup_module.PENDING_AUTH_PATH.write_text(
|
||||||
json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"})
|
json.dumps({"state": "saved-state", "code_verifier": "saved-verifier"})
|
||||||
)
|
)
|
||||||
setup_module.TOKEN_PATH.write_text(json.dumps({"token": "existing-token", "scopes": setup_module.SCOPES}))
|
setup_module.TOKEN_PATH.write_text(json.dumps({"token": "***", "scopes": setup_module.SCOPES}))
|
||||||
FakeFlow.credentials_payload = {
|
FakeFlow.credentials_payload = {
|
||||||
"token": "narrow-token",
|
"token": "***",
|
||||||
"refresh_token": "refresh-token",
|
"refresh_token": "***",
|
||||||
"token_uri": "https://oauth2.googleapis.com/token",
|
"token_uri": "https://oauth2.googleapis.com/token",
|
||||||
"client_id": "client-id",
|
"client_id": "client-id",
|
||||||
"client_secret": "client-secret",
|
"client_secret": "client-secret",
|
||||||
@@ -228,10 +229,12 @@ class TestExchangeAuthCode:
|
|||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
|
||||||
with pytest.raises(SystemExit):
|
setup_module.exchange_auth_code("4/test-auth-code")
|
||||||
setup_module.exchange_auth_code("4/test-auth-code")
|
|
||||||
|
|
||||||
out = capsys.readouterr().out
|
out = capsys.readouterr().out
|
||||||
assert "refusing to save incomplete google workspace token" in out.lower()
|
assert "warning" in out.lower()
|
||||||
assert json.loads(setup_module.TOKEN_PATH.read_text())["token"] == "existing-token"
|
assert "missing" in out.lower()
|
||||||
assert setup_module.PENDING_AUTH_PATH.exists()
|
# Token is saved (partial scopes accepted)
|
||||||
|
assert setup_module.TOKEN_PATH.exists()
|
||||||
|
# Pending auth is cleaned up
|
||||||
|
assert not setup_module.PENDING_AUTH_PATH.exists()
|
||||||
|
|||||||
@@ -1,117 +1,175 @@
|
|||||||
"""Regression tests for Google Workspace API credential validation."""
|
"""Tests for Google Workspace gws bridge and CLI wrapper."""
|
||||||
|
|
||||||
import importlib.util
|
import importlib.util
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import types
|
import types
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
SCRIPT_PATH = (
|
BRIDGE_PATH = (
|
||||||
|
Path(__file__).resolve().parents[2]
|
||||||
|
/ "skills/productivity/google-workspace/scripts/gws_bridge.py"
|
||||||
|
)
|
||||||
|
API_PATH = (
|
||||||
Path(__file__).resolve().parents[2]
|
Path(__file__).resolve().parents[2]
|
||||||
/ "skills/productivity/google-workspace/scripts/google_api.py"
|
/ "skills/productivity/google-workspace/scripts/google_api.py"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class FakeAuthorizedCredentials:
|
|
||||||
def __init__(self, *, valid=True, expired=False, refresh_token="refresh-token"):
|
|
||||||
self.valid = valid
|
|
||||||
self.expired = expired
|
|
||||||
self.refresh_token = refresh_token
|
|
||||||
self.refresh_calls = 0
|
|
||||||
|
|
||||||
def refresh(self, _request):
|
|
||||||
self.refresh_calls += 1
|
|
||||||
self.valid = True
|
|
||||||
self.expired = False
|
|
||||||
|
|
||||||
def to_json(self):
|
|
||||||
return json.dumps({
|
|
||||||
"token": "refreshed-token",
|
|
||||||
"refresh_token": self.refresh_token,
|
|
||||||
"token_uri": "https://oauth2.googleapis.com/token",
|
|
||||||
"client_id": "client-id",
|
|
||||||
"client_secret": "client-secret",
|
|
||||||
"scopes": [
|
|
||||||
"https://www.googleapis.com/auth/gmail.readonly",
|
|
||||||
"https://www.googleapis.com/auth/gmail.send",
|
|
||||||
"https://www.googleapis.com/auth/gmail.modify",
|
|
||||||
"https://www.googleapis.com/auth/calendar",
|
|
||||||
"https://www.googleapis.com/auth/drive.readonly",
|
|
||||||
"https://www.googleapis.com/auth/contacts.readonly",
|
|
||||||
"https://www.googleapis.com/auth/spreadsheets",
|
|
||||||
"https://www.googleapis.com/auth/documents.readonly",
|
|
||||||
],
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
class FakeCredentialsFactory:
|
|
||||||
creds = FakeAuthorizedCredentials()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_authorized_user_file(cls, _path, _scopes):
|
|
||||||
return cls.creds
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def google_api_module(monkeypatch, tmp_path):
|
def bridge_module(monkeypatch, tmp_path):
|
||||||
google_module = types.ModuleType("google")
|
hermes_home = tmp_path / ".hermes"
|
||||||
oauth2_module = types.ModuleType("google.oauth2")
|
hermes_home.mkdir()
|
||||||
credentials_module = types.ModuleType("google.oauth2.credentials")
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
||||||
credentials_module.Credentials = FakeCredentialsFactory
|
|
||||||
auth_module = types.ModuleType("google.auth")
|
|
||||||
transport_module = types.ModuleType("google.auth.transport")
|
|
||||||
requests_module = types.ModuleType("google.auth.transport.requests")
|
|
||||||
requests_module.Request = object
|
|
||||||
|
|
||||||
monkeypatch.setitem(sys.modules, "google", google_module)
|
spec = importlib.util.spec_from_file_location("gws_bridge_test", BRIDGE_PATH)
|
||||||
monkeypatch.setitem(sys.modules, "google.oauth2", oauth2_module)
|
|
||||||
monkeypatch.setitem(sys.modules, "google.oauth2.credentials", credentials_module)
|
|
||||||
monkeypatch.setitem(sys.modules, "google.auth", auth_module)
|
|
||||||
monkeypatch.setitem(sys.modules, "google.auth.transport", transport_module)
|
|
||||||
monkeypatch.setitem(sys.modules, "google.auth.transport.requests", requests_module)
|
|
||||||
|
|
||||||
spec = importlib.util.spec_from_file_location("google_workspace_api_test", SCRIPT_PATH)
|
|
||||||
module = importlib.util.module_from_spec(spec)
|
module = importlib.util.module_from_spec(spec)
|
||||||
assert spec.loader is not None
|
assert spec.loader is not None
|
||||||
spec.loader.exec_module(module)
|
spec.loader.exec_module(module)
|
||||||
|
|
||||||
monkeypatch.setattr(module, "TOKEN_PATH", tmp_path / "google_token.json")
|
|
||||||
return module
|
return module
|
||||||
|
|
||||||
|
|
||||||
def _write_token(path: Path, scopes):
|
@pytest.fixture
|
||||||
path.write_text(json.dumps({
|
def api_module(monkeypatch, tmp_path):
|
||||||
"token": "access-token",
|
hermes_home = tmp_path / ".hermes"
|
||||||
"refresh_token": "refresh-token",
|
hermes_home.mkdir()
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
||||||
|
|
||||||
|
spec = importlib.util.spec_from_file_location("gws_api_test", API_PATH)
|
||||||
|
module = importlib.util.module_from_spec(spec)
|
||||||
|
assert spec.loader is not None
|
||||||
|
spec.loader.exec_module(module)
|
||||||
|
return module
|
||||||
|
|
||||||
|
|
||||||
|
def _write_token(path: Path, *, token="ya29.test", expiry=None, **extra):
|
||||||
|
data = {
|
||||||
|
"token": token,
|
||||||
|
"refresh_token": "1//refresh",
|
||||||
|
"client_id": "123.apps.googleusercontent.com",
|
||||||
|
"client_secret": "secret",
|
||||||
"token_uri": "https://oauth2.googleapis.com/token",
|
"token_uri": "https://oauth2.googleapis.com/token",
|
||||||
"client_id": "client-id",
|
**extra,
|
||||||
"client_secret": "client-secret",
|
}
|
||||||
"scopes": scopes,
|
if expiry is not None:
|
||||||
}))
|
data["expiry"] = expiry
|
||||||
|
path.write_text(json.dumps(data))
|
||||||
|
|
||||||
|
|
||||||
def test_get_credentials_rejects_missing_scopes(google_api_module, capsys):
|
def test_bridge_returns_valid_token(bridge_module, tmp_path):
|
||||||
FakeCredentialsFactory.creds = FakeAuthorizedCredentials(valid=True)
|
"""Non-expired token is returned without refresh."""
|
||||||
_write_token(google_api_module.TOKEN_PATH, [
|
future = (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat()
|
||||||
"https://www.googleapis.com/auth/drive.readonly",
|
token_path = bridge_module.get_token_path()
|
||||||
"https://www.googleapis.com/auth/spreadsheets",
|
_write_token(token_path, token="ya29.valid", expiry=future)
|
||||||
])
|
|
||||||
|
|
||||||
|
result = bridge_module.get_valid_token()
|
||||||
|
assert result == "ya29.valid"
|
||||||
|
|
||||||
|
|
||||||
|
def test_bridge_refreshes_expired_token(bridge_module, tmp_path):
|
||||||
|
"""Expired token triggers a refresh via token_uri."""
|
||||||
|
past = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
|
||||||
|
token_path = bridge_module.get_token_path()
|
||||||
|
_write_token(token_path, token="ya29.old", expiry=past)
|
||||||
|
|
||||||
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.read.return_value = json.dumps({
|
||||||
|
"access_token": "ya29.refreshed",
|
||||||
|
"expires_in": 3600,
|
||||||
|
}).encode()
|
||||||
|
mock_resp.__enter__ = lambda s: s
|
||||||
|
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||||
|
|
||||||
|
with patch("urllib.request.urlopen", return_value=mock_resp):
|
||||||
|
result = bridge_module.get_valid_token()
|
||||||
|
|
||||||
|
assert result == "ya29.refreshed"
|
||||||
|
# Verify persisted
|
||||||
|
saved = json.loads(token_path.read_text())
|
||||||
|
assert saved["token"] == "ya29.refreshed"
|
||||||
|
|
||||||
|
|
||||||
|
def test_bridge_exits_on_missing_token(bridge_module):
|
||||||
|
"""Missing token file causes exit with code 1."""
|
||||||
with pytest.raises(SystemExit):
|
with pytest.raises(SystemExit):
|
||||||
google_api_module.get_credentials()
|
bridge_module.get_valid_token()
|
||||||
|
|
||||||
err = capsys.readouterr().err
|
|
||||||
assert "missing google workspace scopes" in err.lower()
|
|
||||||
assert "gmail.send" in err
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_credentials_accepts_full_scope_token(google_api_module):
|
def test_bridge_main_injects_token_env(bridge_module, tmp_path):
|
||||||
FakeCredentialsFactory.creds = FakeAuthorizedCredentials(valid=True)
|
"""main() sets GOOGLE_WORKSPACE_CLI_TOKEN in subprocess env."""
|
||||||
_write_token(google_api_module.TOKEN_PATH, list(google_api_module.SCOPES))
|
future = (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat()
|
||||||
|
token_path = bridge_module.get_token_path()
|
||||||
|
_write_token(token_path, token="ya29.injected", expiry=future)
|
||||||
|
|
||||||
creds = google_api_module.get_credentials()
|
captured = {}
|
||||||
|
|
||||||
assert creds is FakeCredentialsFactory.creds
|
def capture_run(cmd, **kwargs):
|
||||||
|
captured["cmd"] = cmd
|
||||||
|
captured["env"] = kwargs.get("env", {})
|
||||||
|
return MagicMock(returncode=0)
|
||||||
|
|
||||||
|
with patch.object(sys, "argv", ["gws_bridge.py", "gmail", "+triage"]):
|
||||||
|
with patch.object(subprocess, "run", side_effect=capture_run):
|
||||||
|
with pytest.raises(SystemExit):
|
||||||
|
bridge_module.main()
|
||||||
|
|
||||||
|
assert captured["env"]["GOOGLE_WORKSPACE_CLI_TOKEN"] == "ya29.injected"
|
||||||
|
assert captured["cmd"] == ["gws", "gmail", "+triage"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_api_calendar_list_uses_agenda_by_default(api_module):
|
||||||
|
"""calendar list without dates uses +agenda helper."""
|
||||||
|
captured = {}
|
||||||
|
|
||||||
|
def capture_run(cmd, **kwargs):
|
||||||
|
captured["cmd"] = cmd
|
||||||
|
return MagicMock(returncode=0)
|
||||||
|
|
||||||
|
args = api_module.argparse.Namespace(
|
||||||
|
start="", end="", max=25, calendar="primary", func=api_module.calendar_list,
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(subprocess, "run", side_effect=capture_run):
|
||||||
|
with pytest.raises(SystemExit):
|
||||||
|
api_module.calendar_list(args)
|
||||||
|
|
||||||
|
gws_args = captured["cmd"][2:] # skip python + bridge path
|
||||||
|
assert "calendar" in gws_args
|
||||||
|
assert "+agenda" in gws_args
|
||||||
|
assert "--days" in gws_args
|
||||||
|
|
||||||
|
|
||||||
|
def test_api_calendar_list_respects_date_range(api_module):
|
||||||
|
"""calendar list with --start/--end uses raw events list API."""
|
||||||
|
captured = {}
|
||||||
|
|
||||||
|
def capture_run(cmd, **kwargs):
|
||||||
|
captured["cmd"] = cmd
|
||||||
|
return MagicMock(returncode=0)
|
||||||
|
|
||||||
|
args = api_module.argparse.Namespace(
|
||||||
|
start="2026-04-01T00:00:00Z",
|
||||||
|
end="2026-04-07T23:59:59Z",
|
||||||
|
max=25,
|
||||||
|
calendar="primary",
|
||||||
|
func=api_module.calendar_list,
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(subprocess, "run", side_effect=capture_run):
|
||||||
|
with pytest.raises(SystemExit):
|
||||||
|
api_module.calendar_list(args)
|
||||||
|
|
||||||
|
gws_args = captured["cmd"][2:]
|
||||||
|
assert "events" in gws_args
|
||||||
|
assert "list" in gws_args
|
||||||
|
params_idx = gws_args.index("--params")
|
||||||
|
params = json.loads(gws_args[params_idx + 1])
|
||||||
|
assert params["timeMin"] == "2026-04-01T00:00:00Z"
|
||||||
|
assert params["timeMax"] == "2026-04-07T23:59:59Z"
|
||||||
|
|||||||
Reference in New Issue
Block a user