@bancame/mq
v1.0.0
TypeScript library for AWS SNS and SQS messaging
Bancame MQ
A simple TypeScript library for AWS SNS (Simple Notification Service) and SQS (Simple Queue Service) messaging with a clean, intuitive API.
Features
- Simple API: Publish and subscribe with just a few lines of code
- Automatic Queue Management: Queues are created automatically when you subscribe
- Built-in Retry Logic: Easy dequeue/requeue functionality
- TypeScript: Full type safety with comprehensive interfaces
- Testing: Jest unit tests with mocked AWS SDK
- Error Handling: Robust error handling and validation
- Flexible Configuration: Support for AWS credentials and custom configurations
Installation
npm install @bancame/mq
Quick Start
Basic Setup
import BancameMQ from '@bancame/mq';
const bancameMQ = new BancameMQ({
AWS: {
region: 'us-east-1',
accessKeyId: 'your-access-key',
secretAccessKey: 'your-secret-key',
},
channelBaseName: 'my-app',
subscriptionBaseName: 'my-service',
});
Publishing Messages
const publisher = bancameMQ.Publisher();
// Publish a single message
await publisher.publish({
topic: 'user-registered',
message: { userId: '123', email: '[email protected]' },
subject: 'User Registration Event',
});
// Publish multiple messages in batch
await publisher.publishBatch({
topic: 'order-created',
messages: [
{ orderId: '1', amount: 100 },
{ orderId: '2', amount: 200 },
{ orderId: '3', amount: 300 },
],
});
Subscribing to Messages
const subscriber = bancameMQ.Subscriber();
// Simple subscription
subscriber.subscribe({
topic: 'user-registered',
handler: async ({ message, dequeue, requeue }) => {
console.log('Processing user registration:', message);
try {
// Process the message
await processUserRegistration(message);
// Success - remove from queue
await dequeue();
} catch (error) {
if (isRetryableError(error)) {
// Retry later
await requeue(error);
} else {
// Permanent failure - remove from queue
await dequeue(error);
}
}
},
});
// Advanced subscription with retry logic
subscriber.subscribe({
topic: 'order-created',
handler: async ({ message, dequeue, requeue }) => {
console.log('Processing order:', message);
try {
await processOrder(message);
await dequeue();
} catch (error) {
await requeue(error);
}
},
retryIntervals: [60, 120, 180], // Retry after 1, 2, 3 minutes
redrivePolicy: { maxReceiveCount: 4 }, // Send to DLQ after 4 attempts
numberOfParallelMessages: 5, // Process 5 messages in parallel
errorHandler: (error) => {
console.error('Subscription error:', error);
return true; // Continue polling
},
});
Graceful Shutdown
// Handle SIGINT gracefully
subscriber.sigIntOverride(10000); // 10 second delay
// Or unsubscribe manually
await subscriber.unsubscribe('user-registered');
Webhook Integration Examples
@bancame/mq suits webhook handling: the endpoint acknowledges the request quickly and queues the event for asynchronous processing. Here are common patterns:
Express.js Webhook Handler
import express from 'express';
import BancameMQ from '@bancame/mq';
const app = express();
const bancameMQ = new BancameMQ({
AWS: { region: 'us-east-1' },
channelBaseName: 'webhooks',
subscriptionBaseName: 'webhook-processor',
});
const publisher = bancameMQ.Publisher();
// Webhook endpoint - receives and queues events
app.post('/webhook/stripe', express.json(), async (req, res) => {
try {
// Publish webhook payload to queue for processing
await publisher.publish({
topic: 'stripe-webhook',
message: {
event: req.body,
headers: req.headers,
timestamp: new Date().toISOString(),
webhookId: req.body.id, // Stripe event ID (the stripe-signature header is a signature, not an identifier)
},
});
res.status(200).json({ received: true });
} catch (error) {
console.error('Failed to queue webhook:', error);
res.status(500).json({ error: 'Failed to process webhook' });
}
});
Webhook Event Processor
// Separate service to process webhook events
const subscriber = bancameMQ.Subscriber();
subscriber.subscribe({
topic: 'stripe-webhook',
requeueDelaySeconds: 60, // Retry failed webhooks after 1 minute
numberOfParallelMessages: 5, // Process 5 webhooks concurrently
handler: async ({ message, dequeue, requeue }) => {
const { event, webhookId } = message;
try {
// Process different event types
switch (event.type) {
case 'payment_intent.succeeded':
await handlePaymentSuccess(event.data.object);
break;
case 'payment_intent.payment_failed':
await handlePaymentFailure(event.data.object);
break;
case 'customer.subscription.created':
await handleSubscriptionCreated(event.data.object);
break;
default:
console.log(`Unhandled event type: ${event.type}`);
}
await dequeue(); // Success - remove from queue
} catch (error) {
console.error(`Failed to process webhook ${webhookId}:`, error);
throw error; // Let error handler decide whether to requeue
}
},
errorHandler: (error) => {
// Requeue on temporary errors, stop on permanent errors
const temporaryErrors = ['ECONNRESET', 'TIMEOUT', 'RATE_LIMITED'];
const isTemporary = temporaryErrors.some((err) =>
error.message.includes(err),
);
if (isTemporary) {
console.log('Temporary error - requeuing webhook');
return true; // Requeue
} else {
console.error('Permanent error - webhook will be discarded:', error);
return false; // Don't requeue
}
},
});
async function handlePaymentSuccess(paymentIntent) {
// Update database, send confirmation email, etc.
console.log('Processing successful payment:', paymentIntent.id);
}
async function handlePaymentFailure(paymentIntent) {
// Notify customer, update order status, etc.
console.log('Processing failed payment:', paymentIntent.id);
}
GitHub Webhook Example
// GitHub webhook for CI/CD pipeline
app.post('/webhook/github', express.json(), async (req, res) => {
const event = req.headers['x-github-event'];
const signature = req.headers['x-hub-signature-256'];
// Verify webhook signature (recommended)
if (!verifyGitHubSignature(req.body, signature)) {
return res.status(401).json({ error: 'Invalid signature' });
}
await publisher.publish({
topic: 'github-events',
message: {
event: event,
action: req.body.action,
repository: req.body.repository?.name,
payload: req.body,
deliveryId: req.headers['x-github-delivery'],
},
});
res.status(200).json({ processed: true });
});
// Process GitHub events
subscriber.subscribe({
topic: 'github-events',
handler: async ({ message, dequeue }) => {
const { event, action, repository, payload } = message;
if (event === 'push' && payload.ref === 'refs/heads/main') {
await triggerDeployment(repository, payload.after);
} else if (event === 'pull_request' && action === 'opened') {
await runTests(repository, payload.pull_request);
}
await dequeue();
},
numberOfParallelMessages: 3, // Limit concurrent deployments
});
Benefits of Using MQ for Webhooks
- Reliability: Once queued, webhooks are processed even if the processing service is temporarily down
- Scalability: Handle webhook spikes by processing them asynchronously
- Retry Logic: Failed webhooks are retried after configurable delays (retryIntervals, requeueDelaySeconds)
- Ordering: Webhooks are generally processed in the order received; parallel processing (numberOfParallelMessages greater than 1) can reorder them
- Monitoring: Built-in logging for webhook processing status
- Decoupling: Webhook receipt and processing are separated for better architecture
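The GitHub example above calls verifyGitHubSignature without defining it. The following is a minimal sketch of such a helper, not part of @bancame/mq. It assumes GitHub's documented scheme (the X-Hub-Signature-256 header carries "sha256=" followed by the hex HMAC-SHA256 of the raw request body) and, unlike the two-argument call in the example, takes the raw body and the webhook secret explicitly. The parameter names are illustrative.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical helper: returns true only when the header matches the
// HMAC-SHA256 of the raw request body computed with the shared secret.
function verifyGitHubSignature(
  rawBody: string | Buffer,
  signatureHeader: string | undefined,
  secret: string,
): boolean {
  // Reject missing headers and unexpected signature schemes early.
  if (!signatureHeader || !signatureHeader.startsWith('sha256=')) {
    return false;
  }
  const expected =
    'sha256=' + createHmac('sha256', secret).update(rawBody).digest('hex');
  const expectedBytes = Buffer.from(expected, 'utf8');
  const receivedBytes = Buffer.from(signatureHeader, 'utf8');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return (
    expectedBytes.length === receivedBytes.length &&
    timingSafeEqual(expectedBytes, receivedBytes)
  );
}
```

The HMAC has to be computed over the exact bytes GitHub sent, so a route using this sketch would need the unparsed body (for example via express.raw({ type: 'application/json' })) rather than the object produced by express.json().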
API Reference
Configuration
interface BancameMQConfig {
AWS: {
region: string;
accessKeyId?: string;
secretAccessKey?: string;
sessionToken?: string;
snsApiVersion?: string;
sqsApiVersion?: string;
sqsWaitTimeSeconds?: number;
sqsVisibilityTimeout?: number;
keepAlive?: boolean;
httpOptions?: {
timeout?: number;
};
};
channelBaseName?: string;
subscriptionBaseName?: string;
pollingTimeInterval?: number;
log?: {
info: (obj: any, name?: string) => void;
warn: (obj: any, name?: string) => void;
error: (obj: any, name?: string) => void;
fatal: (obj: any, name?: string) => void;
};
}
Publisher API
interface Publisher {
publish(message: PublishMessage): Promise<PublishCommandOutput>;
publishBatch(message: PublishBatchMessage): Promise<PublishCommandOutput[]>;
}
interface PublishMessage {
topic: string;
message: any;
subject?: string;
messageAttributes?: MessageAttributes;
}
interface PublishBatchMessage {
topic: string;
messages: any[];
}
Subscriber API
interface Subscriber {
subscribe(config: SubscribeConfig): Promise<void>;
unsubscribe(topicName: string): Promise<void>;
sigIntOverride(delay?: number): void;
}
interface SubscribeConfig {
topic: string;
handler: (context: MessageContext) => Promise<void>;
errorHandler?: (error: Error) => boolean;
retryIntervals?: number[];
redrivePolicy?: {
maxReceiveCount: number;
};
messageRetentionPeriod?: number;
numberOfParallelMessages?: number;
bucketEventsEnabled?: boolean;
determinePolling?: () => Promise<boolean>;
requeueDelaySeconds?: number; // Delay in seconds before requeued message becomes visible again
}
interface MessageContext {
message: any;
dequeue: (error?: Error, customParameters?: any) => Promise<void>;
requeue: (error?: Error, customParameters?: any) => Promise<void>;
}
Advanced Features
Retry Intervals
Configure custom retry intervals for failed message processing:
subscriber.subscribe({
topic: 'payment-processing',
handler: async ({ message, dequeue, requeue }) => {
// Process payment
},
retryIntervals: [30, 60, 120, 300], // Retry after 30s, 1m, 2m, 5m
redrivePolicy: { maxReceiveCount: 5 }, // Send to DLQ after 5 attempts
});
Message Requeuing
Automatically requeue messages for retry when processing fails. The errorHandler determines whether to requeue (true) or stop processing (false):
subscriber.subscribe({
topic: 'order-processing',
requeueDelaySeconds: 30, // Delay before retry (default: 30 seconds)
handler: async ({ message, dequeue }) => {
try {
await processOrder(message);
await dequeue(); // Success - remove from queue
} catch (error) {
throw error; // Let errorHandler decide what to do
}
},
errorHandler: (error) => {
console.error('Processing failed:', error.message);
// Return true to requeue, false to stop processing
if (error.message.includes('TEMPORARY_ERROR')) {
return true; // Requeue for retry
} else {
return false; // Permanent error - stop processing
}
},
});
Dead Letter Queues
Automatically create DLQs for failed messages:
subscriber.subscribe({
topic: 'critical-process',
handler: async ({ message, dequeue, requeue }) => {
// Process critical message
},
redrivePolicy: { maxReceiveCount: 3 },
messageRetentionPeriod: 86400, // 1 day
});
Parallel Processing
Process multiple messages in parallel:
subscriber.subscribe({
topic: 'high-throughput',
handler: async ({ message, dequeue, requeue }) => {
// Process message
},
numberOfParallelMessages: 10, // Process 10 messages simultaneously
});
Custom Logging
Configure custom logging:
const bancameMQ = new BancameMQ({
AWS: { region: 'us-east-1' },
log: {
info: (obj, name) => console.log(`[INFO] ${name}:`, obj),
warn: (obj, name) => console.warn(`[WARN] ${name}:`, obj),
error: (obj, name) => console.error(`[ERROR] ${name}:`, obj),
fatal: (obj, name) => console.error(`[FATAL] ${name}:`, obj),
},
});
Testing
Run the test suite:
# Run all tests
npm test
# Run tests in watch mode
npm run test:watch
# Run tests with coverage
npm run test:coverage
Development
# Install dependencies
npm install
# Build the library
npm run build
# Lint the code
npm run lint
# Fix linting issues
npm run lint:fix
Configuration
AWS Credentials
The library supports multiple ways to provide AWS credentials:
- Explicit credentials (as shown in examples)
- Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
- AWS credentials file (~/.aws/credentials)
- IAM roles (when running on EC2, Lambda, etc.)
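As one way to combine these options, the sketch below builds the AWS section of the configuration from environment variables and passes explicit credentials only when both keys are present. It is an illustration, not library code: awsConfigFromEnv and the AwsSection type are hypothetical names, the us-east-1 fallback mirrors the default mentioned for local testing, and it assumes that omitting the keys lets the credentials file or IAM role take over.

```typescript
// Subset of the AWS block from BancameMQConfig that this sketch fills in.
interface AwsSection {
  region: string;
  accessKeyId?: string;
  secretAccessKey?: string;
  sessionToken?: string;
}

// Hypothetical helper: the environment is passed in as a plain object so the
// function stays easy to test; in an application this would be process.env.
function awsConfigFromEnv(env: Record<string, string | undefined>): AwsSection {
  const config: AwsSection = { region: env.AWS_REGION ?? 'us-east-1' };
  // Only set explicit credentials when both halves of the key pair exist.
  if (env.AWS_ACCESS_KEY_ID && env.AWS_SECRET_ACCESS_KEY) {
    config.accessKeyId = env.AWS_ACCESS_KEY_ID;
    config.secretAccessKey = env.AWS_SECRET_ACCESS_KEY;
    if (env.AWS_SESSION_TOKEN) {
      config.sessionToken = env.AWS_SESSION_TOKEN;
    }
  }
  return config;
}
```

The result could then be passed as the AWS field, for example new BancameMQ({ AWS: awsConfigFromEnv(process.env), channelBaseName: 'my-app' }).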
Environment Variables
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
export AWS_REGION=us-east-1
Error Handling
The library includes comprehensive error handling:
- Validation errors: Missing required parameters, invalid configurations
- AWS API errors: Network issues, permissions, invalid configurations
- Message processing errors: Handled gracefully with dequeue/requeue
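The subscription examples call isRetryableError to choose between dequeue and requeue but leave it undefined. A minimal sketch of such a classifier follows. It is an assumption rather than library code, and the marker list simply reuses the strings from the webhook errorHandler example (ECONNRESET, TIMEOUT, RATE_LIMITED); adjust it to the errors your own handlers raise.

```typescript
// Markers treated as temporary failures. This list is illustrative.
const RETRYABLE_MARKERS = ['ECONNRESET', 'TIMEOUT', 'RATE_LIMITED'];

// Hypothetical helper: true when the error looks temporary, judged by its
// `code` property (as set by Node network errors) or by its message text.
function isRetryableError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false; // Unknown shapes are treated as permanent failures.
  }
  const code = (error as Error & { code?: unknown }).code;
  const text = `${typeof code === 'string' ? code : ''} ${error.message}`;
  return RETRYABLE_MARKERS.some((marker) => text.includes(marker));
}
```

Inside a handler, a true result would lead to await requeue(error) and a false result to await dequeue(error), as in the simple subscription example.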
Migration from Legacy API
If you're using the legacy API, you can migrate gradually:
// Old way
const messagingService = new AWSMessagingService(snsConfig, sqsConfig);
await messagingService.publishToSNS(message);
// New way
const bancameMQ = new BancameMQ(config);
const publisher = bancameMQ.Publisher();
await publisher.publish({ topic: 'my-topic', message: data });
Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests for new functionality
- Run the test suite
- Submit a pull request
License
MIT License - see LICENSE file for details.
Local Testing
Prerequisites
- AWS Credentials: Set up your AWS credentials as environment variables:
export AWS_ACCESS_KEY_ID="your_access_key_id"
export AWS_SECRET_ACCESS_KEY="your_secret_access_key"
export AWS_REGION="us-east-1" # optional, defaults to us-east-1
- Build the library:
npm run build
# Run unit tests (with mocked AWS)
npm test
# Run local tests with real AWS credentials
npm run test:local # JavaScript version
npm run test:local:ts # TypeScript version
# Test message requeuing functionality
npm run test:requeue
# List AWS resources created during testing
npm run cleanup
# Delete AWS resources (after testing)
npm run cleanup:delete
# Combined test and cleanup script
./test-and-cleanup.sh
Test and Cleanup Script
We provide a convenient script that will:
- ✅ Check AWS credentials
- 🔨 Build the library
- 🧪 Run local tests with real AWS
- 🔍 Show created AWS resources
- 🧹 Clean up everything (with confirmation)
# Make sure you have AWS credentials set
export AWS_ACCESS_KEY_ID="your_access_key_id"
export AWS_SECRET_ACCESS_KEY="your_secret_access_key"
export AWS_REGION="us-east-1"
# Run the combined test and cleanup
./test-and-cleanup.sh
What the Local Tests Do
- Create SNS topics and SQS queues automatically
- Publish and receive messages through real AWS
- Test both single and batch message publishing
- Show real-world usage examples
- Clean up resources when done
Cost Considerations
- SNS: $0.50 per 1 million requests
- SQS: $0.40 per 1 million requests
- These tests typically cost less than $0.01 to run
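To make the arithmetic behind that estimate concrete, the sketch below applies the two per-million prices listed above to a given number of requests. The function name and the request counts in the usage note are hypothetical; actual billing depends on your account, region, and any free tier.

```typescript
// Prices taken from the list above, in USD per one million requests.
const SNS_USD_PER_MILLION = 0.5;
const SQS_USD_PER_MILLION = 0.4;

// Hypothetical helper: estimated cost in USD for the given request counts.
function estimateCostUsd(snsRequests: number, sqsRequests: number): number {
  return (
    (snsRequests * SNS_USD_PER_MILLION + sqsRequests * SQS_USD_PER_MILLION) /
    1e6
  );
}
```

For instance, a run that issues 1,000 SNS requests and 5,000 SQS requests comes to roughly a quarter of a cent, well under the $0.01 figure.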
Resource Cleanup
The cleanup script will:
- 🔍 List all SNS topics and SQS queues with the bancame-mq prefix
- ⚠️ Ask for confirmation before deleting
- 🧹 Remove all SNS topics, subscriptions, and SQS queues
- ✅ Confirm everything is removed
Advanced cleanup options:
# Clean up debug resources created during development
npx ts-node examples/cleanup.ts --delete --prefix=debug
# Clean up both bancame-mq and debug resources
npx ts-node examples/cleanup.ts --delete --prefix=all
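The prefix options can be pictured as a name filter. The sketch below is an assumption about the behavior described here, not the actual code in examples/cleanup.ts: it treats bancame-mq as the default prefix and all as shorthand for both known prefixes. The function names and sample resource names are hypothetical.

```typescript
// Prefixes mentioned in this section; 'all' selects both of them.
const KNOWN_PREFIXES = ['bancame-mq', 'debug'];

// Hypothetical helper: expand a --prefix option into concrete name prefixes.
function prefixesFor(option: string = 'bancame-mq'): string[] {
  return option === 'all' ? KNOWN_PREFIXES : [option];
}

// Hypothetical helper: keep only the resource names that start with one of
// the selected prefixes.
function selectForCleanup(names: string[], option?: string): string[] {
  const prefixes = prefixesFor(option);
  return names.filter((name) =>
    prefixes.some((prefix) => name.startsWith(prefix)),
  );
}
```

With names such as bancame-mq-orders, debug-orders, and billing-queue, the default option would select only the first, debug only the second, and all both of them, leaving billing-queue untouched.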