smart-job-queues-sdk

v1.0.0

Published

SDK for smart job queue management

Downloads

10

Readme

🚀 InApp Queue

The missing background job queue for Node.js and Python

Node.js · Python · TypeScript · License: MIT

Features · Quick Start · Examples · Why InApp Queue? · Documentation


🎯 The Problem

You're building an app. A user signs up, and you need to send them a welcome email. Do you:

A) Make them wait while the email sends? ❌
B) Set up Redis, RabbitMQ, and a separate worker process? ❌
C) Use InApp Queue and move on with your life? ✅

// This is all you need
await queue.enqueue('sendWelcomeEmail', { userId: newUser.id });

✨ Features

🏠 Lives Inside Your App

No separate processes. No extra infrastructure. It's just part of your application.

💾 Survives Restarts

Jobs are persisted to your database. Server crashes? Jobs resume where they left off.

🔄 Automatic Retries

Failed jobs retry with exponential backoff. Temporary failures aren't your problem.

Scheduled & Recurring Jobs

Run jobs later, after a delay or at a specific time. Recurring jobs driven by cron expressions are coming soon.

📊 Beautiful Admin UI

Monitor your queues with a built-in web interface. No extra tools needed.

📈 Scales With You

Start embedded with SQLite, move to PostgreSQL as you grow, and swap to Redis once external queue support ships. Your code stays the same.

🚀 Quick Start

Node.js / Express

npm install inapp-queue

const express = require('express');
const { JobQueue } = require('inapp-queue');

const app = express();
const queue = new JobQueue({ 
  storage: { type: 'sqlite', path: './jobs.db' } 
});

// Define a job
queue.define('sendEmail', async ({ to, subject, body }) => {
  await emailService.send(to, subject, body);
  console.log(`Email sent to ${to}`);
});

// Start the queue (CommonJS has no top-level await, so wrap the call)
(async () => {
  await queue.start();
})();

// In your route handler
app.post('/signup', async (req, res) => {
  const user = await createUser(req.body);
  
  // Queue the email - this returns immediately
  await queue.enqueue('sendEmail', {
    to: user.email,
    subject: 'Welcome!',
    body: 'Thanks for signing up!'
  });
  
  res.json({ success: true });
});

Python / FastAPI

pip install inapp-queue

from fastapi import FastAPI
from inapp_queue import JobQueue

app = FastAPI()
queue = JobQueue(db_url="sqlite:///./jobs.db")

# Define a job
@queue.task("send_email")
async def send_email(data: dict):
    await email_service.send(
        data["to"], 
        data["subject"], 
        data["body"]
    )
    print(f"Email sent to {data['to']}")

# Auto-start with FastAPI
@app.on_event("startup")
async def startup():
    await queue.start()

# In your route handler
@app.post("/signup")
async def signup(user_data: dict):
    user = await create_user(user_data)
    
    # Queue the email - this returns immediately
    await queue.enqueue("send_email", {
        "to": user.email,
        "subject": "Welcome!",
        "body": "Thanks for signing up!"
    })
    
    return {"success": True}

📊 Admin Dashboard

Every queue comes with a built-in admin interface:

// Node.js
app.use('/admin/jobs', createJobQueueRouter({ queue }));

# Python
app.include_router(create_job_queue_router(queue), prefix="/admin/jobs")

Visit /admin/jobs/ui to see:

  • ⏳ Pending, running, completed, and failed jobs
  • 📈 Real-time statistics
  • 🔍 Job details and error messages
  • 🎮 Manual job retry and cleanup controls

🎭 Examples

Delayed Jobs

// Run in 5 minutes
await queue.enqueue('sendFollowUpEmail', { userId }, { 
  delay: 5 * 60 * 1000 
});

// Run at specific time
await queue.enqueue('generateReport', { reportId }, { 
  runAt: new Date('2024-01-01T09:00:00') // ISO 8601, parsed as local time
});

Retries with Backoff

queue.define('syncWithAPI', async (data) => {
  await externalAPI.sync(data);
}, { 
  maxAttempts: 5,        // Try up to 5 times
  retryDelay: 1000,      // Start with 1 second
  // Delays between the 5 attempts: 1s, 2s, 4s, 8s (exponential backoff)
});
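
To make the backoff schedule concrete, here is a small sketch in Python, the project's other supported language. It is not InApp Queue's implementation: `backoff_delays` and its `factor` parameter are hypothetical names, and it assumes that `maxAttempts` counts the first try, so a job with five attempts waits four times.

```python
from __future__ import annotations


def backoff_delays(retry_delay_ms: int, max_attempts: int, factor: int = 2) -> list[int]:
    """Return the wait in milliseconds before each retry.

    Assumption: max_attempts includes the first try, so there are
    max_attempts - 1 waits, each `factor` times longer than the last.
    """
    retries = max(0, max_attempts - 1)
    return [retry_delay_ms * factor ** i for i in range(retries)]


schedule = backoff_delays(1000, 5)
print(schedule)  # [1000, 2000, 4000, 8000]
```

Doubling the delay keeps early retries fast for short hiccups while backing off quickly when an outage lasts longer.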

Recurring Jobs (coming soon)

// Every day at 9 AM
queue.scheduleRecurring('dailyReport', {}, '0 9 * * *');

// Every 30 minutes
queue.scheduleRecurring('healthCheck', {}, '*/30 * * * *');
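
For readers new to cron syntax, the sketch below shows in Python how the two expressions above could be read. It is a simplified matcher written for illustration, not the scheduler InApp Queue will ship: `cron_matches` is a hypothetical helper that only understands `*`, `*/n`, and comma-separated numbers, and it requires every field to match (real cron treats day-of-month and day-of-week specially).

```python
from datetime import datetime


def _field_matches(field: str, value: int, low: int) -> bool:
    """Match one cron field; `low` is the smallest legal value of the field."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps are counted from the start of the field's range.
            if (value - low) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, when: datetime) -> bool:
    """Check whether a 5-field cron expression fires at the given minute."""
    minute, hour, day, month, weekday = expr.split()
    cron_weekday = (when.weekday() + 1) % 7  # cron uses 0 = Sunday
    return (
        _field_matches(minute, when.minute, 0)
        and _field_matches(hour, when.hour, 0)
        and _field_matches(day, when.day, 1)
        and _field_matches(month, when.month, 1)
        and _field_matches(weekday, cron_weekday, 0)
    )


# '0 9 * * *' fires at 09:00 every day; '*/30 * * * *' at minute 0 and 30.
print(cron_matches("0 9 * * *", datetime(2024, 1, 1, 9, 0)))
print(cron_matches("*/30 * * * *", datetime(2024, 1, 1, 14, 30)))
```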

🤔 Why InApp Queue?

The Goldilocks Solution

// setTimeout - dies on restart
setTimeout(() => {
  sendEmail(user);
}, 5000);

// InApp Queue - persistent
await queue.enqueue(
  'sendEmail', 
  { userId }, 
  { delay: 5000 }
);

# Docker Compose
services:
  redis:
    image: redis:7
  worker:
    build: ./worker
  rabbitmq:
    image: rabbitmq:3

Start Simple, Scale Later

// Start with SQLite (embedded)
const queue = new JobQueue({ 
  storage: { type: 'sqlite' } 
});

// Later: Switch to PostgreSQL (still embedded)
const queue = new JobQueue({ 
  storage: { 
    type: 'postgres',
    connectionString: process.env.DATABASE_URL
  }
});

// Future: External queue (coming soon)
const queue = new JobQueue({ 
  storage: { 
    type: 'redis',
    url: process.env.REDIS_URL
  }
});

Your job definitions and application code remain exactly the same! 🎉

📖 Documentation

Core Concepts

  • Job: A task to be executed in the background
  • Queue: Manages job storage and execution
  • Worker: Background process that executes jobs
  • Task: A named function that processes jobs

Configuration

const queue = new JobQueue({
  // Storage backend
  storage: {
    type: 'sqlite',        // or 'postgres'
    path: './jobs.db',     // for sqlite
    connectionString: '...' // for postgres
  },
  
  // Worker settings
  concurrency: 2,          // Number of parallel jobs
  pollInterval: 1000,      // Check for jobs every 1s
  
  // Features
  enableGracefulShutdown: true, // Clean shutdown on SIGTERM
  
  // Custom logger
  logger: winston.createLogger({ ... })
});
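
As a rough illustration of what `concurrency` and `pollInterval` plausibly control, the Python sketch below polls a job source on an interval and caps how many jobs run at once with a semaphore. All names here (`run_worker`, `fetch_jobs`, `handle`, `max_polls`) are hypothetical, and the real worker may be structured differently; for simplicity this version waits for each batch to finish before polling again.

```python
import asyncio


async def run_worker(fetch_jobs, handle, concurrency=2, poll_interval=1.0, max_polls=3):
    """Poll for jobs and run at most `concurrency` of them at the same time."""
    limit = asyncio.Semaphore(concurrency)

    async def run_one(job):
        async with limit:  # waits while `concurrency` jobs are already running
            await handle(job)

    for _ in range(max_polls):  # a real worker would loop until shutdown
        jobs = fetch_jobs()
        await asyncio.gather(*(run_one(job) for job in jobs))
        await asyncio.sleep(poll_interval)


async def demo():
    batches = [list(range(5))]  # one batch of five jobs, then nothing
    running = 0
    peak = 0

    def fetch_jobs():
        return batches.pop() if batches else []

    async def handle(job):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)  # record the highest overlap seen
        await asyncio.sleep(0.01)
        running -= 1

    await run_worker(fetch_jobs, handle, concurrency=2, poll_interval=0.01)
    return peak


peak = asyncio.run(demo())
print(peak)  # 2: never more than `concurrency` jobs in flight
```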

Job Lifecycle

PENDING → RUNNING → COMPLETED
            ↓
          FAILED → PENDING (retry)
                     ↓
                   FAILED (permanent)
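
The diagram can also be read as a small decision rule applied each time a run finishes. A minimal Python sketch, assuming that a job is retried until its attempt count reaches a maximum (`Status` and `after_run` are hypothetical names, not part of the library):

```python
from enum import Enum


class Status(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def after_run(succeeded: bool, attempts: int, max_attempts: int) -> Status:
    """Status of a job once a run has finished.

    `attempts` counts runs so far, including the one that just finished.
    """
    if succeeded:
        return Status.COMPLETED
    if attempts < max_attempts:
        return Status.PENDING  # failed, but goes back in line for a retry
    return Status.FAILED  # out of attempts: permanent failure


print(after_run(False, attempts=1, max_attempts=3))  # Status.PENDING
print(after_run(False, attempts=3, max_attempts=3))  # Status.FAILED
```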

Events

queue.on('job:enqueued', (job) => { });
queue.on('job:started', (job) => { });
queue.on('job:completed', (job, duration) => { });
queue.on('job:failed', (job, error) => { });
queue.on('job:retry', (job, error, nextRunAt) => { });

🏗️ Real-World Patterns

Email Queue

queue.define('email:welcome', async ({ userId }) => {
  const user = await db.users.findById(userId);
  await emails.sendWelcome(user);
});

queue.define('email:passwordReset', async ({ userId, token }) => {
  const user = await db.users.findById(userId);
  await emails.sendPasswordReset(user, token);
});

Report Generation

queue.define('reports:generate', async ({ reportId, format }) => {
  const data = await analytics.getData(reportId);
  const file = await reports.generate(data, format);
  await storage.upload(file);
  await notifications.send(`Report ${reportId} ready`);
});

Webhooks & Integrations

queue.define('webhook:send', async ({ url, payload, attempts = 0 }) => {
  try {
    await axios.post(url, payload);
  } catch (error) {
    if (error.response?.status === 429) {
      // Rate limited - retry with backoff
      throw new Error('Rate limited');
    }
    // Don't retry client errors
    if (error.response?.status >= 400 && error.response?.status < 500) {
      return; // Give up quietly: finishing without an error means no retry
    }
    throw error; // Retry server errors
  }
});
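
The retry rules in this handler boil down to a decision on the HTTP status code. The Python sketch below restates them as a standalone function so they are easy to test; `should_retry` is a hypothetical helper written for this example, not an InApp Queue API.

```python
from typing import Optional


def should_retry(status: Optional[int]) -> bool:
    """Decide whether a failed webhook delivery is worth retrying.

    `status` is the HTTP status code, or None when no response arrived.
    """
    if status is None:
        return True  # network error or timeout: no response at all
    if status == 429:
        return True  # rate limited: try again after the backoff delay
    if 400 <= status < 500:
        return False  # other client errors will not fix themselves
    return True  # anything else that failed, such as a 5xx, is retried


for code in (429, 404, 503, None):
    print(code, should_retry(code))
```

Checking 429 before the general 4xx range matters, since 429 is itself a client error code and would otherwise be dropped without a retry.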

🛠️ Advanced Usage

TypeScript Support

interface EmailJobData {
  to: string;
  subject: string;
  template: string;
  data: Record<string, any>;
}

queue.define<EmailJobData>('sendEmail', async (data) => {
  // Full type safety!
  await emailService.send(data.to, data.subject, data.template, data.data);
});

Testing

// In your tests
beforeEach(() => {
  queue.pause(); // Don't process jobs
});

test('should queue welcome email', async () => {
  await signup({ email: 'user@example.com' });
  
  const jobs = await queue.getJobs({ taskName: 'sendWelcomeEmail' });
  expect(jobs).toHaveLength(1);
  expect(jobs[0].payload.email).toBe('user@example.com');
});

Multi-Process Deployment

// Web server (multiple instances OK)
const queue = new JobQueue({ 
  storage: { type: 'postgres', connectionString }
});

// Worker process (single instance)
const worker = new JobQueue({ 
  storage: { type: 'postgres', connectionString },
  runWorkerOnly: true  // Don't accept new jobs
});

🤝 Contributing

We love contributions! Please see our Contributing Guide for details.

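# Clone the repo
git clone https://github.com/yourusername/inapp-queue.git
cd inapp-queue

# Install dependencies (subshells keep you at the repo root)
(cd packages/node && npm install)
(cd packages/python && pip install -e ".[dev]")

# Run tests
(cd packages/node && npm test)
(cd packages/python && pytest)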

📄 License

MIT © 2024


Built with ❤️ for developers who value their time

GitHub · Documentation · Discord