
kafka-queue

v1.0.4

Kafka Queue

Wrapper around Kafka with built-in assumptions to make a keyed-message queuing system a little easier to build. This wrapper was built to support an IoT-like system, where devices in the outside world communicate with a cloud service. Each device has a unique deviceId. Messages for a given deviceId should always be consumed by the same process, and they should arrive in order.
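Kafka delivers this guarantee through keyed partitioning: messages with the same key are written to the same partition, and each partition is consumed by at most one member of a consumer group at a time, which preserves per-device ordering. Conceptually, the key-to-partition mapping works something like this (a sketch for intuition, not the library's actual code):

const crypto = require( 'crypto' );

// Conceptual sketch only. Because the mapping is deterministic, every
// message keyed with the same deviceId lands on the same partition, and
// therefore reaches the same consumer, in order.
function partitionFor( deviceId, numPartitions ) {
  let hash = crypto.createHash( 'md5' ).update( deviceId ).digest();
  return hash.readUInt32BE( 0 ) % numPartitions;
}

console.log( partitionFor( '1001', 2 ) ); // '1001' always maps to the same partition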

This wrapper uses the no-kafka library, which automatically retries on connection failures. It was developed and tested using this docker container.

Usage

Producer (enqueue)

let config = require( './config.json' );   // see the "Config" section below
let queueName = 'ingest';
let Q = require( 'kafka-queue' )( config );

// connect the producer, then send a single keyed message
Q.producer.connect( function( err ) {
  if ( err ) { console.error( err ); process.exit( 1 ); }
  let message = { deviceId: '1001', p1: 'p1', p2: 'p2' };
  Q.producer.send( queueName, message, function( err ) {
    if ( err ) console.error( err );
    process.exit( err ? 1 : 0 );
  });
});

Consumer (dequeue)

let config = require( './config.json' );
let queueName = 'ingest';
let groupId = 'workers';
let Q = require( 'kafka-queue' )( config );

// the handler is invoked once per message, in order per deviceId
Q.consumer.connect( queueName, groupId, function( message, cb ) {
  let handle = message.handle;   // opaque commit handle (see below)
  let msg = message.msg;         // the original message object

  console.log( JSON.stringify( msg ) );
  // when you know processing is good/done, advance the consumer's position
  // in the queue to the next message.
  cb();
});

Calling the cb() passed to your message handler controls the message commit. Calling it with an err as the first argument cancels the commit.
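For example, a handler that cancels the commit when processing fails might look like this (a sketch; processMessage is a hypothetical application function):

Q.consumer.connect( queueName, groupId, function( message, cb ) {
  processMessage( message.msg, function( err ) {
    if ( err ) return cb( err ); // cancels the commit; the consumer does not advance
    cb();                        // commits; the consumer moves to the next message
  });
});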

You can explicitly handle commits, although you shouldn't normally have to:

// handle comes from message.handle in your consumer callback
Q.consumer.commit( handle, function( err ) {
  if ( err ) console.error( 'commit error:', err );
});

The groupId argument to connect() is optional. If it is not specified, your config should contain a groupId. If groupId is explicitly passed, it overrides any value specified in the config.
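Assuming the handler simply takes the groupId's place when it is omitted, the two forms would look like:

// groupId comes from config.groupId:
Q.consumer.connect( queueName, function( message, cb ) { cb(); } );

// an explicit groupId overrides config.groupId:
Q.consumer.connect( queueName, 'pipeline-workers', function( message, cb ) { cb(); } );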

Config

This wrapper expects config to look like:

{
  "keyField": "deviceId",
  "connectionString": "192.168.99.103:9092",
  "logger": {
    "logLevel": 1
  }
}

The config is generally the same as documented here. The keyField is required; it names the field in the messages passed to producer.send() that contains the device id you want to use as the key.
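To illustrate, with "keyField": "deviceId" as above, sending a message should key it on its deviceId value (a sketch of the inferred behavior, not the library's code):

// assuming a connected producer and { "keyField": "deviceId" } in the config:
let message = { deviceId: '1001', temperature: 72 };
Q.producer.send( 'ingest', message, function( err ) {
  if ( err ) console.error( err );
  // the wrapper reads message[ config.keyField ] ('1001' here) and uses it as
  // the Kafka message key, so all of device 1001's messages share a partition
});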

Example Application

You can run a simple test in this directory. The test environment consists of an "ingest.js" script that emulates three devices sending messages into the system. These messages get sent to the "ingest" queue. A "relayer.js" script reads from the ingest queue and duplicates the incoming messages to a "staging" queue and a "prod" queue. A "pipeline.js" script reads from the staging or prod queue (specify which on the command line) and prints the messages to stdout.
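A minimal sketch of what relayer.js might do, based on the description above (queue and group names are assumptions; the actual script in the repo may differ):

let config = require( './config.json' );
let Q = require( 'kafka-queue' )( config );

Q.producer.connect( function( err ) {
  if ( err ) { console.error( err ); process.exit( 1 ); }
  Q.consumer.connect( 'ingest', 'relayers', function( message, cb ) {
    // duplicate each incoming message to both downstream queues
    Q.producer.send( 'staging', message.msg, function( err ) {
      if ( err ) return cb( err );
      Q.producer.send( 'prod', message.msg, function( err ) {
        if ( err ) return cb( err );
        cb(); // commit on ingest only after both sends succeed
      });
    });
  });
});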

If you create the "ingest" queue with one partition, you can run one instance of relayer.js. If you create the queue with two partitions, you can run two instances of the relayer.js script, and so forth. The same goes for the other queues. Say you create all three queues (ingest, staging, prod) with 2 partitions each. Then you can run 2 instances of the relayer.js script and four instances of the pipeline.js script: two with "staging" as an argument and two with "prod" as an argument. Then run ingest.js and you'll see messages flow through the system, being duplicated into the two stacks and being "worked on" in the pipeline scripts.

If you kill one of the instances in a pair, you'll see the other instance begin to take over the processing of the killed instance. If you restart the killed instance, it'll begin to process its own messages again.

You should see that messages with id 'X' will always get sent to a consistent instance of the pipeline.js script, except when that instance dies, in which case 'X' will start getting processed by a remaining instance.

Setting up Kafka for this example

Create a docker machine to host zookeeper and kafka. Execute eval $(docker-machine env MACHINE), then execute "sh RUN.sh".
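With a stock Kafka distribution, creating the three example topics with 2 partitions each might look like the following (the ZooKeeper address is an assumption based on the connectionString above; adjust for your docker machine's IP):

# assumed host and port; check `docker-machine ip MACHINE` for your setup
for topic in ingest staging prod; do
  kafka-topics.sh --create --zookeeper 192.168.99.103:2181 \
    --replication-factor 1 --partitions 2 --topic $topic
done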

Reference Documentation