npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

kafka-express

v2.0.3

Published

simple kafka listener in the style of an express app

Readme

kafka-express

A kafka consuming server in the style of Express.js

"kafka consuming server"

Start a server that consumes messages from a kafka server, listening to one or more topics, powered by KafkaJS.

"in the style of Express.js"

Defines the topics you want to listen to as express routes and add middlewares to them, just like you would to an express server.
It uses middlewary

Usage

const kafkaExpress = require('kafka-express');

const server = kafkaExpress();

server.use((req, res, next) => {
  console.log('Shared middleware');
  next();
});

server.use('my-test-topic', (req, res, next) => {
  console.log('my-test-topic middleware');
  res.end();
});

server.use('another-test-topic', (req, res, next) => {
  console.log('another-test-topic first middleware');
  next();
}, (req, res, next) => {
  console.log('another-test-topic second middleware');
  res.end();
});

server.use('test.topic.*', (req, res, next) => {
  console.log('all topics that start by test.topic middleware');
  res.status(201).end();
});

server.use((err, req, res, next) => {
  console.log('Global error middleware');
  res.status(500).end(err);
});

const clientConf = {
  clientId: 'kafka-express-test',
  brokers: ['localhost:9092'],
};

const consumerConf = {
  groupId: 'kafka-express-test-consumer',
};

const fromBeginning = true;

server.listen(clConf, csConf, fromBeginning);

If you are unfamiliar with how express middlewares work, I would suggest you read the Express doc, here is just a reminder that you need to call either "next()" or "res.end()" in each of your middleware.

Differences

For obvious reasons the request and reponse objects you receive in your middlewares are different than for a HTTP request:

const {
  app, // The kafka-express running application
  raw, // raw kafka message (KafkaMessage)
  topic, // name of the topic sending the message (string)
  path, // name of the topic sending the message (string)
  kafkaConsumer, // The KafkaJS consumer object in case you to need to interact with it in your handlers (consumer object)
  partition, // partition id (string)
  key, // message key (string)
  value, // message value (string)
  body, // message value converted to JSON if possible (object)
  headers, // message headers if present (object)
  params, // converted params from the topic name if present (object)
  res, // the response object
  next, // the next callback
} = req;

const {
  req, // The request object
  app, // The kafka-express running application
  statusCode, // Status code for the response (default to 200)

  end(), // function to call to end the request-response cycle
  status(), // Sets the status for the response. It is chainable.
} = res;

The response object is an event dispatcher and emits the 'finish' event when the request cycle ends.

The "Topic" object replaces the Router object. Note that you can mount a topic to another topic to create chained topics but only those that have a mounted middleware will be subscribed to the kafka server.

const kafkaExpress = require('kafka-express');

const { Topic } = kafkaExpress;

const server = kafkaExpress();

const testTopic = new Topic('test');
const outTopic = new Topic('out');
const noMiddlewareTopic = new Topic('no');

testTopic.use((req, res) => {
  console.log('hello test');
  next();
});
outTopic.use((req, res) => {
  console.log('hello out');
  res.end();
});

outTopic.use(noMiddlewareTopic);
testTopic.use(outTopic);

server.use(testTopic);

// defined paths here are ['test', 'test.out']
console.log(server.paths);
// defined topics here are [ /^test\/?$/i, /^test\.out\/?$/i ]
console.log(server.topics);

You can subscribe to topics using a Regexp and including parameters (i.e 'topic.name.:param'), these parameters will be added to the request object.
You need to be aware that the KafkaJS client will only subscribe to the topics that match your regex AND already exists on the broker. It will not add topics that are created later.
The default separator is "." instead of "/".

KafkaJS

The "Application.listen" consumes messages through the "eachMessage" handler and accepts a client configuration for the KafkaJS client and a consumer configuration for the KafkaJS consumer client. You can specify the "fromBeginning" property as a third argument, it defaults to false if not present.
To use the "eachBatch" handler you need to manage the KafkaJS client yourself and use the onMessage function of the application:

const kafkaExpress = require('../src/kafka-express');
const { Kafka } = require('kafkajs');

const server = kafkaExpress();

... add some middlewares here

const kafka = new Kafka(clientConfig);

const consumer = kafka.consumer(consumerConfig);

await consumer.connect();

// server.topics get you all topics that have a middleware, in regex format
await consumer.subscribe({ topics: server.topics, fromBeginning: true });

await consumer.run({
  eachBatch: async (batch) => {
    for (let message of batch.messages) {
      await server.onMessage(consumer, message);
    }
  },
});