# @aion2hub/shared-queue-utils
Shared Redis queue utilities for Aion2Hub event-driven RSS processing system.
## Features
- Queue Management: Create, configure, and manage Redis-based queues
- Worker Processing: Concurrent job processing with retry logic and error handling
- Monitoring: Comprehensive queue health monitoring and metrics
- Event-Driven Architecture: Support for RSS processing pipeline events
- Dead Letter Queues: Automatic handling of failed jobs
- Priority Queues: Support for job prioritization
- Delayed Jobs: Schedule jobs for future execution
- Batch Processing: Process multiple jobs together for efficiency
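To make the priority and delayed-job features above concrete, here is a minimal, package-independent sketch of how a queue can decide which job to run next. All names here (`Job`, `nextRunnableJob`) are illustrative stand-ins, not part of this library's API, and the real package stores jobs in Redis rather than in memory:

```typescript
// Minimal in-memory model of priority + delayed jobs (illustrative only).
interface Job {
  id: string;
  priority: number; // lower number = higher priority
  runAt: number;    // epoch ms; in the future for delayed jobs
  data: unknown;
}

function nextRunnableJob(jobs: Job[], now: number): Job | undefined {
  return jobs
    .filter((j) => j.runAt <= now) // delayed jobs are not yet eligible
    .sort((a, b) => a.priority - b.priority || a.runAt - b.runAt)[0];
}

const now = Date.now();
const jobs: Job[] = [
  { id: 'a', priority: 5, runAt: now, data: {} },
  { id: 'b', priority: 1, runAt: now + 60_000, data: {} }, // delayed 1 min
  { id: 'c', priority: 2, runAt: now, data: {} },
];
// 'b' has the best priority but is delayed, so 'c' is picked first.
```

The key point is that a delayed job never jumps the queue, no matter how high its priority, until its scheduled time arrives.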
## Installation

```bash
npm install @aion2hub/shared-queue-utils
```

## Quick Start

### Queue Manager

```typescript
import { QueueManager, QUEUE_NAMES } from '@aion2hub/shared-queue-utils';

const queueManager = new QueueManager('redis://localhost:6379');

// Publish a job
await queueManager.publishJob(QUEUE_NAMES.RSS_SCRAPED, {
  feedId: 'feed-123',
  articles: [...]
}, {
  priority: 1,
  correlationId: 'scrape-job-456'
});

// Get queue metrics
const metrics = await queueManager.getQueueMetrics(QUEUE_NAMES.RSS_SCRAPED);
console.log(`Queue depth: ${metrics.depth}`);
```

### Queue Worker
```typescript
import { QueueWorker, QUEUE_NAMES, createQueueLogger } from '@aion2hub/shared-queue-utils';

const logger = createQueueLogger('rss-processor');

const worker = new QueueWorker(
  'redis://localhost:6379',
  {
    queueName: QUEUE_NAMES.RSS_TRANSLATE,
    concurrency: 3,
    batchSize: 10,
    timeout: 60000
  },
  async (jobs) => {
    // Process a batch of translation jobs
    for (const job of jobs) {
      await translateArticles(job.data);
    }
  },
  logger
);

await worker.start();
```

### Queue Monitoring
```typescript
import { QueueMonitor, createQueueLogger } from '@aion2hub/shared-queue-utils';

const logger = createQueueLogger('queue-monitor');
const monitor = new QueueMonitor('redis://localhost:6379', logger);

// Start monitoring with 30-second intervals
monitor.startMonitoring(30000);

// Get the current health status
const health = await monitor.getHealthStatus();
console.log(`System health: ${health.overall}`);
```

## Queue Types
The system defines several predefined queues for RSS processing:

- `rss_scraped`: Raw parsed article data from RSS feeds
- `rss_translate`: Articles ready for translation processing
- `rss_classify`: Articles ready for AI classification
- `rss_ingest`: Articles ready for database ingestion
- `*_dlq`: Dead letter queues for failed jobs
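The queues above form a linear pipeline, and each queue has a matching dead letter queue. The following sketch models that ordering; the local `QUEUE_NAMES` object mirrors the queue names listed above but is a stand-in, not the constant exported by the package:

```typescript
// Illustrative model of the RSS pipeline stage order and *_dlq naming.
const QUEUE_NAMES = {
  RSS_SCRAPED: 'rss_scraped',
  RSS_TRANSLATE: 'rss_translate',
  RSS_CLASSIFY: 'rss_classify',
  RSS_INGEST: 'rss_ingest',
} as const;

// Each stage feeds the next; ingest is the terminal stage.
const PIPELINE: string[] = [
  QUEUE_NAMES.RSS_SCRAPED,
  QUEUE_NAMES.RSS_TRANSLATE,
  QUEUE_NAMES.RSS_CLASSIFY,
  QUEUE_NAMES.RSS_INGEST,
];

function nextStage(queue: string): string | undefined {
  const i = PIPELINE.indexOf(queue);
  return i >= 0 && i < PIPELINE.length - 1 ? PIPELINE[i + 1] : undefined;
}

// Every queue gets a matching dead letter queue with the *_dlq suffix.
function deadLetterQueue(queue: string): string {
  return `${queue}_dlq`;
}
```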
## Event Types

### RSSScrapedEvent

```typescript
interface RSSScrapedEvent {
  jobId: string;
  correlationId: string;
  feedId: string;
  feedUrl: string;
  feedName: string;
  timestamp: Date;
  articles: Array<{
    title: string;
    content: string;
    url: string;
    publishedAt: Date;
    author?: string;
    language: string;
    description?: string;
  }>;
  metadata: {
    priority: number;
    retryCount: number;
    maxRetries: number;
    feedCategory?: string;
  };
}
```

### RSSTranslateEvent
```typescript
interface RSSTranslateEvent {
  jobId: string;
  correlationId: string;
  feedId: string;
  articles: Array<{
    id: string;
    title: string;
    content: string;
    description?: string;
    url: string;
    publishedAt: Date;
    author?: string;
    language: string;
    needsTranslation: boolean;
  }>;
  metadata: {
    priority: number;
    retryCount: number;
    maxRetries: number;
  };
}
```

## Configuration
Default queue configurations are provided but can be customized:
```typescript
import { DEFAULT_QUEUE_CONFIGS, QueueConfig } from '@aion2hub/shared-queue-utils';

const customConfig: QueueConfig = {
  ...DEFAULT_QUEUE_CONFIGS.rss_scraped,
  maxLength: 5000,
  retryAttempts: 5
};
```

## Error Handling
The system provides comprehensive error handling:
- Automatic Retries: Failed jobs are automatically retried with exponential backoff
- Dead Letter Queues: Jobs that exceed retry limits are moved to dead letter queues
- Circuit Breakers: Workers can be paused/resumed for maintenance
- Timeout Handling: Jobs that exceed timeout limits are automatically failed
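The retry-then-dead-letter behavior above can be sketched as a single decision function. The exact delay schedule is an assumption for illustration (base delay doubling per attempt), not necessarily the package's exact backoff curve:

```typescript
// Sketch of the failure handling described above: exponential backoff while
// retries remain, then routing to a dead letter queue once maxRetries is hit.
type RetryDecision =
  | { action: 'retry'; delayMs: number }
  | { action: 'dead-letter' };

function onJobFailure(
  retryCount: number,
  maxRetries: number,
  baseDelayMs = 1000
): RetryDecision {
  if (retryCount >= maxRetries) {
    return { action: 'dead-letter' }; // move to the matching *_dlq
  }
  // Exponential backoff: base * 2^retryCount (1s, 2s, 4s, ...)
  return { action: 'retry', delayMs: baseDelayMs * 2 ** retryCount };
}
```

With `maxRetries: 3`, a job is retried after roughly 1s, 2s, and 4s before landing in the dead letter queue for analysis.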
## Monitoring
Built-in monitoring provides:
- Queue depth and processing metrics
- Health status for all queues
- Redis connection monitoring
- Error rate tracking
- Performance metrics
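One way these metrics can roll up into an overall status is sketched below. The field names and thresholds are hypothetical (they are not the actual `QueueMonitor` output shape); the point is only that per-queue depth and error rate combine into a worst-of health verdict:

```typescript
// Hypothetical roll-up of per-queue metrics into a single health status.
interface QueueSnapshot {
  depth: number;     // jobs currently waiting in the queue
  errorRate: number; // failed / processed over the sample window (0..1)
}

type Health = 'healthy' | 'degraded' | 'unhealthy';

function queueHealth(s: QueueSnapshot, maxDepth = 1000): Health {
  if (s.errorRate > 0.5 || s.depth > maxDepth) return 'unhealthy';
  if (s.errorRate > 0.1 || s.depth > maxDepth / 2) return 'degraded';
  return 'healthy';
}

// Overall health is the worst health of any individual queue.
function overallHealth(snapshots: QueueSnapshot[]): Health {
  const ranks: Health[] = ['healthy', 'degraded', 'unhealthy'];
  return snapshots
    .map((s) => queueHealth(s))
    .reduce((worst, h) => (ranks.indexOf(h) > ranks.indexOf(worst) ? h : worst), 'healthy');
}
```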
## Best Practices
- Use Correlation IDs: Track related jobs across the pipeline
- Set Appropriate Timeouts: Configure timeouts based on job complexity
- Monitor Queue Health: Set up alerts for queue depth and error rates
- Handle Failures Gracefully: Implement proper error handling in processors
- Use Batch Processing: Process multiple jobs together when possible
- Configure Dead Letter Queues: Always configure DLQs for failed job analysis
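The correlation-ID practice above means generating one ID at the start of a pipeline run and copying it into every downstream job, so related jobs can be traced across queues. A minimal sketch, with an event shape simplified from the interfaces in "Event Types":

```typescript
import { randomUUID } from 'node:crypto';

// Simplified event shape: each job gets its own jobId, but all jobs spawned
// from one pipeline run share a single correlationId.
interface PipelineEvent {
  jobId: string;
  correlationId: string;
  payload: unknown;
}

function startPipeline(payload: unknown): PipelineEvent {
  return { jobId: randomUUID(), correlationId: randomUUID(), payload };
}

// Downstream stages mint a fresh jobId but inherit the correlationId.
function nextEvent(previous: PipelineEvent, payload: unknown): PipelineEvent {
  return { jobId: randomUUID(), correlationId: previous.correlationId, payload };
}
```

Searching logs by `correlationId` then surfaces every scrape, translate, classify, and ingest job that belongs to the same run.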
## Development

```bash
# Install dependencies
npm install

# Build the package
npm run build

# Run tests
npm test

# Watch for changes
npm run dev
```

## License
MIT