@kozen/etl-mk

v1.0.2

Bi-directional MongoDB ↔ Kafka ETL pipeline module for the Kozen framework

🚀 Kozen ETL MongoDB - Kafka: Bidirectional Pipeline Module

Bi-directional MongoDB ↔ Kafka ETL pipeline module for the Kozen framework.

Define one delegate file to run a single pipeline direction, or define both for a full bidirectional pipeline. Each direction runs as an independent pipeline controlled by its own configuration.


🌟 Key Features

  • Bi-directional in one process: MongoDB → Kafka (MK) and Kafka → MongoDB (KM) run concurrently, each controlled by an independent delegate
  • Delegate-driven transforms: your code handles only the business logic; the module manages connections, retries, and delivery guarantees
  • At-least-once delivery: the KM pipeline commits Kafka offsets only after a successful write or dead-letter routing; configurable retry with exponential backoff
  • IoC-native delegate loading: ESM (.mjs) and CJS (.cjs) delegates are resolved through the Kozen container; the same delegate can be reused across modules
  • Built on @kozen/trigger: reuses ChangeStreamService for the MK direction; no change stream reimplementation
  • Zero-boilerplate activation: set a delegate file variable to enable a direction; omit it to disable; no mode flags required
  • Structured logging: all output via Kozen logger:service; zero console.log; PII-safe at INFO level
  • Full TypeScript declarations: IEtlMongoToKafkaTools extends ITriggerTools; all public types exported from the barrel
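
The at-least-once guarantee listed above can be illustrated with a minimal sketch. This is not the module's implementation: `processWithRetry`, `backoffDelay`, and the `write`, `deadLetter`, and `commit` callbacks are hypothetical names, and the default retry count and delays are assumed values. The sketch only shows the ordering the feature describes: the offset is committed after a successful write or after dead-letter routing, never before.

```javascript
// Illustrative sketch only; all names and defaults here are hypothetical.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
function backoffDelay(attempt, baseMs = 100, maxMs = 5000) {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Try to write a message, retrying with exponential backoff.
// The offset is committed only once the message is written or dead-lettered.
async function processWithRetry(message, deps) {
  const { write, deadLetter, commit, maxRetries = 3, baseMs = 100 } = deps;
  let outcome = 'dead-lettered';

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      await write(message);
      outcome = 'written';
      break;
    } catch (err) {
      // Wait before the next attempt; skip the wait after the final failure.
      if (attempt < maxRetries) await sleep(backoffDelay(attempt, baseMs));
    }
  }

  if (outcome === 'dead-lettered') await deadLetter(message);
  await commit(message.offset);
  return outcome;
}
```

If the process crashes between the write and the commit, the message is delivered again after restart, which is why a design like this is at-least-once rather than exactly-once. Destination writes should therefore be idempotent where possible.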

⚡ Why Use This?

Wiring MongoDB change streams to Kafka, or consuming Kafka messages into MongoDB, requires setting up KafkaJS producers and consumers, MongoDB cursors, offset management, retry loops, and dead-letter routing. All of that infrastructure must exist before writing a single line of business logic.

Kozen ETL MongoDB - Kafka handles that infrastructure layer. Provide a delegate file that transforms each event; the module manages connections, retries, and delivery guarantees. Because delegates receive tools.assistant (the Kozen IoC container), they can compose with @kozen/secret, @kozen/iam-rectification, and any other module in the ecosystem.


📦 Installation

npm install @kozen/etl-mk

Requires Node.js 18 or later. kafkajs is bundled as a runtime dependency. A Kafka cluster and a MongoDB replica set or Atlas cluster are required at runtime.

Quick commands:

# Start (reads KOZEN_ETL_* from environment or --envFile)
npx kozen --moduleLoad=@kozen/etl-mk --action=etl:start --envFile=.env

# Validate configuration without connecting
npx kozen --moduleLoad=@kozen/etl-mk --action=etl:validate --envFile=.env

# Print full help
npx kozen --moduleLoad=@kozen/etl-mk --action=etl:help

📚 References

| Page | Description |
|---|---|
| Get Started | Installation and minimal working examples |
| Configuration | Full KOZEN_ETL_* variable reference and .env templates |
| ETL via CLI | CLI actions, flags, and examples |
| Delegate | Writing MK and KM delegate handlers; error handling and DLQ |
| API | Programmatic SDK: types and service classes |
| Kozen Integration | IoC tokens, module composition, delegate loading internals |
| Deployment | Docker Compose stack: Kafka, MongoDB replica set, and ETL service, step-by-step tutorial |
| Contributing Policy | Licence, disclaimer, branch model, code standards |

This project is open source and distributed under the terms described in the Contributing Policy and Usage Disclaimer.


🚀 Quick Start

MongoDB → Kafka

Create a delegate that handles the change events you care about:

// delegates/orders.mjs
export async function insert(change, tools) {
  return {
    id:     change.fullDocument._id.toString(),
    status: change.fullDocument.status
  };
}
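
Because a delegate is a plain async function, it can be exercised locally before any pipeline is started. The sketch below inlines the same transform (without `export`, so it runs as a standalone script) and calls it with a hand-built object. The `fakeChange` object and its `_id` stub are hypothetical stand-ins for a MongoDB insert change event; only the `fullDocument` fields the delegate reads are filled in, and the empty `tools` argument is an assumption that holds only because this delegate does not use it.

```javascript
// Same transform as delegates/orders.mjs, inlined so the snippet runs standalone.
async function insert(change, tools) {
  return {
    id:     change.fullDocument._id.toString(),
    status: change.fullDocument.status
  };
}

// Hypothetical stand-in for an insert change event.
// A real event carries an ObjectId in _id; a stub with toString() is enough here.
const fakeChange = {
  operationType: 'insert',
  fullDocument: {
    _id:    { toString: () => '64b7f0c2a1d3e4f5a6b7c8d9' },
    status: 'pending'
  }
};

// Print the payload the delegate would hand back for this event.
insert(fakeChange, {}).then((payload) => console.log(payload));
```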

Set the required variables and run:

KOZEN_ETL_MK_SOURCE_URI=mongodb+srv://appUser:PASSWORD@CLUSTER_HOST/ \
KOZEN_ETL_MK_SOURCE_DATABASE=mydb \
KOZEN_ETL_MK_SOURCE_COLLECTION=orders \
KOZEN_ETL_MK_DESTINATION_BROKERS=broker1:9092 \
KOZEN_ETL_MK_DESTINATION_TOPIC=orders.events \
KOZEN_ETL_MK_DELEGATE_FILE=/app/delegates/orders.mjs \
npx kozen --moduleLoad=@kozen/etl-mk --action=etl:start

Kafka → MongoDB

Create a delegate that transforms each consumed message:

// delegates/archive.mjs
export async function message(msg, tools) {
  return { ...msg, archivedAt: new Date() };
}

Set the required variables and run:

KOZEN_ETL_KM_SOURCE_BROKERS=broker1:9092 \
KOZEN_ETL_KM_SOURCE_TOPIC=orders.events \
KOZEN_ETL_KM_DESTINATION_URI=mongodb+srv://appUser:PASSWORD@CLUSTER_HOST/ \
KOZEN_ETL_KM_DESTINATION_DATABASE=mydb \
KOZEN_ETL_KM_DESTINATION_COLLECTION=orders_archive \
KOZEN_ETL_KM_DELEGATE_FILE=/app/delegates/archive.mjs \
npx kozen --moduleLoad=@kozen/etl-mk --action=etl:start
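
The archive delegate above can be checked the same way as any plain function. In this sketch the transform is inlined without `export` so it runs as a standalone script, and `incoming` is a hypothetical decoded message; the actual shape of `msg` depends on the module and on what producers put on the topic. It shows that the object spread returns a new document and leaves the input untouched.

```javascript
// Same transform as delegates/archive.mjs, inlined so the snippet runs standalone.
async function message(msg, tools) {
  return { ...msg, archivedAt: new Date() };
}

// Hypothetical decoded message; field names are for illustration only.
const incoming = { id: 'order-1', status: 'shipped' };

message(incoming, {}).then((doc) => {
  console.log('archivedAt' in incoming);       // false: the input object is not mutated
  console.log(doc.archivedAt instanceof Date); // true
});
```

The spread is a shallow copy, so nested objects in `msg` are still shared with the returned document.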

Bidirectional pipeline

Configure both KOZEN_ETL_MK_* and KOZEN_ETL_KM_* in the same .env file:

cp node_modules/@kozen/etl-mk/cfg/env.bidirectional.example .env
# fill in connection strings and delegate paths
npx kozen --moduleLoad=@kozen/etl-mk --action=etl:start --envFile=.env
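
Since each direction is enabled by its delegate file variable, a quick pre-flight check can report which directions a given environment would activate. The helper below is a hypothetical sketch, not part of the module: `enabledDirections` is an invented name, and treating an empty or whitespace-only value as unset is an assumption about the module's behaviour. For an authoritative check, the `etl:validate` action remains the tool to use.

```javascript
// Hypothetical helper: reports which pipeline directions an environment would enable,
// based only on the presence of the two delegate file variables.
function enabledDirections(env) {
  // Assumption: an empty or whitespace-only value counts as "not set".
  const isSet = (name) => typeof env[name] === 'string' && env[name].trim() !== '';
  return {
    mk: isSet('KOZEN_ETL_MK_DELEGATE_FILE'), // MongoDB → Kafka
    km: isSet('KOZEN_ETL_KM_DELEGATE_FILE')  // Kafka → MongoDB
  };
}

// Inspect the current process environment.
console.log(enabledDirections(process.env));
```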

🔧 Deployment

PM2

// ecosystem.config.js
module.exports = {
  apps: [{
    name:   'orders-etl',
    script: 'node_modules/@kozen/engine/dist/bin/kozen.js',
    args:   '--moduleLoad=@kozen/etl-mk --action=etl:start',
    env: {
      KOZEN_ETL_MK_SOURCE_URI:             process.env.MONGO_URI,
      KOZEN_ETL_MK_SOURCE_DATABASE:        'production',
      KOZEN_ETL_MK_SOURCE_COLLECTION:      'orders',
      KOZEN_ETL_MK_DESTINATION_BROKERS:    process.env.KAFKA_BROKERS,
      KOZEN_ETL_MK_DESTINATION_TOPIC:      'orders.events',
      KOZEN_ETL_MK_DELEGATE_FILE:          '/opt/app/delegates/orders.mjs',
      KOZEN_ETL_KM_SOURCE_BROKERS:         process.env.KAFKA_BROKERS,
      KOZEN_ETL_KM_SOURCE_TOPIC:           'orders.events',
      KOZEN_ETL_KM_DESTINATION_URI:        process.env.MONGO_URI,
      KOZEN_ETL_KM_DESTINATION_DATABASE:   'production',
      KOZEN_ETL_KM_DESTINATION_COLLECTION: 'orders_archive',
      KOZEN_ETL_KM_DELEGATE_FILE:          '/opt/app/delegates/archive.mjs',
      KOZEN_LOG_LEVEL:                     'INFO'
    },
    restart_delay: 5000,
    max_restarts:  10
  }]
};

Docker

FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --omit=dev
COPY delegates/ ./delegates/
CMD ["npx", "kozen", "--moduleLoad=@kozen/etl-mk", "--action=etl:start"]

Build the image, then run it with the pipeline variables:

docker run -d \
  -e KOZEN_ETL_MK_SOURCE_URI="mongodb+srv://appUser:PASSWORD@CLUSTER_HOST/" \
  -e KOZEN_ETL_MK_SOURCE_DATABASE=production \
  -e KOZEN_ETL_MK_SOURCE_COLLECTION=orders \
  -e KOZEN_ETL_MK_DESTINATION_BROKERS="broker1:9092" \
  -e KOZEN_ETL_MK_DESTINATION_TOPIC=orders.events \
  -e KOZEN_ETL_MK_DELEGATE_FILE=/app/delegates/orders.mjs \
  -v /host/delegates:/app/delegates \
  my-etl-mk-image

🛠️ Development

npm install
npx tsc --noEmit                              # type-check
npm run build                                 # compile + copy assets to dist/
npm run dev -- --action=etl:help              # run with ts-node
npm run dev -- --action=etl:start --envFile=cfg/env.bidirectional.example
npx kozen --moduleLoad=@kozen/etl-mk --action=etl:validate --envFile=.env

The module entry point is src/index.ts. The compiled output is written to dist/.