@thuo-huynh/module-queue-core
v0.1.0
Published
A provider-agnostic queue abstraction library for Kafka (and future brokers). Standardizes producer, consumer, retry, DLQ, and observability patterns.
Downloads
9
Maintainers
Readme
module-queue
A provider-agnostic queue abstraction library for Node.js and TypeScript. Standardizes producer, consumer, retry, DLQ, and observability patterns — starting with Kafka.
Why module-queue?
Building Kafka producers and consumers directly in each service leads to:
- Duplicated retry logic across teams
- Inconsistent error handling
- Missing DLQ support
- No standard for correlation ID propagation
- Tight coupling to KafkaJS internals
module-queue solves this by providing a unified, opinionated abstraction that works with Kafka today and is ready for other brokers (RabbitMQ, etc.) in the future.
Features
- Producer — publish single or batch messages with automatic header enrichment
- Consumer — subscribe to topics with graceful shutdown and manual commit
- Retry — exponential backoff, linear, or custom retry strategies
- DLQ — automatic dead-letter queue handling after max retries
- Correlation IDs — distributed tracing headers injected on every message
- Config — environment variable or programmatic configuration
- Topic naming — conventions for consistent topic naming across services
- TypeScript-first — full type safety and IntelliSense support
Installation
npm install @module-queue/core kafkajs
kafkajsis a required peer dependency.
Quick Start
import {
KafkaProvider,
createQueueConfig,
ExponentialBackoffRetry,
KafkaDLQHandler,
TopicNaming,
} from '@module-queue/core';
// 1. Configure
const config = createQueueConfig({
provider: 'kafka',
kafka: {
brokers: ['localhost:9092'],
clientId: 'my-service',
},
});
// 2. Create provider
const provider = new KafkaProvider(config);
await provider.connect();
// 3. Produce
const producer = await provider.createProducer();
await producer.produce('orders.order.created', {
key: 'order-123',
payload: { orderId: 'order-123', amount: 99.99 },
});
// 4. Consume
const dlqProducer = await provider.createProducer();
const consumer = await provider.createConsumer({
groupId: 'my-service-orders',
topics: ['orders.order.created'],
retryPolicy: new ExponentialBackoffRetry(3),
dlqHandler: new KafkaDLQHandler(dlqProducer),
handler: async (message, context) => {
console.log(message.payload);
await context.commit();
},
});
await consumer.start();Environment Variable Configuration
KAFKA_BROKERS=localhost:9092,localhost:9093
KAFKA_CLIENT_ID=my-service
KAFKA_SSL=false
QUEUE_SERVICE_NAME=my-service
QUEUE_LOG_LEVEL=infoimport { KafkaProvider, loadConfigFromEnv } from '@module-queue/core';
const config = loadConfigFromEnv();
const provider = new KafkaProvider(config);Documentation
- Getting Started
- Producer Guide
- Consumer Guide
- Retry & DLQ
- Configuration
- Error Handling
- Architecture
- NPM Publishing
Supported Brokers
| Broker | Status | |-----------|-----------| | Kafka | ✅ v1 | | RabbitMQ | 🗓 Planned | | AWS SQS | 🗓 Planned |
Development
# Install dependencies
npm install
# Run tests (unit)
npm test
# Run tests with coverage
npm run test:coverage
# Build the package
npm run build
# Lint
npm run lint
# Type check
npm run type-checkIntegration tests (requires Docker)
docker-compose -f docker-compose.test.yml up -d
INTEGRATION=true npx vitest run tests/integration
docker-compose -f docker-compose.test.yml downContributing
See CONTRIBUTING.md.
License
MIT — see LICENSE.
