hermes 初始配置

This commit is contained in:
administrator
2026-05-03 19:13:24 +00:00
commit cfbdc7baef
428 changed files with 173058 additions and 0 deletions
@@ -0,0 +1,279 @@
---
name: google-workspace
description: Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration for Hermes. Uses Hermes-managed OAuth2 setup, prefers the Google Workspace CLI (`gws`) when available for broader API coverage, and falls back to the Python client libraries otherwise.
version: 1.0.0
author: Nous Research
license: MIT
metadata:
hermes:
tags: [Google, Gmail, Calendar, Drive, Sheets, Docs, Contacts, Email, OAuth]
homepage: https://github.com/NousResearch/hermes-agent
related_skills: [himalaya]
---
# Google Workspace
Gmail, Calendar, Drive, Contacts, Sheets, and Docs — through Hermes-managed OAuth and a thin CLI wrapper. When `gws` is installed, the skill uses it as the execution backend for broader Google Workspace coverage; otherwise it falls back to the bundled Python client implementation.
## References
- `references/gmail-search-syntax.md` — Gmail search operators (is:unread, from:, newer_than:, etc.)
## Scripts
- `scripts/setup.py` — OAuth2 setup (run once to authorize)
- `scripts/google_api.py` — compatibility wrapper CLI. It prefers `gws` for operations when available, while preserving Hermes' existing JSON output contract.
## First-Time Setup
The setup is fully non-interactive — you drive it step by step so it works
on CLI, Telegram, Discord, or any platform.
Define a shorthand first:
```bash
GSETUP="python ${HERMES_HOME:-$HOME/.hermes}/skills/productivity/google-workspace/scripts/setup.py"
```
### Step 0: Check if already set up
```bash
$GSETUP --check
```
If it prints `AUTHENTICATED`, skip to Usage — setup is already done.
### Step 1: Triage — ask the user what they need
Before starting OAuth setup, ask the user TWO questions:
**Question 1: "What Google services do you need? Just email, or also
Calendar/Drive/Sheets/Docs?"**
- **Email only** → They don't need this skill at all. Use the `himalaya` skill
instead — it works with a Gmail App Password (Settings → Security → App
Passwords) and takes 2 minutes to set up. No Google Cloud project needed.
Load the himalaya skill and follow its setup instructions.
- **Email + Calendar** → Continue with this skill, but use
`--services email,calendar` during auth so the consent screen only asks for
the scopes they actually need.
- **Calendar/Drive/Sheets/Docs only** → Continue with this skill and use a
narrower `--services` set like `calendar,drive,sheets,docs`.
- **Full Workspace access** → Continue with this skill and use the default
`all` service set.
**Question 2: "Does your Google account use Advanced Protection (hardware
security keys required to sign in)? If you're not sure, you probably don't
— it's something you would have explicitly enrolled in."**
- **No / Not sure** → Normal setup. Continue below.
- **Yes** → Their Workspace admin must add the OAuth client ID to the org's
allowed apps list before Step 4 will work. Let them know upfront.
### Step 2: Create OAuth credentials (one-time, ~5 minutes)
Tell the user:
> You need a Google Cloud OAuth client. This is a one-time setup:
>
> 1. Create or select a project:
> https://console.cloud.google.com/projectselector2/home/dashboard
> 2. Enable the required APIs from the API Library:
> https://console.cloud.google.com/apis/library
> Enable: Gmail API, Google Calendar API, Google Drive API,
> Google Sheets API, Google Docs API, People API
> 3. Create the OAuth client here:
> https://console.cloud.google.com/apis/credentials
> Credentials → Create Credentials → OAuth 2.0 Client ID
> 4. Application type: "Desktop app" → Create
> 5. If the app is still in Testing, add the user's Google account as a test user here:
> https://console.cloud.google.com/auth/audience
> Audience → Test users → Add users
> 6. Download the JSON file and tell me the file path
>
> Important Hermes CLI note: if the file path starts with `/`, do NOT send only the bare path as its own message in the CLI, because it can be mistaken for a slash command. Send it in a sentence instead, like:
> `The JSON file path is: /home/user/Downloads/client_secret_....json`
Once they provide the path:
```bash
$GSETUP --client-secret /path/to/client_secret.json
```
If they paste the raw client ID / client secret values instead of a file path,
write a valid Desktop OAuth JSON file for them yourself, save it somewhere
explicit (for example `~/Downloads/hermes-google-client-secret.json`), then run
`--client-secret` against that file.
### Step 3: Get authorization URL
Use the service set chosen in Step 1. Examples:
```bash
$GSETUP --auth-url --services email,calendar --format json
$GSETUP --auth-url --services calendar,drive,sheets,docs --format json
$GSETUP --auth-url --services all --format json
```
This returns JSON with an `auth_url` field and also saves the exact URL to
`~/.hermes/google_oauth_last_url.txt`.
Agent rules for this step:
- Extract the `auth_url` field and send that exact URL to the user as a single line.
- Tell the user that the browser will likely fail on `http://localhost:1` after approval, and that this is expected.
- Tell them to copy the ENTIRE redirected URL from the browser address bar.
- If the user gets `Error 403: access_denied`, send them directly to `https://console.cloud.google.com/auth/audience` to add themselves as a test user.
### Step 4: Exchange the code
The user will paste back either a URL like `http://localhost:1/?code=4/0A...&scope=...`
or just the code string. Either works. The `--auth-url` step stores a temporary
pending OAuth session locally so `--auth-code` can complete the PKCE exchange
later, even on headless systems:
```bash
$GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED" --format json
```
If `--auth-code` fails because the code expired, was already used, or came from
an older browser tab, it now returns a fresh `fresh_auth_url`. In that case,
immediately send the new URL to the user and have them retry with the newest
browser redirect only.
### Step 5: Verify
```bash
$GSETUP --check
```
Should print `AUTHENTICATED`. Setup is complete — token refreshes automatically from now on.
### Notes
- Token is stored at `~/.hermes/google_token.json` and auto-refreshes.
- Pending OAuth session state/verifier are stored temporarily at `~/.hermes/google_oauth_pending.json` until exchange completes.
- If `gws` is installed, `google_api.py` points it at the same `~/.hermes/google_token.json` credentials file. Users do not need to run a separate `gws auth login` flow.
- To revoke: `$GSETUP --revoke`
## Usage
All commands go through the API script. Set `GAPI` as a shorthand:
```bash
GAPI="python ${HERMES_HOME:-$HOME/.hermes}/skills/productivity/google-workspace/scripts/google_api.py"
```
### Gmail
```bash
# Search (returns JSON array with id, from, subject, date, snippet)
$GAPI gmail search "is:unread" --max 10
$GAPI gmail search "from:boss@company.com newer_than:1d"
$GAPI gmail search "has:attachment filename:pdf newer_than:7d"
# Read full message (returns JSON with body text)
$GAPI gmail get MESSAGE_ID
# Send
$GAPI gmail send --to user@example.com --subject "Hello" --body "Message text"
$GAPI gmail send --to user@example.com --subject "Report" --body "<h1>Q4</h1><p>Details...</p>" --html
$GAPI gmail send --to user@example.com --subject "Hello" --from '"Research Agent" <user@example.com>' --body "Message text"
# Reply (automatically threads and sets In-Reply-To)
$GAPI gmail reply MESSAGE_ID --body "Thanks, that works for me."
$GAPI gmail reply MESSAGE_ID --from '"Support Bot" <user@example.com>' --body "Thanks"
# Labels
$GAPI gmail labels
$GAPI gmail modify MESSAGE_ID --add-labels LABEL_ID
$GAPI gmail modify MESSAGE_ID --remove-labels UNREAD
```
### Calendar
```bash
# List events (defaults to next 7 days)
$GAPI calendar list
$GAPI calendar list --start 2026-03-01T00:00:00Z --end 2026-03-07T23:59:59Z
# Create event (ISO 8601 with timezone required)
$GAPI calendar create --summary "Team Standup" --start 2026-03-01T10:00:00-06:00 --end 2026-03-01T10:30:00-06:00
$GAPI calendar create --summary "Lunch" --start 2026-03-01T12:00:00Z --end 2026-03-01T13:00:00Z --location "Cafe"
$GAPI calendar create --summary "Review" --start 2026-03-01T14:00:00Z --end 2026-03-01T15:00:00Z --attendees "alice@co.com,bob@co.com"
# Delete event
$GAPI calendar delete EVENT_ID
```
### Drive
```bash
$GAPI drive search "quarterly report" --max 10
$GAPI drive search "mimeType='application/pdf'" --raw-query --max 5
```
### Contacts
```bash
$GAPI contacts list --max 20
```
### Sheets
```bash
# Read
$GAPI sheets get SHEET_ID "Sheet1!A1:D10"
# Write
$GAPI sheets update SHEET_ID "Sheet1!A1:B2" --values '[["Name","Score"],["Alice","95"]]'
# Append rows
$GAPI sheets append SHEET_ID "Sheet1!A:C" --values '[["new","row","data"]]'
```
### Docs
```bash
$GAPI docs get DOC_ID
```
## Output Format
All commands return JSON. Parse with `jq` or read directly. Key fields:
- **Gmail search**: `[{id, threadId, from, to, subject, date, snippet, labels}]`
- **Gmail get**: `{id, threadId, from, to, subject, date, labels, body}`
- **Gmail send/reply**: `{status: "sent", id, threadId}`
- **Calendar list**: `[{id, summary, start, end, location, description, htmlLink}]`
- **Calendar create**: `{status: "created", id, summary, htmlLink}`
- **Drive search**: `[{id, name, mimeType, modifiedTime, webViewLink}]`
- **Contacts list**: `[{name, emails: [...], phones: [...]}]`
- **Sheets get**: `[[cell, cell, ...], ...]`
## Rules
1. **Never send email or create/delete events without confirming with the user first.** Show the draft content and ask for approval.
2. **Check auth before first use** — run `setup.py --check`. If it fails, guide the user through setup.
3. **Use the Gmail search syntax reference** for complex queries — load it with `skill_view("google-workspace", file_path="references/gmail-search-syntax.md")`.
4. **Calendar times must include timezone** — always use ISO 8601 with offset (e.g., `2026-03-01T10:00:00-06:00`) or UTC (`Z`).
5. **Respect rate limits** — avoid rapid-fire sequential API calls. Batch reads when possible.
## Troubleshooting
| Problem | Fix |
|---------|-----|
| `NOT_AUTHENTICATED` | Run setup Steps 2-5 above |
| `REFRESH_FAILED` | Token revoked or expired — redo Steps 3-5 |
| `HttpError 403: Insufficient Permission` | Missing API scope — `$GSETUP --revoke` then redo Steps 3-5 |
| `HttpError 403: Access Not Configured` | API not enabled — user needs to enable it in Google Cloud Console |
| `ModuleNotFoundError` | Run `$GSETUP --install-deps` |
| Advanced Protection blocks auth | Workspace admin must allowlist the OAuth client ID |
## Revoking Access
```bash
$GSETUP --revoke
```
@@ -0,0 +1,63 @@
# Gmail Search Syntax
Standard Gmail search operators work in the `query` argument.
## Common Operators
| Operator | Example | Description |
|----------|---------|-------------|
| `is:unread` | `is:unread` | Unread messages |
| `is:starred` | `is:starred` | Starred messages |
| `is:important` | `is:important` | Important messages |
| `in:inbox` | `in:inbox` | Inbox only |
| `in:sent` | `in:sent` | Sent folder |
| `in:drafts` | `in:drafts` | Drafts |
| `in:trash` | `in:trash` | Trash |
| `in:anywhere` | `in:anywhere` | All mail including spam/trash |
| `from:` | `from:alice@example.com` | Sender |
| `to:` | `to:bob@example.com` | Recipient |
| `cc:` | `cc:team@example.com` | CC recipient |
| `subject:` | `subject:invoice` | Subject contains |
| `label:` | `label:work` | Has label |
| `has:attachment` | `has:attachment` | Has attachments |
| `filename:` | `filename:pdf` | Attachment filename/type |
| `larger:` | `larger:5M` | Larger than size |
| `smaller:` | `smaller:1M` | Smaller than size |
## Date Operators
| Operator | Example | Description |
|----------|---------|-------------|
| `newer_than:` | `newer_than:7d` | Within last N days (d), months (m), years (y) |
| `older_than:` | `older_than:30d` | Older than N days/months/years |
| `after:` | `after:2026/02/01` | After date (YYYY/MM/DD) |
| `before:` | `before:2026/03/01` | Before date |
## Combining
| Syntax | Example | Description |
|--------|---------|-------------|
| space | `from:alice subject:meeting` | AND (implicit) |
| `OR` | `from:alice OR from:bob` | OR |
| `-` | `-from:noreply@` | NOT (exclude) |
| `()` | `(from:alice OR from:bob) subject:meeting` | Grouping |
| `""` | `"exact phrase"` | Exact phrase match |
## Common Patterns
```
# Unread emails from the last day
is:unread newer_than:1d
# Emails with PDF attachments from a specific sender
from:accounting@company.com has:attachment filename:pdf
# Important unread emails (not promotions/social)
is:unread -category:promotions -category:social
# Emails in a thread about a topic
subject:"Q4 budget" newer_than:30d
# Large attachments to clean up
has:attachment larger:10M older_than:90d
```
@@ -0,0 +1,855 @@
#!/usr/bin/env python3
"""Google Workspace API CLI for Hermes Agent.
Uses the Google Workspace CLI (`gws`) when available, but preserves the
existing Hermes-facing JSON contract and falls back to the Python client
libraries if `gws` is not installed.
Usage:
python google_api.py gmail search "is:unread" [--max 10]
python google_api.py gmail get MESSAGE_ID
python google_api.py gmail send --to user@example.com --subject "Hi" --body "Hello"
python google_api.py gmail reply MESSAGE_ID --body "Thanks"
python google_api.py calendar list [--from DATE] [--to DATE] [--calendar primary]
python google_api.py calendar create --summary "Meeting" --start DATETIME --end DATETIME
python google_api.py drive search "budget report" [--max 10]
python google_api.py contacts list [--max 20]
python google_api.py sheets get SHEET_ID RANGE
python google_api.py sheets update SHEET_ID RANGE --values '[[...]]'
python google_api.py sheets append SHEET_ID RANGE --values '[[...]]'
python google_api.py docs get DOC_ID
"""
import argparse
import base64
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
from pathlib import Path
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
TOKEN_PATH = HERMES_HOME / "google_token.json"
CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json"
SCOPES = [
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.send",
"https://www.googleapis.com/auth/gmail.modify",
"https://www.googleapis.com/auth/calendar",
"https://www.googleapis.com/auth/drive.readonly",
"https://www.googleapis.com/auth/contacts.readonly",
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/documents.readonly",
]
def _normalize_authorized_user_payload(payload: dict) -> dict:
normalized = dict(payload)
if not normalized.get("type"):
normalized["type"] = "authorized_user"
return normalized
def _ensure_authenticated():
if not TOKEN_PATH.exists():
print("Not authenticated. Run the setup script first:", file=sys.stderr)
print(f" python {Path(__file__).parent / 'setup.py'}", file=sys.stderr)
sys.exit(1)
def _stored_token_scopes() -> list[str]:
try:
data = json.loads(TOKEN_PATH.read_text())
except Exception:
return list(SCOPES)
scopes = data.get("scopes")
if isinstance(scopes, list) and scopes:
return scopes
return list(SCOPES)
def _gws_binary() -> str | None:
override = os.getenv("HERMES_GWS_BIN")
if override:
return override
return shutil.which("gws")
def _gws_env() -> dict[str, str]:
env = os.environ.copy()
env["GOOGLE_WORKSPACE_CLI_CREDENTIALS_FILE"] = str(TOKEN_PATH)
return env
def _run_gws(parts: list[str], *, params: dict | None = None, body: dict | None = None):
binary = _gws_binary()
if not binary:
raise RuntimeError("gws not installed")
_ensure_authenticated()
cmd = [binary, *parts]
if params is not None:
cmd.extend(["--params", json.dumps(params)])
if body is not None:
cmd.extend(["--json", json.dumps(body)])
result = subprocess.run(
cmd,
capture_output=True,
text=True,
env=_gws_env(),
)
if result.returncode != 0:
err = result.stderr.strip() or result.stdout.strip() or "Unknown gws error"
print(err, file=sys.stderr)
sys.exit(result.returncode or 1)
stdout = result.stdout.strip()
if not stdout:
return {}
try:
return json.loads(stdout)
except json.JSONDecodeError:
print("ERROR: Unexpected non-JSON output from gws:", file=sys.stderr)
print(stdout, file=sys.stderr)
sys.exit(1)
def _headers_dict(msg: dict) -> dict[str, str]:
return {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
def _extract_message_body(msg: dict) -> str:
body = ""
payload = msg.get("payload", {})
if payload.get("body", {}).get("data"):
body = base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8", errors="replace")
elif payload.get("parts"):
for part in payload["parts"]:
if part.get("mimeType") == "text/plain" and part.get("body", {}).get("data"):
body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
break
if not body:
for part in payload["parts"]:
if part.get("mimeType") == "text/html" and part.get("body", {}).get("data"):
body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8", errors="replace")
break
return body
def _extract_doc_text(doc: dict) -> str:
text_parts = []
for element in doc.get("body", {}).get("content", []):
paragraph = element.get("paragraph", {})
for pe in paragraph.get("elements", []):
text_run = pe.get("textRun", {})
if text_run.get("content"):
text_parts.append(text_run["content"])
return "".join(text_parts)
def _datetime_with_timezone(value: str) -> str:
if not value:
return value
if "T" not in value:
return value
if value.endswith("Z"):
return value
tail = value[10:]
if "+" in tail or "-" in tail:
return value
return value + "Z"
def get_credentials():
"""Load and refresh credentials from token file."""
_ensure_authenticated()
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), _stored_token_scopes())
if creds.expired and creds.refresh_token:
creds.refresh(Request())
TOKEN_PATH.write_text(
json.dumps(
_normalize_authorized_user_payload(json.loads(creds.to_json())),
indent=2,
)
)
if not creds.valid:
print("Token is invalid. Re-run setup.", file=sys.stderr)
sys.exit(1)
return creds
def build_service(api, version):
from googleapiclient.discovery import build
return build(api, version, credentials=get_credentials())
# =========================================================================
# Gmail
# =========================================================================
def gmail_search(args):
if _gws_binary():
results = _run_gws(
["gmail", "users", "messages", "list"],
params={"userId": "me", "q": args.query, "maxResults": args.max},
)
messages = results.get("messages", [])
output = []
for msg_meta in messages:
msg = _run_gws(
["gmail", "users", "messages", "get"],
params={
"userId": "me",
"id": msg_meta["id"],
"format": "metadata",
"metadataHeaders": ["From", "To", "Subject", "Date"],
},
)
headers = _headers_dict(msg)
output.append(
{
"id": msg["id"],
"threadId": msg["threadId"],
"from": headers.get("From", ""),
"to": headers.get("To", ""),
"subject": headers.get("Subject", ""),
"date": headers.get("Date", ""),
"snippet": msg.get("snippet", ""),
"labels": msg.get("labelIds", []),
}
)
print(json.dumps(output, indent=2, ensure_ascii=False))
return
service = build_service("gmail", "v1")
results = service.users().messages().list(
userId="me", q=args.query, maxResults=args.max
).execute()
messages = results.get("messages", [])
if not messages:
print("No messages found.")
return
output = []
for msg_meta in messages:
msg = service.users().messages().get(
userId="me", id=msg_meta["id"], format="metadata",
metadataHeaders=["From", "To", "Subject", "Date"],
).execute()
headers = _headers_dict(msg)
output.append({
"id": msg["id"],
"threadId": msg["threadId"],
"from": headers.get("From", ""),
"to": headers.get("To", ""),
"subject": headers.get("Subject", ""),
"date": headers.get("Date", ""),
"snippet": msg.get("snippet", ""),
"labels": msg.get("labelIds", []),
})
print(json.dumps(output, indent=2, ensure_ascii=False))
def gmail_get(args):
if _gws_binary():
msg = _run_gws(
["gmail", "users", "messages", "get"],
params={"userId": "me", "id": args.message_id, "format": "full"},
)
headers = _headers_dict(msg)
result = {
"id": msg["id"],
"threadId": msg["threadId"],
"from": headers.get("From", ""),
"to": headers.get("To", ""),
"subject": headers.get("Subject", ""),
"date": headers.get("Date", ""),
"labels": msg.get("labelIds", []),
"body": _extract_message_body(msg),
}
print(json.dumps(result, indent=2, ensure_ascii=False))
return
service = build_service("gmail", "v1")
msg = service.users().messages().get(
userId="me", id=args.message_id, format="full"
).execute()
headers = _headers_dict(msg)
result = {
"id": msg["id"],
"threadId": msg["threadId"],
"from": headers.get("From", ""),
"to": headers.get("To", ""),
"subject": headers.get("Subject", ""),
"date": headers.get("Date", ""),
"labels": msg.get("labelIds", []),
"body": _extract_message_body(msg),
}
print(json.dumps(result, indent=2, ensure_ascii=False))
def gmail_send(args):
if _gws_binary():
message = MIMEText(args.body, "html" if args.html else "plain")
message["to"] = args.to
message["subject"] = args.subject
if args.cc:
message["cc"] = args.cc
if args.from_header:
message["from"] = args.from_header
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
body = {"raw": raw}
if args.thread_id:
body["threadId"] = args.thread_id
result = _run_gws(
["gmail", "users", "messages", "send"],
params={"userId": "me"},
body=body,
)
print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
return
service = build_service("gmail", "v1")
message = MIMEText(args.body, "html" if args.html else "plain")
message["to"] = args.to
message["subject"] = args.subject
if args.cc:
message["cc"] = args.cc
if args.from_header:
message["from"] = args.from_header
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
body = {"raw": raw}
if args.thread_id:
body["threadId"] = args.thread_id
result = service.users().messages().send(userId="me", body=body).execute()
print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
def gmail_reply(args):
if _gws_binary():
original = _run_gws(
["gmail", "users", "messages", "get"],
params={
"userId": "me",
"id": args.message_id,
"format": "metadata",
"metadataHeaders": ["From", "Subject", "Message-ID"],
},
)
headers = _headers_dict(original)
subject = headers.get("Subject", "")
if not subject.startswith("Re:"):
subject = f"Re: {subject}"
message = MIMEText(args.body)
message["to"] = headers.get("From", "")
message["subject"] = subject
if args.from_header:
message["from"] = args.from_header
if headers.get("Message-ID"):
message["In-Reply-To"] = headers["Message-ID"]
message["References"] = headers["Message-ID"]
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
result = _run_gws(
["gmail", "users", "messages", "send"],
params={"userId": "me"},
body={"raw": raw, "threadId": original["threadId"]},
)
print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
return
service = build_service("gmail", "v1")
original = service.users().messages().get(
userId="me", id=args.message_id, format="metadata",
metadataHeaders=["From", "Subject", "Message-ID"],
).execute()
headers = _headers_dict(original)
subject = headers.get("Subject", "")
if not subject.startswith("Re:"):
subject = f"Re: {subject}"
message = MIMEText(args.body)
message["to"] = headers.get("From", "")
message["subject"] = subject
if args.from_header:
message["from"] = args.from_header
if headers.get("Message-ID"):
message["In-Reply-To"] = headers["Message-ID"]
message["References"] = headers["Message-ID"]
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
body = {"raw": raw, "threadId": original["threadId"]}
result = service.users().messages().send(userId="me", body=body).execute()
print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
def gmail_labels(args):
if _gws_binary():
results = _run_gws(["gmail", "users", "labels", "list"], params={"userId": "me"})
labels = [{"id": l["id"], "name": l["name"], "type": l.get("type", "")} for l in results.get("labels", [])]
print(json.dumps(labels, indent=2))
return
service = build_service("gmail", "v1")
results = service.users().labels().list(userId="me").execute()
labels = [{"id": l["id"], "name": l["name"], "type": l.get("type", "")} for l in results.get("labels", [])]
print(json.dumps(labels, indent=2))
def gmail_modify(args):
body = {}
if args.add_labels:
body["addLabelIds"] = args.add_labels.split(",")
if args.remove_labels:
body["removeLabelIds"] = args.remove_labels.split(",")
if _gws_binary():
result = _run_gws(
["gmail", "users", "messages", "modify"],
params={"userId": "me", "id": args.message_id},
body=body,
)
print(json.dumps({"id": result["id"], "labels": result.get("labelIds", [])}, indent=2))
return
service = build_service("gmail", "v1")
result = service.users().messages().modify(userId="me", id=args.message_id, body=body).execute()
print(json.dumps({"id": result["id"], "labels": result.get("labelIds", [])}, indent=2))
# =========================================================================
# Calendar
# =========================================================================
def calendar_list(args):
now = datetime.now(timezone.utc)
time_min = _datetime_with_timezone(args.start or now.isoformat())
time_max = _datetime_with_timezone(args.end or (now + timedelta(days=7)).isoformat())
if _gws_binary():
results = _run_gws(
["calendar", "events", "list"],
params={
"calendarId": args.calendar,
"timeMin": time_min,
"timeMax": time_max,
"maxResults": args.max,
"singleEvents": True,
"orderBy": "startTime",
},
)
events = []
for e in results.get("items", []):
events.append({
"id": e["id"],
"summary": e.get("summary", "(no title)"),
"start": e.get("start", {}).get("dateTime", e.get("start", {}).get("date", "")),
"end": e.get("end", {}).get("dateTime", e.get("end", {}).get("date", "")),
"location": e.get("location", ""),
"description": e.get("description", ""),
"status": e.get("status", ""),
"htmlLink": e.get("htmlLink", ""),
})
print(json.dumps(events, indent=2, ensure_ascii=False))
return
service = build_service("calendar", "v3")
results = service.events().list(
calendarId=args.calendar, timeMin=time_min, timeMax=time_max,
maxResults=args.max, singleEvents=True, orderBy="startTime",
).execute()
events = []
for e in results.get("items", []):
events.append({
"id": e["id"],
"summary": e.get("summary", "(no title)"),
"start": e.get("start", {}).get("dateTime", e.get("start", {}).get("date", "")),
"end": e.get("end", {}).get("dateTime", e.get("end", {}).get("date", "")),
"location": e.get("location", ""),
"description": e.get("description", ""),
"status": e.get("status", ""),
"htmlLink": e.get("htmlLink", ""),
})
print(json.dumps(events, indent=2, ensure_ascii=False))
def calendar_create(args):
event = {
"summary": args.summary,
"start": {"dateTime": args.start},
"end": {"dateTime": args.end},
}
if args.location:
event["location"] = args.location
if args.description:
event["description"] = args.description
if args.attendees:
event["attendees"] = [{"email": e.strip()} for e in args.attendees.split(",") if e.strip()]
if _gws_binary():
result = _run_gws(
["calendar", "events", "insert"],
params={"calendarId": args.calendar},
body=event,
)
print(json.dumps({
"status": "created",
"id": result["id"],
"summary": result.get("summary", ""),
"htmlLink": result.get("htmlLink", ""),
}, indent=2))
return
service = build_service("calendar", "v3")
result = service.events().insert(calendarId=args.calendar, body=event).execute()
print(json.dumps({
"status": "created",
"id": result["id"],
"summary": result.get("summary", ""),
"htmlLink": result.get("htmlLink", ""),
}, indent=2))
def calendar_delete(args):
if _gws_binary():
_run_gws(["calendar", "events", "delete"], params={"calendarId": args.calendar, "eventId": args.event_id})
print(json.dumps({"status": "deleted", "eventId": args.event_id}))
return
service = build_service("calendar", "v3")
service.events().delete(calendarId=args.calendar, eventId=args.event_id).execute()
print(json.dumps({"status": "deleted", "eventId": args.event_id}))
# =========================================================================
# Drive
# =========================================================================
def drive_search(args):
query = args.query if args.raw_query else f"fullText contains '{args.query}'"
if _gws_binary():
results = _run_gws(
["drive", "files", "list"],
params={
"q": query,
"pageSize": args.max,
"fields": "files(id, name, mimeType, modifiedTime, webViewLink)",
},
)
print(json.dumps(results.get("files", []), indent=2, ensure_ascii=False))
return
service = build_service("drive", "v3")
results = service.files().list(
q=query, pageSize=args.max, fields="files(id, name, mimeType, modifiedTime, webViewLink)",
).execute()
files = results.get("files", [])
print(json.dumps(files, indent=2, ensure_ascii=False))
# =========================================================================
# Contacts
# =========================================================================
def contacts_list(args):
if _gws_binary():
results = _run_gws(
["people", "people", "connections", "list"],
params={
"resourceName": "people/me",
"pageSize": args.max,
"personFields": "names,emailAddresses,phoneNumbers",
},
)
contacts = []
for person in results.get("connections", []):
names = person.get("names", [{}])
emails = person.get("emailAddresses", [])
phones = person.get("phoneNumbers", [])
contacts.append({
"name": names[0].get("displayName", "") if names else "",
"emails": [e.get("value", "") for e in emails],
"phones": [p.get("value", "") for p in phones],
})
print(json.dumps(contacts, indent=2, ensure_ascii=False))
return
service = build_service("people", "v1")
results = service.people().connections().list(
resourceName="people/me",
pageSize=args.max,
personFields="names,emailAddresses,phoneNumbers",
).execute()
contacts = []
for person in results.get("connections", []):
names = person.get("names", [{}])
emails = person.get("emailAddresses", [])
phones = person.get("phoneNumbers", [])
contacts.append({
"name": names[0].get("displayName", "") if names else "",
"emails": [e.get("value", "") for e in emails],
"phones": [p.get("value", "") for p in phones],
})
print(json.dumps(contacts, indent=2, ensure_ascii=False))
# =========================================================================
# Sheets
# =========================================================================
def sheets_get(args):
if _gws_binary():
result = _run_gws(
["sheets", "spreadsheets", "values", "get"],
params={"spreadsheetId": args.sheet_id, "range": args.range},
)
print(json.dumps(result.get("values", []), indent=2, ensure_ascii=False))
return
service = build_service("sheets", "v4")
result = service.spreadsheets().values().get(
spreadsheetId=args.sheet_id, range=args.range,
).execute()
print(json.dumps(result.get("values", []), indent=2, ensure_ascii=False))
def sheets_update(args):
values = json.loads(args.values)
body = {"values": values}
if _gws_binary():
result = _run_gws(
["sheets", "spreadsheets", "values", "update"],
params={
"spreadsheetId": args.sheet_id,
"range": args.range,
"valueInputOption": "USER_ENTERED",
},
body=body,
)
print(json.dumps({"updatedCells": result.get("updatedCells", 0), "updatedRange": result.get("updatedRange", "")}, indent=2))
return
service = build_service("sheets", "v4")
result = service.spreadsheets().values().update(
spreadsheetId=args.sheet_id, range=args.range,
valueInputOption="USER_ENTERED", body=body,
).execute()
print(json.dumps({"updatedCells": result.get("updatedCells", 0), "updatedRange": result.get("updatedRange", "")}, indent=2))
def sheets_append(args):
values = json.loads(args.values)
body = {"values": values}
if _gws_binary():
result = _run_gws(
["sheets", "spreadsheets", "values", "append"],
params={
"spreadsheetId": args.sheet_id,
"range": args.range,
"valueInputOption": "USER_ENTERED",
"insertDataOption": "INSERT_ROWS",
},
body=body,
)
print(json.dumps({"updatedCells": result.get("updates", {}).get("updatedCells", 0)}, indent=2))
return
service = build_service("sheets", "v4")
result = service.spreadsheets().values().append(
spreadsheetId=args.sheet_id, range=args.range,
valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body,
).execute()
print(json.dumps({"updatedCells": result.get("updates", {}).get("updatedCells", 0)}, indent=2))
# =========================================================================
# Docs
# =========================================================================
def docs_get(args):
if _gws_binary():
doc = _run_gws(["docs", "documents", "get"], params={"documentId": args.doc_id})
result = {
"title": doc.get("title", ""),
"documentId": doc.get("documentId", ""),
"body": _extract_doc_text(doc),
}
print(json.dumps(result, indent=2, ensure_ascii=False))
return
service = build_service("docs", "v1")
doc = service.documents().get(documentId=args.doc_id).execute()
result = {
"title": doc.get("title", ""),
"documentId": doc.get("documentId", ""),
"body": _extract_doc_text(doc),
}
print(json.dumps(result, indent=2, ensure_ascii=False))
# =========================================================================
# CLI parser
# =========================================================================
def main():
parser = argparse.ArgumentParser(description="Google Workspace API for Hermes Agent")
sub = parser.add_subparsers(dest="service", required=True)
# --- Gmail ---
gmail = sub.add_parser("gmail")
gmail_sub = gmail.add_subparsers(dest="action", required=True)
p = gmail_sub.add_parser("search")
p.add_argument("query", help="Gmail search query (e.g. 'is:unread')")
p.add_argument("--max", type=int, default=10)
p.set_defaults(func=gmail_search)
p = gmail_sub.add_parser("get")
p.add_argument("message_id")
p.set_defaults(func=gmail_get)
p = gmail_sub.add_parser("send")
p.add_argument("--to", required=True)
p.add_argument("--subject", required=True)
p.add_argument("--body", required=True)
p.add_argument("--cc", default="")
p.add_argument("--from", dest="from_header", default="", help="Custom From header (e.g. '\"Agent Name\" <user@example.com>')")
p.add_argument("--html", action="store_true", help="Send body as HTML")
p.add_argument("--thread-id", default="", help="Thread ID for threading")
p.set_defaults(func=gmail_send)
p = gmail_sub.add_parser("reply")
p.add_argument("message_id", help="Message ID to reply to")
p.add_argument("--body", required=True)
p.add_argument("--from", dest="from_header", default="", help="Custom From header (e.g. '\"Agent Name\" <user@example.com>')")
p.set_defaults(func=gmail_reply)
p = gmail_sub.add_parser("labels")
p.set_defaults(func=gmail_labels)
p = gmail_sub.add_parser("modify")
p.add_argument("message_id")
p.add_argument("--add-labels", default="", help="Comma-separated label IDs to add")
p.add_argument("--remove-labels", default="", help="Comma-separated label IDs to remove")
p.set_defaults(func=gmail_modify)
# --- Calendar ---
cal = sub.add_parser("calendar")
cal_sub = cal.add_subparsers(dest="action", required=True)
p = cal_sub.add_parser("list")
p.add_argument("--start", default="", help="Start time (ISO 8601)")
p.add_argument("--end", default="", help="End time (ISO 8601)")
p.add_argument("--max", type=int, default=25)
p.add_argument("--calendar", default="primary")
p.set_defaults(func=calendar_list)
p = cal_sub.add_parser("create")
p.add_argument("--summary", required=True)
p.add_argument("--start", required=True, help="Start (ISO 8601 with timezone)")
p.add_argument("--end", required=True, help="End (ISO 8601 with timezone)")
p.add_argument("--location", default="")
p.add_argument("--description", default="")
p.add_argument("--attendees", default="", help="Comma-separated email addresses")
p.add_argument("--calendar", default="primary")
p.set_defaults(func=calendar_create)
p = cal_sub.add_parser("delete")
p.add_argument("event_id")
p.add_argument("--calendar", default="primary")
p.set_defaults(func=calendar_delete)
# --- Drive ---
drv = sub.add_parser("drive")
drv_sub = drv.add_subparsers(dest="action", required=True)
p = drv_sub.add_parser("search")
p.add_argument("query")
p.add_argument("--max", type=int, default=10)
p.add_argument("--raw-query", action="store_true", help="Use query as raw Drive API query")
p.set_defaults(func=drive_search)
# --- Contacts ---
con = sub.add_parser("contacts")
con_sub = con.add_subparsers(dest="action", required=True)
p = con_sub.add_parser("list")
p.add_argument("--max", type=int, default=50)
p.set_defaults(func=contacts_list)
# --- Sheets ---
sh = sub.add_parser("sheets")
sh_sub = sh.add_subparsers(dest="action", required=True)
p = sh_sub.add_parser("get")
p.add_argument("sheet_id")
p.add_argument("range")
p.set_defaults(func=sheets_get)
p = sh_sub.add_parser("update")
p.add_argument("sheet_id")
p.add_argument("range")
p.add_argument("--values", required=True, help="JSON array of arrays")
p.set_defaults(func=sheets_update)
p = sh_sub.add_parser("append")
p.add_argument("sheet_id")
p.add_argument("range")
p.add_argument("--values", required=True, help="JSON array of arrays")
p.set_defaults(func=sheets_append)
# --- Docs ---
docs = sub.add_parser("docs")
docs_sub = docs.add_subparsers(dest="action", required=True)
p = docs_sub.add_parser("get")
p.add_argument("doc_id")
p.set_defaults(func=docs_get)
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()
+105
View File
@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""Bridge between Hermes OAuth token and gws CLI.
Refreshes the token if expired, then executes gws with the valid access token.
"""
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
def get_hermes_home() -> Path:
return Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
def get_token_path() -> Path:
return get_hermes_home() / "google_token.json"
def _normalize_authorized_user_payload(payload: dict) -> dict:
normalized = dict(payload)
if not normalized.get("type"):
normalized["type"] = "authorized_user"
return normalized
def refresh_token(token_data: dict) -> dict:
"""Refresh the access token using the refresh token."""
import urllib.error
import urllib.parse
import urllib.request
required_keys = ["client_id", "client_secret", "refresh_token", "token_uri"]
missing = [k for k in required_keys if k not in token_data]
if missing:
print(f"ERROR: google_token.json is missing required fields: {', '.join(missing)}", file=sys.stderr)
print("Please re-authenticate by running the Google Workspace setup script.", file=sys.stderr)
sys.exit(1)
params = urllib.parse.urlencode({
"client_id": token_data["client_id"],
"client_secret": token_data["client_secret"],
"refresh_token": token_data["refresh_token"],
"grant_type": "refresh_token",
}).encode()
req = urllib.request.Request(token_data["token_uri"], data=params)
try:
with urllib.request.urlopen(req) as resp:
result = json.loads(resp.read())
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
print(f"ERROR: Token refresh failed (HTTP {e.code}): {body}", file=sys.stderr)
print("Re-run setup.py to re-authenticate.", file=sys.stderr)
sys.exit(1)
token_data["token"] = result["access_token"]
token_data["expiry"] = datetime.fromtimestamp(
datetime.now(timezone.utc).timestamp() + result["expires_in"],
tz=timezone.utc,
).isoformat()
get_token_path().write_text(
json.dumps(_normalize_authorized_user_payload(token_data), indent=2)
)
return token_data
def get_valid_token() -> str:
"""Return a valid access token, refreshing if needed."""
token_path = get_token_path()
if not token_path.exists():
print("ERROR: No Google token found. Run setup.py --auth-url first.", file=sys.stderr)
sys.exit(1)
token_data = json.loads(token_path.read_text())
expiry = token_data.get("expiry", "")
if expiry:
exp_dt = datetime.fromisoformat(expiry.replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
if now >= exp_dt:
token_data = refresh_token(token_data)
return token_data["token"]
def main():
"""Refresh token if needed, then exec gws with remaining args."""
if len(sys.argv) < 2:
print("Usage: gws_bridge.py <gws args...>", file=sys.stderr)
sys.exit(1)
access_token = get_valid_token()
env = os.environ.copy()
env["GOOGLE_WORKSPACE_CLI_TOKEN"] = access_token
result = subprocess.run(["gws"] + sys.argv[1:], env=env)
sys.exit(result.returncode)
if __name__ == "__main__":
main()
@@ -0,0 +1,409 @@
#!/usr/bin/env python3
"""Google Workspace OAuth2 setup for Hermes Agent.
Fully non-interactive — designed to be driven by the agent via terminal commands.
The agent mediates between this script and the user (works on CLI, Telegram, Discord, etc.)
Commands:
setup.py --check # Is auth valid? Exit 0 = yes, 1 = no
setup.py --client-secret /path/to.json # Store OAuth client credentials
setup.py --auth-url # Print the OAuth URL for user to visit
setup.py --auth-code CODE # Exchange auth code for token
setup.py --revoke # Revoke and delete stored token
setup.py --install-deps # Install Python dependencies only
Agent workflow:
1. Run --check. If exit 0, auth is good — skip setup.
2. Ask user for client_secret.json path. Run --client-secret PATH.
3. Run --auth-url. Send the printed URL to the user.
4. User opens URL, authorizes, gets redirected to a page with a code.
5. User pastes the code. Agent runs --auth-code CODE.
6. Run --check to verify. Done.
"""
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
try:
from hermes_constants import display_hermes_home, get_hermes_home
except ModuleNotFoundError:
HERMES_AGENT_ROOT = Path(__file__).resolve().parents[4]
if HERMES_AGENT_ROOT.exists():
sys.path.insert(0, str(HERMES_AGENT_ROOT))
from hermes_constants import display_hermes_home, get_hermes_home
HERMES_HOME = get_hermes_home()
TOKEN_PATH = HERMES_HOME / "google_token.json"
CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json"
PENDING_AUTH_PATH = HERMES_HOME / "google_oauth_pending.json"
SCOPES = [
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.send",
"https://www.googleapis.com/auth/gmail.modify",
"https://www.googleapis.com/auth/calendar",
"https://www.googleapis.com/auth/drive.readonly",
"https://www.googleapis.com/auth/contacts.readonly",
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/documents.readonly",
]
REQUIRED_PACKAGES = ["google-api-python-client", "google-auth-oauthlib", "google-auth-httplib2"]
# OAuth redirect for "out of band" manual code copy flow.
# Google deprecated OOB, so we use a localhost redirect and tell the user to
# copy the code from the browser's URL bar (or the page body).
REDIRECT_URI = "http://localhost:1"
def _normalize_authorized_user_payload(payload: dict) -> dict:
normalized = dict(payload)
if not normalized.get("type"):
normalized["type"] = "authorized_user"
return normalized
def _load_token_payload(path: Path = TOKEN_PATH) -> dict:
try:
return json.loads(path.read_text())
except Exception:
return {}
def _missing_scopes_from_payload(payload: dict) -> list[str]:
raw = payload.get("scopes") or payload.get("scope")
if not raw:
return []
granted = {s.strip() for s in (raw.split() if isinstance(raw, str) else raw) if s.strip()}
return sorted(scope for scope in SCOPES if scope not in granted)
def _format_missing_scopes(missing_scopes: list[str]) -> str:
bullets = "\n".join(f" - {scope}" for scope in missing_scopes)
return (
"Token is valid but missing required Google Workspace scopes:\n"
f"{bullets}\n"
"Run the Google Workspace setup again from this same Hermes profile to refresh consent."
)
def install_deps():
"""Install Google API packages if missing. Returns True on success."""
try:
import googleapiclient # noqa: F401
import google_auth_oauthlib # noqa: F401
print("Dependencies already installed.")
return True
except ImportError:
pass
print("Installing Google API dependencies...")
try:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", "--quiet"] + REQUIRED_PACKAGES,
stdout=subprocess.DEVNULL,
)
print("Dependencies installed.")
return True
except subprocess.CalledProcessError as e:
print(f"ERROR: Failed to install dependencies: {e}")
print(f"Try manually: {sys.executable} -m pip install {' '.join(REQUIRED_PACKAGES)}")
return False
def _ensure_deps():
"""Check deps are available, install if not, exit on failure."""
try:
import googleapiclient # noqa: F401
import google_auth_oauthlib # noqa: F401
except ImportError:
if not install_deps():
sys.exit(1)
def check_auth():
"""Check if stored credentials are valid. Prints status, exits 0 or 1."""
if not TOKEN_PATH.exists():
print(f"NOT_AUTHENTICATED: No token at {TOKEN_PATH}")
return False
_ensure_deps()
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
try:
# Don't pass scopes — user may have authorized only a subset.
# Passing scopes forces google-auth to validate them on refresh,
# which fails with invalid_scope if the token has fewer scopes
# than requested.
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH))
except Exception as e:
print(f"TOKEN_CORRUPT: {e}")
return False
payload = _load_token_payload(TOKEN_PATH)
if creds.valid:
missing_scopes = _missing_scopes_from_payload(payload)
if missing_scopes:
print(f"AUTHENTICATED (partial): Token valid but missing {len(missing_scopes)} scopes:")
for s in missing_scopes:
print(f" - {s}")
print(f"AUTHENTICATED: Token valid at {TOKEN_PATH}")
return True
if creds.expired and creds.refresh_token:
try:
creds.refresh(Request())
TOKEN_PATH.write_text(
json.dumps(
_normalize_authorized_user_payload(json.loads(creds.to_json())),
indent=2,
)
)
missing_scopes = _missing_scopes_from_payload(_load_token_payload(TOKEN_PATH))
if missing_scopes:
print(f"AUTHENTICATED (partial): Token refreshed but missing {len(missing_scopes)} scopes:")
for s in missing_scopes:
print(f" - {s}")
print(f"AUTHENTICATED: Token refreshed at {TOKEN_PATH}")
return True
except Exception as e:
print(f"REFRESH_FAILED: {e}")
return False
print("TOKEN_INVALID: Re-run setup.")
return False
def store_client_secret(path: str):
"""Copy and validate client_secret.json to Hermes home."""
src = Path(path).expanduser().resolve()
if not src.exists():
print(f"ERROR: File not found: {src}")
sys.exit(1)
try:
data = json.loads(src.read_text())
except json.JSONDecodeError:
print("ERROR: File is not valid JSON.")
sys.exit(1)
if "installed" not in data and "web" not in data:
print("ERROR: Not a Google OAuth client secret file (missing 'installed' key).")
print("Download the correct file from: https://console.cloud.google.com/apis/credentials")
sys.exit(1)
CLIENT_SECRET_PATH.write_text(json.dumps(data, indent=2))
print(f"OK: Client secret saved to {CLIENT_SECRET_PATH}")
def _save_pending_auth(*, state: str, code_verifier: str):
"""Persist the OAuth session bits needed for a later token exchange."""
PENDING_AUTH_PATH.write_text(
json.dumps(
{
"state": state,
"code_verifier": code_verifier,
"redirect_uri": REDIRECT_URI,
},
indent=2,
)
)
def _load_pending_auth() -> dict:
"""Load the pending OAuth session created by get_auth_url()."""
if not PENDING_AUTH_PATH.exists():
print("ERROR: No pending OAuth session found. Run --auth-url first.")
sys.exit(1)
try:
data = json.loads(PENDING_AUTH_PATH.read_text())
except Exception as e:
print(f"ERROR: Could not read pending OAuth session: {e}")
print("Run --auth-url again to start a fresh OAuth session.")
sys.exit(1)
if not data.get("state") or not data.get("code_verifier"):
print("ERROR: Pending OAuth session is missing PKCE data.")
print("Run --auth-url again to start a fresh OAuth session.")
sys.exit(1)
return data
def _extract_code_and_state(code_or_url: str) -> tuple[str, str | None]:
"""Accept either a raw auth code or the full redirect URL pasted by the user."""
if not code_or_url.startswith("http"):
return code_or_url, None
from urllib.parse import parse_qs, urlparse
parsed = urlparse(code_or_url)
params = parse_qs(parsed.query)
if "code" not in params:
print("ERROR: No 'code' parameter found in URL.")
sys.exit(1)
state = params.get("state", [None])[0]
return params["code"][0], state
def get_auth_url():
"""Print the OAuth authorization URL. User visits this in a browser."""
if not CLIENT_SECRET_PATH.exists():
print("ERROR: No client secret stored. Run --client-secret first.")
sys.exit(1)
_ensure_deps()
from google_auth_oauthlib.flow import Flow
flow = Flow.from_client_secrets_file(
str(CLIENT_SECRET_PATH),
scopes=SCOPES,
redirect_uri=REDIRECT_URI,
autogenerate_code_verifier=True,
)
auth_url, state = flow.authorization_url(
access_type="offline",
prompt="consent",
)
_save_pending_auth(state=state, code_verifier=flow.code_verifier)
# Print just the URL so the agent can extract it cleanly
print(auth_url)
def exchange_auth_code(code: str):
"""Exchange the authorization code for a token and save it."""
if not CLIENT_SECRET_PATH.exists():
print("ERROR: No client secret stored. Run --client-secret first.")
sys.exit(1)
pending_auth = _load_pending_auth()
code, returned_state = _extract_code_and_state(code)
if returned_state and returned_state != pending_auth["state"]:
print("ERROR: OAuth state mismatch. Run --auth-url again to start a fresh session.")
sys.exit(1)
_ensure_deps()
from google_auth_oauthlib.flow import Flow
from urllib.parse import parse_qs, urlparse
# Extract granted scopes from the callback URL if present
if returned_state and "scope" in parse_qs(urlparse(code).query if isinstance(code, str) and code.startswith("http") else {}):
granted_scopes = parse_qs(urlparse(code).query)["scope"][0].split()
else:
# Try to extract from code_or_url parameter
if isinstance(code, str) and code.startswith("http"):
params = parse_qs(urlparse(code).query)
if "scope" in params:
granted_scopes = params["scope"][0].split()
else:
granted_scopes = SCOPES
else:
granted_scopes = SCOPES
flow = Flow.from_client_secrets_file(
str(CLIENT_SECRET_PATH),
scopes=granted_scopes,
redirect_uri=pending_auth.get("redirect_uri", REDIRECT_URI),
state=pending_auth["state"],
code_verifier=pending_auth["code_verifier"],
)
try:
# Accept partial scopes — user may deselect some permissions in the consent screen
os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1"
flow.fetch_token(code=code)
except Exception as e:
print(f"ERROR: Token exchange failed: {e}")
print("The code may have expired. Run --auth-url to get a fresh URL.")
sys.exit(1)
creds = flow.credentials
token_payload = _normalize_authorized_user_payload(json.loads(creds.to_json()))
# Store only the scopes actually granted by the user, not what was requested.
# creds.to_json() writes the requested scopes, which causes refresh to fail
# with invalid_scope if the user only authorized a subset.
actually_granted = list(creds.granted_scopes or []) if hasattr(creds, "granted_scopes") and creds.granted_scopes else []
if actually_granted:
token_payload["scopes"] = actually_granted
elif granted_scopes != SCOPES:
# granted_scopes was extracted from the callback URL
token_payload["scopes"] = granted_scopes
missing_scopes = _missing_scopes_from_payload(token_payload)
if missing_scopes:
print(f"WARNING: Token missing some Google Workspace scopes: {', '.join(missing_scopes)}")
print("Some services may not be available.")
TOKEN_PATH.write_text(json.dumps(token_payload, indent=2))
PENDING_AUTH_PATH.unlink(missing_ok=True)
print(f"OK: Authenticated. Token saved to {TOKEN_PATH}")
print(f"Profile-scoped token location: {display_hermes_home()}/google_token.json")
def revoke():
"""Revoke stored token and delete it."""
if not TOKEN_PATH.exists():
print("No token to revoke.")
return
_ensure_deps()
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
try:
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
if creds.expired and creds.refresh_token:
creds.refresh(Request())
import urllib.request
urllib.request.urlopen(
urllib.request.Request(
f"https://oauth2.googleapis.com/revoke?token={creds.token}",
method="POST",
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
)
print("Token revoked with Google.")
except Exception as e:
print(f"Remote revocation failed (token may already be invalid): {e}")
TOKEN_PATH.unlink(missing_ok=True)
PENDING_AUTH_PATH.unlink(missing_ok=True)
print(f"Deleted {TOKEN_PATH}")
def main():
parser = argparse.ArgumentParser(description="Google Workspace OAuth setup for Hermes")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--check", action="store_true", help="Check if auth is valid (exit 0=yes, 1=no)")
group.add_argument("--client-secret", metavar="PATH", help="Store OAuth client_secret.json")
group.add_argument("--auth-url", action="store_true", help="Print OAuth URL for user to visit")
group.add_argument("--auth-code", metavar="CODE", help="Exchange auth code for token")
group.add_argument("--revoke", action="store_true", help="Revoke and delete stored token")
group.add_argument("--install-deps", action="store_true", help="Install Python dependencies")
args = parser.parse_args()
if args.check:
sys.exit(0 if check_auth() else 1)
elif args.client_secret:
store_client_secret(args.client_secret)
elif args.auth_url:
get_auth_url()
elif args.auth_code:
exchange_auth_code(args.auth_code)
elif args.revoke:
revoke()
elif args.install_deps:
sys.exit(0 if install_deps() else 1)
if __name__ == "__main__":
main()