npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

nest-nsq-transport

v0.3.0

Published

NestJS microservice transport for NSQ

Readme

nest-nsq-transport

The most basic and unopinionated implementation of NSQ transport for NestJS microservices.

finish() is called automatically when no errors are thrown while handling, otherwise requeue() is called.



install

npm i nest-nsq-transport nsqjs
npm i -D @types/nsqjs

configure

setup server:

import { Message } from 'nsqjs';
import { NSQStrategy, NSQStrategyOptions, NSQPattern } from 'nest-nsq-transport';

NestFactory.createMicroservice(
  AppModule,
  {
    strategy: new NSQStrategy(<NSQStrategyOptions>{
      // Optional deserializer, please see implementation in the sources.
      //
      // Default deserializer converts message data from Buffer to utf8 string.
      // Could be replaced with the built-in JSONDeserializer or a custom one.
      deserializer: new MyDeserializer();

      // optional default options that will be passed to all nsqjs.Reader's of
      // each channel. It's possible to skip this field and describe options
      // for each channel separately with `channels` field
      defaultChannelOptions: {
        nsqdTCPAddresses: [process.env.NSQD_TCP_ADDR],
        // or
        lookupdHTTPAddresses: [process.env.LOOKUPD_HTTP_ADDR],

        // and all the other options, see here:
        // https://github.com/dudleycarr/nsqjs#new-readertopic-channel-options

        // with one extra option:
        // as `requeue` is called automatically on error, there is no way to
        // pass parameters, but it's possible to provide either default args:
        requeueParams: [1, true],
        // or a callback
        requeueParams: (message: Message, pattern: NSQPattern) => [1, true],
      },

      // Optional map of topic -> channel -> options
      channels: {
        'my-topic': {
          'my-channel': {
            // this will be merged with the defaultChannelOptions
          },
        },
      },
    });
  },
)

setup client:

import { ClientsModule } from '@nestjs/microservices';
import { NSQClient, NSQClientOpitons } from 'nest-nsq-transport';

export const clientToken = Symbol();

@Module({
  imports: [
    ClientsModule.register([
      {
        name: clientToken,
        customClass: NSQClient,
        options: <NSQClientOpitons>{
          // Optional serializer, please see implementation in the sources.
          // 
          // Default serializer converts emitted data to a string and pass it as
          // a Buffer to nsqjs.Writer.publish method. Could be replaced with the
          // built-in JSONSerializer or a custom one.
          serializer: new MySerializer(),

          // Connections options:
          // ether full nsqd tcp address
          nsqdURL: process.env.NSQD_TCP_ADDR, // in `host:port` pattern
          // or host and port separately
          nsqdHost: process.env.NSQD_HOST,
          nsqdPort: Number(process.env.NSQD_PORT),

          // Other options from nsqjs.ConnectionConfigOptions, see here:
          // https://github.com/dudleycarr/nsqjs#new-writernsqdhost-nsqdport-options
          // ...
        },
      },
    ]),
  ],
})
class AppModule {}

usage

import { ClientProxy, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { NSQContext } from 'nest-nsq-transport';

@Controller()
class TestController {
  constructor(
    // Token that passed to ClientsModule.register
    @Inject(clientToken) private readonly client: ClientProxy
  ) {}

  // To get only payload no decorators needed
  @EventPattern('my-topic/my-channel')
  async handle(payload: MyType) {
    //
  }

  // Decorators are required to get context
  @EventPattern('my-topic/my-channel-two')
  async handle(
    @Payload() payload: MyType,
    @Ctx() ctx: NSQContext
  ) {
    // 
  }

  async emit() {
    // This is an example how to emit data. Serializer which converts data to
    // Buffer have to be provided when client is registered. There are two
    // built-in serializers: JSONSerializer and StringSerializer
    await this.client.emit('my-topic', data as MyType).toPromise();
  }

  // Also if `maxAttempts` is set in NSQStrategyOptions message could be
  // discarded. It's possible to subscribe on discarded messages in a separate
  // handler adding `/discard` suffix to pattern. If `maxAttempts` is set and
  // such separate discard-handler is not provided then discarded message will
  // be sent to the original handler again. But it's always possible to check
  // current attempt with `ctx.message.attempts`, which starts from 1.
  @EventPattern('my-topic/my-channel/discard')
  async handle(
    @Payload() payload: MyType,
    @Ctx() ctx: NSQContext
  ) {
    await this.client.emit(
      `${ctx.pattern.topic}-${ctx.pattern.channel}-dead-letter`,
      payload,
    ).toPromise()
  }
}