nestjs-batch-ingest
v1.0.0
Published
Time & count based batching for NestJS. Buffer in Redis, flush in bulk via BullMQ (time window or count threshold).
Downloads
16
Maintainers
Readme
nestjs-batch-ingest
Time & count based batching for NestJS: buffer hot-path writes in Redis, then flush in bulk via BullMQ either on a schedule (time window) or immediately when a count threshold is reached — whichever comes first.
Why this package?
When a high-load endpoint needs to write a row per request, hammering the DB is expensive. A better pattern is to buffer payloads in Redis and flush them as bulk inserts. This library gives you a tiny, generic module that:
- 🚀 Hot path:
append(topic, payload)→RPUSHinto Redis (super fast). - ⏱️ Time-based flush: repeatable BullMQ job runs on a cron (e.g. every 5 minutes).
- 🔢 Count-based flush: if the buffer length crosses a threshold, we enqueue an immediate flush job.
- 🧰 Generic: works with any ORM/DB (Prisma, TypeORM, Knex, raw SQL) via a single
insertMany(items)function. - 🪵 Logging: pluggable logger or console fallback.
- 📦 Framework-native: built for NestJS, using
@nestjs/bullmqandioredis.
"Time & count" means flush occurs when either condition is met (time window elapses or item count threshold reached).
Install
pnpm add nestjs-batch-ingest @nestjs/bullmq bullmq ioredis
# peer deps you likely already have:
# pnpm add @nestjs/common @nestjs/core reflect-metadata rxjs
# Dev/test deps (in your app or the repo):
pnpm add -D jest @types/jest ts-jest typescript ts-nodeRequires Node 18+, NestJS 9/10/11, Redis 6/7, BullMQ 4.x.
Quick Start
- Register the module in your app:
// app.module.ts
import { Module } from '@nestjs/common';
import { BatchIngestModule } from 'nestjs-batch-ingest';
import { OrdersModule } from './orders.module';
@Module({
imports: [
BatchIngestModule.register({
redisUrl: process.env.REDIS_URL!, // e.g. redis://127.0.0.1:6379
flushCron: '*/5 * * * *', // every 5 min (time trigger)
defaultBatchSize: 5000, // chunk size per DB write
defaultMaxRounds: 50, // safety cap per run
defaultCountThreshold: 3000, // count trigger
// logger: yourNestLogger, // optional (else console)
// reuseExistingBullModule: true, // set true if your app already calls BullModule.forRoot
}),
OrdersModule,
],
})
export class AppModule {}- Register a topic with your bulk inserter (Prisma example):
// orders.module.ts
import { Module, OnModuleInit } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
import { BatchIngestModule, BatchIngestService, TopicConfig } from 'nestjs-batch-ingest';
import { OrdersController } from './orders.controller';
const prisma = new PrismaClient();
@Module({
imports: [BatchIngestModule],
controllers: [OrdersController],
})
export class OrdersModule implements OnModuleInit {
constructor(private readonly ingest: BatchIngestService) {}
onModuleInit() {
const ordersCfg: TopicConfig = {
redisKey: 'ingest:orders', // optional (defaults to ingest:<topic>)
countThreshold: 2000, // per-topic override (optional)
batchSize: 5000, // per-topic override (optional)
maxRounds: 50, // per-topic override (optional)
inserter: {
insertMany: async (items: any[]) => {
await prisma.order.createMany({
data: items.map((r) => ({
externalId: r.id, // add UNIQUE index on this for idempotency
sku: r.sku,
qty: r.qty,
price: r.price,
userId: r.userId,
createdAt: new Date(r.ts ?? Date.now()),
})),
skipDuplicates: true,
});
},
},
};
this.ingest.registerTopic('orders', ordersCfg);
}
}- Use it in a controller (hot path stays fast):
// orders.controller.ts
import { Body, Controller, Post } from '@nestjs/common';
import { BatchIngestService } from 'nestjs-batch-ingest';
@Controller('orders')
export class OrdersController {
constructor(private readonly ingest: BatchIngestService) {}
@Post()
async create(@Body() body: any) {
// TODO: validate body first
await this.ingest.append('orders', body);
return { ok: true };
}
}That’s it. Your data gets buffered and later bulk-inserted either on schedule (e.g., every 5 mins) or instantly when the buffer hits the count threshold.
API
BatchIngestModule.register(options)
interface ModuleOptions {
redisUrl: string;
flushCron?: string; // default '*/5 * * * *'
defaultBatchSize?: number; // default 5000
defaultMaxRounds?: number; // default 50
defaultCountThreshold?: number;// default 3000
reuseExistingBullModule?: boolean; // default false
logger?: LoggerLike; // optional
}BatchIngestService.registerTopic(topic, config)
export interface Inserter<T = any> { insertMany(items: T[]): Promise<void>; }
export interface TopicConfig<T = any> {
redisKey?: string; // default: ingest:<topic>
batchSize?: number; // override defaultBatchSize
maxRounds?: number; // override defaultMaxRounds
countThreshold?: number; // override defaultCountThreshold
inserter: Inserter<T>;
}BatchIngestService.append(topic, payload)
Appends payload to the Redis list for the topic. If list length ≥ countThreshold, an immediate flush-topic job is enqueued.
Production Notes
- Backpressure: tune
batchSizeandmaxRoundsto control memory usage and DB write sizes. - Observability: log flush timings and list lengths; Bull Board can inspect the queue if you add it to your app.
- Poison handling: throw from
insertManyto trigger BullMQ retry/backoff; optionally send items to your DLQ insideinsertManyif permanently bad.
