kafkajs-buffer

v1.0.2

Plugin for kafkajs to buffer messages and send them in batches, inspired by node-rdkafka

Downloads

1,863

Readme

Overview

kafkajs-buffer adds queue/buffer capabilities to a kafkajs producer so that messages are buffered before sending. It splits the buffer into batches and sends them to Kafka, reducing the number of requests and hiding this complexity from the caller. Delivered messages are reported through a callback function, so you don't need to await each send, which improves streaming times.
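
As a rough illustration of the batching idea described above (not the library's actual implementation), the sketch below splits a queue of buffered messages into consecutive fixed-size batches. The names `splitIntoBatches` and `BufferedMessage` are hypothetical and exist only for this example.

```typescript
// Hypothetical message shape for this sketch; not a kafkajs-buffer type.
type BufferedMessage = { key: string; value: string };

// Split a queue into consecutive batches of at most `batchSize` items.
function splitIntoBatches<T>(queue: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < queue.length; i += batchSize) {
    batches.push(queue.slice(i, i + batchSize));
  }
  return batches;
}

// 2500 buffered messages with a batch size of 1000 give three batches
// (1000, 1000 and 500 messages), so three requests instead of 2500.
const pendingMessages: BufferedMessage[] = Array.from({ length: 2500 }, (_, i) => ({
  key: `m${i}`,
  value: `message ${i}`,
}));
const exampleBatches = splitIntoBatches(pendingMessages, 1000);
console.log(exampleBatches.map((batch) => batch.length));
```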

Usage

You can install the kafkajs-buffer module like any other module:

npm install kafkajs-buffer

To use the module, import it and create an instance.

import { KafkajsBuffer } from "kafkajs-buffer";

// `producer` is a kafkajs producer; `options` is described under Configuration.
const producerBuffer = new KafkajsBuffer(producer, options);

To send messages, push them to the buffer, much as you would send them with kafkajs.

producerBuffer.push({
  topic: "topic-1",
  messages: [
    {
      key: "m1",
      value: "message 1",
    },
    {
      key: "m2",
      value: "message 2",
    },
  ],
});

You can also push messages for different topics.

producerBuffer.push([
  {
    topic: "topic-1",
    messages: [
      {
        key: "m1",
        value: "message 1",
      },
    ],
  },
  {
    topic: "topic-2",
    messages: [
      {
        key: "m2",
        value: "message 2",
      },
      {
        key: "m3",
        value: "message 3",
      },
    ],
  },
]);

You can programmatically request that the buffered messages be sent to Kafka. Doing so regularly helps avoid reaching the maximum buffer size. Depending on the time elapsed since the last send, the messages in the buffer queue are sent immediately or postponed.

producerBuffer.poll();
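
The README does not spell out the exact rule that decides between sending immediately and postponing. The sketch below shows one plausible reading, assuming the decision is governed by the `queueBufferingMaxMs` option; the function `msUntilNextSend` is hypothetical and is not part of kafkajs-buffer.

```typescript
// Hypothetical helper: how long a poll would wait before sending.
// Returns 0 when enough time has passed since the last send ("send now"),
// otherwise the remaining milliseconds ("postpone").
function msUntilNextSend(
  lastSentAt: number,
  now: number,
  queueBufferingMaxMs: number
): number {
  const elapsed = now - lastSentAt;
  return Math.max(0, queueBufferingMaxMs - elapsed);
}

const lastSentAt = 10_000;
console.log(msUntilNextSend(lastSentAt, 10_400, 1000)); // 600: postponed
console.log(msUntilNextSend(lastSentAt, 11_500, 1000)); // 0: sent immediately
```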

In addition, you can make the buffer poll at a fixed interval.

producerBuffer.startAutoPolling(100);

Don't forget to stop auto-polling before your program exits.

producerBuffer.stopAutoPolling();
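
One way to make sure auto-polling is always stopped, even when the work in between throws, is a try/finally wrapper. This is a minimal sketch: `AutoPollable` and `withAutoPolling` are hypothetical names, and the interface assumes only the two methods shown above, with the argument to `startAutoPolling` taken to be an interval in milliseconds.

```typescript
// Hypothetical interface covering only the two methods used in this sketch.
interface AutoPollable {
  startAutoPolling(intervalMs: number): void;
  stopAutoPolling(): void;
}

// Run `work` with auto-polling enabled and stop polling afterwards,
// whether `work` resolves or rejects.
async function withAutoPolling<T>(
  buffer: AutoPollable,
  intervalMs: number,
  work: () => Promise<T>
): Promise<T> {
  buffer.startAutoPolling(intervalMs);
  try {
    return await work();
  } finally {
    buffer.stopAutoPolling();
  }
}
```

A caller would pass the buffer instance and a function that pushes messages, for example `withAutoPolling(producerBuffer, 100, produceAll)`, where `produceAll` is your own async function.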

To receive confirmation when messages are published to Kafka, use the callback functions 'onBatchDelivered' and/or 'onMessageDelivered'.

let messagesDeliveredCount = 0;

// This function is called every time a message is successfully sent to Kafka
const onMessageDelivered = (messageDelivered: IDeliveredMessage) => {
  messagesDeliveredCount += 1;
};
// This function is called every time a batch is successfully sent to Kafka
const onBatchDelivered = (messagesDelivered: IDeliveredMessage[]) => {
  messagesDeliveredCount += messagesDelivered.length;
};

You can also attach extra information to messages. It is not sent to Kafka, but it is passed back to the callback function.

type Info = {
  timestamp: number;
};

const producerBuffer = new KafkajsBuffer<Info>(producer, options);

const messageToSend: IMessageWithInfo<Info> = {
  key: "1",
  value: "message value",
  info: {
    timestamp: Date.now(),
  },
};

const onMessageDelivered = (messageDelivered: IDeliveredMessage<Info>) => {
  console.log(
    `Message created at ${messageDelivered.info?.timestamp} was delivered to kafka`
  );
};
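
Building on the timestamp example above, the attached info can be used to measure how long a message waited before delivery. This is a minimal sketch: `DeliveredLike` is a local stand-in type invented for the example (not the library's `IDeliveredMessage`), and `deliveryLatencyMs` is a hypothetical helper.

```typescript
type Info = {
  timestamp: number;
};

// Local stand-in for a delivered message carrying optional extra info.
type DeliveredLike<T> = {
  key?: string;
  info?: T;
};

// Milliseconds between message creation and delivery,
// or undefined when no info was attached to the message.
function deliveryLatencyMs(
  delivered: DeliveredLike<Info>,
  now: number = Date.now()
): number | undefined {
  const createdAt = delivered.info?.timestamp;
  return createdAt === undefined ? undefined : now - createdAt;
}

console.log(deliveryLatencyMs({ key: "1", info: { timestamp: 1000 } }, 1250)); // 250
```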

To gracefully shut down your process, you must call and await 'flush'. It waits for any in-progress send to finish and sends any messages still pending in the buffer.

await producerBuffer.flush();
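
A shutdown routine typically flushes the buffer first and only then disconnects the underlying producer. The sketch below shows that ordering and guards against being invoked twice (for example, by two signals). `Flushable`, `Disconnectable` and `createShutdown` are hypothetical names; the sketch assumes `flush` returns a promise, as the awaited call above suggests, and that the producer exposes an async `disconnect` method as a kafkajs producer does.

```typescript
// Hypothetical minimal interfaces for this sketch.
interface Flushable {
  flush(): Promise<void>;
}
interface Disconnectable {
  disconnect(): Promise<void>;
}

// Build a shutdown function that flushes, then disconnects, at most once.
// Repeated calls return the same in-flight promise.
function createShutdown(
  buffer: Flushable,
  producer: Disconnectable
): () => Promise<void> {
  let pending: Promise<void> | undefined;
  return () => {
    if (!pending) {
      pending = (async () => {
        await buffer.flush(); // send whatever is still buffered
        await producer.disconnect(); // only then close the connection
      })();
    }
    return pending;
  };
}
```

The returned function can be registered as the handler for your process's termination signals.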

Configuration

import { CompressionTypes } from "kafkajs";

const options = {
  batchNumMessages: 1000, // The buffer is sent to Kafka split into batches of this size.
  queueBufferingMaxMs: 1000, // Time in ms that messages are buffered before sending. Polling triggers a send once this time has passed.
  queueBufferingMaxMessages: 100000, // Max number of messages allowed in the buffer. Pushing more messages throws an error.
  onMessageDelivered: () => {}, // Callback invoked when a message is delivered to Kafka.
  onBatchDelivered: () => {}, // Callback invoked when a batch is delivered to Kafka.
  onSendError: (err) => {}, // Callback invoked with the error when sending messages after a poll fails.
  messageAcks: -1, // Control the number of required acks (https://kafka.js.org/docs/producing)
  responseTimeout: 30000, // The time to await a response in ms (https://kafka.js.org/docs/producing)
  messageCompression: CompressionTypes.None, // Compression codec (https://kafka.js.org/docs/producing)
};
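
To illustrate what the `queueBufferingMaxMessages` limit means for callers, here is a minimal bounded-buffer sketch. It is not the library's implementation: `BoundedBuffer` is a hypothetical class, and rejecting the whole push when it would exceed the limit is an assumption made for the example.

```typescript
// Hypothetical bounded buffer: push throws once the limit would be exceeded.
class BoundedBuffer<T> {
  private readonly items: T[] = [];
  private readonly maxMessages: number;

  constructor(maxMessages: number) {
    this.maxMessages = maxMessages;
  }

  get size(): number {
    return this.items.length;
  }

  // Assumption: a push that does not fit is rejected as a whole.
  push(messages: T[]): void {
    if (this.items.length + messages.length > this.maxMessages) {
      throw new Error(
        `Buffer full: ${this.items.length} queued, limit is ${this.maxMessages}`
      );
    }
    for (const message of messages) {
      this.items.push(message);
    }
  }

  // Remove and return everything currently queued.
  drain(): T[] {
    return this.items.splice(0, this.items.length);
  }
}

const bounded = new BoundedBuffer<string>(3);
bounded.push(["a", "b"]);
try {
  bounded.push(["c", "d"]); // would make 4 messages, above the limit of 3
} catch (err) {
  console.log((err as Error).message);
}
```

In practice this is why polling regularly matters: draining the buffer before it fills keeps `push` from throwing.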