express-sqs-typeorm-example
v1.1.4
Published
Real-world example: Express + SQS + TypeORM + MySQL with message-in-the-middle
Downloads
32
Readme
Real-World Example: Express + SQS + MySQL (Queue-Centric Design)
This example demonstrates a production-ready, scalable architecture using the Queue-Centric Design pattern with multiple SQS queues, database persistence, and event-driven observability.
What's Included
- ✅ Queue-Centric Architecture - Each queue is a self-contained module
- ✅ Multiple SQS queues with different priorities (Orders, Notifications, Analytics)
- ✅ Circuit breakers for fault tolerance (Orders & Notifications)
- ✅ Event-driven observability - Manager events + per-queue events
- ✅ Database logging using built-in
PersistenceInboundMiddleware+MySQLMessageStore - ✅ Different retry strategies per queue (5/3/1 retries)
- ✅ Validation using Zod schemas
- ✅ REST API to view logs and retry failed messages
- ✅ Graceful shutdown handling
- ✅ TypeScript with full type safety
Architecture
┌─────────────────┐
│ SQS Queues │
├─────────────────┤
│ - Orders │ → Critical (5 retries, full audit trail)
│ - Notifications │ → Standard (3 retries, error logs only)
│ - Analytics │ → Best effort (1 retry, error logs only)
└─────────────────┘
│
▼
┌──────────────────────────────────────┐
│ SQSPoller (Production-Ready) │
├──────────────────────────────────────┤
│ - Long polling (20s wait time) │
│ - Configurable concurrency per queue │
│ - Automatic message deletion │
│ - Graceful shutdown support │
└──────────────────────────────────────┘
│
▼
┌──────────────────────────────────────┐
│ message-in-the-middle Pipeline │
├──────────────────────────────────────┤
│ 1. ParseJsonInboundMiddleware │
│ 2. ValidateInboundMiddleware (Zod) │
│ 3. PersistenceInboundMiddleware │ ← Built-in!
│ 4. RetryInboundMiddleware │
│ 5. Business Logic / Dispatcher │
└──────────────────────────────────────┘
│
▼
┌─────────────────┐
│ MySQL Database │
├─────────────────┤
│ - message_store │ ← Auto-created by library
└─────────────────┘Quick Start
Option 1: Docker (Zero Setup Required) ⚡
Everything auto-configures - just start and test!
# Start everything (MySQL, LocalStack, Express app)
docker-compose up -d
# That's it! Queues are auto-created on startup.
# Send a test message:
AWS_ACCESS_KEY_ID=test AWS_SECRET_ACCESS_KEY=test AWS_REGION=us-east-1 \
aws sqs send-message \
--queue-url http://localhost:4567/000000000000/orders-queue \
--message-body '{"orderId":"550e8400-e29b-41d4-a716-446655440000","customerId":"650e8400-e29b-41d4-a716-446655440001","amount":99.99,"currency":"USD","items":[{"productId":"PROD-789","quantity":1,"price":99.99}],"action":"CREATE_ORDER"}' \
--endpoint-url http://localhost:4567
# View logs
docker logs express-sqs-app -f
# Access services:
# - API: http://localhost:3000/health
# - Adminer (MySQL UI): http://localhost:8081
# - Server: mysql, User: queue_user, Password: queue_password, DB: queue_logsWhat happens automatically:
- ✅ MySQL database created with
message_storetable - ✅ LocalStack starts with SQS service
- ✅ All 3 SQS queues auto-created by the app
- ✅ Express app starts polling queues
- ✅ Ready to process messages!
Option 2: Local Development
1. Install Dependencies
pnpm install2. Setup Database
# Create database
mysql -u root -p -e "CREATE DATABASE queue_logs;"The message_store table is auto-created by MySQLMessageStore!
3. Configure Environment
cp .env.example .env
# Edit .env with your settings4. Run
# Development
pnpm dev
# Production
pnpm build
pnpm startKey Features Demonstrated
1. Built-in Database Persistence
No custom middleware needed! Uses the library's built-in persistence:
import { PersistenceInboundMiddleware } from '@message-in-the-middle/persistence-core';
import { MySQLMessageStore } from '@message-in-the-middle/store-mysql';
// Setup MySQL store (auto-creates table!)
const pool = createPool({ host: 'localhost', database: 'queue_logs' });
const messageStore = new MySQLMessageStore(pool, {
autoCreateTable: true, // ✅ Table created automatically
});
// Use built-in persistence middleware
manager.addInboundMiddleware(
new PersistenceInboundMiddleware(messageStore, {
storeOn: ['error'], // or ['always'] for full audit trail
sourceExtractor: (ctx) => ({ queueName: 'orders' }),
})
);2. Different Retry Strategies Per Queue
// Orders - Critical (5 retries, 2s → 32s backoff)
ordersManager.addInboundMiddleware(
new RetryInboundMiddleware({
maxRetries: 5,
delayMs: 2000,
backoffMultiplier: 2,
})
);
// Notifications - Standard (3 retries, 1s → 4s)
notificationsManager.addInboundMiddleware(
new RetryInboundMiddleware({
maxRetries: 3,
delayMs: 1000,
backoffMultiplier: 2,
})
);
// Analytics - Best Effort (1 retry, 500ms)
analyticsManager.addInboundMiddleware(
new RetryInboundMiddleware({
maxRetries: 1,
delayMs: 500,
})
);3. Per-Queue Event Handlers (No If-Else Chains!)
NEW: The SQSPoller now returns QueueController instances that support per-queue event handlers, eliminating the need for if-else chains.
❌ Before (if-else anti-pattern):
sqsPoller.on('message:processed', (queueName, message, duration) => {
if (queueName === 'orders') {
logger.debug('Order processed', { messageId: message.MessageId, duration });
} else if (queueName === 'notifications') {
logger.debug('Notification sent', { messageId: message.MessageId, duration });
} else if (queueName === 'analytics') {
logger.debug('Analytics event tracked', { messageId: message.MessageId, duration });
}
});✅ After (per-queue handlers):
// Global events for system-wide metrics
poller.on('message:processed', (queueName, message, duration) => {
metrics.timing('sqs.duration', duration, { queue: queueName });
});
// Per-queue events for business logic
const ordersQueue = poller.start({ name: 'orders', ... });
ordersQueue.on('message:processed', (message, duration) => {
logger.debug('Order processed', { messageId: message.MessageId, duration });
});
const notificationsQueue = poller.start({ name: 'notifications', ... });
notificationsQueue.on('message:processed', (message, duration) => {
logger.debug('Notification sent', { messageId: message.MessageId, duration });
});
const analyticsQueue = poller.start({ name: 'analytics', ... });
analyticsQueue.on('message:processed', (message, duration) => {
logger.debug('Analytics event tracked', { messageId: message.MessageId, duration });
});Benefits:
- ✅ No if-else chains - queue-specific logic stays with queue setup
- ✅ Type-safe events with full IntelliSense
- ✅ Easy to add new queues (no scattered updates)
- ✅ Clean separation between system metrics (global) and business logic (per-queue)
4. REST API for Queue Management
# View all message logs
GET http://localhost:3000/api/message-logs
# Filter by status
GET http://localhost:3000/api/message-logs?status=processing
# View failed messages
GET http://localhost:3000/api/failed-messages
# Retry failed message
POST http://localhost:3000/api/messages/{id}/retryTesting with LocalStack
Test locally with LocalStack:
# Start LocalStack
docker run -d -p 4566:4566 localstack/localstack
# Create queues
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name orders-queue
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name notifications-queue
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name analytics-queue
# Send test message
aws --endpoint-url=http://localhost:4566 sqs send-message \
--queue-url http://localhost:4566/000000000000/orders-queue \
--message-body '{
"orderId": "550e8400-e29b-41d4-a716-446655440000",
"customerId": "660e8400-e29b-41d4-a716-446655440001",
"amount": 99.99,
"currency": "USD",
"items": [{"productId": "PROD-001", "quantity": 2, "price": 49.99}],
"action": "CREATE_ORDER"
}'What This Example Shows
✅ Production-Ready Features
- Built-in Persistence - No custom middleware needed
- Auto-Schema Creation - Tables created automatically
- SQSPoller - Production-ready polling with long polling, concurrency control, and graceful shutdown
- Flexible Configuration - Different strategies per queue
- Query API - Find by status, error type, date range
- Message Replay - Retry failed messages via REST API
- Graceful Shutdown - Properly stops polling and waits for in-flight messages
Code Structure (Queue-Centric Design)
src/
├── app.ts # Clean bootstrap (140 lines vs 258 before)
├── config/
│ ├── aws.ts # SQS client setup
│ ├── database.ts # MySQL + messageStore export
│ ├── logger.ts # Centralized logger
│ └── queue-setup.ts # Queue creation utility
│
├── shared/ # Reusable components
│ ├── middlewares/
│ │ ├── circuit-breaker.ts # Orders & notifications circuit breakers
│ │ └── deduplication.ts # Shared dedup store
│ └── utils/
│ └── validators.ts # All Zod schemas (consolidated)
│
└── queues/ # Queue-Centric modules
├── orders/ # Orders queue (9 files)
│ ├── handlers/
│ │ ├── create-order.ts # Pure business logic
│ │ ├── update-order.ts
│ │ ├── cancel-order.ts
│ │ └── index.ts
│ ├── pipeline.ts # Middleware configuration
│ ├── events.ts # Event listeners (observability)
│ ├── types.ts # TypeScript interfaces
│ └── index.ts # Factory function
│
├── notifications/ # Notifications queue (9 files)
│ ├── handlers/
│ │ ├── send-email.ts
│ │ ├── send-sms.ts
│ │ ├── send-push.ts
│ │ └── index.ts
│ ├── pipeline.ts
│ ├── events.ts
│ ├── types.ts
│ └── index.ts
│
└── analytics/ # Analytics queue (6 files)
├── handlers/
│ ├── track-event.ts
│ └── index.ts
├── pipeline.ts
├── types.ts
└── index.tsBenefits of this structure:
- Find any handler in < 30 seconds - Clear file organization
- Scale from 1 to 100+ queues - Consistent pattern
- Test independently - Each handler is pure function
- Team-friendly - No merge conflicts between queues
- Clear separation - Handlers (logic) → Pipeline (processing) → Events (monitoring)
Documentation:
- Queue-Centric Design Guide
- Migration Plan - How we refactored from old structure
Key Learnings
Production-Ready Components
- ✅ Persistence is built-in -
PersistenceInboundMiddleware+MySQLMessageStore - ✅ Auto-table creation -
autoCreateTable: truehandles schema - ✅ ORM-agnostic design - Works with TypeORM, Prisma, or raw SQL
- ✅ SQSPoller - Production-ready polling with long polling, concurrency, graceful shutdown
- ✅ Message deletion - Automatic deletion after successful processing
- ✅ Graceful shutdown -
stopAll()waits for in-flight messages
Production Considerations
Database Retention
-- Archive old succeeded messages
UPDATE message_store
SET status = 'archived'
WHERE status = 'succeeded'
AND created_at < DATE_SUB(NOW(), INTERVAL 30 DAY);
-- Delete old archived messages
DELETE FROM message_store
WHERE status = 'archived'
AND created_at < DATE_SUB(NOW(), INTERVAL 90 DAY);Monitoring
// Add metrics middleware
import { MetricsInboundMiddleware } from '@message-in-the-middle/core';
manager.addInboundMiddleware(
new MetricsInboundMiddleware(metricsCollector, {
prefix: 'orders.inbound',
})
);Health Checks
app.get('/health', async (req, res) => {
try {
// Check database
await pool.query('SELECT 1');
// Check SQS connection
await sqsClient.send(new GetQueueAttributesCommand({
QueueUrl: queueUrls.orders,
AttributeNames: ['ApproximateNumberOfMessages']
}));
res.json({ status: 'ok', timestamp: new Date().toISOString() });
} catch (error) {
res.status(503).json({ status: 'unhealthy', error: error.message });
}
});Next Steps
- Customize for your use case - Adjust retry strategies, logging levels, persistence options
- Add monitoring - Integrate with Prometheus, Datadog, CloudWatch, etc.
- Set up alerting - Alert on high error rates, queue depth, processing time
- Configure retention policies - Set up database cleanup for old messages
License
MIT
