@message-in-the-middle/insights
v0.1.2
Real-time observability and performance insights for message-in-the-middle pipelines
⚠️ Work in Progress
Is this library production-ready? No.
Is this library safe? No.
When will it be ready? Soon™ (maybe tomorrow, maybe never).
Why is it public? Experiment.
message-in-the-middle brings the Express.js middleware pattern to message queues: just as Express composes middleware around HTTP requests, this library composes middleware around queue messages.
Why This Exists
Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
This package adds real-time observability and performance insights to message-in-the-middle pipelines: a performance profiler for message queues that visualizes bottlenecks, tracks metrics, and helps you optimize with zero instrumentation code.
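The composition idea can be sketched roughly like this (the middleware interface below is illustrative; the real one lives in @message-in-the-middle/core and may differ):

```typescript
// Illustrative only: the actual middleware interface is defined by
// @message-in-the-middle/core and may differ from this sketch.
type Next = (message: unknown) => Promise<void>;

interface InboundMiddleware {
  handle(message: unknown, next: Next): Promise<void>;
}

// Instead of inlining JSON parsing in every consumer, express it once
// as a composable middleware.
class ParseJsonMiddleware implements InboundMiddleware {
  async handle(message: unknown, next: Next): Promise<void> {
    await next(JSON.parse(String(message)));
  }
}

// A tiny runner that chains middlewares, Express-style: each middleware
// decides whether (and with what message) to call the next one.
async function run(middlewares: InboundMiddleware[], raw: unknown): Promise<unknown> {
  let result: unknown = raw;
  const dispatch = async (i: number, msg: unknown): Promise<void> => {
    if (i === middlewares.length) {
      result = msg;
      return;
    }
    await middlewares[i].handle(msg, (m) => dispatch(i + 1, m));
  };
  await dispatch(0, raw);
  return result;
}
```

Validation, retries, deduplication, and routing all slot into the same chain, which is what the insights package can then observe per-middleware.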
Features
- 📊 Real-time Metrics - Throughput, latency percentiles, error rates
- 🔍 Bottleneck Detection - Automatically identify slow middlewares
- 💡 Smart Insights - AI-powered recommendations for optimization
- 🖥️ Terminal Dashboard - Beautiful ASCII dashboard for development
- 📈 Prometheus Exporter - Production-ready metrics integration
- 🎯 Zero Instrumentation - Just attach to your manager
- 🚀 Low Overhead - Minimal performance impact
Installation
npm install @message-in-the-middle/insights
Quick Start
Option 1: Terminal Dashboard (Development)
Perfect for local development and debugging:
import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import { createTerminalDashboard } from '@message-in-the-middle/insights';
const manager = new MessageMiddlewareManager();
// Add your middlewares...
manager
.addInboundMiddleware(new ParseJsonInboundMiddleware())
.addInboundMiddleware(new RetryInboundMiddleware({ maxRetries: 3 }))
.addInboundMiddleware(new DispatcherMiddleware(dispatcher));
// Start dashboard
const dashboard = createTerminalDashboard(manager, {
refreshRate: 1000,
layout: 'detailed',
theme: 'dark'
});
Features:
- Real-time ASCII dashboard
- Middleware performance waterfall
- Automatic bottleneck detection
- Keyboard shortcuts (P: pause, R: reset, E: export, Q: quit)
- Export snapshots to JSON
Dashboard View:
╔═══════════════════════════════════════════════════════════════════╗
║ Message Pipeline Insights - Orders Queue 10:15:30 AM ║
╠═══════════════════════════════════════════════════════════════════╣
║ ║
║ 📊 THROUGHPUT ║
║ Current: 850 msg/min ↑ 15% Peak: 1,240 msg/min ║
║ ║
║ ⏱️ LATENCY (p95) ║
║ 145ms ↑ 12% - DEGRADED [▓▓▓▓▓▓▓▓░░░░] Target: 120ms ║
║ ║
║ 🔴 ERROR RATE ║
║ 2.0% ↓ 50% - HEALTHY [▓▓░░░░░░░░░░] Target: <5% ║
║ ║
║ ⚙️ MIDDLEWARE PERFORMANCE ║
║ ParseJson █ 2ms 99.8% success ║
║ Deduplication ████ 15ms 15% hit rate ║
║ Validation ███ 8ms 99.5% success ║
║ Retry ████████████████ 195ms ⚠️ BOTTLENECK ║
║ Dispatcher ████ 24ms 100% routed ║
║ ║
║ 💡 INSIGHTS ║
║ • Retry is 79% of total latency - check downstream service ║
║ • Circuit breaker approaching threshold (7/10) ║
║ ║
║ [P]ause [R]eset [E]xport [H]elp [Q]uit ║
╚═══════════════════════════════════════════════════════════════════╝
Option 2: Prometheus Exporter (Production)
Perfect for production monitoring with Grafana:
import { createPrometheusExporter } from '@message-in-the-middle/insights/prometheus';
const exporter = await createPrometheusExporter(manager, {
port: 9090,
path: '/metrics',
prefix: 'order_pipeline',
labels: {
service: 'order-processor',
environment: 'production',
queue: 'orders'
}
});
// Metrics available at http://localhost:9090/metrics
// Health check at http://localhost:9090/health
Exposed Metrics:
# Throughput
order_pipeline_messages_total{status="success",queue="orders"} 85420
order_pipeline_messages_total{status="error",queue="orders"} 1245
# Latency (histograms with percentiles)
order_pipeline_processing_duration_seconds{queue="orders",quantile="0.5"} 0.045
order_pipeline_processing_duration_seconds{queue="orders",quantile="0.95"} 0.145
order_pipeline_processing_duration_seconds{queue="orders",quantile="0.99"} 0.350
# Middleware performance
order_pipeline_middleware_duration_seconds{middleware="Retry",quantile="0.95"} 0.195
order_pipeline_middleware_executions_total{middleware="Retry",status="success"} 1200
# Pipeline health
order_pipeline_health_score{queue="orders"} 0.85
order_pipeline_throughput_messages_per_second{queue="orders"} 14.2
order_pipeline_error_rate{queue="orders"} 0.02
Grafana Dashboard:
Use these queries in Grafana:
# Throughput
rate(order_pipeline_messages_total[5m])
# P95 Latency
histogram_quantile(0.95, rate(order_pipeline_processing_duration_seconds_bucket[5m]))
# Error Rate
rate(order_pipeline_messages_total{status="error"}[5m]) /
rate(order_pipeline_messages_total[5m])
# Middleware Performance
rate(order_pipeline_middleware_duration_seconds_sum{middleware="Retry"}[5m]) /
rate(order_pipeline_middleware_duration_seconds_count{middleware="Retry"}[5m])
Option 3: Programmatic Access (Custom Solutions)
Perfect for building custom monitoring:
import { MetricsCollector, InsightsEngine } from '@message-in-the-middle/insights';
const collector = new MetricsCollector(manager, {
snapshotInterval: 1000,
enableTracing: true,
maxTraces: 100
});
const engine = new InsightsEngine();
// Listen to real-time snapshots
collector.on('snapshot', (snapshot) => {
console.log('Throughput:', snapshot.pipeline.throughput.messagesPerSecond);
console.log('Latency P95:', snapshot.pipeline.latency.p95);
console.log('Error Rate:', snapshot.pipeline.errors.rate);
// Generate insights
const insights = engine.analyze(snapshot);
for (const insight of insights) {
if (insight.severity === 'critical') {
alerting.notify(insight);
}
}
});
// Listen to slow messages
collector.on('slow-message', (data) => {
logger.warn('Slow processing detected', data);
});
// Listen to message traces
collector.on('trace', (trace) => {
if (trace.status === 'error') {
console.error('Failed message:', trace.messageId, trace.bottleneck);
}
});
What Gets Tracked
Pipeline Metrics
{
throughput: {
messagesPerSecond: 850,
messagesPerMinute: 51000,
totalProcessed: 123456,
trend: "increasing", // "stable" | "decreasing"
trendPercentage: 15 // +15%
},
latency: {
p50: 45, // ms
p95: 145,
p99: 350,
max: 1200,
avg: 67
},
errors: {
total: 1245,
rate: 0.02, // 2%
byType: {
"ETIMEDOUT": 450,
"ValidationError": 320,
"ORDER_PROCESSING_ERROR": 475
}
},
health: {
score: 0.85, // 0-1
status: "healthy", // "degraded" | "critical"
reasons: []
}
}
Middleware Metrics
{
middlewares: [
{
name: "Retry",
executionCount: 85420,
totalDuration: 16684000,
avgDuration: 195,
p95Duration: 350,
p99Duration: 850,
successCount: 84175,
errorCount: 1245,
successRate: 0.985,
percentOfTotal: 79, // 79% of total time!
isBottleneck: true // Automatically detected
},
// ... other middlewares
]
}
Intelligent Insights
{
insights: [
{
type: "bottleneck",
severity: "critical",
title: "Middleware Bottleneck Detected",
description: "Retry accounts for 79% of total processing time",
affectedComponent: "Retry",
recommendation: "Consider: (1) Implementing circuit breaker, (2) Reducing max retries, (3) Investigating downstream service latency"
},
{
type: "warning",
severity: "high",
title: "High Latency",
description: "P95 latency is 145ms",
recommendation: "Review middleware performance. Check for slow database queries or external API calls."
}
]
}
API Reference
Terminal Dashboard
import { createTerminalDashboard } from '@message-in-the-middle/insights';
const dashboard = createTerminalDashboard(manager, {
// Refresh rate in milliseconds
refreshRate: 1000,
// Layout style: 'detailed' or 'compact'
layout: 'detailed',
// Color theme: 'dark' or 'light'
theme: 'dark',
// Show middleware details
showMiddlewares: true,
// Show insights panel
showInsights: true,
// Show message traces
showTraces: false
});
// Stop dashboard
dashboard.stop();
Keyboard Shortcuts:
- Q / Ctrl+C - Quit
- P / Space - Pause/Resume
- R - Reset statistics
- E - Export snapshot to JSON
- H / ? - Show help
Prometheus Exporter
import { createPrometheusExporter } from '@message-in-the-middle/insights/prometheus';
const exporter = await createPrometheusExporter(manager, {
// HTTP port
port: 9090,
// Metrics endpoint path
path: '/metrics',
// Metric name prefix
prefix: 'message_pipeline',
// Additional labels
labels: {
service: 'order-processor',
environment: 'production',
queue: 'orders'
},
// Include Node.js default metrics
includeDefaultMetrics: true
});
// Access registry for custom metrics
const registry = exporter.getRegistry();
// Stop exporter
await exporter.stop();
Endpoints:
- GET /metrics - Prometheus metrics
- GET /health - Health check (JSON)
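A quick programmatic check against these endpoints can be handy in readiness probes. A sketch (only the endpoint path comes from this package's docs; the /health response body is unspecified here, so we rely on the HTTP status alone):

```typescript
// Minimal readiness probe for the exporter's /health endpoint.
// The response body shape is not documented, so only the HTTP
// status is inspected.
type FetchLike = (url: string) => Promise<{ ok: boolean }>;

async function exporterIsHealthy(
  baseUrl: string,
  fetchImpl: FetchLike = fetch
): Promise<boolean> {
  try {
    const res = await fetchImpl(`${baseUrl}/health`);
    return res.ok;
  } catch {
    // Exporter not running or unreachable
    return false;
  }
}
```

For example, `exporterIsHealthy('http://localhost:9090')` could back a Kubernetes readiness hook.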
Metrics Collector
import { MetricsCollector } from '@message-in-the-middle/insights';
const collector = new MetricsCollector(manager, {
// Snapshot generation interval
snapshotInterval: 1000,
// Number of historical snapshots to keep
historySize: 60,
// Enable message tracing
enableTracing: true,
// Maximum traces to keep
maxTraces: 100,
// Slow processing threshold
slowThreshold: 5000,
// Health score thresholds
healthThresholds: {
errorRate: 0.05, // 5%
p95Latency: 5000, // ms
degradedScore: 0.7,
criticalScore: 0.5
}
});
// Get current snapshot
const snapshot = collector.getCurrentSnapshot();
// Get historical snapshots
const history = collector.getSnapshots();
// Get message traces
const traces = collector.getTraces(10); // Last 10
// Clear all data
collector.clear();
// Clean up
await collector.destroy();
Events:
- snapshot - New metrics snapshot generated
- trace - Message trace recorded
- slow-message - Slow processing detected
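Because the snapshot history is just an array, simple custom rollups are easy to build on top of it. A sketch using the field names shown earlier (not part of the package API):

```typescript
// Average throughput across the collector's retained history.
// The field names follow the snapshot shapes documented above;
// treat this as a sketch, not part of the package API.
interface SnapshotLike {
  pipeline: { throughput: { messagesPerSecond: number } };
}

function averageThroughput(history: SnapshotLike[]): number {
  if (history.length === 0) return 0;
  const total = history.reduce(
    (sum, s) => sum + s.pipeline.throughput.messagesPerSecond,
    0
  );
  return total / history.length;
}

// Hypothetical usage:
// const history = collector.getSnapshots();
// console.log('avg msg/s:', averageThroughput(history));
```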
Insights Engine
import { InsightsEngine } from '@message-in-the-middle/insights';
const engine = new InsightsEngine();
// Analyze snapshot
const insights = engine.analyze(snapshot);
// insights is an array of Insight objects
for (const insight of insights) {
console.log(insight.severity); // 'critical' | 'high' | 'medium' | 'low'
console.log(insight.title);
console.log(insight.description);
console.log(insight.recommendation);
}
// Get summary
const summary = engine.summarize(insights);
// "Found 1 critical issue, 2 high-priority issues, 3 medium-priority issues"
Examples
See examples/ directory:
- terminal-dashboard.ts - Terminal dashboard with simulated messages
- prometheus-exporter.ts - Prometheus metrics exporter
- programmatic-access.ts - Custom monitoring solution
Run examples:
npx tsx examples/terminal-dashboard.ts
npx tsx examples/prometheus-exporter.ts
npx tsx examples/programmatic-access.ts
Use Cases
1. Local Development
Use terminal dashboard to optimize your pipeline during development:
const dashboard = createTerminalDashboard(manager);
// See bottlenecks in real-time
// Identify slow middlewares
// Optimize before production
2. Production Monitoring
Export metrics to Prometheus/Grafana:
const exporter = await createPrometheusExporter(manager, {
port: 9090,
labels: { service: 'orders', env: 'prod' }
});
// Setup Grafana dashboards
// Create alerts on error rate, latency
// Monitor SLOs
3. Custom Alerting
Build custom alerting on top of insights:
collector.on('snapshot', (snapshot) => {
const insights = engine.analyze(snapshot);
for (const insight of insights) {
if (insight.severity === 'critical') {
pagerDuty.trigger({
severity: 'critical',
summary: insight.title,
details: insight.description
});
}
}
});
4. Performance Profiling
Profile your pipeline to find optimization opportunities:
collector.on('trace', (trace) => {
if (trace.totalDuration > 1000) {
console.log('Slow message:', trace.messageId);
console.log('Bottleneck:', trace.bottleneck);
console.log('Trace:', trace.middlewareTrace);
// Identify which middleware is slow
// Optimize specific code paths
}
});
Best Practices
1. Always Use in Development
Start terminal dashboard during development:
if (process.env.NODE_ENV === 'development') {
createTerminalDashboard(manager);
}
2. Export to Prometheus in Production
if (process.env.NODE_ENV === 'production') {
await createPrometheusExporter(manager, {
port: Number(process.env.METRICS_PORT) || 9090
});
}
3. Enable Tracing Only When Needed
Tracing has overhead - use it for debugging:
const collector = new MetricsCollector(manager, {
enableTracing: process.env.ENABLE_TRACING === 'true'
});
4. Set Appropriate Thresholds
Customize thresholds based on your SLOs:
const collector = new MetricsCollector(manager, {
slowThreshold: 2000, // 2s for your use case
healthThresholds: {
errorRate: 0.01, // 1% error rate SLO
p95Latency: 1000 // 1s latency SLO
}
});
Performance Impact
The insights package is designed for production use with minimal overhead:
| Component | CPU Overhead | Memory Overhead |
|-----------|--------------|-----------------|
| MetricsCollector | < 1% | ~5MB (for 1000 messages tracked) |
| Terminal Dashboard | < 2% | ~10MB (UI rendering) |
| Prometheus Exporter | < 0.5% | ~3MB |
Tracing overhead:
- With tracing disabled: Negligible
- With tracing enabled: ~2-3% additional CPU, ~10MB per 100 traces
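One way to sanity-check these overhead figures on your own workload is to time a fixed batch of messages with and without the collector attached. A rough sketch (the commented lines assume your own manager and message fixtures):

```typescript
// Time an async workload in milliseconds, using Node's monotonic clock.
async function timeIt(label: string, fn: () => Promise<void>): Promise<number> {
  const start = process.hrtime.bigint();
  await fn();
  const ms = Number(process.hrtime.bigint() - start) / 1e6;
  console.log(`${label}: ${ms.toFixed(1)}ms`);
  return ms;
}

// Hypothetical usage against your pipeline (fixtures and manager are yours):
// const bare = await timeIt('without collector', async () => {
//   for (const msg of fixtures) await manager.processInbound(msg);
// });
// const collector = new MetricsCollector(manager, { enableTracing: true });
// const instrumented = await timeIt('with collector', async () => {
//   for (const msg of fixtures) await manager.processInbound(msg);
// });
// console.log('overhead:', (((instrumented / bare) - 1) * 100).toFixed(1), '%');
```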
Integration Examples
With DataDog
import { MetricsCollector } from '@message-in-the-middle/insights';
import StatsD from 'hot-shots';
const statsd = new StatsD({ host: 'localhost', port: 8125 });
collector.on('snapshot', (snapshot) => {
statsd.gauge('pipeline.throughput', snapshot.pipeline.throughput.messagesPerSecond);
statsd.gauge('pipeline.latency.p95', snapshot.pipeline.latency.p95);
statsd.gauge('pipeline.error_rate', snapshot.pipeline.errors.rate);
});
With CloudWatch
import { CloudWatch } from '@aws-sdk/client-cloudwatch';
const cloudwatch = new CloudWatch({ region: 'us-east-1' });
collector.on('snapshot', async (snapshot) => {
await cloudwatch.putMetricData({
Namespace: 'MessagePipeline',
MetricData: [
{
MetricName: 'Throughput',
Value: snapshot.pipeline.throughput.messagesPerSecond,
Unit: 'Count/Second'
},
{
MetricName: 'Latency',
Value: snapshot.pipeline.latency.p95,
Unit: 'Milliseconds'
}
]
});
});
Troubleshooting
Dashboard not showing data
Make sure you're processing messages through the manager:
await manager.processInbound(messageBody);
High memory usage
Reduce history size and traces:
const collector = new MetricsCollector(manager, {
historySize: 30, // Keep only 30 snapshots
maxTraces: 50, // Keep only 50 traces
enableTracing: false // Disable if not needed
});
Terminal dashboard garbled
Try changing the theme:
const dashboard = createTerminalDashboard(manager, {
theme: 'light' // or 'dark'
});
Related Packages
- @message-in-the-middle/core - Core message middleware
- @message-in-the-middle/aws - AWS SQS integration
- @message-in-the-middle/testing - Testing utilities
License
MIT
