atomic-queues
v3.0.0
Per-entity sequential processing for Node.js — Worker Threads, gRPC cluster, strictly-once delivery
What is atomic-queues?
Per-entity sequential processing with virtual actors for NestJS.
One worker per entity instance, spawned on demand, destroyed when idle. The worker IS the serialization boundary. If only one worker exists for account:a-123 across the entire cluster, all operations on that account are serial by construction. No locks. No transactions. No race conditions.
Motto: Strictly once, fail if interrupted.
npm install atomic-queues ioredis
Peer dependencies: @nestjs/common, @nestjs/core, @nestjs/cqrs, ioredis
Optional: @grpc/grpc-js, @grpc/proto-loader (cluster mode), zod (CLI schema validation)
The Problem
Time   Request A                   Request B                   Database
──────────────────────────────────────────────────────────────────────────
T0     SELECT balance -> $100      SELECT balance -> $100      $100
T1     CHECK: $100 >= $80          CHECK: $100 >= $80
T2     UPDATE: balance -= $80                                  $20
T3                                 UPDATE: balance -= $80      -$60
──────────────────────────────────────────────────────────────────────────
Result: balance is -$60. Both withdrawals succeed. Integrity violated.
Row locks, optimistic locking and Redlock can all prevent this, but each one trades throughput for correctness.
The Insight
Don't lock the database. Don't lock the resource. Route all operations for a given entity through a single worker. That worker processes messages sequentially. Different entities have their own workers running concurrently.
account:a-1 ──► [Worker] ──► handler1 → handler2 → handler3 (sequential)
account:a-2 ──► [Worker] ──► handler1 → handler2 (sequential)
order:o-5 ──► [Worker] ──► handler1 (sequential)
(all concurrent across entities)
One worker per entity. Spawned when a message arrives. Destroyed when idle. The worker runs on the event loop, so async I/O interleaves naturally across entities. No threads, no separate processes, no extra NestJS contexts.
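To make the routing idea concrete, here is a minimal, dependency-free sketch. It is not the library's EntityWorker; SerialByEntity, withdraw and demo are hypothetical names. It assumes the simplest mechanism that gives per-key ordering: one promise chain per entity key, so tasks for the same key run one after another while tasks for different keys still interleave on the event loop.

```typescript
// Hypothetical sketch: serialize async tasks per entity key with a promise chain.
class SerialByEntity {
  // Tail of the pending chain for each entity key, e.g. "account:a-1"
  private tails = new Map<string, Promise<void>>();

  run<T>(key: string, task: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    // Start this task only after the previous task for the same key has settled
    const result = prev.then(() => task());
    // The chain itself never rejects, so one failed task does not block the next
    const tail = result.then(() => undefined, () => undefined);
    this.tails.set(key, tail);
    // Forget the key once its last queued task finishes (idle keys cost nothing)
    void tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return result;
  }
}

const balances = new Map<string, number>([["account:a-1", 100]]);
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Read-check-write with an await in the middle: racy if two calls overlap
async function withdraw(key: string, amount: number): Promise<boolean> {
  const balance = balances.get(key) ?? 0;
  await sleep(5); // simulated database round trip
  if (balance < amount) return false;
  balances.set(key, balance - amount);
  return true;
}

const serial = new SerialByEntity();

// Two concurrent $80 withdrawals against the same $100 account
function demo(): Promise<boolean[]> {
  return Promise.all([
    serial.run("account:a-1", () => withdraw("account:a-1", 80)),
    serial.run("account:a-1", () => withdraw("account:a-1", 80)),
  ]);
}
```

Calling demo() resolves to [true, false] and leaves account:a-1 at 20, because the second withdrawal reads the balance only after the first one has written it. Calling withdraw directly, without serial.run, would let both calls read 100 and both succeed.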
Quick Start
1. Register the module
import { Module } from '@nestjs/common';
import { AtomicQueuesModule } from 'atomic-queues';
@Module({
  imports: [
    AtomicQueuesModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
      entities: {
        account: {},
        order: { onInterrupt: 'dead-letter' },
      },
    }),
  ],
})
export class AppModule {}
2. Define commands
import { EntityType, QueueEntityId, Reply } from 'atomic-queues';
@EntityType('account')
class DepositCommand implements Reply<{ balance: number }> {
  constructor(
    @QueueEntityId() public readonly accountId: string,
    public readonly amount: number,
  ) {}
}
3. Handle commands
Standard @nestjs/cqrs handlers — nothing new to learn:
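import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
// AccountService stands in for your own domain service (hypothetical, not part of atomic-queues)
import { AccountService } from './account.service';
@CommandHandler(DepositCommand)
class DepositHandler implements ICommandHandler<DepositCommand> {
  constructor(private readonly accountService: AccountService) {}
  async execute(cmd: DepositCommand) {
    // Runs sequentially per accountId: no concurrent deposits to the same account
    const balance = await this.accountService.deposit(cmd.accountId, cmd.amount);
    return { balance };
  }
}
4. Dispatch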
import { Injectable } from '@nestjs/common';
import { QueueBus } from 'atomic-queues';
@Injectable()
class PaymentService {
  constructor(private readonly queueBus: QueueBus) {}
  // Fire and forget: does not wait for the handler to run
  async deposit(accountId: string, amount: number): Promise<void> {
    await this.queueBus.enqueue(new DepositCommand(accountId, amount));
  }
  // Wait for the typed result (Reply<R> branding types it as { balance: number })
  async depositAndGetBalance(accountId: string, amount: number): Promise<number> {
    const { balance } = await this.queueBus.enqueueAndWait(
      new DepositCommand(accountId, amount),
    );
    return balance;
  }
}
First message for account:a-123 spawns a worker. All subsequent messages for that account queue behind it. The handler runs on your app's event loop using your existing DI container.
Queries
Queries work identically to commands but route through the QueryBus. They are sequenced with commands — a query enqueued after a deposit will see the deposit's effect.
@EntityType('account')
class GetBalanceQuery implements Reply<{ balance: number }> {
  constructor(@QueueEntityId() public readonly accountId: string) {}
}
const { balance } = await queueBus.enqueueAndWait(new GetBalanceQuery('acc-123'));
How It Works
Virtual Actors (EntityWorker)
Each entity instance (account:a-123, order:o-5) gets its own virtual actor — a processor callback with a FIFO message queue. The actor:
- Spawns on first message (no pre-registration needed)
- Processes messages sequentially (one at a time, on the event loop)
- Yields at await points (other entities' actors proceed concurrently)
- Tears down after an idle timeout (configurable via workerIdleTimeout, default 30s)
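The actor lifecycle above can be sketched as a small registry. This is an illustration under assumptions, not atomic-queues internals: Actor, ActorRegistry, the string messages and the explicit reapIdle sweep are hypothetical, and a real implementation would more likely schedule teardown with timers. An injectable clock keeps the idle check testable.

```typescript
type Handler = (msg: string) => Promise<void>;

// Hypothetical virtual actor: a FIFO queue drained one message at a time
class Actor {
  private queue: string[] = [];
  private running = false;
  lastActive: number;

  constructor(private readonly handler: Handler, private readonly now: () => number) {
    this.lastActive = now();
  }

  get idle(): boolean {
    return !this.running && this.queue.length === 0;
  }

  push(msg: string): void {
    this.queue.push(msg);
    this.lastActive = this.now();
    if (!this.running) void this.drain();
  }

  // Processes messages sequentially, in arrival order
  private async drain(): Promise<void> {
    this.running = true;
    while (this.queue.length > 0) {
      const msg = this.queue.shift() as string;
      try {
        await this.handler(msg); // other actors make progress while this one awaits
      } catch {
        // retry / dead-letter handling is out of scope for this sketch
      }
      this.lastActive = this.now();
    }
    this.running = false;
  }
}

// Hypothetical registry: spawn on first message, tear down after idleTimeoutMs
class ActorRegistry {
  private actors = new Map<string, Actor>();

  constructor(
    private readonly handler: Handler,
    private readonly idleTimeoutMs = 30_000, // same as the documented workerIdleTimeout default
    private readonly now: () => number = () => Date.now(),
  ) {}

  send(entityKey: string, msg: string): void {
    let actor = this.actors.get(entityKey);
    if (!actor) {
      actor = new Actor(this.handler, this.now); // spawn on first message
      this.actors.set(entityKey, actor);
    }
    actor.push(msg);
  }

  // Removes actors with an empty queue and no activity for idleTimeoutMs
  reapIdle(): number {
    let removed = 0;
    this.actors.forEach((actor, key) => {
      if (actor.idle && this.now() - actor.lastActive >= this.idleTimeoutMs) {
        this.actors.delete(key);
        removed++;
      }
    });
    return removed;
  }

  get size(): number {
    return this.actors.size;
  }
}
```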
Write-Ahead Log (WAL)
Every message is dual-written: in-memory queue (speed) + Redis WAL (durability). The WAL is a state machine:
enqueued → dispatched → completed | failed | interrupted
Each transition is an atomic Lua script that checks the current state before moving forward. Recovery runs automatically on startup:
- enqueued → re-dispatch (the handler never ran, so this is the first attempt, not a retry)
- dispatched → dead-letter (the handler was running when the process crashed; never re-execute)
- completed / failed / interrupted → cleanup (stale terminal entries)
A background cleanup timer evicts terminal WAL entries on a configurable interval.
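The WAL transitions and startup recovery rules can be modelled in memory. This sketch assumes nothing about the library's Redis key layout or Lua scripts: a Map stands in for Redis, and because this synchronous JavaScript runs without interleaving, each compare-and-set is atomic in the same sense a Lua script is atomic on the Redis server. InMemoryWal, transition and recoveryAction are hypothetical names; the onInterrupt parameter mirrors the per-entity option documented under Configuration.

```typescript
type WalState = "enqueued" | "dispatched" | "completed" | "failed" | "interrupted";

// Legal forward moves; terminal states have none
const NEXT: Record<WalState, WalState[]> = {
  enqueued: ["dispatched"],
  dispatched: ["completed", "failed", "interrupted"],
  completed: [],
  failed: [],
  interrupted: [],
};

class InMemoryWal {
  private entries = new Map<string, WalState>();

  append(id: string): void {
    this.entries.set(id, "enqueued");
  }

  state(id: string): WalState | undefined {
    return this.entries.get(id);
  }

  // Compare-and-set: succeeds only if the entry is currently `from` and `to` is a legal next state
  transition(id: string, from: WalState, to: WalState): boolean {
    if (this.entries.get(id) !== from || NEXT[from].indexOf(to) === -1) return false;
    this.entries.set(id, to);
    return true;
  }
}

type RecoveryAction = "re-dispatch" | "dead-letter" | "retry" | "cleanup";

// Startup recovery decision for one WAL entry
function recoveryAction(
  state: WalState,
  onInterrupt: "dead-letter" | "retry" = "dead-letter",
): RecoveryAction {
  switch (state) {
    case "enqueued":
      return "re-dispatch"; // handler never ran: first attempt, not a retry
    case "dispatched":
      return onInterrupt === "retry" ? "retry" : "dead-letter"; // crashed mid-handler
    default:
      return "cleanup"; // completed, failed and interrupted are terminal
  }
}
```

The refused second enqueued → dispatched move is what keeps a message from being handed to two handlers: only one caller can win the compare-and-set.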
Master Topology (Cluster Mode)
Each replica set has a deterministic master — the node with the lowest serverId among live nodes in the same serviceGroup. No locks, no elections, no Redlock. All nodes read the same Redis-backed heartbeat registry and independently compute who the master is.
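Since leadership is a pure function of the live node set, it can be written as an ordinary function that every node evaluates on the same data. This is a sketch under assumptions: the NodeHeartbeat shape and computeMaster are hypothetical, a node counts as live when its last heartbeat is newer than nodeTTLMs, and serverIds are compared as plain strings.

```typescript
// Hypothetical record read from the Redis-backed heartbeat registry
interface NodeHeartbeat {
  serverId: string;
  serviceGroup: string;
  lastHeartbeatMs: number;
}

// Every node runs this on the same registry snapshot and gets the same answer
function computeMaster(
  nodes: NodeHeartbeat[],
  serviceGroup: string,
  nowMs: number,
  nodeTTLMs = 1500, // mirrors the documented nodeTTLMs default
): string | undefined {
  const live = nodes
    .filter((n) => n.serviceGroup === serviceGroup)
    .filter((n) => nowMs - n.lastHeartbeatMs < nodeTTLMs) // heartbeat not expired
    .map((n) => n.serverId)
    .sort(); // plain string order (an assumption)
  return live[0]; // lowest live serverId, or undefined if the group is empty
}
```

Plain string ordering puts "billing-10" before "billing-2", which is one more reason to choose serverIds deliberately.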
The master:
- Owns the worker assignment table: which entity:entityId lives on which replica
- Routes all petitions: replicas forward via gRPC to the master
- Resolves workers via three tiers: existing assignment → consistent hash ring → least-loaded replica
- Epoch fences every dispatch: replicas reject commands from stale masters
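The three-tier resolution might look like the following sketch. The tier order comes from the list above; everything else is an assumption: the FNV-1a hash, 64 virtual nodes per replica, and the rule that the master falls back to the least-loaded live replica when the ring owner is not live. HashRing and resolveWorker are hypothetical names.

```typescript
// FNV-1a (32-bit): a small, stable string hash for the sketch
function hash32(s: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h;
}

class HashRing {
  private points: { hash: number; node: string }[] = [];

  constructor(nodes: string[], virtualNodes = 64) {
    for (const node of nodes) {
      for (let v = 0; v < virtualNodes; v++) {
        this.points.push({ hash: hash32(`${node}#${v}`), node });
      }
    }
    this.points.sort((a, b) => a.hash - b.hash);
  }

  // Owner = first point at or after the key's hash, wrapping to the start
  lookup(key: string): string | undefined {
    if (this.points.length === 0) return undefined;
    const h = hash32(key);
    const point = this.points.find((p) => p.hash >= h) ?? this.points[0];
    return point.node;
  }
}

function resolveWorker(
  entityKey: string,
  assignments: Map<string, string>,
  ring: HashRing,
  live: Set<string>,
  load: Map<string, number>,
): string | undefined {
  // Tier 1: keep an existing assignment while its replica is alive
  const existing = assignments.get(entityKey);
  if (existing !== undefined && live.has(existing)) return existing;
  // Tier 2: consistent hash ring
  let target = ring.lookup(entityKey);
  // Tier 3 (assumed trigger: ring owner not live): least-loaded live replica
  if (target === undefined || !live.has(target)) {
    target = Array.from(live).sort((a, b) => (load.get(a) ?? 0) - (load.get(b) ?? 0))[0];
  }
  if (target !== undefined) assignments.set(entityKey, target); // record for tier 1 next time
  return target;
}
```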
Replica Set: billing-service
┌──────────────────────────────────────────────┐
│ Master (deterministic: lowest serverId) │
│ ├── Assignment Table │
│ │ account:a-1 → replica-2 │
│ │ account:a-2 → replica-1 │
│ └── Routes petitions, balances load │
│ │
│ Replica-1: [worker: account:a-2] │
│ Replica-2: [worker: account:a-1] │
│ Replica-3: (master pod, no workers yet) │
└──────────────────────────────────────────────┘
Masters interconnect across service groups:
Master (billing) ←── gRPC ──→ Master (warehouse)
Master Failover
- Master crashes → heartbeat TTL expires
- Remaining nodes recompute the leader from the node list → the next-lowest serverId becomes master
- New master queries all replicas via gRPC ListWorkers
- Rebuilds the assignment table from live cluster state (petitions are rejected during the rebuild: fail-fast over misrouting)
- Old master pushes its worker list to the new master on demotion
- Resumes operations
No split-brain: leadership is a pure function of the live node set. Epoch fencing rejects any stale-master commands that arrive during transitions.
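Two pieces of the failover path can be sketched with hypothetical names: a replica-side EpochFence that refuses commands from an older master, and a master-side AssignmentTable that rejects petitions while it is being rebuilt from ListWorkers replies. The sketch assumes each new master carries a strictly higher epoch number; how the library derives epochs is not described here.

```typescript
// Replica side: remember the highest master epoch seen and refuse anything older
class EpochFence {
  private highestSeen = 0;

  accept(masterEpoch: number): boolean {
    if (masterEpoch < this.highestSeen) return false; // command from a stale master
    this.highestSeen = masterEpoch;
    return true;
  }
}

// Master side: petitions are refused while the table is rebuilt
class AssignmentTable {
  private rebuilding = false;
  private table = new Map<string, string>();

  beginRebuild(): void {
    this.rebuilding = true;
    this.table.clear();
  }

  // replies: replica id -> entity keys that replica reports hosting (ListWorkers)
  finishRebuild(replies: Map<string, string[]>): void {
    replies.forEach((keys, replica) => keys.forEach((k) => this.table.set(k, replica)));
    this.rebuilding = false;
  }

  // Owning replica, or undefined if unassigned; throws while rebuilding (fail fast)
  route(entityKey: string): string | undefined {
    if (this.rebuilding) throw new Error("REBUILDING: petition rejected");
    return this.table.get(entityKey);
  }
}
```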
Health Monitoring
Redis health: Periodic PING. Consecutive failures above threshold → degraded mode (reject new messages, leader resigns, discovery steps down). Automatic recovery when Redis responds again.
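The Redis health policy reduces to a counter. A sketch, assuming degraded mode starts once the consecutive failure count reaches redisHealthFailureThreshold; RedisHealthGate is a hypothetical name, and the leader-resignation and discovery side effects are left out.

```typescript
// Hypothetical gate fed by periodic PING results
class RedisHealthGate {
  private consecutiveFailures = 0;
  private degraded = false;

  constructor(private readonly failureThreshold = 3) {} // mirrors redisHealthFailureThreshold

  recordPing(ok: boolean): void {
    if (ok) {
      this.consecutiveFailures = 0;
      this.degraded = false; // automatic recovery on the first successful PING
      return;
    }
    this.consecutiveFailures++;
    if (this.consecutiveFailures >= this.failureThreshold) this.degraded = true;
  }

  // While degraded, new messages are rejected rather than buffered
  canAccept(): boolean {
    return !this.degraded;
  }
}
```

In a service, a timer firing every redisHealthCheckMs would send PING (ioredis exposes it as redis.ping()) and pass the outcome to recordPing.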
gRPC peer connectivity: Native gRPC channel state watching (READY → alive, TRANSIENT_FAILURE → suspected dead). Debounce timer prevents flapping on brief disconnects.
Per-peer circuit breakers: gRPC connections track consecutive failures. After threshold → circuit opens (fast-fail, no network calls). After cooldown → half-open (one probe). Success → closed. Failure → re-open.
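The per-peer breaker is a three-state machine. This sketch uses the documented defaults (circuitBreakerFailureThreshold = 3, circuitBreakerCooldownMs = 2000); the CircuitBreaker class and its method names are hypothetical, and a clock is injected so the cooldown can be tested without waiting.

```typescript
type CircuitState = "closed" | "open" | "half-open";

class CircuitBreaker {
  private state: CircuitState = "closed";
  private failures = 0;
  private openedAt = 0;
  private probeInFlight = false;

  constructor(
    private readonly failureThreshold = 3,
    private readonly cooldownMs = 2000,
    private readonly now: () => number = () => Date.now(),
  ) {}

  // Ask before each RPC; false means fast-fail without touching the network
  allowRequest(): boolean {
    if (this.state === "closed") return true;
    if (this.state === "open" && this.now() - this.openedAt >= this.cooldownMs) {
      this.state = "half-open"; // cooldown elapsed
    }
    if (this.state === "half-open" && !this.probeInFlight) {
      this.probeInFlight = true; // exactly one probe at a time
      return true;
    }
    return false;
  }

  onSuccess(): void {
    this.state = "closed";
    this.failures = 0;
    this.probeInFlight = false;
  }

  onFailure(): void {
    this.probeInFlight = false;
    if (this.state === "half-open") {
      this.trip(); // failed probe: re-open
      return;
    }
    this.failures++;
    if (this.failures >= this.failureThreshold) this.trip();
  }

  get current(): CircuitState {
    return this.state;
  }

  private trip(): void {
    this.state = "open";
    this.openedAt = this.now();
    this.failures = 0;
  }
}
```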
Enqueuing Messages
// Fire-and-forget
await queueBus.enqueue(new WithdrawCommand(accountId, 100));
// Enqueue and wait for typed result
const { balance } = await queueBus.enqueueAndWait(new GetBalanceQuery(accountId));
// Scoped API
const account = queueBus.forEntity('account', accountId);
await account.enqueue(new DepositCommand(accountId, 500));
// Raw string API (cross-service, no class needed)
await queueBus.enqueue('warehouse', 'ReserveStockCommand', 'SKU-001', {
sku: 'SKU-001', quantity: 50,
});
Backpressure
Three levels, all configurable:
| Level | Config | Behavior |
|-------|--------|----------|
| Per-worker | workerMaxQueueDepth | Rejects with QUEUE_DEPTH_EXCEEDED |
| Global workers | maxTotalWorkers | Rejects new entities with WORKER_LIMIT_EXCEEDED (existing entities still accepted) |
| Global depth | maxTotalQueueDepth | Rejects all enqueues with QUEUE_DEPTH_EXCEEDED |
In cluster mode, the master also enforces maxConcurrentPetitions to bound petition processing.
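The three limits can be combined into a single admission check. A sketch, assuming the global depth check runs first and that a worker's depth counts only its pending messages; admit and the depth map are hypothetical, while the limit names, the "0 = unbounded" convention and the rejection codes come from the tables.

```typescript
type AdmitResult = "OK" | "QUEUE_DEPTH_EXCEEDED" | "WORKER_LIMIT_EXCEEDED";

interface Limits {
  workerMaxQueueDepth: number; // per worker; 0 = unbounded
  maxTotalWorkers: number;     // 0 = unbounded
  maxTotalQueueDepth: number;  // 0 = unbounded
}

function admit(
  entityKey: string,
  depthByWorker: Map<string, number>, // pending messages per live worker
  limits: Limits,
): AdmitResult {
  // Global depth: rejects every enqueue
  let totalDepth = 0;
  depthByWorker.forEach((d) => (totalDepth += d));
  if (limits.maxTotalQueueDepth > 0 && totalDepth >= limits.maxTotalQueueDepth) {
    return "QUEUE_DEPTH_EXCEEDED";
  }
  const depth = depthByWorker.get(entityKey);
  if (depth === undefined) {
    // Global workers: only new entities (which need a new worker) are rejected
    if (limits.maxTotalWorkers > 0 && depthByWorker.size >= limits.maxTotalWorkers) {
      return "WORKER_LIMIT_EXCEEDED";
    }
  } else if (limits.workerMaxQueueDepth > 0 && depth >= limits.workerMaxQueueDepth) {
    return "QUEUE_DEPTH_EXCEEDED"; // per-worker depth
  }
  return "OK";
}
```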
Configuration
Minimal (single server)
AtomicQueuesModule.forRoot({
redis: { host: 'localhost', port: 6379 },
})
That's it. Everything else has defaults. Add entities to customize per-entity behavior, or grpc to enable cluster mode.
Full reference
AtomicQueuesModule.forRoot(config)
| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| redis | IRedisConfig | yes | — | Redis connection. Accepts { host, port, password, db } or { url } |
| entities | Record<string, IEntityConfig> | no | {} | Per-entity-type overrides (see below) |
| keyPrefix | string | no | 'aq' | Prefix for all Redis keys |
| maxTotalWorkers | number | no | 10000 | Max concurrent entity workers across all types. 0 = unbounded |
| maxTotalQueueDepth | number | no | 100000 | Max total pending messages across all workers. 0 = unbounded |
| retry | IRetryPolicy | no | { maxAttempts: 1 } | Default retry policy (strictly-once by default) |
| wal | IWalConfig | no | { enabled: true } | Write-ahead log settings |
| grpc | IGrpcConfig | no | { enabled: false } | Cluster mode — omit entirely for single-server |
| verbose | boolean | no | false | Enable verbose logging |
IEntityConfig — per entity type
entities: {
account: { /* all fields optional */ },
order: { onInterrupt: 'dead-letter', workerIdleTimeout: 60_000 },
}
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| defaultEntityId | string | — | Property name used as entity ID when @QueueEntityId is not present |
| onInterrupt | 'dead-letter' \| 'retry' | 'dead-letter' | What to do when a message is found mid-execution on recovery |
| workerIdleTimeout | number (ms) | 30000 | How long an idle worker lives before teardown |
| workerMaxQueueDepth | number | 0 (unbounded) | Max pending messages per worker. Rejects with QUEUE_DEPTH_EXCEEDED |
| replyTimeout | number (ms) | 5000 | Default timeout for enqueueAndWait on this entity type |
| retry | IRetryPolicy | inherits root | Per-entity retry policy override |
IRetryPolicy
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| maxAttempts | number | 1 | Total attempts. 1 = strictly once, no retries |
| backoff | 'fixed' \| 'exponential' | 'exponential' | Backoff strategy between retries |
| backoffDelay | number (ms) | 1000 | Base delay between retries |
| maxDelay | number (ms) | 30000 | Maximum delay cap for exponential backoff |
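The retry fields combine into a delay schedule. The exact exponential formula is not documented, so this sketch assumes the delay doubles with each retry starting from backoffDelay, is capped at maxDelay and has no jitter; retryDelay is a hypothetical helper.

```typescript
interface RetryPolicy {
  maxAttempts: number;
  backoff: "fixed" | "exponential";
  backoffDelay: number; // ms
  maxDelay: number;     // ms
}

// Delay before `attempt` (1 = first run, 2 = first retry); null = no more attempts
function retryDelay(policy: RetryPolicy, attempt: number): number | null {
  if (attempt > policy.maxAttempts) return null; // exhausted: the message is dead-lettered
  if (attempt <= 1) return 0;                     // the first attempt runs immediately
  if (policy.backoff === "fixed") return policy.backoffDelay;
  const exponential = policy.backoffDelay * Math.pow(2, attempt - 2);
  return Math.min(exponential, policy.maxDelay);
}
```

With the default maxAttempts: 1, retryDelay returns null for attempt 2: the message is never retried, which is the strictly-once default.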
IWalConfig — write-ahead log
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| enabled | boolean | true | Disable WAL for testing only — never disable in production |
| cleanupInterval | number (ms) | 5000 | How often to evict completed/failed WAL entries |
| entryTTL | number (seconds) | 86400 (24h) | Safety TTL for WAL entries in Redis |
IGrpcConfig — cluster mode
Omit entirely for single-server. Set enabled: true to activate.
grpc: {
enabled: true,
listenAddress: '0.0.0.0:50051',
advertisedAddress: '10.0.1.5:50051',
serverId: 'billing-1',
serviceGroup: 'billing',
}
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| enabled | boolean | false | Enable gRPC cluster transport |
| listenAddress | string | '0.0.0.0:50051' | Address the gRPC server binds to |
| advertisedAddress | string | os.hostname() + ':50051' | Address other nodes use to reach this one |
| serverId | string | auto-generated UUID | Unique node ID. Must be stable across restarts for predictable leader election |
| serviceGroup | string | 'default' | Logical grouping — nodes in the same group form a replica set |
| maxForwardHops | number | 3 | Max cross-service forwarding hops to prevent loops |
| maxConcurrentPetitions | number | 50 | Max in-flight petitions the master processes. 0 = unbounded |
Timing (ms)
| Field | Default | Description |
|-------|---------|-------------|
| heartbeatMs | 400 | How often this node heartbeats to Redis |
| nodeTTLMs | 1500 | Node considered dead after this long without heartbeat |
| reconcileIntervalMs | 2000 | How often to scan Redis for membership changes |
| leaderTTLMs | 2000 | Leader lock TTL |
| leaderRenewalMs | 400 | Leader lock renewal interval |
| leaderDebounceMs | 800 | Debounce window before recomputing leader after ring changes |
Health monitoring
| Field | Default | Description |
|-------|---------|-------------|
| peerMonitorEnabled | true | Watch gRPC channel state for fast failure detection |
| peerSuspectDebounceMs | 500 | Debounce before declaring a peer suspected-dead |
| redisHealthCheckMs | 500 | Redis PING interval |
| redisHealthFailureThreshold | 3 | Consecutive PING failures before degraded mode |
Circuit breaker (per-peer gRPC connections)
| Field | Default | Description |
|-------|---------|-------------|
| circuitBreakerFailureThreshold | 3 | Consecutive failures before opening the circuit |
| circuitBreakerCooldownMs | 2000 | Time before a half-open probe is allowed |
gRPC keepalive
| Field | Default | Description |
|-------|---------|-------------|
| keepaliveTimeMs | 10000 | Keepalive ping interval (minimum enforced by grpc-js) |
| keepaliveTimeoutMs | 5000 | Connection dead if no keepalive response |
RPC deadlines (deadlines sub-object)
| Field | Default | Description |
|-------|---------|-------------|
| deadlines.forwardMs | 1500 | Deadline for fire-and-forget RPCs (forward, petition, enqueueToWorker) |
| deadlines.pingMs | 1000 | Deadline for health ping |
| deadlines.andWaitMs | 60000 | Default deadline for *AndWait RPCs when no replyTimeout is set |
| deadlines.syncMs | 1000 | Deadline for listWorkers during master table rebuild |
| deadlines.connectivityWatchMs | 30000 | Timeout for peer connectivity watch loop re-arm |
Dead Letter Queue
Messages found in dispatched state on recovery, or that exhaust all retry attempts, are moved to a Redis-backed dead letter queue.
npx atomic-queues dlq list
npx atomic-queues dlq replay --id <message-id>
npx atomic-queues dlq purge
CLI
# Inspect live entity/command/query registry from Redis
npx atomic-queues introspect
# Generate TypeScript from the live registry
npx atomic-queues generate --classes -o ./src/generated # decorated class files
npx atomic-queues generate --ts -o ./src/generated # namespace interfaces + DispatchMap
npx atomic-queues generate --json-schema -o ./src/generated
Guarantees
| Guarantee | Mechanism |
|---|---|
| FIFO per entity | One worker per entity:entityId with FIFO queue |
| Single-writer per entity | Only one worker exists across the cluster |
| At-most-once delivery | WAL: enqueued → dispatched → completed. Never re-executed after dispatch. |
| Fail if interrupted | Dispatched on crash → dead-lettered, source notified |
| Concurrent across entities | Event loop interleaves at await points |
| Durability | Redis WAL (dual-write: in-memory + Redis) |
| Auto-recovery | WAL recovery + cleanup run automatically on startup |
| Cluster coordination | Deterministic master topology with gRPC |
| Master failover | Heartbeat expiry → deterministic re-election + assignment table rebuild |
| Epoch fencing | Replicas reject commands from stale masters |
| No distributed locks | The worker IS the serialization: not a lock, not Redlock, not SET NX |
Design Philosophy
AtomicQueues is pessimistic by design. At every decision point, it chooses safety over liveness:
- Interrupted? Dead-letter, don't retry.
- Redis down? Reject new work, don't buffer.
- Stale epoch? Reject, don't process.
- Master rebuilding? Reject petitions, don't guess.
- Unknown assignment? Bounce and retry through the master, don't deliver speculatively.
The system refuses to operate under uncertainty rather than risk executing a message twice.
Migrating from v2
Removed: executor, registry, gateTTL, ActorSystem, LogService, GateService, SchedulerService, ExecutorPoolService, ResultCollector, RegistryService, workers config, WorkerModule.
Added: EntityWorker, EntityWorkerManager, MasterCoordinator, workerIdleTimeout in entity config.
Unchanged: All decorators, QueueBus public API, CLI generators.
Migration: Remove executor/registry/workers from config. That's it. Workers are now internal.
License
MIT
