redistribute
v0.0.3
Distributed and replayable streams, server and client
Redistribute is a toolkit for building streaming real-time applications using Node.js, Redis Streams, WebSockets and localStorage, which aims to dramatically reduce the amount of code and cash required to write reliable, distributed, real-time applications with no single points of failure.
Redistribute has two components: `redistribute`, the server package (a Node.js library), and `redistribute-client`, a browser- and Node.js-compatible client library.
An example will look quite a bit like an example for a standard WebSocket library, with a few key differences:
- Redistribute Streams are distributed: any other server (and its clients) subscribed to a channel will receive the messages. No need for single points of failure, for vertically scaling a single Node.js instance, or for the `cluster` module and other premature optimizations and complexities.
- Redistribute Streams are replayable: past messages can be re-read from the Redis stream after they were originally sent (up to a configurable limit).
- Redistribute Streams are shardable: a fleet of worker machines can work together to handle messages in a stream via "Consumer Groups".
About
Redistribute is powered by a brand new feature of the Redis database: Streams! You should read the original blog post and the official Redis docs. Redistribute uses XREAD and XADD to read and write to Redis Streams, and exposes an API that allows easy creation of real-time WebSocket services. Note that it requires brand new Redis features, including the recently merged CLIENT UNBLOCK command. For now, you can use the `erulabs/redis-unstable` image for Redis (this should land in stable Redis before too long). Redistribute is meant to be a low-level library, not a full-featured application such as a chat system.
Example
Server
// Connect to Redis 5 instances
const server = require('redistribute')([
  { host: '127.0.0.1', port: 6379 }
])
// Start a Redistribute WebSocket server
server.listen({
  key: '...', cert: '...', port: 8080
})
Client
const client = require('redistribute-client')({
  targets: ['https://redistribute-api.demo:8080']
})
client.subscribe('myTestChannel')
client.on('messages', (channel, messages) => {
  // TODO: Update chat app display with new messages!
})
client.publish('myTestChannel', 'TEST_MESSAGE', { foo: 'bar' })
API Documentation
Messages
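`messages` events provide arrays of "Message" objects, which have the following properties:
// Shape of a Message (pseudocode)
const Message = {
  offset: string,  // Redis stream offset of this entry
  key: value,      // one property per published key-value pair
  key2: value2,
  // ...
}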
For example, consider the following:
// Server
socket.emit('testChannel', 'TEST_MESSAGE', { foo: 'bar' })
// Client
client.on('messages', (channel, messages) => {
  // channel === "testChannel"
  // messages === [
  //   {
  //     offset: "1518951480106-0",
  //     TEST_MESSAGE: { foo: 'bar' }
  //   }
  // ]
})
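The Message shape above can be reproduced from a raw stream entry. The following is a minimal sketch, not Redistribute's actual implementation: `toMessage` is a hypothetical helper, and it assumes an entry arrives as an offset plus a flat `[key, value, key, value, ...]` array of encoded strings, which is how Redis returns stream fields.

```javascript
// Hypothetical helper: build a Message object from one raw stream entry.
// `fields` is a flat [key, value, key, value, ...] array of encoded strings.
function toMessage(offset, fields, decode = JSON.parse) {
  if (fields.length % 2 !== 0) {
    throw new Error('fields must contain key-value pairs')
  }
  const message = { offset }
  for (let i = 0; i < fields.length; i += 2) {
    // Each value is decoded; each key becomes a property of the Message
    message[fields[i]] = decode(fields[i + 1])
  }
  return message
}

const example = toMessage('1518951480106-0', ['TEST_MESSAGE', '{"foo":"bar"}'])
// example.offset === '1518951480106-0'
// example.TEST_MESSAGE.foo === 'bar'
```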
Server
Redistribute(options)
Where "options" is an object with the following properties:
| Option | Required | Default | Description |
| :-------- | :------: | :--------------: | ----------------------------------------- |
| `targets` | yes | - | An array of Redis server URIs |
| `encode` | no | `JSON.stringify` | A function for encoding published objects |
| `decode` | no | `JSON.parse` | A function for decoding published objects |
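As an illustration of the `decode` option, here is a sketch of a decoder that tolerates values that are not valid JSON (for example, plain strings written to the same stream by another producer). The name `tolerantDecode` is hypothetical, and the sketch assumes that `decode` is called with one stored string at a time, as the `JSON.parse` default suggests.

```javascript
// Hypothetical decoder: parse JSON when possible, otherwise return the
// raw string instead of throwing.
function tolerantDecode(text) {
  try {
    return JSON.parse(text)
  } catch (err) {
    return text
  }
}

// It would be passed alongside the other options, e.g.
// { targets: [...], decode: tolerantDecode }
```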
Server Events
- connect - A new socket has connected
- error - An error has been encountered (connection to Redis failed, etc)
Redistribute.listen(options)
Where "options" is an object with the following properties:
| Option | Type | Required | Default | Description |
| :----- | :--------: | :------: | :-----: | -------------------------------------------------- |
| `key` | string | yes | - | Filepath to SSL key |
| `cert` | string | yes | - | Filepath to SSL certificate |
| `port` | number | no | 8080 | Port number to listen on for WebSocket connections |
async Redistribute.publish(channel, ...)
Send data upstream to the Redis service.
await server.publish('myChannel', 'some', 'data')
// returns with locally loaded messages and most recent offset
Note that the arguments after the channel name are key-value pairs, matching the XADD documentation. The exception is when the second argument is a number, in which case it is treated as a MAXLEN (maximum stream length):
// Delete old stream entries after ~ 5000 messages
server.publish('myChannel', 5000, 'foo', 'bar')
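The argument rules above can be sketched as a small function that maps publish-style arguments onto the argument list of a Redis `XADD` command. This is an illustration only: `buildXaddArgs` is a hypothetical name, and the use of approximate trimming (`~`) and the choice to leave string values un-encoded are assumptions, not confirmed behavior of Redistribute.

```javascript
// Hypothetical helper: turn publish-style arguments into XADD arguments.
function buildXaddArgs(channel, ...args) {
  let maxlen = null
  // A leading number is treated as MAXLEN rather than as a key
  if (typeof args[0] === 'number') {
    maxlen = args.shift()
  }
  if (args.length === 0 || args.length % 2 !== 0) {
    throw new Error('Invalid argument count: expected key-value pairs')
  }
  const xadd = [channel]
  // "~" asks Redis for approximate trimming, which is cheaper than exact
  if (maxlen !== null) xadd.push('MAXLEN', '~', String(maxlen))
  // "*" lets Redis generate the entry ID (the offset)
  xadd.push('*')
  for (let i = 0; i < args.length; i += 2) {
    const value = args[i + 1]
    // Assumption: strings pass through, everything else is JSON-encoded
    xadd.push(String(args[i]), typeof value === 'string' ? value : JSON.stringify(value))
  }
  return xadd
}
```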
Redistribute.maxlen(channel, maxlength)
Tells the server to prepend a MAXLEN argument to all XADD commands for this channel (Read more here). Note that this command only affects the current Redistribute instance: it is not propagated to other instances, and it is not persisted anywhere except in memory.
server.maxlen('myChannel', 5000)
For dynamic channels, consider setting the MAXLEN while publishing instead.
Socket Events
- disconnect - A socket has disconnected
- subscribe - A socket has requested a subscription to a channel
- unsubscribe - A socket no longer wants messages from a channel
- publish - A socket has a message to publish for a channel
- messages - Messages have arrived from a subscribed channel
Socket.subscribe(channel, offset)
| Argument | Type | Required | Default | Description |
| :-------- | :--------: | :------: | :-----: | ---------------------- |
| `channel` | string | yes | - | Channel identifier key |
| `offset` | string | no | `$` | Redis stream offset |
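An offset is a Redis stream entry ID of the form `<milliseconds>-<sequence>`. In Redis, the special ID `$` asks only for entries added after the read begins, while an explicit ID asks for entries after that ID. When working with offsets in application code, it can help to compare them. The helpers below are a hypothetical sketch and are not part of Redistribute.

```javascript
// Compare two Redis stream offsets of the form "<milliseconds>-<sequence>".
// Returns -1, 0 or 1, so it can be used with Array.prototype.sort.
function compareOffsets(a, b) {
  const [aMs, aSeq = '0'] = a.split('-')
  const [bMs, bSeq = '0'] = b.split('-')
  // BigInt avoids precision loss on very large ID parts
  const ms = BigInt(aMs) - BigInt(bMs)
  if (ms !== 0n) return ms < 0n ? -1 : 1
  const seq = BigInt(aSeq) - BigInt(bSeq)
  if (seq === 0n) return 0
  return seq < 0n ? -1 : 1
}

// Keep only the messages strictly newer than `offset`
function newerThan(messages, offset) {
  return messages.filter(m => compareOffsets(m.offset, offset) > 0)
}
```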
Socket.unsubscribe(channel)
Unsubscribes a socket from a given channel. This is called automatically when a socket disconnects, if there is no existing `disconnect` handler defined.
| Argument | Type | Required | Default | Description |
| :-------- | :--------: | :------: | :-----: | --------------------------- |
| `channel` | string | yes | - | Channel to unsubscribe from |
Socket.emit(channel, messages)
Sends data to a connected socket.
Client
Client(options)
Where "options" is an object with the following properties:
| Option | Required | Default | Description |
| :------------- | :------: | :--------------: | ----------------------------------------- |
| `targets` | yes | - | An array of Redistribute server URIs |
| `encode` | no | `JSON.stringify` | A function for encoding published objects |
| `decode` | no | `JSON.parse` | A function for decoding published objects |
| `localStorage` | no | true | Automatically store retrieved messages |
Client Events
- connect - Socket is connected to the server
- disconnect - Socket has been disconnected from the server
- messages - Messages have arrived from a subscribed channel
- error - An error has been received from the server
async Client.load(channel)
| Argument | Type | Required | Default | Description |
| :-------- | :--------: | :------: | :-----: | ------------------------------------------- |
| `channel` | string | yes | - | Channel to load locally stored messages for |
const { messages, lastOffset } = await client.load('myChannel')
// returns with locally loaded messages and most recent offset
async Client.subscribe(channel, offset)
| Argument | Type | Required | Default | Description |
| :-------- | :--------: | :------: | :-----: | ----------------------- |
| `channel` | string | yes | - | Channel to subscribe to |
| `offset` | string | no | `$` | Redis stream offset |
await client.subscribe('myChannel')
// returns true if subscribe message sent successfully
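`Client.load` and `Client.subscribe` can be combined to resume a channel after a page reload: restore the locally stored messages, then subscribe from the last stored offset. The helper below is a sketch. `resumeChannel` is a hypothetical name, and what `load` returns for a channel with nothing stored is not documented here, so the fallback to `$` is an assumption.

```javascript
// Hypothetical helper: restore cached messages, then subscribe from where
// the cache ends so that earlier messages are not requested again.
async function resumeChannel(client, channel) {
  const stored = (await client.load(channel)) || {}
  const { messages = [], lastOffset } = stored
  // Assumption: with nothing cached, lastOffset is empty, so fall back
  // to "$" (only new messages)
  await client.subscribe(channel, lastOffset || '$')
  return messages
}
```

Calling `await resumeChannel(client, 'myChannel')` would return the cached messages for immediate display while new ones arrive through the `messages` event.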
async Client.unsubscribe(channel)
| Argument | Type | Required | Default | Description |
| :-------- | :--------: | :------: | :-----: | --------------------------- |
| `channel` | string | yes | - | Channel to unsubscribe from |
await client.unsubscribe('myChannel')
// returns true if unsubscribe message sent successfully
async Client.publish(channel, ...)
| Argument | Type | Required | Default | Description |
| :-------- | :--------: | :------: | :-----: | ------------------------------ |
| `channel` | string | yes | - | Channel to publish messages to |
The rest of the arguments are treated as key-value pairs, to be used during the Redis Stream XADD. This means that the number of arguments after `channel` must be even. For example:
// Examples of Client Publish
await client.publish('test', 'foo', 'bar')
// Success! Returns with new message offset
await client.publish('test', 'foo')
// Error! Invalid argument count!
await client.publish('test', 'foo', { bar: "baz" })
// Note that object arguments are stringified automatically
await client.publish('test', 'foo', 'bar', 'baz', 'bam', 'words', 'things')
// Success! We can post as many key-value pairs as desired
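Because the arguments must come in pairs, it can be convenient to publish from a plain object instead. The wrapper below is a sketch built only on the `client.publish(channel, ...)` signature described above. `publishObject` is a hypothetical name and is not part of the client library.

```javascript
// Hypothetical helper: publish every property of an object as one stream entry.
function publishObject(client, channel, fields) {
  // Object.entries yields [key, value] pairs; flat() turns them into
  // key, value, key, value, ... so the argument count is always even
  const args = Object.entries(fields).flat()
  if (args.length === 0) {
    return Promise.reject(new Error('Nothing to publish'))
  }
  return client.publish(channel, ...args)
}

// Usage sketch:
// await publishObject(client, 'test', { foo: 'bar', baz: { nested: true } })
```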
Advanced Topics
Restricting channels and messages (authentication)
Redistribute has a set of default handlers for the `subscribe` and `messages` events: it simply passes them to `Socket.subscribe` and `Server.publish` directly (effectively meaning anyone can subscribe and publish to any channel). You can (and probably should!) override that behavior to implement permissions and authentication by replacing those event handlers:
server.on('connect', socket => {
  // Handle subscription requests from users
  socket.on('subscribe', (channel, offset) => {
    if (userHasAccessToChannel(socket, channel))
      socket.subscribe(channel, offset)
  })
  // Handle messages from a user
  socket.on('messages', (channel, messages) => {
    if (userHasAccessToChannel(socket, channel))
      server.publish(channel, messages)
  })
})
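The `userHasAccessToChannel` function in the example above is left to the application. One possible shape is sketched below. It assumes your own authentication step has attached a `user` object with a `channels` array to the socket, and both of those names are hypothetical.

```javascript
// Hypothetical access check. Assumes your own authentication step has
// stored something like socket.user = { channels: ['myTestChannel'] }
function userHasAccessToChannel(socket, channel) {
  const user = socket.user
  // Deny by default when the socket has not been authenticated
  if (!user || !Array.isArray(user.channels)) return false
  return user.channels.includes(channel)
}
```

Denying by default keeps unauthenticated sockets from subscribing or publishing anywhere.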
Maximum stream length (XADD MAXLEN)
Redistribute can set a MAXLEN argument when performing XADDs (Read more here):
// Set ahead of time, for specific channels
// Limit the stream to the last 5000 messages
// Note this setting only persists locally within this Node instance
server.maxlen('myTestChannel', 5000)
// When publishing on the server
server.publish('myTestChannel', 5000, 'foo', 'bar')
Filtering messages to end-users
On the server, define a `streamEvent` handler to intercept messages from the Redis Stream that are on their way to end-users:
// This handler is not required (this is the default behavior!)
server.on('streamEvent', (channel, message) => {
  // Ship messages to sockets who are subscribed to those channels
  // This handler is useful in situations where the server ought to take some action,
  // but it's not required to notify end-users of the message
  server.broadcast(channel, message)
})
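To actually filter, the handler can skip the `server.broadcast` call for some messages. The factory below is a sketch that relies only on the `streamEvent` and `server.broadcast` names shown above. `filteredBroadcast` and the `internal` message key are hypothetical.

```javascript
// Hypothetical factory: build a streamEvent handler that only forwards
// messages for which `shouldForward` returns true.
function filteredBroadcast(server, shouldForward) {
  return (channel, message) => {
    if (shouldForward(channel, message)) {
      server.broadcast(channel, message)
    }
    // Messages that fail the check are dropped here; the server could
    // still act on them (logging, metrics) before returning
  }
}

// Usage sketch: hide entries flagged as internal from end-users
// server.on('streamEvent', filteredBroadcast(server, (channel, message) => !message.internal))
```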