atomic-queues

v3.0.0

Per-entity sequential processing for Node.js — Worker Threads, gRPC cluster, strictly-once delivery

What is atomic-queues?

Per-entity sequential processing with virtual actors for NestJS.

One worker per entity instance, spawned on demand, destroyed when idle. The worker IS the serialization boundary. If only one worker exists for account:a-123 across the entire cluster, all operations on that account are serial by construction. No locks. No transactions. No race conditions.

Motto: Strictly once, fail if interrupted.

npm install atomic-queues ioredis

Peer dependencies: @nestjs/common, @nestjs/core, @nestjs/cqrs, ioredis

Optional: @grpc/grpc-js, @grpc/proto-loader (cluster mode), zod (CLI schema validation)


The Problem

Time    Request A                    Request B                    Database
──────────────────────────────────────────────────────────────────────────
T0      SELECT balance -> $100       SELECT balance -> $100       $100
T1      CHECK: $100 >= $80           CHECK: $100 >= $80
T2      UPDATE: balance - $80                                     $20
T3                                   UPDATE: balance - $80        -$60
──────────────────────────────────────────────────────────────────────────
Result: Balance is -$60. Both withdrawals succeed. Integrity violated.

Row locks, optimistic locking, Redlock — they all trade throughput for correctness.
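
The interleaving in the table can be reproduced in a few lines. This is a minimal sketch, assuming an in-memory balance in place of a database row and an artificial delay in place of query latency. The names `db`, `delay`, `withdraw` and `raceDemo` are hypothetical and are not part of atomic-queues.

```typescript
// In-memory stand-in for a database row (hypothetical, for illustration only).
const db = { balance: 100 };

// Simulates a round trip to the database.
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Check-then-act withdrawal: the read and the update are separated by an await,
// so a second request can read the same balance in between.
async function withdraw(amount: number): Promise<boolean> {
  const seen = db.balance;         // T0: SELECT balance
  await delay(10);                 // latency: the other request reads here
  if (seen < amount) return false; // T1: CHECK against the value read earlier
  db.balance -= amount;            // T2/T3: UPDATE balance = balance - amount
  return true;
}

// Two concurrent withdrawals of $80 against a $100 balance.
async function raceDemo(): Promise<{ results: boolean[]; balance: number }> {
  const results = await Promise.all([withdraw(80), withdraw(80)]);
  return { results, balance: db.balance };
}
```

Both calls read `$100` before either one writes, so both checks pass and the balance ends below zero.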

The Insight

Don't lock the database. Don't lock the resource. Route all operations for a given entity through a single worker. That worker processes messages sequentially. Different entities have their own workers running concurrently.

  account:a-1  ──► [Worker] ──► handler1 → handler2 → handler3  (sequential)
  account:a-2  ──► [Worker] ──► handler1 → handler2              (sequential)
  order:o-5    ──► [Worker] ──► handler1                          (sequential)
                                                    (all concurrent across entities)

One worker per entity. Spawned when a message arrives. Destroyed when idle. The worker runs on the event loop — async I/O interleaves naturally across entities. No threads, no separate processes, no extra NestJS contexts.
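
To make the idea concrete, here is a minimal single-process sketch of per-entity serialization using one promise chain per entity key. It is not the library's implementation: `runSerial`, `tails`, `balances`, `sleep` and `serialDemo` are hypothetical names, and the sketch leaves out durability, idle teardown and cluster routing.

```typescript
// One promise chain ("tail") per entity key. Hypothetical sketch, not library code.
const tails = new Map<string, Promise<unknown>>();

// Runs `task` only after every earlier task for the same key has settled.
// Tasks for different keys are not chained, so they interleave at await points.
function runSerial<T>(key: string, task: () => Promise<T>): Promise<T> {
  const previous = tails.get(key) ?? Promise.resolve();
  // Run the task whether the previous one fulfilled or rejected.
  const next = previous.then(task, task);
  tails.set(key, next);
  return next;
}

const balances = new Map<string, number>([['account:a-1', 100]]);
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// The same check-then-act withdrawal that races when called concurrently.
async function withdrawFrom(key: string, amount: number): Promise<boolean> {
  const seen = balances.get(key) ?? 0;
  await sleep(10);
  if (seen < amount) return false;
  balances.set(key, seen - amount);
  return true;
}

async function serialDemo(): Promise<{ results: boolean[]; balance: number | undefined }> {
  const key = 'account:a-1';
  const results = await Promise.all([
    runSerial(key, () => withdrawFrom(key, 80)),
    runSerial(key, () => withdrawFrom(key, 80)),
  ]);
  return { results, balance: balances.get(key) };
}
```

Because the second withdrawal starts only after the first has finished, it reads the updated balance and is refused. The handler code is unchanged; only the routing differs.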


Quick Start

1. Register the module

import { Module } from '@nestjs/common';
import { AtomicQueuesModule } from 'atomic-queues';

@Module({
  imports: [
    AtomicQueuesModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
      entities: {
        account: {},
        order: { onInterrupt: 'dead-letter' },
      },
    }),
  ],
})
export class AppModule {}

2. Define commands

import { EntityType, QueueEntityId, Reply } from 'atomic-queues';

@EntityType('account')
class DepositCommand implements Reply<{ balance: number }> {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly amount: number,
  ) {}
}

3. Handle commands

Standard @nestjs/cqrs handlers — nothing new to learn:

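import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';

@CommandHandler(DepositCommand)
class DepositHandler implements ICommandHandler<DepositCommand> {
  // AccountService is your own injectable service, not part of atomic-queues
  constructor(private readonly accountService: AccountService) {}

  async execute(cmd: DepositCommand) {
    // Runs sequentially per accountId — no concurrent deposits to the same account
    const balance = await this.accountService.deposit(cmd.accountId, cmd.amount);
    return { balance };
  }
}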

4. Dispatch

import { Injectable } from '@nestjs/common';
import { QueueBus } from 'atomic-queues';

@Injectable()
class PaymentService {
  constructor(private readonly queueBus: QueueBus) {}

  // Option 1: fire and forget
  async depositAsync(accountId: string, amount: number) {
    await this.queueBus.enqueue(new DepositCommand(accountId, amount));
  }

  // Option 2: wait for the typed result (Reply<R> branding)
  async deposit(accountId: string, amount: number) {
    const { balance } = await this.queueBus.enqueueAndWait(
      new DepositCommand(accountId, amount),
    );
    return balance;
  }
}

First message for account:a-123 spawns a worker. All subsequent messages for that account queue behind it. The handler runs on your app's event loop using your existing DI container.


Queries

Queries work identically to commands but route through the QueryBus. They are sequenced with commands — a query enqueued after a deposit will see the deposit's effect.

@EntityType('account')
class GetBalanceQuery implements Reply<{ balance: number }> {
  constructor(@QueueEntityId() public readonly accountId: string) {}
}

const { balance } = await queueBus.enqueueAndWait(new GetBalanceQuery('acc-123'));

How It Works

Virtual Actors (EntityWorker)

Each entity instance (account:a-123, order:o-5) gets its own virtual actor — a processor callback with a FIFO message queue. The actor:

  1. Spawns on first message (no pre-registration needed)
  2. Processes messages sequentially (one at a time, on the event loop)
  3. Yields at await points (other entities' actors proceed concurrently)
  4. Tears down after idle timeout (configurable, default 30s)
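
The four lifecycle steps can be sketched as a small in-memory registry. This is an illustration under stated assumptions, not the library's `EntityWorker`: the `ActorRegistry` class, its `send` method and the `Job` type are hypothetical, and the sketch has no WAL or cluster awareness.

```typescript
type Job = () => Promise<void>;

interface Actor {
  queue: Job[];       // FIFO mailbox
  draining: boolean;  // true while the drain loop is running
  idleTimer?: ReturnType<typeof setTimeout>;
}

class ActorRegistry {
  private readonly actors = new Map<string, Actor>();
  private readonly idleTimeoutMs: number;

  constructor(idleTimeoutMs: number) {
    this.idleTimeoutMs = idleTimeoutMs;
  }

  // Number of live actors, exposed for observation.
  get size(): number {
    return this.actors.size;
  }

  send(key: string, job: Job): void {
    let actor = this.actors.get(key);
    if (!actor) {
      // 1. Spawn on first message.
      actor = { queue: [], draining: false };
      this.actors.set(key, actor);
    }
    // A new message cancels any pending teardown.
    if (actor.idleTimer !== undefined) {
      clearTimeout(actor.idleTimer);
      actor.idleTimer = undefined;
    }
    actor.queue.push(job);
    if (!actor.draining) void this.drain(key, actor);
  }

  private async drain(key: string, actor: Actor): Promise<void> {
    actor.draining = true;
    // 2. One job at a time. 3. Each await lets other actors make progress.
    for (let job = actor.queue.shift(); job; job = actor.queue.shift()) {
      try {
        await job();
      } catch {
        // In this sketch a failed job does not stop the actor.
      }
    }
    actor.draining = false;
    // 4. Tear down if nothing arrives within the idle timeout.
    actor.idleTimer = setTimeout(() => this.actors.delete(key), this.idleTimeoutMs);
  }
}
```

Calling `send('account:a-1', job)` twice queues the second job behind the first, while a job sent to `account:a-2` starts immediately on its own actor.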

Write-Ahead Log (WAL)

Every message is dual-written: in-memory queue (speed) + Redis WAL (durability). The WAL is a state machine:

enqueued → dispatched → completed | failed | interrupted

Each transition is an atomic Lua script that checks the current state before moving forward. Recovery runs automatically on startup:

  • enqueued → re-dispatch (handler never ran — this is the first attempt, not a retry)
  • dispatched → dead-letter (handler was running when the process crashed — never re-execute)
  • completed / failed / interrupted → cleanup (stale terminal entries)

A background cleanup timer evicts terminal WAL entries on a configurable interval.
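
The state machine and the recovery rules can be modeled in memory as follows. This is a sketch only: the library performs each transition as an atomic Lua script in Redis, whereas this version uses a plain `Map`. The names `WalState`, `transition` and `recoveryAction` are hypothetical, and the recovery function models the default `onInterrupt: 'dead-letter'` behavior.

```typescript
type WalState = 'enqueued' | 'dispatched' | 'completed' | 'failed' | 'interrupted';

// Allowed forward transitions. Terminal states have none.
const allowed: Record<WalState, WalState[]> = {
  enqueued: ['dispatched'],
  dispatched: ['completed', 'failed', 'interrupted'],
  completed: [],
  failed: [],
  interrupted: [],
};

// Compare-and-set: move only if the entry is still in the expected state
// and the requested transition is a legal one.
function transition(
  wal: Map<string, WalState>,
  id: string,
  from: WalState,
  to: WalState,
): boolean {
  if (wal.get(id) !== from || !allowed[from].includes(to)) return false;
  wal.set(id, to);
  return true;
}

type RecoveryAction = 're-dispatch' | 'dead-letter' | 'cleanup';

// Decision taken at startup for an entry found in the WAL.
function recoveryAction(state: WalState): RecoveryAction {
  switch (state) {
    case 'enqueued':
      return 're-dispatch'; // handler never ran
    case 'dispatched':
      return 'dead-letter'; // handler may have run partially
    default:
      return 'cleanup';     // terminal entry
  }
}
```

Checking the current state before every move is what prevents a second dispatch: once an entry has left `enqueued`, a repeated `enqueued → dispatched` request returns `false`.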

Master Topology (Cluster Mode)

Each replica set has a deterministic master — the node with the lowest serverId among live nodes in the same serviceGroup. No locks, no elections, no Redlock. All nodes read the same Redis-backed heartbeat registry and independently compute who the master is.
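
Because the master is a pure function of the live node set, every node can compute it locally. The sketch below assumes a hypothetical `NodeHeartbeat` record shape and plain string ordering of `serverId`; how the library stores heartbeats and compares IDs is not specified here.

```typescript
// Hypothetical shape of one entry in the heartbeat registry.
interface NodeHeartbeat {
  serverId: string;
  serviceGroup: string;
  lastSeenMs: number; // timestamp of the last heartbeat
}

// Returns the serverId of the master for a service group, or undefined
// when the group has no live node.
function computeMaster(
  nodes: NodeHeartbeat[],
  serviceGroup: string,
  nowMs: number,
  nodeTTLMs: number,
): string | undefined {
  const live = nodes
    .filter((n) => n.serviceGroup === serviceGroup && nowMs - n.lastSeenMs <= nodeTTLMs)
    .map((n) => n.serverId)
    .sort(); // assumption: lexicographic string order defines "lowest"
  return live[0];
}
```

Any two nodes that see the same registry contents reach the same answer, regardless of the order in which they read the entries.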

The master:

  • Owns the worker assignment table: which entity:entityId lives on which replica
  • Routes all petitions: replicas forward via gRPC to the master
  • Resolves workers via three tiers: existing assignment → consistent hash ring → least-loaded replica
  • Epoch fences every dispatch: replicas reject commands from stale masters

Replica Set: billing-service
┌──────────────────────────────────────────────┐
│  Master (deterministic: lowest serverId)     │
│  ├── Assignment Table                        │
│  │   account:a-1 → replica-2                 │
│  │   account:a-2 → replica-1                 │
│  └── Routes petitions, balances load         │
│                                              │
│  Replica-1: [worker: account:a-2]            │
│  Replica-2: [worker: account:a-1]            │
│  Replica-3: (master pod, no workers yet)     │
└──────────────────────────────────────────────┘

Masters interconnect across service groups:

Master (billing) ←── gRPC ──→ Master (warehouse)

Master Failover

  1. Master crashes → heartbeat TTL expires
  2. Remaining nodes recompute leader from node list → next-lowest serverId becomes master
  3. New master queries all replicas via gRPC ListWorkers
  4. Rebuilds assignment table from live cluster state (petitions rejected during rebuild — fail-fast over misrouting)
  5. Old master pushes its worker list to the new master on demotion
  6. Resumes operations

No split-brain: leadership is a pure function of the live node set. Epoch fencing rejects any stale-master commands that arrive during transitions.
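
Epoch fencing on the replica side can be sketched in a few lines. This assumes the epoch is an integer that increases with every master change, which is a common design but an assumption about this library. `EpochFence` and `accept` are hypothetical names.

```typescript
// Tracks the highest master epoch a replica has seen.
class EpochFence {
  private highestSeen = 0;

  // Returns true if a command stamped with `epoch` may be processed,
  // false if it comes from a master that has since been superseded.
  accept(epoch: number): boolean {
    if (epoch < this.highestSeen) return false;
    this.highestSeen = epoch;
    return true;
  }
}
```

Once a replica has accepted a command at epoch 2, a delayed command stamped with epoch 1 is refused, even if the old master still believes it is in charge.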

Health Monitoring

Redis health: Periodic PING. Consecutive failures above threshold → degraded mode (reject new messages, leader resigns, discovery steps down). Automatic recovery when Redis responds again.

gRPC peer connectivity: Native gRPC channel state watching (READY → alive, TRANSIENT_FAILURE → suspected dead). Debounce timer prevents flapping on brief disconnects.

Per-peer circuit breakers: gRPC connections track consecutive failures. After threshold → circuit opens (fast-fail, no network calls). After cooldown → half-open (one probe). Success → closed. Failure → re-open.
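
The closed, open and half-open cycle can be sketched as a small state machine. This is a generic circuit breaker written for illustration, not the library's class: `CircuitBreaker`, `canCall`, `onSuccess` and `onFailure` are hypothetical names, and time is passed in explicitly so the behavior is easy to follow.

```typescript
type CircuitState = 'closed' | 'open' | 'half-open';

class CircuitBreaker {
  private state: CircuitState = 'closed';
  private failures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly cooldownMs: number;

  constructor(failureThreshold: number, cooldownMs: number) {
    this.failureThreshold = failureThreshold;
    this.cooldownMs = cooldownMs;
  }

  get current(): CircuitState {
    return this.state;
  }

  // Whether a call may go out at time `nowMs`.
  canCall(nowMs: number): boolean {
    if (this.state === 'open' && nowMs - this.openedAt >= this.cooldownMs) {
      this.state = 'half-open'; // cooldown elapsed: allow exactly one probe
      return true;
    }
    return this.state === 'closed'; // open or probing: fast-fail
  }

  onSuccess(): void {
    this.state = 'closed';
    this.failures = 0;
  }

  onFailure(nowMs: number): void {
    this.failures += 1;
    // A failed probe re-opens immediately; otherwise open at the threshold.
    if (this.state === 'half-open' || this.failures >= this.failureThreshold) {
      this.state = 'open';
      this.openedAt = nowMs;
    }
  }
}
```

With a threshold of 3 and a cooldown of 2000 ms (the documented defaults), three consecutive failures open the circuit, and the first `canCall` made at least 2000 ms later lets a single probe through.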


Enqueuing Messages

// Fire-and-forget
await queueBus.enqueue(new WithdrawCommand(accountId, 100));

// Enqueue and wait for typed result
const { balance } = await queueBus.enqueueAndWait(new GetBalanceQuery(accountId));

// Scoped API
const account = queueBus.forEntity('account', accountId);
await account.enqueue(new DepositCommand(accountId, 500));

// Raw string API (cross-service, no class needed)
await queueBus.enqueue('warehouse', 'ReserveStockCommand', 'SKU-001', {
  sku: 'SKU-001', quantity: 50,
});

Backpressure

Three levels, all configurable:

| Level | Config | Behavior |
|-------|--------|----------|
| Per-worker | workerMaxQueueDepth | Rejects with QUEUE_DEPTH_EXCEEDED |
| Global workers | maxTotalWorkers | Rejects new entities with WORKER_LIMIT_EXCEEDED (existing entities still accepted) |
| Global depth | maxTotalQueueDepth | Rejects all enqueues with QUEUE_DEPTH_EXCEEDED |

In cluster mode, the master also enforces maxConcurrentPetitions to bound petition processing.
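
As an illustration of how the three levels combine, here is a sketch of an admission check. The limit names and rejection codes come from the table above, and `0` means unbounded as in the configuration reference. The `Snapshot` shape, the `admit` function, the order of the checks and the `>=` boundary are assumptions.

```typescript
interface Limits {
  workerMaxQueueDepth: number; // 0 = unbounded
  maxTotalWorkers: number;     // 0 = unbounded
  maxTotalQueueDepth: number;  // 0 = unbounded
}

// Hypothetical view of current load at the moment of an enqueue.
interface Snapshot {
  workerExists: boolean; // is there already a worker for this entity?
  workerDepth: number;   // pending messages on that worker
  totalWorkers: number;
  totalDepth: number;
}

type Rejection = 'QUEUE_DEPTH_EXCEEDED' | 'WORKER_LIMIT_EXCEEDED';

const exceeded = (limit: number, value: number): boolean => limit > 0 && value >= limit;

// Returns a rejection code, or undefined when the message is admitted.
function admit(limits: Limits, s: Snapshot): Rejection | undefined {
  // Global depth: rejects all enqueues.
  if (exceeded(limits.maxTotalQueueDepth, s.totalDepth)) return 'QUEUE_DEPTH_EXCEEDED';
  // Global workers: rejects only entities that would need a new worker.
  if (!s.workerExists && exceeded(limits.maxTotalWorkers, s.totalWorkers)) {
    return 'WORKER_LIMIT_EXCEEDED';
  }
  // Per-worker depth.
  if (s.workerExists && exceeded(limits.workerMaxQueueDepth, s.workerDepth)) {
    return 'QUEUE_DEPTH_EXCEEDED';
  }
  return undefined;
}
```

Note that an entity with an existing worker is still admitted when the worker limit is reached, which matches the "existing entities still accepted" behavior in the table.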


Configuration

Minimal (single server)

AtomicQueuesModule.forRoot({
  redis: { host: 'localhost', port: 6379 },
})

That's it. Everything else has defaults. Add entities to customize per-entity behavior, grpc to enable cluster mode.

Full reference

AtomicQueuesModule.forRoot(config)

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| redis | IRedisConfig | yes | — | Redis connection. Accepts { host, port, password, db } or { url } |
| entities | Record<string, IEntityConfig> | no | {} | Per-entity-type overrides (see below) |
| keyPrefix | string | no | 'aq' | Prefix for all Redis keys |
| maxTotalWorkers | number | no | 10000 | Max concurrent entity workers across all types. 0 = unbounded |
| maxTotalQueueDepth | number | no | 100000 | Max total pending messages across all workers. 0 = unbounded |
| retry | IRetryPolicy | no | { maxAttempts: 1 } | Default retry policy (strictly-once by default) |
| wal | IWalConfig | no | { enabled: true } | Write-ahead log settings |
| grpc | IGrpcConfig | no | { enabled: false } | Cluster mode — omit entirely for single-server |
| verbose | boolean | no | false | Enable verbose logging |

IEntityConfig — per entity type

entities: {
  account: { /* all fields optional */ },
  order: { onInterrupt: 'dead-letter', workerIdleTimeout: 60_000 },
}

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| defaultEntityId | string | — | Property name used as entity ID when @QueueEntityId is not present |
| onInterrupt | 'dead-letter' \| 'retry' | 'dead-letter' | What to do when a message is found mid-execution on recovery |
| workerIdleTimeout | number (ms) | 30000 | How long an idle worker lives before teardown |
| workerMaxQueueDepth | number | 0 (unbounded) | Max pending messages per worker. Rejects with QUEUE_DEPTH_EXCEEDED |
| replyTimeout | number (ms) | 5000 | Default timeout for enqueueAndWait on this entity type |
| retry | IRetryPolicy | inherits root | Per-entity retry policy override |

IRetryPolicy

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| maxAttempts | number | 1 | Total attempts. 1 = strictly once, no retries |
| backoff | 'fixed' \| 'exponential' | 'exponential' | Backoff strategy between retries |
| backoffDelay | number (ms) | 1000 | Base delay between retries |
| maxDelay | number (ms) | 30000 | Maximum delay cap for exponential backoff |
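
To show how these four fields interact, here is a sketch of a delay calculation. It assumes the exponential strategy doubles the base delay on each retry, which is the usual convention, but the library's exact formula is not documented here. `RetryPolicy` mirrors the table, and `retryDelay` is a hypothetical helper.

```typescript
interface RetryPolicy {
  maxAttempts: number;
  backoff: 'fixed' | 'exponential';
  backoffDelay: number; // ms
  maxDelay: number;     // ms
}

// Delay before retry number `retry` (1 = the first retry, i.e. the second attempt).
// Returns undefined when the policy allows no further attempt.
function retryDelay(policy: RetryPolicy, retry: number): number | undefined {
  if (retry + 1 > policy.maxAttempts) return undefined;
  if (policy.backoff === 'fixed') return policy.backoffDelay;
  // Assumption: base delay doubles with each retry, capped at maxDelay.
  return Math.min(policy.backoffDelay * 2 ** (retry - 1), policy.maxDelay);
}
```

Under these assumptions, a policy with `maxAttempts: 8` and the default delays waits 1000, 2000, 4000, 8000 and 16000 ms, then stays at the 30000 ms cap. With the default `maxAttempts: 1`, the function returns `undefined` for the first retry, so a message is never attempted twice.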

IWalConfig — write-ahead log

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| enabled | boolean | true | Disable WAL for testing only — never disable in production |
| cleanupInterval | number (ms) | 5000 | How often to evict completed/failed WAL entries |
| entryTTL | number (seconds) | 86400 (24h) | Safety TTL for WAL entries in Redis |

IGrpcConfig — cluster mode

Omit entirely for single-server. Set enabled: true to activate.

grpc: {
  enabled: true,
  listenAddress: '0.0.0.0:50051',
  advertisedAddress: '10.0.1.5:50051',
  serverId: 'billing-1',
  serviceGroup: 'billing',
}

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| enabled | boolean | false | Enable gRPC cluster transport |
| listenAddress | string | '0.0.0.0:50051' | Address the gRPC server binds to |
| advertisedAddress | string | os.hostname() + ':50051' | Address other nodes use to reach this one |
| serverId | string | auto-generated UUID | Unique node ID. Must be stable across restarts for predictable leader election |
| serviceGroup | string | 'default' | Logical grouping — nodes in the same group form a replica set |
| maxForwardHops | number | 3 | Max cross-service forwarding hops to prevent loops |
| maxConcurrentPetitions | number | 50 | Max in-flight petitions the master processes. 0 = unbounded |

Timing (ms)

| Field | Default | Description |
|-------|---------|-------------|
| heartbeatMs | 400 | How often this node heartbeats to Redis |
| nodeTTLMs | 1500 | Node considered dead after this long without heartbeat |
| reconcileIntervalMs | 2000 | How often to scan Redis for membership changes |
| leaderTTLMs | 2000 | Leader lock TTL |
| leaderRenewalMs | 400 | Leader lock renewal interval |
| leaderDebounceMs | 800 | Debounce window before recomputing leader after ring changes |

Health monitoring

| Field | Default | Description |
|-------|---------|-------------|
| peerMonitorEnabled | true | Watch gRPC channel state for fast failure detection |
| peerSuspectDebounceMs | 500 | Debounce before declaring a peer suspected-dead |
| redisHealthCheckMs | 500 | Redis PING interval |
| redisHealthFailureThreshold | 3 | Consecutive PING failures before degraded mode |

Circuit breaker (per-peer gRPC connections)

| Field | Default | Description |
|-------|---------|-------------|
| circuitBreakerFailureThreshold | 3 | Consecutive failures before opening the circuit |
| circuitBreakerCooldownMs | 2000 | Time before a half-open probe is allowed |

gRPC keepalive

| Field | Default | Description |
|-------|---------|-------------|
| keepaliveTimeMs | 10000 | Keepalive ping interval (minimum enforced by grpc-js) |
| keepaliveTimeoutMs | 5000 | Connection dead if no keepalive response |

RPC deadlines (deadlines sub-object)

| Field | Default | Description |
|-------|---------|-------------|
| deadlines.forwardMs | 1500 | Deadline for fire-and-forget RPCs (forward, petition, enqueueToWorker) |
| deadlines.pingMs | 1000 | Deadline for health ping |
| deadlines.andWaitMs | 60000 | Default deadline for *AndWait RPCs when no replyTimeout is set |
| deadlines.syncMs | 1000 | Deadline for listWorkers during master table rebuild |
| deadlines.connectivityWatchMs | 30000 | Timeout for peer connectivity watch loop re-arm |


Dead Letter Queue

Messages found in dispatched state on recovery, or that exhaust all retry attempts, are moved to a Redis-backed dead letter queue.

npx atomic-queues dlq list
npx atomic-queues dlq replay --id <message-id>
npx atomic-queues dlq purge

CLI

# Inspect live entity/command/query registry from Redis
npx atomic-queues introspect

# Generate TypeScript from the live registry
npx atomic-queues generate --classes -o ./src/generated   # decorated class files
npx atomic-queues generate --ts -o ./src/generated        # namespace interfaces + DispatchMap
npx atomic-queues generate --json-schema -o ./src/generated

Guarantees

| Guarantee | Mechanism |
|---|---|
| FIFO per entity | One worker per entity:entityId with FIFO queue |
| Single-writer per entity | Only one worker exists across the cluster |
| At-most-once delivery | WAL: enqueued → dispatched → completed. Never re-executed after dispatch. |
| Fail if interrupted | Dispatched on crash → dead-lettered, source notified |
| Concurrent across entities | Event loop interleaves at await points |
| Durability | Redis WAL (dual-write: in-memory + Redis) |
| Auto-recovery | WAL recovery + cleanup run automatically on startup |
| Cluster coordination | Deterministic master topology with gRPC |
| Master failover | Heartbeat expiry → deterministic re-election + assignment table rebuild |
| Epoch fencing | Replicas reject commands from stale masters |
| No distributed locks | The worker IS the serialization — not a lock, not Redlock, not SET NX |


Design Philosophy

AtomicQueues is pessimistic by design. At every decision point, it chooses safety over liveness:

  • Interrupted? Dead-letter, don't retry.
  • Redis down? Reject new work, don't buffer.
  • Stale epoch? Reject, don't process.
  • Master rebuilding? Reject petitions, don't guess.
  • Unknown assignment? Bounce and retry through the master, don't deliver speculatively.

The system refuses to operate under uncertainty rather than risk executing a message twice.


Migrating from v2

Removed: executor, registry, gateTTL, ActorSystem, LogService, GateService, SchedulerService, ExecutorPoolService, ResultCollector, RegistryService, workers config, WorkerModule.

Added: EntityWorker, EntityWorkerManager, MasterCoordinator, workerIdleTimeout in entity config.

Unchanged: All decorators, QueueBus public API, CLI generators.

Migration: Remove executor/registry/workers from config. That's it. Workers are now internal.


License

MIT