smart-job-queues-sdk
v1.0.0
Published
SDK for smart job queue management
Downloads
10
Maintainers
Readme
🚀 InApp Queue
The missing background job queue for Node.js and Python
Features • Quick Start • Examples • Why InApp Queue? • Documentation
🎯 The Problem
You're building an app. A user signs up, and you need to send them a welcome email. Do you:
A) Make them wait while the email sends? ❌
B) Set up Redis, RabbitMQ, and a separate worker process? ❌
C) Use InApp Queue and move on with your life? ✅
// This is all you need
await queue.enqueue('sendWelcomeEmail', { userId: newUser.id });✨ Features
🏠 Lives Inside Your App
No separate processes. No extra infrastructure. It's just part of your application.
💾 Survives Restarts
Jobs are persisted to your database. Server crashes? Jobs resume where they left off.
🔄 Automatic Retries
Failed jobs retry with exponential backoff. Temporary failures aren't your problem.
⏰ Scheduled & Recurring Jobs
Run jobs later. Run jobs repeatedly. Cron expressions supported.
📊 Beautiful Admin UI
Monitor your queues with a built-in web interface. No extra tools needed.
📈 Scales With You
Start embedded. Swap to Redis when you're ready. Your code stays the same.
🚀 Quick Start
Node.js / Express
npm install inapp-queueconst express = require('express');
const { JobQueue } = require('inapp-queue');
const app = express();
const queue = new JobQueue({
storage: { type: 'sqlite', path: './jobs.db' }
});
// Define a job
queue.define('sendEmail', async ({ to, subject, body }) => {
await emailService.send(to, subject, body);
console.log(`Email sent to ${to}`);
});
// Start the queue
await queue.start();
// In your route handler
app.post('/signup', async (req, res) => {
const user = await createUser(req.body);
// Queue the email - this returns immediately
await queue.enqueue('sendEmail', {
to: user.email,
subject: 'Welcome!',
body: 'Thanks for signing up!'
});
res.json({ success: true });
});Python / FastAPI
pip install inapp-queuefrom fastapi import FastAPI
from inapp_queue import JobQueue
app = FastAPI()
queue = JobQueue(db_url="sqlite:///./jobs.db")
# Define a job
@queue.task("send_email")
async def send_email(data: dict):
await email_service.send(
data["to"],
data["subject"],
data["body"]
)
print(f"Email sent to {data['to']}")
# Auto-start with FastAPI
@app.on_event("startup")
async def startup():
await queue.start()
# In your route handler
@app.post("/signup")
async def signup(user_data: dict):
user = await create_user(user_data)
# Queue the email - this returns immediately
await queue.enqueue("send_email", {
"to": user.email,
"subject": "Welcome!",
"body": "Thanks for signing up!"
})
return {"success": True}📊 Admin Dashboard
Every queue comes with a built-in admin interface:
// Node.js
app.use('/admin/jobs', createJobQueueRouter({ queue }));
// Python
app.include_router(create_job_queue_router(queue), prefix="/admin/jobs")Visit /admin/jobs/ui to see:
- ⏳ Pending, running, completed, and failed jobs
- 📈 Real-time statistics
- 🔍 Job details and error messages
- 🎮 Manual job retry and cleanup controls
🎭 Examples
Delayed Jobs
// Run in 5 minutes
await queue.enqueue('sendFollowUpEmail', { userId }, {
delay: 5 * 60 * 1000
});
// Run at specific time
await queue.enqueue('generateReport', { reportId }, {
runAt: new Date('2024-01-01 09:00:00')
});Retries with Backoff
queue.define('syncWithAPI', async (data) => {
await externalAPI.sync(data);
}, {
maxAttempts: 5, // Try up to 5 times
retryDelay: 1000, // Start with 1 second
// Delays: 1s, 2s, 4s, 8s, 16s (exponential backoff)
});Recurring Jobs (coming soon)
// Every day at 9 AM
queue.scheduleRecurring('dailyReport', {}, '0 9 * * *');
// Every 30 minutes
queue.scheduleRecurring('healthCheck', {}, '*/30 * * * *');🤔 Why InApp Queue?
The Goldilocks Solution
// setTimeout - dies on restart
setTimeout(() => {
sendEmail(user);
}, 5000);// InApp Queue - persistent
await queue.enqueue(
'sendEmail',
{ userId },
{ delay: 5000 }
);# Docker Compose
services:
redis:
image: redis:7
worker:
build: ./worker
rabbitmq:
image: rabbitmq:3Start Simple, Scale Later
// Start with SQLite (embedded)
const queue = new JobQueue({
storage: { type: 'sqlite' }
});
// Later: Switch to PostgreSQL (still embedded)
const queue = new JobQueue({
storage: {
type: 'postgres',
connectionString: process.env.DATABASE_URL
}
});
// Future: External queue (coming soon)
const queue = new JobQueue({
storage: {
type: 'redis',
url: process.env.REDIS_URL
}
});Your job definitions and application code remain exactly the same! 🎉
📖 Documentation
Core Concepts
- Job: A task to be executed in the background
- Queue: Manages job storage and execution
- Worker: Background process that executes jobs
- Task: A named function that processes jobs
Configuration
const queue = new JobQueue({
// Storage backend
storage: {
type: 'sqlite', // or 'postgres'
path: './jobs.db', // for sqlite
connectionString: '...' // for postgres
},
// Worker settings
concurrency: 2, // Number of parallel jobs
pollInterval: 1000, // Check for jobs every 1s
// Features
enableGracefulShutdown: true, // Clean shutdown on SIGTERM
// Custom logger
logger: winston.createLogger({ ... })
});Job Lifecycle
PENDING → RUNNING → COMPLETED
↓
FAILED → PENDING (retry)
↓
FAILED (permanent)Events
queue.on('job:enqueued', (job) => { });
queue.on('job:started', (job) => { });
queue.on('job:completed', (job, duration) => { });
queue.on('job:failed', (job, error) => { });
queue.on('job:retry', (job, error, nextRunAt) => { });🏗️ Real-World Patterns
Email Queue
queue.define('email:welcome', async ({ userId }) => {
const user = await db.users.findById(userId);
await emails.sendWelcome(user);
});
queue.define('email:passwordReset', async ({ userId, token }) => {
const user = await db.users.findById(userId);
await emails.sendPasswordReset(user, token);
});Report Generation
queue.define('reports:generate', async ({ reportId, format }) => {
const data = await analytics.getData(reportId);
const file = await reports.generate(data, format);
await storage.upload(file);
await notifications.send(`Report ${reportId} ready`);
});Webhooks & Integrations
queue.define('webhook:send', async ({ url, payload, attempts = 0 }) => {
try {
await axios.post(url, payload);
} catch (error) {
if (error.response?.status === 429) {
// Rate limited - retry with backoff
throw new Error('Rate limited');
}
// Don't retry client errors
if (error.response?.status >= 400 && error.response?.status < 500) {
return; // Success - don't retry
}
throw error; // Retry server errors
}
});🛠️ Advanced Usage
TypeScript Support
interface EmailJobData {
to: string;
subject: string;
template: string;
data: Record<string, any>;
}
queue.define<EmailJobData>('sendEmail', async (data) => {
// Full type safety!
await emailService.send(data.to, data.subject, data.template, data.data);
});Testing
// In your tests
beforeEach(() => {
queue.pause(); // Don't process jobs
});
test('should queue welcome email', async () => {
await signup({ email: '[email protected]' });
const jobs = await queue.getJobs({ taskName: 'sendWelcomeEmail' });
expect(jobs).toHaveLength(1);
expect(jobs[0].payload.email).toBe('[email protected]');
});Multi-Process Deployment
// Web server (multiple instances OK)
const queue = new JobQueue({
storage: { type: 'postgres', connectionString }
});
// Worker process (single instance)
const worker = new JobQueue({
storage: { type: 'postgres', connectionString },
runWorkerOnly: true // Don't accept new jobs
});🤝 Contributing
We love contributions! Please see our Contributing Guide for details.
# Clone the repo
git clone https://github.com/yourusername/inapp-queue.git
# Install dependencies
cd packages/node && npm install
cd packages/python && pip install -e .[dev]
# Run tests
npm test
pytest📄 License
MIT © 2024
Built with ❤️ for developers who value their time
