@azlib/queue
v1.0.0
Published
Provider-agnostic queueing primitives for asynchronous heavy-task processing.
Downloads
413
Readme
@azlib/queue
Provider-agnostic queueing primitives for asynchronous heavy-task processing.
Capabilities
- Asynchronous enqueue and consume workflows.
- Retry policy with fixed, exponential, and jittered backoff.
- Dead-letter routing and remediation helpers.
- Pluggable structured logging for enqueue, retry, ack, and dead-letter events.
- Provider switching with one stable contract.
- Dashboard query service for queue health and item operations.
Install
pnpm add @azlib/queueQuick start (memory provider)
import { createQueueService } from "@azlib/queue";
import { createProviderFromEnv } from "@azlib/queue/providers";
const provider = createProviderFromEnv({ QUEUE_PROVIDER: "memory" });
const queue = createQueueService({
provider,
defaultQueue: "heavy-jobs",
retryPolicy: {
maxAttempts: 5,
backoffType: "exponential-jitter",
baseDelayMs: 1000,
maxDelayMs: 60_000,
},
// logger omitted -> defaults to @azlib/logger
});
const accepted = await queue.enqueue({
idempotencyKey: "invoice:123:v1",
payloadRef: { invoiceId: 123 },
});
console.log(accepted.taskId);Database persistence
Use the persistence option to make queue task state, attempts, and idempotency durable.
import { createQueueService } from "@azlib/queue";
import { createProviderFromEnv } from "@azlib/queue/providers";
import {
createPersistenceConfig,
createSqlClientAdapter,
createSqliteDialectAdapter,
} from "@azlib/persistence";
const persistence = createPersistenceConfig(
createSqlClientAdapter(appDbDriver),
createSqliteDialectAdapter(),
"app"
);
const queue = createQueueService({
// `memory` is the transport provider only.
// Queue state is durable because `persistence` points to SQL.
provider: createProviderFromEnv({ QUEUE_PROVIDER: "memory" }),
defaultQueue: "heavy-jobs",
retryPolicy: {
maxAttempts: 3,
backoffType: "fixed",
baseDelayMs: 250,
maxDelayMs: 5_000,
},
persistence,
});Production transport with the same SQL persistence:
const queue = createQueueService({
provider: createProviderFromEnv({
QUEUE_PROVIDER: "aws-sqs",
AWS_SQS_QUEUE_URL: process.env.AWS_SQS_QUEUE_URL,
AWS_SQS_DLQ_URL: process.env.AWS_SQS_DLQ_URL,
}),
defaultQueue: "heavy-jobs",
retryPolicy,
persistence,
});Logging integration
By default, createQueueService() uses @azlib/logger (JSON format + console transport) when logger is omitted.
QueueServiceOptions.logger is still fully pluggable and accepts any object with:
debug(message, meta)info(message, meta)warn(message, meta)error(message, meta)
Default behavior (built-in @azlib/logger)
import { createQueueService } from "@azlib/queue";
import { createProviderFromEnv } from "@azlib/queue/providers";
const queue = createQueueService({
provider: createProviderFromEnv({ QUEUE_PROVIDER: "memory" }),
defaultQueue: "heavy-jobs",
retryPolicy: {
maxAttempts: 5,
backoffType: "exponential-jitter",
baseDelayMs: 1000,
maxDelayMs: 60_000,
},
// no logger passed -> defaults to @azlib/logger
});Swap in a different logger provider
import pino from "pino";
const pinoLogger = pino();
const queue = createQueueService({
provider,
defaultQueue: "heavy-jobs",
retryPolicy,
logger: {
debug: (message, meta) => pinoLogger.debug(meta ?? {}, message),
info: (message, meta) => pinoLogger.info(meta ?? {}, message),
warn: (message, meta) => pinoLogger.warn(meta ?? {}, message),
error: (message, meta) => pinoLogger.error(meta ?? {}, message),
},
});When supplied, the queue emits structured logs for:
- task enqueue and claim,
- retry scheduling and retry requests,
- ack success,
- dead-letter transitions,
- operator requeue/archive actions.
Provider switching
Switch providers by environment configuration:
const provider = createProviderFromEnv({
QUEUE_PROVIDER: "aws-sqs",
AWS_SQS_QUEUE_URL: "https://sqs.ap-southeast-1.amazonaws.com/123/heavy-jobs",
AWS_SQS_DLQ_URL: "https://sqs.ap-southeast-1.amazonaws.com/123/heavy-jobs-dlq",
});The application queue API remains unchanged across providers.
Dead-letter and remediation
When retries are exhausted, tasks transition to dead-letter state with failure metadata.
await queue.consume("heavy-jobs", async (task, ctx) => {
try {
await processHeavyTask(task.payloadRef);
await ctx.ack();
} catch (error) {
await ctx.fail(error);
}
});Use operator actions from dashboard service integrations to requeue or archive dead-letter tasks.
Dashboard API
@azlib/queue/dashboard exports createQueueDashboardService and dashboard types for:
- queue health snapshots,
- item filtering by status/time/search,
- task attempt timelines,
- dead-letter requeue and archive operations.
Examples
See runnable reference snippets in:
packages/queue/examples/basic-memory.tspackages/queue/examples/aws-sqs-provider.tspackages/queue/examples/dashboard-operations.tspackages/queue/examples/database-persistence.ts
