
@pandelis/nestjs-redis-streams

v2.1.2


Redis Streams Transport for NestJS. A drop-in replacement for the built-in microservice Redis transport.

Downloads

240

Readme

@pandelis/nestjs-redis-streams

Note about this fork

  • This repository is a maintained fork of https://github.com/tamimaj/nestjs-redis-streams. Full credit to @tamimaj (Tamim Abbas Aljuratli) for the original implementation and design.
  • Goal: keep this package up to date for NestJS 11+ and provide a drop-in replacement for the built-in microservice Redis transport. This fork aims to integrate with Nest’s shutdown lifecycle to support drain/stop-consumption semantics and graceful, high-availability (HA) stream processing: pause consumption, finish in-flight work, ACK safely, and close Redis connections cleanly.

Status

  • Work is ongoing toward lifecycle-driven draining and HA-safe shutdown. The public API and general usage remain compatible with the original package so you can adopt this as an in-place drop-in.


Features

  • Built-in support for TypeScript.

  • Client and server-side transport strategy.

  • Client-side supports sending streams and receiving responses. It can also emit streams as fire-and-forget operations.

  • Server-side can listen on streams, acknowledge received messages, and respond with streams.

  • Simplified stream listening by plugging handlers into controllers.

  • Automatic creation of consumer groups for streams during bootstrap.

  • Convenient methods for responding with streams or multiple streams.

  • Built-in serialization and deserialization support.

  • Customizable serialization and deserialization with plug-able functionality.

  • Graceful shutdown and drain support:

    • Stop consuming on SIGINT/SIGTERM (or custom signals), optionally deregister the consumer (XGROUP DELCONSUMER), wait for in-flight jobs to finish (with optional timeout), then close Redis connections cleanly.

Installation

with npm

npm install --save @pandelis/nestjs-redis-streams

with yarn

yarn add @pandelis/nestjs-redis-streams

How to use?

Server Side (Receiver app)

In your main.ts, initialize the custom strategy like this:

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { RedisStreamStrategy } from '@pandelis/nestjs-redis-streams';

async function bootstrap() {
  const strategy = new RedisStreamStrategy({
    // optional. All ioredis options + url.
    connection: {
      url: '0.0.0.0:6379',
      // host: 'localhost',
      // port: 6379,
      // password: '123456',
      // etc...
    },
    // mandatory.
    streams: {
      block: 5000,
      consumer: 'users-1',
      consumerGroup: 'users',
      deleteMessagesAfterAck: true,  // optional: delete message from stream
    },
    // optional. See our example main.ts file for more details...
    // serialization: {},

    // optional. Graceful shutdown/drain options (see section below)
    // shutdown: { signals: ['SIGINT', 'SIGTERM'], drainTimeoutMs: 30000, deregisterConsumer: true, exitProcess: false },
  });

  const app = await NestFactory.createMicroservice(AppModule, {
    strategy,
  });

  await app.listen();
}
bootstrap();

Graceful shutdown and draining

When a shutdown signal is received (default: SIGINT or SIGTERM), the server will:

  • Stop consuming new messages (halts XREADGROUP loop).
  • Deregister this consumer from each stream’s consumer group using XGROUP DELCONSUMER (configurable).
  • Wait for in-flight jobs to finish using an internal active job counter (configurable timeout).
  • Close Redis connections cleanly and optionally exit the process.

Configuration (all fields optional):

new RedisStreamStrategy({
  connection: { url: 'redis://localhost:6379' },
  streams: {
    block: 5000,
    consumer: 'users-1',
    consumerGroup: 'users',
  },
  shutdown: {
    // Which OS signals trigger drain+shutdown:
    signals: ['SIGINT', 'SIGTERM'], // default

    // How long to wait for active jobs to finish before continuing shutdown:
    // - omit or 0: wait indefinitely until all jobs complete
    // - number (ms): maximum time to wait
    drainTimeoutMs: 30000,

    // Deregister this consumer from the consumer group on each stream (XGROUP DELCONSUMER)
    deregisterConsumer: true, // default

    // Exit the process with code 0 after shutdown completes
    exitProcess: false, // default
  },
});

Notes

  • Draining behavior
    • The reader connection is disconnected to unblock any pending XREADGROUP call so the loop can exit. The client connection remains available to ACK remaining messages and deregister the consumer.
    • In-flight jobs are tracked and the server waits until they complete (or until the optional timeout).
  • NestJS lifecycle integration
    • If you orchestrate microservices from a primary Nest application, you can enable shutdown hooks there so Nest forwards termination signals and calls close() appropriately:
      const app = await NestFactory.create(AppModule);
      app.enableShutdownHooks();
      // connectMicroservice(...) or app.startAllMicroservices() if applicable
      await app.listen(3000);
    • This transport also installs its own signal handlers (SIGINT/SIGTERM by default), so it will gracefully drain even when used via createMicroservice() in isolation.
  • Manual drain
    • If you keep a reference to the strategy instance, you may trigger a programmatic drain:
      await strategy.shutdownGracefully(); // stops consumption, waits for in-flight jobs, closes Redis
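The in-flight job tracking described above can be sketched as a small counter with a drain wait. This is an illustrative model of the documented behavior, not the library's actual internals; the class name and method names here are hypothetical:

```typescript
// Hypothetical sketch: count active jobs and resolve drain() when they finish.
class JobTracker {
  private active = 0;
  private waiters: Array<() => void> = [];

  start(): void {
    this.active += 1; // a handler began processing a message
  }

  finish(): void {
    this.active -= 1; // a handler completed (and its message was ACKed)
    if (this.active === 0) {
      this.waiters.forEach((w) => w());
      this.waiters = [];
    }
  }

  // Resolves when no jobs remain; with timeoutMs > 0, resolves after that
  // many milliseconds at the latest (mirrors drainTimeoutMs semantics).
  drain(timeoutMs = 0): Promise<void> {
    if (this.active === 0) return Promise.resolve();
    return new Promise((resolve) => {
      this.waiters.push(resolve);
      if (timeoutMs > 0) setTimeout(resolve, timeoutMs);
    });
  }
}
```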

Using with Nest graceful shutdown

Who calls process.exit?

  • Under a standard Nest application with shutdown hooks enabled, let Nest manage process lifetime. Keep the transport’s shutdown.exitProcess set to false (default). Node will exit naturally after all handles are closed.
  • Only enable shutdown.exitProcess in the transport when you are running the microservice standalone and you want the transport to terminate the process after a clean drain.

Recommended configurations

A) Nest-managed application (recommended)

  • Let Nest capture SIGINT/SIGTERM and orchestrate shutdown. Disable the transport-level signal handlers and keep exitProcess false.
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { RedisStreamStrategy } from '@tamimaj/nestjs-redis-streams';

async function bootstrap() {
  const strategy = new RedisStreamStrategy({
    connection: { url: 'redis://localhost:6379' },
    streams: { block: 5000, consumer: 'users-1', consumerGroup: 'users' },
    shutdown: {
      signals: [],             // disable transport-level signal handlers; Nest will handle signals
      drainTimeoutMs: 30_000,  // optional
      deregisterConsumer: true, // default
      exitProcess: false,       // default
    },
  });

  // Hybrid app: main HTTP + microservice. Nest handles signals and will call microservice.close()
  const app = await NestFactory.create(AppModule);
  app.enableShutdownHooks();
  app.connectMicroservice({ strategy });
  await app.startAllMicroservices();
  await app.listen(3000);
}

bootstrap();

B) Standalone microservice (transport-managed signals)

  • Use the transport’s own signal handlers and optionally set exitProcess: true if you want it to call process.exit(0) after shutdown.
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { RedisStreamStrategy } from '@tamimaj/nestjs-redis-streams';

async function bootstrap() {
  const strategy = new RedisStreamStrategy({
    connection: { url: 'redis://localhost:6379' },
    streams: { block: 5000, consumer: 'users-1', consumerGroup: 'users' },
    shutdown: {
      signals: ['SIGINT', 'SIGTERM'], // default
      drainTimeoutMs: 30_000,
      deregisterConsumer: true,
      exitProcess: true,              // transport will exit process after clean drain
    },
  });

  const app = await NestFactory.createMicroservice(AppModule, { strategy });
  await app.listen();
}

bootstrap();

Caveats

  • Avoid duplicate signal handlers: If both Nest and the transport listen for signals, shutdown will be invoked twice (shutdown is idempotent, but it’s unnecessary). Prefer one owner:
    • Under Nest: set shutdown.signals: [] on the transport.
    • Without Nest hooks: keep the transport’s signals enabled (default).
  • Kubernetes/containers: Use SIGTERM with a terminationGracePeriodSeconds longer than your drainTimeoutMs to allow a clean drain.
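For example, with drainTimeoutMs: 30000 a Kubernetes Deployment could allow a longer grace period so the drain finishes before SIGKILL (illustrative manifest fragment; container and image names are hypothetical):

```yaml
# Illustrative only: terminationGracePeriodSeconds must exceed drainTimeoutMs.
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 45   # > drainTimeoutMs of 30 s
      containers:
        - name: users-service             # hypothetical name
          image: users-service:latest     # hypothetical image
```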

In one of your controllers, where you want to handle the messages coming from a stream:

This fork uses Nest's standard @EventPattern decorator directly rather than introducing a new one. Decorate a handler with @EventPattern('users:create') to tell the lib to register it and listen on the 'users:create' stream; whenever a message arrives, the handler is called with the payload and a created message context.

import { Ctx, Payload, EventPattern } from '@nestjs/microservices';
import {
  StreamResponse,
  RedisStreamContext,
} from '@pandelis/nestjs-redis-streams';

export class UsersEventHandlers {
  @EventPattern('users:create') // stream name.
  async handleUserCreate(@Payload() data: any, @Ctx() ctx: RedisStreamContext) {
    console.log('Handler users:create called with payload: ', data);
    console.log('Headers: ', ctx.getMessageHeaders());

    return [
      {
        payload: {
          // optional headers to override or add new headers keys.
          // everything except data is considered headers for our serialization.
          correlationId: 'THE BEST CORRELATION ID EVER',
          extraKey: 'Whatever1234',

          // data is the only mandatory key. for our serializer/deserializer.
          data: { name: 'Tamim', lastName: 'Abbas' },
        },

        stream: 'user:created',
      },
    ] as StreamResponse;

    // return [] as StreamResponse;

    // return null;
  }
}

Handling Responses in Your Handler

The behavior of the library depends on what you return from your handler function. The return value instructs the library on what actions to take:

  • If you don't return anything or return null: The library will not publish any streams in response and will not acknowledge the received stream message.

  • If you return an empty array: The library will only acknowledge the received stream message without publishing any streams in response.

  • If you return an array of one or more payloads: The library will publish each payload as a stream and then acknowledge the received stream message.

By controlling the return value of your handler, you can customize the library's behavior and determine whether to publish streams, acknowledge messages, or perform both actions based on your application's needs.
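The three rules above can be modeled as a small decision function. This is an illustrative sketch of the described behavior, not the library's actual code; the `Action` type and function name are hypothetical:

```typescript
// Hypothetical model: map a handler's return value to the library's actions.
type Action = { publish: number; ack: boolean };

function interpretHandlerResult(result: unknown): Action {
  if (result === null || result === undefined) {
    return { publish: 0, ack: false };            // nothing published, no ACK
  }
  if (Array.isArray(result)) {
    // Empty array: ACK only. Non-empty: publish each payload, then ACK.
    return { publish: result.length, ack: true };
  }
  return { publish: 0, ack: false };              // assumed: other values treated as no return
}
```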

Client Side (Requestor app)

First, import the client module into your app module, or any other module where you want to use it. There are two ways to register the client module: sync and async. We will explain both.

Sync (register / forRoot)

When you have your Redis connection config, streams config, etc. beforehand and want to pass them to the client module directly, use the sync way.

In your app.module.ts or any other module you want to use the client to publish streams:

import { Module } from '@nestjs/common';
import { RedisStreamClientModule } from '@pandelis/nestjs-redis-streams';

@Module({
  imports: [
    RedisStreamClientModule.register({
      connection: { url: '0.0.0.0:6379' },
      streams: { consumer: 'api-1', block: 5000, consumerGroup: 'api' },
      responseStreams: ['users:created', 'users:created:copy'],
    }),
  ],
})
export class AppModule {}

Async (registerAsync / forRootAsync)

When you don't have your Redis connection or streams config beforehand, or you want to use the NestJS config module to load them from a .env file, use the async way.

In your app.module.ts or any other module you want to use the client to publish streams:

import { Module } from '@nestjs/common';
import { RedisStreamClientModule } from '@pandelis/nestjs-redis-streams';
import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  // more examples about useClass, useFactory, in the example client app.
  imports: [
    RedisStreamClientModule.registerAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        connection: configService.get('REDIS_CONNECTION'),
        streams: configService.get('REDIS_STREAMS'),
        responseStreams: configService.get('REDIS_RESPONSE_STREAMS'),
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}

NestJS will inject the client into your service or controller.

Check the example app to see how to use the client to publish streams.

Send a message and get a response.

In your service or controller:

import { Controller, Get } from '@nestjs/common';
import { RedisStreamClient } from '@pandelis/nestjs-redis-streams';
import { lastValueFrom } from 'rxjs';

@Controller()
export class AppController {
  constructor(private readonly redisStreamClient: RedisStreamClient) {} // inject the client.

  @Get('/send')
  async sendMessage(): Promise<any> {
    // send a message and get a response.

    const observable = this.redisStreamClient.send('stream:name:here', {
      data: { name: 'tamim' }, // will be JSON.stringify() and stored in the data key.
      anyOtherHeadersKey: 'anyOtherHeadersValue', // header key, will be kept as key/value.
    });

    const response = await lastValueFrom(observable); // get the last value from the observable.

    console.log('response from the stream: ', response);

    return JSON.stringify(response);
  }
}

Emit a message without waiting for a response (fire-and-forget).

In your service or controller:

import { Controller, Get } from '@nestjs/common';
import { RedisStreamClient } from '@pandelis/nestjs-redis-streams';

@Controller()
export class AppController {
  constructor(private readonly redisStreamClient: RedisStreamClient) {} // inject the client.

  @Get('/emit')
  async emitMessage(): Promise<any> {
    // emit a message and don't wait for a response.
    // fire and forget.

    this.redisStreamClient.emit('stream:name:here', {
      data: { name: 'tamim', fireAndForgetEvent: true }, // main key.
      anyOtherHeadersKey: 'anyOtherHeadersValue', // header key, will be kept as key/value.
    });

    return 'Message Sent.';
  }
}

How does the default serialization/deserialization work?

In our library, we provide default serialization and deserialization logic that is tailored for enterprise microservices architecture. Our approach takes into consideration the use of headers and metadata, which can be valuable for various purposes such as authentication tokens or message tracing in logging services like Datadog.

Headers

The headers part of the message comprises key/value pairs that store important information. One crucial header is the correlationId, which serves as a unique identifier for a request. By including the correlationId in the headers section, we ensure that responses carry the same correlationId. This enables us to accurately map responses to their corresponding handlers based on the correlationId stored during the initial request. Having a consistent correlationId throughout the request-response cycle allows for effective tracking and correlation of messages, facilitating seamless communication and response handling.
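Conceptually, correlation-based response routing works like a pending-request map: the client stores a resolver under the outgoing correlationId, and a response carrying the same id resolves it. This is a hypothetical sketch of that idea, not the library's implementation:

```typescript
// Hypothetical sketch: map correlationIds to the resolvers of pending requests.
const pending = new Map<string, (response: unknown) => void>();

function sendRequest(correlationId: string): Promise<unknown> {
  // Register a resolver keyed by correlationId before publishing the request.
  return new Promise((resolve) => pending.set(correlationId, resolve));
}

function onResponse(correlationId: string, response: unknown): boolean {
  const resolve = pending.get(correlationId);
  if (!resolve) return false;        // no pending request matches this id
  pending.delete(correlationId);
  resolve(response);                 // hand the response back to the caller
  return true;
}
```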

Data

The data part of the message is represented by a single key, "data", which contains an object as its value. This structure resembles the body of a POST request, allowing you to include any desired data within it. Before sending the message, the data value is transformed into a JSON string using JSON.stringify() and then stored in a stream message. Upon receiving a message, our deserializer parses the JSON and forwards the data to the designated handler.

This default serialization/deserialization mechanism ensures seamless communication and interoperability within an enterprise microservices architecture while providing flexibility and easy integration with existing systems.
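As an illustration, the wire format described above can be modeled with two small functions. This is an assumption-laden sketch, not the library's code: every field except "data" is treated as a header, and "data" is the JSON-stringified body:

```typescript
// Hypothetical model of the default format: a Redis stream entry is a flat
// field/value list; "data" holds the JSON body, everything else is a header.
function serialize(payload: { data: unknown; [header: string]: unknown }): string[] {
  const fields: string[] = [];
  for (const [key, value] of Object.entries(payload)) {
    fields.push(key, key === 'data' ? JSON.stringify(value) : String(value));
  }
  return fields;
}

function deserialize(fields: string[]): { data: unknown; headers: Record<string, string> } {
  const headers: Record<string, string> = {};
  let data: unknown;
  for (let i = 0; i < fields.length; i += 2) {
    if (fields[i] === 'data') data = JSON.parse(fields[i + 1]);
    else headers[fields[i]] = fields[i + 1];
  }
  return { data, headers };
}
```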

Using a custom serializer/deserializer

We provide hooks in our flow for plugging in your own serializer/deserializer. You can provide them when initializing the strategy (server side) in the main.ts file, and likewise on the client side when initializing the client module. Use the serialization key of the options you pass to the constructor: serialization: { serializer, deserializer }.

  • The deserializer receives two parameters: the raw message as received from Redis, and the inbound context, so you can store your headers there.

  • The serializer receives two parameters: the payload from user-land (your controller, service, etc.), and the inbound context, used to extract your headers and attach them back to the response message before publishing it (the server-side scenario of responding to a received message).

Custom Serialization/Deserialization

In addition to our default serialization/deserialization logic, we provide the flexibility for you to use your own custom serializer and deserializer. This allows you to tailor the serialization and deserialization process to meet your specific requirements.

Server-Side Customization

To use your custom serializer/deserializer on the server side, you can pass them as options when initializing the strategy in the main.ts file. By specifying the serialization key in the options, you can provide your custom serializer and deserializer.

The deserializer function takes two parameters: the raw message as received from Redis, and the inbound context. You can use the inbound context to store any headers or metadata related to the message.

The serializer function also takes two parameters: the payload from the user-land (e.g., your controller or service), and the inbound context. You can extract headers from the inbound context and attach them to the response message before publishing it.

For detailed usage examples and implementation details, you can refer to our example main.ts file. We have included commented boilerplate code that demonstrates how to utilize custom serialization.

Please note that using custom serialization/deserialization gives you full control over the message format and allows for seamless integration with your existing systems and infrastructure.
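A minimal sketch of such a pair of functions, assuming a loosely typed context object that carries headers (the real RedisStreamContext type comes from the library, and the exact raw-message shape may differ):

```typescript
// Hypothetical context shape for illustration only.
interface InboundContext {
  headers: Record<string, string>;
}

// Deserializer: parse the raw field/value list and stash headers on the context.
function customDeserializer(raw: string[], ctx: InboundContext): unknown {
  let data: unknown;
  for (let i = 0; i < raw.length; i += 2) {
    if (raw[i] === 'data') data = JSON.parse(raw[i + 1]);
    else ctx.headers[raw[i]] = raw[i + 1];
  }
  return data; // only the parsed data reaches the handler
}

// Serializer: stringify the payload and re-attach the stored headers to the response.
function customSerializer(payload: { data: unknown }, ctx: InboundContext): string[] {
  const out: string[] = ['data', JSON.stringify(payload.data)];
  for (const [k, v] of Object.entries(ctx.headers)) out.push(k, v);
  return out;
}
```

Under this sketch, you would pass them as serialization: { serializer: customSerializer, deserializer: customDeserializer } when constructing the strategy.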

Client-Side Customization

Along with server-side customization, we also provide the ability to customize serialization and deserialization on the client side. When initializing the client module, you can specify various options, including custom serializer and deserializer functions.

To customize serialization and deserialization on the client side, include the serialization key in the options object when initializing the client module. Within the serialization object, you can provide your own serializer and deserializer functions.

  • The serializer function takes two parameters: the payload from user-land (e.g., your controller or service), and the outbound context (a placeholder, since at that point the message has not yet been published).

  • The deserializer function takes two parameters: the raw message as received from Redis (when a stream response arrives), and the inbound context. You can use the inbound context to store any headers or metadata related to the message, and simply return the parsed message to user-land.

By plugging in your custom serializer and deserializer functions, you can tailor the serialization and deserialization process to meet your specific needs and seamlessly integrate with your existing systems. When utilizing client-side customization, you have full control over how messages are serialized and deserialized, ensuring compatibility and efficient communication with your microservices ecosystem.

License

MIT

Author

Tamim Abbas Aljuratli

Co-authors

Ali Mahdavi
Pandelis Zembashis