@message-queue-toolkit/core
v24.2.0
Useful utilities, interfaces, and base classes for message queue handling. Currently supports AMQP and SQS, with a common abstraction on top.
Core library for message-queue-toolkit. Provides foundational abstractions, utilities, and base classes for building message queue publishers and consumers.
Installation
npm install @message-queue-toolkit/core zod
Peer Dependencies:
- zod: Schema validation
Overview
The core package provides the foundational building blocks used by all protocol-specific implementations (SQS, SNS, AMQP, Kafka, GCP Pub/Sub). It includes:
- Base Classes: Abstract classes for publishers and consumers
- Handler System: Type-safe message routing and handling
- Validation: Zod schema validation and message parsing
- Utilities: Retry logic, date handling, environment utilities
- Testing: Handler spies for testing async message flows
- Extensibility: Interfaces for payload stores, deduplication stores, and metrics
Core Concepts
Message Schemas
Messages are validated using Zod schemas. The library uses configurable field names:
- messageTypeField (required): Field containing the message type discriminator (must be z.literal() for routing)
- messageIdField (default: 'id'): Field containing the message ID
- messageTimestampField (default: 'timestamp'): Field containing the timestamp
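To make the role of these field names concrete, here is a minimal sketch of how configurable field names could be used to pull routing metadata out of a raw message. `FieldNames`, `MessageMetadata`, and `readMetadata` are hypothetical names invented for this illustration; they are not part of the library's API.

```typescript
// Hypothetical illustration: none of these names come from the library.
type FieldNames = {
  messageTypeField: string
  messageIdField?: string // falls back to 'id'
  messageTimestampField?: string // falls back to 'timestamp'
}

type MessageMetadata = { type: unknown; id: unknown; timestamp: unknown }

// Reads the three metadata values using the configured (or default) field names.
function readMetadata(message: Record<string, unknown>, names: FieldNames): MessageMetadata {
  return {
    type: message[names.messageTypeField],
    id: message[names.messageIdField ?? 'id'],
    timestamp: message[names.messageTimestampField ?? 'timestamp'],
  }
}

// The type discriminator lives in a custom field; id and timestamp use the defaults.
const metadata = readMetadata(
  { eventName: 'user.created', id: 'msg-1', timestamp: '2024-01-01T00:00:00.000Z' },
  { messageTypeField: 'eventName' },
)
```

Only the type field has to be named explicitly here, which matches the defaults listed above.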
import { z } from 'zod'
const UserCreatedSchema = z.object({
  id: z.string(),
  type: z.literal('user.created'), // Used for routing
  timestamp: z.string().datetime(),
  userId: z.string(),
  email: z.string().email(),
})
type UserCreated = z.infer<typeof UserCreatedSchema>
Handler Configuration
Use MessageHandlerConfigBuilder to configure handlers for different message types:
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
const handlers = new MessageHandlerConfigBuilder<SupportedMessages, ExecutionContext>()
  .addConfig(
    UserCreatedSchema,
    async (message, context, preHandlingOutputs) => {
      await context.userService.createUser(message.userId)
      return { result: 'success' }
    }
  )
  .addConfig(
    UserUpdatedSchema,
    async (message, context, preHandlingOutputs) => {
      await context.userService.updateUser(message.userId, message.changes)
      return { result: 'success' }
    }
  )
  .build()
Pre-handlers and Barriers
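As a rough mental model for how pre-handlers, a barrier, and the main handler fit together, the sketch below chains them by hand. It is not the library's implementation: `runPrehandlers`, `processMessage`, and the simplified types are hypothetical, and the assumption that a message blocked by the barrier ends up as "retry later" is taken from the description of barriers in this section.

```typescript
// Hypothetical, simplified types; the library's real signatures are richer.
type StepResult = { error: Error } | { result: 'success' }
type Prehandler<M, O> = (message: M, output: Partial<O>, next: (r: StepResult) => void) => void
type Barrier<M, O> = (message: M, output: Partial<O>) => Promise<{ isPassing: boolean }>

// Runs pre-handlers in order; each one must call next() to continue the chain.
function runPrehandlers<M, O>(message: M, prehandlers: Prehandler<M, O>[]): Promise<Partial<O>> {
  const output: Partial<O> = {}
  return new Promise((resolve, reject) => {
    const step = (index: number): void => {
      const current = prehandlers[index]
      if (current === undefined) {
        resolve(output) // no pre-handlers left
        return
      }
      current(message, output, (r) => {
        if ('error' in r) reject(r.error)
        else step(index + 1)
      })
    }
    step(0)
  })
}

// Pre-handlers first, then the barrier, then the main handler.
async function processMessage<M, O>(
  message: M,
  prehandlers: Prehandler<M, O>[],
  barrier: Barrier<M, O>,
  handler: (message: M, output: Partial<O>) => Promise<void>,
): Promise<'success' | 'retryLater'> {
  const output = await runPrehandlers(message, prehandlers)
  const { isPassing } = await barrier(message, output)
  if (!isPassing) return 'retryLater' // assumed: a blocked message is retried later
  await handler(message, output)
  return 'success'
}
```

The main handler only runs once every pre-handler has called `next` with a success result and the barrier has reported `isPassing: true`.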
Pre-handlers are middleware functions that run before the main handler:
const preHandlers = [
  (message, context, output, next) => {
    // Enrich context, validate prerequisites, etc.
    output.logger = context.logger.child({ messageId: message.id })
    next({ result: 'success' })
  },
]
Barriers control whether a message should be processed or retried later:
const preHandlerBarrier = async (message, context, preHandlerOutput) => {
  const prerequisiteMet = await checkPrerequisite(message)
  return {
    isPassing: prerequisiteMet,
    output: { ready: true },
  }
}
Handler Spies
Handler spies enable testing of async message flows:
// Enable in consumer/publisher options
{ handlerSpy: true }
// Wait for specific messages in tests
const result = await consumer.handlerSpy.waitForMessageWithId('msg-123', 'consumed', 5000)
expect(result.userId).toBe('user-456')
Key Classes
AbstractQueueService
Base class for all queue services. Provides:
- Message serialization/deserialization
- Schema validation
- Retry logic with exponential backoff
- Payload offloading support
- Message deduplication primitives
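The exponential backoff mentioned in the list above can be illustrated in a few lines. This is a generic sketch of the technique, not the library's retry code; `backoffDelayMs`, `retryWithBackoff`, and the default values are assumptions chosen for the example.

```typescript
// Generic exponential backoff: the delay doubles with each attempt, up to a cap.
function backoffDelayMs(attempt: number, baseMs = 100, maxMs = 5000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt)
}

const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms))

// Retries `fn` until it succeeds or `maxAttempts` is reached, waiting between attempts.
// The `wait` function is injectable so the delays can be observed without real timers.
async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  maxAttempts = 5,
  wait: (ms: number) => Promise<void> = sleep,
): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn()
    } catch (error) {
      lastError = error
      // No point in waiting after the final attempt.
      if (attempt < maxAttempts - 1) await wait(backoffDelayMs(attempt))
    }
  }
  throw lastError
}
```

Capping the delay keeps a long run of failures from producing unbounded waits; production implementations often add random jitter as well, which is omitted here for brevity.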
MessageHandlerConfigBuilder
Fluent builder for configuring message handlers:
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
const handlers = new MessageHandlerConfigBuilder<
  SupportedMessages,
  ExecutionContext,
  PrehandlerOutput
>()
  .addConfig(Schema1, handler1)
  .addConfig(Schema2, handler2, {
    preHandlers: [preHandler1, preHandler2],
    preHandlerBarrier: barrierFn,
    messageLogFormatter: (msg) => ({ id: msg.id }),
  })
  .build()
HandlerContainer
Routes messages to appropriate handlers based on message type:
import { HandlerContainer } from '@message-queue-toolkit/core'
const container = new HandlerContainer({
  messageHandlers: handlers,
  messageTypeField: 'type',
})
const handler = container.resolveHandler(message.type)
MessageSchemaContainer
Manages Zod schemas and validates messages:
import { MessageSchemaContainer } from '@message-queue-toolkit/core'
const container = new MessageSchemaContainer({
  messageSchemas: [Schema1, Schema2],
  messageTypeField: 'type',
})
const schema = container.resolveSchema(message.type)
AbstractPublisherManager
Factory pattern for spawning publishers on demand:
import { AbstractPublisherManager } from '@message-queue-toolkit/core'
// `publisherManager` is an instance of a concrete AbstractPublisherManager subclass
// Automatically spawns publishers and fills metadata
await publisherManager.publish('user-events-topic', {
  type: 'user.created',
  userId: 'user-123',
})
DomainEventEmitter
Event emitter for domain events:
import { DomainEventEmitter } from '@message-queue-toolkit/core'
const emitter = new DomainEventEmitter()
emitter.on('user.created', async (event) => {
  console.log('User created:', event.userId)
})
await emitter.emit('user.created', { userId: 'user-123' })
Utilities
NO_MESSAGE_TYPE_FIELD
Use this constant when your consumer should accept all message types without routing:
import { NO_MESSAGE_TYPE_FIELD } from '@message-queue-toolkit/core'
// Consumer will use a single handler for all messages
{
  messageTypeField: NO_MESSAGE_TYPE_FIELD,
  handlers: new MessageHandlerConfigBuilder()
    .addConfig(PassthroughSchema, async (message) => {
      // Handles any message type
      return { result: 'success' }
    })
    .build(),
}
Error Classes
import {
  MessageValidationError,
  MessageInvalidFormatError,
  DoNotProcessMessageError,
  RetryMessageLaterError,
} from '@message-queue-toolkit/core'
// Validation failed
throw new MessageValidationError(zodError.issues)
// Message format is invalid (cannot parse)
throw new MessageInvalidFormatError({ message: 'Invalid JSON' })
// Do not process this message (skip without retry)
throw new DoNotProcessMessageError({ message: 'Duplicate message' })
// Retry this message later
throw new RetryMessageLaterError({ message: 'Dependency not ready' })
Message Deduplication
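To give a feel for what a deduplication store has to do, here is a minimal in-memory sketch of the key-with-TTL part. It only mirrors the `keyExists` and `setKey` methods of the interface shown in this section and leaves out locking entirely; the class name and the injectable clock are assumptions made for the example, and a real store would normally be backed by shared storage rather than process memory.

```typescript
// Hypothetical in-memory store; not part of the library and not production-ready.
class InMemoryDeduplicationSketch {
  private readonly expiresAt = new Map<string, number>()
  private readonly now: () => number

  // The clock is injectable so that expiry can be exercised without waiting.
  constructor(now: () => number = () => Date.now()) {
    this.now = now
  }

  async keyExists(key: string): Promise<boolean> {
    const expiry = this.expiresAt.get(key)
    if (expiry === undefined) return false
    if (expiry <= this.now()) {
      this.expiresAt.delete(key) // lazily drop expired keys
      return false
    }
    return true
  }

  async setKey(key: string, ttlSeconds: number): Promise<void> {
    this.expiresAt.set(key, this.now() + ttlSeconds * 1000)
  }
}
```

A consumer could call `keyExists` with the message ID before handling a message and `setKey` afterwards, so that a redelivered copy is skipped until the TTL runs out.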
Interfaces for implementing deduplication stores:
import type { MessageDeduplicationStore, ReleasableLock } from '@message-queue-toolkit/core'
// Implement custom deduplication store
class MyDeduplicationStore implements MessageDeduplicationStore {
  async keyExists(key: string): Promise<boolean> { /* ... */ }
  async setKey(key: string, ttlSeconds: number): Promise<void> { /* ... */ }
  async acquireLock(key: string, options: AcquireLockOptions): Promise<ReleasableLock> { /* ... */ }
}
Payload Offloading
Interfaces for implementing payload stores:
import type { PayloadStore, PayloadStoreConfig } from '@message-queue-toolkit/core'
// Implement custom payload store
class MyPayloadStore implements PayloadStore {
  async storePayload(payload: Buffer, messageId: string): Promise<PayloadRef> { /* ... */ }
  async retrievePayload(ref: PayloadRef): Promise<Buffer> { /* ... */ }
}
API Reference
Types
// Handler result type
type HandlerResult = Either<'retryLater', 'success'>
// Pre-handler signature
type Prehandler<Message, Context, Output> = (
  message: Message,
  context: Context,
  output: Output,
  next: (result: PrehandlerResult) => void
) => void
// Barrier signature
type BarrierCallback<Message, Context, PrehandlerOutput, BarrierOutput> = (
  message: Message,
  context: Context,
  preHandlerOutput: PrehandlerOutput
) => Promise<BarrierResult<BarrierOutput>>
// Barrier result
type BarrierResult<Output> =
  | { isPassing: true; output: Output }
  | { isPassing: false; output?: never }
Utility Functions
// Environment utilities
isProduction(): boolean
reloadConfig(): void
// Date utilities
isRetryDateExceeded(timestamp: string | Date, maxRetryDuration: number): boolean
// Message parsing
parseMessage<T>(data: unknown, schema: ZodSchema<T>): ParseMessageResult<T>
// Wait utilities
waitAndRetry<T>(fn: () => Promise<T>, options: WaitAndRetryOptions): Promise<T>
// Object utilities
objectMatches(obj: unknown, pattern: unknown): boolean
isShallowSubset(subset: object, superset: object): boolean
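As an illustration of the kind of check the object utilities perform, the sketch below implements a shallow subset comparison. It reflects an assumption about the intended semantics and is deliberately written under a different name (`shallowSubsetSketch`) so that it is not mistaken for the library's `isShallowSubset`.

```typescript
// Returns true when every own key of `subset` has a strictly equal value in `superset`.
// Only top-level keys are compared; nested objects are compared by reference.
function shallowSubsetSketch(
  subset: Record<string, unknown>,
  superset: Record<string, unknown>,
): boolean {
  return Object.keys(subset).every(
    (key) => Object.prototype.hasOwnProperty.call(superset, key) && subset[key] === superset[key],
  )
}

const sampleMessage = { id: 'msg-1', type: 'user.created', userId: 'user-123' }
const matchesCreated = shallowSubsetSketch({ type: 'user.created' }, sampleMessage)
const matchesUpdated = shallowSubsetSketch({ type: 'user.updated' }, sampleMessage)
```

A check like this is handy in tests, for example to assert that a consumed message contains a few expected fields without spelling out the whole payload.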