atomic-queues

v1.5.2

A plug-and-play NestJS library for atomic process handling per entity with BullMQ, Redis distributed locking, and dynamic worker management


Why atomic-queues?

Distributed locks (Redlock, advisory locks, optimistic locking) all share the same fundamental flaw: contention collapse. When multiple pods fight for the same lock simultaneously, they spend more time retrying failed acquisitions than doing actual work. The harder you push, the slower they go.

atomic-queues eliminates contention entirely. Instead of locking, each entity gets its own dedicated BullMQ queue. Operations execute sequentially — back-to-back with zero wasted cycles. There's nothing to contend over.

atomic-queues vs Redlock

| | Redlock | atomic-queues |
|---|---|---|
| Architecture | Distributed mutex (quorum-based) | Per-entity queue (sequential) |
| Under contention | Degrades: retry storms, backoff delays | Constant: jobs queue up, execute instantly |
| Per-entity throughput | ~20-50 ops/s (heavy contention) | ~200-300 ops/s (queue-bound, no contention) |
| Failure mode | Silent double-execution (clock drift) | Job stuck in queue (visible, retryable) |
| Split-brain risk | Yes (timing assumptions) | Impossible (serial queue) |
| Warm-path overhead | 5-7ms per op (acquire + release) | 0ms (in-memory hot cache) |
| Cold-start | None | ~2-3ms one-time per entity |
| Multi-pod scaling | Contention increases with pods | Throughput increases with pods |



How It Works

The Problem

Every distributed system eventually hits this:

Time    Request A                    Request B                    Database
──────────────────────────────────────────────────────────────────────────
T₀      SELECT balance → $100        SELECT balance → $100        $100
T₁      CHECK: $100 ≥ $80 ✓          CHECK: $100 ≥ $80 ✓
T₂      UPDATE: balance − $80                                     $20
T₃                                   UPDATE: balance − $80        −$60
──────────────────────────────────────────────────────────────────────────
Result: Balance is −$60. Both withdrawals succeed. Integrity violated.
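
The interleaving above can be reproduced deterministically. The sketch below is not part of atomic-queues: `Account`, `makeWithdrawal`, and the two schedules are hypothetical names used only to illustrate the check-then-act race, assuming each update is applied as a relative decrement.

```typescript
// Hypothetical in-memory model of the race; nothing here is atomic-queues API.
interface Account { balance: number }

// A withdrawal split into the two steps that the database sees separately.
function makeWithdrawal(account: Account, amount: number) {
  let approved = false;
  return {
    // Step 1: read the balance and run the business check.
    check(): void { approved = account.balance >= amount; },
    // Step 2: apply the decrement if the earlier check passed.
    apply(): void { if (approved) account.balance -= amount; },
  };
}

// Interleaved schedule: both checks run before either update.
function runInterleaved(start: number, amount: number): number {
  const account: Account = { balance: start };
  const a = makeWithdrawal(account, amount);
  const b = makeWithdrawal(account, amount);
  a.check(); b.check(); // both requests see the starting balance
  a.apply(); b.apply(); // both decrements land
  return account.balance;
}

// Sequential schedule: each withdrawal finishes before the next one starts.
function runSequential(start: number, amount: number): number {
  const account: Account = { balance: start };
  const a = makeWithdrawal(account, amount);
  const b = makeWithdrawal(account, amount);
  a.check(); a.apply();
  b.check(); b.apply(); // the second check sees 20 and rejects
  return account.balance;
}

console.log(runInterleaved(100, 80)); // -60
console.log(runSequential(100, 80));  // 20
```

Running the same two withdrawals one after the other is all it takes to keep the balance non-negative, which is the property a per-entity queue provides.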

The Solution

atomic-queues routes operations through per-entity queues. Same entity → same queue → sequential execution. Different entities → parallel queues → full throughput.

                        ┌─────────────────────────────────────────────────┐
  Request A ─┐          │           Entity: account-42                    │
             │          │  ┌──────┐  ┌──────┐  ┌──────┐                  │
  Request B ─┼─► Route ─┼─►│ Op 1 │─►│ Op 2 │─►│ Op 3 │─► [Worker] ──┐  │
             │          │  └──────┘  └──────┘  └──────┘               │  │
  Request C ─┘          │                    Sequential ◄─────────────┘  │
                        └─────────────────────────────────────────────────┘

                        ┌─────────────────────────────────────────────────┐
  Request D ─┐          │           Entity: account-99                    │
             │          │  ┌──────┐  ┌──────┐                            │
  Request E ─┼─► Route ─┼─►│ Op 1 │─►│ Op 2 │─────────► [Worker] ──┐    │
             │          │  └──────┘  └──────┘                       │    │
  Request F ─┘          │                    Sequential ◄───────────┘    │
                        └─────────────────────────────────────────────────┘

                        ▲ These two queues run in PARALLEL across pods ▲
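
The routing rule in the diagram can be modeled in a few lines. This is an in-memory sketch, not the library's implementation (which uses BullMQ queues in Redis); `EntityRouter`, `route`, and `tick` are hypothetical names. Each `tick` runs at most one operation per entity, standing in for one worker per entity running in parallel with the others.

```typescript
// Hypothetical in-memory router; the real library backs each queue with BullMQ.
type Op = { entityId: string; name: string };

class EntityRouter {
  private readonly queues = new Map<string, Op[]>();

  // Same entity -> same FIFO queue.
  route(op: Op): void {
    const queue = this.queues.get(op.entityId);
    if (queue) queue.push(op);
    else this.queues.set(op.entityId, [op]);
  }

  // One scheduling round: every entity with pending work runs exactly one op.
  tick(): string[] {
    const executed: string[] = [];
    this.queues.forEach((queue) => {
      const next = queue.shift();
      if (next) executed.push(`${next.entityId}:${next.name}`);
    });
    return executed;
  }
}

const router = new EntityRouter();
for (const name of ["A", "B", "C"]) router.route({ entityId: "account-42", name });
for (const name of ["D", "E"]) router.route({ entityId: "account-99", name });

const ticks = [router.tick(), router.tick(), router.tick()];
console.log(JSON.stringify(ticks));
// [["account-42:A","account-99:D"],["account-42:B","account-99:E"],["account-42:C"]]
```

Within an entity the order is strictly A, B, C; across entities each round makes progress on both queues.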

Key properties:

  • One worker per entity — enforced via Redis heartbeat TTL. No duplicates, ever.
  • Auto-spawn — workers materialize when jobs arrive, on the pod that sees them first.
  • Auto-terminate — idle workers shut down after a configurable timeout.
  • Self-healing — node failure → heartbeat expires → worker respawns on a healthy pod.
  • Distributed — workers spread across all pods via atomic SET NX claim. No leader election, no single point of failure.
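
The claim and self-healing properties above rest on the semantics of Redis `SET key value NX EX ttl`. The sketch below imitates those semantics in memory with a manually advanced clock so it runs without a Redis server; `FakeRedis`, its methods, and the key layout are hypothetical and do not reflect the library's internals.

```typescript
// Hypothetical stand-in for Redis `SET key value NX EX ttl`; time is advanced by hand.
class FakeRedis {
  private now = 0;
  private readonly store = new Map<string, { value: string; expiresAt: number }>();

  advance(seconds: number): void { this.now += seconds; }

  // Succeeds only if the key is absent or already expired (NX semantics).
  setNx(key: string, value: string, ttlSeconds: number): boolean {
    const entry = this.store.get(key);
    if (entry && entry.expiresAt > this.now) return false;
    this.store.set(key, { value, expiresAt: this.now + ttlSeconds });
    return true;
  }

  // Heartbeat: extend the TTL, but only for the current owner of a live key.
  refresh(key: string, value: string, ttlSeconds: number): boolean {
    const entry = this.store.get(key);
    if (!entry || entry.expiresAt <= this.now || entry.value !== value) return false;
    entry.expiresAt = this.now + ttlSeconds;
    return true;
  }
}

const redis = new FakeRedis();
const workerKey = "myapp:account:42:worker"; // hypothetical key layout
const HEARTBEAT_TTL = 3;

const podAClaims = redis.setNx(workerKey, "pod-a", HEARTBEAT_TTL); // true: pod-a spawns the worker
const podBClaims = redis.setNx(workerKey, "pod-b", HEARTBEAT_TTL); // false: pod-a already owns it

redis.advance(2);
const refreshed = redis.refresh(workerKey, "pod-a", HEARTBEAT_TTL); // true: TTL extended

redis.advance(4); // pod-a dies; no refresh for longer than the TTL
const podBRetries = redis.setNx(workerKey, "pod-b", HEARTBEAT_TTL); // true: respawn on pod-b
```

Because the claim is a single atomic write, at most one pod can win it while the key is live, and an expired key is the only signal needed for another pod to take over.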

Installation

# npm
npm install atomic-queues bullmq ioredis

# pnpm
pnpm add atomic-queues bullmq ioredis

# yarn
yarn add atomic-queues bullmq ioredis

Peer dependencies: NestJS 10+, @nestjs/cqrs (optional — for auto-routing commands/queries)


Quick Start

1. Configure the Module

import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { AtomicQueuesModule } from 'atomic-queues';

@Module({
  imports: [
    CqrsModule,
    AtomicQueuesModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
      keyPrefix: 'myapp',
      entities: {
        account: {
          queueName: (id) => `account-${id}-queue`,
          workerName: (id) => `account-${id}-worker`,
          maxWorkersPerEntity: 1,
          idleTimeoutSeconds: 15,
        },
      },
    }),
  ],
})
export class AppModule {}

Tip: The entities config is optional. Without it, default naming applies: {keyPrefix}:{entityType}:{entityId}:queue.
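
As a quick illustration of the default pattern in the tip, the following sketch builds a queue name from its three parts. The function name `defaultQueueName` is hypothetical; only the `{keyPrefix}:{entityType}:{entityId}:queue` pattern comes from the text above.

```typescript
// Hypothetical helper mirroring the documented default naming pattern.
function defaultQueueName(keyPrefix: string, entityType: string, entityId: string): string {
  return `${keyPrefix}:${entityType}:${entityId}:queue`;
}

// A custom naming function, as passed via `entities.account.queueName` above.
const customQueueName = (id: string): string => `account-${id}-queue`;

console.log(defaultQueueName("myapp", "account", "42")); // myapp:account:42:queue
console.log(customQueueName("42"));                      // account-42-queue
```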

2. Define Commands

import { QueueEntity, QueueEntityId } from 'atomic-queues';

@QueueEntity('account')
export class WithdrawCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly amount: number,
  ) {}
}

@QueueEntity('account')
export class DepositCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly amount: number,
  ) {}
}

3. Write Handlers (standard @nestjs/cqrs)

import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';

@CommandHandler(WithdrawCommand)
export class WithdrawHandler implements ICommandHandler<WithdrawCommand> {
  constructor(private readonly repo: AccountRepository) {}

  async execute({ accountId, amount }: WithdrawCommand) {
    // SAFE: No race conditions. Sequential execution per account.
    const account = await this.repo.findById(accountId);

    if (account.balance < amount) {
      throw new InsufficientFundsError(accountId, account.balance, amount);
    }

    account.balance -= amount;
    await this.repo.save(account);
  }
}

4. Enqueue Jobs

import { Injectable } from '@nestjs/common';
import { QueueBus } from 'atomic-queues';

@Injectable()
export class AccountService {
  constructor(private readonly queueBus: QueueBus) {}

  async withdraw(accountId: string, amount: number) {
    await this.queueBus.enqueue(new WithdrawCommand(accountId, amount));
  }
}

That's it. The library automatically:

  1. Creates a queue for each accountId when jobs arrive
  2. Spawns a worker (spread across pods) to process jobs sequentially
  3. Routes jobs to the correct @CommandHandler via CQRS
  4. Terminates idle workers after the configured timeout
  5. Self-heals if a pod dies (heartbeat expires → respawn elsewhere)

Commands & Decorators

@QueueEntity(entityType)

Marks a command/query class for queue routing.

@QueueEntity('account')
export class TransferCommand { ... }

@QueueEntityId()

Marks the property that contains the entity ID. One per class.

@QueueEntity('account')
export class TransferCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,  // Routes to this account's queue
    public readonly targetAccountId: string,
    public readonly amount: number,
  ) {}
}

@WorkerProcessor(options)

Optional. Define a processor class for custom job handling on top of CQRS auto-routing.

@WorkerProcessor({
  entityType: 'account',
  queueName: (id) => `account-${id}-queue`,
  workerName: (id) => `account-${id}-worker`,
  maxWorkersPerEntity: 1,
  idleTimeoutSeconds: 15,
})
@Injectable()
export class AccountProcessor {
  @JobHandler('special-audit')
  async handleAudit(job: Job, entityId: string) { ... }
}

@JobHandler(jobName) / @JobHandler('*')

Custom job handlers on a @WorkerProcessor. The wildcard '*' catches anything not matched by a specific handler.


Configuration

AtomicQueuesModule.forRoot({
  // ── Redis connection ──────────────────────────────────────
  redis: {
    host: 'redis',
    port: 6379,
    password: 'secret',       // optional
  },

  // ── Global settings ───────────────────────────────────────
  keyPrefix: 'myapp',          // Redis key namespace (default: 'aq')
  enableCronManager: true,     // Legacy cron-based scaling (optional)
  cronInterval: 5000,          // Cron tick interval in ms

  // ── Worker defaults ───────────────────────────────────────
  workerDefaults: {
    concurrency: 1,            // Jobs processed concurrently per worker
    stalledInterval: 1000,     // ms between stalled-job checks
    lockDuration: 30000,       // ms a job is locked during processing
    heartbeatTTL: 3,           // Heartbeat key TTL in seconds
  },

  // ── Per-entity configuration (optional) ───────────────────
  entities: {
    account: {
      queueName: (id) => `account-${id}-queue`,
      workerName: (id) => `account-${id}-worker`,
      maxWorkersPerEntity: 1,
      idleTimeoutSeconds: 15,
      defaultEntityId: 'accountId',
      workerConfig: {          // Override workerDefaults per entity
        concurrency: 1,
        lockDuration: 60000,
      },
    },
  },
});
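
One way to picture how `workerConfig` overrides `workerDefaults` is a shallow merge in which per-entity values win. This is an assumption about the behavior, not a description of the library's code; `WorkerSettings` and `resolveWorkerSettings` are hypothetical names.

```typescript
// Field names follow the configuration above; the merge strategy is an assumption.
interface WorkerSettings {
  concurrency: number;
  stalledInterval: number;
  lockDuration: number;
  heartbeatTTL: number;
}

// Hypothetical helper: per-entity overrides win, everything else falls back to the defaults.
function resolveWorkerSettings(
  defaults: WorkerSettings,
  override: Partial<WorkerSettings> = {},
): WorkerSettings {
  return { ...defaults, ...override };
}

const workerDefaults: WorkerSettings = {
  concurrency: 1,
  stalledInterval: 1000,
  lockDuration: 30000,
  heartbeatTTL: 3,
};

// Mirrors `entities.account.workerConfig` from the example above.
const accountSettings = resolveWorkerSettings(workerDefaults, {
  concurrency: 1,
  lockDuration: 60000,
});

console.log(accountSettings.lockDuration); // 60000
console.log(accountSettings.heartbeatTTL); // 3
```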

Distributed Worker Lifecycle

Workers in atomic-queues have a fully automated lifecycle, distributed across all pods with no leader election:

  Job arrives                   SET NX claim
  on any pod  ──────►  ┌──────────────────────┐
                        │  Pod claims worker?   │
                        └──────┬───────┬───────┘
                           YES │       │ NO (another pod won)
                               ▼       ▼
                        ┌────────┐  ┌──────────────┐
                        │ Spawn  │  │ Wait — other │
                        │ worker │  │ pod handles  │
                        │ locally│  └──────────────┘
                        └───┬────┘
                            ▼
                     ┌──────────────┐
                     │  Processing  │◄──── Heartbeat refresh (pipeline)
                     │  jobs back-  │      every 1s (1 Redis round-trip)
                     │  to-back     │
                     └──────┬───────┘
                            │ No jobs for idleTimeoutSeconds
                            ▼
                     ┌──────────────┐
                     │  Idle sweep  │──── Hot cache eviction
                     │  closes      │     Heartbeat keys cleaned up
                     │  worker      │
                     └──────────────┘
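
The last stage of the diagram, the idle sweep, can be sketched as simple bookkeeping over "last job seen" timestamps. `IdleTracker` is a hypothetical name and the timestamps are plain seconds supplied by the caller; how the library actually tracks idleness is not specified here.

```typescript
// Hypothetical idle bookkeeping; not the library's implementation.
class IdleTracker {
  private readonly lastJobAt = new Map<string, number>();
  private readonly idleTimeoutSeconds: number;

  constructor(idleTimeoutSeconds: number) {
    this.idleTimeoutSeconds = idleTimeoutSeconds;
  }

  // Call whenever a job for the entity is processed.
  touch(entityId: string, nowSeconds: number): void {
    this.lastJobAt.set(entityId, nowSeconds);
  }

  // Returns the entities whose workers should be closed, and forgets them.
  sweep(nowSeconds: number): string[] {
    const closed: string[] = [];
    this.lastJobAt.forEach((last, entityId) => {
      if (nowSeconds - last >= this.idleTimeoutSeconds) closed.push(entityId);
    });
    for (const entityId of closed) this.lastJobAt.delete(entityId);
    return closed;
  }
}

const tracker = new IdleTracker(15); // matches idleTimeoutSeconds: 15 in the examples
tracker.touch("account-42", 0);
tracker.touch("account-99", 10);

const sweepAt16 = tracker.sweep(16); // ["account-42"]: idle for 16s, account-99 only for 6s
const sweepAt25 = tracker.sweep(25); // ["account-99"]: now idle for 15s
```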

Hot Cache (v1.5.0+)

After a worker is confirmed alive, subsequent job arrivals for that entity hit an in-memory cache, so a cache hit needs zero Redis calls (the Hot path in the table below). This eliminates the per-job Redis overhead that plagues lock-based approaches.

| Path | Redis calls | When |
|---|---|---|
| Hot (cache hit) | 0 | Worker known alive |
| Warm (cache miss) | 1 (EXISTS) | First time seeing entity |
| Cold (no worker) | 1 (SET NX) | Worker needs creation |
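
To see why the hot path matters, the sketch below counts Redis round-trips for repeated job arrivals on one entity. It is a simplified model: `HotCache` is a hypothetical name, and the single counted call on a miss stands in for either the `EXISTS` or the `SET NX` from the table.

```typescript
// Hypothetical model of the hot cache; counts round-trips instead of calling Redis.
class HotCache {
  redisCalls = 0;
  private readonly knownAlive = new Set<string>();

  // Returns true on a cache hit (no Redis call needed).
  onJobArrival(entityId: string): boolean {
    if (this.knownAlive.has(entityId)) return true;
    this.redisCalls += 1;          // one round-trip: EXISTS or SET NX, per the table
    this.knownAlive.add(entityId); // worker confirmed or created, remember it
    return false;
  }

  // The idle sweep evicts the entry so the next job re-checks Redis.
  evict(entityId: string): void {
    this.knownAlive.delete(entityId);
  }
}

const hotCache = new HotCache();
for (let i = 0; i < 1000; i++) hotCache.onJobArrival("account-42");
console.log(hotCache.redisCalls); // 1
```

In this model, a thousand arrivals for the same entity cost one round-trip in total, and eviction is the only event that brings Redis back into the path.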

SpawnQueueService (v1.4.2+)

For multi-pod deployments, the SpawnQueueService distributes worker creation across all pods via a shared BullMQ spawn queue. In v1.5.0, the direct local spawn path bypasses this queue entirely — the pod that first sees a job for a new entity claims it with an atomic SET NX and spawns the worker locally, saving hundreds of milliseconds.


Complete Example

A banking service with withdrawals, deposits, and cross-account transfers:

// ── Module ──────────────────────────────────────────────
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { AtomicQueuesModule } from 'atomic-queues';

@Module({
  imports: [
    CqrsModule,
    AtomicQueuesModule.forRoot({
      redis: { host: 'redis', port: 6379 },
      keyPrefix: 'banking',
      entities: {
        account: {
          queueName: (id) => `account-${id}-queue`,
          workerName: (id) => `account-${id}-worker`,
          maxWorkersPerEntity: 1,
          idleTimeoutSeconds: 15,
        },
      },
    }),
  ],
  providers: [
    AccountService,
    WithdrawHandler,
    DepositHandler,
    TransferHandler,
  ],
})
export class BankingModule {}

// ── Commands ────────────────────────────────────────────
import { QueueEntity, QueueEntityId } from 'atomic-queues';

@QueueEntity('account')
export class WithdrawCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly amount: number,
    public readonly transactionId: string,
  ) {}
}

@QueueEntity('account')
export class DepositCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly amount: number,
    public readonly source: string,
  ) {}
}

@QueueEntity('account')
export class TransferCommand {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly toAccountId: string,
    public readonly amount: number,
  ) {}
}

// ── Handlers ────────────────────────────────────────────
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { QueueBus } from 'atomic-queues'; // injected into TransferHandler below

@CommandHandler(WithdrawCommand)
export class WithdrawHandler implements ICommandHandler<WithdrawCommand> {
  constructor(private readonly repo: AccountRepository) {}

  async execute({ accountId, amount }: WithdrawCommand) {
    const account = await this.repo.findById(accountId);
    if (account.balance < amount) throw new InsufficientFundsError();
    account.balance -= amount;
    await this.repo.save(account);
  }
}

@CommandHandler(TransferCommand)
export class TransferHandler implements ICommandHandler<TransferCommand> {
  constructor(
    private readonly repo: AccountRepository,
    private readonly queueBus: QueueBus,
  ) {}

  async execute({ accountId, toAccountId, amount }: TransferCommand) {
    // Debit source (we're in source account's queue — safe)
    const source = await this.repo.findById(accountId);
    if (source.balance < amount) throw new InsufficientFundsError();
    source.balance -= amount;
    await this.repo.save(source);

    // Credit destination (enqueued to destination's queue — also safe)
    await this.queueBus.enqueue(
      new DepositCommand(toAccountId, amount, `transfer:${accountId}`),
    );
  }
}

// ── Controller ──────────────────────────────────────────
import { Controller, Post, Body, Param } from '@nestjs/common';
import { QueueBus } from 'atomic-queues';
import { v4 as uuid } from 'uuid'; // generates the transactionId passed to WithdrawCommand

@Controller('accounts')
export class AccountController {
  constructor(private readonly queueBus: QueueBus) {}

  @Post(':id/withdraw')
  async withdraw(@Param('id') id: string, @Body() body: { amount: number }) {
    await this.queueBus.enqueue(new WithdrawCommand(id, body.amount, uuid()));
    return { queued: true };
  }

  @Post(':id/transfer')
  async transfer(
    @Param('id') id: string,
    @Body() body: { to: string; amount: number },
  ) {
    await this.queueBus.enqueue(new TransferCommand(id, body.to, body.amount));
    return { queued: true };
  }
}

Advanced: Custom Worker Processors

For cases where CQRS auto-routing isn't enough, define a @WorkerProcessor with explicit @JobHandler methods:

import { Injectable } from '@nestjs/common';
import { WorkerProcessor, JobHandler } from 'atomic-queues';
import { Job } from 'bullmq';

@WorkerProcessor({
  entityType: 'account',
  queueName: (id) => `account-${id}-queue`,
  workerName: (id) => `account-${id}-worker`,
  maxWorkersPerEntity: 1,
  idleTimeoutSeconds: 15,
})
@Injectable()
export class AccountProcessor {
  @JobHandler('high-priority-audit')
  async handleAudit(job: Job, entityId: string) {
    // Specific handler for this job type
  }

  @JobHandler('*')
  async handleAll(job: Job, entityId: string) {
    // Wildcard — catches everything not explicitly handled
    // Falls back to CQRS routing automatically when not defined
  }
}

Priority order: Explicit @JobHandler → CQRS auto-routing (@JobCommand/@JobQuery) → Wildcard handler
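
The priority order can be read as a three-step lookup. The sketch below models it with plain maps: `Routes`, `resolveHandler`, and `dispatch` are hypothetical names, and the maps are stand-ins for whatever registries the library keeps internally.

```typescript
type Handler = (payload: unknown) => string;

interface Routes {
  explicit: Map<string, Handler>; // stand-in for @JobHandler('name') methods
  cqrs: Map<string, Handler>;     // stand-in for CQRS auto-routed handlers
  wildcard?: Handler;             // stand-in for @JobHandler('*')
}

// Explicit handler first, then CQRS routing, then the wildcard (if any).
function resolveHandler(routes: Routes, jobName: string): Handler | undefined {
  return routes.explicit.get(jobName) ?? routes.cqrs.get(jobName) ?? routes.wildcard;
}

const routes: Routes = {
  explicit: new Map<string, Handler>([["high-priority-audit", () => "explicit"]]),
  cqrs: new Map<string, Handler>([
    ["high-priority-audit", () => "cqrs"],
    ["WithdrawCommand", () => "cqrs"],
  ]),
  wildcard: () => "wildcard",
};

function dispatch(jobName: string): string {
  const handler = resolveHandler(routes, jobName);
  return handler ? handler(undefined) : "unhandled";
}

console.log(dispatch("high-priority-audit")); // explicit
console.log(dispatch("WithdrawCommand"));     // cqrs
console.log(dispatch("something-else"));      // wildcard
```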


Performance

Throughput (measured — not estimated)

Tested on a 5-pod Kubernetes cluster (OrbStack), 20 concurrent entities, 12,300 orders:

| Metric | Result |
|---|---|
| Phase 1: 10,000 orders (50 waves × 200 concurrent) | 167 orders/sec |
| Phase 2: 1,000 orders (workers still draining) | 140 orders/sec |
| Phase 4: 1,000 orders (cold start from zero workers) | 176 orders/sec |
| Total deductions processed | 104,004 |
| Stock drift | 0 (all 20 entities) |
| Pod distribution | 5/5 pods actively creating workers |
| Worker creates | 120 |
| Idle closures | 180 |

Why it's fast

  1. Zero contention — no locks, no retries, no backoff. Jobs queue and execute.
  2. Hot cache — after first check, subsequent job arrivals for an entity incur 0 Redis calls.
  3. Direct local spawn — atomic SET NX claim, local worker creation. No queue round-trip.
  4. Pipelined heartbeats — heartbeat refresh uses a single Redis pipeline (1 round-trip for 2 keys).
  5. O(1) worker existence check — global alive key replaces KEYS pattern scan.

When to use what

| Use case | Recommendation |
|---|---|
| High-throughput entity operations (payments, inventory, game state) | atomic-queues |
| Rare, low-frequency mutual exclusion (config updates, migrations) | Redlock / advisory locks |
| Exactly-once semantics with audit trail | atomic-queues (BullMQ job IDs) |
| Sub-millisecond synchronous response required | Redlock (synchronous acquire) |
| Multi-pod, many entities, sustained load | atomic-queues (contention-free scaling) |


License

MIT