@hearit-io/redis-channels

v1.2.1

Redis pre-sharded channels implementation

Downloads

1,368

Readme

@hearit-io/redis-channels

It is a fast, reliable, and scalable message broker implementation based on Redis streams.

Suitable for IoT applications with massive network traffic, pub/sub use cases, or any implementation with multiple producers and consumers.

Allows simple integration with SSE (Server-Sent Events) or WebSocket implementations. See our SSE Fastify based example.

See a live SSE frontend implementation used in our HEARIT.IO project here

Message processing can be scaled across different consumers, so that no single consumer has to process all the messages. A group of consumers, working as a team, can cooperate, with each member consuming a separate portion of the messages from the same channel.

It can be used with a single Redis instance and later updated easily to a cluster configuration without the need for any application change. Under the hood ioredis is used as a client.

It integrates easily with web frameworks. A plug-in, fastify-redis-channels, is already available for our favorite framework, Fastify. See all step-by-step examples there.

The implementation uses native Promises.

Do you want your project to grow? Then start right from the beginning.

Install

$ npm install @hearit-io/redis-channels --save

Usage example

Requires a Redis server running on host localhost and port 6379.

Every consumer processes all messages

'use strict'

const {RedisChannels} = require('@hearit-io/redis-channels')

// A consumer implementation
async function consume(tunnel, channels) {
  for await (const messages of channels.consume(tunnel)) {
    for (const message of messages) {
      // Do something with each message
      console.log(message.data)
    }
  }
}


async function main () {
  try {
  
    const channels = new RedisChannels()
    const tunnelProducer = await channels.use('room')
    
    const tunnelConsumerOne = await channels.use('room')
    const tunnelConsumerTwo = await channels.use('room')
    
    // Subscribe consumer tunnels
    await channels.subscribe(tunnelConsumerOne)
    await channels.subscribe(tunnelConsumerTwo)

    // Start consuming (consume() needs the channels instance as well)
    consume(tunnelConsumerOne, channels).catch((error) => {
      console.error(error)
    })
    consume(tunnelConsumerTwo, channels).catch((error) => {
      console.error(error)
    })

    // Produce messages
    await channels.produce(tunnelProducer, 'Hello')
    await channels.produce(tunnelProducer, 'World')

    // Unsubscribe all consumers
    await channels.unsubscribe(tunnelConsumerOne)
    await channels.unsubscribe(tunnelConsumerTwo)

    // Delete a channel related to the 'room'.
    await channels.delete('room')
  }
  catch (error) {
    console.error(error)
  }
}
main()

The result on the console is:

Hello
World
Hello
World

Consumer teams

To create a group of consumers (a team) whose members cooperate, each consuming a different portion of the messages from the same channel, just add a team parameter to the subscribe method like this:

 
 // Subscribe consumer tunnels
 await channels.subscribe(tunnelConsumerOne, 'team')
 await channels.subscribe(tunnelConsumerTwo, 'team')
 

The result on the console is:

Hello
World
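The difference between the two subscription modes can be illustrated without a Redis server. The following is a toy in-memory model, not the package's implementation. The function names `deliverToAll` and `deliverToTeam` are hypothetical, and the round-robin split is an assumption made only for illustration; in a real team, which member receives a given message depends on which consumer reads first.

```javascript
'use strict'

// Toy model only: without a team, every consumer receives every message.
function deliverToAll (messages, consumers) {
  const received = {}
  for (const consumer of consumers) {
    received[consumer] = [...messages]
  }
  return received
}

// Toy model only: within a team, each message is handed to exactly one
// member. Round-robin assignment is an assumption of this sketch.
function deliverToTeam (messages, consumers) {
  const received = {}
  for (const consumer of consumers) {
    received[consumer] = []
  }
  messages.forEach((message, index) => {
    const consumer = consumers[index % consumers.length]
    received[consumer].push(message)
  })
  return received
}

const messages = ['Hello', 'World']
const consumers = ['one', 'two']

// Both consumers hold both messages: four lines of output in total.
console.log(deliverToAll(messages, consumers))
// Each message is held by exactly one consumer: two lines in total.
console.log(deliverToTeam(messages, consumers))
```

This mirrors the two console results shown above: four lines when every consumer processes all messages, two lines when the consumers form a team.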

API Documentation

new RedisChannels([options])

Creates an instance of the RedisChannels class. It uses ioredis to manage connections to Redis.

Options:

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| channels | Object | | Options related to channels |
| channels.application | string | 'app' | Application name, used as a prefix part in the Redis key |
| channels.version | number | 1 | Version number, used as a prefix part in the Redis key |
| channels.schema | string | 'channel' | Schema name, used as a prefix part in the Redis key |
| channels.overflow | number | 100 | Trims the channel to approximately this size, but never to fewer than the specified number of elements. Choose a value suitable for your use case and available memory |
| channels.slots | number | 32 | The number of pre-sharded slots. Possible values are 32 or 64 |
| channels.sharded | boolean | false | Whether the channels are spread evenly across the shards. This makes sense if your application connects to a Redis cluster |
| channels.log | Object | | A logger instance. Logging is disabled by default; abstract-logging is used for this purpose |
| redis | Object | | Options for the Redis clients |
| redis.nodes | Array | | An array of nodes in the cluster, [{ port: number, host: string }]. If nodes are defined, a connection to a cluster is attempted |
| redis.url | string | | A URL string to a single Redis server, for example 'redis://user:[email protected]:6379/' |
| redis.host | string | 'localhost' | The host name of a single Redis server |
| redis.port | number | 6379 | The port number of a single Redis server |
| redis.... | Rest properties | | Any other options are passed to the Redis or the Cluster client |

Examples

Channels will use a Redis cluster of three nodes, with the offline queue enabled, in sharded mode.

'use strict'

const {RedisChannels} = require('@hearit-io/redis-channels')

const options = {
  channels: {
    sharded: true
  },
  redis: {
    nodes: [
      { host: '127.0.0.1', port: 6380 },
      { host: '127.0.0.1', port: 6381 },
      { host: '127.0.0.1', port: 6382 }
    ],
    options: {
      enableOfflineQueue: true
    }
  }
}
const channels = new RedisChannels(options)

Channels will use a single Redis connection defined with a URL and keep alive set to 10 seconds.

'use strict'

const {RedisChannels} = require('@hearit-io/redis-channels')

const options = {
  redis: {
    url: 'redis://10.101.201.100:3333',
    keepAlive: 10000
  }
}
const channels = new RedisChannels(options)

The same as above but defined with a host and port.

'use strict'

const {RedisChannels} = require('@hearit-io/redis-channels')

const options = {
  redis: {
    host: '10.101.201.100',
    port: 3333,
    keepAlive: 10000
  }
}
const channels = new RedisChannels(options)

RedisChannels methods

channels.use(group)

Creates a tunnel Object required to do any further operation with the channel associated with the group.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| group | string | | The name of a channel group (e.g. a chat room) |

Returns a Promise to a tunnel Object.

channels.produce(tunnel, message[, type = 'all'])

Produces a message to a channel.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| tunnel | Object | | A tunnel required to perform any operation with the channel |
| message | string | | A message to produce |
| type | string | 'all' | Identifies a message source (origination) |

Returns a Promise.

channels.subscribe(tunnel[, team, consumer])

Subscribes a tunnel so that it can be used in a consume operation.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| tunnel | Object | | A tunnel required to perform any operation with the channel |
| team | string | the same as the consumer | The name of a consumer team. If specified, every consumer within the team receives a different part of the messages arriving in the channels |
| consumer | string | a generated v4 uuid | A unique consumer name within a team |

Returns a Promise.

channels.consume(tunnel[, type = 'all', count = 100, timeout = 10000, fromId = time_in_milliseconds-sequence, messageOnTimeOut = false])

It is an asynchronous iterator which returns an array of messages.

Every message is an object of the form { id: <string>, data: <string> }

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| tunnel | Object | | A tunnel required to perform any operation with the channel |
| type | string | 'all' | Identifies a message source (origination) to consume |
| count | number | 100 | The maximum number of messages consumed per iteration |
| timeout | number | 10000 | A blocking timeout in milliseconds |
| fromId | string | '>' or '*' | Starts consuming messages newer than the given id. The default is '>' or '*', depending on whether the tunnel is consumed in a team or not. This means starting from messages that were never delivered to any other consumer. The format is time_in_milliseconds-sequence, or only the milliseconds part of the id |
| messageOnTimeOut | boolean | false | If set, a timeout is indicated by returning the message array [{ id: <string>, data: null }]. If no messages were consumed, the id value is undefined. This feature is useful if you want to build additional functionality into your consumer, for example sending SSE (Server-Sent Events) or WebSocket ping messages on every consumer timeout |

Returns a Promise to an Array of Objects.
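The fromId and messageOnTimeOut parameters can be combined in a consumer loop. The sketch below is one possible way to do it, not the package's own code. The helpers `fromIdSince`, `splitBatch` and `consumeWithPing` are hypothetical names, `send` and `ping` are callbacks supplied by the caller, and the loop assumes a consumer that is not part of a team. It relies only on the behavior documented above: a timeout entry has data set to null, and fromId may consist of just the milliseconds part of an id.

```javascript
'use strict'

// Builds a fromId that starts consuming `ageInMs` milliseconds in the past.
// Only the milliseconds part of the id is used.
function fromIdSince (ageInMs, now = Date.now()) {
  return String(now - ageInMs)
}

// Splits one iteration result into real messages and a timeout flag.
// With messageOnTimeOut enabled, a timeout shows up as an entry whose
// data is null.
function splitBatch (batch) {
  const messages = batch.filter((entry) => entry.data !== null)
  const timedOut = batch.some((entry) => entry.data === null)
  return { messages, timedOut }
}

// Hypothetical consumer loop. `send` forwards a payload to the client and
// `ping` keeps the connection alive, e.g. on an SSE response.
async function consumeWithPing (channels, tunnel, send, ping) {
  const fromId = fromIdSince(5 * 60 * 1000) // start five minutes back
  const batches = channels.consume(tunnel, 'all', 100, 10000, fromId, true)
  for await (const batch of batches) {
    const { messages, timedOut } = splitBatch(batch)
    for (const message of messages) {
      send(message.data)
    }
    if (timedOut) {
      ping()
    }
  }
}
```

Keeping the batch handling in a small pure function makes it possible to test the timeout logic without a running Redis server.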

channels.unsubscribe(tunnel)

Unsubscribes from a tunnel. This causes the consume method to finish iterating and cleans up the associated consumer.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| tunnel | Object | | A tunnel required to perform any operation with the channel. Use only subscribed tunnels; otherwise an exception is thrown |

Returns a Promise.

channels.delete(group)

Deletes all data in the Redis database associated with the group.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| group | string | | The name of a channel group (e.g. a chat room) |

Returns a Promise.

channels.cleanup()

Closes all Redis clients and deletes all consumers and consumer groups. Useful for a graceful application shutdown.

Returns a Promise.
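A graceful shutdown could be wired up roughly as follows. This is a minimal sketch under stated assumptions: `installGracefulShutdown` is a hypothetical helper, not part of the package, and the only package call it uses is the documented `channels.cleanup()`. The `proc` parameter defaults to the global process object and exists only so the handler can be exercised with a stand-in emitter.

```javascript
'use strict'

// Registers signal handlers that call channels.cleanup() exactly once.
// Hypothetical helper; only cleanup() comes from the documented API.
function installGracefulShutdown (channels, proc = process) {
  let closing = false

  async function shutdown () {
    if (closing) return // a second signal must not trigger a second cleanup
    closing = true
    try {
      await channels.cleanup()
      proc.exitCode = 0
    } catch (error) {
      console.error(error)
      proc.exitCode = 1
    }
  }

  proc.once('SIGINT', shutdown)
  proc.once('SIGTERM', shutdown)
  return shutdown
}

// Usage, assuming `channels` is a RedisChannels instance:
// installGracefulShutdown(channels)
```

The sketch sets an exit code instead of calling process.exit(), on the assumption that the process ends by itself once the Redis clients are closed and nothing else keeps the event loop busy.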

Error handling

On error, all methods throw an instance of the RedisChannelsError class.

It has the following properties:

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| message | string | | An error description |
| error | Object | | The error object that caused the exception |

An example of error handling:

'use strict'
const {RedisChannels, RedisChannelsError} = require('@hearit-io/redis-channels')

try {
  // Your code
} catch (exception) {
  if (exception instanceof RedisChannelsError) {
    // Handle the error
    console.error(exception.message)
    console.error(exception.error)
  }
}

Running tests

We want to deliver the highest possible quality to our valuable open source community by implementing 100 % test coverage for all packages.

To run the test cases a running Redis server is required.

In the environment variable REDIS_NODES you can set a list of Redis nodes separated by spaces, for example:

export REDIS_NODES="127.0.0.1:6380 127.0.0.1:6381 127.0.0.1:6382"

If there is more than one entry in the list, a connection to a Redis Cluster is used for the tests. If REDIS_NODES is not set, a connection to 127.0.0.1:6379 is attempted.

Similarly, the variable REDIS_DOWN_NODES is used by the test cases that cover a missing connection to a Redis server. Its default value is '127.0.0.1:3333'.

After each test, the database will be FLUSHED. Please make sure there is no valuable data in the database before running the tests!
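The rule for REDIS_NODES can be sketched as a small parser. This is an illustration only and not the code used by the test suite. The function names `parseRedisNodes` and `redisOptionsFromEnv` are hypothetical, and the returned object is assumed to be usable as the `redis` part of the constructor options described earlier.

```javascript
'use strict'

// Turns a space-separated "host:port" list into node objects.
function parseRedisNodes (value) {
  return (value || '')
    .split(/\s+/)
    .filter(Boolean)
    .map((entry) => {
      const [host, port] = entry.split(':')
      return { host, port: Number(port) }
    })
}

// More than one entry selects a cluster configuration. A single entry, or
// an unset variable (fallback 127.0.0.1:6379), selects a single server.
function redisOptionsFromEnv (env = process.env) {
  const nodes = parseRedisNodes(env.REDIS_NODES)
  if (nodes.length > 1) {
    return { nodes }
  }
  const [single = { host: '127.0.0.1', port: 6379 }] = nodes
  return { host: single.host, port: single.port }
}

// Three entries: a cluster configuration with a `nodes` array.
console.log(redisOptionsFromEnv({
  REDIS_NODES: '127.0.0.1:6380 127.0.0.1:6381 127.0.0.1:6382'
}))
// Variable not set: the single-server fallback.
console.log(redisOptionsFromEnv({}))
```

Under these assumptions, an application could reuse the same convention with `new RedisChannels({ redis: redisOptionsFromEnv() })`.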

Unit tests

Start the unit tests with:

npm test

Heavy load/reach the limit tests

The purpose of these tests is to verify the behavior of your system under massive traffic, pushing it to its limits.

This can be used to estimate the maximum number of consumers and producers, the throughput, and the need for operating system tuning.

You can use the following environment variables to configure your tests:

  • NUMBER_OF_GROUPS - the number of channel groups (e.g. chat rooms). Default value is 10.
  • NUMBER_OF_MESSAGES - the number of messages to produce per group. Default value is 100.
  • NUMBER_OF_CONSUMERS - the number of consumers per group. Default value is 10.
  • SHARDED - whether the channels sharded mode is used. Default value is 'false'; set it to 'true' to enable it.
  • GROUP_PREFIX - the prefix for the group name. Default value is 'GROUP'.

The following commands start four node processes:

cd ./test/heavy-load 
./heavy-run.sh

The test results for each process can be found in the ./test/heavy-load/log directory.

To clean up the database, call:

./heavy-run-cleanup.sh

Make sure there is no valuable data in the database before running the tests!

Tuning Redis and OS

You may need to adapt the maxclients configuration parameter in the Redis server config file (usually /etc/redis.conf). For more details, see the Redis documentation on this topic.

On a Unix-based system, increase the maximum allowed number of temporary (ephemeral) ports.

Check the current system value of the local port range with:

cat /proc/sys/net/ipv4/ip_local_port_range  

Increase the value by defining a bigger range, for example with the command:

echo "1024 65535" > /proc/sys/net/ipv4/ip_local_port_range

To make the change permanent, add the following line to the /etc/sysctl.conf file:

net.ipv4.ip_local_port_range = 1024 65535

Check the number of TCP and UDP connections in use with the command:

netstat -an | grep -e tcp -e udp | wc -l

or

ss -s

Benchmarks

We reached the following values in our benchmark tests on a Linux Debian server with 64 GB of memory and a single i7-7700 CPU at 3.60 GHz:

  • Message size - 4 000 bytes.
  • Number of Redis servers - 1.
  • Number of node processes - 4.
  • Number of channels groups (i.e. chat rooms, ...) - 3 000.
  • Number of consumers - 30 000.
  • Number of produced messages - 300 000.
  • Number of consumed messages - 3 000 000.
  • Average processed messages per second - 24 200.
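The figures in this list are consistent with each other under a few assumptions that the list itself does not state: 100 messages and 10 consumers per group (the defaults of the heavy-load tests), no consumer teams, and a rate that refers to consumed messages. With those assumptions, a short calculation reproduces the totals and gives a rough duration for the run:

```javascript
'use strict'

const groups = 3000
const messagesPerGroup = 100 // assumption: the NUMBER_OF_MESSAGES default
const consumersPerGroup = 10 // assumption: the NUMBER_OF_CONSUMERS default
const messagesPerSecond = 24200

const consumers = groups * consumersPerGroup
const produced = groups * messagesPerGroup
// Without teams, every consumer in a group receives every message.
const consumed = produced * consumersPerGroup
// Assumption: the reported rate counts consumed messages.
const seconds = consumed / messagesPerSecond

console.log({ consumers, produced, consumed })
console.log(`roughly ${Math.round(seconds)} seconds`)
```

The computed totals match the 30 000 consumers, 300 000 produced and 3 000 000 consumed messages listed above; the duration of about two minutes is an estimate that holds only if the rate assumption is correct.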

TODO

The list of already implemented and planned features:

  • [x] Limit the maximum number of channel elements in the produce method (capped streams).
  • [x] Implement a scenario where all consumers are served with the same messages arriving in the channels.
  • [x] Add Benchmarks.
  • [x] Implement a scenario where consumers are served with different parts of the messages arriving in the channels (working as a team).
  • [x] Introduce a parameter in the consume method which allows consuming messages starting from a given point in the past.
  • [ ] Implement consume with acknowledge functionality.
  • [ ] Implement a channel monitoring capability.
  • [ ] TypeScript support.

Project status

hearit.io

Smart home automation designed for visually impaired people.

@hearit-io/redis-channels is used in production in our progressive web app. You can try it with the user [email protected] and the password 11223344

The package will be updated and maintained regularly.

The main goal of hearit.io is to make the world of IoT accessible to everyone. We created a speaking home suitable for all.

We would be grateful if you made other people aware of our project.

To finance our idea, we would be glad to support and work on your projects. Contact Emil [email protected] for details and our availability.

Other open-source packages, part of our project, will be available soon. We use Fastify as an application framework.

Authors and acknowledgment

Emil Usunov

hearit.io

License

MIT