@aion2hub/shared-queue-utils

v1.0.17

Shared Redis queue utilities for the Aion2Hub event-driven RSS processing system.

Features

  • Queue Management: Create, configure, and manage Redis-based queues
  • Worker Processing: Concurrent job processing with retry logic and error handling
  • Monitoring: Comprehensive queue health monitoring and metrics
  • Event-Driven Architecture: Support for RSS processing pipeline events
  • Dead Letter Queues: Automatic handling of failed jobs
  • Priority Queues: Support for job prioritization
  • Delayed Jobs: Schedule jobs for future execution
  • Batch Processing: Process multiple jobs together for efficiency

Installation

npm install @aion2hub/shared-queue-utils

Quick Start

Queue Manager

import { QueueManager, QUEUE_NAMES } from '@aion2hub/shared-queue-utils';

const queueManager = new QueueManager('redis://localhost:6379');

// Publish a job
await queueManager.publishJob(QUEUE_NAMES.RSS_SCRAPED, {
  feedId: 'feed-123',
  articles: [/* ... */]
}, {
  priority: 1,
  correlationId: 'scrape-job-456'
});

// Get queue metrics
const metrics = await queueManager.getQueueMetrics(QUEUE_NAMES.RSS_SCRAPED);
console.log(`Queue depth: ${metrics.depth}`);

Queue Worker

import { QueueWorker, QUEUE_NAMES, createQueueLogger } from '@aion2hub/shared-queue-utils';

const logger = createQueueLogger('rss-processor');

const worker = new QueueWorker(
  'redis://localhost:6379',
  {
    queueName: QUEUE_NAMES.RSS_TRANSLATE,
    concurrency: 3,
    batchSize: 10,
    timeout: 60000
  },
  async (jobs) => {
    // Process a batch of translation jobs one at a time.
    // translateArticles is your own application code, not part of this package.
    for (const job of jobs) {
      await translateArticles(job.data);
    }
  },
  logger
);

await worker.start();

Queue Monitoring

import { QueueMonitor, createQueueLogger } from '@aion2hub/shared-queue-utils';

const logger = createQueueLogger('queue-monitor');
const monitor = new QueueMonitor('redis://localhost:6379', logger);

// Start monitoring with 30-second intervals
monitor.startMonitoring(30000);

// Get current health status
const health = await monitor.getHealthStatus();
console.log(`System health: ${health.overall}`);

Queue Types

The package predefines the following queues for RSS processing:

  • rss_scraped: Raw parsed article data from RSS feeds
  • rss_translate: Articles ready for translation processing
  • rss_classify: Articles ready for AI classification
  • rss_ingest: Articles ready for database ingestion
  • *_dlq: Dead letter queues for failed jobs
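
The naming scheme above can be sketched in a few lines. This is an illustration only: `PIPELINE_STAGES`, `dlqNameFor`, and `nextStage` are hypothetical names that are not exported by the package, and the stage order (scrape, translate, classify, ingest) is assumed from the order of the list.

```typescript
// Queue names copied from the list above. The stage order is an assumption.
const PIPELINE_STAGES = ['rss_scraped', 'rss_translate', 'rss_classify', 'rss_ingest'] as const;
type StageName = (typeof PIPELINE_STAGES)[number];

// Hypothetical helper: builds a dead letter queue name following the `*_dlq` pattern.
function dlqNameFor(queue: StageName): string {
  return `${queue}_dlq`;
}

// Hypothetical helper: returns the stage that follows `queue`, or null for the last stage.
function nextStage(queue: StageName): StageName | null {
  const index = PIPELINE_STAGES.indexOf(queue);
  // Indexing past the end yields undefined, which is mapped to null.
  return PIPELINE_STAGES[index + 1] ?? null;
}
```

For example, `dlqNameFor('rss_translate')` yields `rss_translate_dlq`, and `nextStage('rss_ingest')` yields `null` because ingestion is assumed to be the final stage.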

Event Types

RSSScrapedEvent

interface RSSScrapedEvent {
  jobId: string;
  correlationId: string;
  feedId: string;
  feedUrl: string;
  feedName: string;
  timestamp: Date;
  articles: Array<{
    title: string;
    content: string;
    url: string;
    publishedAt: Date;
    author?: string;
    language: string;
    description?: string;
  }>;
  metadata: {
    priority: number;
    retryCount: number;
    maxRetries: number;
    feedCategory?: string;
  };
}

RSSTranslateEvent

interface RSSTranslateEvent {
  jobId: string;
  correlationId: string;
  feedId: string;
  articles: Array<{
    id: string;
    title: string;
    content: string;
    description?: string;
    url: string;
    publishedAt: Date;
    author?: string;
    language: string;
    needsTranslation: boolean;
  }>;
  metadata: {
    priority: number;
    retryCount: number;
    maxRetries: number;
  };
}
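
The two event shapes differ mainly in the per-article `id` and `needsTranslation` fields. The sketch below shows one way a scraped event could be turned into a translate event. It is not the package's implementation: `toTranslateEvent` is a hypothetical function, the interfaces are local copies so the block compiles on its own, and the id format, the retry counter reset, and the language comparison are all assumptions.

```typescript
// Local copies of the shapes documented above, so the sketch is self-contained.
interface ScrapedArticle {
  title: string;
  content: string;
  url: string;
  publishedAt: Date;
  author?: string;
  language: string;
  description?: string;
}

interface RSSScrapedEvent {
  jobId: string;
  correlationId: string;
  feedId: string;
  feedUrl: string;
  feedName: string;
  timestamp: Date;
  articles: ScrapedArticle[];
  metadata: { priority: number; retryCount: number; maxRetries: number; feedCategory?: string };
}

interface RSSTranslateEvent {
  jobId: string;
  correlationId: string;
  feedId: string;
  articles: Array<ScrapedArticle & { id: string; needsTranslation: boolean }>;
  metadata: { priority: number; retryCount: number; maxRetries: number };
}

// Hypothetical mapper. Assumptions: article ids are `${feedId}:${index}`, the
// retry counter restarts for the new stage, and an article needs translation
// when its language differs from `targetLanguage`.
function toTranslateEvent(
  scraped: RSSScrapedEvent,
  newJobId: string,
  targetLanguage = 'en',
): RSSTranslateEvent {
  return {
    jobId: newJobId,
    correlationId: scraped.correlationId, // carried over unchanged to link the stages
    feedId: scraped.feedId,
    articles: scraped.articles.map((article, index) => ({
      ...article,
      id: `${scraped.feedId}:${index}`,
      needsTranslation: article.language !== targetLanguage,
    })),
    metadata: {
      priority: scraped.metadata.priority,
      retryCount: 0,
      maxRetries: scraped.metadata.maxRetries,
    },
  };
}
```

Keeping `correlationId` unchanged while issuing a new `jobId` lets each stage be retried independently while the whole pipeline run stays traceable.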

Configuration

Default queue configurations are provided but can be customized:

import { DEFAULT_QUEUE_CONFIGS, QueueConfig } from '@aion2hub/shared-queue-utils';

const customConfig: QueueConfig = {
  ...DEFAULT_QUEUE_CONFIGS.rss_scraped,
  maxLength: 5000,
  retryAttempts: 5
};

Error Handling

Failed and stalled jobs are handled through four mechanisms:

  • Automatic Retries: Failed jobs are automatically retried with exponential backoff
  • Dead Letter Queues: Jobs that exceed retry limits are moved to dead letter queues
  • Circuit Breakers: Workers can be paused/resumed for maintenance
  • Timeout Handling: Jobs that exceed timeout limits are automatically failed
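
The interaction between retries and dead letter queues can be sketched as a single decision function. This is a minimal illustration and not the package's code: `decideRetry` and `RetryDecision` are hypothetical names, and the base delay, the delay cap, and the doubling factor are assumed values. `retryCount` and `maxRetries` mirror the `metadata` fields in the event types.

```typescript
type RetryDecision =
  | { action: 'retry'; delayMs: number }
  | { action: 'dead_letter' };

// Hypothetical helper. `retryCount` is the number of retries already attempted.
// baseDelayMs and maxDelayMs are illustrative defaults, not package settings.
function decideRetry(
  retryCount: number,
  maxRetries: number,
  baseDelayMs = 1000,
  maxDelayMs = 60000,
): RetryDecision {
  // Retry budget exhausted: hand the job to the dead letter queue.
  if (retryCount >= maxRetries) {
    return { action: 'dead_letter' };
  }
  // Exponential backoff: the delay doubles with each attempt, up to the cap.
  const delayMs = Math.min(baseDelayMs * 2 ** retryCount, maxDelayMs);
  return { action: 'retry', delayMs };
}
```

With these defaults and `maxRetries` set to 3, successive failures wait 1000 ms, 2000 ms, and 4000 ms before the job is dead-lettered on the fourth failure. Production systems often add random jitter to the delay so that many failed jobs do not retry at the same instant.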

Monitoring

Built-in monitoring provides:

  • Queue depth and processing metrics
  • Health status for all queues
  • Redis connection monitoring
  • Error rate tracking
  • Performance metrics
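
As an illustration of how queue depth and error rate could be combined into a health status, consider the sketch below. Everything in it is an assumption: the `QueueSnapshot` shape, the three status values, and the thresholds are hypothetical and may not match what `getHealthStatus()` or `getQueueMetrics()` actually return.

```typescript
// Hypothetical metrics shape. The package's real metrics object may differ.
interface QueueSnapshot {
  name: string;
  depth: number;     // jobs currently waiting
  processed: number; // jobs completed in the sampling window
  failed: number;    // jobs failed in the sampling window
}

type Health = 'healthy' | 'degraded' | 'unhealthy';

// Fraction of finished jobs that failed. Returns 0 when nothing has finished yet.
function errorRate(snapshot: QueueSnapshot): number {
  const total = snapshot.processed + snapshot.failed;
  return total === 0 ? 0 : snapshot.failed / total;
}

// One exceeded threshold means degraded, both mean unhealthy. Thresholds are illustrative.
function classifyQueue(snapshot: QueueSnapshot, maxDepth = 1000, maxErrorRate = 0.05): Health {
  const depthExceeded = snapshot.depth > maxDepth;
  const errorsExceeded = errorRate(snapshot) > maxErrorRate;
  if (depthExceeded && errorsExceeded) return 'unhealthy';
  if (depthExceeded || errorsExceeded) return 'degraded';
  return 'healthy';
}

// The overall status is the worst status among all queues.
function overallHealth(snapshots: QueueSnapshot[]): Health {
  const rank: Record<Health, number> = { healthy: 0, degraded: 1, unhealthy: 2 };
  return snapshots
    .map((snapshot) => classifyQueue(snapshot))
    .reduce<Health>((worst, current) => (rank[current] > rank[worst] ? current : worst), 'healthy');
}
```

Taking the worst per-queue status as the overall status is a conservative choice: one backed-up stage is enough to flag the pipeline for attention.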

Best Practices

  1. Use Correlation IDs: Track related jobs across the pipeline
  2. Set Appropriate Timeouts: Configure timeouts based on job complexity
  3. Monitor Queue Health: Set up alerts for queue depth and error rates
  4. Handle Failures Gracefully: Implement proper error handling in processors
  5. Use Batch Processing: Process multiple jobs together when possible
  6. Configure Dead Letter Queues: Always configure DLQs for failed job analysis

Development

# Install dependencies
npm install

# Build the package
npm run build

# Run tests
npm test

# Watch for changes
npm run dev

License

MIT