@benrobo/queueflow
v0.1.0
A simple, type-safe task queue library built on BullMQ
Queueflow
A simple, type-safe task queue library that gives you the developer-friendly API of Trigger.dev or Inngest, but built on top of BullMQ and Redis. No infrastructure to manage, no platform lock-in—just define your tasks and trigger them.
Why Queueflow?
If you've ever used Trigger.dev or Inngest, you know how nice their APIs are. They're clean, type-safe, and make background jobs feel effortless. But what if you want that same experience while keeping control of your infrastructure?
That's where Queueflow comes in. I couldn't find an npm package that combined BullMQ's reliability with a Trigger.dev-like API, so I built one. It's perfect for when you want:
- ✅ A simple, type-safe API (like Trigger.dev/Inngest)
- ✅ Full control over your Redis/BullMQ infrastructure
- ✅ Zero boilerplate—just define and trigger
- ✅ TypeScript-first with full type safety
Installation
bun add @benrobo/queueflow
Or with npm:
npm install @benrobo/queueflow
You'll also need Redis running (locally or remotely). If you don't have it set up yet, you can run it with Docker:
docker run -d -p 6379:6379 redis:latest
Quick Start
1. Define Your Task
// tasks/email.ts
import { defineTask } from "@benrobo/queueflow";

export const sendWelcomeEmail = defineTask({
  id: "email.welcome",
  run: async (payload: { userId: string; email: string }) => {
    console.log(`Sending welcome email to ${payload.email}`);
    // Your email sending logic here
  },
});
2. Configure Redis (Optional)
If you're using a local Redis instance on the default port, you can skip this step. Otherwise, configure it once at your app's entry point. The configure() function validates the Redis connection immediately and will throw an error if the connection fails:
// app.ts or index.ts
import { configure } from "@benrobo/queueflow";

// Using await (recommended)
await configure({
  connection: process.env.REDIS_URL || "redis://localhost:6379",
  defaultQueue: "myapp",
});

// Or with error handling using .catch()
configure({
  connection: process.env.REDIS_URL || "redis://localhost:6379",
  defaultQueue: "myapp",
}).catch((error) => {
  console.error("Failed to configure Queueflow:", error);
  process.exit(1);
});
You can use either a connection string or an object:
await configure({
  connection: {
    host: "localhost",
    port: 6379,
    password: "your-password",
    db: 0,
  },
});
Enable debug logging to see detailed information about task registration, triggering, and processing:
await configure({
  connection: process.env.REDIS_URL,
  debug: true, // Enable debug logs
});
3. Trigger Your Task
import { sendWelcomeEmail } from "./tasks/email";

await sendWelcomeEmail.trigger({
  userId: "123",
  email: "user@example.com", // placeholder address
});
That's it! The worker automatically starts when you trigger your first task.
How It Works
Queueflow uses a singleton worker pattern that automatically manages everything for you:
- Define tasks with defineTask() - they register themselves automatically
- Trigger tasks with .trigger() - the worker starts on first use
- That's it - no worker setup, no queue configuration, no boilerplate
The worker listens to all your queues in the background and processes jobs as they come in. Tasks are organized by queue (defaults to the prefix of your task ID, like email from email.welcome).
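The default queue naming described above can be pictured with a small helper. This is only a sketch: `resolveQueueName` is a hypothetical function, not part of Queueflow's API, and the precedence it uses (an explicit `queue` first, then the ID prefix, then a fallback name for IDs without a dot) is an assumption.

```typescript
// Hypothetical helper illustrating the naming rule; not Queueflow's actual code.
function resolveQueueName(
  taskId: string,
  explicitQueue?: string,
  fallbackQueue = "default"
): string {
  // An explicit queue name wins (assumption).
  if (explicitQueue) return explicitQueue;

  // Otherwise use everything before the first dot: "email.welcome" -> "email".
  const dotIndex = taskId.indexOf(".");
  if (dotIndex > 0) return taskId.slice(0, dotIndex);

  // IDs without a prefix fall back to a default name (assumption).
  return fallbackQueue;
}

console.log(resolveQueueName("email.welcome")); // "email"
console.log(resolveQueueName("processing.heavy", "heavy-processing")); // "heavy-processing"
```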
Features
Type Safety
Queueflow is built with TypeScript from the ground up. Your payload types flow through from definition to trigger:
const processOrder = defineTask({
  id: "orders.process",
  run: async (payload: { orderId: string; amount: number }) => {
    // payload is fully typed here
    console.log(`Processing order ${payload.orderId}`);
  },
});

// TypeScript will catch this error:
await processOrder.trigger({
  orderId: "123",
  // amount: 100, // ❌ TypeScript error: missing 'amount'
});
Delayed Jobs
Need to schedule a task for later? Pass options as the second parameter to trigger:
await sendWelcomeEmail.trigger(
  { userId: "123", email: "user@example.com" }, // placeholder address
  { delay: 5000 } // Send in 5 seconds
);
You can pass any BullMQ job options through the second parameter, including:
- delay - milliseconds to wait before processing
- attempts - total number of attempts, including the first, before the job is marked as failed
- backoff - retry strategy
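As an illustration of how `attempts` and `backoff` combine, the sketch below builds an options object and computes the waits that BullMQ's built-in exponential backoff would apply, which the BullMQ retry documentation describes as `2 ^ (attempts - 1) * delay` milliseconds. The `RetryOptions` type is a local stand-in for the relevant subset of BullMQ's `JobsOptions`, and `exponentialWaits` is a hypothetical helper written for this example.

```typescript
// Local stand-in for the subset of BullMQ's JobsOptions used here
// (assumption: the real type has many more fields).
interface RetryOptions {
  delay?: number; // ms to wait before the first run
  attempts?: number; // total tries, including the first one
  backoff?: { type: "fixed" | "exponential"; delay: number };
}

const retryOptions: RetryOptions = {
  delay: 5000,
  attempts: 4,
  backoff: { type: "exponential", delay: 1000 },
};

// Hypothetical helper: waits (in ms) before each retry under exponential
// backoff, following the documented formula 2^(attemptsMade - 1) * delay.
function exponentialWaits(attempts: number, baseDelay: number): number[] {
  const waits: number[] = [];
  // A job with N attempts can be retried at most N - 1 times.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    waits.push(Math.pow(2, attemptsMade - 1) * baseDelay);
  }
  return waits;
}

console.log(
  exponentialWaits(retryOptions.attempts ?? 1, retryOptions.backoff?.delay ?? 0)
); // [ 1000, 2000, 4000 ]

// With Queueflow, the same object would be passed as the second argument:
// await sendWelcomeEmail.trigger({ userId: "123", email: "user@example.com" }, retryOptions);
```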
Error Handling
Handle errors gracefully with the onError callback:
const sendEmail = defineTask({
  id: "email.send",
  run: async (payload: { email: string; subject: string }) => {
    await sendEmailToUser(payload.email, payload.subject);
  },
  onError: async (error, payload) => {
    console.error(`Failed to send email to ${payload.email}:`, error.message);
    await logErrorToService(error, payload);
  },
});
The onError callback receives both the error and the original payload, so you can handle failures appropriately (log to an error service, send notifications, etc.).
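One way to keep an `onError` callback small is to turn the error and payload into a single structured record before handing it to a logger or error service. The sketch below assumes a hypothetical `buildFailureRecord` helper and record shape; neither is provided by Queueflow.

```typescript
// Hypothetical record shape for an error-reporting service.
interface FailureRecord<T> {
  taskId: string;
  message: string;
  payload: T;
  failedAt: string; // ISO 8601 timestamp
}

// Hypothetical helper: normalizes whatever was thrown into a loggable record.
function buildFailureRecord<T>(
  taskId: string,
  error: unknown,
  payload: T,
  now: Date = new Date()
): FailureRecord<T> {
  // Non-Error values (strings, objects) can be thrown too, so normalize them.
  const message = error instanceof Error ? error.message : String(error);
  return { taskId, message, payload, failedAt: now.toISOString() };
}

const record = buildFailureRecord("email.send", new Error("SMTP timeout"), {
  email: "user@example.com",
  subject: "Hello",
});
console.log(record.message); // "SMTP timeout"

// Inside a task definition this could be used as:
// onError: async (error, payload) => {
//   console.error(JSON.stringify(buildFailureRecord("email.send", error, payload)));
// },
```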
Scheduled Tasks (Cron Jobs)
For recurring tasks that run on a schedule, use scheduleTask:
import { scheduleTask } from "@benrobo/queueflow";

const dailyReport = scheduleTask({
  id: "reports.daily",
  cron: "0 9 * * *",
  run: async () => {
    console.log("Generating daily report...");
    await generateReport();
  },
  onError: async (error) => {
    console.error("Failed to generate report:", error.message);
  },
});
The task will automatically run based on the cron pattern. Common cron patterns:
- "0 9 * * *" - Every day at 9:00 AM
- "0 * * * *" - Every hour
- "0 0 * * 0" - Every Sunday at midnight
- "*/5 * * * *" - Every 5 minutes
You can also specify a timezone:
const weeklyBackup = scheduleTask({
  id: "backup.weekly",
  cron: "0 2 * * 0",
  tz: "America/New_York",
  run: async () => {
    await performBackup();
  },
});
Custom Queues
By default, tasks are grouped by the prefix of their ID (e.g., email.welcome goes to the email queue). You can override this:
const heavyTask = defineTask({
  id: "processing.heavy",
  queue: "heavy-processing", // Custom queue name
  run: async (payload) => {
    // This runs in the "heavy-processing" queue
  },
});
Concurrency Control
Control how many tasks run simultaneously per queue:
const highVolumeTask = defineTask({
  id: "processing.high-volume",
  concurrency: 10,
  run: async (payload) => {
    // This queue can process up to 10 tasks concurrently
  },
});
Manual Worker Control
While the worker auto-starts, you can also start it manually if you prefer:
import { startWorker } from "@benrobo/queueflow";

startWorker();
This is useful if you want to ensure the worker is running before your app starts accepting requests.
Real-World Example
Here's a more complete example showing how you might structure tasks in a real app:
// tasks/email.ts
import { defineTask } from "@benrobo/queueflow";

export const sendWelcomeEmail = defineTask({
  id: "email.welcome",
  run: async (payload: { userId: string; email: string }) => {
    await sendEmail({
      to: payload.email,
      subject: "Welcome!",
      body: "Thanks for joining us!",
    });
  },
});

export const sendPasswordReset = defineTask({
  id: "email.password-reset",
  run: async (payload: { email: string; token: string }) => {
    await sendEmail({
      to: payload.email,
      subject: "Reset your password",
      body: `Click here to reset: ${payload.token}`,
    });
  },
});
// tasks/orders.ts
import { defineTask } from "@benrobo/queueflow";

export const processOrder = defineTask({
  id: "orders.process",
  run: async (payload: { orderId: string; items: string[] }) => {
    // Process the order
    await chargePayment(payload.orderId);
    await updateInventory(payload.items);
    await sendConfirmation(payload.orderId);
  },
});
// app.ts
import { configure } from "@benrobo/queueflow";
import "./tasks/email";
import "./tasks/orders";

await configure({
  connection: process.env.REDIS_URL,
});

// Later in your API routes or wherever:
// (each task is imported from the file that defines it)
import { sendWelcomeEmail } from "./tasks/email";
import { processOrder } from "./tasks/orders";

app.post("/signup", async (req, res) => {
  const user = await createUser(req.body);
  await sendWelcomeEmail.trigger({
    userId: user.id,
    email: user.email,
  });
  res.json({ success: true });
});
API Reference
defineTask<T>(config)
Creates a new task definition for manual triggering.
Parameters:
- config.id (string) - Unique identifier for the task (e.g., "email.welcome")
- config.queue (string, optional) - Queue name (defaults to defaultQueue from config)
- config.run (function) - Async function that processes the task payload
- config.onError (function, optional) - Error handler callback: (error: Error, payload: T) => Promise<void> | void
- config.concurrency (number, optional) - Number of concurrent tasks for this queue (defaults to 5)
Returns: Task<T> instance
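Read together, the parameters above suggest a config shape roughly like the following. This is a reconstruction from this README, not Queueflow's exported types: the `TaskConfigSketch` name is made up, and the processing function is called `run` here because that is the property every example in this README uses.

```typescript
// Reconstructed from the parameter list; the real exported types may differ.
interface TaskConfigSketch<T> {
  id: string;
  queue?: string;
  run: (payload: T) => Promise<void>;
  onError?: (error: Error, payload: T) => Promise<void> | void;
  concurrency?: number;
}

interface OrderPayload {
  orderId: string;
  amount: number;
}

// Collects processed order IDs so the effect of `run` is observable.
const processed: string[] = [];

const processOrderConfig: TaskConfigSketch<OrderPayload> = {
  id: "orders.process",
  concurrency: 10,
  run: async (payload) => {
    // `payload` is inferred as OrderPayload from the annotation above.
    processed.push(payload.orderId);
  },
  onError: (error, payload) => {
    console.error(`Order ${payload.orderId} failed: ${error.message}`);
  },
};

console.log(processOrderConfig.id); // "orders.process"
```

Annotating the config with a payload type is what lets the compiler reject a payload that is missing a field.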
scheduleTask<T>(config)
Creates a scheduled task that runs automatically based on a cron pattern.
Parameters:
- config.id (string) - Unique identifier for the task (e.g., "reports.daily")
- config.cron (string) - Cron pattern (e.g., "0 9 * * *" for daily at 9 AM)
- config.queue (string, optional) - Queue name (defaults to defaultQueue from config)
- config.run (function) - Async function that processes the task
- config.onError (function, optional) - Error handler callback: (error: Error, payload: T) => Promise<void> | void
- config.tz (string, optional) - Timezone (e.g., "America/New_York")
- config.concurrency (number, optional) - Number of concurrent tasks for this queue (defaults to 5)
Returns: ScheduledTask<T> instance
Note: Scheduled tasks start automatically when the worker starts. No need to call .trigger().
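The `cron` value is a standard five-field cron expression: minute, hour, day of month, month, and day of week. The sketch below splits a pattern into those named fields to make the positions easier to read. `splitCron` is a hypothetical helper for illustration only; it does not validate field values and is not how Queueflow or BullMQ parse cron patterns.

```typescript
interface CronFields {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

// Hypothetical helper: labels the five fields of a standard cron pattern.
function splitCron(pattern: string): CronFields {
  const parts = pattern.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts;
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

const daily = splitCron("0 9 * * *");
console.log(daily.minute, daily.hour); // 0 9
```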
Task.trigger(payload, options?)
Triggers a task to run. Options are optional and can be used for delayed jobs, retries, and more.
Parameters:
- payload (T) - The typed payload for the task
- options (JobsOptions, optional) - BullMQ job options (delay, attempts, backoff, etc.)
Returns: Promise<void>
configure(config)
Configures the Redis connection and default settings. Validates the Redis connection immediately and throws an error if the connection fails.
Parameters:
- config.connection (string | object, optional) - Redis connection string or config object
- config.defaultQueue (string, optional) - Default queue name (defaults to "default")
- config.debug (boolean, optional) - Enable debug logging (defaults to false)
Returns: Promise<void>
Throws: Error if Redis connection cannot be established
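The string and object forms of `connection` describe the same settings. As a rough illustration, the sketch below converts a `redis://` URL into the object form using the standard `URL` class. `toConnectionObject` is a hypothetical helper and its default values are assumptions; Queueflow accepts the string directly, so this is not something you need to do yourself.

```typescript
interface RedisConnectionObject {
  host: string;
  port: number;
  password?: string;
  db: number;
}

// Hypothetical helper: parses redis://[:password@]host[:port][/db].
function toConnectionObject(connection: string): RedisConnectionObject {
  const url = new URL(connection);
  const dbSegment = url.pathname.replace(/^\//, "");
  return {
    host: url.hostname || "localhost",
    // Fall back to Redis's conventional port when the URL omits it.
    port: url.port ? Number(url.port) : 6379,
    // URL keeps credentials percent-encoded, so decode before use.
    password: url.password ? decodeURIComponent(url.password) : undefined,
    db: dbSegment ? Number(dbSegment) : 0,
  };
}

const parsed = toConnectionObject("redis://:your-password@localhost:6379/0");
console.log(parsed.host, parsed.port, parsed.db); // localhost 6379 0
```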
Example:
// With await
await configure({
  connection: process.env.REDIS_URL,
  defaultQueue: "myapp",
});

// With error handling
configure({
  connection: process.env.REDIS_URL,
}).catch((error) => {
  console.error("Configuration failed:", error);
  process.exit(1);
});

// With debug logging enabled
await configure({
  connection: process.env.REDIS_URL,
  debug: true,
});
startWorker()
Manually starts the global worker. Usually not needed as it auto-starts on first trigger.
Requirements
- Node.js 18+ or Bun
- Redis (local or remote)
- TypeScript 5.0+ (for type safety)
License
MIT
Contributing
Contributions are welcome! This is a young project, so I'm open to ideas, bug reports, and pull requests.
