universal-outbox
v1.0.0
Universal Outbox Pattern library for Node.js – MySQL-only v1 focused on production-grade transactional outbox.
Universal Outbox Pattern Library
A production-ready, database-driven outbox implementation for Node.js applications.
Universal Outbox is a plug-and-play solution that guarantees reliable event publishing in distributed systems using the Transactional Outbox Pattern. It keeps your database writes and message broker publishing consistent, even during crashes, network failures, or service restarts.
1. What Problem It Solves
The Dual-Write Problem is a classic challenge in distributed systems. When you need to save data to your database and publish an event to a message broker (like RabbitMQ or Kafka), the two writes go to separate systems and cannot be committed atomically, so a failure between them leaves the systems out of sync.
If you write to the database first, and the app crashes right before publishing the event, your system is now inconsistent. Users might be billed, but the shipping service never got the memo.
If you publish the event first and the database write fails, other microservices process phantom data that doesn't actually exist in your database.
Universal Outbox solves this by treating the database as the single source of truth. It writes both the business entity and the "intent to publish" (the outbox event) to the database in a single atomic transaction. A background worker then reads the pending events and publishes them to the broker, automatically handling retries and dead-lettering if the broker is down.
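The worker's side of this can be sketched in a few lines. This is a minimal in-memory illustration, not the library's implementation: createMemoryOutbox and runWorkerPass are hypothetical names, the array stands in for the outbox_events table, and retry timing is ignored.

```javascript
// In-memory stand-in for the outbox table (assumption: the real library reads from MySQL).
function createMemoryOutbox() {
  const rows = [];
  let nextId = 1;
  return {
    rows,
    add(eventName, payload) {
      const row = {
        id: nextId++,
        eventName,
        payload,
        status: "pending",
        retryCount: 0,
        lastError: null,
      };
      rows.push(row);
      return row;
    },
  };
}

// One polling pass: publish each pending event, then record the outcome.
async function runWorkerPass(outbox, publish, maxRetries = 5) {
  const pending = outbox.rows.filter((row) => row.status === "pending");
  for (const row of pending) {
    try {
      await publish(row);
      row.status = "published";
    } catch (err) {
      row.retryCount += 1;
      row.lastError = String(err.message ?? err);
      // Stay pending for another attempt, or give up after maxRetries failures.
      row.status = row.retryCount >= maxRetries ? "dead_letter" : "pending";
    }
  }
}
```

Calling runWorkerPass repeatedly with a publish function that throws leaves the event pending until its retry count reaches maxRetries, at which point it is marked dead_letter and healthy events keep flowing.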
2. Architecture Diagram
sequenceDiagram
participant App as Node.js App
participant DB as MySQL DB
participant Worker as Outbox Worker
participant Broker as RabbitMQ / Kafka
App->>DB: 1. Begin DB Transaction
App->>DB: 2. Save Business Data (e.g. Orders)
App->>DB: 3. Save Outbox Event (status: pending)
App->>DB: 4. Commit Transaction
loop Polling Interval
Worker->>DB: 5. Fetch Pending Events
Worker->>Broker: 6. Publish Event
Broker-->>Worker: 7. Acknowledge
Worker->>DB: 8. Mark Event as 'published'
end
3. Quick Start (Under 2 Minutes)
Install the package alongside your database and broker clients:
npm install universal-outbox mysql2 amqplib
Minimal Example:
import { createPool } from "mysql2/promise";
import { createMySqlOutbox, createOutboxWorker } from "universal-outbox";

// 1. Initialize DB and Outbox
const pool = createPool({ host: "localhost", user: "root", database: "myapp" });
const outbox = createMySqlOutbox({ pool, tableName: "outbox_events" });

// 2. Publish Events Safely within a Transaction
async function createUser(userId) {
  const connection = await pool.getConnection();
  try {
    await connection.beginTransaction();
    await connection.query("INSERT INTO users (id) VALUES (?)", [userId]);
    // The outbox row joins the same transaction as the user row.
    await outbox.publish({
      eventName: "user.created",
      payload: { userId },
      transaction: connection,
    });
    await connection.commit();
  } catch (err) {
    // Undo both writes so no event is stored for a user that was never saved.
    await connection.rollback();
    throw err;
  } finally {
    connection.release();
  }
}

// 3. Start the Background Worker
const worker = createOutboxWorker({
  outbox,
  async publish(event) {
    // Put your broker logic here!
    console.log(`Publishing ${event.eventName} to broker...`);
  },
});
worker.start();
4. Real-World Example
Here is a full real-world implementation using Express, MySQL, and RabbitMQ. It features graceful shutdowns, transaction safety, and a dedicated endpoint.
import express from "express";
import mysql from "mysql2/promise";
import amqp from "amqplib";
import { createMySqlOutbox, createOutboxWorker } from "universal-outbox";

const app = express();
app.use(express.json());

const pool = mysql.createPool({
  host: "localhost",
  user: "root",
  database: "production",
});
const outbox = createMySqlOutbox({ pool, tableName: "outbox_events" });

let channel;
let worker;

app.post("/orders", async (req, res) => {
  const { userId, total } = req.body;
  const connection = await pool.getConnection();
  try {
    await connection.beginTransaction();
    // The business logic
    const [result] = await connection.query(
      "INSERT INTO orders (user_id, total) VALUES (?, ?)",
      [userId, total],
    );
    const orderId = result.insertId;
    // The consistent outbox event
    await outbox.publish({
      eventName: "order.created",
      payload: { orderId, userId, total },
      transaction: connection,
    });
    await connection.commit();
    res.status(201).json({ orderId });
  } catch (err) {
    await connection.rollback();
    console.error("Order transaction failed:", err);
    res.status(500).send("Transaction Failed");
  } finally {
    connection.release();
  }
});

async function bootstrap() {
  const amqpConnection = await amqp.connect("amqp://localhost");
  // A confirm channel lets the worker wait for the broker's acknowledgement.
  channel = await amqpConnection.createConfirmChannel();
  await channel.assertExchange("orders_exchange", "direct");

  worker = createOutboxWorker({
    outbox,
    pollIntervalMs: 1000,
    maxRetries: 5,
    async publish(event) {
      const buffer = Buffer.from(JSON.stringify(event.payload));
      channel.publish("orders_exchange", event.eventName, buffer, {
        persistent: true,
      });
      // Rejects if the broker nacks the message, so the failure surfaces to the worker.
      await channel.waitForConfirms();
    },
    logger: console,
  });
  await worker.start();

  const server = app.listen(3000, () =>
    console.log("Server running on port 3000"),
  );

  // Graceful shutdown: stop taking requests, stop the worker,
  // then close the broker and database connections.
  process.on("SIGTERM", async () => {
    console.log("Shutting down safely...");
    await new Promise((resolve) => server.close(resolve));
    await worker.stop();
    await channel.close();
    await amqpConnection.close();
    await pool.end();
    process.exit(0);
  });
}

bootstrap().catch((err) => {
  console.error(err);
  process.exit(1);
});
5. Why Not Just Publish Directly?
If you're wondering, "Can't I just put channel.publish(...) right after my INSERT query?"
You can, but if your app crashes between the INSERT and the publish(), or RabbitMQ is unreachable for even a few seconds, the event is silently lost.
Direct publishing is a common cause of data inconsistency in microservices. It results in customer service tickets like "I paid for my order but it says pending" or "I canceled my subscription but still got billed."
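The difference is easiest to see side by side. The following is a toy simulation under stated assumptions: directPublish and outboxPublish are hypothetical functions, plain arrays stand in for the database and the broker, and the crash is modeled by simply skipping the publish step.

```javascript
// Hypothetical in-memory stand-ins for a database and a broker.
function directPublish({ crashBeforePublish }) {
  const db = { orders: [] };
  const broker = [];
  db.orders.push({ id: 1, total: 50 }); // the INSERT commits on its own
  if (!crashBeforePublish) {
    broker.push({ eventName: "order.created", orderId: 1 });
  }
  // After a crash there is no record that an event was ever owed.
  return { orders: db.orders.length, delivered: broker.length, recoverable: 0 };
}

function outboxPublish({ crashBeforePublish }) {
  const db = { orders: [], outbox: [] };
  const broker = [];
  // Both rows commit in one transaction, so the intent to publish is durable.
  db.orders.push({ id: 1, total: 50 });
  db.outbox.push({ eventName: "order.created", orderId: 1, status: "pending" });
  if (!crashBeforePublish) {
    broker.push(db.outbox[0]);
    db.outbox[0].status = "published";
  }
  const recoverable = db.outbox.filter((row) => row.status === "pending").length;
  return { orders: db.orders.length, delivered: broker.length, recoverable };
}

console.log(directPublish({ crashBeforePublish: true }));
// { orders: 1, delivered: 0, recoverable: 0 }
console.log(outboxPublish({ crashBeforePublish: true }));
// { orders: 1, delivered: 0, recoverable: 1 }
```

In both cases the order exists and nothing was delivered, but only the outbox variant leaves a pending row that a worker can pick up after the restart.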
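With Universal Outbox, you get:
- Zero Lost Events: If the database transaction commits, the event is stored with it, and the worker keeps retrying until the event is published or dead-lettered. As with any transactional outbox, delivery is at-least-once: a crash between publishing an event and marking it as published can cause a repeat, so consumers should be idempotent.
- Failover Safety: When you scale horizontally, workers coordinate automatically through row-level locking.
- Built-in Retry Backoff: Retries with exponential backoff avoid hammering a failing broker.
- Dead-Letter Tracing: Events that repeatedly fail to publish are moved to a dead_letter status for manual inspection, so the queue keeps processing healthy events without being blocked.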
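To make the retry and dead-letter behavior concrete, here is a minimal sketch of an exponential backoff schedule. It is an illustration only: backoffDelayMs and planRetry are hypothetical helpers, the 1-second base and 60-second cap are assumed values, and the sketch returns a failed event to pending, whereas the library may use its own status and timing rules.

```javascript
// Hypothetical helper: delay before retry number `retryCount` (1-based), capped at maxMs.
function backoffDelayMs(retryCount, baseMs = 1000, maxMs = 60000) {
  return Math.min(maxMs, baseMs * 2 ** (retryCount - 1));
}

// Decide what happens to an event after a failed publish attempt.
function planRetry(event, now, maxRetries = 5) {
  const retryCount = event.retryCount + 1;
  if (retryCount >= maxRetries) {
    // Out of attempts: sideline the event for manual inspection.
    return { status: "dead_letter", retryCount, nextRunAt: null };
  }
  return {
    status: "pending",
    retryCount,
    nextRunAt: new Date(now.getTime() + backoffDelayMs(retryCount)),
  };
}
```

With these defaults the delays grow as 1 s, 2 s, 4 s, 8 s, and so on up to the cap, so a broker outage produces a handful of spaced-out attempts per event instead of one per polling interval.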
Configuration & Database Schema
Before you begin, create the outbox table in your MySQL database. See Database Schema for updates on future database support; v1 requires the following table:
CREATE TABLE outbox_events (
  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
  event_name VARCHAR(255) NOT NULL,
  payload JSON NOT NULL,
  status ENUM('pending', 'processing', 'published', 'failed', 'dead_letter') NOT NULL DEFAULT 'pending',
  retry_count INT UNSIGNED NOT NULL DEFAULT 0,
  next_run_at DATETIME(3) NULL,
  last_error TEXT NULL,
  created_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
  updated_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  INDEX idx_outbox_status_next_run (status, next_run_at)
);
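The status and next_run_at columns suggest how a worker could claim rows without stepping on other workers. The sketch below is one possible approach, not the query the library actually runs: claimPendingEvents is a hypothetical helper, it assumes MySQL 8.0 or later (for SKIP LOCKED), and it expects a connection that behaves like a mysql2/promise connection.

```javascript
// Hypothetical helper, not part of universal-outbox.
// `connection` must provide beginTransaction, query, commit, and rollback.
async function claimPendingEvents(connection, batchSize = 10) {
  await connection.beginTransaction();
  try {
    // Lock a batch of due rows; rows locked by another worker are skipped.
    const [rows] = await connection.query(
      `SELECT id, event_name, payload
         FROM outbox_events
        WHERE status = 'pending'
          AND (next_run_at IS NULL OR next_run_at <= NOW(3))
        ORDER BY id
        LIMIT ?
        FOR UPDATE SKIP LOCKED`,
      [batchSize],
    );
    if (rows.length > 0) {
      // Mark the batch as claimed before releasing the row locks.
      await connection.query(
        "UPDATE outbox_events SET status = 'processing' WHERE id IN (?)",
        [rows.map((row) => row.id)],
      );
    }
    await connection.commit();
    return rows;
  } catch (err) {
    await connection.rollback();
    throw err;
  }
}
```

Because each worker only sees rows that no other transaction has locked, two workers polling at the same moment receive disjoint batches. The sketch uses query rather than execute so that mysql2 expands the id array inside IN (?).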