npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

in_out_box_processor

v0.0.55

Published

InOutBoxProcessor is a NestJS service that processes documents from an inbox collection, applies aggregation pipelines, and upserts the results into a target collection. It also handles retries and moves failed documents to a failed collection. Additional

Downloads

121

Readme

InOutBoxProcessor

InOutBoxProcessor is a NestJS service that processes documents from an inbox collection, applies aggregation pipelines, and upserts the results into a target collection. It also handles retries and moves failed documents to a failed collection. Additionally, it allows adding documents to an outbox collection.

Installation

npm install in-out-box-processor

Usage

Importing the Module

First, import the InOutBoxProcessorModule into your NestJS application and configure it with the necessary options.

import { Module } from '@nestjs/common';
import { InOutBoxProcessorModule } from 'in-out-box-processor';
import { MongooseModule } from '@nestjs/mongoose';
import { ConfigModule } from '@nestjs/config';

   @Module({
     imports: [
       ConfigModule.forRoot({
         envFilePath: path.resolve(process.cwd(), `.env${process.env.NODE_ENV ? `.${process.env.NODE_ENV}` : ''}`), isGlobal: true,
       }),
       MongooseModule.forRoot(process.env.MONGO_URI),
       InOutBoxProcessorModule.forRoot({
         inboxCollectionName: 'inbox',
         targetCollectionName: 'target',
         // Can be abstracted out into seperate file
         aggregationPipelines: {
           'authenticatedAvailableShiftOpeningCreated': [
             {
               $project: {
                 _id: 1,
                 event: "$event.string",
                 shift_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.shift_id.int",
                 shift_type: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.shift_type.string",
                 facility_team_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_team_id.int",
                 is_from_on_shift: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.is_from_on_shift.boolean",
                 start_time: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.start_time.long",
                 end_time: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.end_time.long",
                 tz_converted_start_time: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.tz_converted_start_time.long",
                 tz_converted_end_time: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.tz_converted_end_time.long",
                 time_zone: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.time_zone.string",
                 specialty_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.specialty_name.string",
                 premium_rate: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.premium_rate.boolean",
                 covid: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.covid.boolean",
                 shift_kind: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.shift_kind.string",
                 opening_count: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.opening_count.int",
                 filled_count: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.filled_count.int",
                 latitude: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.latitude.float",
                 longitude: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.longitude.float",
                 facility_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_id.int",
                 facility_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_name.string",
                 facility_phone: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_phone.string",
                 facility_uses_digital_invoice: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_uses_digital_invoice.boolean",
                 facility_work_record_primary: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_work_record_primary.string",
                 facility_work_record_backup: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_work_record_backup.string",
                 facility_address_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_id.int",
                 facility_address_one: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_one.string",
                 facility_address_city: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_city.string",
                 facility_address_zip: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_zip.string",
                 facility_address_state_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_state_record.AvailableShiftFacilityStatePropertiesRecord.id.int",
                 facility_address_state_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_state_record.AvailableShiftFacilityStatePropertiesRecord.name.string",
                 facility_address_state_abbreviation: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_state_record.AvailableShiftFacilityStatePropertiesRecord.abbreviation.string",
                 facility_address_geo_latitude: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressGeoLocationPropertiesRecord.latitude.float",
                 facility_address_geo_longitude: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressGeoLocationPropertiesRecord.longitude.float",
                 facility_type_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_type.AvailableShiftFacilityTypeProperties.facility_type_id.int",
                 facility_type_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_type.AvailableShiftFacilityTypeProperties.facility_type_name.string",
                 facility_type_color: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_type.AvailableShiftFacilityTypeProperties.facility_type_color.string",
                 skill_type_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.skill_type.AvailableShiftSkillTypeProperties.skill_type_id.int",
                 skill_type_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.skill_type.AvailableShiftSkillTypeProperties.skill_type_name.string",
                 skill_type_color: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.skill_type.AvailableShiftSkillTypeProperties.skill_type_color.string",
                 localized_specialty_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.id.int",
                 localized_specialty_specialty_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.specialty_id.int",
                 localized_specialty_state_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.state_id.int",
                 localized_specialty_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.name.string",
                 localized_specialty_abbreviation: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.abbreviation.string",
                 specialty_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.specialty_properties_record.AvailableShiftSpecialtyPropertiesRecord.id.int",
                 specialty_record_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.specialty_properties_record.AvailableShiftSpecialtyPropertiesRecord.name.string",
                 specialty_color: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.specialty_properties_record.AvailableShiftSpecialtyPropertiesRecord.color.string",
                 specialty_abbreviation: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.specialty_properties_record.AvailableShiftSpecialtyPropertiesRecord.abbreviation.string",
                 created_at: "$created_at.long"
               }
             }
           ],
           'newEventType': [
             {
               $project: {
                 _id: 1,
                 event: "$event.string",
                 id: "$properties.NewEventTypeProperties.id.int",
                 name: "$properties.NewEventTypeProperties.name.string",
                 timestamp: "$properties.NewEventTypeProperties.timestamp.long",
                 details: "$properties.NewEventTypeProperties.details.string",
                 created_at: "$created_at.long"
               }
             }
           ]
         }
       })
     ],
     controllers: [AppController],
     providers: [AppService],
   })
   export class AppModule {
     constructor(private readonly watcherService: InOutBoxProcessorService) {
       this.watcherService.watchCollection('authenticatedAvailableShiftOpeningCreated');
       this.watcherService.watchCollection('newEventType');
     }
   }

Using the Service

Inject the InOutBoxProcessorService into your service or controller and use the watchCollection method to start watching a collection for changes. You can also use the addToOutbox method to add documents to the outbox collection.

import { Injectable} from '@nestjs/common';
import { InOutBoxProcessorService } from 'in-out-box-processor';

@Injectable()
export class AppService {
  constructor(private readonly inOutBoxProcessorService: InOutBoxProcessorService) {}

  async addDocumentToOutbox(document: any) {
    await this.inOutBoxProcessorService.addToOutbox(document);
  }
}

Controller Example

import { Controller, Post, Body } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Post('add-to-outbox')
  async addToOutbox(@Body() document: any) {
    await this.appService.addDocumentToOutbox(document);
  }
}

Configuration

The InOutBoxProcessorModule requires a configuration object with the following properties:

  • inboxCollectionName: The name of the inbox collection.
  • targetCollectionName: The name of the target collection.
  • aggregationPipelines: An object where keys are event types and values are arrays of MongoDB aggregation pipeline stages.

Methods

watchCollection(eventType: string)

Starts watching the inbox collection for changes of the specified event type. When a change is detected, the corresponding aggregation pipeline is applied, and the results are upserted into the target collection.

addToOutbox(documents: any | any[])

Adds a document or batch of documents to the outbox collection.

Error Handling

If an error occurs during document processing, the service retries up to 3 times with exponential backoff. If all retries fail, the document is moved to a failed collection with an error message.

InOutBoxProcessorService

Overview

The InOutBoxProcessorService is a NestJS service designed to process and manage data flows between collections in a MongoDB database. It leverages MongoDB's change streams to monitor changes in a source collection (inbox) and applies transformations or updates to a target collection. The service also supports batching, error handling, and an outbox pattern for additional processing.

Features

  • Change Stream Monitoring: Watches a MongoDB collection for specific events (e.g., inserts) and processes them in real-time.
  • Batch Processing: Groups updates into batches for efficient bulk writes to the target collection.
  • Aggregation Pipelines: Supports custom MongoDB aggregation pipelines for transforming data.
  • Outbox Pattern: Provides a mechanism to store processed documents in an outbox collection for further downstream processing.
  • Error Handling: Logs errors and ensures graceful shutdown by processing remaining updates before termination.

Installation

  1. Ensure you have a NestJS project set up.

  2. Install the required dependencies:

     npm install @shiftkey/thor-inbox-outbox
  3. Add the InOutBoxProcessorService to your module's providers array.

Configuration

The service requires an InOutBoxProcessorConfig object to be injected. This configuration includes:

  • inboxCollectionName: Name of the source collection to monitor.
  • targetCollectionName: Name of the target collection for updates.
  • aggregationPipelines: Array of event configurations, each specifying an event type and its corresponding aggregation pipeline.

Example configuration:

   import { Module } from '@nestjs/common';
   import { ConfigModule } from '@nestjs/config';
   import { MongooseModule } from '@nestjs/mongoose';
   import { AppController } from './app.controller';
   import { AppService } from './app.service';
   import { InOutBoxProcessorModule, InOutBoxProcessorService } from '@shiftkey/thor-inbox-outbox';
   import * as path from 'path';
   
   @Module({
     imports: [
       ConfigModule.forRoot({
         envFilePath: path.resolve(process.cwd(), `.env${process.env.NODE_ENV ? `.${process.env.NODE_ENV}` : ''}`), isGlobal: true,
       }),
       MongooseModule.forRoot(process.env.MONGO_URI),
       InOutBoxProcessorModule.forRoot({
         inboxCollectionName: 'inbox',
         targetCollectionName: 'target',
         // Can be abstracted out into seperate file
         aggregationPipelines: [
        {
          eventType: 'authenticatedAvailableShiftOpeningCreated',
          pipeline: [
             {
               $project: {
                 event: "$event.string",
                 shift_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.shift_id.int",
                 shift_type: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.shift_type.string",
                 facility_team_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_team_id.int",
                 is_from_on_shift: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.is_from_on_shift.boolean",
                 start_time: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.start_time.long",
                 end_time: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.end_time.long",
                 tz_converted_start_time: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.tz_converted_start_time.long",
                 tz_converted_end_time: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.tz_converted_end_time.long",
                 time_zone: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.time_zone.string",
                 specialty_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.specialty_name.string",
                 premium_rate: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.premium_rate.boolean",
                 covid: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.covid.boolean",
                 shift_kind: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.shift_kind.string",
                 opening_count: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.opening_count.int",
                 filled_count: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.filled_count.int",
                 latitude: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.latitude.float",
                 longitude: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.longitude.float",
                 facility_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_id.int",
                 facility_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_name.string",
                 facility_phone: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_phone.string",
                 facility_uses_digital_invoice: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_uses_digital_invoice.boolean",
                 facility_work_record_primary: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_work_record_primary.string",
                 facility_work_record_backup: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility.AvailableShiftFacilityPropertiesRecord.facility_work_record_backup.string",
                 facility_address_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_id.int",
                 facility_address_one: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_one.string",
                 facility_address_city: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_city.string",
                 facility_address_zip: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_zip.string",
                 facility_address_state_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_state_record.AvailableShiftFacilityStatePropertiesRecord.id.int",
                 facility_address_state_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_state_record.AvailableShiftFacilityStatePropertiesRecord.name.string",
                 facility_address_state_abbreviation: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressPropertiesRecord.facility_address_state_record.AvailableShiftFacilityStatePropertiesRecord.abbreviation.string",
                 facility_address_geo_latitude: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressGeoLocationPropertiesRecord.latitude.float",
                 facility_address_geo_longitude: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_address.AvailableShiftFacilityAddressGeoLocationPropertiesRecord.longitude.float",
                 facility_type_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_type.AvailableShiftFacilityTypeProperties.facility_type_id.int",
                 facility_type_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_type.AvailableShiftFacilityTypeProperties.facility_type_name.string",
                 facility_type_color: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.facility_type.AvailableShiftFacilityTypeProperties.facility_type_color.string",
                 skill_type_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.skill_type.AvailableShiftSkillTypeProperties.skill_type_id.int",
                 skill_type_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.skill_type.AvailableShiftSkillTypeProperties.skill_type_name.string",
                 skill_type_color: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.skill_type.AvailableShiftSkillTypeProperties.skill_type_color.string",
                 localized_specialty_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.id.int",
                 localized_specialty_specialty_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.specialty_id.int",
                 localized_specialty_state_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.state_id.int",
                 localized_specialty_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.name.string",
                 localized_specialty_abbreviation: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.abbreviation.string",
                 specialty_id: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.specialty_properties_record.AvailableShiftSpecialtyPropertiesRecord.id.int",
                 specialty_record_name: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.specialty_properties_record.AvailableShiftSpecialtyPropertiesRecord.name.string",
                 specialty_color: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.specialty_properties_record.AvailableShiftSpecialtyPropertiesRecord.color.string",
                 specialty_abbreviation: "$properties.AuthenticatedAvailableShiftOpeningCreatedProperties.localized_specialty_properties.AvailableShiftsLocalizedSpecialtyProperties.specialty_properties_record.AvailableShiftSpecialtyPropertiesRecord.abbreviation.string",
                 created_at: "$created_at.long"
               }
             }
           ],
          docIdField: 'shift_id'
        },
        {
          eventType: 'newEventType',
          pipeline: [
             {
               $project: {
                 event: "$event.string",
                 id: "$properties.NewEventTypeProperties.id.int",
                 name: "$properties.NewEventTypeProperties.name.string",
                 timestamp: "$properties.NewEventTypeProperties.timestamp.long",
                 details: "$properties.NewEventTypeProperties.details.string",
                 created_at: "$created_at.long"
               }
             }
           ],
          docIdField: 'id'
        }
         ]
       })
     ],
     controllers: [AppController],
     providers: [AppService],
   })
   export class AppModule {
     constructor(private readonly watcherService: InOutBoxProcessorService) {
       this.watcherService.watchCollection();
     }
   }

Usage

  1. Start Watching Collections: Call the watchCollection() method to begin monitoring the inbox collection for changes.
  2. Batch Processing: The service automatically processes batches of updates every 30 seconds or when the batch size reaches 500.
  3. Outbox Management: Use the addToOutbox() method to add documents to the outbox collection for further processing.

Key Methods

onModuleInit()

Initializes the service and sets up a periodic batch processing interval.

watchCollection()

Starts monitoring the inbox collection for changes based on the configured aggregation pipelines.

processBatch(docs: any[])

Processes a batch of documents by performing bulk write operations on the target collection.

addToOutbox(documents: any | any[])

Adds one or more documents to the outbox collection with a timestamp.

onModuleDestroy()

Ensures any remaining updates are processed before the service shuts down.

Logging

The service uses NestJS's Logger to log important events, such as:

  • Change stream events and errors.
  • Batch processing results.
  • Outbox operations.

Notes

  • Ensure MongoDB is configured to support change streams (requires a replica set).
  • The batch size and interval can be adjusted for performance tuning.
  • Handle sensitive data carefully when logging or processing documents.