npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

service-bus-azure-watcher

v1.0.10

Published

Read azure bus service queue messages to be processed by an user

Downloads

11

Readme

Service Bus Azure Watcher

Small library to help handle Azure Service Bus queue messages using azure peek message strategy. This library retrieves a max number of messages (specified in concurrency variable) and perform next flow:

Changelog

Version 1.0.8

  • Add more info data for getWatcherInfo

Version 1.0.7

  • Fix bug to avoid non messages comsuption when Azure Service Bus has connection problems (connection reset, timeout, etc)
  • Added method getWatcherInfo to retrieve util watcher information for debug purposes

How this library works

  1. Read message from Azure Service Bus
  2. Pass message to user callback function
  3. User function is being executed and can goes well or fail
    1. user function calls done() to indicate a successful operation (processing the message)
    2. user function calls done(err) to indicate a failure operation
  4. If user function goes well, the library will delete the message
  5. If user function goes wrong, the library will release the message to be available later

Some problems can occur in the process of retrieve message, delete message or release message (unlock):

  • Unexpected but common errors: specially network problems like unreach server, connection reset, etc
  • Expected errors: Logical errors like tyring to delete a message using an expired lock id (depending on azure configuration "Lock duration")

Handle errors emitter by the library

For network problem, this library will try to perform the call 3 times before emit an error.

To catch this errors, this library provide by an error listener which can tell us what happened. For any kind of error, this library will provide us an ErrorMessage object given us next attributes: status, message and queueMessage.

Here, we can see the available error status messages (err.status):

  • on_error_read_message_from_azure (cause by network problems)
  • queue_not_found
  • on_unlock_message_from_azure
  • on_remove_unlock_message_from_azure
  • on_unlock_message_from_azure_max_attempts
  • on_delete_message_from_azure_max_attempts

In next versions, error messages will increase to have better knowledge about what is happening.

Performance benefits

To avoid call Azure Service Bus if there is no messages, user concurrency can be modified dynamically. This happens when all messages were delivered and processed, so, concurrency will be decreased to one and there is an internal checker to see if there are more messages to increase the concurrency according to that but never without pass the limit specified initially by the user.

Installation

npm install service-bus-azure-watcher

ServiceBusWatcher initialization

Constructor of ServiceBusWatcher needs:

  • serviceBus: an azure serviceBusService
  • queueName: The name of the queue where to retrieve the messages
  • concurrency: Max simultaneous messages to be handle

Usage

const azure = require('azure');
const ServiceBusAzureWatcher = require('service-bus-azure-watcher');

const AZURE_SERVICE_BUS_KEY = 'YOUR_SERVICE_BUS_CONNECTION_STRING';
const QUEUE_NAME = 'YOUR_QUEUE_NAME';
const CONCURRENCY = 50;

const retryOperations = new azure.ExponentialRetryPolicyFilter();
const serviceBus = azure.createServiceBusService(AZURE_SERVICE_BUS_KEY).withFilter(retryOperations);
const myServiceBusWatcher = new ServiceBusAzureWatcher(serviceBus, QUEUE_NAME, CONCURRENCY);

/**
* User function that process a message. When finish, it's necessary to notify "done" function.
* This "done" function allow one error parameter to indicate user operation failed.
* @param {Object} message Azure message
* @param {Function} done callback function to notify service bus
*  watcher that user finished to process the message
* */
myServiceBusWatcher.onMessage((message, done) => {
    console.log('received message', message.body);
    done();
    // if user operation failed, you need to call done('some problem') or done(new Error('some problem'))
});

/**
* @param {Object} err This error is an instace of ErrorMessage:
*  stack, error message, status and queueMessage (undefined by default) are available
* */
myServiceBusWatcher.onError((err) => {
    console.log('user function onError', err.message, err.status, err.queueMessage);
});

myServiceBusWatcher.start();