npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@snow-tzu/fastify-sqs-listener

v0.0.4

Published

Fastify adapter for @snow-tzu/sqs-listener

Downloads

398

Readme

@snow-tzu/fastify-sqs-listener

Fastify plugin for integrating SQS message consumption with native Fastify patterns. This adapter wraps the framework-agnostic @snow-tzu/sqs-listener core package to provide seamless integration with Fastify's plugin system, lifecycle hooks, and logging.

Features

  • 🔌 Native Fastify Plugin - Integrates seamlessly with Fastify's plugin system
  • 🚀 Automatic Lifecycle Management - Starts/stops with Fastify server lifecycle
  • 📝 Fastify Logger Integration - Uses Fastify's built-in pino logger
  • 🔒 Type-Safe - Full TypeScript support with Fastify type augmentation
  • 🎯 Composable Patterns - Support for QueueListener interface and decorators
  • Framework-Agnostic Core - Same message processing logic across all frameworks

Performance

This adapter provides the same high-performance characteristics as the core package:

  • Throughput: ~500 msgs/sec at concurrency 20
  • Latency: p95 < 310ms, p99 < 320ms
  • Memory Efficient: ~12.3MB average increase with no memory leaks
  • Cost Optimized: 10x fewer API calls with batch acknowledgements

For detailed performance analysis and benchmark results, see:

Installation

npm install @snow-tzu/fastify-sqs-listener @snow-tzu/sqs-listener
# or
yarn add @snow-tzu/fastify-sqs-listener @snow-tzu/sqs-listener

For message validation support, also install class-validator:

npm install class-validator
# or
yarn add class-validator

Quick Start

Here's a complete example showing Fastify-specific usage with plugin registration:

import Fastify from 'fastify';
import { SQSClient } from '@aws-sdk/client-sqs';
import { sqsListenerPlugin, ValidationFailureMode, ValidatorOptions } from '@snow-tzu/fastify-sqs-listener';
import { QueueListener, TraceQueueListener } from '@snow-tzu/sqs-listener';
import { IsString, IsNumber, Min } from 'class-validator';

// Define your message type with validation
class OrderCreatedEvent {
  @IsString()
  orderId: string;

  @IsString()
  customerId: string;

  @IsNumber()
  @Min(0)
  amount: number;
}

// Implement your message listener
class OrderListener implements QueueListener<OrderCreatedEvent> {
  constructor(private logger: any) {}

  async handle(message: OrderCreatedEvent): Promise<void> {
    this.logger.info('Processing order', { 
      orderId: message.orderId,
      customerId: message.customerId,
      amount: message.amount 
    });
    
    // Your business logic here
    await this.processOrder(message);
    
    this.logger.info('Order processed successfully', { orderId: message.orderId });
  }

  private async processOrder(order: OrderCreatedEvent): Promise<void> {
    // Simulate order processing
    await new Promise(resolve => setTimeout(resolve, 100));
  }
}

// Create Fastify instance with logging
const fastify = Fastify({ 
  logger: {
    level: 'info',
    transport: {
      target: 'pino-pretty'
    }
  }
});

// Create SQS client
const sqsClient = new SQSClient({
  region: 'us-east-1',
  endpoint: 'http://localhost:4566' // LocalStack for local development
});

// Use composable decorator pattern
const orderListener = new TraceQueueListener(
  new OrderListener(fastify.log)
);

// Register the SQS listener plugin
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'order-events',
  listener:
    {
      messageType: OrderCreatedEvent,
      listener: orderListener
    },
  sqsClient,
  autoStartup: true, // Start automatically when Fastify is ready
  maxConcurrentMessages: 5
});

// Start the server - SQS listener starts automatically
await fastify.listen({ port: 3000, host: '0.0.0.0' });

Plugin Options

The plugin accepts the following configuration options:

interface FastifySqsListenerOptions {
  queueNameOrUrl: string;                    // Required: SQS queue URL
  listener: {                   // Required: Message listeners
    messageType: Type<any>;
    listener: QueueListener<any>;
  };
  sqsClient: SQSClient;               // Required: AWS SQS client
  autoStartup?: boolean;              // Optional: Auto-start with Fastify (default: true)
  maxConcurrentMessages?: number;     // Optional: Max concurrent processing (default: 1)
  acknowledgementMode?: AcknowledgementMode; // Optional: When to acknowledge messages
  logger?: LoggerInterface;           // Optional: Custom logger (uses Fastify logger by default)
  
  // Validation Options
  enableValidation?: boolean;         // Optional: Enable class-validator validation
  validationFailureMode?: ValidationFailureMode; // Optional: How to handle validation failures
  validatorOptions?: ValidatorOptions; // Optional: class-validator options
}

Message Validation

The plugin supports automatic message validation using class-validator decorators. This ensures that incoming SQS messages conform to your expected data structure before being processed by your listeners.

Basic Validation Setup

First, install class-validator if you haven't already:

npm install class-validator class-transformer

Define your message class with validation decorators:

import { IsString, IsNumber, IsEmail, Min, Max, IsOptional } from 'class-validator';

class OrderCreatedEvent {
  @IsString()
  orderId: string;

  @IsString()
  customerId: string;

  @IsEmail()
  customerEmail: string;

  @IsNumber()
  @Min(0)
  amount: number;

  @IsOptional()
  @IsString()
  notes?: string;
}

Enable validation in your plugin configuration:

await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'order-events',
  listener: {
    messageType: OrderCreatedEvent,
    listener: new OrderListener(fastify.log)
  },
  sqsClient,
  enableValidation: true // Enable automatic validation
});

Validation Failure Modes

You can control how validation failures are handled using the validationFailureMode option:

THROW Mode (Default)

Throws an error and invokes the error handler. The message remains in the queue for retry:

import { ValidationFailureMode } from '@snow-tzu/fastify-sqs-listener';

await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'order-events',
  listener: {
    messageType: OrderCreatedEvent,
    listener: new OrderListener(fastify.log)
  },
  sqsClient,
  enableValidation: true,
  validationFailureMode: ValidationFailureMode.THROW // Default behavior
});

ACKNOWLEDGE Mode

Logs the validation error and removes the message from the queue (prevents retry):

await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'order-events',
  listener: {
    messageType: OrderCreatedEvent,
    listener: new OrderListener(fastify.log)
  },
  sqsClient,
  enableValidation: true,
  validationFailureMode: ValidationFailureMode.ACKNOWLEDGE // Remove invalid messages
});

REJECT Mode

Logs the validation error and allows the message to be retried:

await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'order-events',
  listener: {
    messageType: OrderCreatedEvent,
    listener: new OrderListener(fastify.log)
  },
  sqsClient,
  enableValidation: true,
  validationFailureMode: ValidationFailureMode.REJECT // Allow retry
});

Advanced Validator Options

You can customize class-validator behavior using the validatorOptions parameter:

Whitelist Mode

Strip properties not defined in your class:

import { ValidatorOptions } from '@snow-tzu/fastify-sqs-listener';

await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'order-events',
  listener: {
    messageType: OrderCreatedEvent,
    listener: new OrderListener(fastify.log)
  },
  sqsClient,
  enableValidation: true,
  validatorOptions: {
    whitelist: true, // Remove unknown properties
    forbidNonWhitelisted: true // Throw error for unknown properties
  }
});

Validation Groups

Validate only specific groups of properties:

class OrderCreatedEvent {
  @IsString({ groups: ['create', 'update'] })
  orderId: string;

  @IsString({ groups: ['create'] })
  customerId: string;

  @IsNumber({ groups: ['update'] })
  @Min(0, { groups: ['update'] })
  amount: number;
}

// Validate only 'create' group properties
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'order-events',
  listener: {
    messageType: OrderCreatedEvent,
    listener: new OrderListener(fastify.log)
  },
  sqsClient,
  enableValidation: true,
  validatorOptions: {
    groups: ['create'] // Only validate properties in 'create' group
  }
});

Skip Missing Properties

Skip validation for undefined properties:

await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'order-events',
  listener: {
    messageType: OrderCreatedEvent,
    listener: new OrderListener(fastify.log)
  },
  sqsClient,
  enableValidation: true,
  validatorOptions: {
    skipMissingProperties: true // Don't validate undefined properties
  }
});

Complete Validation Example

Here's a comprehensive example showing validation in action:

import Fastify from 'fastify';
import { SQSClient } from '@aws-sdk/client-sqs';
import { sqsListenerPlugin, ValidationFailureMode, ValidatorOptions } from '@snow-tzu/fastify-sqs-listener';
import { QueueListener } from '@snow-tzu/sqs-listener';
import { IsString, IsNumber, IsEmail, IsOptional, Min, Max, IsIn } from 'class-validator';

// Define message with comprehensive validation
class UserRegistrationEvent {
  @IsString()
  userId: string;

  @IsEmail()
  email: string;

  @IsString()
  @Min(2)
  @Max(50)
  firstName: string;

  @IsString()
  @Min(2)
  @Max(50)
  lastName: string;

  @IsNumber()
  @Min(13)
  @Max(120)
  age: number;

  @IsOptional()
  @IsIn(['premium', 'standard', 'basic'])
  plan?: string;

  @IsOptional()
  @IsString()
  referralCode?: string;
}

// Listener implementation
class UserRegistrationListener implements QueueListener<UserRegistrationEvent> {
  constructor(private logger: any) {}

  async handle(message: UserRegistrationEvent): Promise<void> {
    this.logger.info('Processing user registration', {
      userId: message.userId,
      email: message.email,
      plan: message.plan || 'standard'
    });

    // Your business logic here - message is guaranteed to be valid
    await this.createUserAccount(message);
    await this.sendWelcomeEmail(message);
    
    this.logger.info('User registration completed', { userId: message.userId });
  }

  private async createUserAccount(user: UserRegistrationEvent): Promise<void> {
    // Create user account logic
  }

  private async sendWelcomeEmail(user: UserRegistrationEvent): Promise<void> {
    // Send welcome email logic
  }
}

// Setup Fastify with validation
const fastify = Fastify({ logger: true });

const sqsClient = new SQSClient({
  region: process.env.AWS_REGION || 'us-east-1'
});

// Register plugin with comprehensive validation
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: process.env.USER_REGISTRATION_QUEUE_URL!,
  listener: {
    messageType: UserRegistrationEvent,
    listener: new UserRegistrationListener(fastify.log)
  },
  sqsClient,
  enableValidation: true,
  validationFailureMode: ValidationFailureMode.ACKNOWLEDGE, // Remove invalid messages
  validatorOptions: {
    whitelist: true, // Remove unknown properties
    forbidNonWhitelisted: true, // Reject messages with unknown properties
    stopAtFirstError: false, // Collect all validation errors
    validationError: {
      target: false, // Don't include the target object in error
      value: false   // Don't include the invalid value in error
    }
  }
});

await fastify.listen({ port: 3000 });

Validation Error Handling

When validation fails, detailed error information is logged through Fastify's logger:

// Example validation error log output
{
  "level": 40,
  "time": 1640995200000,
  "msg": "Message validation failed",
  "messageId": "12345-67890-abcdef",
  "validationErrors": [
    {
      "property": "email",
      "value": "invalid-email",
      "constraints": {
        "isEmail": "email must be an email"
      }
    },
    {
      "property": "age",
      "value": -5,
      "constraints": {
        "min": "age must not be less than 13"
      }
    }
  ]
}

Testing Validation

You can test your validation setup using Fastify's testing utilities:

import { test } from 'tap';
import { build } from './helper';

test('message validation', async (t) => {
  const app = build(t);
  
  // Mock SQS client for testing
  const mockSqsClient = {
    send: async () => ({ Messages: [] })
  };

  await app.register(sqsListenerPlugin, {
    queueNameOrUrl: 'test-queue',
    listener: {
      messageType: UserRegistrationEvent,
      listener: new UserRegistrationListener(app.log)
    },
    sqsClient: mockSqsClient,
    enableValidation: true,
    autoStartup: false
  });

  await app.ready();
  
  t.ok(app.sqsContainer, 'SQS container should be configured with validation');
});

## Composable QueueListener Pattern

The adapter supports the same composable QueueListener pattern as the core package, allowing you to reuse listeners across different frameworks:

### Basic QueueListener Implementation

```typescript
import { QueueListener } from '@snow-tzu/sqs-listener';

class NotificationListener implements QueueListener<NotificationEvent> {
  constructor(private logger: any) {}

  async handle(message: NotificationEvent): Promise<void> {
    this.logger.info('Sending notification', { 
      userId: message.userId,
      type: message.type 
    });
    
    await this.sendNotification(message);
  }

  private async sendNotification(notification: NotificationEvent): Promise<void> {
    // Send email, SMS, push notification, etc.
  }
}

Using Composable Decorators

import { 
  TraceQueueListener, 
  RetryQueueListener,
  QueueListener 
} from '@snow-tzu/sqs-listener';

// Create base listener
const baseListener = new NotificationListener(fastify.log);

// Add tracing
const tracedListener = new TraceQueueListener(baseListener);

// Register with plugin
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: process.env.NOTIFICATION_QUEUE_URL,
  listener:
    {
      messageType: NotificationEvent,
      listener: tracedListener
    },
  sqsClient
});

Batch Acknowledgements

Batch acknowledgements can reduce your AWS SQS API calls by up to 10x, providing significant cost savings and performance improvements for high-volume Fastify applications.

Quick Start

Enable batch acknowledgements in your Fastify plugin configuration:

import Fastify from 'fastify';
import { SQSClient } from '@aws-sdk/client-sqs';
import { sqsListenerPlugin } from '@snow-tzu/fastify-sqs-listener';

const fastify = Fastify({ logger: true });

const sqsClient = new SQSClient({ region: 'us-east-1' });

await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'order-queue',
  listener: {
    messageType: OrderCreatedEvent,
    listener: new OrderListener(fastify.log)
  },
  sqsClient,
  enableBatchAcknowledgement: true,  // Enable batch acknowledgements
  batchAcknowledgementOptions: { maxSize: 10, flushIntervalMs: 100 },
  autoStartup: true
});

await fastify.listen({ port: 3000 });

Advanced Configuration

High-Volume Applications

For applications processing thousands of messages, maximize cost savings:

await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'high-volume-queue',
  listener: {
    messageType: HighVolumeEvent,
    listener: new HighVolumeListener(fastify.log)
  },
  sqsClient,
  maxConcurrentMessages: 20,
  enableBatchAcknowledgement: true,
  batchAcknowledgementOptions: {
    maxSize: 10,        // Maximum batch size (AWS limit)
    flushIntervalMs: 200 // Wait longer for larger batches
  }
});

Low-Latency Applications

For applications where acknowledgement latency matters:

await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'low-latency-queue',
  listener: {
    messageType: UrgentEvent,
    listener: new UrgentListener(fastify.log)
  },
  sqsClient,
  enableBatchAcknowledgement: true,
  batchAcknowledgementOptions: {
    maxSize: 5,         // Smaller batches
    flushIntervalMs: 50 // Flush quickly
  }
});

Multiple Queues with Different Batch Settings

Register multiple plugins with different batch acknowledgement configurations:

import Fastify from 'fastify';
import { sqsListenerPlugin } from '@snow-tzu/fastify-sqs-listener';

const fastify = Fastify({ logger: true });

// High-volume order processing
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'order-queue',
  listener: {
    messageType: OrderCreatedEvent,
    listener: new OrderListener(fastify.log)
  },
  sqsClient,
  enableBatchAcknowledgement: true,
  batchAcknowledgementOptions: {
    maxSize: 10,
    flushIntervalMs: 100
  }
});

// Critical notifications (low latency)
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'notification-queue',
  listener: {
    messageType: NotificationEvent,
    listener: new NotificationListener(fastify.log)
  },
  sqsClient,
  enableBatchAcknowledgement: true,
  batchAcknowledgementOptions: {
    maxSize: 3,         // Small batches
    flushIntervalMs: 25 // Very fast flush
  }
});

await fastify.listen({ port: 3000 });

Manual Acknowledgement with Batching

Combine manual acknowledgement with batch processing for fine-grained control:

import { QueueListener, MessageContext } from '@snow-tzu/sqs-listener';

class TransactionalOrderListener implements QueueListener<OrderCreatedEvent> {
  constructor(private logger: any) {}

  async handle(event: OrderCreatedEvent, context: MessageContext): Promise<void> {
    try {
      // Start database transaction
      await this.beginTransaction();
      
      // Process order
      await this.processOrder(event);
      
      // Process payment
      await this.processPayment(event);
      
      // Commit transaction
      await this.commitTransaction();
      
      // Acknowledge only after successful transaction
      await context.acknowledge();  // Batched automatically
      
      this.logger.info(`Order ${event.orderId} processed successfully`);
    } catch (error) {
      await this.rollbackTransaction();
      this.logger.error(`Order ${event.orderId} failed: ${error.message}`);
      throw error; // Don't acknowledge - message will retry
    }
  }
}

// Register with manual acknowledgement and batching
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: 'transactional-queue',
  listener: {
    messageType: OrderCreatedEvent,
    listener: new TransactionalOrderListener(fastify.log)
  },
  sqsClient,
  acknowledgementMode: AcknowledgementMode.MANUAL,  // Manual control
  enableBatchAcknowledgement: true,                 // But still batch
  autoStartup: true
});

Configuration Options Summary

interface FastifyBatchAcknowledgementOptions {
  enableBatchAcknowledgement?: boolean;           // Enable/disable batching
  batchAcknowledgementOptions?: {
    maxSize?: number;                             // Max batch size (1-10, default: 10)
    flushIntervalMs?: number;                     // Flush interval (default: 100ms)
  };
}

Fastify Lifecycle Integration

The plugin integrates seamlessly with Fastify's lifecycle hooks:

Automatic Startup and Shutdown

// The plugin automatically handles lifecycle when autoStartup is true
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: process.env.QUEUE_URL,
  listener: /* your listener */,
  sqsClient,
  autoStartup: true // SQS listener starts when Fastify is ready
});

// Fastify handles shutdown automatically
await fastify.listen({ port: 3000 });

// When Fastify closes, the SQS listener stops automatically
process.on('SIGTERM', () => fastify.close());

Manual Lifecycle Control

// Disable auto-startup for manual control
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: process.env.QUEUE_URL,
  listener: /* your listener */,
  sqsClient,
  autoStartup: false
});

// Access the SQS container through Fastify instance
fastify.addHook('onReady', async () => {
  await fastify.sqsContainer.start();
  fastify.log.info('SQS listener started manually');
});

fastify.addHook('onClose', async () => {
  await fastify.sqsContainer.stop();
  fastify.log.info('SQS listener stopped');
});

Logger Integration

The plugin integrates with Fastify's built-in pino logger by default:

Using Fastify's Logger

// The plugin automatically uses Fastify's logger
const fastify = Fastify({ 
  logger: {
    level: 'info',
    transport: {
      target: 'pino-pretty',
      options: {
        colorize: true
      }
    }
  }
});

class MyListener implements QueueListener<MyMessage> {
  constructor(private logger: any) {}

  async handle(message: MyMessage): Promise<void> {
    // Use structured logging with Fastify's pino logger
    this.logger.info('Processing message', {
      messageId: message.id,
      timestamp: new Date().toISOString()
    });
  }
}

// Pass Fastify's logger to your listener
const listener = new MyListener(fastify.log);

Custom Logger Integration

import { LoggerInterface } from '@snow-tzu/sqs-listener';

class CustomLogger implements LoggerInterface {
  log(message: string, context?: any): void {
    console.log(`[INFO] ${message}`, context);
  }

  error(message: string, context?: any): void {
    console.error(`[ERROR] ${message}`, context);
  }

  warn(message: string, context?: any): void {
    console.warn(`[WARN] ${message}`, context);
  }

  debug(message: string, context?: any): void {
    console.debug(`[DEBUG] ${message}`, context);
  }
}

// Use custom logger with plugin
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: process.env.QUEUE_URL,
  listener: /* your listener */,
  sqsClient,
  logger: new CustomLogger()
});

Error Handling

The plugin integrates with Fastify's error handling system:

Fastify Error Integration

// Errors are automatically logged through Fastify's logger
fastify.setErrorHandler((error, request, reply) => {
  fastify.log.error(error);
  reply.status(500).send({ error: 'Internal Server Error' });
});

// SQS processing errors are emitted through Fastify's error system
class ErrorProneListener implements QueueListener<MyMessage> {
  async handle(message: MyMessage): Promise<void> {
    if (message.shouldFail) {
      throw new Error('Processing failed');
    }
    // Process normally
  }
}

Custom Error Handling

import { QueueListenerErrorHandler } from '@snow-tzu/sqs-listener';

class CustomErrorHandler implements QueueListenerErrorHandler {
  constructor(private logger: any) {}

  async handleError(error: Error, message: any, context: any): Promise<void> {
    this.logger.error('SQS message processing failed', {
      error: error.message,
      messageId: context.getMessageId(),
      stack: error.stack
    });
    
    // Send to dead letter queue, alert monitoring, etc.
  }
}

await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: process.env.QUEUE_NAME,
  listener: /* your listener */,
  sqsClient,
  errorHandler: new CustomErrorHandler(fastify.log)
});

TypeScript Support

The plugin provides full TypeScript support with proper Fastify type augmentation:

Type-Safe Plugin Registration

import { FastifyInstance } from 'fastify';
import { sqsListenerPlugin, FastifySqsListenerOptions } from '@snow-tzu/fastify-sqs-listener';

declare module 'fastify' {
  interface FastifyInstance {
    sqsContainer: SqsMessageListenerContainer;
  }
}

const fastify: FastifyInstance = Fastify();

const options: FastifySqsListenerOptions = {
  queueNameOrUrl: process.env.QUEUE_URL!,
  listener: 
    {
      messageType: MyMessage,
      listener: new MyListener()
    },
  sqsClient: new SQSClient({ region: 'us-east-1' })
};

await fastify.register(sqsListenerPlugin, options);

Generic Message Types

interface UserEvent {
  userId: string;
  action: 'created' | 'updated' | 'deleted';
  timestamp: string;
}

class UserEventListener implements QueueListener<UserEvent> {
  async handle(message: UserEvent): Promise<void> {
    // TypeScript provides full type safety here
    console.log(`User ${message.userId} ${message.action} at ${message.timestamp}`);
  }
}

Advanced Examples

Multiple Queue Listeners

// Register multiple listeners for different message types
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: process.env.ORDER_QUEUE_URL,
  listener: 
    {
      messageType: OrderCreatedEvent,
      listener: new TraceQueueListener(new OrderListener(fastify.log))
    },
  sqsClient
});

// Register another plugin instance for a different queue
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: process.env.NOTIFICATION_QUEUE_URL,
  listener: 
    {
      messageType: NotificationEvent,
      listener: new NotificationListener(fastify.log)
    },
  sqsClient
});

Integration with Fastify Plugins

import fastifyRedis from '@fastify/redis';
import fastifyMongodb from '@fastify/mongodb';

// Register other Fastify plugins
await fastify.register(fastifyRedis, {
  host: 'localhost',
  port: 6379
});

await fastify.register(fastifyMongodb, {
  url: 'mongodb://localhost:27017/mydb'
});

// Use dependencies in your listeners
class OrderListener implements QueueListener<OrderCreatedEvent> {
  constructor(
    private logger: any,
    private redis: any,
    private mongodb: any
  ) {}

  async handle(message: OrderCreatedEvent): Promise<void> {
    // Use Redis for caching
    await this.redis.set(`order:${message.orderId}`, JSON.stringify(message));
    
    // Use MongoDB for persistence
    await this.mongodb.db.collection('orders').insertOne(message);
    
    this.logger.info('Order processed and cached', { orderId: message.orderId });
  }
}

// Register SQS plugin with access to other plugins
await fastify.register(sqsListenerPlugin, {
  queueNameOrUrl: process.env.ORDER_QUEUE_URL,
  listener: 
    {
      messageType: OrderCreatedEvent,
      listener: new OrderListener(fastify.log, fastify.redis, fastify.mongo)
    },
  sqsClient
});

Testing

Testing with Fastify's Test Utilities

import { test } from 'tap';
import { build } from './helper'; // Fastify test helper

test('SQS plugin registration', async (t) => {
  const app = build(t);
  
  await app.register(sqsListenerPlugin, {
    queueNameOrUrl: 'test-queue',
    listener: 
      {
        messageType: TestMessage,
        listener: new TestListener()
      },
    sqsClient: mockSqsClient,
    autoStartup: false // Don't start in tests
  });

  await app.ready();
  
  t.ok(app.sqsContainer, 'SQS container should be available');
});

Related Packages

License

MIT

Contributing

Contributions are welcome! Please read our contributing guidelines and submit pull requests to our GitHub repository.

Support