npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@durable-streams-cloudflare/subscription

v0.8.0

Published

Subscription service for durable-streams on Cloudflare Workers + Durable Objects

Readme

@durable-streams-cloudflare/subscription

Pub/sub fan-out for Cloudflare, built on @durable-streams-cloudflare/core. Subscribe sessions to streams, publish once and fan out to all subscribers. Automatic TTL-based session expiry, cron cleanup, and Analytics Engine metrics.

Requires @durable-streams-cloudflare/core deployed as a separate Worker.

This is a library — you import createSubscriptionWorker(), pass your auth config, and deploy as your own Cloudflare Worker. Same pattern as core.

How It Works

A publisher writes to a source stream via the subscription worker. The SubscriptionDO looks up all sessions subscribed to that stream and fans out the message — writing a copy to each subscriber's session stream via core. Clients read their session stream directly from the core worker (through CDN).

Publisher ── POST /v1/myapp/publish/stream-A ──> Subscription Worker
                                             │
                                             ├─> Core: write to source stream
                                             ├─> SubscriptionDO: get subscribers
                                             └─> Fan-out: write to each session stream
                                                  (session:alice, session:bob, ...)

Client ── GET /v1/stream/session:alice?live=sse ──> Core Worker (via CDN)

Quick Start

1. Prerequisites

A running @durable-streams-cloudflare/core worker. See the core package README for setup.

2. Install

npm install @durable-streams-cloudflare/subscription

3. Create Your Worker

src/worker.ts:

import { createSubscriptionWorker, SubscriptionDO } from "@durable-streams-cloudflare/subscription";

export default createSubscriptionWorker();
export { SubscriptionDO };

wrangler.toml:

name = "durable-streams-subscriptions"
main = "src/worker.ts"
compatibility_date = "2025-02-02"

[vars]
SESSION_TTL_SECONDS = "1800"
ANALYTICS_DATASET = "subscriptions_metrics"

[durable_objects]
bindings = [{ name = "SUBSCRIPTION_DO", class_name = "SubscriptionDO" }]

[[migrations]]
tag = "v1"
new_sqlite_classes = ["SubscriptionDO"]

[[analytics_engine_datasets]]
binding = "METRICS"
dataset = "subscriptions_metrics"

[[services]]
binding = "CORE"
service = "durable-streams"

# Required for projectJwtAuth()
[[kv_namespaces]]
binding = "REGISTRY"
id = "<your-kv-namespace-id>"

[triggers]
crons = ["*/5 * * * *"]

4. Deploy

npx wrangler deploy

5. Try It

CORE=https://durable-streams.<your-subdomain>.workers.dev
SUB=https://durable-streams-subscriptions.<your-subdomain>.workers.dev

# Subscribe a session to a stream
curl -X POST $SUB/v1/myapp/subscribe \
  -H 'Content-Type: application/json' \
  -H "Authorization: Bearer $JWT" \
  -d '{"streamId":"chat-room-1","sessionId":"user-alice"}'

# Publish a message (fans out to all subscribers)
curl -X POST $SUB/v1/myapp/publish/chat-room-1 \
  -H 'Content-Type: application/json' \
  -H "Authorization: Bearer $JWT" \
  -d '{"text":"hello world"}'

# Read the session stream via core (SSE)
curl -N -H "Authorization: Bearer $JWT" \
  "$CORE/v1/myapp/stream/session:user-alice?offset=0000000000000000_0000000000000000&live=sse"

# Check session info
curl -H "Authorization: Bearer $JWT" $SUB/v1/myapp/session/user-alice

# Extend session TTL
curl -X POST -H "Authorization: Bearer $JWT" $SUB/v1/myapp/session/user-alice/touch

# Unsubscribe
curl -X DELETE $SUB/v1/myapp/unsubscribe \
  -H 'Content-Type: application/json' \
  -H "Authorization: Bearer $JWT" \
  -d '{"streamId":"chat-room-1","sessionId":"user-alice"}'

Authentication

Per-Project JWT Auth (Default)

The built-in projectJwtAuth() uses per-project HMAC-SHA256 signing secrets stored in a REGISTRY KV namespace — same model as the core package.

import { createSubscriptionWorker, SubscriptionDO, projectJwtAuth } from "@durable-streams-cloudflare/subscription";

export default createSubscriptionWorker({ authorize: projectJwtAuth() });
export { SubscriptionDO };

Scope mapping:

| Action | Required scope | |--------|---------------| | publish | write | | unsubscribe, deleteSession | write | | subscribe, getSession, touchSession | read or write |

JWT claims are the same as core (sub, scope, exp). The stream_id claim is not used by subscription auth.

No Auth

createSubscriptionWorker() with no config allows all requests:

import { createSubscriptionWorker, SubscriptionDO } from "@durable-streams-cloudflare/subscription";

export default createSubscriptionWorker();
export { SubscriptionDO };

Custom Auth

Write your own AuthorizeSubscription callback with full route context:

import { createSubscriptionWorker, SubscriptionDO } from "@durable-streams-cloudflare/subscription";
import type { AuthorizeSubscription, SubscriptionRoute } from "@durable-streams-cloudflare/subscription";

export default createSubscriptionWorker({
  authorize: async (request, route, env) => {
    // Allow session reads without auth
    if (route.action === "getSession") return { ok: true };

    // Check session ownership for session operations
    if ("sessionId" in route) {
      const userId = request.headers.get("X-User-Id");
      if (route.sessionId !== userId) {
        return { ok: false, response: new Response("forbidden", { status: 403 }) };
      }
    }

    return { ok: true };
  },
});
export { SubscriptionDO };

The route parameter is a discriminated union (SubscriptionRoute) with the parsed action, project, and IDs:

  • { action: "publish", project, streamId } — publish to a stream
  • { action: "subscribe", project, streamId, sessionId } — subscribe a session
  • { action: "unsubscribe", project, streamId, sessionId } — unsubscribe a session
  • { action: "getSession", project, sessionId } — read session info
  • { action: "touchSession", project, sessionId } — extend session TTL
  • { action: "deleteSession", project, sessionId } — delete a session

Type signatures:

type AuthorizeSubscription<E> = (
  request: Request, route: SubscriptionRoute, env: E,
) => SubscriptionAuthResult | Promise<SubscriptionAuthResult>;

type SubscriptionAuthResult = { ok: true } | { ok: false; response: Response };

Health checks (GET /health) always bypass auth.

API

| Method | Path | Description | |--------|------|-------------| | POST | /v1/:project/subscribe | Subscribe a session to a stream. Body: { sessionId, streamId } | | DELETE | /v1/:project/unsubscribe | Unsubscribe a session from a stream. Body: { sessionId, streamId } | | POST | /v1/:project/publish/:streamId | Publish to a stream and fan out to all subscribers | | GET | /v1/:project/session/:sessionId | Get session info and active subscriptions | | POST | /v1/:project/session/:sessionId/touch | Extend session TTL | | DELETE | /v1/:project/session/:sessionId | Delete a session and its stream | | GET | /health | Health check |

Reading the session stream is done via the core worker: GET /v1/stream/session:<sessionId>.

Session Lifecycle

Sessions have a configurable TTL (default 30 minutes). Each touch resets the TTL. A cron job runs every 5 minutes, querying Analytics Engine for expired sessions and cleaning up their subscriptions and streams. Stale subscribers discovered during fan-out (core returns 404 for a deleted session stream) are also cleaned up automatically.

Configuration

Environment Variables

| Variable | Default | Description | |----------|---------|-------------| | SESSION_TTL_SECONDS | 1800 | Session TTL in seconds (default: 30 minutes) | | ACCOUNT_ID | (none) | Cloudflare account ID (required for cron cleanup) | | API_TOKEN | (none) | Cloudflare API token (required for cron cleanup Analytics Engine queries) | | ANALYTICS_DATASET | subscriptions_metrics | Analytics Engine dataset name |

Wrangler Bindings

| Binding | Type | Description | |---------|------|-------------| | SUBSCRIPTION_DO | Durable Object | SubscriptionDO namespace (required) | | REGISTRY | KV Namespace | Per-project signing secrets and CORS origins (required when using projectJwtAuth). See CORS Configuration. | | METRICS | Analytics Engine | Subscription and fan-out metrics (optional) | | CORE | Service Binding | Service binding to core worker (required) |

Cron

[triggers]
crons = ["*/5 * * * *"]

Runs session cleanup every 5 minutes.

Service Binding

The subscription worker communicates with core via a Cloudflare service binding — no network hop, no auth overhead. The CORE binding is required.

[[services]]
binding = "CORE"
service = "durable-streams"

See Also

License

MIT