loopback4-message-bus-connector

v2.0.4

The loopback4-message-bus-connector package provides connectors for message bus services such as AWS SQS, BullMQ, and AWS EventBridge.

Overview

This is a LoopBack 4 extension that adds message-queue and event-based communication to your LoopBack applications. It provides a unified, extensible interface for working with different queuing systems.

Supported Connectors

  • SQSConnector – Integrates with AWS SQS using @aws-sdk/client-sqs. Supports both message sending and consumption with polling, visibility timeout, etc.

  • BullMQConnector – Integrates with BullMQ (Redis-based queue). Supports advanced job options like retries, backoff, consumer concurrency, and job polling.

  • EventBridge – Allows sending events to AWS EventBridge with support for event buses and schemas. Provides the HTTPS endpoint for receiving events.

🧩 Core Features

  • Component-Based Approach: Central registry for components, enabling multi-bus usage in a single application.

  • @producer() Decorator: Injects a producer for sending single or multiple typed events to any configured bus.

  • @consumer Decorator: Registers a service class as a consumer for a specific event and queue, handling messages automatically.

  • IProducer Interface: Exposes send() and sendMultiple() methods to send messages to buses.

  • IConsumer Interface: Allows you to implement a handler for a specific event type and bus, supporting strongly typed data flow.

  • Typed Event Streams: Encourages defining typed contracts for all events, improving consistency and type safety between producers and consumers.

You can configure one or more of the supported queue types in your application. For each, you simply provide the required connection and queue configuration. The rest—producer/consumer setup, bindings, and event handling—is abstracted and managed by the extension.
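To make the idea of a typed event contract concrete, here is a minimal sketch in plain TypeScript. It does not use the extension itself; OrderStream, contractFor and orderEvent are hypothetical names introduced only for illustration. It shows how a map from topic to payload type lets the compiler check that a payload matches its topic.

```typescript
// Hypothetical event contract: each key is a topic, each value is its payload type.
type OrderStream = {
  'order.created': {orderId: string; total: number};
  'order.cancelled': {orderId: string; reason: string};
};

// Returns a builder bound to one stream type, so only the topic has to be
// inferred at the call site and the payload is checked against that topic.
function contractFor<Stream>() {
  return <Event extends keyof Stream>(topic: Event, data: Stream[Event]) => ({
    topic,
    data,
  });
}

const orderEvent = contractFor<OrderStream>();

// Compiles: the payload matches the 'order.created' contract.
const created = orderEvent('order.created', {orderId: 'o-1', total: 42});

// Would not compile: 'total' is missing and 'reason' belongs to another topic.
// orderEvent('order.created', {orderId: 'o-1', reason: 'duplicate'});
```

The same topic-to-payload map is the kind of type you would pass as the Stream parameter of the producer and consumer types described later in this README.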

Installation

Install the package using npm or yarn:

$ [npm install | yarn add] loopback4-message-bus-connector

Flow Diagram

[Image: flow diagram]

Basic Use

Configure and load EventStreamConnectorComponent in the application constructor as shown below.

import {
  EventStreamConnectorComponent
} from 'loopback4-message-bus-connector';

// ...
export class MyApplication extends BootMixin(
  ServiceMixin(RepositoryMixin(RestApplication)),
) {
  constructor(options: ApplicationConfig = {}) {
    super(options); // pass the application config through to the base class
    this.component(EventStreamConnectorComponent);
    // ...
  }
  // ...
}

SQS

To use SQS as your message queue, bind its required config and connector component in your application.

import {
  SQSConnector,
  SQSBindings,
  EventStreamConnectorComponent
} from 'loopback4-message-bus-connector';

// ...
export class MyApplication extends BootMixin(
  ServiceMixin(RepositoryMixin(RestApplication)),
) {
  constructor(options: ApplicationConfig = {}) {
    super(options); // pass the application config through to the base class

    this.component(EventStreamConnectorComponent);
    // SQS Config and its connector
    this.bind(SQSBindings.Config).to({
      queueConfig: {
        QueueUrl: 'http://127.0.0.1:4566/000000000000/my-test-queue',
        MessageRetentionPeriod: 60, // at least 60 seconds
        MaximumMessageSize: 262144,
        ReceiveMessageWaitTimeSeconds: 20, // typical polling time
        VisibilityTimeout: 30, // 30 seconds
      },
      Credentials: {
        region: 'us-east-1',
        accessKeyId: 'test',
        secretAccessKey: 'test',
      },
      ConsumerConfig: {
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 20,
        maxConsumers: 2,
      },
    });

    this.component(SQSConnector);

    // ...
  }
  // ...
}

To make the application act as a consumer, set the isConsumer flag to true in the SQS config, for example:

const config = {
  // rest of your config
  isConsumer: true,
};

Refer to the AWS SDK for JavaScript documentation for more information on the configuration.
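The example above hard-codes the queue URL and credentials, which is convenient for a local test queue. Outside local development you will probably want to read them from the environment instead. The following is a minimal sketch of that idea, not part of the extension: the helper names and every environment variable name (SQS_QUEUE_URL, SQS_IS_CONSUMER, and so on) are assumptions, and the returned object simply mirrors the shape of the config shown above.

```typescript
// Environment variables as a plain map, so the builder is easy to test.
type Env = Record<string, string | undefined>;

// Returns the variable's value or throws if it is missing or empty.
function requireEnv(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value === '') {
    throw new Error(`Missing required environment variable ${name}`);
  }
  return value;
}

// Parses an integer variable, falling back to a default when it is unset.
function intFromEnv(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw.trim() === '') return fallback;
  const parsed = Number(raw);
  if (!Number.isInteger(parsed)) {
    throw new Error(`${name} must be an integer, got "${raw}"`);
  }
  return parsed;
}

// Builds an object with the same keys as the SQS config shown above.
// All environment variable names here are hypothetical.
function buildSqsConfig(env: Env) {
  return {
    queueConfig: {
      QueueUrl: requireEnv(env, 'SQS_QUEUE_URL'),
      MessageRetentionPeriod: intFromEnv(env, 'SQS_RETENTION_SECONDS', 60),
      MaximumMessageSize: intFromEnv(env, 'SQS_MAX_MESSAGE_SIZE', 262144),
      ReceiveMessageWaitTimeSeconds: intFromEnv(env, 'SQS_WAIT_SECONDS', 20),
      VisibilityTimeout: intFromEnv(env, 'SQS_VISIBILITY_TIMEOUT', 30),
    },
    Credentials: {
      region: env['AWS_REGION'] ?? 'us-east-1',
      accessKeyId: requireEnv(env, 'AWS_ACCESS_KEY_ID'),
      secretAccessKey: requireEnv(env, 'AWS_SECRET_ACCESS_KEY'),
    },
    ConsumerConfig: {
      MaxNumberOfMessages: intFromEnv(env, 'SQS_MAX_MESSAGES', 10),
      WaitTimeSeconds: intFromEnv(env, 'SQS_WAIT_SECONDS', 20),
      maxConsumers: intFromEnv(env, 'SQS_MAX_CONSUMERS', 2),
    },
    // Only the literal string 'true' turns the application into a consumer.
    isConsumer: env['SQS_IS_CONSUMER'] === 'true',
  };
}
```

In a Node.js application you would typically call buildSqsConfig(process.env) and pass the result to this.bind(SQSBindings.Config).to(...). Failing fast on a missing variable keeps a misconfigured deployment from starting with placeholder credentials.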

BullMQ

To use BullMQ as your message queue, bind its required config and connector component in your application.

import {
  BullMQConnector,
  BullMQBindings,
  EventStreamConnectorComponent,
} from 'loopback4-message-bus-connector';

// ...
export class MyApplication extends BootMixin(
  ServiceMixin(RepositoryMixin(RestApplication)),
) {
  constructor(options: ApplicationConfig = {}) {
    super(options); // pass the application config through to the base class

    this.component(EventStreamConnectorComponent);

    // BullMQ config and connector
    this.bind(BullMQBindings.Config).to({
      QueueName: process.env.QUEUE_NAME ?? 'default-queue',
      redisConfig: {
        host: process.env.REDIS_HOST ?? 'localhost',
        port: parseInt(process.env.REDIS_PORT ?? '6379', 10), // always parse as base 10
        password: process.env.REDIS_PASSWORD, // undefined when the variable is not set
      },
      producerConfig: {
        defaultJobOptions: {
          attempts: 3,
          backoff: 5000,
        },
      },
      consumerConfig: {
        MinConsumers: 1,
        MaxConsumers: 5,
        QueuePollInterval: 2000,
      },
    });
    this.component(BullMQConnector);
    // ...
  }
  // ...
}
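The defaultJobOptions above set attempts: 3 and backoff: 5000. As a rough illustration of what that means for a failing job, the sketch below computes the retry delays under the assumption that a numeric backoff is treated as a fixed delay in milliseconds between attempts (which is how BullMQ documents a numeric backoff value). fixedRetryDelays and totalBackoffMs are hypothetical helpers, not part of BullMQ or this extension.

```typescript
// Returns the delay before each retry, assuming a fixed backoff.
// `attempts` counts the first try, so there are attempts - 1 retries.
function fixedRetryDelays(attempts: number, backoffMs: number): number[] {
  if (!Number.isInteger(attempts) || attempts < 1) {
    throw new RangeError('attempts must be a positive integer');
  }
  if (!Number.isFinite(backoffMs) || backoffMs < 0) {
    throw new RangeError('backoffMs must be a non-negative number');
  }
  // The first attempt is not delayed by the backoff; every retry waits the same time.
  return new Array<number>(attempts - 1).fill(backoffMs);
}

// Total time spent waiting between attempts if every attempt fails.
function totalBackoffMs(attempts: number, backoffMs: number): number {
  return fixedRetryDelays(attempts, backoffMs).reduce((sum, d) => sum + d, 0);
}

const delays = fixedRetryDelays(3, 5000); // [5000, 5000]
const totalWait = totalBackoffMs(3, 5000); // 10000
```

With the values from the config, a job that keeps failing is retried twice, about five seconds apart, before it is marked as failed.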

To make the application act as a consumer, set the isConsumer flag to true in the BullMQ config, for example:

const config = {
  // rest of your config
  isConsumer: true,
};

Integration

loopback4-message-bus-connector provides a @producer() decorator for accessing the producer of each message queue. It takes one argument, the type of queue whose producer you want to use. For example:

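import {BindingScope, injectable} from '@loopback/core';
import {Producer, QueueType, producer} from 'loopback4-message-bus-connector';

// IEventConnector and PublishedEvents are not shown here.
@injectable({scope: BindingScope.TRANSIENT})
export class EventConnector implements IEventConnector<PublishedEvents> {
  constructor(
    @producer(QueueType.EventBridge)
    private producer: Producer,
    @producer(QueueType.SQS)
    private sqsProducer: Producer,
    @producer(QueueType.BullMQ)
    private bullMqProducer: Producer,
  ) {}

  // rest of implementation
}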

A producer offers two ways of sending events: one event at a time with send(), or several events at once with sendMultiple().

export type Producer<Stream extends AnyObject = AnyObject> = {
   send: <Event extends keyof Stream>(data: Stream[Event], topic?: Event) => Promise<void>;
   sendMultiple: <Event extends keyof Stream>(data: Stream[Event][], topic?: Event) => Promise<void>;
};
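To see how the generic parameters of this type behave, here is a small in-memory stand-in that satisfies the same shape. It is a sketch for unit tests or experimentation, not the extension's implementation: the Producer and AnyObject types are re-declared locally so the example compiles on its own, and BillingStream, Recorded and createInMemoryProducer are hypothetical names.

```typescript
// Local copies of the types, so the sketch does not depend on any package.
type AnyObject = Record<string, any>;

type Producer<Stream extends AnyObject = AnyObject> = {
  send: <Event extends keyof Stream>(data: Stream[Event], topic?: Event) => Promise<void>;
  sendMultiple: <Event extends keyof Stream>(data: Stream[Event][], topic?: Event) => Promise<void>;
};

// Hypothetical stream with a single topic.
type BillingStream = {
  'invoice.paid': {invoiceId: string; amount: number};
};

// One recorded message: the topic it was sent under and its payload.
type Recorded = {topic: PropertyKey | undefined; data: unknown};

// Creates a producer that appends every message to the given array
// instead of talking to a real message bus.
function createInMemoryProducer<Stream extends AnyObject>(sent: Recorded[]): Producer<Stream> {
  return {
    send: async (data, topic) => {
      sent.push({topic, data});
    },
    sendMultiple: async (data, topic) => {
      for (const item of data) {
        sent.push({topic, data: item});
      }
    },
  };
}

const sentLog: Recorded[] = [];
const billingProducer = createInMemoryProducer<BillingStream>(sentLog);
```

Calling billingProducer.send({invoiceId: 'i-1', amount: 10}, 'invoice.paid') type-checks because the payload matches the topic; passing a payload with a different shape for the same topic is rejected at compile time.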

The package also provides a @consumer decorator that registers a service as a consumer. A consumer must implement the following interface.

export interface IConsumer<Stream extends AnyObject, Event extends keyof Stream> {
    event: Event;
    queue: QueueType;
    handle(data: Stream[Event]): Promise<void>;
}

It can be used as follows:

import {
  IConsumer,
  QueueType,
  consumer,
} from 'loopback4-message-bus-connector';
import { OrchestratorStream, EventTypes, ProvisioningInputs } from '../../types';

@consumer
export class TenantProvisioningConsumerForEventSQS
  implements IConsumer<OrchestratorStream, EventTypes.TENANT_PROVISIONING>
{
  event: EventTypes.TENANT_PROVISIONING = EventTypes.TENANT_PROVISIONING;
  queue: QueueType = QueueType.SQS;

  // Called for every TENANT_PROVISIONING message received from the SQS queue.
  async handle(data: ProvisioningInputs): Promise<void> {
    console.log(`SQS: ${this.event} Event Received ` + JSON.stringify(data));
  }
}
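The event and queue properties are what tie a consumer to the messages it should receive. The sketch below illustrates that matching with a tiny in-memory registry. It is only an assumption about the general idea, not the extension's actual dispatch logic: ConsumerLike, QueueName, TenantStream, ConsumerRegistry and ProvisioningLogger are all hypothetical stand-ins declared locally so the example is self-contained.

```typescript
// Local stand-ins for the package's queue type and consumer interface.
type QueueName = 'SQS' | 'BullMQ' | 'EventBridge';

interface ConsumerLike<Stream, Event extends keyof Stream> {
  event: Event;
  queue: QueueName;
  handle(data: Stream[Event]): Promise<void>;
}

// Hypothetical stream used for illustration.
type TenantStream = {
  TENANT_PROVISIONING: {tenantId: string};
  TENANT_DEPROVISIONING: {tenantId: string};
};

// Keeps registered consumers and forwards a message to those whose
// queue and event both match.
class ConsumerRegistry<Stream> {
  private readonly consumers: ConsumerLike<Stream, keyof Stream>[] = [];

  register(consumer: ConsumerLike<Stream, keyof Stream>): void {
    this.consumers.push(consumer);
  }

  // Resolves to the number of consumers that handled the message.
  async dispatch<Event extends keyof Stream>(
    queue: QueueName,
    event: Event,
    data: Stream[Event],
  ): Promise<number> {
    const matches = this.consumers.filter(
      c => c.queue === queue && c.event === event,
    );
    for (const c of matches) {
      await c.handle(data);
    }
    return matches.length;
  }
}

// A consumer that records the tenant ids it has seen.
class ProvisioningLogger
  implements ConsumerLike<TenantStream, 'TENANT_PROVISIONING'>
{
  event = 'TENANT_PROVISIONING' as const;
  queue: QueueName = 'SQS';
  readonly seen: string[] = [];

  async handle(data: {tenantId: string}): Promise<void> {
    this.seen.push(data.tenantId);
  }
}

const registry = new ConsumerRegistry<TenantStream>();
const provisioningLogger = new ProvisioningLogger();
registry.register(provisioningLogger);
```

A call such as registry.dispatch('SQS', 'TENANT_PROVISIONING', {tenantId: 't-1'}) reaches provisioningLogger, while the same event arriving on 'BullMQ' is ignored because the queue does not match.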