@mbc-cqrs-serverless/import

v1.2.6

Import module

Flexible data import module for the MBC CQRS Serverless framework. Import data from REST APIs, CSV files, and ZIP archives with validation, transformation, and async processing.

Features

  • Multi-Source Import: Support for REST API, CSV files, and ZIP archives
  • Strategy Pattern: Customizable validation and transformation per entity type
  • Two-Phase Processing: Separate ingestion and business logic phases
  • Dual Processing Modes: DIRECT mode for small files, STEP_FUNCTION for large-scale imports
  • Progress Tracking: Real-time status updates via SNS notifications
  • Error Handling: Built-in alarm notifications and row-level error tracking

Installation

npm install @mbc-cqrs-serverless/import

Quick Start

1. Register the Module

import { Module } from '@nestjs/common';
import { ImportModule } from '@mbc-cqrs-serverless/import';
import { ProductImportStrategy } from './strategies/product-import.strategy';
import { ProductProcessStrategy } from './strategies/product-process.strategy';
import { ProductModule } from './product/product.module';

@Module({
  imports: [
    ImportModule.register({
      profiles: [
        {
          tableName: 'product',
          importStrategy: ProductImportStrategy,
          processStrategy: ProductProcessStrategy,
        },
      ],
      imports: [ProductModule], // Optional: modules that provide strategy dependencies
      enableController: true, // Optional: enable REST endpoints
    }),
  ],
})
export class AppModule {}

2. Implement Import Strategy

import { Injectable } from '@nestjs/common';
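import { IImportStrategy } from '@mbc-cqrs-serverless/import';

// Example shapes for the raw row and the transformed DTO; adapt them to your data.
interface RawProductInput {
  product_code?: string;
  product_name?: string;
  price: string;
  category?: string;
}

interface ProductDto {
  code?: string;
  name?: string;
  price: number;
  category?: string;
}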

@Injectable()
export class ProductImportStrategy implements IImportStrategy<RawProductInput, ProductDto> {
  async transform(input: RawProductInput): Promise<ProductDto> {
    return {
      code: input.product_code?.trim(),
      name: input.product_name?.trim(),
      price: parseFloat(input.price),
      category: input.category?.toUpperCase(),
    };
  }

  async validate(dto: ProductDto): Promise<void> {
    if (!dto.code) throw new Error('Product code is required');
    if (!dto.name) throw new Error('Product name is required');
    if (isNaN(dto.price) || dto.price < 0) throw new Error('Invalid price');
  }
}
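To see what the strategy does to a single row, the same transform and validate logic can be run outside NestJS. The following is a minimal sketch, not the module's own code: `RawProductInput` and `ProductDto` are assumed shapes inferred from the fields used above, the functions are written synchronously for brevity, and `ingestRow` is a hypothetical helper that captures a row-level error instead of throwing.

```typescript
// Assumed shapes, inferred from the fields the strategy reads and writes.
interface RawProductInput {
  product_code?: string;
  product_name?: string;
  price: string;
  category?: string;
}

interface ProductDto {
  code?: string;
  name?: string;
  price: number;
  category?: string;
}

// Same logic as ProductImportStrategy.transform(), without the Promise wrapper.
function transformRow(input: RawProductInput): ProductDto {
  return {
    code: input.product_code?.trim(),
    name: input.product_name?.trim(),
    price: parseFloat(input.price),
    category: input.category?.toUpperCase(),
  };
}

// Same checks as ProductImportStrategy.validate().
function validateDto(dto: ProductDto): void {
  if (!dto.code) throw new Error('Product code is required');
  if (!dto.name) throw new Error('Product name is required');
  if (isNaN(dto.price) || dto.price < 0) throw new Error('Invalid price');
}

// Hypothetical helper: transform, then validate, and report the error per row.
function ingestRow(input: RawProductInput): { dto?: ProductDto; error?: string } {
  try {
    const dto = transformRow(input);
    validateDto(dto);
    return { dto };
  } catch (e) {
    return { error: e instanceof Error ? e.message : String(e) };
  }
}

const ok = ingestRow({ product_code: ' P001 ', product_name: 'Widget', price: '9.50', category: 'tools' });
const bad = ingestRow({ product_code: 'P002', product_name: 'Gadget', price: 'abc' });
console.log(ok.dto, bad.error);
```

The first row is trimmed, parsed, and upper-cased; the second fails validation because `parseFloat('abc')` is `NaN`.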

3. Implement Process Strategy

import { Injectable } from '@nestjs/common';
import {
  IProcessStrategy,
  ComparisonStatus,
  ComparisonResult,
} from '@mbc-cqrs-serverless/import';
import { CommandService, DataModel, CommandInputModel } from '@mbc-cqrs-serverless/core';

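// ProductService and ProductDto are application code, not part of this package;
// import them from your own modules.
// Define your entity model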
interface ProductModel extends DataModel {
  code: string;
  name: string;
  price: number;
}

@Injectable()
export class ProductProcessStrategy implements IProcessStrategy<ProductModel, ProductDto> {
  constructor(
    private readonly productService: ProductService,
    private readonly commandService: CommandService,
  ) {}

  async compare(dto: ProductDto, tenantCode: string): Promise<ComparisonResult<ProductModel>> {
    const existing = await this.productService.findByCode(dto.code, tenantCode);
    if (!existing) {
      return { status: ComparisonStatus.NOT_EXIST };
    }
    if (this.hasChanges(existing, dto)) {
      return { status: ComparisonStatus.CHANGED, existingData: existing };
    }
    return { status: ComparisonStatus.EQUAL };
  }

  async map(
    status: ComparisonStatus.NOT_EXIST | ComparisonStatus.CHANGED,
    dto: ProductDto,
    tenantCode: string,
    existingData?: ProductModel,
  ): Promise<CommandInputModel> {
    return {
      pk: `PRODUCT#${tenantCode}`,
      sk: `PRODUCT#${dto.code}`,
      code: dto.code,
      name: dto.name,
      attributes: { price: dto.price },
    };
  }

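  getCommandService(): CommandService {
    return this.commandService;
  }

  // Example change check; compare whichever fields matter for your entity.
  private hasChanges(existing: ProductModel, dto: ProductDto): boolean {
    return existing.name !== dto.name || existing.price !== dto.price;
  }
}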
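The three comparison outcomes are easier to follow with concrete data. This is a minimal sketch under stated assumptions: an in-memory `Map` stands in for `ProductService.findByCode()`, the `Product` shape and the field-by-field `hasChanges` check are illustrative, and the status is modelled as a string union that mirrors the `ComparisonStatus` values rather than importing the real enum.

```typescript
// String union mirroring the ComparisonStatus enum values.
type ComparisonStatus = 'NOT_EXIST' | 'CHANGED' | 'EQUAL';

interface Product {
  code: string;
  name: string;
  price: number;
}

interface ComparisonResult<T> {
  status: ComparisonStatus;
  existingData?: T; // only set when status is CHANGED
}

// Stand-in for ProductService.findByCode(): existing records keyed by code.
const existingProducts = new Map<string, Product>([
  ['P001', { code: 'P001', name: 'Widget', price: 9.5 }],
  ['P002', { code: 'P002', name: 'Gadget', price: 20 }],
]);

// Illustrative change check: any differing field counts as a change.
function hasChanges(existing: Product, incoming: Product): boolean {
  return existing.name !== incoming.name || existing.price !== incoming.price;
}

function compare(incoming: Product): ComparisonResult<Product> {
  const existing = existingProducts.get(incoming.code);
  if (!existing) return { status: 'NOT_EXIST' };
  if (hasChanges(existing, incoming)) return { status: 'CHANGED', existingData: existing };
  return { status: 'EQUAL' };
}

const unchanged = compare({ code: 'P001', name: 'Widget', price: 9.5 });
const repriced = compare({ code: 'P002', name: 'Gadget', price: 25 });
const brandNew = compare({ code: 'P003', name: 'Gizmo', price: 5 });
console.log(unchanged.status, repriced.status, brandNew.status);
```

Only `NOT_EXIST` and `CHANGED` results need to reach `map()`, which matches the narrowed `status` parameter in the strategy above.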

Architecture

The import module uses a two-phase architecture:

┌─────────────────────────────────────────────────────────────────┐
│                    Import Architecture                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Phase 1: Ingestion                                              │
│  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐     │
│  │ REST API /   │────▶│ ImportStrategy│────▶│  Temp Table  │     │
│  │ CSV / ZIP    │     │ transform()   │     │  (CREATED)   │     │
│  └──────────────┘     │ validate()    │     └──────────────┘     │
│                       └──────────────┘            │              │
│                                                   ▼ DynamoDB     │
│  Phase 2: Processing                              │ Streams      │
│  ┌──────────────┐     ┌──────────────┐     ┌─────┴────────┐     │
│  │ ProcessStrategy│◀───│  SNS/SQS     │◀────│   Lambda     │     │
│  │ compare()    │     │  Event       │     │   Trigger    │     │
│  │ map()        │     └──────────────┘     └──────────────┘     │
│  └──────────────┘                                                │
│         │                                                        │
│         ▼                                                        │
│  ┌──────────────┐     ┌──────────────┐                          │
│  │ CommandService│────▶│ Final Table  │                          │
│  │ publish()    │     │ (Data Store) │                          │
│  └──────────────┘     └──────────────┘                          │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
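The flow in the diagram can be approximated in memory to show how the two phases are decoupled. This is a simplified sketch, not the module's implementation: arrays and a `Map` stand in for the temp table, the DynamoDB Streams / Lambda / SNS/SQS hop, and the final table, and all names (`ingest`, `processPending`, `tempTable`, `eventQueue`, `finalTable`) are hypothetical.

```typescript
type ImportStatus = 'CREATED' | 'PROCESSING' | 'COMPLETED' | 'FAILED';

interface TempRecord {
  id: number;
  status: ImportStatus;
  attributes: { code: string; name: string };
}

const tempTable: TempRecord[] = [];            // stands in for the temp table
const eventQueue: number[] = [];               // stands in for stream -> Lambda -> SNS/SQS
const finalTable = new Map<string, string>();  // stands in for the final table (code -> name)

// Phase 1: transform and validate the input, then store it as CREATED.
function ingest(raw: { code: string; name: string }): void {
  const attributes = { code: raw.code.trim(), name: raw.name.trim() }; // transform
  if (!attributes.code) throw new Error('code is required');           // validate
  const record: TempRecord = { id: tempTable.length, status: 'CREATED', attributes };
  tempTable.push(record);
  eventQueue.push(record.id); // the new record triggers phase 2 later
}

// Phase 2: compare against the final table and write only new or changed data.
function processPending(): void {
  let id: number | undefined;
  while ((id = eventQueue.shift()) !== undefined) {
    const record = tempTable[id];
    if (!record) continue;
    record.status = 'PROCESSING';
    const existing = finalTable.get(record.attributes.code);
    if (existing !== record.attributes.name) {
      // NOT_EXIST or CHANGED: publish to the final table
      finalTable.set(record.attributes.code, record.attributes.name);
    }
    record.status = 'COMPLETED';
  }
}

ingest({ code: ' P001 ', name: 'Widget' });
ingest({ code: 'P002', name: 'Gadget' });
const statusesBefore = tempTable.map((r) => r.status);
processPending();
console.log(statusesBefore, tempTable.map((r) => r.status), finalTable.size);
```

Because ingestion only writes to the temp table, a slow or failing phase 2 does not block accepting new input.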

API Reference

ImportService

| Method | Description |
|--------|-------------|
| createWithApi(dto, options) | Import single record from REST API |
| handleCsvImport(dto, options) | Import from CSV file (DIRECT or STEP_FUNCTION) |
| createCsvJob(dto, options) | Create CSV import job for Step Functions |
| createZipJob(dto, options) | Create ZIP import job for Step Functions |
| createImport(dto, options) | Create import record in temp table |
| updateStatus(key, status, payload?, attributes?, notifyId?) | Update import status |
| getImportByKey(key) | Get import record by key |
| incrementParentJobCounters(parentKey, childSucceeded) | Update parent job progress |

IImportStrategy Interface

interface IImportStrategy<TInput extends object, TAttributesDto extends object> {
  transform(input: TInput): Promise<TAttributesDto>;
  validate(data: TAttributesDto): Promise<void>;
}

IProcessStrategy Interface

interface IProcessStrategy<TEntity extends DataModel, TAttributesDto extends object> {
  compare(importAttributes: TAttributesDto, tenantCode: string): Promise<ComparisonResult<TEntity>>;
  map(status: ComparisonStatus, importAttributes: TAttributesDto, tenantCode: string, existingData?: TEntity): Promise<CommandInputModel | CommandPartialInputModel>;
  getCommandService(): CommandService;
}

interface ComparisonResult<TEntity> {
  status: ComparisonStatus;
  existingData?: TEntity; // Provided when status is CHANGED
}

enum ComparisonStatus {
  NOT_EXIST = 'NOT_EXIST',
  CHANGED = 'CHANGED',
  EQUAL = 'EQUAL',
}

ImportStatusEnum

| Status | Description |
|--------|-------------|
| CREATED | Import record created |
| PROCESSING | Being processed |
| COMPLETED | Successfully completed |
| FAILED | Processing failed |

ImportStatusHandler

The ImportStatusHandler is an internal event handler that manages Step Functions callbacks for import jobs. When using Step Functions orchestration (ZIP imports or STEP_FUNCTION mode CSV imports), this handler ensures proper communication with the state machine.

Behavior

| Import Status | Action | Step Functions Command |
|---------------|--------|------------------------|
| COMPLETED | Send success callback | SendTaskSuccessCommand |
| FAILED | Send failure callback | SendTaskFailureCommand |
| Other statuses | Ignored | None |

Methods

| Method | Description |
|--------|-------------|
| sendTaskSuccess(taskToken, output) | Sends success signal to Step Functions with the import result |
| sendTaskFailure(taskToken, error, cause) | Sends failure signal to Step Functions with error details |

Step Functions Integration

When an import job is created as part of a Step Functions workflow (e.g., ZIP import), a taskToken is stored in the job's attributes. The ImportStatusHandler listens for status change notifications and:

  1. Retrieves the import job from DynamoDB
  2. Checks if a taskToken exists in the job's attributes
  3. Sends the appropriate callback based on the final status:
    • COMPLETED: SendTaskSuccessCommand with result data
    • FAILED: SendTaskFailureCommand with error details

This ensures Step Functions workflows properly handle both success and failure cases without hanging indefinitely.
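The decision the handler makes can be sketched as a pure function. This is an illustration under stated assumptions, not the handler's source: the `ImportJob` shape (including the `result` and `error` fields) and the `'ImportFailed'` error code are hypothetical, and the function only describes which callback would be sent instead of calling the AWS SDK.

```typescript
type ImportStatus = 'CREATED' | 'PROCESSING' | 'COMPLETED' | 'FAILED';

// Assumed job shape: the taskToken lives in the job's attributes.
interface ImportJob {
  status: ImportStatus;
  attributes?: { taskToken?: string };
  result?: unknown; // hypothetical field holding the import result
  error?: string;   // hypothetical field holding the failure reason
}

type Callback =
  | { command: 'SendTaskSuccessCommand'; taskToken: string; output: string }
  | { command: 'SendTaskFailureCommand'; taskToken: string; error: string; cause: string }
  | null;

// Returns the callback to send, or null when nothing should be sent.
function decideCallback(job: ImportJob): Callback {
  const taskToken = job.attributes?.taskToken;
  if (!taskToken) return null; // job is not part of a Step Functions workflow

  if (job.status === 'COMPLETED') {
    // Step Functions expects the task output as a JSON string.
    return { command: 'SendTaskSuccessCommand', taskToken, output: JSON.stringify(job.result ?? {}) };
  }
  if (job.status === 'FAILED') {
    return {
      command: 'SendTaskFailureCommand',
      taskToken,
      error: 'ImportFailed',
      cause: job.error ?? 'Unknown error',
    };
  }
  return null; // CREATED and PROCESSING are ignored
}

const success = decideCallback({ status: 'COMPLETED', attributes: { taskToken: 'tok-1' }, result: { rows: 3 } });
const failure = decideCallback({ status: 'FAILED', attributes: { taskToken: 'tok-2' }, error: 'Invalid price' });
const ignored = decideCallback({ status: 'PROCESSING', attributes: { taskToken: 'tok-3' } });
const noToken = decideCallback({ status: 'COMPLETED' });
console.log(success, failure, ignored, noToken);
```

In a real handler the returned description would be turned into the matching command from `@aws-sdk/client-sfn` and sent with an `SFNClient`.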

Usage Examples

REST API Import

@Controller('import')
export class ImportController {
  constructor(private readonly importService: ImportService) {}

  @Post('product')
  async importProduct(
    @Body() dto: ImportProductDto,
    @InvokeContext() ctx: IInvoke,
  ) {
    return this.importService.createWithApi(
      {
        tableName: 'product',
        tenantCode: dto.tenantCode,
        attributes: dto,
      },
      { invokeContext: ctx },
    );
  }
}

CSV Import (Direct Mode)

Process small CSV files immediately:

async importSmallCsv(bucket: string, key: string, opts: { invokeContext: IInvoke }) {
  return this.importService.handleCsvImport(
    {
      tenantCode: 'MBC',
      tableName: 'product',
      bucket,
      key,
      processingMode: 'DIRECT',
    },
    opts,
  );
}

CSV Import (Step Functions Mode)

Process large CSV files with Step Functions orchestration:

async importLargeCsv(bucket: string, key: string, opts: { invokeContext: IInvoke }) {
  return this.importService.handleCsvImport(
    {
      tenantCode: 'MBC',
      tableName: 'product',
      bucket,
      key,
      processingMode: 'STEP_FUNCTION',
    },
    opts,
  );
}

ZIP Import

Import multiple CSV files from a ZIP archive:

async importZip(bucket: string, key: string, opts: { invokeContext: IInvoke }) {
  return this.importService.createZipJob(
    {
      tenantCode: 'MBC',
      bucket,
      key,
    },
    opts,
  );
}

Check Import Status

async checkStatus(pk: string, sk: string) {
  const importJob = await this.importService.getImportByKey({ pk, sk });
  return {
    status: importJob.status,
    totalRows: importJob.totalRows,
    processedRows: importJob.processedRows,
    succeededRows: importJob.succeededRows,
    failedRows: importJob.failedRows,
  };
}
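The counters returned above can be turned into a progress summary for a UI or a polling loop. This is a small sketch, assuming the fields are plain numbers as shown and that COMPLETED and FAILED are the terminal statuses; `summarizeProgress` is a hypothetical helper, not part of the package.

```typescript
// Shape of the object returned by checkStatus() above.
interface ImportProgress {
  status: string;
  totalRows: number;
  processedRows: number;
  succeededRows: number;
  failedRows: number;
}

// Hypothetical helper: derive a percentage and completion flags from the counters.
function summarizeProgress(p: ImportProgress): { percent: number; done: boolean; hasFailures: boolean } {
  // Guard against division by zero before the row count is known.
  const percent = p.totalRows > 0 ? Math.floor((p.processedRows / p.totalRows) * 100) : 0;
  const done = p.status === 'COMPLETED' || p.status === 'FAILED';
  return { percent, done, hasFailures: p.failedRows > 0 };
}

const midway = summarizeProgress({
  status: 'PROCESSING',
  totalRows: 200,
  processedRows: 50,
  succeededRows: 48,
  failedRows: 2,
});
console.log(midway);
```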

Processing Modes

| Mode | Use Case | Processing |
|------|----------|------------|
| DIRECT | Small files (< 1000 rows) | Immediate, synchronous |
| STEP_FUNCTION | Large files, mission-critical | Async, resilient, tracked |
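If the caller knows the approximate row count up front, the choice of mode can be made in code. This is a sketch of one possible policy based on the guidance in the table; `chooseProcessingMode` and the `missionCritical` flag are hypothetical, and the 1000-row threshold is taken from the table rather than enforced by the package.

```typescript
type ProcessingMode = 'DIRECT' | 'STEP_FUNCTION';

// Threshold taken from the "Small files (< 1000 rows)" guidance.
const DIRECT_MODE_MAX_ROWS = 1000;

// Hypothetical policy: small, non-critical files run directly; everything else
// goes through Step Functions.
function chooseProcessingMode(rowCount: number, missionCritical = false): ProcessingMode {
  if (missionCritical || rowCount >= DIRECT_MODE_MAX_ROWS) return 'STEP_FUNCTION';
  return 'DIRECT';
}

console.log(chooseProcessingMode(250));        // small file
console.log(chooseProcessingMode(50_000));     // large file
console.log(chooseProcessingMode(250, true));  // small but mission-critical
```

The result can be passed as `processingMode` to `handleCsvImport`.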

Related Packages

| Package | Description |
|---------|-------------|
| @mbc-cqrs-serverless/core | Core CQRS framework |
| @mbc-cqrs-serverless/task | Task processing for async jobs |

Documentation

Full documentation is available at https://mbc-cqrs-serverless.mbc-net.com/

License

Copyright © 2024-2025, Murakami Business Consulting, Inc. https://www.mbc-net.com/

This project is licensed under the MIT License.