tapeworm_dispatcher_mdb_rmq

v0.2.0-beta.0

MongoDB change stream to RabbitMQ dispatcher for tapeworm commits

MongoDB-to-RabbitMQ commit dispatcher for tapeworm event stores.

Watches a tapeworm commits collection for new inserts and publishes each commit to a RabbitMQ headers exchange with at-least-once delivery semantics.

Features

  • Two watch modes: Change stream (majority-safe) or direct oplog tailing (lowest latency)
  • Two-level resume: MongoDB resume token (primary) + UUID v7 .token cursor (fallback)
  • Publisher confirms: Every message is acknowledged by the RabbitMQ broker before checkpointing
  • Pluggable resume storage: Built-in MongoDB store or bring your own IResumeTokenStore
  • Library + CLI: Use as a library in your process or run as a standalone daemon
  • Multi-tenant: Optional tenant header on every AMQP message

CLI Usage

Basic — change stream mode (default)

npx tapeworm-dispatcher \
  --mongodb-uri mongodb://localhost:27017 \
  --database mydb \
  --collection tw_master_commits \
  --rabbitmq-uri amqp://localhost \
  --exchange tw.commits.exchange

Direct oplog mode (lowest latency)

npx tapeworm-dispatcher \
  --mongodb-uri mongodb://localhost:27017 \
  --database mydb \
  --collection tw_master_commits \
  --rabbitmq-uri amqp://localhost \
  --exchange tw.commits.exchange \
  --watch-mode oplog

Multi-tenant with custom resume collection

npx tapeworm-dispatcher \
  --mongodb-uri "mongodb://rs0.example.com:27017,rs1.example.com:27017/?replicaSet=rs0" \
  --database lx3_acme \
  --collection tw_master_commits \
  --rabbitmq-uri "amqp://user:PASSWORD@RABBITMQ_HOST:5672" \
  --exchange acme.commits.exchange \
  --watch-mode changeStream \
  --resume-collection tw_dispatcher_state \
  --tenant acme

All CLI flags

| Flag                | Required | Default             | Description                                 |
| ------------------- | -------- | ------------------- | ------------------------------------------- |
| --mongodb-uri       | Yes      | —                   | MongoDB connection URI                      |
| --database          | Yes      | —                   | Database name                               |
| --collection        | Yes      | —                   | Commits collection (e.g. tw_master_commits) |
| --rabbitmq-uri      | Yes      | —                   | AMQP connection URI                         |
| --exchange          | Yes      | —                   | Fanout exchange name (asserted on startup)  |
| --watch-mode        | No       | changeStream        | changeStream or oplog                       |
| --resume-collection | No       | tw_dispatcher_state | MongoDB collection for resume tokens        |
| --tenant            | No       | —                   | Tenant ID added to AMQP message headers     |

Docker

Build

The Dockerfile uses the monorepo root as build context:

docker build -f packages/tapeworm_dispatcher_mdb_rmq/Dockerfile -t tapeworm-dispatcher .

Run

All CLI flags can be set via environment variables:

docker run -d \
  -e MONGODB_URI=mongodb://mongo:27017 \
  -e DATABASE=mydb \
  -e COLLECTION=tw_master_commits \
  -e RABBITMQ_URI=amqp://rabbitmq:5672 \
  -e EXCHANGE=tw.commits.exchange \
  tapeworm-dispatcher

Environment Variables

| Variable          | Required | Default               | CLI equivalent                              |
| ----------------- | -------- | --------------------- | ------------------------------------------- |
| MONGODB_URI       | Yes      | —                     | --mongodb-uri                               |
| DATABASE          | Yes      | —                     | --database                                  |
| COLLECTION        | Yes      | —                     | --collection                                |
| RABBITMQ_URI      | Yes      | —                     | --rabbitmq-uri                              |
| EXCHANGE          | Yes      | —                     | --exchange                                  |
| RESUME_COLLECTION | No       | tw_dispatcher_state   | --resume-collection                         |
| WATCH_MODE        | No       | changeStream          | --watch-mode                                |
| TENANT            | No       | —                     | --tenant                                    |
| DEBUG             | No       | tapeworm-dispatcher:* | Controls log output (uses debug namespaces) |

CLI arguments take precedence over environment variables.
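
The precedence rule can be sketched as a small resolver. This is not the package's actual implementation: the `OptionSpec` type and the `resolveOption` helper are hypothetical, and only the flag names, variable names, and defaults are taken from the tables above. The environment is passed in as a plain object so the sketch stays independent of Node.js globals.

```typescript
// Hypothetical sketch of "CLI flag > environment variable > default".
// OptionSpec and resolveOption are illustrative names, not the package's API.
interface OptionSpec {
  flag: string;          // e.g. "--watch-mode"
  envVar: string;        // e.g. "WATCH_MODE"
  defaultValue?: string; // left out when the option has no default
  required: boolean;
}

type Env = Record<string, string | undefined>;

function resolveOption(spec: OptionSpec, argv: string[], env: Env): string | undefined {
  // 1. A CLI argument wins if present (only the "--flag value" form, for brevity).
  const i = argv.indexOf(spec.flag);
  if (i !== -1 && i + 1 < argv.length) return argv[i + 1];

  // 2. Otherwise fall back to the environment variable.
  const fromEnv = env[spec.envVar];
  if (fromEnv !== undefined && fromEnv !== "") return fromEnv;

  // 3. Otherwise use the default, or fail if the option is required.
  if (spec.defaultValue !== undefined) return spec.defaultValue;
  if (spec.required) throw new Error(`Missing ${spec.flag} (or ${spec.envVar})`);
  return undefined;
}

const watchMode: OptionSpec = {
  flag: "--watch-mode",
  envVar: "WATCH_MODE",
  defaultValue: "changeStream",
  required: false,
};

const exchange: OptionSpec = { flag: "--exchange", envVar: "EXCHANGE", required: true };
```

In a Node.js process this would be called as `resolveOption(watchMode, process.argv.slice(2), process.env)`.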

CI/CD

The root Jenkinsfile provides a declarative pipeline:

  1. Install → Build → Test → Docker Build (all branches)
  2. Publish (main branch only) — npm publish via changesets + Docker push

Operator setup

Configure these in your Jenkins instance (not in the repo):

| Item                 | Jenkins type      | ID           | Purpose                 |
| -------------------- | ----------------- | ------------ | ----------------------- |
| npm auth token       | Secret text       | npm-token    | Publish packages to npm |
| Docker registry auth | Username/password | docker-creds | Push images to registry |

Set DOCKER_REGISTRY in the Publish stage environment to your registry hostname (e.g. ghcr.io/yourorg, docker.io/youruser).

Library Usage

import { MongoClient } from "mongodb";
import { Dispatcher, MongoResumeTokenStore } from "tapeworm_dispatcher_mdb_rmq";

const client = new MongoClient("mongodb://localhost:27017");
await client.connect();
const db = client.db("mydb");

const dispatcher = new Dispatcher({
  mongodb: {
    db,
    collection: "tw_master_commits",
  },
  rabbitmq: {
    uri: "amqp://localhost",
    exchange: "tw.commits.exchange",
  },
  resumeTokenStore: new MongoResumeTokenStore(db, "tw_dispatcher_state"),
  watchMode: "oplog", // or "changeStream" (default)
  tenant: "acme", // optional
});

dispatcher.on("started", () => console.log("Watching for commits..."));
dispatcher.on("dispatched", (commit) => console.log(`Published: ${commit.id}`));
dispatcher.on("fallback", () =>
  console.warn("Oplog token expired, replaying from .token cursor"),
);
dispatcher.on("error", (err) => console.error("Error:", err));

// Graceful shutdown
process.on("SIGTERM", async () => {
  await dispatcher.stop();
  await client.close();
});

// Blocks until stop() is called
await dispatcher.start();

Custom resume token store

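import type {
  IResumeTokenStore,
  ResumeState,
} from "tapeworm_dispatcher_mdb_rmq";

// Minimal client shape so the example compiles; adapt to your Redis client.
interface KeyValueClient {
  get(key: string): Promise<string | null>;
  set(key: string, value: string): Promise<unknown>;
}

class RedisResumeTokenStore implements IResumeTokenStore {
  constructor(
    private readonly redis: KeyValueClient,
    private readonly key = "tw:dispatcher:resume", // illustrative key name
  ) {}

  async load(): Promise<ResumeState | null> {
    // Assumes ResumeState survives a JSON round trip; use EJSON if it holds BSON values.
    const raw = await this.redis.get(this.key);
    return raw ? (JSON.parse(raw) as ResumeState) : null;
  }
  async save(state: ResumeState): Promise<void> {
    await this.redis.set(this.key, JSON.stringify(state));
  }
}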

Watch Modes

changeStream (default)

Uses MongoDB Change Streams. The change stream only fires after a write is majority-committed (replicated to a majority of replica set members). This is the safe default — no risk of delivering commits that later get rolled back.

oplog

Tails local.oplog.rs directly with a tailable-await cursor. Sees inserts immediately on the primary, before replication. This gives the lowest possible dispatch latency.

⚠️ Trade-off: If the primary loses an election before the write replicates, the write may be rolled back. The dispatcher would have already published a message for a commit that no longer exists. Consumers must handle this case (e.g. idempotent projections, or accepting rare phantom events).
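
For orientation, the oplog tail described above can be pictured as a query on three oplog fields: the namespace (`ns`), the operation type (`op`, where `"i"` means insert), and the entry timestamp (`ts`). The sketch below only builds the filter and cursor options; `buildOplogFilter` is a hypothetical helper, the generic `TS` stands in for a BSON Timestamp, and nothing here is confirmed to match the package's `OplogWatcher`.

```typescript
// Hypothetical helper: builds a filter for insert entries in one collection.
// TS stands in for the BSON Timestamp type used by real oplog entries.
interface OplogInsertFilter<TS> {
  ns: string;      // "<database>.<collection>"
  op: "i";         // inserts only
  ts: { $gt: TS }; // strictly after the last processed entry
}

function buildOplogFilter<TS>(
  database: string,
  collection: string,
  after: TS,
): OplogInsertFilter<TS> {
  return { ns: `${database}.${collection}`, op: "i", ts: { $gt: after } };
}

// Options for a tailable-await cursor, as named in the MongoDB Node.js driver.
const tailOptions = { tailable: true, awaitData: true } as const;
```

With the MongoDB Node.js driver, the result would be passed along the lines of `client.db("local").collection("oplog.rs").find(filter, tailOptions)`. Inserts made inside multi-document transactions are recorded as `applyOps` entries rather than plain `"i"` entries, so a filter this simple would not see them.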

Resume Strategy

The dispatcher uses a two-level resume strategy to survive restarts:

| Level    | Token                                        | Stored as                                | When used                                    |
| -------- | -------------------------------------------- | ---------------------------------------- | -------------------------------------------- |
| Primary  | Change stream resume token / oplog Timestamp | ResumeState.changeStreamToken            | Normal restart within oplog retention window |
| Fallback | UUID v7 .token field on commits              | ResumeState.lastCommitToken (hex string) | When primary token is expired or invalid     |

Resume flow on startup:

  1. Load ResumeState from the token store
  2. Try resuming from the primary token
  3. If the primary token is expired → emit "fallback", query commits by .token > lastCommitToken, replay missed commits
  4. Switch to live tailing (change stream or oplog)
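
The startup steps above can be sketched as a pure decision function. This is a simplified model, not the package's code: `planResume`, `ResumePlan`, and the `isPrimaryTokenUsable` callback are hypothetical, and an in-memory array stands in for the commits collection. It relies on UUID v7 values being time-ordered, so fixed-length lowercase hex strings compare in creation order.

```typescript
// Hypothetical shapes: only the two ResumeState field names come from the README.
interface ResumeState {
  changeStreamToken?: unknown; // primary: change stream token or oplog Timestamp
  lastCommitToken?: string;    // fallback: UUID v7 .token as a hex string
}

interface Commit {
  id: string;
  token: string; // UUID v7 as fixed-length lowercase hex
}

type ResumePlan =
  | { kind: "fresh" }                       // nothing stored: go straight to live tailing
  | { kind: "primary"; token: unknown }     // resume from the stored primary token
  | { kind: "fallback"; replay: Commit[] }; // replay by .token, then tail live

function planResume(
  state: ResumeState | null,
  isPrimaryTokenUsable: (token: unknown) => boolean, // stands in for "try it and see"
  commits: Commit[],                                 // stands in for the commits collection
): ResumePlan {
  if (state === null) return { kind: "fresh" };

  const primary = state.changeStreamToken;
  if (primary !== undefined && isPrimaryTokenUsable(primary)) {
    return { kind: "primary", token: primary };
  }

  const last = state.lastCommitToken;
  if (last !== undefined) {
    // Equivalent of querying commits where .token > lastCommitToken, oldest first.
    const replay = commits
      .filter((c) => c.token > last)
      .sort((a, b) => (a.token < b.token ? -1 : a.token > b.token ? 1 : 0));
    return { kind: "fallback", replay };
  }
  return { kind: "fresh" };
}
```

In a real watcher the primary token is usually found to be expired only when the server rejects it, so the callback here compresses a try/catch into a boolean.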

Checkpoint flow per commit:

  1. Publish to RabbitMQ → wait for broker ack (publisher confirms)
  2. Only after confirmed delivery: save the resume state

This ensures at-least-once delivery. If the process crashes between publish and checkpoint, the commit will be re-published on restart. Consumers should use messageId (= commit.id) for deduplication.
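
A small simulation may make the ordering concrete: publish first, checkpoint second, and let the consumer drop duplicates by `messageId`. Everything below is a hypothetical stand-in (`FakeBroker`, `MemoryStore`, `dispatch`, `dedupe`); it models the behavior described above rather than reproducing the package's `CommitPublisher`.

```typescript
// Hypothetical stand-ins for the broker and the resume store.
interface Commit {
  id: string;
  token: string;
}

interface Message {
  messageId: string;
  body: string;
}

class FakeBroker {
  readonly delivered: Message[] = [];
  // Resolves once the "broker" has accepted the message (models a publisher confirm).
  async publishConfirmed(msg: Message): Promise<void> {
    this.delivered.push(msg);
  }
}

class MemoryStore {
  lastCommitToken: string | null = null;
  async save(token: string): Promise<void> {
    this.lastCommitToken = token;
  }
}

// Publish first, checkpoint second. The flag simulates the process dying in between.
async function dispatch(
  commit: Commit,
  broker: FakeBroker,
  store: MemoryStore,
  crashBeforeCheckpoint = false,
): Promise<void> {
  await broker.publishConfirmed({ messageId: commit.id, body: JSON.stringify(commit) });
  if (crashBeforeCheckpoint) throw new Error("simulated crash before checkpoint");
  await store.save(commit.token);
}

// Consumer side: keep only the first message seen for each messageId.
function dedupe(messages: Message[]): Message[] {
  const seen = new Set<string>();
  return messages.filter((m) => {
    if (seen.has(m.messageId)) return false;
    seen.add(m.messageId);
    return true;
  });
}
```

If `dispatch` fails after the publish, the store still holds the previous token, so the same commit is published again on the next run and the consumer sees two messages with one `messageId`. With a client such as amqplib, a confirm channel would play the role of `publishConfirmed`; which client the package uses is not stated here.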

AMQP Message Format

Exchange: Fanout (durable), asserted on startup.

Message body (JSON):

{
  "id": "a1b2c3d4-...",
  "partitionId": "master",
  "streamId": "order-123",
  "commitSequence": 5,
  "events": [{ "id": "...", "type": "order.created", "payload": {} }],
  "token": "<BSON UUID v7>",
  "isDispatched": false,
  "createDateTime": "2026-04-16T..."
}

AMQP properties:

| Property     | Value                         |
| ------------ | ----------------------------- |
| contentType  | application/json              |
| deliveryMode | 2 (persistent)                |
| messageId    | commit.id (for deduplication) |
| timestamp    | Unix epoch seconds            |

AMQP headers:

| Header      | Value                                            |
| ----------- | ------------------------------------------------ |
| collection  | Commits collection name (e.g. tw_master_commits) |
| partitionId | Tapeworm partition ID (e.g. master)              |
| streamId    | Aggregate/stream ID                              |
| tenant      | Tenant ID (only if configured)                   |
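
As a rough illustration of how a commit maps onto this format, the sketch below assembles the body, properties, and headers. `buildMessage` and `OutgoingMessage` are hypothetical names, the `Commit` field types are inferred from the sample body, and the source of the `timestamp` value (passed in here as `epochMs`) is an assumption, since the README only specifies its unit.

```typescript
// Field names follow the message body shown above; the types are assumptions.
interface Commit {
  id: string;
  partitionId: string;
  streamId: string;
  commitSequence: number;
  events: unknown[];
}

interface OutgoingMessage {
  body: string;
  properties: {
    contentType: "application/json";
    deliveryMode: 2; // persistent
    messageId: string;
    timestamp: number; // Unix epoch seconds
  };
  headers: Record<string, string>;
}

function buildMessage(
  commit: Commit,
  collection: string,
  epochMs: number, // assumed clock source, in milliseconds
  tenant?: string,
): OutgoingMessage {
  const headers: Record<string, string> = {
    collection,
    partitionId: commit.partitionId,
    streamId: commit.streamId,
  };
  // The tenant header is added only when a tenant is configured.
  if (tenant !== undefined) headers["tenant"] = tenant;

  return {
    body: JSON.stringify(commit),
    properties: {
      contentType: "application/json",
      deliveryMode: 2,
      messageId: commit.id,
      timestamp: Math.floor(epochMs / 1000),
    },
    headers,
  };
}
```

A consumer can then read `properties.messageId` for deduplication and the headers for routing or filtering without parsing the body.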

Architecture

                  ┌──────────────────────────────────┐
                  │         MongoDB Replica Set      │
                  │                                  │
                  │  tw_master_commits   oplog.rs    │
                  │        │                │        │
                  └────────┼────────────────┼────────┘
                           │                │
               changeStream│          oplog │
                    mode   │          mode  │
                           ▼                ▼
                  ┌──────────────────────────────────┐
                  │     tapeworm-dispatcher          │
                  │                                  │
                  │  ChangeStreamWatcher  ──┐        │
                  │           OR            ├─►      │
                  │  OplogWatcher         ──┘        │
                  │         │                        │
                  │         ▼                        │
                  │  CommitPublisher                 │
                  │  (publisher confirms)            │
                  │         │                        │
                  │         ▼                        │
                  │  IResumeTokenStore               │
                  │  (checkpoint after publish)      │
                  └────────┬─────────────────────────┘
                           │
                           ▼
                  ┌─────────────────────┐
                  │  RabbitMQ Fanout    │
                  │  Exchange (durable) │
                  └────────┬────────────┘
                           │
                    ┌──────┼──────┐
                    ▼      ▼      ▼
                  Queue  Queue  Queue
                  (consumer bindings)

Exports

| Export                | Type      | Description                            |
| --------------------- | --------- | -------------------------------------- |
| Dispatcher            | Class     | Main orchestrator                      |
| ChangeStreamWatcher   | Class     | Change stream watcher                  |
| OplogWatcher          | Class     | Direct oplog watcher                   |
| CommitPublisher       | Class     | RabbitMQ headers publisher             |
| MongoResumeTokenStore | Class     | MongoDB resume token store             |
| ICommitWatcher        | Interface | Watcher contract                       |
| IResumeTokenStore     | Interface | Resume store contract                  |
| DispatcherConfig      | Type      | Config for Dispatcher                  |
| DispatcherEvents      | Type      | Event map for typed listeners          |
| MongoConfig           | Type      | MongoDB config                         |
| RabbitConfig          | Type      | RabbitMQ config                        |
| ResumeState           | Type      | Resume token state                     |
| WatchMode             | Type      | "changeStream" \| "oplog"              |
| CommitHandler         | Type      | Callback signature for commit handlers |

Requirements

  • Node.js >= 20
  • MongoDB replica set (required for both change streams and oplog tailing)
  • RabbitMQ broker