@message-queue-toolkit/core

v24.2.0

Useful utilities, interfaces and base classes for message queue handling. Currently supports AMQP and SQS with a common abstraction on top.

Downloads

13,005

@message-queue-toolkit/core

Core library for message-queue-toolkit. Provides foundational abstractions, utilities, and base classes for building message queue publishers and consumers.

Installation

npm install @message-queue-toolkit/core zod

Peer Dependencies:

  • zod - Schema validation

Overview

The core package provides the foundational building blocks used by all protocol-specific implementations (SQS, SNS, AMQP, Kafka, GCP Pub/Sub). It includes:

  • Base Classes: Abstract classes for publishers and consumers
  • Handler System: Type-safe message routing and handling
  • Validation: Zod schema validation and message parsing
  • Utilities: Retry logic, date handling, environment utilities
  • Testing: Handler spies for testing async message flows
  • Extensibility: Interfaces for payload stores, deduplication stores, and metrics

Core Concepts

Message Schemas

Messages are validated using Zod schemas. The library uses configurable field names:

  • messageTypeField (required): Field containing the message type discriminator (each schema must define it with z.literal() so that messages can be routed)
  • messageIdField (default: 'id'): Field containing the message ID
  • messageTimestampField (default: 'timestamp'): Field containing the timestamp

import { z } from 'zod'

const UserCreatedSchema = z.object({
  id: z.string(),
  type: z.literal('user.created'),  // Used for routing
  timestamp: z.string().datetime(),
  userId: z.string(),
  email: z.string().email(),
})

type UserCreated = z.infer<typeof UserCreatedSchema>

Handler Configuration

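Use MessageHandlerConfigBuilder to configure handlers for different message types. In the example, SupportedMessages, ExecutionContext and UserUpdatedSchema stand for types and schemas defined by your application: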

import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'

const handlers = new MessageHandlerConfigBuilder<SupportedMessages, ExecutionContext>()
  .addConfig(
    UserCreatedSchema,
    async (message, context, preHandlingOutputs) => {
      await context.userService.createUser(message.userId)
      return { result: 'success' }
    }
  )
  .addConfig(
    UserUpdatedSchema,
    async (message, context, preHandlingOutputs) => {
      await context.userService.updateUser(message.userId, message.changes)
      return { result: 'success' }
    }
  )
  .build()

Pre-handlers and Barriers

Pre-handlers are middleware functions that run before the main handler:

const preHandlers = [
  (message, context, output, next) => {
    // Enrich context, validate prerequisites, etc.
    output.logger = context.logger.child({ messageId: message.id })
    next({ result: 'success' })
  },
]
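
The README does not show how pre-handlers are invoked. As a rough illustration only, the sketch below runs a list of pre-handlers in order and moves on each time `next` is called with a success result. `runPreHandlers`, `SketchPreHandler` and the error branch of the result type are hypothetical names made up for this example; they are not the library's internals.

```typescript
// Hypothetical stand-ins for illustration; not the library's actual types.
type SketchPreHandlerResult = { result: 'success' } | { error: Error }

type SketchPreHandler<M, C, O> = (
  message: M,
  context: C,
  output: O,
  next: (result: SketchPreHandlerResult) => void,
) => void

// Runs pre-handlers in order and reports the first error, or success once all have run.
function runPreHandlers<M, C, O>(
  preHandlers: SketchPreHandler<M, C, O>[],
  message: M,
  context: C,
  output: O,
  done: (result: SketchPreHandlerResult) => void,
): void {
  const step = (index: number): void => {
    const current = preHandlers[index]
    if (current === undefined) {
      done({ result: 'success' }) // every pre-handler has run
      return
    }
    current(message, context, output, (result) => {
      if ('error' in result) {
        done(result) // stop the chain on the first failure
        return
      }
      step(index + 1)
    })
  }
  step(0)
}

// Usage: two pre-handlers that each add a note to the shared output object.
type SketchOutput = { notes: string[] }
const sketchOutput: SketchOutput = { notes: [] }
const sketchOutcomes: SketchPreHandlerResult[] = []

runPreHandlers<{ id: string }, { source: string }, SketchOutput>(
  [
    (message, _context, output, next) => {
      output.notes.push(`seen ${message.id}`)
      next({ result: 'success' })
    },
    (_message, context, output, next) => {
      output.notes.push(`from ${context.source}`)
      next({ result: 'success' })
    },
  ],
  { id: 'msg-1' },
  { source: 'queue-a' },
  sketchOutput,
  (result) => {
    sketchOutcomes.push(result)
  },
)
```

Because every pre-handler receives the same `output` object, later pre-handlers and the main handler can read what earlier ones attached to it.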

Barriers control whether a message should be processed or retried later:

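const preHandlerBarrier = async (message, context, preHandlerOutput) => {
  const prerequisiteMet = await checkPrerequisite(message)
  if (!prerequisiteMet) {
    // Not passing: no output is returned and the message is retried later
    return { isPassing: false }
  }
  return { isPassing: true, output: { ready: true } }
}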
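
To make the "process or retry later" decision concrete, here is a small sketch of how calling code might act on a barrier result. The `SketchBarrierResult` type copies the `BarrierResult` shape from the API Reference section; `decideAfterBarrier` and its return values are hypothetical and only illustrate the branching.

```typescript
// Local copy of the BarrierResult shape listed in the API Reference section.
type SketchBarrierResult<Output> =
  | { isPassing: true; output: Output }
  | { isPassing: false; output?: never }

// Hypothetical helper: decides what happens to a message once the barrier has run.
function decideAfterBarrier<Output>(
  barrierResult: SketchBarrierResult<Output>,
): { action: 'handle'; barrierOutput: Output } | { action: 'retryLater' } {
  if (barrierResult.isPassing) {
    // The barrier output is only available on the passing branch.
    return { action: 'handle', barrierOutput: barrierResult.output }
  }
  return { action: 'retryLater' }
}

const passingDecision = decideAfterBarrier({ isPassing: true, output: { ready: true } })
const blockedDecision = decideAfterBarrier<{ ready: boolean }>({ isPassing: false })
```

The `isPassing` flag acts as a discriminant, so TypeScript only allows access to `output` after the passing branch has been checked.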

Handler Spies

Handler spies enable testing of async message flows:

// Enable in consumer/publisher options
{ handlerSpy: true }

// Wait for specific messages in tests
const result = await consumer.handlerSpy.waitForMessageWithId('msg-123', 'consumed', 5000)
expect(result.userId).toBe('user-456')
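
For readers curious how such a spy can work, the following is a minimal sketch of the idea: processed messages are recorded, and a waiting test is resolved as soon as a matching message shows up. `SketchHandlerSpy`, `record` and the entry shape are hypothetical names for this illustration, and the sketch leaves out the timeout argument; the library's handler spy has its own implementation.

```typescript
// Hypothetical minimal spy for illustration only.
type SpyStatus = 'consumed' | 'published'
type SpyEntry = { id: string; status: SpyStatus; payload: unknown }
type SpyWaiter = { id: string; status: SpyStatus; resolve: (entry: SpyEntry) => void }

class SketchHandlerSpy {
  private readonly seen: SpyEntry[] = []
  private waiters: SpyWaiter[] = []

  // Called by the consumer or publisher once a message has been processed.
  record(entry: SpyEntry): void {
    this.seen.push(entry)
    const stillWaiting: SpyWaiter[] = []
    for (const waiter of this.waiters) {
      if (waiter.id === entry.id && waiter.status === entry.status) {
        waiter.resolve(entry)
      } else {
        stillWaiting.push(waiter)
      }
    }
    this.waiters = stillWaiting
  }

  // Resolves immediately if the message was already seen, otherwise when it arrives.
  waitForMessageWithId(id: string, status: SpyStatus): Promise<SpyEntry> {
    const existing = this.seen.find((entry) => entry.id === id && entry.status === status)
    if (existing !== undefined) {
      return Promise.resolve(existing)
    }
    return new Promise<SpyEntry>((resolve) => {
      this.waiters.push({ id, status, resolve })
    })
  }
}

// Usage: the test starts waiting before the message is processed.
const sketchSpy = new SketchHandlerSpy()
const pendingEntry = sketchSpy.waitForMessageWithId('msg-123', 'consumed')
sketchSpy.record({ id: 'msg-123', status: 'consumed', payload: { userId: 'user-456' } })
```

Waiting on a message ID rather than sleeping for a fixed time keeps tests of asynchronous flows both faster and less flaky.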

Key Classes

AbstractQueueService

Base class for all queue services. Provides:

  • Message serialization/deserialization
  • Schema validation
  • Retry logic with exponential backoff
  • Payload offloading support
  • Message deduplication primitives
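
As an illustration of the "retry logic with exponential backoff" item, the sketch below computes a capped delay that doubles with each attempt. The function name, its parameters and the cap are assumptions made for this example; the README does not document how the library calculates its delays.

```typescript
// Hypothetical helper; parameter names and the cap are assumptions for this sketch.
function backoffDelayMs(attempt: number, baseDelayMs: number, maxDelayMs: number): number {
  // attempt 0 -> base, attempt 1 -> 2x base, attempt 2 -> 4x base, ...
  const uncapped = baseDelayMs * 2 ** attempt
  return Math.min(uncapped, maxDelayMs)
}

const sketchDelays = [0, 1, 2, 3, 4, 5].map((attempt) => backoffDelayMs(attempt, 100, 2000))
// sketchDelays: [100, 200, 400, 800, 1600, 2000]
```

The cap keeps late retries from waiting arbitrarily long, while the doubling reduces pressure on a dependency that is still recovering.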

MessageHandlerConfigBuilder

Fluent builder for configuring message handlers:

import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'

const handlers = new MessageHandlerConfigBuilder<
  SupportedMessages,
  ExecutionContext,
  PrehandlerOutput
>()
  .addConfig(Schema1, handler1)
  .addConfig(Schema2, handler2, {
    preHandlers: [preHandler1, preHandler2],
    preHandlerBarrier: barrierFn,
    messageLogFormatter: (msg) => ({ id: msg.id }),
  })
  .build()

HandlerContainer

Routes messages to appropriate handlers based on message type:

import { HandlerContainer } from '@message-queue-toolkit/core'

const container = new HandlerContainer({
  messageHandlers: handlers,
  messageTypeField: 'type',
})

const handler = container.resolveHandler(message.type)
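
Conceptually, routing by message type is a lookup from the value of the type field to a handler. The sketch below shows that idea with a plain `Map`; `SketchRouter` and its methods are hypothetical and are not the `HandlerContainer` implementation.

```typescript
// Hypothetical minimal router keyed by the message type field.
type SketchMessage = { type: string; [key: string]: unknown }
type SketchHandler = (message: SketchMessage) => string

class SketchRouter {
  private readonly handlersByType = new Map<string, SketchHandler>()

  register(messageType: string, handler: SketchHandler): this {
    this.handlersByType.set(messageType, handler)
    return this
  }

  // Throws for unknown types so that unsupported messages fail loudly.
  resolve(messageType: string): SketchHandler {
    const handler = this.handlersByType.get(messageType)
    if (handler === undefined) {
      throw new Error(`Unsupported message type: ${messageType}`)
    }
    return handler
  }
}

const sketchRouter = new SketchRouter()
  .register('user.created', (message) => `created ${String(message['userId'])}`)
  .register('user.updated', (message) => `updated ${String(message['userId'])}`)

const routedResult = sketchRouter.resolve('user.created')({
  type: 'user.created',
  userId: 'user-123',
})
```

This is also why the type field needs a literal value per schema: each message type must map to exactly one handler.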

MessageSchemaContainer

Manages Zod schemas and validates messages:

import { MessageSchemaContainer } from '@message-queue-toolkit/core'

const container = new MessageSchemaContainer({
  messageSchemas: [Schema1, Schema2],
  messageTypeField: 'type',
})

const schema = container.resolveSchema(message.type)

AbstractPublisherManager

Factory pattern for spawning publishers on demand:

import { AbstractPublisherManager } from '@message-queue-toolkit/core'

// `publisherManager` is assumed to be an instance of a concrete class built on AbstractPublisherManager
// Automatically spawns publishers and fills metadata
await publisherManager.publish('user-events-topic', {
  type: 'user.created',
  userId: 'user-123',
})

DomainEventEmitter

Event emitter for domain events:

import { DomainEventEmitter } from '@message-queue-toolkit/core'

const emitter = new DomainEventEmitter()

emitter.on('user.created', async (event) => {
  console.log('User created:', event.userId)
})

await emitter.emit('user.created', { userId: 'user-123' })

Utilities

NO_MESSAGE_TYPE_FIELD

Use this constant when your consumer should accept all message types without routing:

import { NO_MESSAGE_TYPE_FIELD } from '@message-queue-toolkit/core'

// Consumer will use a single handler for all messages
{
  messageTypeField: NO_MESSAGE_TYPE_FIELD,
  handlers: new MessageHandlerConfigBuilder()
    .addConfig(PassthroughSchema, async (message) => {
      // Handles any message type
      return { result: 'success' }
    })
    .build(),
}

Error Classes

import {
  MessageValidationError,
  MessageInvalidFormatError,
  DoNotProcessMessageError,
  RetryMessageLaterError,
} from '@message-queue-toolkit/core'

// Validation failed
throw new MessageValidationError(zodError.issues)

// Message format is invalid (cannot parse)
throw new MessageInvalidFormatError({ message: 'Invalid JSON' })

// Do not process this message (skip without retry)
throw new DoNotProcessMessageError({ message: 'Duplicate message' })

// Retry this message later
throw new RetryMessageLaterError({ message: 'Dependency not ready' })

Message Deduplication

Interfaces for implementing deduplication stores:

import type { MessageDeduplicationStore, ReleasableLock } from '@message-queue-toolkit/core'

// Implement custom deduplication store
class MyDeduplicationStore implements MessageDeduplicationStore {
  async keyExists(key: string): Promise<boolean> { /* ... */ }
  async setKey(key: string, ttlSeconds: number): Promise<void> { /* ... */ }
  async acquireLock(key: string, options: AcquireLockOptions): Promise<ReleasableLock> { /* ... */ }
}
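
As a starting point for experiments, here is an in-memory sketch of the key-with-TTL part of such a store. It declares its own local interface with only `keyExists` and `setKey` as shown above and leaves locking out entirely, so it is not a complete `MessageDeduplicationStore`. The injectable clock is an assumption added to make expiry easy to test.

```typescript
// Local interface mirroring two of the methods shown above; locking is left out of this sketch.
interface SketchDeduplicationStore {
  keyExists(key: string): Promise<boolean>
  setKey(key: string, ttlSeconds: number): Promise<void>
}

class InMemoryDeduplicationStore implements SketchDeduplicationStore {
  // key -> expiry time in epoch milliseconds
  private readonly expiryByKey = new Map<string, number>()
  private readonly nowMs: () => number

  // The clock is injectable so that expiry can be exercised without waiting.
  constructor(nowMs: () => number = () => Date.now()) {
    this.nowMs = nowMs
  }

  async keyExists(key: string): Promise<boolean> {
    const expiresAt = this.expiryByKey.get(key)
    if (expiresAt === undefined) {
      return false
    }
    if (expiresAt <= this.nowMs()) {
      this.expiryByKey.delete(key) // lazily drop expired keys
      return false
    }
    return true
  }

  async setKey(key: string, ttlSeconds: number): Promise<void> {
    this.expiryByKey.set(key, this.nowMs() + ttlSeconds * 1000)
  }
}

// Usage with a fake clock: the key is visible for 60 seconds, then expires.
const sketchClock = { nowMs: 0 }
const sketchStore = new InMemoryDeduplicationStore(() => sketchClock.nowMs)
```

An in-memory map only deduplicates within a single process; several consumer instances would need a shared store.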

Payload Offloading

Interfaces for implementing payload stores:

import type { PayloadStore, PayloadStoreConfig } from '@message-queue-toolkit/core'

// Implement custom payload store
class MyPayloadStore implements PayloadStore {
  async storePayload(payload: Buffer, messageId: string): Promise<PayloadRef> { /* ... */ }
  async retrievePayload(ref: PayloadRef): Promise<Buffer> { /* ... */ }
}

API Reference

Types

// Handler result type
type HandlerResult = Either<'retryLater', 'success'>

// Pre-handler signature
type Prehandler<Message, Context, Output> = (
  message: Message,
  context: Context,
  output: Output,
  next: (result: PrehandlerResult) => void
) => void

// Barrier signature
type BarrierCallback<Message, Context, PrehandlerOutput, BarrierOutput> = (
  message: Message,
  context: Context,
  preHandlerOutput: PrehandlerOutput
) => Promise<BarrierResult<BarrierOutput>>

// Barrier result
type BarrierResult<Output> =
  | { isPassing: true; output: Output }
  | { isPassing: false; output?: never }
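
The handler examples earlier return `{ result: 'success' }`. Assuming `Either` is an error/result union whose first type parameter is the error side (an assumption, since the README does not show its definition), a handler result can be inspected as sketched below. `SketchEither` and `describeOutcome` are hypothetical names for this example.

```typescript
// Assumed shape of Either: exactly one of `error` or `result` is set.
type SketchEither<E, R> = { error: E; result?: never } | { error?: never; result: R }
type SketchHandlerResult = SketchEither<'retryLater', 'success'>

function describeOutcome(outcome: SketchHandlerResult): string {
  if (outcome.error !== undefined) {
    // Error branch: the handler asked for the message to be retried.
    return `retry (${outcome.error})`
  }
  return `done (${outcome.result})`
}

const successText = describeOutcome({ result: 'success' })
const retryText = describeOutcome({ error: 'retryLater' })
```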

Utility Functions

// Environment utilities
isProduction(): boolean
reloadConfig(): void

// Date utilities
isRetryDateExceeded(timestamp: string | Date, maxRetryDuration: number): boolean

// Message parsing
parseMessage<T>(data: unknown, schema: ZodSchema<T>): ParseMessageResult<T>

// Wait utilities
waitAndRetry<T>(fn: () => Promise<T>, options: WaitAndRetryOptions): Promise<T>

// Object utilities
objectMatches(obj: unknown, pattern: unknown): boolean
isShallowSubset(subset: object, superset: object): boolean
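
The signatures above do not spell out the semantics, so the sketch below shows one plausible reading of two of them. It assumes that `maxRetryDuration` is given in seconds and that a shallow subset means every key of `subset` has a strictly equal value in `superset`. Both functions are local stand-ins with different names, and `retryWindowExceeded` takes the current time explicitly so it can be tested.

```typescript
// Sketch only: assumes the duration is in seconds and takes "now" as a parameter.
function retryWindowExceeded(
  timestamp: string | Date,
  maxRetryDurationSeconds: number,
  now: Date,
): boolean {
  const startedAtMs = typeof timestamp === 'string' ? Date.parse(timestamp) : timestamp.getTime()
  return now.getTime() > startedAtMs + maxRetryDurationSeconds * 1000
}

// Sketch only: true when every key of subset has a strictly equal value in superset.
function shallowSubsetSketch(
  subset: Record<string, unknown>,
  superset: Record<string, unknown>,
): boolean {
  return Object.keys(subset).every((key) => subset[key] === superset[key])
}

const withinWindow = retryWindowExceeded(
  '2024-01-01T00:00:00.000Z',
  60,
  new Date('2024-01-01T00:00:30.000Z'),
)
const pastWindow = retryWindowExceeded(
  '2024-01-01T00:00:00.000Z',
  60,
  new Date('2024-01-01T00:02:00.000Z'),
)
```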