@nestjs-redis/streams-transporter

v1.2.0

Redis Streams-based transporter for NestJS microservices, enabling message passing via Redis Streams

Downloads

411

@nestjs-redis/streams-transporter

Custom NestJS microservices transporter using Redis Streams with event and request/response patterns


Features

  • Redis Streams–based transport: Events and requests stored as stream entries; replies written to per-client reply streams
  • Event and request/response: Fire-and-forget events (dispatchEvent) and request/response via send() with routing callbacks
  • Consumer groups: Server uses XREADGROUP + XACK for at-least-once delivery and horizontal scaling
  • Configurable options: Stream prefix, consumer group/name, block timeout, batch size, max stream length (MAXLEN trim), retry delay
  • NestJS integration: RedisStreamsContext (stream name, message id, consumer group/name) passed to handlers; optional onProcessingStartHook / onProcessingEndHook
  • Type-safe: Event/request/response type guards and resolved options

Installation

npm install @nestjs-redis/streams-transporter redis

Quick Start

Server (microservice)

// main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { RedisStreamServer } from '@nestjs-redis/streams-transporter';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      strategy: new RedisStreamServer({
        url: 'redis://localhost:6379',
        streamPrefix: '_microservices',
        consumerGroup: 'nestjs-streams',
        consumerName: 'my-consumer',
      }),
    },
  );

  await app.listen();
}
bootstrap();

Client (hybrid app or separate service)

// app.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule } from '@nestjs/microservices';
import { RedisStreamClient } from '@nestjs-redis/streams-transporter';
import { AppController } from './app.controller';
import { AppService } from './app.service';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'STREAMS_SERVICE',
        customClass: RedisStreamClient,
        options: {
          url: 'redis://localhost:6379',
          streamPrefix: '_microservices',
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

// app.controller.ts
import { Controller, Get, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';

@Controller()
export class AppController {
  constructor(
    @Inject('STREAMS_SERVICE') private readonly client: ClientProxy,
  ) {}

  @Get('echo')
  async echo() {
    return firstValueFrom(this.client.send('user.echo', { hello: 'world' }));
  }
}

Event handlers (server)

// app.controller.ts
import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, MessagePattern, Payload } from '@nestjs/microservices';
import { RedisStreamsContext } from '@nestjs-redis/streams-transporter';

@Controller()
export class AppController {
  @MessagePattern('user.echo')
  echo(@Payload() data: object, @Ctx() ctx: RedisStreamsContext) {
    return { ok: true, data };
  }

  @EventPattern('user.created')
  onUserCreated(@Payload() data: object, @Ctx() ctx: RedisStreamsContext) {
    // fire-and-forget; no reply
    console.log('User created', data, ctx.getStreamName(), ctx.getMessageId());
  }
}

Options

RedisStreamServer and RedisStreamClient accept RedisStreamsOptions, which extends Redis client options and adds:

| Option | Default | Description |
| --- | --- | --- |
| streamPrefix | '_microservices' | Prefix for request streams (e.g. {prefix}:user.echo) and reply streams. |
| consumerGroup | 'nestjs-streams' | Consumer group name for server XREADGROUP. |
| consumerName | '' (then consumer-${process.pid}) | Consumer name in the group. |
| blockTimeout | 100 | Block timeout (ms) for XREAD / XREADGROUP. |
| batchSize | 50 | Max entries per read (COUNT). |
| maxStreamLength | 10000 | Max length for streams; XADD ... TRIM MAXLEN ~ is used on add. |
| retryDelay | 250 | Delay (ms) before retrying after a read/connection error. |

Use resolveRedisStreamsOptions(options) to get a fully resolved options object (all optional fields filled with defaults).
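As a rough illustration of what filling in defaults could look like, here is a minimal sketch based only on the defaults listed in the table above. It is not the package's implementation: `StreamsOptionsSketch`, `resolveOptionsSketch`, and `requestStreamName` are hypothetical names, the inherited Redis client options are left out, and the process id is passed in explicitly rather than read from `process.pid`.

```typescript
// Hypothetical option shape: only the fields from the options table.
interface StreamsOptionsSketch {
  streamPrefix?: string;
  consumerGroup?: string;
  consumerName?: string;
  blockTimeout?: number;
  batchSize?: number;
  maxStreamLength?: number;
  retryDelay?: number;
}

type ResolvedStreamsOptionsSketch = Required<StreamsOptionsSketch>;

// Assumption: defaults are applied per field; an empty consumerName
// falls back to `consumer-<pid>` as the table describes.
function resolveOptionsSketch(
  options: StreamsOptionsSketch,
  pid: number,
): ResolvedStreamsOptionsSketch {
  return {
    streamPrefix: options.streamPrefix ?? '_microservices',
    consumerGroup: options.consumerGroup ?? 'nestjs-streams',
    consumerName: options.consumerName || `consumer-${pid}`,
    blockTimeout: options.blockTimeout ?? 100,
    batchSize: options.batchSize ?? 50,
    maxStreamLength: options.maxStreamLength ?? 10000,
    retryDelay: options.retryDelay ?? 250,
  };
}

// Hypothetical helper for the `{prefix}:pattern` naming shown in the table.
function requestStreamName(prefix: string, pattern: string): string {
  return `${prefix}:${pattern}`;
}

const resolved = resolveOptionsSketch({ batchSize: 10 }, 1234);
console.log(resolved.consumerName);
console.log(requestStreamName(resolved.streamPrefix, 'user.echo'));
```

Only `batchSize` is overridden here; every other field takes its documented default.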

API

  • RedisStreamClient – NestJS ClientProxy implementation. Connects to Redis, publishes events/requests to streams, listens for replies on a dedicated reply stream and dispatches to routingMap callbacks. close() flushes callbacks with an error, deletes the reply stream, and quits the client.
  • RedisStreamServer – NestJS CustomTransportStrategy. Creates consumer groups, consumes via XREADGROUP, ACKs with XACK, invokes message/event handlers with RedisStreamsContext, and writes replies to the client’s reply stream.
  • RedisStreamsContext – Context passed to handlers (like Nest’s RPC context). Methods: getStreamName(), getMessageId(), getConsumerGroup(), getConsumerName().
  • RedisStreamsOptions / RedisStreamsResolvedOptions – The input options type and its fully resolved counterpart. resolveRedisStreamsOptions(options) returns the resolved options.
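The reply handling described for RedisStreamClient can be pictured as a map from request id to callback. The sketch below is an in-memory illustration only, with no Redis involved. `RoutingMapSketch` and its method names are hypothetical, and the package's actual routingMap may differ, for instance in when callbacks are removed.

```typescript
// Hypothetical reply shape, loosely following the response entry fields.
interface ReplySketch {
  id: string;
  data?: unknown;
  err?: unknown;
  isDisposed?: boolean;
}

interface PacketSketch {
  response: unknown;
  err: unknown;
  isDisposed: boolean;
}

type CallbackSketch = (packet: PacketSketch) => void;

class RoutingMapSketch {
  private readonly callbacks = new Map<string, CallbackSketch>();

  get size(): number {
    return this.callbacks.size;
  }

  // Remember which callback is waiting for which request id.
  register(id: string, callback: CallbackSketch): void {
    this.callbacks.set(id, callback);
  }

  // Route a reply to its waiting callback; returns false for unknown ids.
  dispatch(reply: ReplySketch): boolean {
    const callback = this.callbacks.get(reply.id);
    if (!callback) return false;
    const isDisposed = reply.isDisposed === true;
    callback({ response: reply.data, err: reply.err, isDisposed });
    // Assumption: a disposed or failed reply ends the exchange.
    if (isDisposed || reply.err !== undefined) this.callbacks.delete(reply.id);
    return true;
  }

  // Mirrors the described close() behaviour: fail every pending request.
  flush(reason: string): void {
    this.callbacks.forEach((callback) =>
      callback({ response: undefined, err: new Error(reason), isDisposed: true }),
    );
    this.callbacks.clear();
  }
}

const routing = new RoutingMapSketch();
routing.register('req-1', (packet) => console.log('reply', packet.response));
routing.dispatch({ id: 'req-1', data: { ok: true }, isDisposed: true });
```

Keying callbacks by request id is what lets many in-flight requests share one reply stream.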

Stream entry shape:

  • Events: e: '1', data (JSON).
  • Requests: e: '0', id, replyTo, data (JSON).
  • Responses: id, data or err (JSON), isDisposed: '1'.
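To make the three shapes concrete, the following sketch classifies a flat field map (Redis stream entries are string-to-string pairs) using the fields listed above. The guard names, `classifyEntry`, and the rule that a response carries no `e` field are assumptions for illustration. The package's own type guards may be named and implemented differently.

```typescript
// A stream entry as a flat string-to-string field map.
type StreamFields = Record<string, string>;

type EventFields = StreamFields & { e: '1'; data: string };
type RequestFields = StreamFields & { e: '0'; id: string; replyTo: string; data: string };
type ResponseFields = StreamFields & { id: string };

function isEventEntry(f: StreamFields): f is EventFields {
  return f.e === '1' && typeof f.data === 'string';
}

function isRequestEntry(f: StreamFields): f is RequestFields {
  return (
    f.e === '0' &&
    typeof f.id === 'string' &&
    typeof f.replyTo === 'string' &&
    typeof f.data === 'string'
  );
}

// Assumption: responses have no `e` field and carry either data or err.
function isResponseEntry(f: StreamFields): f is ResponseFields {
  return (
    !('e' in f) &&
    typeof f.id === 'string' &&
    (typeof f.data === 'string' || typeof f.err === 'string')
  );
}

type EntryKind = 'event' | 'request' | 'response' | 'unknown';

function classifyEntry(f: StreamFields): EntryKind {
  if (isEventEntry(f)) return 'event';
  if (isRequestEntry(f)) return 'request';
  if (isResponseEntry(f)) return 'response';
  return 'unknown';
}

// The data field holds JSON, so the payload is decoded after classification.
const entry: StreamFields = { e: '0', id: '42', replyTo: 'reply-stream', data: '{"hello":"world"}' };
if (isRequestEntry(entry)) {
  console.log(classifyEntry(entry), JSON.parse(entry.data));
}
```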

Contributing

Please see the root contributing guidelines.

License

MIT © CSenshi