@jamx-framework/queue

v1.0.0

JAMX Framework — Async job queue

Description

Queue and worker module for JAMX Framework. It provides an API for managing asynchronous tasks, background processing, and job execution, with support for multiple backends (Redis, RabbitMQ, AWS SQS). Features include priorities, retries, dead letter queues, and task scheduling.

How it works

The module implements a producer-consumer queue system:

  1. Queue: Main class that enqueues jobs and manages workers
  2. Worker: Processes jobs from a specific queue, with configurable concurrency and retries
  3. Drivers: Implementations for different backends (Redis, RabbitMQ, SQS)
  4. Job: Represents a task with a payload, priority, and metadata
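
To make the producer-consumer pattern above concrete, here is a minimal in-memory sketch. It is not the package's implementation: TinyQueue, TinyHandler, and their methods are hypothetical names, everything runs synchronously, and there is no driver, retry, or priority handling.

```typescript
// Hypothetical in-memory sketch of the producer-consumer pattern.
// Not the real Queue class: no driver, no retries, no priorities.
type TinyHandler<T> = (data: T) => void;

class TinyQueue<T> {
  private pending: T[] = [];
  private handler: TinyHandler<T> | null = null;

  // Producer side: store the job, then hand it over if a consumer exists.
  add(data: T): void {
    this.pending.push(data);
    this.drain();
  }

  // Consumer side: register a handler and work through anything waiting.
  process(handler: TinyHandler<T>): void {
    this.handler = handler;
    this.drain();
  }

  private drain(): void {
    const handler = this.handler;
    if (handler === null) return; // nobody is consuming yet
    while (this.pending.length > 0) {
      handler(this.pending.shift() as T);
    }
  }
}

const handled: string[] = [];
const tiny = new TinyQueue<string>();
tiny.add('first'); // waits: no consumer registered yet
tiny.process((data) => { handled.push(data); });
tiny.add('second'); // delivered immediately
```

The point of the split is that producers never call the handler directly; they only add work, and the consumer side decides when it runs.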

Main components

  • src/queue.ts: Queue class that manages enqueueing and workers
  • src/worker.ts: Worker class that processes jobs
  • src/types.ts: Types (Job, QueueOptions, WorkerOptions, etc.)
  • src/index.ts: Export entry point

Basic usage

import { Queue, RedisDriver } from '@jamx-framework/queue';

// Create a queue with the Redis driver
const queue = new Queue(new RedisDriver({ url: 'redis://localhost:6379' }));

// Add a job
await queue.add('send-email', {
  to: '[email protected]',
  subject: 'Hello',
  body: 'Message body',
});

// Process jobs with a worker
const worker = queue.process('send-email', async (job) => {
  await mailer.send(job.data);
});

// Or with concurrency and retry options
// (named imageWorker so it does not redeclare `worker` from above)
const imageWorker = queue.process('process-image', async (job) => {
  await processImage(job.data);
}, {
  concurrency: 5,
  maxRetries: 3,
  retryDelay: 5000,
});

Examples

Job with priority

// Add jobs with a priority (lower number = higher priority)
await queue.add('critical-task', { data: 'urgent' }, { priority: 1 });
await queue.add('normal-task', { data: 'normal' }, { priority: 5 });
await queue.add('low-task', { data: 'low' }, { priority: 10 });

// Workers will process jobs in priority order
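
The ordering rule can be sketched as a comparator. This is an illustration only: WaitingJob, its seq field, and dequeueOrder are hypothetical, and breaking ties in enqueue order is an assumption the README does not state.

```typescript
// Hypothetical shape for a waiting job; not a type exported by the package.
interface WaitingJob {
  name: string;
  priority: number; // lower number = higher priority, as in the README
  seq: number;      // assumption: enqueue counter used to break ties FIFO
}

function dequeueOrder(jobs: WaitingJob[]): WaitingJob[] {
  // Copy first so the caller's array is not mutated.
  return [...jobs].sort((a, b) => a.priority - b.priority || a.seq - b.seq);
}

const ordered = dequeueOrder([
  { name: 'low-task', priority: 10, seq: 0 },
  { name: 'normal-task', priority: 5, seq: 1 },
  { name: 'critical-task', priority: 1, seq: 2 },
]).map((job) => job.name);
// ordered is ['critical-task', 'normal-task', 'low-task']
```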

Job with delay (scheduled)

// Run the job in 5 minutes
await queue.add('reminder', { userId: 123 }, { delay: 300_000 });

// Run at a specific time
const runAt = new Date(Date.now() + 3600_000);
await queue.add('scheduled-report', { reportId: 'abc' }, { runAt });
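
One way to think about delay and runAt is that both reduce to a single "earliest run time". The sketch below shows that reduction under stated assumptions: ScheduleOptions and resolveRunAt are hypothetical names, and the rules that runAt wins over delay and that past dates run immediately are guesses, not documented behavior.

```typescript
// Hypothetical options shape mirroring the `delay` and `runAt` fields above.
interface ScheduleOptions {
  delay?: number; // milliseconds from now
  runAt?: Date;   // absolute time
}

// Returns the earliest time (ms since epoch) at which the job may run.
function resolveRunAt(options: ScheduleOptions, now: number): number {
  if (options.runAt !== undefined) {
    // Assumption: an explicit date takes precedence; past dates run right away.
    return Math.max(options.runAt.getTime(), now);
  }
  // Assumption: a missing or negative delay means "run immediately".
  return now + Math.max(options.delay ?? 0, 0);
}

const now = 1_000_000;
const inFiveMinutes = resolveRunAt({ delay: 300_000 }, now);
const inOneHour = resolveRunAt({ runAt: new Date(now + 3_600_000) }, now);
const immediately = resolveRunAt({}, now);
```

A worker would then skip any job whose resolved time is still in the future.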

Retry with backoff

const worker = queue.process('api-call', async (job) => {
  const result = await callExternalApi(job.data);
  if (!result.success) throw new Error('API failed');
  return result;
}, {
  maxRetries: 5,
  retryDelay: 1000,
  backoff: 'exponential', // 1s, 2s, 4s, 8s, 16s
});
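
The delays in the comment above follow from doubling retryDelay on each retry. A small sketch of that arithmetic, with retryDelayMs as a hypothetical helper; the 'linear' formula (retryDelay multiplied by the attempt number) is an assumption, since the README only spells out the exponential case.

```typescript
type Backoff = 'linear' | 'exponential';

// attempt is 1-based: 1 is the first retry.
function retryDelayMs(attempt: number, retryDelay: number, backoff: Backoff): number {
  if (backoff === 'exponential') {
    return retryDelay * 2 ** (attempt - 1); // 1x, 2x, 4x, 8x, ...
  }
  return retryDelay * attempt; // assumed linear rule: 1x, 2x, 3x, ...
}

const exponentialSchedule = [1, 2, 3, 4, 5].map((n) => retryDelayMs(n, 1000, 'exponential'));
// exponentialSchedule is [1000, 2000, 4000, 8000, 16000]
const linearSchedule = [1, 2, 3].map((n) => retryDelayMs(n, 1000, 'linear'));
// linearSchedule is [1000, 2000, 3000]
```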

Dead Letter Queue (DLQ)

const worker = queue.process('payment-processing', async (job) => {
  await processPayment(job.data);
}, {
  maxRetries: 3,
  deadLetterQueue: 'payment-failed', // failed jobs go here
});

// Process the DLQ separately
queue.process('payment-failed', async (job) => {
  await notifyAdmin(job.data);
  await logFailure(job);
});
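
The routing decision behind a DLQ can be sketched without any backend. This is a simplified, synchronous illustration: runWithDlq and DeadLetter are hypothetical, and it assumes maxRetries counts retries after the first attempt (so maxRetries: 3 means up to four attempts), which the README does not specify.

```typescript
// Hypothetical record stored in the dead letter queue.
interface DeadLetter<T> {
  data: T;
  attempts: number;
  failedReason: string;
}

// Runs the handler, retrying on failure; returns true on success.
// On exhaustion the job is pushed to `dlq` instead of being dropped.
function runWithDlq<T>(
  data: T,
  handler: (data: T) => void,
  maxRetries: number,
  dlq: DeadLetter<T>[],
): boolean {
  const maxAttempts = maxRetries + 1; // assumption: first try + retries
  let failedReason = '';
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(data);
      return true;
    } catch (err) {
      failedReason = err instanceof Error ? err.message : String(err);
    }
  }
  dlq.push({ data, attempts: maxAttempts, failedReason });
  return false;
}

const deadLetters: DeadLetter<{ orderId: number }>[] = [];
const succeeded = runWithDlq(
  { orderId: 42 },
  () => { throw new Error('card declined'); },
  3,
  deadLetters,
);
// succeeded is false; deadLetters holds one entry with attempts = 4
```

Keeping the failure reason alongside the payload is what makes the DLQ useful for later investigation.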

Job batches

// Add multiple jobs in one atomic operation
await queue.addBulk([
  { name: 'email-1', data: { to: '[email protected]' } },
  { name: 'email-2', data: { to: '[email protected]' } },
  { name: 'email-3', data: { to: '[email protected]' } },
]);

// Process in batches (same function for all jobs)
await queue.processBatch('send-emails', async (jobs) => {
  for (const job of jobs) {
    await mailer.send(job.data);
  }
}, { batchSize: 10 });
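
As a rough illustration of what batchSize implies, the sketch below splits a list of pending items into groups of at most batchSize. toBatches is a hypothetical helper; how the package actually assembles batches is not documented here.

```typescript
// Splits items into consecutive groups of at most `batchSize` elements.
function toBatches<T>(items: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// 25 pending jobs with batchSize 10 yield batches of 10, 10, and 5.
const pendingIds = Array.from({ length: 25 }, (_, i) => i);
const batchSizes = toBatches(pendingIds, 10).map((batch) => batch.length);
```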

Pause/resume a queue

// Pause processing (does not accept new jobs)
await queue.pause('email-queue');

// Resume
await queue.resume('email-queue');

// Check status
const stats = await queue.getStats('email-queue');
console.log('Jobs waiting:', stats.waiting);
console.log('Jobs active:', stats.active);

Job with progress

const worker = queue.process('large-import', async (job, update) => {
  const total = job.data.items.length;
  
  for (let i = 0; i < total; i++) {
    await processItem(job.data.items[i]);
    
    // Update progress
    await update({ progress: (i + 1) / total });
  }
});

// Monitor progress from elsewhere (jobId is the ID returned by queue.add)
const progress = await queue.getJobProgress(jobId);
console.log(`Progress: ${(progress * 100).toFixed(1)}%`);

Usage with dependency injection

import { Container } from '@jamx-framework/core';
import { Queue, RedisDriver } from '@jamx-framework/queue';

Container.registerSingleton('queue', () => {
  return new Queue(new RedisDriver({ url: process.env.REDIS_URL }));
});

const queue = Container.resolve<Queue>('queue');
await queue.add('task', { data: 'value' });

Internal flow

  1. Enqueue: queue.add(name, data, options) serializes the job and sends it to the driver
  2. Storage: The driver stores the job in Redis/RabbitMQ/SQS with its metadata (priority, delay, etc.)
  3. Workers: queue.process(name, handler, options) creates workers that listen on the queue
  4. Dequeue: Workers fetch jobs (respecting priority and delay)
  5. Execution: The handler runs with the job as its argument
  6. Retry: If the handler fails, the job is retried according to the policy (maxRetries, retryDelay, backoff)
  7. DLQ: Once maxRetries is exhausted, the job moves to the dead letter queue
  8. Completion: If the handler succeeds, the job is removed from the queue

API Reference (Summary)

Queue

  • constructor(driver: QueueDriver, options?: QueueOptions)
  • async add(name: string, data: any, options?: JobOptions): Promise<string> (returns the jobId)
  • async addBulk(jobs: BulkJob[]): Promise<string[]>
  • async process(name: string, handler: JobHandler, options?: WorkerOptions): Promise<Worker>
  • async pause(name: string): Promise<void>
  • async resume(name: string): Promise<void>
  • async getJob(jobId: string): Promise<Job | null>
  • async getJobProgress(jobId: string): Promise<number>
  • async getStats(name?: string): Promise<QueueStats>
  • async removeQueue(name: string): Promise<void>
  • async clearQueue(name: string): Promise<void>

Worker

  • id: string
  • name: string
  • isRunning(): boolean
  • stop(): Promise<void>

Job

  • id: string
  • name: string
  • data: any
  • priority: number
  • status: 'pending' | 'active' | 'completed' | 'failed'
  • attempts: number
  • failedReason?: string
  • createdAt: Date
  • processedAt?: Date

QueueDriver (interface)

  • async connect(): Promise<void>
  • async disconnect(): Promise<void>
  • async add(queue: string, job: JobData): Promise<string>
  • async addBulk(queue: string, jobs: JobData[]): Promise<string[]>
  • async getNext(queue: string): Promise<Job | null>
  • async ack(queue: string, jobId: string): Promise<void>
  • async nack(queue: string, jobId: string, requeue: boolean): Promise<void>
  • async getStats(queue?: string): Promise<QueueStats>
  • async pause(queue: string): Promise<void>
  • async resume(queue: string): Promise<void>

Performance Considerations

  • Concurrency: Workers can process multiple jobs concurrently
  • Batching: addBulk reduces round trips to the backend
  • Connection pooling: Drivers reuse connections
  • Visibility timeout: In Redis, active jobs are temporarily hidden
  • Memory usage: Avoid large job payloads (store references to external storage instead)
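
To illustrate what a concurrency limit means in practice, here is a generic sketch of bounded parallelism over a list of jobs. runWithConcurrency is a hypothetical helper, not the package's Worker; it only shows the idea of a fixed number of "lanes" that each pull the next job when free.

```typescript
// Runs `handler` over all items with at most `concurrency` calls in flight.
async function runWithConcurrency<T>(
  items: T[],
  concurrency: number,
  handler: (item: T) => Promise<void>,
): Promise<void> {
  let next = 0;

  // Each lane claims the next unprocessed item until none are left.
  async function lane(): Promise<void> {
    while (next < items.length) {
      const item = items[next];
      next += 1;
      await handler(item);
    }
  }

  const laneCount = Math.max(1, Math.min(concurrency, items.length));
  // If any handler rejects, this promise rejects; other lanes are not cancelled.
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
}
```

Called as `await runWithConcurrency(jobs, 5, handler)`, this keeps at most five handlers running at once, which is the behavior the concurrency option describes.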

Configuration Options

// Redis Driver
const redisDriver = new RedisDriver({
  url: 'redis://localhost:6379',
  namespace: 'jamx:queue',
  maxRetries: 3,
  retryDelay: 5000,
});

// Queue options
const queue = new Queue(redisDriver, {
  defaultConcurrency: 10,
  defaultPriority: 5,
  defaultDelay: 0,
});

// Worker options
queue.process('my-queue', handler, {
  concurrency: 5,        // simultaneous jobs
  maxRetries: 3,         // maximum retries
  retryDelay: 1000,      // delay between retries (ms)
  backoff: 'linear',     // 'linear' | 'exponential'
  deadLetterQueue: 'dlq', // queue for failed jobs
  batchSize: 1,          // batch size (1 = no batching)
});

Testing

Tests live in packages/queue/tests/unit/:

pnpm test

They cover:

  • Enqueueing and dequeueing
  • Priorities
  • Retry and backoff
  • Dead letter queues
  • Concurrency
  • Pause/resume

Compatibility

  • Compatible with Node.js 18+
  • Drivers: Redis, RabbitMQ, AWS SQS
  • Works on Windows, macOS, and Linux
  • Requires no native dependencies

CLI Integration

  • jamx queue:list: Lists all queues
  • jamx queue:stats <queue>: Shows statistics for a queue
  • jamx queue:jobs <queue>: Lists the jobs in a queue
  • jamx queue:retry <jobId>: Retries a failed job
  • jamx queue:delete <jobId>: Deletes a job
  • jamx queue:purge <queue>: Clears an entire queue
  • jamx queue:dlq: Shows and manages dead letter queues

Best Practices

  1. Idempotency: Jobs should be idempotent (running one more than once causes no harm)
  2. Small payloads: Do not store large data in the job; use references (IDs)
  3. Timeouts: Configure appropriate timeouts to avoid hung jobs
  4. Monitoring: Monitor queue length, failure rate, and latency
  5. Dead letter queues: Always configure a DLQ so failures can be investigated
  6. Priorities: Use priorities to separate critical jobs from best-effort ones
  7. Graceful shutdown: Workers should handle SIGTERM and finish in-flight jobs
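
The first practice, idempotency, matters because retries can deliver the same job more than once. One common approach is to record a key for each completed job and skip repeats. The sketch below is illustrative only: makeIdempotent is a hypothetical wrapper, and it keeps keys in process memory, whereas a real deployment would need durable shared storage.

```typescript
// Wraps a handler so that data with an already-processed key is skipped.
function makeIdempotent<T>(
  keyOf: (data: T) => string,
  handler: (data: T) => void,
): (data: T) => void {
  const completed = new Set<string>(); // in-memory only; an assumption for the sketch
  return (data: T): void => {
    const key = keyOf(data);
    if (completed.has(key)) return; // duplicate delivery: do nothing
    handler(data);
    completed.add(key); // marked only after success, so failures can be retried
  };
}

let sent = 0;
const sendOnce = makeIdempotent<{ id: string }>(
  (data) => data.id,
  () => { sent += 1; },
);
sendOnce({ id: 'a' });
sendOnce({ id: 'a' }); // same key: skipped
sendOnce({ id: 'b' });
// sent is 2
```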

Troubleshooting

Queue full

Increase concurrency or add more workers.

Jobs stuck in "active"

Check that the workers have not crashed; review timeouts.
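
A simple way to spot stuck jobs is to compare how long each active job has been running against a threshold. The sketch below assumes a hypothetical startedAt timestamp on each active job; the README's Job type lists processedAt but does not say whether it marks the start or the end of processing, so treat the field name as illustrative.

```typescript
// Hypothetical view of an active job; `startedAt` is an assumed field (ms since epoch).
interface ActiveJob {
  id: string;
  startedAt: number;
}

// Returns the IDs of jobs that have been active longer than `maxActiveMs`.
function findStalled(jobs: ActiveJob[], now: number, maxActiveMs: number): string[] {
  return jobs
    .filter((job) => now - job.startedAt > maxActiveMs)
    .map((job) => job.id);
}

const checkTime = 100_000;
const stalledIds = findStalled(
  [
    { id: 'fresh', startedAt: checkTime - 1_000 },
    { id: 'stuck', startedAt: checkTime - 60_000 },
  ],
  checkTime,
  30_000,
);
// stalledIds is ['stuck']
```

The IDs found this way could then be retried or inspected with the CLI commands listed earlier.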

High failure rate

Review the DLQ, adjust the retry policy, and check external dependencies.

Memory leaks

Make sure handlers do not retain references to completed jobs.

This queue module provides a robust, production-ready job processing system for JAMX applications, enabling asynchronous task execution, background processing, and reliable delivery with retries and dead letter queues.