atomic-queues

v1.4.1

A plug-and-play NestJS library for atomic process handling per entity with BullMQ, Redis distributed locking, and dynamic worker management

A NestJS library for atomic, sequential job processing per entity using BullMQ and Redis.



Overview

atomic-queues solves the fundamental concurrency problem in distributed systems: ensuring that operations on the same logical entity execute sequentially, even when requests arrive simultaneously across multiple service instances.

Rather than relying on distributed locks—which introduce contention, latency degradation, and complex failure modes—this library implements a per-entity queue architecture where each entity (user account, order, document) has its own dedicated processing queue and worker.


The Concurrency Problem

Consider a banking system where a user with a $100 balance submits two concurrent $80 withdrawal requests:

Time    Request A                    Request B                    Database State
─────────────────────────────────────────────────────────────────────────────────
T₀      SELECT balance → $100        SELECT balance → $100        balance = $100
T₁      CHECK: $100 >= $80 ✓         CHECK: $100 >= $80 ✓         balance = $100
T₂      UPDATE: balance -= $80                                    balance = $20
T₃                                   UPDATE: balance -= $80       balance = -$60
─────────────────────────────────────────────────────────────────────────────────
Result: Both withdrawals succeed. Balance becomes -$60. Integrity violated.

With atomic-queues, operations are queued and processed sequentially:

Time    Queue State                   Worker Execution                   Database State
───────────────────────────────────────────────────────────────────────────────────────
T₀      [Withdraw $80, Withdraw $80]                                     balance = $100
T₁      [Withdraw $80]                Process Op₁: $100 - $80            balance = $20
T₂      []                            Process Op₂: $20 < $80 → REJECT    balance = $20
───────────────────────────────────────────────────────────────────────────────────────
Result: First withdrawal succeeds. Second is rejected. Integrity preserved.
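The two timelines above can be reproduced with a small in-memory simulation. This is only a sketch of the idea, not how atomic-queues is implemented: the `balances` map, `withdraw`, `enqueue`, and `demo` are hypothetical names, and a promise chain per entity stands in for the BullMQ queue and worker.

```typescript
// Simulation only: an in-memory balance store and a promise chain per entity.
// None of these names are part of atomic-queues.
const balances = new Map<string, number>();

// Read, check, then write, with an await in between where other requests can interleave.
async function withdraw(accountId: string, amount: number): Promise<boolean> {
  const seen = balances.get(accountId) ?? 0; // SELECT
  await Promise.resolve();                   // yield to other pending requests
  if (seen < amount) return false;           // CHECK against the (possibly stale) read
  balances.set(accountId, (balances.get(accountId) ?? 0) - amount); // UPDATE
  return true;
}

// One promise chain per entity: each operation starts only after the previous one settles.
const tails = new Map<string, Promise<unknown>>();

function enqueue<T>(entityId: string, op: () => Promise<T>): Promise<T> {
  const tail: Promise<unknown> = tails.get(entityId) ?? Promise.resolve();
  const result = tail.then(op);
  // Store a tail that never rejects so one failed job does not block the next.
  tails.set(entityId, result.catch(() => undefined));
  return result;
}

async function demo(): Promise<{ racy: number; queued: number }> {
  // Two concurrent withdrawals with no coordination.
  balances.set('acc-1', 100);
  await Promise.all([withdraw('acc-1', 80), withdraw('acc-1', 80)]);
  const racy = balances.get('acc-1') ?? 0;

  // The same two withdrawals routed through the per-entity chain.
  balances.set('acc-1', 100);
  await Promise.all([
    enqueue('acc-1', () => withdraw('acc-1', 80)),
    enqueue('acc-1', () => withdraw('acc-1', 80)),
  ]);
  const queued = balances.get('acc-1') ?? 0;

  return { racy, queued };
}
```

Calling `demo()` resolves to `racy = -60` (both checks pass against the stale $100 read) and `queued = 20` (the second withdrawal sees $20 and is rejected).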

The Per-Entity Queue Architecture

                                    ┌─────────────────────────────────────────┐
   Request A ─┐                     │         Per-Entity Queue                │
              │                     │  ┌─────┐ ┌─────┐ ┌─────┐               │
   Request B ─┼──▶ [Entity Router] ─┼─▶│ Op₁ │→│ Op₂ │→│ Op₃ │→ [Worker] ─┐ │
              │                     │  └─────┘ └─────┘ └─────┘             │ │
   Request C ─┘                     │                                      │ │
                                    │      Sequential Processing ◄─────────┘ │
                                    └─────────────────────────────────────────┘

Key features:

  • Each entity has exactly one active worker (enforced via Redis heartbeat)
  • Workers spawn automatically when jobs arrive
  • Workers terminate after configurable idle period
  • Node failure → heartbeat expires → worker respawns on healthy node
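To make the heartbeat idea concrete, here is a minimal in-memory sketch of a lease with a TTL, roughly what a Redis key set with `SET key value NX EX ttl` would provide. It is an assumption about the general technique, not the library's actual code: `HeartbeatRegistry`, `tryClaim`, and `beat` are hypothetical names, and the clock is passed in explicitly so the behaviour is deterministic.

```typescript
// In-memory stand-in for a Redis key with a TTL.
// Illustrative only; atomic-queues' real heartbeat logic may differ.
interface Lease {
  owner: string;
  expiresAt: number;
}

class HeartbeatRegistry {
  private leases = new Map<string, Lease>();
  private readonly ttlMs: number;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  // Claim the entity's worker slot if it is free, expired, or already ours.
  tryClaim(entityId: string, nodeId: string, now: number): boolean {
    const lease = this.leases.get(entityId);
    if (lease && lease.expiresAt > now && lease.owner !== nodeId) {
      return false; // another node still has a live heartbeat
    }
    this.leases.set(entityId, { owner: nodeId, expiresAt: now + this.ttlMs });
    return true;
  }

  // Refresh the TTL; only the current, unexpired owner may do so.
  beat(entityId: string, nodeId: string, now: number): boolean {
    const lease = this.leases.get(entityId);
    if (!lease || lease.owner !== nodeId || lease.expiresAt <= now) return false;
    lease.expiresAt = now + this.ttlMs;
    return true;
  }
}

const registry = new HeartbeatRegistry(3000); // 3 s TTL
const claimedByA = registry.tryClaim('account:42', 'node-a', 0);       // slot is free
const blockedForB = registry.tryClaim('account:42', 'node-b', 1000);   // node-a still alive
const takenOverByB = registry.tryClaim('account:42', 'node-b', 3500);  // node-a's heartbeat expired
```

Here `claimedByA` is `true`, `blockedForB` is `false`, and `takenOverByB` is `true`, which mirrors the "node failure, heartbeat expires, worker respawns" sequence.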

Installation

npm install atomic-queues bullmq ioredis

Quick Start

1. Configure the Module

The entities configuration is optional. Choose the approach that fits your needs:

Option A: Minimal Setup (uses default naming)

import { Module } from '@nestjs/common';
import { AtomicQueuesModule } from 'atomic-queues';

@Module({
  imports: [
    AtomicQueuesModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
      keyPrefix: 'myapp',
      enableCronManager: true,
      // No entities config needed! Uses default naming:
      // Queue: {keyPrefix}:{entityType}:{entityId}:queue
      // Worker: {keyPrefix}:{entityType}:{entityId}:worker
    }),
  ],
})
export class AppModule {}
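For reference, the default naming pattern from the comments above expands as in the following sketch. `defaultNames` is a hypothetical helper written for illustration; it is not exported by atomic-queues.

```typescript
// Expands the documented pattern:
//   Queue:  {keyPrefix}:{entityType}:{entityId}:queue
//   Worker: {keyPrefix}:{entityType}:{entityId}:worker
function defaultNames(keyPrefix: string, entityType: string, entityId: string) {
  const base = `${keyPrefix}:${entityType}:${entityId}`;
  return { queue: `${base}:queue`, worker: `${base}:worker` };
}

const names = defaultNames('myapp', 'account', '42');
// names.queue  is 'myapp:account:42:queue'
// names.worker is 'myapp:account:42:worker'
```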

Option B: Custom Queue/Worker Naming (via entities config)

import { Module } from '@nestjs/common';
import { AtomicQueuesModule } from 'atomic-queues';

@Module({
  imports: [
    AtomicQueuesModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
      keyPrefix: 'myapp',
      enableCronManager: true,
      
      // Optional: Define custom naming and settings per entity type
      entities: {
        account: {
          queueName: (id) => `${id}-queue`,        // Custom queue naming
          workerName: (id) => `${id}-worker`,      // Custom worker naming
          maxWorkersPerEntity: 1,
          idleTimeoutSeconds: 15,
        },
      },
    }),
  ],
})
export class AppModule {}

Option C: Custom Naming via @WorkerProcessor

For advanced use cases, define a processor class instead of entities config:

import { Injectable } from '@nestjs/common';
import { WorkerProcessor } from 'atomic-queues';

@WorkerProcessor({
  entityType: 'account',
  queueName: (id) => `${id}-queue`,
  workerName: (id) => `${id}-worker`,
  maxWorkersPerEntity: 1,
  idleTimeoutSeconds: 15,
})
@Injectable()
export class AccountProcessor {}

When to use each:

  • Option A: Default naming works for you
  • Option B: Need custom naming but no custom job handling logic
  • Option C: Need custom naming AND custom @JobHandler methods

2. Create Commands with Decorators

import { QueueEntity, QueueEntityId } from 'atomic-queues';

@QueueEntity('account')
export class WithdrawCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly amount: number,
    public readonly transactionId: string,
  ) {}
}

@QueueEntity('account')
export class DepositCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly amount: number,
    public readonly source: string,
  ) {}
}

3. Create Command Handlers (standard @nestjs/cqrs)

import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { WithdrawCommand } from './commands';
// AccountRepository and InsufficientFundsError are application classes (not shown here).

@CommandHandler(WithdrawCommand)
export class WithdrawHandler implements ICommandHandler<WithdrawCommand> {
  constructor(private readonly accountRepo: AccountRepository) {}

  async execute(command: WithdrawCommand) {
    const { accountId, amount } = command;
    
    // SAFE: No race conditions! Processed sequentially per account.
    const account = await this.accountRepo.findById(accountId);
    
    if (account.balance < amount) {
      throw new InsufficientFundsError(accountId, account.balance, amount);
    }
    
    account.balance -= amount;
    await this.accountRepo.save(account);
    
    return { success: true, newBalance: account.balance };
  }
}

4. Enqueue Jobs

import { Injectable } from '@nestjs/common';
import { QueueBus } from 'atomic-queues';
import { WithdrawCommand, DepositCommand } from './commands';

@Injectable()
export class AccountService {
  constructor(private readonly queueBus: QueueBus) {}

  async withdraw(accountId: string, amount: number, transactionId: string) {
    // Command is automatically routed to the account's queue
    await this.queueBus.enqueue(new WithdrawCommand(accountId, amount, transactionId));
  }

  async deposit(accountId: string, amount: number, source: string) {
    await this.queueBus.enqueue(new DepositCommand(accountId, amount, source));
  }
}

That's it! The library automatically:

  • Creates a queue for each accountId when jobs arrive
  • Spawns a worker to process jobs sequentially
  • Routes jobs to the correct @CommandHandler
  • Terminates idle workers after the configured timeout
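The idle-termination step can be pictured as a periodic sweep over known workers. The sketch below is an assumption about how such a check might look, not the library's implementation: `WorkerState` and `findIdleWorkers` are hypothetical, and timestamps are plain milliseconds.

```typescript
// Hypothetical bookkeeping for one per-entity worker.
interface WorkerState {
  entityId: string;
  lastJobAt: number; // ms timestamp of the most recent job
}

// Returns the entity IDs whose workers have been idle for at least the timeout.
function findIdleWorkers(
  workers: WorkerState[],
  now: number,
  idleTimeoutSeconds: number,
): string[] {
  const cutoff = now - idleTimeoutSeconds * 1000;
  return workers
    .filter((worker) => worker.lastJobAt <= cutoff)
    .map((worker) => worker.entityId);
}

const idle = findIdleWorkers(
  [
    { entityId: 'acc-1', lastJobAt: 0 },      // idle for 16 s
    { entityId: 'acc-2', lastJobAt: 10_000 }, // idle for 6 s
  ],
  16_000,
  15,
);
// idle is ['acc-1']
```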

Commands and Decorators

@QueueEntity(entityType)

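Marks a command class for queue routing. If you define an entities config, entityType must match one of its keys; without that config, the default naming ({keyPrefix}:{entityType}:{entityId}:queue) is used.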

@QueueEntity('account')
export class TransferCommand { ... }

@QueueEntityId()

Marks which property contains the entity ID for queue routing. Only one per class.

@QueueEntity('account')
export class TransferCommand {
  constructor(
    @QueueEntityId() public readonly sourceAccountId: string,  // Routes to source account's queue
    public readonly targetAccountId: string,
    public readonly amount: number,
  ) {}
}

Alternative: Use defaultEntityId

If all commands for an entity use the same property name, configure it once:

// In module config
entities: {
  account: {
    defaultEntityId: 'accountId',  // Commands without @QueueEntityId use this
    // ...
  },
}

// Then commands don't need @QueueEntityId
@QueueEntity('account')
export class WithdrawCommand {
  constructor(
    public readonly accountId: string,  // Automatically used
    public readonly amount: number,
  ) {}
}
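The lookup order implied above (an explicitly marked property first, then defaultEntityId) can be sketched as a plain function. This is a hypothetical illustration that does not use the real decorators; `resolveEntityId` and its parameters are assumed names.

```typescript
// Hypothetical resolution order: a property marked with @QueueEntityId wins,
// otherwise fall back to the entity type's defaultEntityId.
function resolveEntityId(
  command: Record<string, unknown>,
  markedProperty: string | undefined,
  defaultEntityId: string | undefined,
): string {
  const key = markedProperty ?? defaultEntityId;
  if (key === undefined) {
    throw new Error('No entity ID property configured');
  }
  const value = command[key];
  if (typeof value !== 'string' || value.length === 0) {
    throw new Error(`Command has no usable "${key}" property`);
  }
  return value;
}

// Falls back to defaultEntityId when nothing is marked.
const fromDefault = resolveEntityId({ accountId: 'acc-1', amount: 80 }, undefined, 'accountId');

// A marked property takes priority over the default.
const fromMarked = resolveEntityId(
  { sourceAccountId: 'acc-1', targetAccountId: 'acc-2', amount: 80 },
  'sourceAccountId',
  'accountId',
);
```

Both calls return `'acc-1'`; a command that lacks the configured property throws instead of being routed to an arbitrary queue.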

Configuration

AtomicQueuesModule.forRoot({
  redis: {
    host: 'localhost',
    port: 6379,
    password: 'secret',
  },
  
  keyPrefix: 'myapp',            // Redis key prefix (default: 'aq')
  enableCronManager: true,       // Enable worker lifecycle management
  cronInterval: 5000,            // Scaling check interval (ms)
  
  workerDefaults: {
    concurrency: 1,              // Jobs processed simultaneously
    stalledInterval: 1000,       // Stalled job check interval (ms)
    lockDuration: 30000,         // Job lock duration (ms)
    heartbeatTTL: 3,             // Worker heartbeat TTL (seconds)
  },
  
  // OPTIONAL: Per-entity configuration
  // If omitted, uses default naming: {keyPrefix}:{entityType}:{entityId}:queue/worker
  entities: {
    account: {
      defaultEntityId: 'accountId',
      queueName: (id) => `${id}-queue`,
      workerName: (id) => `${id}-worker`,
      maxWorkersPerEntity: 1,
      idleTimeoutSeconds: 15,
      autoSpawn: true,           // Default: true
      workerConfig: {            // Override defaults per entity
        concurrency: 1,
        lockDuration: 60000,
      },
    },
    order: {
      defaultEntityId: 'orderId',
      queueName: (id) => `order-${id}-queue`,
      idleTimeoutSeconds: 30,
    },
  },
});
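One way to read the relationship between workerDefaults and a per-entity workerConfig is as a shallow override. The sketch below assumes exactly that (the library's real merge behaviour is not documented here), and `effectiveWorkerConfig` is a hypothetical helper.

```typescript
interface WorkerConfig {
  concurrency: number;
  stalledInterval: number; // ms
  lockDuration: number;    // ms
  heartbeatTTL: number;    // seconds
}

// Assumed behaviour: per-entity values replace defaults key by key.
function effectiveWorkerConfig(
  defaults: WorkerConfig,
  overrides: Partial<WorkerConfig> = {},
): WorkerConfig {
  return { ...defaults, ...overrides };
}

const workerDefaults: WorkerConfig = {
  concurrency: 1,
  stalledInterval: 1000,
  lockDuration: 30000,
  heartbeatTTL: 3,
};

// Mirrors the `account` entity above: only lockDuration actually changes.
const accountWorkerConfig = effectiveWorkerConfig(workerDefaults, {
  concurrency: 1,
  lockDuration: 60000,
});
// accountWorkerConfig.lockDuration is 60000; the other fields keep their defaults.
```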

Complete Example

A banking service handling financial transactions:

// ─────────────────────────────────────────────────────────────────
// app.module.ts
// ─────────────────────────────────────────────────────────────────
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { AtomicQueuesModule } from 'atomic-queues';
// AccountService, AccountController, and the handlers come from your own files (imports omitted).

@Module({
  imports: [
    CqrsModule,
    AtomicQueuesModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
      keyPrefix: 'banking',
      enableCronManager: true,
      entities: {
        account: {
          queueName: (id) => `${id}-queue`,
          workerName: (id) => `${id}-worker`,
          maxWorkersPerEntity: 1,
          idleTimeoutSeconds: 15,
          workerConfig: {
            concurrency: 1,
            lockDuration: 60000,
          },
        },
      },
    }),
  ],
  providers: [
    AccountService,
    WithdrawHandler,
    DepositHandler,
    TransferHandler,
  ],
  controllers: [AccountController],
})
export class AppModule {}

// ─────────────────────────────────────────────────────────────────
// commands/withdraw.command.ts
// ─────────────────────────────────────────────────────────────────
import { QueueEntity, QueueEntityId } from 'atomic-queues';

@QueueEntity('account')
export class WithdrawCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly amount: number,
    public readonly transactionId: string,
  ) {}
}

// ─────────────────────────────────────────────────────────────────
// commands/deposit.command.ts
// ─────────────────────────────────────────────────────────────────
import { QueueEntity, QueueEntityId } from 'atomic-queues';

@QueueEntity('account')
export class DepositCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly amount: number,
    public readonly source: string,
  ) {}
}

// ─────────────────────────────────────────────────────────────────
// commands/transfer.command.ts
// ─────────────────────────────────────────────────────────────────
import { QueueEntity, QueueEntityId } from 'atomic-queues';

@QueueEntity('account')
export class TransferCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,  // Source account
    public readonly toAccountId: string,
    public readonly amount: number,
    public readonly transactionId: string,
  ) {}
}

// ─────────────────────────────────────────────────────────────────
// handlers/withdraw.handler.ts
// ─────────────────────────────────────────────────────────────────
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { WithdrawCommand } from '../commands';

@CommandHandler(WithdrawCommand)
export class WithdrawHandler implements ICommandHandler<WithdrawCommand> {
  constructor(private readonly accountRepo: AccountRepository) {}

  async execute(command: WithdrawCommand) {
    const { accountId, amount } = command;
    
    // SAFE: Sequential execution per account
    const account = await this.accountRepo.findById(accountId);
    
    if (account.balance < amount) {
      throw new InsufficientFundsError(accountId, account.balance, amount);
    }
    
    account.balance -= amount;
    await this.accountRepo.save(account);
    
    return { success: true, newBalance: account.balance };
  }
}

// ─────────────────────────────────────────────────────────────────
// handlers/transfer.handler.ts
// ─────────────────────────────────────────────────────────────────
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { TransferCommand, DepositCommand } from '../commands';
import { QueueBus } from 'atomic-queues';

@CommandHandler(TransferCommand)
export class TransferHandler implements ICommandHandler<TransferCommand> {
  constructor(
    private readonly accountRepo: AccountRepository,
    private readonly queueBus: QueueBus,
  ) {}

  async execute(command: TransferCommand) {
    const { accountId, toAccountId, amount } = command;
    
    // Debit source (we're in source account's queue)
    const source = await this.accountRepo.findById(accountId);
    if (source.balance < amount) {
      throw new InsufficientFundsError(accountId, source.balance, amount);
    }
    
    source.balance -= amount;
    await this.accountRepo.save(source);
    
    // Credit destination (enqueued to destination's queue).
    // The debit above is already saved, so a failed enqueue here leaves the transfer half-done.
    await this.queueBus.enqueue(new DepositCommand(
      toAccountId,
      amount,
      `transfer:${accountId}`,
    ));
    
    return { success: true };
  }
}

// ─────────────────────────────────────────────────────────────────
// account.controller.ts
// ─────────────────────────────────────────────────────────────────
import { Controller, Post, Body, Param } from '@nestjs/common';
import { QueueBus } from 'atomic-queues';
import { WithdrawCommand, TransferCommand } from './commands';
import { v4 as uuid } from 'uuid';

@Controller('accounts')
export class AccountController {
  constructor(private readonly queueBus: QueueBus) {}

  @Post(':accountId/withdraw')
  async withdraw(
    @Param('accountId') accountId: string,
    @Body() body: { amount: number },
  ) {
    const transactionId = uuid();
    
    await this.queueBus.enqueue(
      new WithdrawCommand(accountId, body.amount, transactionId)
    );

    return { queued: true, transactionId };
  }

  @Post(':accountId/transfer')
  async transfer(
    @Param('accountId') accountId: string,
    @Body() body: { toAccountId: string; amount: number },
  ) {
    const transactionId = uuid();
    
    await this.queueBus.enqueue(
      new TransferCommand(accountId, body.toAccountId, body.amount, transactionId)
    );

    return { queued: true, transactionId };
  }
}
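The module above registers a DepositHandler, and TransferHandler enqueues a DepositCommand, but no deposit handler is shown. The following is a self-contained sketch of what it might look like, modelled on WithdrawHandler. To keep it runnable on its own it uses a hypothetical `InMemoryAccountRepository` and a local `DepositCommand` without decorators; in the real application the class would carry `@CommandHandler(DepositCommand)` and use your own repository.

```typescript
// Stand-ins so the sketch runs by itself; replace with your real classes.
interface Account {
  id: string;
  balance: number;
}

class InMemoryAccountRepository {
  private accounts = new Map<string, Account>();

  async findById(id: string): Promise<Account | undefined> {
    const found = this.accounts.get(id);
    return found ? { ...found } : undefined;
  }

  async save(account: Account): Promise<void> {
    this.accounts.set(account.id, { ...account });
  }
}

class DepositCommand {
  readonly accountId: string;
  readonly amount: number;
  readonly source: string;

  constructor(accountId: string, amount: number, source: string) {
    this.accountId = accountId;
    this.amount = amount;
    this.source = source;
  }
}

// In a NestJS app: @CommandHandler(DepositCommand)
class DepositHandler {
  private readonly accountRepo: InMemoryAccountRepository;

  constructor(accountRepo: InMemoryAccountRepository) {
    this.accountRepo = accountRepo;
  }

  async execute(command: DepositCommand) {
    const { accountId, amount } = command;

    if (amount <= 0) {
      throw new Error('Deposit amount must be positive');
    }

    // Runs in the destination account's queue, so no other job touches this balance meanwhile.
    const account = await this.accountRepo.findById(accountId);
    if (!account) {
      throw new Error(`Account ${accountId} not found`);
    }

    account.balance += amount;
    await this.accountRepo.save(account);

    return { success: true, newBalance: account.balance };
  }
}
```

Given an account with a $20 balance, executing `new DepositCommand(id, 80, 'transfer:acc-1')` returns `{ success: true, newBalance: 100 }`.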

Advanced: Custom Worker Processors

For special cases where you need custom job handling logic, you can still define a @WorkerProcessor:

import { Injectable } from '@nestjs/common';
import { WorkerProcessor, JobHandler } from 'atomic-queues';
import { Job } from 'bullmq';

@WorkerProcessor({
  entityType: 'account',
  queueName: (id) => `${id}-queue`,
  workerName: (id) => `${id}-worker`,
  maxWorkersPerEntity: 1,
  idleTimeoutSeconds: 15,
})
@Injectable()
export class AccountProcessor {
  // Custom handler for specific job types
  @JobHandler('special-operation')
  async handleSpecialOperation(job: Job, entityId: string) {
    // Custom logic here
  }

  // Wildcard handler for everything else
  @JobHandler('*')
  async handleAll(job: Job, entityId: string) {
    // Falls back to CQRS routing automatically
  }
}

Note: When you define a @WorkerProcessor for an entity type, it takes precedence over config-based default registration.
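The relationship between a named @JobHandler and the '*' wildcard can be pictured as a lookup with a fallback. This is a simplified sketch under the assumption that an exact job name is tried before the wildcard; `HandlerRegistry` and the `Handler` type are hypothetical and do not reflect the library's internals.

```typescript
type Handler = (jobName: string, entityId: string) => string;

class HandlerRegistry {
  private handlers = new Map<string, Handler>();

  register(jobName: string, handler: Handler): void {
    this.handlers.set(jobName, handler);
  }

  // Exact job name first, then the '*' wildcard, otherwise undefined.
  resolve(jobName: string): Handler | undefined {
    return this.handlers.get(jobName) ?? this.handlers.get('*');
  }
}

const handlers = new HandlerRegistry();
handlers.register('special-operation', (_name, entityId) => `special:${entityId}`);
handlers.register('*', (name, entityId) => `fallback:${name}:${entityId}`);

const special = handlers.resolve('special-operation')?.('special-operation', 'acc-1');
const other = handlers.resolve('withdraw')?.('withdraw', 'acc-1');
// special is 'special:acc-1'
// other   is 'fallback:withdraw:acc-1'
```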


License

MIT