@ozritesh/queue-agnostic
v1.0.2
Universal queue abstraction library supporting RabbitMQ, AWS SQS, Azure Service Bus, and GCP Pub/Sub with a single unified interface
Queue-Agnostic Document Processing System
A flexible, queue-agnostic Node.js solution for processing documents (PDFs, images) that can seamlessly work with different queue providers including RabbitMQ, AWS SQS, Azure Service Bus, and Google Cloud Pub/Sub.
🚀 Features
- Provider Agnostic: Single interface for multiple queue providers
- Environment-Based Configuration: Easy deployment across different clients
- Support for Multiple Providers:
  - RabbitMQ
  - AWS SQS
  - Azure Service Bus
  - Google Cloud Pub/Sub
- Automatic Connection Management: Built-in connection handling and graceful shutdown
- Error Handling: Automatic message retry/requeue on failures
- Easy to Extend: Add new queue providers by implementing the QueueInterface
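To make the "single interface" idea concrete, here is a minimal sketch of an adapter that keeps messages in memory. The `QueueInterface` class below is a hypothetical stand-in written for this example (the real one lives in `src/queue/QueueInterface.js` and may differ), and `InMemoryAdapter` is not part of the package.

```javascript
// Hypothetical stand-in for the package's QueueInterface (assumption, not the real class).
class QueueInterface {
  async connect() { throw new Error('connect() not implemented'); }
  async disconnect() { throw new Error('disconnect() not implemented'); }
  async publish(queueName, message, options) { throw new Error('publish() not implemented'); }
  async subscribe(queueName, handler, options) { throw new Error('subscribe() not implemented'); }
  isConnected() { return false; }
}

// Illustrative adapter that delivers messages in-process instead of using a broker.
class InMemoryAdapter extends QueueInterface {
  constructor() {
    super();
    this.connected = false;
    this.handlers = new Map(); // queueName -> handler
    this.pending = new Map();  // queueName -> messages published before any subscriber
  }

  async connect() { this.connected = true; }

  async disconnect() {
    this.connected = false;
    this.handlers.clear();
  }

  isConnected() { return this.connected; }

  async publish(queueName, message) {
    const handler = this.handlers.get(queueName);
    if (handler) {
      await handler(message);
      return;
    }
    // No subscriber yet: keep the message until one arrives.
    if (!this.pending.has(queueName)) this.pending.set(queueName, []);
    this.pending.get(queueName).push(message);
  }

  async subscribe(queueName, handler) {
    this.handlers.set(queueName, handler);
    // Deliver anything that was published before the subscription existed.
    const backlog = this.pending.get(queueName) || [];
    this.pending.delete(queueName);
    for (const message of backlog) await handler(message);
  }
}
```

Calling code only touches `connect`, `publish`, `subscribe`, `disconnect`, and `isConnected`, so swapping this adapter for a broker-backed one should not require changes to the caller.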
📦 Installation
npm install
🔧 Configuration
Environment Variables
Copy the example environment file and configure it:
cp .env.example .env
Edit .env and set the appropriate values for your queue provider:
# Set your queue provider
QUEUE_PROVIDER=rabbitmq # or aws-sqs, azure-servicebus, gcp-pubsub
# Set your queue/topic name
QUEUE_NAME=document-processing-queue
# Provider-specific configuration (see .env.example for details)
Provider-Specific Setup
RabbitMQ
QUEUE_PROVIDER=rabbitmq
RABBITMQ_URL=amqp://localhost:5672
AWS SQS
QUEUE_PROVIDER=aws-sqs
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your_key
AWS_SECRET_ACCESS_KEY=your_secret
Azure Service Bus
QUEUE_PROVIDER=azure-servicebus
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...
Google Cloud Pub/Sub
QUEUE_PROVIDER=gcp-pubsub
GCP_PROJECT_ID=your-project-id
GCP_KEY_FILENAME=./service-account-key.json
📖 Usage
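Before connecting, it can help to fail fast when the selected provider's variables are missing. The sketch below is one way to do that; `REQUIRED_ENV` and `missingEnv` are hypothetical names, not part of the package. It assumes the AWS access keys and `GCP_KEY_FILENAME` are optional, since the deployment scenarios later in this README omit them.

```javascript
// Variables each provider needs, taken from the setup snippets in this README.
// Assumption: AWS keys and GCP_KEY_FILENAME are optional (e.g. ambient credentials).
const REQUIRED_ENV = new Map([
  ['rabbitmq', ['RABBITMQ_URL']],
  ['aws-sqs', ['AWS_REGION']],
  ['azure-servicebus', ['AZURE_SERVICEBUS_CONNECTION_STRING']],
  ['gcp-pubsub', ['GCP_PROJECT_ID']],
]);

// Hypothetical helper: returns the names of required variables that are unset or empty.
function missingEnv(env = process.env) {
  const required = REQUIRED_ENV.get(env.QUEUE_PROVIDER);
  if (!required) {
    throw new Error(`Unknown QUEUE_PROVIDER: ${env.QUEUE_PROVIDER}`);
  }
  return ['QUEUE_NAME', ...required].filter((name) => !env[name]);
}
```

A startup script could call `missingEnv()` and exit with a readable message if the returned array is non-empty.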
Quick Start - Subscriber
const QueueFactory = require('./src/queue/QueueFactory');
// Top-level await is not available in CommonJS, so run inside an async function
async function main() {
  // Create queue from environment variables
  const queue = QueueFactory.createFromEnv();
  await queue.connect();
  // Subscribe and process messages
  await queue.subscribe('document-processing-queue', async (message) => {
    console.log('Processing document:', message);
    // Your document processing logic here
  });
}
main().catch(console.error);
Run the subscriber:
npm run start:subscriber
Quick Start - Publisher
const QueueFactory = require('./src/queue/QueueFactory');
// Top-level await is not available in CommonJS, so run inside an async function
async function main() {
  const queue = QueueFactory.createFromEnv();
  await queue.connect();
  // Publish a message
  await queue.publish('document-processing-queue', {
    documentId: 'doc-123',
    documentUrl: 'https://example.com/doc.pdf',
    documentType: 'pdf'
  });
  await queue.disconnect();
}
main().catch(console.error);
Run the publisher:
npm run start:publisher
Direct Usage (Without Environment Variables)
const QueueFactory = require('./src/queue/QueueFactory');
// Top-level await is not available in CommonJS, so run inside an async function
async function main() {
  // Create queue with explicit configuration
  const queue = QueueFactory.create({
    provider: 'rabbitmq',
    options: {
      url: 'amqp://localhost:5672'
    }
  });
  await queue.connect();
  // ... use the queue
  await queue.disconnect();
}
main().catch(console.error);
🏗️ Architecture
Project Structure
├── src/
│   ├── queue/
│   │   ├── QueueInterface.js              # Abstract interface
│   │   ├── QueueFactory.js                # Factory for creating adapters
│   │   └── adapters/
│   │       ├── RabbitMQAdapter.js         # RabbitMQ implementation
│   │       ├── AWSSQSAdapter.js           # AWS SQS implementation
│   │       ├── AzureServiceBusAdapter.js  # Azure implementation
│   │       └── GCPPubSubAdapter.js        # GCP implementation
│   └── index.js                           # Main entry point
├── examples/
│   ├── subscriber.js                      # Example subscriber
│   ├── publisher.js                       # Example publisher
│   └── direct-usage.js                    # Direct usage examples
├── .env.example                           # Environment variables template
└── package.json
Design Pattern
The system uses the Adapter Pattern to provide a unified interface across different queue providers:
QueueInterface (Abstract)
↓
├── RabbitMQAdapter
├── AWSSQSAdapter
├── AzureServiceBusAdapter
└── GCPPubSubAdapter
🔌 Queue Interface
All adapters implement the following interface:
Methods
connect()
Connect to the queue service.
await queue.connect();
disconnect()
Disconnect from the queue service.
await queue.disconnect();
publish(queueName, message, options)
Publish a message to a queue/topic.
await queue.publish('my-queue', {
  documentId: '123',
  type: 'pdf'
}, {
  // Provider-specific options
});
subscribe(queueName, handler, options)
Subscribe to a queue and process messages.
await queue.subscribe('my-queue', async (message) => {
  // Process message
}, {
  // Provider-specific options
});
isConnected()
Check if the connection is active.
if (queue.isConnected()) {
  // Do something
}
⚙️ Provider-Specific Options
RabbitMQ Options
Subscribe Options:
{
  prefetch: 1,    // Number of messages to prefetch
  durable: true,  // Queue durability
  requeue: true   // Requeue on failure
}
AWS SQS Options
Subscribe Options:
{
  pollingInterval: 1000,  // Polling interval in ms
  maxMessages: 10,        // Max messages per poll
  waitTimeSeconds: 20,    // Long polling wait time in seconds
  visibilityTimeout: 30   // Message visibility timeout in seconds
}
Azure Service Bus Options
Subscribe Options:
{
  receiveMode: 'peekLock',  // or 'receiveAndDelete'
  maxConcurrentCalls: 1     // Concurrent message processing
}
Google Cloud Pub/Sub Options
Subscribe Options:
{
  topicName: 'my-topic',    // Required for new subscriptions
  createIfNotExists: true,  // Auto-create topic/subscription
  flowControl: {
    maxMessages: 100
  }
}
🔄 Deployment Scenarios
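Because each provider accepts different subscribe options, code that switches providers per deployment may want to look them up by the `QUEUE_PROVIDER` value. The following is a sketch only: `SUBSCRIBE_OPTIONS` and `subscribeOptionsFor` are hypothetical names, and the values are copied from the option listings in this README rather than from verified defaults.

```javascript
// Subscribe options keyed by QUEUE_PROVIDER value (values taken from this README).
const SUBSCRIBE_OPTIONS = new Map([
  ['rabbitmq', { prefetch: 1, durable: true, requeue: true }],
  ['aws-sqs', { pollingInterval: 1000, maxMessages: 10, waitTimeSeconds: 20, visibilityTimeout: 30 }],
  ['azure-servicebus', { receiveMode: 'peekLock', maxConcurrentCalls: 1 }],
  ['gcp-pubsub', { topicName: 'my-topic', createIfNotExists: true, flowControl: { maxMessages: 100 } }],
]);

// Hypothetical helper: returns a copy of the provider's options with overrides applied.
// The merge is shallow, so a nested object such as flowControl is replaced as a whole.
function subscribeOptionsFor(provider, overrides = {}) {
  const base = SUBSCRIBE_OPTIONS.get(provider);
  if (!base) {
    throw new Error(`No subscribe options defined for provider: ${provider}`);
  }
  return { ...base, ...overrides };
}
```

The result can be passed as the third argument of `queue.subscribe(queueName, handler, options)`, for example `subscribeOptionsFor(process.env.QUEUE_PROVIDER, { prefetch: 5 })`.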
Scenario 1: Client using RabbitMQ
# .env
QUEUE_PROVIDER=rabbitmq
RABBITMQ_URL=amqp://prod-rabbitmq:5672
QUEUE_NAME=client-a-documents
Scenario 2: Client using AWS
# .env
QUEUE_PROVIDER=aws-sqs
AWS_REGION=us-west-2
QUEUE_NAME=client-b-documents
Scenario 3: Client using Azure
# .env
QUEUE_PROVIDER=azure-servicebus
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...
QUEUE_NAME=client-c-documents
Scenario 4: Client using Google Cloud
# .env
QUEUE_PROVIDER=gcp-pubsub
GCP_PROJECT_ID=client-d-project
QUEUE_NAME=client-d-documents
🛡️ Error Handling
All adapters include built-in error handling:
- Message Processing Errors: Messages are automatically requeued/nacked on handler errors
- Connection Errors: Logged and can be handled with reconnection logic
- Graceful Shutdown: SIGINT/SIGTERM handlers for clean disconnection
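The graceful shutdown behavior listed above can be approximated in application code as follows. This is a sketch, not the adapters' actual implementation: `registerGracefulShutdown` is a hypothetical helper, and its `proc` and `exit` parameters exist only so the function can be exercised without terminating the process.

```javascript
// Hypothetical helper: disconnects the queue on SIGINT/SIGTERM, then exits.
function registerGracefulShutdown(queue, proc = process, exit = (code) => process.exit(code)) {
  let shuttingDown = false;

  const shutdown = async (signal) => {
    if (shuttingDown) return; // ignore repeated signals while disconnecting
    shuttingDown = true;
    console.log(`Received ${signal}, disconnecting from queue...`);

    let code = 0;
    try {
      await queue.disconnect();
    } catch (err) {
      console.error('Error while disconnecting:', err);
      code = 1;
    }
    exit(code);
  };

  for (const signal of ['SIGINT', 'SIGTERM']) {
    proc.once(signal, () => shutdown(signal));
  }
}
```

Typical usage would be to call `registerGracefulShutdown(queue)` once, right after `await queue.connect()`.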
🔍 Monitoring & Logging
All adapters include console logging for:
- Connection status
- Message publishing
- Message receiving
- Error conditions
Example output:
✓ Connected to RabbitMQ
✓ Subscribed to RabbitMQ queue: document-processing-queue
📄 Received document for processing: { documentId: '123', ... }
✓ Successfully processed document: 123
🚧 Extending with New Providers
To add a new queue provider:
- Create a new adapter in src/queue/adapters/
- Extend QueueInterface
- Implement all required methods
- Add to QueueFactory.js
- Update environment variable handling
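The last two steps depend on how `QueueFactory.js` is written, which this README does not show. As a rough illustration, a registry-based factory could look like the sketch below. Everything here is hypothetical: `registerAdapter`, `createQueue`, `createQueueFromEnv`, the `new-provider` key, and the `NEW_PROVIDER_URL` variable are made up for the example.

```javascript
// Hypothetical registry: provider name -> adapter class.
const adapters = new Map();
// Hypothetical registry: provider name -> function mapping env vars to adapter options.
const envOptions = new Map();

function registerAdapter(provider, AdapterClass, optionsFromEnv = () => ({})) {
  adapters.set(provider, AdapterClass);
  envOptions.set(provider, optionsFromEnv);
}

// Mirrors the { provider, options } shape used by QueueFactory.create in this README.
function createQueue({ provider, options = {} }) {
  const AdapterClass = adapters.get(provider);
  if (!AdapterClass) {
    throw new Error(`Unsupported queue provider: ${provider}`);
  }
  return new AdapterClass(options);
}

function createQueueFromEnv(env = process.env) {
  const provider = env.QUEUE_PROVIDER;
  const optionsFromEnv = envOptions.get(provider) || (() => ({}));
  return createQueue({ provider, options: optionsFromEnv(env) });
}

// Placeholder adapter, used here only to demonstrate registration.
class NewProviderAdapter {
  constructor(options) {
    this.options = options;
  }
}

// "Add to the factory" and "update environment variable handling" in one call.
registerAdapter('new-provider', NewProviderAdapter, (env) => ({ url: env.NEW_PROVIDER_URL }));
```

With this shape, adding a provider touches one registration call rather than a growing `switch` statement, though the package's factory may be organized differently.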
Example skeleton:
const QueueInterface = require('../QueueInterface');
class NewProviderAdapter extends QueueInterface {
  async connect() { /* ... */ }
  async disconnect() { /* ... */ }
  async publish(queueName, message, options) { /* ... */ }
  async subscribe(queueName, handler, options) { /* ... */ }
  isConnected() { /* ... */ }
}
module.exports = NewProviderAdapter;
📝 License
MIT
🤝 Contributing
Contributions are welcome! Feel free to submit issues or pull requests.
📞 Support
For issues or questions, please open an issue on the repository.
