npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@omnihash/nestjs-azure-storage-queue

v1.0.8

Published

A NestJS module for Azure Storage Queue listening

Readme

Omnihash + Nest

A NestJS module for seamless Azure Storage Queue integration with automatic message processing through decorators.

NOTE: This module is currently under development. Please do not use in production.


Features

Decorator-based Queue Handlers - Use simple decorators to mark methods as queue message processors
🔄 Automatic Message Polling - Built-in polling mechanism with configurable intervals
⚙️ Flexible Configuration - Support for both synchronous and asynchronous configuration
🛡️ Error Handling - Automatic retry logic with configurable dequeue limits
📝 Comprehensive Logging - Built-in logging for monitoring and debugging
🚀 Auto-discovery - Automatically discovers and registers queue handlers at startup
🧩 Type Safety - Support for typed messages to improve developer experience


Installation

npm install @omnihash/nestjs-azure-storage-queue @azure/storage-queue
# or
yarn add @omnihash/nestjs-azure-storage-queue @azure/storage-queue

Quick Start

1. Configure Environment

Create a .env file in your project root:

AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=youraccount;AccountKey=yourkey;EndpointSuffix=core.windows.net

2. Setup Module

// app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { AzureStorageQueueModule } from '@omnihash/nestjs-azure-storage-queue';
import { MessageProcessorService } from './message-processor.service';

@Module({
  imports: [
    ConfigModule.forRoot(),
    AzureStorageQueueModule.forRootAsync({
      useFactory: (configService: ConfigService) => {
        const connectionString = configService.get<string>(
          'AZURE_STORAGE_CONNECTION_STRING',
        );
        if (!connectionString) {
          throw new Error('AZURE_STORAGE_CONNECTION_STRING must be defined');
        }
        return {
          connectionString,
          defaultPollingInterval: 5000,
          defaultVisibilityTimeout: 30,
          defaultMaxDequeueCount: 5,
        };
      },
      inject: [ConfigService],
    }),
  ],
  providers: [MessageProcessorService],
})
export class AppModule {}

3. Create Message Processors

// message-processor.service.ts
import { Injectable, Logger } from '@nestjs/common';
import {
  AzureStorageQueueHandler,
  AzureStorageQueueMessage,
} from '@omnihash/nestjs-azure-storage-queue';

// Define your custom message types for type safety
interface UserNotification {
  userId: string;
  message: string;
  timestamp: string;
}

interface OrderData {
  orderId: string;
  customerName: string;
  items: Array<{ id: string; quantity: number }>;
  total: number;
}

@Injectable()
export class MessageProcessorService {
  private readonly logger = new Logger(MessageProcessorService.name);

  @AzureStorageQueueHandler({
    queueName: 'user-notifications',
    pollingInterval: 3000,
    visibilityTimeout: 30,
    maxDequeueCount: 1,
    maxMessages: 10,
  })
  async handleUserNotifications(
    message: AzureStorageQueueMessage<UserNotification>,
  ) {
    this.logger.log(`Processing user notification: ${message.id}`);

    // Strongly typed message body
    const notification = message.body;
    this.logger.log(
      `Notification for user ${notification.userId}: ${notification.message}`,
    );

    // Simulate processing
    await new Promise((resolve) => setTimeout(resolve, 1000));

    this.logger.log(`Completed processing message: ${message.id}`);
  }

  @AzureStorageQueueHandler({
    queueName: 'order-processing',
    pollingInterval: 1000,
    visibilityTimeout: 60,
    maxMessages: 20,
    maxDequeueCount: 10,
  })
  async handleOrderProcessing(message: AzureStorageQueueMessage<OrderData>) {
    this.logger.log(`Processing order: ${message.id}`);

    try {
      // Strongly typed message body
      const orderData = message.body;

      // Full type safety with IDE intellisense
      this.logger.log(
        `Processing order ${orderData.orderId} for ${orderData.customerName}`,
      );
      this.logger.log(`Order contains ${orderData.items.length} items`);
      this.logger.log(`Total amount: $${orderData.total}`);

      // Process order logic here

      this.logger.log(`Order processed successfully: ${orderData.orderId}`);
    } catch (error) {
      this.logger.error(`Failed to process order: ${error.message}`);
      throw error; // This will cause the message to be retried
    }
  }
}

Configuration

Module Configuration Options

| Option | Type | Description | Default | | -------------------------- | -------- | ------------------------------------------------- | ---------- | | connectionString | string | Azure Storage connection string | required | | defaultPollingInterval | number | Default polling interval in milliseconds | 5000 | | defaultVisibilityTimeout | number | Default message visibility timeout in seconds | 30 | | defaultMaxDequeueCount | number | Default maximum dequeue count before poison queue | 5 |

Queue Handler Options

| Option | Type | Description | Default | | ------------------- | -------- | ------------------------------------- | -------------- | | queueName | string | Name of the Azure Storage Queue | required | | pollingInterval | number | Polling interval in milliseconds | Module default | | visibilityTimeout | number | Message visibility timeout in seconds | Module default | | maxMessages | number | Maximum messages to retrieve per poll | 1 | | maxDequeueCount | number | Maximum dequeue count before deletion | Module default |


Advanced Usage

Static Configuration

// app.module.ts
import { AzureStorageQueueModule } from '@omnihash/nestjs-azure-storage-queue';

@Module({
  imports: [
    AzureStorageQueueModule.forRoot({
      connectionString: 'your-connection-string',
      defaultPollingInterval: 3000,
      defaultVisibilityTimeout: 45,
      defaultMaxDequeueCount: 3,
    }),
  ],
})
export class AppModule {}

Sending Messages

import { Injectable } from '@nestjs/common';
import { AzureStorageQueueService } from '@omnihash/nestjs-azure-storage-queue';

// Define your message types
interface UserNotification {
  userId: string;
  message: string;
  timestamp: string;
}

interface OrderUpdate {
  orderId: string;
  status: string;
  updatedAt: string;
}

@Injectable()
export class NotificationService {
  constructor(private readonly queueService: AzureStorageQueueService) {}

  async sendNotification(userId: string, message: string) {
    // Create a strongly-typed message
    const notification: UserNotification = {
      userId,
      message,
      timestamp: new Date().toISOString(),
    };

    // Send typed message - automatic serialization
    await this.queueService.sendMessage<UserNotification>(
      'user-notifications',
      notification,
    );
  }

  async sendOrderUpdate(orderId: string, status: string) {
    // Create a strongly-typed message
    const update: OrderUpdate = {
      orderId,
      status,
      updatedAt: new Date().toISOString(),
    };

    // Send typed message - automatic serialization
    await this.queueService.sendMessage<OrderUpdate>(
      'order-processing',
      update,
    );
  }

  // You can still send simple string messages
  async sendSimpleMessage(queueName: string, text: string) {
    await this.queueService.sendMessage(queueName, text);
  }
}

Multiple Queue Handlers with Type Safety

import { Injectable, Logger } from '@nestjs/common';
import { AzureStorageQueueHandler, AzureStorageQueueMessage } from '@omnihash/nestjs-azure-storage-queue';

@Injectable()
export class MultiQueueProcessor {
  private readonly logger = new Logger(MultiQueueProcessor.name);

  // High priority tasks with custom types
  interface HighPriorityTask {
    taskId: string;
    priority: number;
    data: Record<string, unknown>;
  }

  @AzureStorageQueueHandler({
    queueName: 'high-priority',
    pollingInterval: 1000,
    maxMessages: 5
  })
  async handleHighPriority(message: AzureStorageQueueMessage<HighPriorityTask>) {
    const task = message.body;
    this.logger.log(`High priority task ${task.taskId} with priority ${task.priority}`);
    // Handle high priority messages with type safety
  }

  // Simple string messages for low priority queue
  @AzureStorageQueueHandler({
    queueName: 'low-priority',
    pollingInterval: 10000,
    maxMessages: 10,
  })
  async handleLowPriority(message: AzureStorageQueueMessage<string>) {
    this.logger.log(`Low priority message: ${message.body}`);
    // Handle simple string messages
  }

  // Batch processing with complex objects
  interface BatchItem {
    items: Array<{ id: string; action: string }>;
    batchId: string;
    processingOptions: {
      parallel: boolean;
      timeout: number;
    };
  }

  @AzureStorageQueueHandler({
    queueName: 'batch-processing',
    pollingInterval: 5000,
    maxMessages: 32, // Azure Storage Queue max
    visibilityTimeout: 120
  })
  async handleBatchProcessing(message: AzureStorageQueueMessage<BatchItem>) {
    const batch = message.body;
    this.logger.log(`Processing batch ${batch.batchId} with ${batch.items.length} items`);

    // Type-safe access to all properties
    if (batch.processingOptions.parallel) {
      this.logger.log(`Processing items in parallel with ${batch.processingOptions.timeout}ms timeout`);
    }

    // Process batch items
    for (const item of batch.items) {
      this.logger.log(`- Item ${item.id}: ${item.action}`);
    }
  }
}

Message Format

Messages received by your handlers will have the following structure:

interface AzureStorageQueueMessage<T = string> {
  id: string; // Message ID
  body: T; // Message content with type parameter
  dequeueCount: number; // Number of times dequeued
  insertedOn: Date; // When message was inserted
  expiresOn: Date; // When message expires
}

The generic type T allows you to specify the type of the message body. It defaults to string if not specified.


Error Handling

Automatic Retry Logic

When a handler throws an error, the message becomes visible again after the visibilityTimeout and will be retried. If a message exceeds maxDequeueCount, it will be automatically deleted (poison message handling).

Custom Error Handling

@AzureStorageQueueHandler({
  queueName: 'error-prone-queue',
  maxDequeueCount: 3,
})
async handleWithErrors(message: AzureStorageQueueMessage) {
  try {
    // Process message
    await this.processMessage(message.body);
  } catch (error) {
    this.logger.error(`Processing failed: ${error.message}`, {
      messageId: message.id,
      dequeueCount: message.dequeueCount,
    });

    if (message.dequeueCount >= 2) {
      // Send to dead letter queue or alert
      await this.handlePoisonMessage(message);
    }

    throw error; // Re-throw to trigger retry
  }
}

Schema Validation

You can combine typed messages with schema validation libraries for runtime safety:

import { z } from 'zod';
import { validateOrReject } from 'class-validator';
import { plainToInstance } from 'class-transformer';
import { AzureStorageQueueHandler, AzureStorageQueueMessage } from '@omnihash/nestjs-azure-storage-queue';

// Option 1: Using Zod
const UserSchema = z.object({
  userId: z.string(),
  email: z.string().email(),
  preferences: z.object({
    notifications: z.boolean(),
    theme: z.enum(['light', 'dark']),
  }),
});

type User = z.infer<typeof UserSchema>;

@AzureStorageQueueHandler({
  queueName: 'user-updates',
})
async handleUserUpdates(message: AzureStorageQueueMessage<User>) {
  try {
    // Validate at runtime
    UserSchema.parse(message.body);

    // Process validated message
    await this.processUserUpdate(message.body);
  } catch (error) {
    this.logger.error(`Invalid message schema: ${error.message}`);
    // Handle invalid schema (do not retry)
  }
}

// Option 2: Using class-validator
class OrderDto {
  @IsString()
  orderId: string;

  @IsNumber()
  amount: number;

  @IsArray()
  @ValidateNested({ each: true })
  items: OrderItemDto[];
}

@AzureStorageQueueHandler({
  queueName: 'orders',
})
async handleOrder(message: AzureStorageQueueMessage<OrderDto>) {
  try {
    // Transform plain object to class instance
    const orderDto = plainToInstance(OrderDto, message.body);

    // Validate the instance
    await validateOrReject(orderDto);

    // Process validated message
    await this.processOrder(orderDto);
  } catch (errors) {
    this.logger.error(`Validation failed:`, errors);
    // Handle invalid data
  }
}

Best Practices

  1. Queue Naming: Use descriptive, kebab-case names for queues
  2. Message Size: Keep messages under 64KB (Azure Storage Queue limit)
  3. Idempotency: Design handlers to be idempotent in case of retries
  4. Monitoring: Use the built-in logging to monitor queue processing
  5. Error Handling: Implement proper error handling and poison message detection
  6. Resource Cleanup: The module automatically cleans up polling intervals on shutdown
  7. Type Definitions: Define TypeScript interfaces for all your message types
  8. Schema Validation: Combine type hints with runtime validation for maximum safety

API Reference

AzureStorageQueueModule

Static Methods

  • forRoot(config: AzureStorageQueueConfig): DynamicModule
    Configure the module with static configuration

  • forRootAsync(options: AsyncModuleOptions): DynamicModule
    Configure the module with dynamic configuration using factories

AzureStorageQueueService

Methods

  • sendMessage<T = string>(queueName: string, message: T): Promise<void>
    Send a typed message to the specified queue

  • createQueueIfNotExists(queueName: string): Promise<QueueClient>
    Create a queue if it doesn't exist and return the client

  • startPolling<T = string>(options: AzureStorageQueuePollingOptions, handler: (message: AzureStorageQueueMessage<T>) => Promise<void>): Promise<void>
    Start polling a queue (used internally by the decorator)

  • stopPolling(queueName: string): void
    Stop polling a specific queue

@AzureStorageQueueHandler

Decorator to mark methods as queue message handlers.

@AzureStorageQueueHandler(options: AzureStorageQueuePollingOptions)

Use this decorator on methods that receive AzureStorageQueueMessage<T> as their parameter.


Troubleshooting

Common Issues

Connection String Invalid

Error: AZURE_STORAGE_CONNECTION_STRING must be defined

Ensure your connection string is properly set in environment variables.

Queue Not Found Queues are automatically created when first accessed. Ensure your Azure Storage account has the necessary permissions.

Messages Not Processing Check that:

  • Your handler methods are in services registered with NestJS
  • The @AzureStorageQueueHandler decorator is properly applied
  • Queue names match exactly (case-sensitive)

Debug Logging

Enable debug logging to see module activity:

// main.ts
import { Logger } from '@nestjs/common';

const app = await NestFactory.create(AppModule, {
  logger: ['error', 'warn', 'log', 'debug'],
});

License

MIT


Contributing

Contributions are welcome! Please feel free to submit a Pull Request.