npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@rewaa/event-broker

v6.1.5

Published

A broker for all the events that Rewaa will ever produce or consume

Downloads

57,067

Readme

event-broker

A broker for producing and consuming messages across multiple micro-services.

This package is intended to abstract out the functionality of an event broker keeping the underlying client hidden.

For now, the underlying client is SNS+SQS. So this package requires AWS and is limited in functionality by the quotas for SNS and SQS respectively.

Features

  • Multiple exchange types: Fanout and Queue
  • Dead Letter Queues
  • Batch emit/consume
  • Updates for topic properties like retention period, high throughput etc.
  • Serverless support
  • Localstack support

Supported Exchange Types

The broker supports 2 exchange types currently:

  • Queue: This means that the topic will have 1 - 1 mapping between the producer and consumer i.e. only 1 consumer can consume the messages on this type of topic.

  • Fanout: This type of exchange means a 1 - Many mapping between the producer and consumer. So messages on this type of topic can be consumed by multiple consumers. For fanout topics, message filtering is also supported.

Serverless Support

Topics can be mapped to lambda functions for consumption. The broker supports specifying lambda functions along with the batch size in which case it does the mapping by itself. However it is recommended to do the mapping in the serverless file. For this purpose, the broker also exposes helper functions to get the internally generated ARNs of both the topic and queues. A serverless plugin can be used in this case. The broker also exposes a method processMessage which takes in any consumed message and executes it as per the mapping it has. This can be used in case you are using only 1 lambda for all types of topics. Since the broker knows about the mapping of events to functions, it will handle the execution.

Offline Support

This broker package can be simulated on an offline platform like localstack. As long as Lambda, SQS and SNS are on the same network the broker will work. For offline support, the broker takes as input any options for sqs, sns or lambda like the endpoint and a flag isLocal which when true will use the provided endpoints.

For serverless, serverless-offline-sqs can be used to redirect sqs to localstack so that the event source mapping can be entertained by serverless itself.

Usage

Initialising the broker

import {
	Emitter,
	IEmitterOptions,
} from  "@rewaa/event-broker";
import { EventEmitter } from  'events';

const env = `local`;
const region = `us-east-1`;
const emitterOptions: IEmitterOptions = {
	environment: env,
	localEmitter: new EventEmitter(),
	useExternalBroker: `true`,
	awsConfig: {
		accountId: `000000000000`,
		region
	},
	log:  true,
};

if(env === 'local') {
	emitterOptions.isLocal = true;
	emitterOptions.lambdaConfig = {
		endpoint: `http://localhost:4000`,
		region
	}
	emitterOptions.sqsConfig = {
		endpoint: `http://localhost:4566`,
		region
	}
	emitterOptions.snsConfig = {
		endpoint:  `http://localhost:4566`,
		region
	}
}

const emitter = new Emitter(this.emitterOptions);

Consuming from topics

import { ExchangeType } from  '@rewaa/event-broker';

interface Notification {
	name: string;
	payload: any;
}

emitter.on<Notification>("Notification",
	async (...data) => {
		const  input = data[0]; // typeof === Notification
		// Do something with the Notification
	}, {
		isFifo: false,
		exchangeType:  ExchangeType.Queue,
		deadLetterQueueEnabled:  true,
	}
);

Emitting to topics

import { ExchangeType } from  '@rewaa/event-broker';

const notification = {
	name: `Some notification`,
	payload: {
		text: `Hello`
	}
}

await emitter.emit("Notification", {
	exchangeType:  ExchangeType.Queue,
	isFifo:  false
}, notification);

Fanout

For fanout, the exchange type of message must be 'Fanout'.

Emitter:

import { ExchangeType } from  '@rewaa/event-broker';

const notification = {
	name: `Some notification`,
	payload: {
		text: `Hello`
	}
}

await emitter.emit("Notification", {
	exchangeType:  ExchangeType.Fanout,
	isFifo:  false
}, notification);

Consumer 1:

import { ExchangeType } from  '@rewaa/event-broker';

emitter.on<Notification>("Notification",
	async (...data) => {
		const  input = data[0]; // typeof === Notification
		// Do something with the Notification
	}, {
		isFifo: false,
		exchangeType:  ExchangeType.Fanout,
		deadLetterQueueEnabled:  true,
		separateConsumerGroup: "consumer_group_1"
	}
);

Consumer 2:

import { ExchangeType } from  '@rewaa/event-broker';

emitter.on<Notification>("Notification",
	async (...data) => {
		const  input = data[0]; // typeof === Notification
		// Do something with the Notification
	}, {
		isFifo: false,
		exchangeType:  ExchangeType.Fanout,
		deadLetterQueueEnabled:  true,
		separateConsumerGroup: "consumer_group_2"
	}
);

Deploying

The deployment of resources created by the broker is a separate process extracted out in the method bootstrap.

This method takes an optional array of topics which can be useful for serverless case where we might not have attached the consumer by calling the on method. if you have called the on method on the emitter object for all the topics before calling bootstrap, then the topics array is not required.

bootsrap can be called only once during deployment. The APIs used internally are idempotent so providing the same topics won't create duplicate resources.

Updating Topic Properties

Following table shows which properties are automatically updated when bootstrapping if changed in a topic:

| Property | Updated | | ------------- | ------------- | | visibilityTimeout | Yes | | batchSize | Yes | | maxRetryCount | Yes | | deadLetterQueueEnabled | Yes, only attaches/detaches the DLQ. Doesn't delete it| | separateConsumerGroup | Yes, creates a new queue. Old one is not deleted | | enableHighThroughput | Yes | | retentionPeriod | Yes | | contentBasedDeduplication | Yes |

Roadmap

The following things are part of the roadmap for the broker:

  • Schema Registry: Adding a layer for registering topics and providing validations for schema. This will also help in eliminating the options that are required to be provided while emitting using emit or attaching a consumer using on. The registration part can be part of the deployment phase.