queuebear
v0.1.9
QueueBear SDK for message queues, scheduled jobs, and durable workflows
queuebear
QueueBear SDK for building durable workflows and managing message queues.
Installation
npm install queuebear
Quick Start
import { QueueBear, serve } from "queuebear";
const qb = new QueueBear({
apiKey: "qb_live_xxx",
projectId: "proj_xxx",
});
API Overview
The SDK provides access to all QueueBear APIs:
| API | Description |
| -------------- | ----------------------------------------------- |
| qb.messages | Publish and manage webhook messages |
| qb.schedules | Create and manage cron-based recurring jobs |
| qb.dlq | Manage failed messages in the dead letter queue |
| qb.workflows | Trigger and manage durable workflows |
Messages API
Publish messages to be delivered to webhook destinations with automatic retries.
Publish a Message
const { messageId } = await qb.messages.publish(
"https://api.example.com/webhook",
{ event: "user.created", userId: "123" },
{
delay: "30s", // Delay before delivery
retries: 5, // Number of retry attempts
method: "POST", // HTTP method
headers: { "X-API-Key": "secret" }, // Headers to forward
callbackUrl: "https://...", // Success callback
failureCallbackUrl: "https://...", // Failure callback
deduplicationId: "unique-id", // Prevent duplicate messages
}
);
Get Message Status
const message = await qb.messages.get(messageId);
console.log(message.status); // "pending" | "completed" | "failed"
console.log(message.deliveryLogs); // Delivery attempt history
List Messages
const { messages, pagination } = await qb.messages.list({
status: "pending",
limit: 20,
offset: 0,
});
Cancel a Message
await qb.messages.cancel(messageId);
Publish and Wait
const message = await qb.messages.publishAndWait(
"https://api.example.com/webhook",
{ event: "user.created" },
{ timeoutMs: 30000 }
);
console.log(message.status); // "completed"
Schedules API
Create cron-based recurring jobs.
Create a Schedule
const schedule = await qb.schedules.create({
destination: "https://api.example.com/cron-job",
cron: "0 9 * * *", // Daily at 9 AM
timezone: "America/New_York",
method: "POST",
body: JSON.stringify({ type: "daily-report" }),
headers: { "Content-Type": "application/json" },
retries: 3,
metadata: { jobName: "daily-report" },
});
Common Cron Expressions
| Expression | Description |
| ------------- | ----------------------- |
| * * * * * | Every minute |
| 0 * * * * | Every hour |
| 0 9 * * * | Daily at 9:00 AM |
| 0 9 * * 1-5 | Weekdays at 9:00 AM |
| 0 0 1 * * | First day of each month |
| 0 */6 * * * | Every 6 hours |
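Each expression in the table has five space-separated fields: minute, hour, day of month, month, and day of week. The sketch below splits an expression into those named fields. `parseCronFields` is a hypothetical helper for illustration, not part of the queuebear SDK, and it only checks the field count rather than validating each field.

```typescript
// Hypothetical helper for illustration; not part of the queuebear SDK.
interface CronFields {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

function parseCronFields(expression: string): CronFields {
  // Split on any run of whitespace so extra spaces are tolerated.
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts;
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

// "Weekdays at 9:00 AM": minute 0, hour 9, day of week 1-5
console.log(parseCronFields("0 9 * * 1-5"));
```

A check like this can catch a malformed expression before it is passed as the cron option of qb.schedules.create().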
List Schedules
const { schedules } = await qb.schedules.list();
Pause / Resume
await qb.schedules.pause(scheduleId);
await qb.schedules.resume(scheduleId);
Delete a Schedule
await qb.schedules.delete(scheduleId);
Dead Letter Queue (DLQ) API
Manage messages that failed all retry attempts.
List DLQ Entries
const { entries } = await qb.dlq.list();
for (const entry of entries) {
console.log(`${entry.id}: ${entry.failureReason}`);
}
Get Entry Details
const entry = await qb.dlq.get(dlqId);
console.log(entry.body); // Original message body
console.log(entry.totalAttempts); // Number of failed attempts
Retry a Failed Message
const result = await qb.dlq.retry(dlqId);
console.log(result.newMessageId); // New message created
Delete Entry / Purge All
await qb.dlq.delete(dlqId);
await qb.dlq.purge(); // Delete all entries
Retry All Failed Messages
const results = await qb.dlq.retryAll();
console.log(`Retried ${results.length} entries`);
Workflows API
Build durable, fault-tolerant workflows with automatic step caching.
Workflows consist of two parts:
- Workflow endpoint - Created with serve(), handles workflow execution
- Client - Uses qb.workflows to trigger and manage workflow runs
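To make "automatic step caching" concrete, here is a toy in-memory model of the idea: a step's result is stored under its name, and when the workflow function is executed again, a completed step returns the stored value instead of running its body a second time. This is a conceptual sketch only. `ToyContext` is hypothetical and says nothing about how QueueBear actually persists or replays steps.

```typescript
// Hypothetical toy model of step caching; not the queuebear implementation.
class ToyContext {
  private cache: Map<string, unknown>;

  constructor(cache: Map<string, unknown>) {
    this.cache = cache;
  }

  async run<T>(stepName: string, fn: () => Promise<T>): Promise<T> {
    // A step that already completed returns its stored result.
    if (this.cache.has(stepName)) {
      return this.cache.get(stepName) as T;
    }
    const result = await fn();
    this.cache.set(stepName, result);
    return result;
  }
}

async function demo(): Promise<number> {
  const cache = new Map<string, unknown>(); // stands in for durable storage
  let calls = 0;

  const workflow = (ctx: ToyContext) =>
    ctx.run("charge-card", async () => {
      calls += 1;
      return "receipt-1";
    });

  await workflow(new ToyContext(cache)); // first execution runs the step body
  await workflow(new ToyContext(cache)); // re-execution reuses the cached result
  return calls;
}

demo().then((calls) => console.log(`step body ran ${calls} time(s)`));
```

The practical consequence is that step names act as cache keys, so each step in a workflow should have a unique, stable name.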
serve() - Create a Workflow Endpoint
The serve() function creates an HTTP handler for your workflow. It receives requests from QueueBear, executes your workflow code, and manages step caching automatically.
import { serve } from "queuebear";
export const POST = serve<InputType>(async (context) => {
// Your workflow logic here
return result;
}, options);
Parameters:
| Parameter | Type | Description |
| --------- | --------------------------------------------- | ---------------------- |
| handler | (context: WorkflowContext<T>) => Promise<R> | Your workflow function |
| options | ServeOptions | Optional configuration |
Options:
| Option | Type | Description |
| --------------- | -------- | --------------------------------------------- |
| signingSecret | string | Secret to verify requests come from QueueBear |
Framework Integration
Next.js (App Router)
// app/api/workflows/my-workflow/route.ts
import { serve } from "queuebear";
export const POST = serve(async (context) => {
await context.run("step-1", async () => {
/* ... */
});
return { success: true };
});
Express
import express from "express";
import { serve } from "queuebear";
const app = express();
app.use(express.json());
const handler = serve(async (context) => {
await context.run("step-1", async () => {
/* ... */
});
return { success: true };
});
app.post("/api/workflows/my-workflow", async (req, res) => {
const response = await handler(
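// Express's req.url is a relative path, but Request needs an absolute URL
new Request(`${req.protocol}://${req.get("host")}${req.originalUrl}`, {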
method: "POST",
headers: req.headers as HeadersInit,
body: JSON.stringify(req.body),
})
);
res.status(response.status).json(await response.json());
});
Hono
import { Hono } from "hono";
import { serve } from "queuebear";
const app = new Hono();
const handler = serve(async (context) => {
await context.run("step-1", async () => {
/* ... */
});
return { success: true };
});
app.post("/api/workflows/my-workflow", async (c) => {
const response = await handler(c.req.raw);
return response;
});
Complete Workflow Example
// app/api/workflows/onboarding/route.ts
import { serve } from "queuebear";
interface OnboardingInput {
userId: string;
email: string;
}
export const POST = serve<OnboardingInput>(
async (context) => {
const { userId, email } = context.input;
// Step 1: Send welcome email (cached if already done)
await context.run("send-welcome", async () => {
await sendEmail(email, "welcome");
});
// Step 2: Wait 3 days
await context.sleep("wait-3-days", 60 * 60 * 24 * 3);
// Step 3: Send tips email
await context.run("send-tips", async () => {
await sendEmail(email, "tips");
});
return { completed: true };
},
{
signingSecret: process.env.QUEUEBEAR_SIGNING_SECRET,
}
);
Trigger a Workflow
const { runId } = await qb.workflows.trigger(
"user-onboarding",
"https://your-app.com/api/workflows/onboarding",
{ userId: "123", email: "[email protected]" },
{
idempotencyKey: "onboarding-user-123",
maxDuration: 60 * 60 * 24 * 7, // 7 day timeout
}
);
Check Workflow Status
const status = await qb.workflows.getStatus(runId);
console.log(status.status); // "running" | "sleeping" | "completed"
console.log(status.steps); // Array of step details
Wait for Completion
const result = await qb.workflows.waitForCompletion(runId, {
pollIntervalMs: 2000,
timeoutMs: 60000,
});
Trigger and Wait
const result = await qb.workflows.triggerAndWait(
"user-onboarding",
"https://your-app.com/api/workflows/onboarding",
{ userId: "123" },
{ timeoutMs: 120000 }
);
console.log(result.result); // Workflow output
Cancel / Retry
await qb.workflows.cancel(runId);
await qb.workflows.retry(runId); // Resume from last completed step
Send Events
// In workflow: await context.waitForEvent("order-approved", "order.approved")
// From external code:
await qb.workflows.sendEvent("order.approved", {
eventKey: "order-123",
payload: { status: "approved" },
});
Context Methods
Available in serve() handlers:
context.run(stepName, fn, options?)
Execute a step with automatic caching.
const result = await context.run("fetch-user", async () => {
return await db.users.findById(userId);
});
context.sleep(stepName, seconds)
Pause workflow for specified duration.
await context.sleep("wait-1-hour", 3600);
context.sleepUntil(stepName, date)
Pause until a specific date/time.
await context.sleepUntil("wait-until-tomorrow", new Date("2024-01-15"));
context.call(stepName, config)
Make an HTTP call as a cached step.
const data = await context.call("fetch-api", {
url: "https://api.example.com/data",
method: "POST",
headers: { Authorization: "Bearer xxx" },
body: { key: "value" },
});
context.waitForEvent(stepName, eventName, options?)
Wait for an external event.
const payload = await context.waitForEvent("wait-approval", "order.approved", {
eventKey: "order-123",
timeoutSeconds: 86400, // 1 day
});
context.notify(eventName, payload?)
Send a fire-and-forget event.
await context.notify("user.onboarded", { userId: "123" });
context.parallel(steps)
Execute steps in parallel.
const [user, orders, preferences] = await context.parallel([
{ name: "fetch-user", fn: () => fetchUser(userId) },
{ name: "fetch-orders", fn: () => fetchOrders(userId) },
{ name: "fetch-preferences", fn: () => fetchPreferences(userId) },
]);
context.getCompletedSteps()
Get all completed steps for debugging.
const steps = await context.getCompletedSteps();
console.log(`Completed ${steps.length} steps`);
Security
Signature Verification
Verify that workflow requests come from your QueueBear instance:
export const POST = serve(handler, {
signingSecret: process.env.QUEUEBEAR_SIGNING_SECRET,
});
The signing secret is available in your QueueBear project settings. When configured, requests without a valid signature will be rejected with a 401 error.
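This README does not describe the signature scheme itself, so the following is only a generic sketch of how HMAC-based request verification commonly works, not QueueBear's actual algorithm. The use of HMAC-SHA256, the hex encoding, and the function names `sign` and `isValidSignature` are all assumptions. In practice, passing signingSecret to serve() is what performs the check.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumption: the signature is a hex-encoded HMAC-SHA256 of the raw request body.
function sign(secret: string, rawBody: string): string {
  return createHmac("sha256", secret).update(rawBody).digest("hex");
}

// Hypothetical verifier: recompute the signature and compare in constant time.
function isValidSignature(
  secret: string,
  rawBody: string,
  signature: string
): boolean {
  const expected = Buffer.from(sign(secret, rawBody), "hex");
  const received = Buffer.from(signature, "hex");
  // timingSafeEqual throws on a length mismatch, so compare lengths first.
  return (
    expected.length === received.length && timingSafeEqual(expected, received)
  );
}

const body = JSON.stringify({ runId: "run_123" });
const signature = sign("my-signing-secret", body);
console.log(isValidSignature("my-signing-secret", body, signature)); // true
console.log(isValidSignature("wrong-secret", body, signature)); // false
```

A handler built this way would answer with a 401 status whenever the verifier returns false, which matches the rejection behavior described above.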
Local Development
When developing locally, your webhook endpoints run on localhost, which isn't reachable from QueueBear's servers. Use Tunnelmole to expose your local server. It's free and requires no signup.
Installing Tunnelmole
Linux, macOS, Windows WSL:
curl -O https://install.tunnelmole.com/t357g/install && sudo bash install
Node.js (all platforms, requires Node 16+):
npm install -g tunnelmole
Starting a Tunnel
tmole 3000
# Output: https://xxxx.tunnelmole.com is forwarding to localhost:3000
Using the Tunnel URL
// Use tunnelmole URL instead of localhost
await qb.messages.publish("https://xxxx.tunnelmole.com/api/webhooks", {
event: "user.created",
userId: "123"
});
// Works for workflows too
await qb.workflows.trigger(
"onboarding",
"https://xxxx.tunnelmole.com/api/workflows/onboarding",
{ userId: "123" }
);
Tips
- Store your tunnel URL in .env for easy switching between local and production
- Both callbackUrl and failureCallbackUrl need public URLs for local testing
- Tunnel URLs change on restart
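One way to follow the first tip is to build every destination URL from a single base URL read from the environment. In this sketch, the variable name `PUBLIC_BASE_URL` and the helper `buildUrl` are hypothetical; the SDK does not define either of them.

```typescript
// Hypothetical helper: resolve a route path against a base URL from the environment.
function buildUrl(
  env: Record<string, string | undefined>,
  path: string
): string {
  const base = env.PUBLIC_BASE_URL;
  if (!base) {
    throw new Error("PUBLIC_BASE_URL is not set");
  }
  // The URL constructor resolves the path against the base.
  return new URL(path, base).toString();
}

// Locally:       PUBLIC_BASE_URL=https://xxxx.tunnelmole.com
// In production: PUBLIC_BASE_URL=https://your-app.com
const url = buildUrl(
  { PUBLIC_BASE_URL: "https://xxxx.tunnelmole.com" },
  "/api/workflows/onboarding"
);
console.log(url); // https://xxxx.tunnelmole.com/api/workflows/onboarding
```

In an application you would pass process.env as the first argument, so that updating the value in .env after a tunnel restart is the only change needed.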
License
MIT
