agent-sse-listener
v0.3.0
Zero-dependency, never-break Server-Sent Events client for AI agents.
Point it at any SSE URL. Get reliable delivery via reconnect with exponential backoff, Last-Event-ID resume, heartbeat timeout detection, and a prepare() hook for dynamic auth. Ships with a CLI, built-in Telegram and email forwarding adapters, and process-supervisor templates.
npx agent-sse-listener \
--url https://api.example.com/stream \
--bearer $MY_TOKEN \
--json
- Zero runtime dependencies. No viem, no eventsource polyfill. Pure Node stdlib. Install in ~1 second, nothing to audit.
- Generic. Works with any SSE endpoint on the open web. The package doesn't know about any particular service.
- Never-break. Reconnects on disconnect, honors server retry: fields, resumes from last event ID, detects stuck TCP connections via heartbeat timeout.
- Agent-first CLI. Every option is both a flag and an env var, so it drops straight into systemd / Docker / pm2 / Railway without ceremony.
- Flexible auth. Static headers, bearer tokens, query params, or a dynamic prepare() hook for JWT refresh, wallet-signed messages, rotating nonces, anything.
Requires Node >=20.19.0 (or >=22.12.0).
Why this exists
AI agents that need to listen to real-time event streams hit a wall: writing a correct SSE client (reconnect, heartbeats, Last-Event-ID, dynamic auth) is 200 lines of subtle code that everybody gets slightly wrong. And the follow-up question — "where do I run it?" — has three real answers, not one:
- Tier 1 — in the agent's own session. For ad-hoc requests: "monitor @elonmusk for the next hour." The listener runs for as long as the session does. Stops when the session ends. This is a valid choice for a lot of use cases, and it's what an agent should reach for first.
- Tier 2 — on the user's laptop. Foreground or detached in a terminal. Lives while the machine is on.
- Tier 3 — supervised (systemd / Docker / pm2 / Railway worker). For production where every event matters.
Earlier "always use a supervisor" advice was correct for tier 3 and wrong for everything else — it broke agent DX by telling them "go install systemd" for what should be a 30-second task.
This package collapses the SSE client part to one CLI command (or a three-line library setup), and surfaces the tier choice explicitly: the CLI auto-detects which tier you're running in and prints it on startup so the user/agent knows what they're getting.
Install
npm install agent-sse-listener
# Or use the CLI without installing:
npx agent-sse-listener --help
CLI
agent-sse-listener --url <URL> [options]
sse-listen --url <URL> [options] (short alias)
REQUIRED
--url <URL> SSE endpoint (env: SSE_URL)
AUTH
--bearer <token> Authorization: Bearer <token> (env: SSE_BEARER)
-H, --header "Name: value" Custom header (repeatable) (env: SSE_HEADER_1, _2, ...)
--query "key=value" Query param (repeatable) (env: SSE_QUERY_1, _2, ...)
CONNECTION
--method <METHOD> Default GET
--body <text|@file> Request body (for POST SSE)
--last-event-id <id> Resume from a persisted cursor
RELIABILITY
--reconnect-initial-ms <ms> Initial backoff, default 1000
--reconnect-max-ms <ms> Max backoff, default 30000
--max-reconnect-attempts <n> Default: unlimited
--no-reconnect Run once, then exit
--heartbeat-timeout-ms <ms> Force reconnect if no bytes for this long.
Default 120000. Set 0 to disable.
--connect-timeout-ms <ms> Initial-response timeout, default 30000
OUTPUT
--json One JSON object per event to stdout
--filter <event-name> Only emit events with this name (repeatable)
--log-file <path> Tee stdout to a file
--quiet Suppress status lines on stderr
--on-comment Also emit comment/heartbeat lines
ADAPTERS — forward events to chat/email (optional, any combination)
--telegram-bot-token <token> + --telegram-chat-id <id>
--email-provider <name> "resend" (default) or "generic"
--email-api-key <key> API key for the provider
--email-from <address> Sender address (must be verified)
--email-to <address> Recipient address
-h, --help This help
-v, --version Print version
CLI examples
Public stream, plain text output
npx agent-sse-listener --url https://stream.example.com/events
Bearer token → JSON → jq
npx agent-sse-listener \
--url https://api.example.com/sse \
--bearer $TOKEN \
--json | jq 'select(.name == "tweet")'
Multiple custom headers
npx agent-sse-listener \
--url https://x402.moltie.co/api/stream \
-H "X-Wallet-Address: 0xabc..." \
-H "X-Wallet-Signature: 0xdef..." \
--json
POST SSE stream with a JSON body
npx agent-sse-listener \
--url https://api.example.com/subscribe \
--method POST \
--body '{"channels":["alerts","trades"]}' \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN"
Resume from a persisted cursor
npx agent-sse-listener \
--url https://api.example.com/stream \
--last-event-id $(cat .sse-cursor) \
--bearer $TOKEN
All-env-vars mode (for systemd / Docker / Railway)
SSE_URL=https://api.example.com/stream \
SSE_BEARER=xyz \
SSE_HEADER_1="X-API-Key: abc" \
SSE_HEADER_2="X-Client-ID: agent-1" \
npx agent-sse-listener --json
Stream events → Telegram
npx agent-sse-listener \
--url https://api.example.com/stream \
--bearer $TOKEN \
--telegram-bot-token $TELEGRAM_BOT_TOKEN \
--telegram-chat-id $TELEGRAM_CHAT_ID
Stream events → email (via Resend)
npx agent-sse-listener \
--url https://api.example.com/stream \
--bearer $TOKEN \
--email-api-key $RESEND_API_KEY \
--email-from [email protected] \
--email-to [email protected]
Stream events → Telegram AND email at the same time
npx agent-sse-listener \
--url https://api.example.com/stream \
--bearer $TOKEN \
--telegram-bot-token $TG_TOKEN \
--telegram-chat-id $TG_CHAT \
--email-api-key $RESEND_API_KEY \
--email-from [email protected] \
--email-to [email protected]
Library
import { SSEListener } from "agent-sse-listener";
const listener = new SSEListener({
url: "https://api.example.com/stream",
headers: { Authorization: "Bearer xyz" },
});
// Listen to every event (catch-all)
listener.on("event", ({ name, data, id }) => {
console.log(name, data, id);
});
// Or listen to a specific server-side event name directly
listener.on("tweet", (data, fullPayload) => {
console.log("new tweet:", data);
});
// Lifecycle
listener.on("open", () => console.log("stream open"));
listener.on("error", (e) => console.error(e));
listener.on("reconnect", ({ attempt, delay_ms, reason, lastEventId }) => {
console.log(`reconnecting #${attempt} in ${delay_ms}ms — ${reason}`);
});
listener.on("close", () => console.log("stream closed"));
await listener.start();
// Stop gracefully
// listener.stop();
Full options reference
new SSEListener({
url: "https://api.example.com/stream", // required
headers: { Authorization: "Bearer xyz" }, // optional
query: { channel: "alerts" }, // optional
method: "GET", // default GET
body: JSON.stringify({ /* ... */ }), // for POST SSE
// Dynamic auth — called before every (re)connect
prepare: async () => ({
headers: { Authorization: `Bearer ${await getFreshJwt()}` },
// Can also return { query, url }
}),
// Reliability
autoReconnect: true, // default
reconnectInitialMs: 1000, // default
reconnectMaxMs: 30_000, // default
maxReconnectAttempts: Infinity, // default
heartbeatTimeoutMs: 120_000, // default; 0 to disable
connectTimeoutMs: 30_000, // default
// Last-Event-ID resume
lastEventId: "saved-from-previous-run",
// Inject a custom fetch if you need it (e.g. for TLS client certs)
fetch: customFetch,
});
Events
| Event | When | Payload |
|---|---|---|
| open | Connection established. | { url, attempt } |
| event | Catch-all for every parsed SSE message. Attach adapters here. | { name, data, id, raw } |
| <server-event-name> | Also emitted under the server's event: name, e.g. listener.on("tweet", ...). | (data, fullPayload) |
| message / data | Alias for the default (unnamed) SSE event. Matches browser EventSource. | (data, fullPayload) |
| comment | SSE comment line (:heartbeat, etc.) | (text) |
| close | Server closed the stream gracefully. | { reason } |
| error | Any error. Listener keeps retrying unless autoReconnect: false. | Error |
| reconnect | Fired before each reconnect. | { attempt, delay_ms, reason, lastEventId } |
| stopped | Fired once after stop() fully completes. | — |
data in the event payload is auto-parsed as JSON if the string starts with { or [ and is valid JSON; otherwise it's the raw string. raw always contains the exact string the server sent.
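As a rough illustration of the payload shape and the auto-parse rule, here is a minimal sketch of turning one SSE frame into { name, data, id, raw }. The names parseData and parseFrame are hypothetical and are not part of the package's API, and the "message" default for unnamed events is an assumption based on browser EventSource behavior.

```javascript
// Sketch only: parseData and parseFrame are hypothetical names, not package exports.

// Auto-parse rule: try JSON only when the string starts with { or [.
function parseData(raw) {
  if (raw.startsWith("{") || raw.startsWith("[")) {
    try {
      return JSON.parse(raw);
    } catch {
      // Looked like JSON but was not valid: fall through to the raw string.
    }
  }
  return raw;
}

// Turn one SSE frame (the lines between two blank lines) into an event payload.
function parseFrame(frame) {
  let name = "message"; // assumed default for unnamed events
  let id;
  const dataLines = [];
  for (const line of frame.split(/\r\n|\r|\n/)) {
    if (line === "" || line.startsWith(":")) continue; // blank line or comment
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // strip one leading space
    if (field === "event") name = value;
    else if (field === "data") dataLines.push(value);
    else if (field === "id") id = value;
  }
  const raw = dataLines.join("\n"); // multiple data: lines are joined with newlines
  return { name, data: parseData(raw), id, raw };
}
```

With this sketch, a frame such as `event: tweet`, `id: 42`, `data: {"text":"hi"}` yields a parsed object in data and the untouched string in raw, while `data: hello` stays a plain string.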
Methods
- listener.start(): Opens the stream and begins the reconnect loop. Resolves on first successful connection; rejects on first connection failure. Keeps running in the background after resolution.
- listener.stop(): Aborts any in-flight request and cancels reconnect. Idempotent.
- listener.lastEventId: Getter. The current Last-Event-ID cursor. Useful for persisting between process restarts.
The prepare() hook — one feature, many use cases
The package has no built-in concept of "auth". Instead, a single hook lets you wire any signing/refresh/nonce flow into the connect loop. It runs before the first connect and before every reconnect, so time-limited credentials just work across disconnects without special-casing.
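To make the "runs before every (re)connect" idea concrete, here is a minimal sketch of how the value returned by prepare() could be folded into the static options for one connection attempt. mergePrepared and buildRequest are hypothetical helpers, and the precedence (values from prepare() win over static ones) is an assumption; the README does not spell out the merge rules.

```javascript
// Sketch only: mergePrepared and buildRequest are hypothetical helpers,
// and the precedence rules below are assumed, not documented.
function mergePrepared(base, prepared = {}) {
  // A url returned by prepare() replaces the static one.
  const url = new URL(prepared.url ?? base.url);
  // Static query params first, then the ones from prepare().
  const query = { ...base.query, ...prepared.query };
  for (const [key, value] of Object.entries(query)) {
    url.searchParams.set(key, value);
  }
  // Headers from prepare() override static headers with the same name.
  const headers = { ...base.headers, ...prepared.headers };
  return { url: url.toString(), headers };
}

// One connection attempt: call the hook, merge, then hand the result to fetch.
async function buildRequest(base) {
  const prepared = base.prepare ? await base.prepare() : {};
  return mergePrepared(base, prepared);
}
```

Because the hook is awaited on every attempt in this sketch, a token that expired during a disconnect is replaced before the next request goes out.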
JWT refresh (short-lived tokens)
new SSEListener({
url: "...",
prepare: async () => ({
headers: { Authorization: `Bearer ${await refreshJwt()}` },
}),
});
Wallet-signed message (x402 / moltbank-style services)
import { privateKeyToAccount } from "viem/accounts";
const account = privateKeyToAccount(process.env.PK);
new SSEListener({
url: "https://x402.moltie.co/api/stream",
prepare: async () => {
const today = new Date().toISOString().slice(0, 10);
const sig = await account.signMessage({
message: `x402-tweet-notifier:stream:${today}`,
});
return {
headers: {
"X-Wallet-Address": account.address,
"X-Wallet-Signature": sig,
},
};
},
});
Rotating nonce
new SSEListener({
url: "...",
prepare: async () => ({
query: { nonce: crypto.randomUUID(), ts: String(Date.now()) },
}),
});
Full URL override (for services that put auth in the path)
new SSEListener({
url: "https://api.example.com/stream",
prepare: async () => ({
url: `https://api.example.com/stream/${await getChannelId()}`,
}),
});
Reliability features (what "never-break" means in practice)
- Exponential backoff reconnect. 1s → 2s → 4s → ... → 30s (configurable). Resets to 1s on every successful connection.
- Honors server-sent retry: field. If the server tells the client how long to wait before reconnecting, we honor it once (overriding the backoff for that attempt only).
- Last-Event-ID resume. Every id: field the server sends is tracked. On reconnect, the package sends Last-Event-ID: <cursor> so servers that support replay can resume cleanly.
- Heartbeat timeout (default 120s). If no bytes (data or SSE comments) arrive for this long, we assume the TCP connection is stuck (common with NAT routers, load balancers, and idle proxies) and force a reconnect. Disable with heartbeatTimeoutMs: 0 if your server never sends anything in quiet periods.
- Connect timeout (default 30s). If the initial response doesn't come back within this window, abort and retry.
- Clean shutdown. stop() aborts in-flight requests, cancels pending reconnects, and emits stopped exactly once.
- Graceful error recovery. prepare() hook failures, server errors, network errors, and heartbeat timeouts all feed into the same retry loop. A single transient failure doesn't kill the listener.
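The backoff schedule in the list above can be sketched as a small pure function. nextDelayMs is a hypothetical name, not a package export, and the sketch assumes plain doubling with no jitter; the real implementation may differ.

```javascript
// Sketch only: nextDelayMs is a hypothetical name; plain doubling without jitter is assumed.
function nextDelayMs(attempt, { initialMs = 1000, maxMs = 30_000, serverRetryMs } = {}) {
  // A server-sent retry: value overrides the backoff for this attempt only.
  if (serverRetryMs !== undefined) return serverRetryMs;
  // attempt is 1-based: 1 -> initialMs, 2 -> 2x, 3 -> 4x, capped at maxMs.
  return Math.min(initialMs * 2 ** (attempt - 1), maxMs);
}

const schedule = [1, 2, 3, 4, 5, 6, 7].map((attempt) => nextDelayMs(attempt));
console.log(schedule.join(", ")); // prints "1000, 2000, 4000, 8000, 16000, 30000, 30000"
```

Resetting to the initial delay after a successful connection then amounts to restarting attempt at 1.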
Durability tiers — pick what you need
The CLI detects which tier it's running in and prints it on startup. Override with --tier 1|2|3 or the SSE_TIER env var if detection is wrong.
Tier 1 — ephemeral / session-bound (agent default)
Just run the CLI foreground in your session. No infrastructure.
npx agent-sse-listener --url https://example.com/stream --bearer $TOKEN --json
Use for:
- Ad-hoc requests ("monitor this for the next hour")
- Agent turns where the whole interaction is contained
- Testing / demos
Caveats:
- Dies when the session ends. Further events aren't delivered.
- For services with refund-on-miss semantics (e.g. moltbank x402 tweet notifier), missed events are not charged — so there's no financial risk, just events-not-delivered.
Tier 2 — user foreground (laptop / desktop)
Same CLI. User runs it themselves on their own machine. Terminal stays open, or detach with nohup:
# Foreground
npx agent-sse-listener --url ... --bearer ... --json
# Detached (Linux/macOS)
nohup npx agent-sse-listener --url ... --bearer ... --json > listener.log 2>&1 &
disown
Stop with pkill -f agent-sse-listener or kill <pid>.
Use for:
- Personal monitoring across a workday
- Users who don't have a VPS
Caveats:
- Dies on sleep / shutdown / terminal close. No automatic restart on crash.
- Put secrets in env files (chmod 600), not CLI flags (process list is world-readable).
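When secrets live in an env file rather than in flags, headers reach the CLI through SSE_BEARER and the numbered SSE_HEADER_1, SSE_HEADER_2, ... variables from the help text. The following is only a sketch of how such variables could be folded into a headers object; headersFromEnv is a hypothetical helper, and the package's actual parsing (for example, how it treats gaps in the numbering) is not documented here.

```javascript
// Sketch only: headersFromEnv is hypothetical; the package's real parsing may differ.
function headersFromEnv(env = process.env) {
  const headers = {};
  if (env.SSE_BEARER) headers.Authorization = `Bearer ${env.SSE_BEARER}`;
  // Walk SSE_HEADER_1, SSE_HEADER_2, ... and stop at the first missing number.
  for (let i = 1; env[`SSE_HEADER_${i}`] !== undefined; i++) {
    const line = env[`SSE_HEADER_${i}`];
    const colon = line.indexOf(":");
    if (colon === -1) continue; // skip entries that are not "Name: value"
    headers[line.slice(0, colon).trim()] = line.slice(colon + 1).trim();
  }
  return headers;
}
```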
Tier 3 — always-on / supervised (production)
Templates in examples/:
| Supervisor | File | When to use |
|---|---|---|
| systemd | examples/systemd.service | Linux server you own |
| Docker | examples/Dockerfile + examples/docker-compose.yml | Containerized infra |
| pm2 | examples/pm2-ecosystem.config.cjs | Simplest cross-platform |
| Railway | examples/railway.json | Managed hosting, no infra |
The CLI auto-detects tier 3 when it sees any of: INVOCATION_ID / NOTIFY_SOCKET (systemd), pm_id (pm2), RAILWAY_SERVICE_NAME, FLY_APP_NAME, RENDER_SERVICE_ID, or KUBERNETES_SERVICE_HOST.
Use for:
- Production monitoring where every event matters
- Multi-day subscriptions
- Any setup where human intervention to restart is unacceptable
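The tier 3 check described in this section boils down to looking for supervisor-specific environment variables. A minimal sketch follows; looksSupervised and resolveTier are hypothetical names, the variable list is the one given above, and letting SSE_TIER take precedence is an assumption. How the CLI tells tier 1 from tier 2 is not described, so the sketch returns null in that case rather than guessing.

```javascript
// Sketch only: looksSupervised and resolveTier are hypothetical names.
const SUPERVISOR_VARS = [
  "INVOCATION_ID",           // systemd
  "NOTIFY_SOCKET",           // systemd
  "pm_id",                   // pm2
  "RAILWAY_SERVICE_NAME",
  "FLY_APP_NAME",
  "RENDER_SERVICE_ID",
  "KUBERNETES_SERVICE_HOST",
];

function looksSupervised(env = process.env) {
  return SUPERVISOR_VARS.some((name) => env[name] !== undefined && env[name] !== "");
}

function resolveTier(env = process.env) {
  // Assumed precedence: an explicit SSE_TIER override wins over detection.
  if (["1", "2", "3"].includes(env.SSE_TIER)) return Number(env.SSE_TIER);
  if (looksSupervised(env)) return 3;
  return null; // tier 1 vs tier 2 detection is not covered by this sketch
}
```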
Environments that don't work well at any tier
- ❌ Serverless functions with a short max-duration (AWS Lambda, Vercel Functions) — they time out while holding the connection.
- ❌ Cron jobs that re-spawn the listener every N minutes — there's a connect-window gap where events are missed. Use tier 3 instead.
- ❌ CI runners / ephemeral containers — they're designed to exit on job completion, not hold connections open.
Built-in notification adapters
Both adapters are zero-dependency — they use fetch() to call the provider's HTTP API. They work from both the CLI (via flags) and the library (via import). Adapter errors are isolated — one failing doesn't crash the listener or block the other.
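The built-in adapters are described as isolated; a hand-written handler can get a similar property with a small wrapper. This is a sketch under that assumption: isolate is a hypothetical helper, not an export of the package, and it says nothing about how the package implements isolation internally.

```javascript
// Sketch only: isolate is a hypothetical wrapper, not an export of the package.
function isolate(adapter, onError = (err) => console.error("adapter failed:", err)) {
  return async function isolated(event) {
    try {
      await adapter(event); // works for both sync and async adapters
    } catch (err) {
      onError(err); // report and swallow so other handlers keep running
    }
  };
}

// Usage with a listener instance (not run here):
// listener.on("event", isolate(myAdapter));
```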
Telegram
import { SSEListener, telegram } from "agent-sse-listener";
const listener = new SSEListener({ url: "..." });
listener.on("event", telegram({
botToken: process.env.TELEGRAM_BOT_TOKEN,
chatId: process.env.TELEGRAM_CHAT_ID,
// Optional:
// parseMode: "HTML" | "MarkdownV2" | null, // default HTML
// silent: false, // send without notification sound
// filter: ["tweet", "alert"], // only forward these event names
// format: (event) => "custom string", // return null to skip
// timeoutMs: 10_000, // per-request timeout
}));
await listener.start();
Setup: talk to @BotFather on Telegram, /newbot, copy the token. Add the bot to a group/channel or DM it. Get your numeric chat_id from https://api.telegram.org/bot<TOKEN>/getUpdates.
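The getUpdates response is JSON with a result array of updates, so the chat_id can be pulled out with a few lines. chatIdsFromUpdates is a hypothetical helper written for this README, and it only looks at message and channel_post updates; send the bot a message first so there is something to read.

```javascript
// Sketch only: chatIdsFromUpdates is a hypothetical helper.
function chatIdsFromUpdates(response) {
  const ids = new Set();
  for (const update of response.result ?? []) {
    // DMs and groups arrive as `message`, channels as `channel_post`.
    const chat = (update.message ?? update.channel_post)?.chat;
    if (chat) ids.add(chat.id);
  }
  return [...ids];
}

// Usage (needs network access and a real token, so it is left commented out):
// const res = await fetch(`https://api.telegram.org/bot${process.env.TELEGRAM_BOT_TOKEN}/getUpdates`);
// console.log(chatIdsFromUpdates(await res.json()));
```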
Email (via Resend)
import { SSEListener, email } from "agent-sse-listener";
const listener = new SSEListener({ url: "..." });
listener.on("event", email({
provider: "resend", // default
apiKey: process.env.RESEND_API_KEY,
from: "[email protected]", // must be verified with Resend
to: "[email protected]", // or comma-separated list
// Optional:
// subject: "[SSE] {name} event", // {name} is replaced with event name
// filter: ["tweet", "alert"],
// format: (event) => ({ subject: "...", body: "..." }), // return null to skip
// timeoutMs: 10_000,
}));
await listener.start();
Setup: create a free account at resend.com, verify your domain, and create an API key.
Email (generic — any HTTP email API)
listener.on("event", email({
provider: "generic",
from: "[email protected]",
to: "[email protected]",
url: "https://api.mailgun.net/v3/example.com/messages",
headers: { Authorization: "Basic " + btoa("api:" + process.env.MAILGUN_KEY) },
transform: (params) => ({
from: params.from,
to: params.to,
subject: params.subject,
text: params.body,
}),
}));
The transform function reshapes the {from, to, subject, body} params into whatever JSON shape your email API expects. If omitted, the default sends { from, to, subject, text: body }.
Writing your own adapter
No plugin protocol. An adapter is just (event) => Promise<void>:
async function myAdapter(event) {
// event = { name, data, id, raw } from listener.on("event", ...)
await doSomethingWith(event);
}
listener.on("event", myAdapter);
Scope and boundaries
This package is a reliable SSE client with optional notification forwarding. It does NOT:
- Know about any specific service (x402, moltbank, Linear, GitHub, etc.)
- Persist events to a database (use the event listener + your DB of choice)
- Provide rate limiting or per-event deduplication
- Support WebSocket (not SSE; use ws or socket.io-client)
- Support Slack/Discord adapters (use the event listener and call the webhook yourself; see examples/06-piping-to-handler.mjs for the pattern)
Publishing to npm
cd packages/agent-sse-listener
npm login
npm publish --access public
License
MIT. See LICENSE.
