@statewavedev/connectors-gmail
v0.3.0
Gmail connector for Statewave — messages and threads as relationship-memory episodes (pull + Pub/Sub push receiver with built-in OIDC verification)
Gmail connector for Statewave — turns messages matching an operator-supplied Gmail search query into normalized relationship-memory episodes.
Part of the Statewave Connectors ecosystem.
What it ingests
| Source | Episode kind |
|---|---|
| Inbound message (no SENT label) | gmail.message.received |
| Outbound message (SENT label present) | gmail.message.sent |
The --query flag is required — there is no "ingest the whole mailbox" default. You scope what to pull explicitly.
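The mapping in the table comes down to a single label check. A minimal sketch, assuming the message's `labelIds` array from the Gmail API is available; the function and type names are illustrative, not the connector's exported API:

```typescript
// Illustrative only: these names are assumptions, not part of the package.
type EpisodeKind = "gmail.message.received" | "gmail.message.sent";

// A message carrying Gmail's system label SENT is outbound; anything else is inbound.
function episodeKind(labelIds: readonly string[] = []): EpisodeKind {
  return labelIds.includes("SENT") ? "gmail.message.sent" : "gmail.message.received";
}
```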
Quickstart
export GMAIL_CLIENT_ID=...
export GMAIL_CLIENT_SECRET=...
export GMAIL_REFRESH_TOKEN=...
statewave-connectors sync gmail \
--query 'label:inbox newer_than:30d' \
--dry-run
# Per-contact pull
statewave-connectors sync gmail \
--query 'from:[email protected] after:2026/01/01' \
--max-items 50 \
--dry-run
Auth
OAuth 2.0 refresh-token flow. The connector accepts three credentials and exchanges them for a short-lived access token at runtime:
| Env var | CLI flag | What it is |
|---|---|---|
| GMAIL_CLIENT_ID | --client-id | OAuth 2.0 client id from your Google Cloud project |
| GMAIL_CLIENT_SECRET | --client-secret | OAuth 2.0 client secret |
| GMAIL_REFRESH_TOKEN | --refresh-token | Long-lived refresh token issued during the one-time consent flow |
The access token is cached until ~1 minute before expiry and refreshed transparently — there's no per-request OAuth round-trip.
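The caching behaviour described above can be sketched roughly as follows. This is not the connector's internal code: `CachedTokenSource`, `TokenFetcher`, and the injected clock are hypothetical names chosen for illustration, and the fetcher stands in for the real refresh-token exchange.

```typescript
// Hypothetical sketch of "cache until ~1 minute before expiry".
interface CachedToken {
  value: string;
  expiresAtMs: number; // absolute expiry, epoch milliseconds
}

// Stand-in for the OAuth refresh-token exchange (assumed shape).
type TokenFetcher = () => Promise<{ accessToken: string; expiresInSec: number }>;

const REFRESH_MARGIN_MS = 60_000; // refresh about one minute early

class CachedTokenSource {
  private cached: CachedToken | null = null;
  private readonly fetchToken: TokenFetcher;
  private readonly now: () => number;

  constructor(fetchToken: TokenFetcher, now: () => number = Date.now) {
    this.fetchToken = fetchToken;
    this.now = now;
  }

  async get(): Promise<string> {
    const current = this.cached;
    if (current !== null && this.now() < current.expiresAtMs - REFRESH_MARGIN_MS) {
      return current.value; // still fresh: no OAuth round-trip
    }
    const fresh = await this.fetchToken();
    this.cached = {
      value: fresh.accessToken,
      expiresAtMs: this.now() + fresh.expiresInSec * 1000,
    };
    return fresh.accessToken;
  }
}
```

Injecting the clock keeps the expiry logic testable without waiting an hour; the token lives only in memory, matching the "never persisted" behaviour.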
One-time setup
- Create an OAuth client in Google Cloud Console → APIs & Services → Credentials → Create Credentials → OAuth client ID → Desktop app (or Web application with http://localhost as a redirect URI). Copy the client id and secret.
- Enable the Gmail API under APIs & Services → Library → Gmail API → Enable.
- Run a one-time consent flow with scope https://www.googleapis.com/auth/gmail.readonly. The simplest path is the OAuth 2.0 Playground → gear icon → check "Use your own OAuth credentials" → paste your client id/secret → in the left rail, find Gmail API v1 → check https://www.googleapis.com/auth/gmail.readonly → Authorize APIs → after consent, Exchange authorization code for tokens → copy the Refresh token.
- Export the three credentials and run the connector.
The refresh token is valid until you revoke it (in Google Account → Security → Third-party apps with account access). The access token is short-lived (~1 hour) and is never persisted by the connector.
Service-account auth with domain-wide delegation (for Workspace admins reading mailboxes across a domain) is not supported yet. It requires JWT signing and is planned for a follow-up release.
The credentials are used only by this connector and only sent to https://oauth2.googleapis.com (token exchange) and https://gmail.googleapis.com (Gmail API).
Subject routing
Episodes default to relationship:<other_email>:
- For received messages, the "other party" is the From address.
- For sent messages, the "other party" is the first To recipient.
- Both are lowercased and stripped of any display name (Bob <bob@x> and bob@x route to the same relationship:bob@x subject).
- Pathological messages with no From and no To (rare, system-only mail) fall back to thread:<thread_id> so episodes still group coherently.
Override per sync with --subject thread:<id> or any custom string.
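The routing rules above can be sketched as a small pure function. This is an approximation under stated assumptions: the header shape and the names `bareAddress` and `defaultSubject` are hypothetical, and the sketch falls back to the thread subject whenever the relevant header is missing, which is slightly broader than the "no From and no To" case described.

```typescript
// Illustrative sketch; names and the header shape are assumptions.
interface MessageHeaders {
  from?: string;
  to?: string; // raw To header, possibly several comma-separated recipients
}

// "Bob <Bob@X.com>" -> "bob@x.com"; bare addresses are only lowercased.
function bareAddress(raw: string): string {
  const angle = raw.match(/<([^<>]+)>/);
  return (angle?.[1] ?? raw).trim().toLowerCase();
}

function defaultSubject(headers: MessageHeaders, isSent: boolean, threadId: string): string {
  // Naive split: a display name containing a comma would need a real address parser.
  const firstTo = headers.to?.split(",")[0];
  const other = isSent ? firstTo : headers.from;
  if (other === undefined || other.trim() === "") {
    return `thread:${threadId}`; // nothing to route on: group by thread instead
  }
  return `relationship:${bareAddress(other)}`;
}
```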
Body extraction
Gmail returns email bodies as base64url-encoded MIME parts. The connector walks the MIME tree and extracts plaintext in this preference order:
- text/plain part: used as-is
- text/html part: tags stripped, &entity; references decoded
- Snippet fallback: Gmail's server-side first-200-chars snippet
Bodies are truncated at 8000 characters with an ellipsis marker so a single huge email doesn't dominate context bundles.
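A rough sketch of that preference order, assuming the `payload` shape returned by the Gmail API (`mimeType`, `body.data`, nested `parts`). The function names, the small entity table, and the exact ellipsis marker are assumptions for illustration; real HTML handling is likely more thorough.

```typescript
// Sketch only: names are illustrative, not the connector's internals.
interface MimePart {
  mimeType?: string;
  body?: { data?: string };
  parts?: MimePart[];
}

const MAX_BODY_CHARS = 8000;

// Gmail encodes part bodies as base64url; map to standard base64 before decoding.
function decodeBase64Url(data: string): string {
  const b64 = data.replace(/-/g, "+").replace(/_/g, "/");
  const bytes = Uint8Array.from(atob(b64), (ch) => ch.charCodeAt(0));
  return new TextDecoder().decode(bytes);
}

// Depth-first search for the first part with the given MIME type and a non-empty body.
function findPart(part: MimePart, mimeType: string): string | undefined {
  const data = part.body?.data;
  if (part.mimeType === mimeType && data) return decodeBase64Url(data);
  for (const child of part.parts ?? []) {
    const hit = findPart(child, mimeType);
    if (hit !== undefined) return hit;
  }
  return undefined;
}

// Deliberately small entity table; unknown entities are left untouched.
const NAMED_ENTITIES: Record<string, string> = {
  amp: "&", lt: "<", gt: ">", quot: '"', apos: "'", nbsp: " ",
};

// Strip tags first, then decode entities, so a decoded "<" is never treated as a tag.
function stripHtml(html: string): string {
  return html
    .replace(/<[^>]*>/g, " ")
    .replace(/&(#\d{1,6}|[a-z]+);/gi, (whole: string, name: string) =>
      name.startsWith("#")
        ? String.fromCodePoint(Number(name.slice(1)))
        : NAMED_ENTITIES[name.toLowerCase()] ?? whole)
    .replace(/\s+/g, " ")
    .trim();
}

function extractBody(payload: MimePart, snippet: string): string {
  const plain = findPart(payload, "text/plain");
  const html = plain === undefined ? findPart(payload, "text/html") : undefined;
  const text = plain ?? (html !== undefined ? stripHtml(html) : snippet);
  // The exact ellipsis marker is an assumption.
  return text.length > MAX_BODY_CHARS ? text.slice(0, MAX_BODY_CHARS) + "…" : text;
}
```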
Options
--client-id ID OAuth 2.0 client id (required)
--client-secret SECRET OAuth 2.0 client secret (required)
--refresh-token TOKEN OAuth 2.0 refresh token (required)
--query Q Gmail search query (required) — e.g. 'label:inbox', 'from:[email protected] after:2026/01/01'
--label-ids LIST (v0.1.1) typed label-id allowlist pushed to Gmail's `labelIds=` server-side filter (AND semantics; e.g. INBOX,IMPORTANT). Use Gmail's stable label ids when you want a typed filter rather than encoding label names into `--query`.
--cursor TOKEN (v0.1.2 — global flag, also honored here) opaque historyId returned on the previous run's `summary.cursor`. When set, the sync uses Gmail's History API to fetch only what's new since. Falls back to a cold-start re-pull when the historyId is older than ~7 days (Gmail's history retention window).
--subject SUBJECT override the default `relationship:<email>` subject
--since YYYY-MM-DD skip messages whose internalDate is older (belt-and-suspenders — Gmail's `after:` operator is usually the right primitive)
--max-items N cap mapped episodes
--dry-run preview mapped episodes without ingesting (recommended for new use)
Pub/Sub push receiver (v0.2.0)
The same package also ships a Gmail Pub/Sub push receiver — a pure (Request) => Promise<Response> handler that ingests Gmail's "your mailbox changed" notifications, walks the Gmail History API to fetch the actually-changed messages, and emits each as a gmail.message.received / gmail.message.sent episode in real time. Same handler shape as the Slack/Freshdesk/Zendesk/Intercom receivers.
How Gmail's push model works
Gmail doesn't deliver event payloads directly. The flow is:
- Operator creates a Cloud Pub/Sub topic + push subscription pointing at the daemon URL.
- Operator calls users.watch on the Gmail API, registering the topic. Gmail returns historyId + expiration (max 7 days; renew via cron).
- Whenever the mailbox changes, Gmail publishes { emailAddress, historyId } to the topic.
- Pub/Sub POSTs that pointer to the daemon URL.
- The daemon walks users.history.list?startHistoryId=<lastSeen> to fetch the actual deltas, then users.messages.get for each new message id, and ingests each as an episode.
Cursor state (the last-seen historyId per mailbox) lives in a pluggable store: the receiver ships an in-memory store by default (lost on restart) and exposes a GmailHistoryCursorStore interface so production deploys can plug in Redis / Postgres.
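To make the "pointer" in the flow above concrete: a Pub/Sub push delivery wraps the Gmail notification in an envelope whose `message.data` field is base64-encoded JSON. A minimal decoding sketch follows; `parseGmailPush` and `GmailNotification` are hypothetical names, and the receiver's actual parsing may differ.

```typescript
// Sketch of unpacking a Pub/Sub push body into Gmail's { emailAddress, historyId }.
interface GmailNotification {
  emailAddress: string;
  historyId: string;
  messageId: string; // Pub/Sub message id, usable as a dedup key
}

function parseGmailPush(body: unknown): GmailNotification | null {
  const message = (body as { message?: { data?: unknown; messageId?: unknown } } | null | undefined)?.message;
  if (message == null) return null;
  const { data, messageId } = message;
  if (typeof data !== "string" || typeof messageId !== "string") return null;

  let decoded: unknown;
  try {
    // Accept both base64 and base64url alphabets; an ASCII JSON payload is assumed.
    decoded = JSON.parse(atob(data.replace(/-/g, "+").replace(/_/g, "/")));
  } catch {
    return null;
  }

  const { emailAddress, historyId } = (decoded ?? {}) as { emailAddress?: unknown; historyId?: unknown };
  if (typeof emailAddress !== "string") return null;
  // historyId may arrive as a JSON number or a string; normalize to string.
  if (typeof historyId !== "string" && typeof historyId !== "number") return null;
  return { emailAddress, historyId: String(historyId), messageId };
}
```

Returning `null` for anything malformed lets the caller decide how to respond without throwing inside the request handler.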
Run it as a daemon
export GMAIL_PUBSUB_TOKEN=... # random secret you put in the Pub/Sub subscription URL
export GMAIL_CLIENT_ID=... # same OAuth credentials the pull connector uses
export GMAIL_CLIENT_SECRET=...
export GMAIL_REFRESH_TOKEN=...
export GMAIL_QUERY='label:inbox' # optional — same semantics as pull --query
export STATEWAVE_URL=http://localhost:8100
export STATEWAVE_API_KEY=...
statewave-connectors listen gmail --port 3000
# → http://0.0.0.0:3000/gmail/events
The daemon expects the path token either as the last URL path segment (/gmail/events/<token>) or as a query-string parameter (?token=<value>). Both work, and the Pub/Sub subscription can be configured either way.
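For readers wondering how the two token placements might be handled, here is a hedged sketch. `extractPathToken` and `constantTimeEqual` are hypothetical helpers, not the daemon's real code; the sketch assumes the route always ends in `events` when no token segment is present.

```typescript
// Illustrative sketch: pull the token from ?token=... or from the last path segment.
function extractPathToken(url: string): string | null {
  const parsed = new URL(url);
  const fromQuery = parsed.searchParams.get("token");
  if (fromQuery) return fromQuery;
  const segments = parsed.pathname.split("/").filter((s) => s.length > 0);
  const last = segments[segments.length - 1];
  // Assumption: a trailing "events" segment means no token was supplied in the path.
  return last !== undefined && last !== "events" ? last : null;
}

// Compare without short-circuiting on the first differing byte.
function constantTimeEqual(a: string, b: string): boolean {
  const enc = new TextEncoder();
  const x = enc.encode(a);
  const y = enc.encode(b);
  const len = Math.max(x.length, y.length);
  let diff = x.length ^ y.length;
  for (let i = 0; i < len; i++) {
    diff |= (x[i] ?? 0) ^ (y[i] ?? 0);
  }
  return diff === 0;
}
```

In a Node-only deployment, `timingSafeEqual` from `node:crypto` is a well-tested alternative to the hand-rolled comparison.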
Configure Cloud Pub/Sub + Gmail watch
In the Google Cloud Console (using the same Google Cloud project that owns your Gmail OAuth client):
- Pub/Sub → Topics → Create topic (e.g. gmail-push). Note the full resource name projects/<project-id>/topics/gmail-push.
- IAM: grant roles/pubsub.publisher on the topic to gmail-api-push@system.gserviceaccount.com (Gmail's service account that publishes notifications).
- Pub/Sub → Subscriptions → Create subscription on that topic. Pick Push as the delivery type and set the endpoint to:
https://you.example.com/gmail/events?token=<GMAIL_PUBSUB_TOKEN>
Use the same value as GMAIL_PUBSUB_TOKEN in the daemon.
- Register the watch by calling users.watch on the Gmail API with the topic name. The simplest path is a one-line script:
curl -X POST https://gmail.googleapis.com/gmail/v1/users/me/watch \
-H "Authorization: Bearer $GMAIL_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"topicName":"projects/<project-id>/topics/gmail-push","labelIds":["INBOX"]}'
Re-run before the 7-day expiration to keep the watch alive (cron / scheduled function).
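Since the package does not ship a renew-watch helper, a scheduled job might look roughly like this. The sketch only builds the same request as the curl call and decides when renewal is due; `buildWatchRequest`, `shouldRenew`, and the one-day margin are assumptions, and obtaining the access token is left to the caller.

```typescript
// Hypothetical helpers for a renew-watch cron job.
const WATCH_URL = "https://gmail.googleapis.com/gmail/v1/users/me/watch";
const ONE_DAY_MS = 24 * 60 * 60 * 1000;

// Mirrors the curl call: same endpoint, headers, and JSON body.
function buildWatchRequest(accessToken: string, topicName: string, labelIds: string[]) {
  return {
    url: WATCH_URL,
    init: {
      method: "POST",
      headers: {
        Authorization: `Bearer ${accessToken}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ topicName, labelIds }),
    },
  };
}

// users.watch returns `expiration` as epoch milliseconds encoded as a string.
// Renew once the remaining lifetime drops to the margin (one day here, an arbitrary choice).
function shouldRenew(expiration: string, nowMs: number, marginMs: number = ONE_DAY_MS): boolean {
  return Number(expiration) - nowMs <= marginMs;
}

// Usage inside a scheduled job (not executed here):
//   const { url, init } = buildWatchRequest(token, "projects/<project-id>/topics/gmail-push", ["INBOX"]);
//   const res = await fetch(url, init);
```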
Authentication
Two built-in methods are available — pick one or combine them:
1. Path-token (simplest, default)
Configure a random secret in the Pub/Sub subscription URL:
https://you.example.com/gmail/events?token=<random-secret>
The receiver compares the token in constant time. This is the right choice for prototypes and small deployments where the URL itself is the secret.
[[push.gmail]]
path_token = "${GMAIL_PUBSUB_TOKEN}"
2. OIDC (recommended for production)
Pub/Sub can sign every push request with a Google-issued OIDC token in Authorization: Bearer <id_token>. The receiver fetches Google's well-known JWKs, caches them in memory, and verifies the RS256 JWT (signature + iss + aud + exp) on every delivery. Optionally restrict the email claim to a specific service account.
[[push.gmail]]
oidc = {
audience = "https://you.example.com/gmail/founder/events",
expected_emails = ["[email protected]"],
}
Configure the matching Pub/Sub subscription:
- Console → Pub/Sub → Subscriptions → … → Authentication
- Tick Enable authentication
- Service account: pick the service account that owns the subscription (you'll list it in expected_emails)
- Audience: paste the same value you put in oidc.audience (typically the endpoint URL, or any operator-chosen identifier)
The verifier accepts tokens within a 60-second clock-skew leeway by default — adjust via oidc.leeway_sec.
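The claim checks (iss, aud, exp with leeway, optional email allowlist) can be illustrated in isolation. This sketch deliberately omits the RS256 signature verification against Google's JWKs, which must happen before any claim is trusted; `claimsAreValid` and the option names are assumptions modelled on the config keys above, not the package's verifier.

```typescript
// Claim checks only. Signature verification is intentionally NOT shown here.
interface OidcOptions {
  audience: string;
  expectedEmails?: string[];
  leewaySec?: number; // clock-skew tolerance, 60 seconds if unset
}

interface IdTokenClaims {
  iss?: string;
  aud?: string;
  exp?: number; // expiry, epoch seconds
  email?: string;
}

// Google-issued ID tokens use one of these two issuer values.
const GOOGLE_ISSUERS = ["https://accounts.google.com", "accounts.google.com"];

function claimsAreValid(claims: IdTokenClaims, opts: OidcOptions, nowSec: number): boolean {
  const leeway = opts.leewaySec ?? 60;
  if (claims.iss === undefined || !GOOGLE_ISSUERS.includes(claims.iss)) return false;
  if (claims.aud !== opts.audience) return false;
  if (typeof claims.exp !== "number" || nowSec > claims.exp + leeway) return false;
  if (opts.expectedEmails !== undefined) {
    if (claims.email === undefined || !opts.expectedEmails.includes(claims.email)) return false;
  }
  return true;
}
```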
Combining both (defense in depth)
If both path_token and oidc are configured, both must pass:
[[push.gmail]]
path_token = "${GMAIL_PUBSUB_TOKEN}"
oidc = { audience = "https://you.example.com/gmail/founder/events" }
Useful when you want the URL secret as a layer-1 filter (drops anonymous traffic before any crypto runs) plus OIDC as the cryptographic proof.
Custom verifier
Operators with non-standard needs can plug a verifyAuth: (req) => Promise<boolean> callback that runs instead of both built-ins. Programmatic API only — there's no config-file equivalent.
Cursor + replay model
| State | Default | How to override |
|---|---|---|
| Last-seen historyId per mailbox | InMemoryGmailHistoryCursorStore (lost on restart — fine for single-process daemons) | Pass historyCursorStore: ... implementing get/set (Redis, Postgres, …) |
| Pub/Sub messageId dedup | InMemoryGmailPubsubDedupCache (FIFO, 10k entries) | Pass dedupCache: ... |
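A bounded FIFO dedup cache like the default one can be approximated with a `Set`, which iterates in insertion order. The class and method names below are hypothetical; the real `dedupCache` contract is not specified here, so treat the shape as an assumption.

```typescript
// Sketch of a FIFO-bounded "have I seen this Pub/Sub messageId?" cache.
class FifoDedupCache {
  private readonly seen = new Set<string>();
  private readonly maxEntries: number;

  constructor(maxEntries: number = 10_000) {
    this.maxEntries = maxEntries;
  }

  // Returns true the first time an id is seen, false for a redelivery.
  markSeen(id: string): boolean {
    if (this.seen.has(id)) return false;
    this.seen.add(id);
    if (this.seen.size > this.maxEntries) {
      // Sets iterate in insertion order, so the first value is the oldest entry.
      const oldest = this.seen.values().next().value;
      if (oldest !== undefined) this.seen.delete(oldest);
    }
    return true;
  }
}
```

Because Pub/Sub delivers at least once, an id evicted from a full cache can be processed again; the cache bounds memory, it does not guarantee exactly-once handling.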
On cold start (no persisted historyId for that mailbox), the receiver acks 200 and persists the notification's historyId without ingesting anything — the operator is expected to seed history via a cold-start pull (statewave-connectors sync gmail --query …) before turning the daemon on.
When Gmail returns 404 on the History endpoint (cursor older than ~7 days), the receiver logs a warning, resets the cursor to the latest historyId, and acks 200 — the operator should re-run a cold-start pull to backfill the lost window.
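The cold-start and expired-cursor rules can be summarized in one decision function. This is a sketch under assumptions: the `get`/`set` signatures, the `HistoryResult` shape, and the injected `listHistory` callback are illustrative, and "latest historyId" is taken to be the one carried by the notification.

```typescript
// Assumed store contract: async get/set keyed by mailbox address.
interface HistoryCursorStore {
  get(mailbox: string): Promise<string | undefined>;
  set(mailbox: string, historyId: string): Promise<void>;
}

class MapCursorStore implements HistoryCursorStore {
  private readonly cursors = new Map<string, string>();
  async get(mailbox: string): Promise<string | undefined> {
    return this.cursors.get(mailbox);
  }
  async set(mailbox: string, historyId: string): Promise<void> {
    this.cursors.set(mailbox, historyId);
  }
}

// Stand-in for a users.history.list call; `ok: false` models the 404 case.
type HistoryResult =
  | { ok: true; messageIds: string[]; latestHistoryId: string }
  | { ok: false; status: 404 };

// Returns the message ids to ingest for this notification (possibly none).
async function handleNotification(
  store: HistoryCursorStore,
  mailbox: string,
  notifiedHistoryId: string,
  listHistory: (startHistoryId: string) => Promise<HistoryResult>,
): Promise<string[]> {
  const lastSeen = await store.get(mailbox);
  if (lastSeen === undefined) {
    await store.set(mailbox, notifiedHistoryId); // cold start: record, ingest nothing
    return [];
  }
  const result = await listHistory(lastSeen);
  if (!result.ok) {
    // Cursor too old: reset and rely on a cold-start pull to backfill.
    await store.set(mailbox, notifiedHistoryId);
    return [];
  }
  await store.set(mailbox, result.latestHistoryId);
  return result.messageIds;
}
```

In every branch the caller would still acknowledge the delivery with a 200, so Pub/Sub does not retry a notification the receiver has already accounted for.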
Or mount on Vercel / Cloudflare / Express
Same framework-agnostic shape as the other receivers:
import { createGmailPubsubHandler } from '@statewavedev/connectors-gmail'
export const POST = createGmailPubsubHandler({
pathToken: process.env.GMAIL_PUBSUB_TOKEN!,
credentials: {
clientId: process.env.GMAIL_CLIENT_ID!,
clientSecret: process.env.GMAIL_CLIENT_SECRET!,
refreshToken: process.env.GMAIL_REFRESH_TOKEN!,
},
query: 'label:inbox',
statewaveUrl: process.env.STATEWAVE_URL!,
statewaveApiKey: process.env.STATEWAVE_API_KEY,
})
For OIDC verification, pass oidc: { audience, expectedEmails?, ... } directly: same shape as the config block, camelCase here. The verifier caches JWKs internally, so sharing one handler across the lifetime of the daemon keeps the JWKs fetch budget at one round-trip per cooldownDuration (default 30s).
Status
v0.3.0 — pull mode for messages matching a Gmail query (with --label-ids server-side filter and History-API delta sync) + Pub/Sub push receiver with built-in OIDC verification of push tokens. See RELEASE_NOTES.md.
Out of scope for v0.3 (planned for follow-ups):
- Service account / domain-wide delegation auth on the Gmail API itself (needs JWT signing for the Gmail API access token; OIDC verification of Pub/Sub deliveries shipped in v0.3.0)
- Thread-level episodes (today each message is its own episode; threads are grouped via metadata.thread_id)
- Attachment metadata extraction
- A renew-watch helper that calls users.watch on a schedule (today: ship your own cron)
