npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

nats.do

v0.1.0

Published

NATS/JetStream on Cloudflare Durable Objects

Readme

NATS.do

NATS/JetStream on Cloudflare Durable Objects

NATS.do implements NATS Core messaging and JetStream persistence using Cloudflare Workers and Durable Objects with SQLite storage. It provides a nats.js-compatible API accessible via JSON-RPC 2.0 over HTTP, WebSockets, or Cloudflare Workers Service Bindings.

Features

  • NATS Core: Publish/subscribe messaging with subject wildcards (*, >)
  • JetStream Streams: Persistent message storage with configurable retention policies
  • JetStream Consumers: Pull and push consumers with acknowledgment tracking
  • MCP Integration: Model Context Protocol tools for AI agent access
  • Edge-Native: Runs entirely on Cloudflare's global network

Installation

npm install nats.do

Quick Start

Basic Pub/Sub

import { StringCodec, JSONCodec } from 'nats.do'

const sc = StringCodec()
const jc = JSONCodec()

// Publish a message
nc.publish('orders.new', sc.encode('Hello NATS!'))

// Subscribe with wildcards
const sub = nc.subscribe('orders.*')
for await (const msg of sub) {
  console.log(`Received: ${sc.decode(msg.data)}`)
}

// Request/Reply pattern
const response = await nc.request('api.users.get', jc.encode({ id: 123 }))
console.log(jc.decode(response.data))

JetStream Streams

// Create a stream
const jsm = nc.jetstreamManager()
await jsm.streams.add({
  name: 'ORDERS',
  subjects: ['orders.*'],
  retention: 'workqueue',
  max_msgs: 10000,
  max_age: 24 * 60 * 60 * 1_000_000_000, // 24 hours in nanoseconds
})

// Publish with acknowledgment
const js = nc.jetstream()
const ack = await js.publish('orders.new', sc.encode('{"id": 456}'))
console.log(`Published to ${ack.stream} at seq ${ack.seq}`)

JetStream Consumers

// Create a durable consumer
await jsm.consumers.add('ORDERS', {
  durable_name: 'order-processor',
  ack_policy: 'explicit',
  deliver_policy: 'all',
})

// Fetch messages
const consumer = await js.consumers.get('ORDERS', 'order-processor')
const messages = await consumer.fetch({ max_messages: 10 })

for await (const msg of messages) {
  console.log(`Processing: ${sc.decode(msg.data)}`)
  msg.ack()
}

API Reference

Core Types

// Connection options
interface ConnectionOptions {
  servers: string | string[]
  name?: string
  token?: string
  timeout?: number
}

// Message
interface Msg {
  subject: string
  data: Uint8Array
  reply?: string
  headers?: MsgHdrs
  respond(data?: Uint8Array): boolean
}

// Subscription
interface Subscription extends AsyncIterable<Msg> {
  getSubject(): string
  unsubscribe(max?: number): void
  drain(): Promise<void>
}

JetStream Types

// Stream configuration
interface StreamConfig {
  name: string
  subjects: string[]
  retention?: 'limits' | 'interest' | 'workqueue'
  storage?: 'file' | 'memory'
  max_msgs?: number
  max_bytes?: number
  max_age?: number // nanoseconds
  discard?: 'old' | 'new'
}

// Consumer configuration
interface ConsumerConfig {
  name?: string
  durable_name?: string
  ack_policy: 'none' | 'all' | 'explicit'
  deliver_policy?: 'all' | 'last' | 'new' | 'by_start_sequence' | 'by_start_time'
  filter_subject?: string
  max_deliver?: number
  ack_wait?: number // nanoseconds
}

// Publish acknowledgment
interface PubAck {
  stream: string
  seq: number
  duplicate?: boolean
}

Codecs

import { StringCodec, JSONCodec, Empty } from 'nats.do'

const sc = StringCodec()
sc.encode('hello')  // Uint8Array
sc.decode(data)     // string

const jc = JSONCodec<MyType>()
jc.encode({ key: 'value' })  // Uint8Array
jc.decode(data)              // MyType

Empty  // Empty Uint8Array for messages without payload

Subject Wildcards

NATS.do supports NATS subject wildcards for subscriptions:

  • * matches exactly one token: orders.* matches orders.new but not orders.us.new
  • > matches one or more tokens (must be last): orders.> matches orders.new and orders.us.new
import { matchSubject, isValidSubject, isValidWildcard } from 'nats.do/utils'

matchSubject('orders.*', 'orders.new')     // true
matchSubject('orders.*', 'orders.us.new')  // false
matchSubject('orders.>', 'orders.us.new')  // true

Architecture

NATS.do uses three Durable Object classes:

| Durable Object | Scope | Responsibility | |---------------|-------|----------------| | NatsCoordinator | Global singleton | Stream registry, consumer discovery, cluster metadata | | NatsPubSub | Per region | Core NATS pub/sub, WebSocket connections, request/reply | | StreamDO | Per stream | Message storage, consumer state, ack tracking, retention |

RPC Protocol

NATS.do uses JSON-RPC 2.0 for communication:

// Request
{
  "jsonrpc": "2.0",
  "method": "nats.publish",
  "params": { "subject": "orders.new", "data": "base64..." },
  "id": 1
}

// Response
{
  "jsonrpc": "2.0",
  "result": { "success": true },
  "id": 1
}

MCP Tools

NATS.do exposes MCP (Model Context Protocol) tools for AI agent integration:

| Tool | Description | |------|-------------| | nats_publish | Publish a message to a subject | | nats_subscribe | Subscribe to a subject | | nats_request | Send a request and wait for response | | jetstream_publish | Publish to JetStream with acknowledgment | | jetstream_stream_create | Create a new stream | | jetstream_stream_info | Get stream information | | jetstream_consumer_create | Create a consumer | | jetstream_consumer_fetch | Fetch messages from a consumer |

Cloudflare Workers Deployment

wrangler.jsonc

{
  "name": "nats.do",
  "main": "src/index.ts",
  "compatibility_date": "2024-01-01",
  "compatibility_flags": ["nodejs_compat"],

  "durable_objects": {
    "bindings": [
      { "name": "NATS_COORDINATOR", "class_name": "NatsCoordinator" },
      { "name": "NATS_PUBSUB", "class_name": "NatsPubSub" },
      { "name": "STREAM_DO", "class_name": "StreamDO" }
    ]
  },

  "migrations": [
    {
      "tag": "v1",
      "new_sqlite_classes": ["NatsCoordinator", "NatsPubSub", "StreamDO"]
    }
  ]
}

Service Binding Usage

// In another Worker
export default {
  async fetch(request: Request, env: Env) {
    const id = env.NATS_COORDINATOR.idFromName('global')
    const stub = env.NATS_COORDINATOR.get(id)

    const response = await stub.fetch(new Request('http://internal/rpc', {
      method: 'POST',
      body: JSON.stringify({
        jsonrpc: '2.0',
        method: 'consumers.list',
        params: { streamName: 'ORDERS' },
        id: 1
      })
    }))

    return response
  }
}

Development

# Install dependencies
npm install

# Run tests
npm test

# Run tests in watch mode
npm run test:watch

# Type checking
npm run typecheck

# Local development
npm run dev

License

MIT