npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@janiscommerce/sqs-consumer

v0.4.0

Published

A wrapper for SQS Consumers and Lambda

Downloads

1,377

Readme

SQS Consumer

Build Status Coverage Status npm version

A wrapper for SQS Consumers and Lambda

:inbox_tray: Installation

npm install @janiscommerce/sqs-consumer

:hammer: Usage

Your business logic must be implemented as a SQSConsumer. There are two types of consumers exported for implementation ease:

BatchSQSConsumer

This consumer processes all the records in one single call. This is useful for example when you want to fetch some data using some field on every record.

For this consumer, you have to implement the processBatch method. The method signature is the following:

processBatch(records: Array<ParsedSQSRecordWithLogger>): Promise<void> | void

To add a log message for a record, you can use the built-in logger like this:

record[Symbol.for('logger')].info('Some info message');

Logger has been implemented as a Symbol property to ensure that it won't collide with existing properties of the SQS Record

Logging levels follow lllog levels.

IterativeSQSConsumer

This consumer processes one record at a time. This is useful when records are completely unrelated and can be processed in parallel with no dependencies between them, or when you consume only one SQS message per invocation (batchSize = 1).

For this consumer, you have to implement the processSingleRecord method. The method signature is the following:

processSingleRecord(record: ParsedSQSRecord, logger: LogTransport): Promise<void> | void

To add a log message for a record, you can use the logger passed as argument like this:

logger.info('Some info message');

Logging levels follow lllog levels.

Partial failure reporting

To implement Partial failure reporting, you should add each message ID that fails using the method addFailedMessage(messageId).

The lambda will automatically return the failed messages formatted as expected.

:zap: Usage with serverless (lambda)

This package also exports a SQSHandler to easily integrate with AWS Lambda.

Usage is as easy as it can be, just export the following in your lambda:

module.exports.handler = event => SQSHandler.handle(MySQSConsumer, event);

:warning: Advanced usage

Conditional processing

In case you want to process the messages in batch in some cases and individually in others, you can extend the handlesBatch method to implement your own custom logic. The method's signature is the following:

handlesBatch(event: SQSEvent): boolean

Important: This method must be synchronous

Message formatting

This package expects each message body to be a JSON string and will fail if it's not.

In case you want to parse the records in a different way (or silently fail if format is invalid) you can override the parseRecord method. The method's signature is the following:

parseRecord(record: SQSRecord): ParsedSQSRecord

Important: This method must be synchronous

:computer: Examples

Lambda Batch consumer

Process a batch of new ratings of a product and save them as not-verified

const {
	SQSHandler,
	BatchSQSConsumer
} = require('@janiscommerce/sqs-consumer');

const DbHandler = require('./your-db-handler');

class MyBatchConsumer extends BatchSQSConsumer {

	async processBatch(records) {

		const ratings = records.map(({ body }) => ({
			rating: body.rating,
			verified: false
		}));

		return DbHandler.insertMany(ratings);
	}
}

module.exports.handler = event => SQSHandler.handle(MyBatchConsumer, event);

Lambda Iterative consumer

Process a batch of orders placed in you ecommerce and send an email for each of them

const {
	SQSHandler,
	IterativeSQSConsumer
} = require('@janiscommerce/sqs-consumer');

const MailingService = require('./your-mailing-service');

class MyIterativeConsumer extends IterativeSQSConsumer {

	async processSingleRecord(record, logger) {

		const { body: orderPlaced } = record.body;

		logger.info(`Sending email for order ${orderPlaced.id}`);

		return MailingService.sendTemplate('orderPlaced', orderPlaced);
	}
}

module.exports.handler = event => SQSHandler.handle(MyIterativeConsumer, event);

Validate with Struct (Optional)

When you declare a struct, before any process, all records are validated and only continue if pass the validation, this validations should return a valid struct.

You must declare a get struct() in your class.

const {
	SQSHandler,
	IterativeSQSConsumer
} = require('@janiscommerce/sqs-consumer');
const { struct } = require('@janiscommerce/superstruct');

class MyConsumer extends IterativeSQSConsumer {

	get struct() {
		return struct.partial({
			name: 'string'
		});
	}

}

module.exports.handler = event => SQSHandler.handle(MyConsumer, event);

Session injection

This package implements API Session. In order to associate a request to a session, the record should be contain the property janis-client in the messageAttributes.

In case the messageAttribute is set, you can access the session in your Consumer as this.session. Otherwise, this.session will be undefined.

Session details and customization details can be found in api-session README.