
@streamerson/consumer

Typescript Consumer/Producer for Redis Streams

This package is part of the larger @streamerson monorepo, which contains tools for interacting with Redis streams in event-oriented architectures. This particular member of the monorepo introduces a high-level interface for consuming and producing events/stream messages. The idea is fundamentally to bind handlers to events, where the handler optionally returns a message to another stream. This pattern is deliberately similar to the Request->Handler trope we see in many webservers, capturing some of that familiar functional style while working with streams.

A Foreword on Naming and Purpose

The reasons this package is called @streamerson/consumer rather than @streamerson/consumer-producer are multiple: brevity; clarity (does it produce consumers?); and moreover, in the @streamerson architecture, consuming a stream very often means producing a message on another one. That's not always the case, though; this package supports multiple use-cases:

  • Consume a stream message and do nothing (if there is no handler)
  • Consume a stream message and do something (in a handler)
  • Consume a stream message, do something, and produce a message on another stream

Installation

  • yarn add @streamerson/consumer

Example

The following example will create a consumer reading from a Redis stream called my-stream-topic and listening for messages with the messageType: "hello". The example binds an event handler to "hello", the return value of which will be sent along the bidirectional channel to whoever may be listening:

consumer-with-framework.example.ts

import { Topic } from '@streamerson/core';
import { StreamConsumer } from '@streamerson/consumer';

// Create a consumer bound to a stream topic; bidirectional mode means
// handler return values are produced as messages on the outgoing channel.
const consumer = new StreamConsumer({
    topic: new Topic('my-stream-topic'),
    bidirectional: true
});

// Bind a handler to the 'hello' message type.
consumer.registerStreamEvent<{ name: string }>('hello', async (e) => {
    return {
        howdy: `there, ${e.payload.name}`
    };
});

// Connect to Redis and begin processing incoming stream messages.
await consumer.connectAndListen();
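
For completeness, here is a minimal sketch of the other side of that conversation, written against the low-level @streamerson/core primitives that appear in the "Examples in Depth" section below. The exact key a producer should write to (consumerKey() here) and the shape of the event object are assumptions inferred from that example, not a documented producer recipe:

producer.example.ts (hypothetical)

import { StreamingDataSource, Topic } from '@streamerson/core';

const topic = new Topic('my-stream-topic');

// Connect a low-level channel for writing.
const channel = new StreamingDataSource();
await channel.connect();

// Assumption: producing for the consumer means writing to the key it reads from.
const writableStream = channel.getWriteStream({
    stream: topic.consumerKey()
});

// Write a 'hello' event; the consumer's handler above will see payload.name.
writableStream.write({
    messageType: 'hello',
    payload: { name: 'streamerson' }
});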

Consumer-Producers

The @streamerson/consumer module is intended as an abstraction layer over the low-level components in the @streamerson/core modules. The intention is to provide a feature-rich, usable layer for application-level consumers of events. That feature-richness is the reason this package exists (over just using the core modules directly), so let's take a look at some of those features:

Features

  • Hides the underlying Streams and EventEmitters behind functional interfaces
    • Allows access to these underlying constructs as an escape hatch
  • Creates an "Event Type" -> "Event Handler" contract familiar to developers
  • Supports dynamic switching of these event handlers (see the sketch after this list)
  • Supports configurable bidirectional / unidirectional modes
  • Supports message routing (a->b->c->...->x) modes for stream pipelines
  • Supports consuming from multiple streams at once (fan-in multiple Topics)
  • Remembers its streams so you don't have to reference them in operations
  • Supports configuration for providing your own Redis client, logger, etc.
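
To illustrate the dynamic handler switching mentioned above, here is a minimal sketch, reusing the consumer from the earlier example and the registerStreamEvent/deregisterStreamEvent methods documented in the API section below:

// Swap out the handler bound to the 'hello' message type at runtime.
consumer.deregisterStreamEvent('hello');
consumer.registerStreamEvent<{ name: string }>('hello', async (e) => {
    return {
        howdy: `again, ${e.payload.name}`
    };
});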

To understand the motivation for these features, let's look at a side-by-side comparison: first, a low-level, featureless consumer written out by hand for your benefit (dear Reader), and then the equivalent @streamerson/consumer.

Examples in Depth

consumer-without-framework.example.ts

import { MappedStreamEvent, StreamingDataSource, Topic } from '@streamerson/core';
import { Transform } from 'stream';

const streamTopic = new Topic('my-stream-topic');

// One low-level channel for reading, one for writing.
const channels = {
    read: new StreamingDataSource(),
    write: new StreamingDataSource()
};

await Promise.all([
    channels.read.connect(),
    channels.write.connect()
]);

// Get a readable stream of incoming events and a writable stream for responses.
const [readableStream, writableStream] = [
    channels.read.getReadStream({
        stream: streamTopic.consumerKey()
    }),
    channels.write.getWriteStream({
        stream: streamTopic.producerKey()
    }),
];

// Hand-rolled "routing": a Transform that switches on the event's messageType.
const transform = new Transform({
    objectMode: true,
    transform: function (e: MappedStreamEvent, _, cb) {
        switch (e.messageType as string) {
            case 'hello':
                this.push({
                    ...e,
                    payload: {
                        hello: 'world!  I just saw a message: \r\n\r\n' + JSON.stringify(e.payload, null, 2)
                    }
                } as MappedStreamEvent);
                cb();
                break;
            default:
                // Unrecognized message types get an error payload.
                this.push({
                    ...e,
                    payload: {
                        error: 'Unknown message type',
                        statusCode: 400
                    }
                } as MappedStreamEvent);
                cb();
                break;
        }
    }
});

// Wire it all together: read -> route/transform -> write.
readableStream.pipe(transform).pipe(writableStream);

You'll notice the code above is kind of grossly low-level (it is concerned with streams, Transforms, etc.) and requires assembly. Luckily, the @streamerson/consumer comes with lots of features out of the box, conceals the configuration burden behind reusable interfaces, and allows for a declarative approach to the more imperative components in the monorepo. Let's take another look at the framework example from earlier:
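
consumer-with-framework.example.ts

import { Topic } from '@streamerson/core';
import { StreamConsumer } from '@streamerson/consumer';

const consumer = new StreamConsumer({
    topic: new Topic('my-stream-topic'),
    bidirectional: true
});

consumer.registerStreamEvent<{ name: string }>('hello', async (e) => {
    return {
        howdy: `there, ${e.payload.name}`
    };
});

await consumer.connectAndListen();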

Hopefully this seems cleaner, less concerned with low-level details, and easier to understand from the perspective of someone doing service development. The handler for each event resembles, in principle, the handler for a web request, and is routed by its MessageType in much the same way that a web request is routed by its path. The metadata of the stream message is visible to the handler (much like the Request objects many developers know fondly), as is the payload of that message (again, much like the Body of a Request). This familiarity is intentional, and it is why the @streamerson/consumer is the sort of "blessed path" over utilizing the lower-level modules (as in the "low-level" example code above).
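
To make the web-handler analogy concrete, here is a minimal sketch of a handler that touches both parts of the event: the routing metadata (messageType, as seen in the low-level example above) and the payload. Any other metadata fields on the event are omitted, since their shape isn't documented here:

consumer.registerStreamEvent<{ name: string }>('hello', async (e) => {
    // Routing metadata, much like a Request's path or headers...
    console.log(`handling a '${e.messageType}' message`);
    // ...and the payload, much like a Request's Body.
    return {
        howdy: `there, ${e.payload.name}`
    };
});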

API

_API.md

:factory: StreamConsumer

Methods

:gear: bindStreamEvents

| Method | Type |
| ---------- | ---------- |
| bindStreamEvents | (topic: Topic) => void |

:gear: setOutgoingChannel

| Method | Type |
| ---------- | ---------- |
| setOutgoingChannel | (channel: StreamingDataSource) => void |

:gear: registerStreamEvent

Bind a MessageType to a handler function.

| Method | Type |
| ---------- | ---------- |
| registerStreamEvent | <T extends PayloadVariety = Record<string, NonNullablePrimitive>, R extends void \| PayloadVariety = Record<string, NonNullablePrimitive>>(typeKey: keyof EventMap, handle: HandlerLogicFunction<...>) => void |

Parameters:

  • typeKey: the MessageType to bind
  • handle: the handler function to bind to the MessageType

:gear: deregisterStreamEvent

| Method | Type |
| ---------- | ---------- |
| deregisterStreamEvent | (typeKey: keyof EventMap) => void |

:gear: addStream

| Method | Type |
| ---------- | ---------- |
| addStream | (key: string) => void |

:gear: hasStream

| Method | Type |
| ---------- | ---------- |
| hasStream | (key: string) => boolean |

:gear: removeStream

| Method | Type |
| ---------- | ---------- |
| removeStream | (key: string) => void |

:gear: cacheComposite

| Method | Type |
| ---------- | ---------- |
| cacheComposite | (cacheKey: string) => { key: string; shard: string; } |
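
Taken together, a quick hypothetical session with the stream-management methods from the tables above might look like the following sketch; the stream key string is a made-up placeholder:

// Fan-in: track an additional stream by key, check for it, then drop it.
consumer.addStream('another-stream-key');
if (consumer.hasStream('another-stream-key')) {
    consumer.removeStream('another-stream-key');
}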

Message Acknowledgement / Tracking

The @streamerson/consumer does not support the acknowledgement of messages server-side; i.e., the stream itself in Redis does not know who has read what, or who last read which message, and two consumer modules reading from the same key would get the same messages. (If this is disappointing, please read on for some appointment.) This means that this module is appropriate for fan-out streams, event-broadcasting use-cases, etc.

So if you want a sort of once-only processing architecture (in which one or many readers each operate on different stream messages), this particular package is not it... the @streamerson/consumer-group package :star: is :star: though, so you're in luck!

If you found yourself in this section because you are wondering about acknowledgement, then you might be looking for a consumer group of one member (or more), which "automagically" checks each message back in using the Redis XACK command, meaning that the consumer group is message-processed aware. @streamerson/consumer instances are stream-position aware using a cursor, but they do not acknowledge a message as "complete" in Redis.

(Footnote: it is possible to have multiple consumer groups, each with independent message-processed awareness over the same set of messages, resulting in exactly-twice [or N-times] processing per message. I can think of only a few use-cases for this, but it's a cool idea.)

TLDR:

  • This consumer deliberately does not mark a message "processed" -- after all, there could be other readers in a fanout or broadcast scenario. This also makes the whole process faster.
  • If you want stateless, once-only delivery of messages to a single consumer that marks its messages as processed in Redis when complete, look at the @streamerson/consumer-group package.

Stream Recovery / Cursor Iteration

:warning: Some of the following may not be fully implemented, but will be in a 1.0 version. :warning:

The default behavior of the Consumer is to come alive and listen only for new messages. However, a cursor parameter allows the consumer to begin reading from a historical point in the stream. The cursor is advanced automatically as the client reads across stream entries from Redis, but it can be supplied manually for a number of reasons.

If you want to implement recovery at the process level, such that a reader can die, come back alive, and not miss any messages, the @streamerson/consumer module requires some configuration logic. This is by design: these modules cannot know the identity of a reader on a stream, so storing its last position on the stream (statelessly) requires some bespoke value, which is luckily supported in configuration via the recoveryKey constructor parameter.

This parameter specifies the key in Redis at which to store the iterators for a given client, meaning that in a multi-reader scenario, to have per-reader recovery, you would want to give each of these readers a unique recoveryKey driven by environment or build configuration.
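
For example, a minimal sketch of per-reader recovery might look like the following; the READER_ID environment variable is a made-up placeholder, and the exact recoveryKey option shape is an assumption based on the description above (and may not be fully implemented yet, per the warning at the top of this section):

import { Topic } from '@streamerson/core';
import { StreamConsumer } from '@streamerson/consumer';

// Assumption: each reader process is deployed with a unique READER_ID.
const consumer = new StreamConsumer({
    topic: new Topic('my-stream-topic'),
    // Assumed option per the prose above: where Redis stores this reader's cursor.
    recoveryKey: `my-service:cursors:${process.env.READER_ID}`
});

await consumer.connectAndListen();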

If this seems like a pain, it's potentially because you are crossing over the threshold from a consumer to a consumer group: once you begin caring about tracking the state of individual readers on a given stream, much of that process is handled out-of-the-box by Redis via the consumer group API, which is implemented in the @streamerson/consumer-group module. A richer explanation of this difference can be found above.