@nivalis/n-queue

v0.2.0

Redis Queue for Next.js

A robust, Redis-backed job queue system with type safety, event handling, and streaming capabilities.

Features

  • 🎯 Type-safe API: Full TypeScript support with generic types for payloads and job names
  • 💾 Redis-backed: Reliable persistence and atomic operations
  • 🔄 Event Streaming: Real-time job status updates and event handling
  • 🎚️ Concurrency Control: Fine-grained control over parallel job processing
  • 🔍 Job Tracking: Comprehensive job lifecycle management and progress tracking
  • 🛡️ Error Handling: Robust error handling with automatic retries
  • 🔄 Transaction Support: Atomic operations for job state changes
  • 📊 Queue Statistics: Real-time queue metrics and monitoring

Installation

npm install @nivalis/n-queue

Quick Start

import { createClient } from 'redis';
import { Queue } from '@nivalis/n-queue';

// Define your payload schema
type MyPayload = {
  emailQueue: {
    sendEmail: {
      to: string;
      subject: string;
      body: string;
    };
    sendNotification: {
      userId: string;
      message: string;
    };
  };
};

// Create a Redis client factory
const getRedisClient = async () => {
  const client = createClient({
    url: process.env.REDIS_URL ?? 'redis://localhost:6379',
  });

  if (!client.isOpen) {
    await client.connect();
  }

  return client;
};

// Create a queue with concurrency limit
const emailQueue = new Queue<MyPayload, 'emailQueue'>(
  'emailQueue',
  getRedisClient,
  { concurrency: 5 }
);

// Add jobs to the queue
const emailJob = await emailQueue.add('sendEmail', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Welcome to our platform!'
});

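// Process jobs with automatic completion/failure handling.
// sendEmail, sendNotification, and processJob below are your own application
// functions; they are not provided by @nivalis/n-queue.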
await emailQueue.process(async (job) => {
  console.log(`Processing ${job.name} job ${job.id}`);
  await sendEmail(job.payload);
});

// Or process specific job types
await emailQueue.process(async (job) => {
  await sendNotification(job.payload);
}, 'sendNotification');

// Stream jobs in real-time
await emailQueue.stream(async (job) => {
  console.log(`Processing streamed job ${job.id}`);
  await processJob(job);
});

// Listen for events
emailQueue.on('saved', (jobId) => {
  console.log(`Job ${jobId} was saved`);
});

emailQueue.on('completed', (jobId) => {
  console.log(`Job ${jobId} completed successfully`);
});
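
The generic parameters in `Queue<MyPayload, 'emailQueue'>` suggest that job names and payloads are checked against the payload schema at compile time. The following is a minimal, library-independent sketch of how such a lookup can be expressed in TypeScript. `PayloadOf`, `JobDescriptor`, and `describeJob` are hypothetical names used only for illustration; they are not part of @nivalis/n-queue, and the library's internal types may differ.

```typescript
// Same shape as the payload schema from the Quick Start.
type MyPayload = {
  emailQueue: {
    sendEmail: { to: string; subject: string; body: string };
    sendNotification: { userId: string; message: string };
  };
};

// Hypothetical helper type: looks up the payload for a queue/job pair.
type PayloadOf<P, Q extends keyof P, J extends keyof P[Q]> = P[Q][J];

// Hypothetical descriptor pairing a job name with its typed payload.
type JobDescriptor<P, Q extends keyof P, J extends keyof P[Q]> = {
  queue: Q;
  jobName: J;
  payload: PayloadOf<P, Q, J>;
};

// Builds a descriptor. The compiler rejects unknown job names and
// payloads that do not match the schema.
function describeJob<Q extends keyof MyPayload, J extends keyof MyPayload[Q]>(
  queue: Q,
  jobName: J,
  payload: MyPayload[Q][J]
): JobDescriptor<MyPayload, Q, J> {
  return { queue, jobName, payload };
}

const welcomeJob = describeJob('emailQueue', 'sendEmail', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Welcome to our platform!',
});

// describeJob('emailQueue', 'sendSms', {}); // compile-time error: unknown job name
```

Because the job name is a type parameter, a typo in the name or a missing payload field surfaces as a compiler error rather than as a failed job at runtime.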

Architecture

The system consists of three main components:

1. Queue

  • Manages job lifecycle and processing
  • Handles concurrency and job distribution
  • Provides event streaming and real-time updates
  • Maintains queue statistics and monitoring

2. Job

  • Represents a unit of work with typed payload
  • Tracks job state and progress
  • Manages job transitions and updates
  • Handles job-specific operations

3. RedisClient

  • Provides atomic operations for job state changes
  • Manages Redis connections and error handling
  • Implements retry mechanisms and transaction support
  • Handles stream operations and event publishing

API Reference

Queue

class Queue<Payload, QueueName> {
  constructor(
    name: QueueName,
    getRedisClient: () => Promise<RedisClientType>,
    options?: QueueOptions
  );

  // Job Management
  add<JobName>(jobName: JobName, payload: Payload[QueueName][JobName]): Promise<Job>;
  process(fn: (job: Job) => Promise<void>, jobName?: JobName): Promise<void>;
  stream(fn: (job: Job) => Promise<void>, jobName?: JobName): Promise<void>;

  // Event Handling
  on(event: string, handler: (jobId: string) => void): void;
  once(event: string, handler: (jobId: string) => void): void;

  // Queue Information
  getStats(): Promise<{
    name: QueueName;
    concurrency: number;
    waiting: number;
    active: number;
    failed: number;
    completed: number;
    total: number;
    availableSlots: number;
  }>;
}
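
As an illustration of how the `getStats()` result might be consumed for monitoring, here is a small sketch that works only on the documented return shape. `QueueStats`, `failureRate`, and `summarizeStats` are hypothetical names, and the sample numbers are made up for the example; none of this is part of the library.

```typescript
// Shape copied from the getStats() return type in the API reference
// (name is widened to string for the sketch).
type QueueStats = {
  name: string;
  concurrency: number;
  waiting: number;
  active: number;
  failed: number;
  completed: number;
  total: number;
  availableSlots: number;
};

// Hypothetical helper: fraction of finished jobs that failed
// (0 when no job has finished yet).
function failureRate(stats: QueueStats): number {
  const finished = stats.failed + stats.completed;
  return finished === 0 ? 0 : stats.failed / finished;
}

// Hypothetical helper: one-line summary suitable for logs.
function summarizeStats(stats: QueueStats): string {
  const pct = (failureRate(stats) * 100).toFixed(1);
  return `${stats.name}: ${stats.waiting} waiting, ${stats.active} active, ${pct}% failed`;
}

// Made-up sample values, standing in for `await emailQueue.getStats()`.
const sampleStats: QueueStats = {
  name: 'emailQueue',
  concurrency: 5,
  waiting: 3,
  active: 2,
  failed: 1,
  completed: 9,
  total: 15,
  availableSlots: 3,
};

const summaryLine = summarizeStats(sampleStats);
```

In an application, the sample object would be replaced by the value returned from `getStats()`, for example on a timer or in a health-check endpoint.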

Job

class Job<Payload, QueueName, JobName> {
  readonly id: string;
  readonly name: JobName;
  readonly state: JobState;
  readonly payload: Payload[QueueName][JobName];
  readonly createdAt: string;
  readonly updatedAt: string;

  progress: number;
  processedAt: string | null;
  attempts: number;
  failedReason: string | null;
  stacktrace: string[];

  // Job Operations
  save(): Promise<Job>;
  move(state: JobState): Promise<Job>;
  withState(state: JobState): Job;
}

Queue Options

interface QueueOptions {
  concurrency?: number;  // Max concurrent jobs (-1 for unlimited)
}
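
To make the effect of a concurrency cap concrete, the sketch below shows a generic worker-pool pattern that keeps at most `limit` async tasks in flight and treats `-1` as unlimited, mirroring the convention documented above. It is not the library's implementation: `runWithConcurrency` is a hypothetical function, and clamping other non-positive limits to 1 is an assumption of this sketch.

```typescript
// Runs async tasks with at most `limit` in flight.
// -1 means unlimited; any other value below 1 is clamped to 1 (assumption).
async function runWithConcurrency<T>(
  tasks: Array<() => Promise<T>>,
  limit: number
): Promise<T[]> {
  const maxInFlight = limit === -1 ? tasks.length : Math.max(1, limit);
  const results: T[] = new Array(tasks.length);
  let nextIndex = 0;

  // Each worker claims the next unstarted task until none remain.
  async function worker(): Promise<void> {
    while (nextIndex < tasks.length) {
      const current = nextIndex++;
      results[current] = await tasks[current]();
    }
  }

  // Start as many workers as the cap allows, never more than there are tasks.
  const workerCount = Math.min(maxInFlight, tasks.length);
  const workers = Array.from({ length: workerCount }, () => worker());
  await Promise.all(workers);

  // Results keep the order of the input tasks, not the completion order.
  return results;
}
```

With a real queue the "tasks" are jobs pulled from Redis rather than an in-memory array, but the trade-off is the same: a lower limit protects downstream services, a higher one increases throughput.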

Job States

type JobState = 'waiting' | 'active' | 'completed' | 'failed';
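
The README lists the four states but not which moves between them are valid. The sketch below encodes one plausible lifecycle as a lookup table. The transition table is an assumption made for illustration (in particular, re-queuing a failed job for a retry), and `allowedTransitions` and `canTransition` are hypothetical names that do not come from the library.

```typescript
type JobState = 'waiting' | 'active' | 'completed' | 'failed';

// Assumed transition table (not confirmed by the library docs):
// a waiting job becomes active, an active job completes or fails,
// and a failed job may be put back in the queue for a retry.
const allowedTransitions: Record<JobState, readonly JobState[]> = {
  waiting: ['active'],
  active: ['completed', 'failed'],
  completed: [],
  failed: ['waiting'],
};

// Returns true when moving from `from` to `to` is allowed by the table.
function canTransition(from: JobState, to: JobState): boolean {
  return allowedTransitions[from].includes(to);
}
```

A guard like this could be checked before calling `job.move(state)` to catch logic errors such as completing a job that never became active.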

Events

The queue emits the following events:

  • saved: When a job is added to the queue
  • active: When a job starts processing
  • completed: When a job completes successfully
  • failed: When a job fails
  • progress: When job progress is updated
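
Since `on` and `once` accept a plain `string` for the event name in the API reference above, a typo would go unnoticed. The following sketch shows one way to guard against that with a union type and a type guard, plus a small counter for the five events. `QUEUE_EVENTS`, `isQueueEvent`, and `createEventCounter` are hypothetical helpers, not library exports.

```typescript
// Event names listed in this README.
const QUEUE_EVENTS = ['saved', 'active', 'completed', 'failed', 'progress'] as const;
type QueueEvent = (typeof QUEUE_EVENTS)[number];

// Type guard: narrows an arbitrary string to a known event name.
function isQueueEvent(value: string): value is QueueEvent {
  return (QUEUE_EVENTS as readonly string[]).includes(value);
}

// Hypothetical helper (not part of the library): counts events by name.
function createEventCounter() {
  const counts: Record<QueueEvent, number> = {
    saved: 0,
    active: 0,
    completed: 0,
    failed: 0,
    progress: 0,
  };
  return {
    // Returns false and records nothing for unknown event names.
    record(eventName: string): boolean {
      if (!isQueueEvent(eventName)) return false;
      counts[eventName] += 1;
      return true;
    },
    // Returns a copy so callers cannot mutate the internal counts.
    snapshot(): Record<QueueEvent, number> {
      return { ...counts };
    },
  };
}
```

A counter created this way could be wired up by iterating over `QUEUE_EVENTS` and registering a handler with `queue.on` for each name.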

Best Practices

  1. Error Handling

    queue.process(async (job) => {
      try {
        await processJob(job);
        // Returning without throwing marks the job as completed
      } catch (error) {
        console.error(`Job ${job.id} failed:`, error);
        throw error; // Rethrow so the queue marks the job as failed
      }
    });

  2. Progress Tracking

    queue.process(async (job) => {
      // `items` stands for the work units of this job, defined by your application
      for (let i = 0; i < items.length; i++) {
        await processItem(items[i]);
        job.progress = (i + 1) / items.length;
      }
    });

  3. Event Handling

    queue.on('failed', async (jobId) => {
      const job = await Job.unpack(queue, jobId);
      await notifyFailure(job);
    });
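
Handlers can throw values that are not `Error` instances, which makes logging and failure notifications inconsistent. The sketch below normalizes any thrown value into a reason string and a list of stack lines, loosely modeled on the `failedReason` and `stacktrace` fields of `Job`. How the library itself fills those fields is not documented here, so treat `FailureInfo` and `toFailureInfo` as hypothetical helpers for your own code.

```typescript
// Hypothetical shape, modeled on the failedReason and stacktrace fields of Job.
type FailureInfo = {
  failedReason: string;
  stacktrace: string[];
};

// Converts any thrown value into a loggable reason plus stack lines.
function toFailureInfo(error: unknown): FailureInfo {
  if (error instanceof Error) {
    return {
      failedReason: error.message,
      // error.stack is optional, so fall back to an empty list.
      stacktrace: error.stack
        ? error.stack.split('\n').map((line) => line.trim())
        : [],
    };
  }
  // Non-Error throws (strings, numbers, objects) carry no stack trace.
  return { failedReason: String(error), stacktrace: [] };
}
```

It could be called in the `catch` block of the error-handling practice above, before rethrowing, so that every log entry has the same structure.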

License

MIT