npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@dotdo/events

v0.1.0

Published

Event emission, CDC, and lakehouse streaming for Durable Objects

Readme

@dotdo/events

Event streaming, CDC (Change Data Capture), and lakehouse analytics for Cloudflare Durable Objects.

npm version License: MIT

Installation

pnpm add @dotdo/events

Quick Start

import { EventEmitter } from '@dotdo/events'

export class MyDO extends DurableObject {
  events = new EventEmitter(this.ctx, this.env, {
    cdc: true,
  })

  async doSomething() {
    // Emit custom events
    this.events.emit({ type: 'user.action', userId: '123', action: 'click' })
  }

  // Required: forward alarm to EventEmitter for retry handling
  async alarm() {
    await this.events.handleAlarm()
  }
}

Features

  • Event batching and streaming - Batched emission to events.do with configurable batch size and flush intervals
  • CDC (Change Data Capture) - Automatic change events for collection mutations with SQLite bookmarks for PITR (Point-in-Time Recovery)
  • R2 lakehouse storage - Stream events to R2 in Parquet-friendly JSON Lines format organized by time partitions
  • DuckDB query generation - Built-in query builders for analytics, time-travel, and latency analysis
  • Alarm-based retry - Reliable delivery with exponential backoff using DO alarms
  • Hibernation support - Persist and restore event batches across hibernation cycles

API Reference

EventEmitter

The core class for emitting events from Durable Objects.

import { EventEmitter } from '@dotdo/events'

const events = new EventEmitter(ctx, env, options)

Constructor

new EventEmitter(
  ctx: DurableObjectState,
  env: Record<string, unknown>,
  options?: EventEmitterOptions
)

Methods

| Method | Description | |--------|-------------| | emit(event) | Emit a custom event (batched, non-blocking) | | emitChange(type, collection, docId, doc?, prev?) | Emit a CDC event for collection changes | | flush() | Manually flush pending events to endpoint | | handleAlarm() | Handle alarm for retry - call from your DO's alarm() method | | enrichFromRequest(request) | Enrich event identity from incoming request (colo, worker, etc.) | | persistBatch() | Persist pending batch before hibernation |

Properties

| Property | Type | Description | |----------|------|-------------| | pendingCount | number | Number of events in current batch | | doIdentity | object | Current DO identity info |

Example

export class ChatRoom extends DurableObject {
  events = new EventEmitter(this.ctx, this.env, {
    cdc: true,
    trackPrevious: true,
    r2Bucket: this.env.EVENTS_BUCKET,
  })

  async fetch(request: Request) {
    // Enrich events with request context
    this.events.enrichFromRequest(request)
    // ... handle request
  }

  async sendMessage(userId: string, text: string) {
    this.events.emit({
      type: 'message.sent',
      userId,
      textLength: text.length,
      roomId: this.ctx.id.toString(),
    })
  }

  async alarm() {
    await this.events.handleAlarm()
  }

  async webSocketClose(ws: WebSocket) {
    // Persist events before hibernation
    await this.events.persistBatch()
  }
}

CDCCollection

Wraps a collection with automatic CDC event emission for all mutations.

import { CDCCollection } from '@dotdo/events'

const users = new CDCCollection<User>(
  this.collection<User>('users'),
  this.events,
  'users'
)

Constructor

new CDCCollection<T>(
  collection: Collection<T>,
  emitter: EventEmitter,
  name: string
)

Methods

| Method | Description | Emits Event | |--------|-------------|-------------| | get(id) | Get document by ID | No | | put(id, doc) | Insert or update document | collection.insert or collection.update | | delete(id) | Delete document | collection.delete | | has(id) | Check if document exists | No | | find(filter?, options?) | Find documents matching filter | No | | count(filter?) | Count documents | No | | list(options?) | List all documents | No | | keys() | Get all document IDs | No | | clear() | Delete all documents | collection.delete for each | | putMany(docs) | Bulk insert/update | Event per document | | deleteMany(ids) | Bulk delete | Event per document |

Example

export class UserService extends DurableRPC {
  events = new EventEmitter(this.ctx, this.env, { cdc: true, trackPrevious: true })
  users = new CDCCollection<User>(this.collection<User>('users'), this.events, 'users')

  async createUser(data: { name: string; email: string }) {
    const id = crypto.randomUUID()
    // Automatically emits collection.insert event
    this.users.put(id, {
      name: data.name,
      email: data.email,
      createdAt: new Date().toISOString(),
    })
    return id
  }

  async updateUser(id: string, updates: Partial<User>) {
    const user = this.users.get(id)
    if (!user) return null
    // Automatically emits collection.update with prev doc
    this.users.put(id, { ...user, ...updates })
    return { ...user, ...updates }
  }
}

Event Types

import type {
  DurableEvent,
  BaseEvent,
  RpcCallEvent,
  CollectionChangeEvent,
  LifecycleEvent,
  WebSocketEvent,
  ClientEvent,
} from '@dotdo/events'

DurableEvent (Union Type)

All events extend BaseEvent and include:

interface BaseEvent {
  type: string        // Event type identifier
  ts: string          // ISO timestamp
  do: {
    id: string        // DO ID
    name?: string     // DO name (if named)
    class?: string    // DO class name
    colo?: string     // Cloudflare colo
    worker?: string   // Worker name
  }
}

Event Type Reference

| Type | Description | Additional Fields | |------|-------------|-------------------| | RpcCallEvent | RPC method call | method, namespace, durationMs, success, error | | CollectionChangeEvent | CDC mutation | collection, docId, doc, prev, bookmark | | LifecycleEvent | DO lifecycle | reason | | WebSocketEvent | WebSocket activity | connectionCount, code, reason | | ClientEvent | Browser analytics | event, properties, traits, userId, anonymousId |

Query Builders

Generate DuckDB queries for analyzing events stored in R2.

import { buildQuery, buildHistoryQuery, buildLatencyQuery, buildPITRRangeQuery } from '@dotdo/events'

buildQuery(options)

Build a general-purpose query for events.

const sql = buildQuery({
  bucket: 'my-events',
  dateRange: { start: new Date('2024-01-01'), end: new Date('2024-01-31') },
  eventTypes: ['rpc.call'],
  doClass: 'ChatRoom',
  limit: 1000,
})

buildHistoryQuery(options)

Reconstruct document history for time-travel queries.

const sql = buildHistoryQuery({
  bucket: 'my-events',
  collection: 'users',
  docId: 'user-123',
})

buildLatencyQuery(options)

Analyze RPC latency with percentiles.

const sql = buildLatencyQuery({
  bucket: 'my-events',
  doClass: 'ChatRoom',
  method: 'sendMessage',
})
// Returns p50, p95, p99, avg, max latency grouped by class and method

buildPITRRangeQuery(options)

Find events between two SQLite bookmarks for point-in-time recovery.

const sql = buildPITRRangeQuery({
  bucket: 'my-events',
  startBookmark: 'bookmark-abc',
  endBookmark: 'bookmark-xyz',
  collection: 'users',
})

Snapshot Utilities

Create and restore point-in-time snapshots of collections.

import { createSnapshot, restoreSnapshot, listSnapshots, deleteSnapshot } from '@dotdo/events'

createSnapshot(sql, doId, options)

const result = await createSnapshot(this.sql, this.ctx.id.toString(), {
  bucket: this.env.SNAPSHOTS_BUCKET,
  prefix: 'snapshots', // optional
})
// { key: 'snapshots/abc123/2024-01-15T12-34-56-789Z.json', collections: ['users'], totalDocs: 42, timestamp: '...' }

restoreSnapshot(sql, bucket, snapshotKey)

const result = await restoreSnapshot(
  this.sql,
  this.env.SNAPSHOTS_BUCKET,
  'snapshots/abc123/2024-01-15T12-34-56-789Z.json'
)
// { collections: ['users'], totalDocs: 42 }

listSnapshots(bucket, doId, options?)

const snapshots = await listSnapshots(this.env.SNAPSHOTS_BUCKET, this.ctx.id.toString(), {
  limit: 10,
})

deleteSnapshot(bucket, snapshotKey)

await deleteSnapshot(this.env.SNAPSHOTS_BUCKET, 'snapshots/abc123/2024-01-15T12-34-56-789Z.json')

Configuration Options

interface EventEmitterOptions {
  /** Endpoint to send events (default: 'https://events.do/ingest') */
  endpoint?: string

  /** Batch size before auto-flush (default: 100) */
  batchSize?: number

  /** Max time to hold events before flush in ms (default: 1000) */
  flushIntervalMs?: number

  /** Enable CDC for collections (default: false) */
  cdc?: boolean

  /** Include previous doc in CDC updates for diffs (default: false) */
  trackPrevious?: boolean

  /** R2 bucket for lakehouse streaming (optional) */
  r2Bucket?: R2Bucket

  /** API key for authentication (optional) */
  apiKey?: string
}

Integration with @dotdo/rpc

When using @dotdo/rpc, events integrate seamlessly with the DurableRPC base class:

import { DurableRPC } from '@dotdo/rpc'
import { EventEmitter, CDCCollection } from '@dotdo/events'

interface User {
  name: string
  email: string
  role: 'admin' | 'user'
  active: boolean
}

interface Env {
  EVENTS_BUCKET?: R2Bucket
  EVENTS_API_KEY?: string
}

export class MyDO extends DurableRPC {
  // Create event emitter with CDC enabled
  events = new EventEmitter(this.ctx, this.env as Env, {
    cdc: true,
    trackPrevious: true,
    r2Bucket: (this.env as Env).EVENTS_BUCKET,
    apiKey: (this.env as Env).EVENTS_API_KEY,
  })

  // Wrap collections with CDC
  users = new CDCCollection<User>(this.collection<User>('users'), this.events, 'users')

  async createUser(data: { name: string; email: string; role?: 'admin' | 'user' }) {
    const id = crypto.randomUUID()
    const user: User = {
      name: data.name,
      email: data.email,
      role: data.role ?? 'user',
      active: true,
    }

    // CDC event emitted automatically
    this.users.put(id, user)

    // Emit custom business event
    this.events.emit({
      type: 'user.registered',
      userId: id,
      email: user.email,
      role: user.role,
    })

    return user
  }

  async updateUserRole(userId: string, role: 'admin' | 'user') {
    const user = this.users.get(userId)
    if (!user) return null

    // CDC update event with previous doc included
    this.users.put(userId, { ...user, role })
    return { ...user, role }
  }

  // Required: forward alarm to event emitter
  async alarm() {
    await this.events.handleAlarm()
  }

  // Enrich events with request context
  async fetch(request: Request) {
    this.events.enrichFromRequest(request)
    return super.fetch(request)
  }

  // Persist batch before hibernation
  async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean) {
    await this.events.persistBatch()
    await super.webSocketClose(ws, code, reason, wasClean)
  }
}

Architecture

+----------------+     +------------------+     +------------------+
|  Durable       |     |                  |     |                  |
|  Object        | --> |  EventEmitter    | --> |  events.do       |
|                |     |  (batching)      |     |  (ingestion)     |
+----------------+     +------------------+     +------------------+
       |                       |                        |
       |                       v                        v
       |               +------------------+     +------------------+
       |               |  R2 Bucket       |     |  Analytics       |
       |               |  (lakehouse)     | <-- |  (DuckDB)        |
       |               +------------------+     +------------------+
       v
+------------------+
|  SQLite          |
|  (bookmarks)     |
+------------------+

Data Flow

  1. Event Emission: Your DO emits events via EventEmitter.emit() or CDC wrapper mutations
  2. Batching: Events are batched in memory (configurable size/interval)
  3. Streaming: Batches are sent to events.do for real-time processing
  4. Lakehouse: Optionally, events stream to R2 in time-partitioned JSON Lines format
  5. PITR: CDC events include SQLite bookmarks for point-in-time recovery
  6. Analytics: Query events with DuckDB using generated queries

R2 Storage Format

Events are stored in R2 with the following path structure:

events/{year}/{month}/{day}/{hour}/{do_id}_{timestamp}.jsonl

Example:

events/2024/01/15/14/abc123_1705329600000.jsonl

This format is optimized for:

  • Time-based partitioning (efficient date range queries)
  • Parquet conversion (compatible with lakehouse tools)
  • DuckDB glob patterns

License

MIT