@jamx-framework/queue
v1.0.0
Published
JAMX Framework — Async job queue
Maintainers
Readme
@jamx-framework/queue
Descripción
Módulo de colas (queues) y workers para JAMX Framework. Proporciona una API para gestionar tareas asíncronas, procesamiento en background y ejecución de jobs con soporte para múltiples backends (Redis, RabbitMQ, AWS SQS). Incluye características como prioridades, retries, dead letter queues, y scheduling de tareas.
Cómo funciona
El módulo implementa un sistema de colas basado en productor-consumidor:
- Queue: Clase principal que encola jobs y gestiona workers
- Worker: Procesa jobs de una cola específica, con concurrencia y retry configurable
- Drivers: Implementaciones para diferentes backends (Redis, RabbitMQ, SQS)
- Job: Representa una tarea con payload, prioridad, y metadata
Componentes principales
- src/queue.ts: Clase
Queueque gestiona encolado y workers - src/worker.ts: Clase
Workerque procesa jobs - src/types.ts: Tipos (
Job,QueueOptions,WorkerOptions, etc.) - src/index.ts: Punto de exportación
Uso básico
import { Queue, RedisDriver } from '@jamx-framework/queue';
// Crear cola con driver Redis
const queue = new Queue(new RedisDriver({ url: 'redis://localhost:6379' }));
// Añadir job
await queue.add('send-email', {
to: '[email protected]',
subject: 'Hello',
body: 'Message body',
});
// Procesar jobs con worker
const worker = queue.process('send-email', async (job) => {
await mailer.send(job.data);
});
// O con opciones de concurrencia y retry
const worker = queue.process('process-image', async (job) => {
await processImage(job.data);
}, {
concurrency: 5,
maxRetries: 3,
retryDelay: 5000,
});Ejemplos
Job con prioridad
// Añadir job con prioridad (menor número = mayor prioridad)
await queue.add('critical-task', { data: 'urgent' }, { priority: 1 });
await queue.add('normal-task', { data: 'normal' }, { priority: 5 });
await queue.add('low-task', { data: 'low' }, { priority: 10 });
// Los workers procesarán en orden de prioridadJob con delay (programado)
// Ejecutar job en 5 minutos
await queue.add('reminder', { userId: 123 }, { delay: 300_000 });
// Ejecutar en fecha específica
const runAt = new Date(Date.now() + 3600_000);
await queue.add('scheduled-report', { reportId: 'abc' }, { runAt });Retry con backoff
const worker = queue.process('api-call', async (job) => {
const result = await callExternalApi(job.data);
if (!result.success) throw new Error('API failed');
return result;
}, {
maxRetries: 5,
retryDelay: 1000,
backoff: 'exponential', // 1s, 2s, 4s, 8s, 16s
});Dead Letter Queue (DLQ)
const worker = queue.process('payment-processing', async (job) => {
await processPayment(job.data);
}, {
maxRetries: 3,
deadLetterQueue: 'payment-failed', // jobs fallidos van aquí
});
// Procesar DLQ separadamente
queue.process('payment-failed', async (job) => {
await notifyAdmin(job.data);
await logFailure(job);
});Job batches
// Añadir múltiples jobs en una operación atómica
await queue.addBulk([
{ name: 'email-1', data: { to: '[email protected]' } },
{ name: 'email-2', data: { to: '[email protected]' } },
{ name: 'email-3', data: { to: '[email protected]' } },
]);
// Procesar en batch (misma función para todos)
await queue.processBatch('send-emails', async (jobs) => {
for (const job of jobs) {
await mailer.send(job.data);
}
}, { batchSize: 10 });Pausar/reanudar cola
// Pausar procesamiento (no acepta nuevos jobs)
await queue.pause('email-queue');
// Reanudar
await queue.resume('email-queue');
// Ver estado
const stats = await queue.getStats('email-queue');
console.log('Jobs waiting:', stats.waiting);
console.log('Jobs active:', stats.active);Job con progreso
const worker = queue.process('large-import', async (job, update) => {
const total = job.data.items.length;
for (let i = 0; i < total; i++) {
await processItem(job.data.items[i]);
// Actualizar progreso
await update({ progress: (i + 1) / total });
}
});
// Monitorear progreso desde otro lugar
const progress = await queue.getJobProgress(jobId);
console.log(`Progreso: ${(progress * 100).toFixed(1)}%`);Uso con inyección de dependencias
import { Container } from '@jamx-framework/core';
import { Queue } from '@jamx-framework/queue';
Container.registerSingleton('queue', () => {
return new Queue(new RedisDriver({ url: process.env.REDIS_URL }));
});
const queue = Container.resolve<Queue>('queue');
await queue.add('task', { data: 'value' });Flujo interno
- Encolado:
queue.add(name, data, options)serializa el job y lo envía al driver - Almacenamiento: El driver guarda el job en Redis/RabbitMQ/SQS con metadata (priority, delay, etc.)
- Workers:
queue.process(name, handler, options)crea workers que escuchan la cola - Desencolado: Los workers obtienen jobs (respetando prioridad y delay)
- Ejecución: Se ejecuta el handler con el job como argumento
- Retry: Si falla, se reintenta según política (maxRetries, delay, backoff)
- DLQ: Después de maxRetries, el job va a dead letter queue
- Completado: Si tiene éxito, se elimina de la cola
API Reference (Resumen)
Queue
constructor(driver: QueueDriver, options?: QueueOptions)async add(name: string, data: any, options?: JobOptions): Promise<string>(retorna jobId)async addBulk(jobs: BulkJob[]): Promise<string[]>async process(name: string, handler: JobHandler, options?: WorkerOptions): Promise<Worker>async pause(name: string): Promise<void>async resume(name: string): Promise<void>async getJob(jobId: string): Promise<Job | null>async getJobProgress(jobId: string): Promise<number>async getStats(name?: string): Promise<QueueStats>async removeQueue(name: string): Promise<void>async clearQueue(name: string): Promise<void>
Worker
id: stringname: stringisRunning(): booleanstop(): Promise<void>
Job
id: stringname: stringdata: anypriority: numberstatus: 'pending' | 'active' | 'completed' | 'failed'attempts: numberfailedReason?: stringcreatedAt: DateprocessedAt?: Date
QueueDriver (interface)
async connect(): Promise<void>async disconnect(): Promise<void>async add(queue: string, job: JobData): Promise<string>async addBulk(queue: string, jobs: JobData[]): Promise<string[]>async getNext(queue: string): Promise<Job | null>async ack(queue: string, jobId: string): Promise<void>async nack(queue: string, jobId: string, requeue: boolean): Promise<void>async getStats(queue?: string): Promise<QueueStats>async pause(queue: string): Promise<void>async resume(queue: string): Promise<void>
Performance Considerations
- Concurrency: Los workers pueden procesar múltiples jobs concurrentemente
- Batching:
addBulkreduce roundtrips al backend - Connection pooling: Los drivers reutilizan conexiones
- Visibility timeout: En Redis, los jobs activos se ocultan temporalmente
- Memory usage: Los jobs grandes deben evitarse (usar referencias a almacenamiento)
Configuration Options
// Redis Driver
const redisDriver = new RedisDriver({
url: 'redis://localhost:6379',
namespace: 'jamx:queue',
maxRetries: 3,
retryDelay: 5000,
});
// Queue options
const queue = new Queue(redisDriver, {
defaultConcurrency: 10,
defaultPriority: 5,
defaultDelay: 0,
});
// Worker options
queue.process('my-queue', handler, {
concurrency: 5, // jobs simultáneos
maxRetries: 3, // reintentos máximos
retryDelay: 1000, // delay entre reintentos (ms)
backoff: 'linear', // 'linear' | 'exponential'
deadLetterQueue: 'dlq', // cola de fallidos
batchSize: 1, // tamaño de batch (1 = no batch)
});Testing
Tests en packages/queue/tests/unit/:
pnpm testCubre:
- Encolado y desencolado
- Prioridades
- Retry y backoff
- Dead letter queues
- Concurrencia
- Pausa/resume
Compatibility
- Compatible con Node.js 18+
- Drivers: Redis, RabbitMQ, AWS SQS
- Funciona en Windows, macOS, Linux
- No requiere dependencias nativas
CLI Integration
jamx queue:list: Lista todas las colasjamx queue:stats <queue>: Muestra estadísticas de una colajamx queue:jobs <queue>: Lista jobs en una colajamx queue:retry <jobId>: Reintenta un job fallidojamx queue:delete <jobId>: Elimina un jobjamx queue:purge <queue>: Limpia toda una colajamx queue:dlq: Muestra y gestiona dead letter queues
Best Practices
- Idempotency: Los jobs deben ser idempotentes (ejecutar múltiples veces no daña)
- Small payloads: No guardar datos grandes en el job; usar referencias (IDs)
- Timeouts: Configurar timeouts apropiados para evitar jobs colgados
- Monitoring: Monitorear longitud de cola, tasa de fallos, latencia
- Dead letter queues: Siempre configurar DLQ para investigar fallos
- Priorities: Usar prioridades para jobs críticos vs best-effort
- Graceful shutdown: Los workers deben manejar SIGTERM y terminar jobs en curso
Troubleshooting
Cola llena
Aumentar concurrency o añadir más workers.
Jobs stuck en "active"
Verificar que los workers no estén caídos; revisar timeouts.
Alta tasa de fallos
Revisar DLQ, ajustar retry policy, verificar dependencias externas.
Memory leaks
Asegurarse de que los handlers no retengan referencias a jobs completados.
This queue module provides a robust, production-ready job processing system for JAMX applications, enabling asynchronous task execution, background processing, and reliable delivery with retries and dead letter queues.
