@dshahi468/nd-dataset-split
v1.0.1
Published
Split large dataset and resolve timeout issues
Maintainers
Readme
nd-dataset-split
大きなデータセットを安全に分割し、非同期処理をチャンク単位で実行するための軽量ユーティリティです。
「1リクエストあたり最大20件」などの制約があるAPI連携(例: Bedrock 呼び出し)を共通化できます。
特徴
- データを最大20件単位でチャンク化
Promise.allベースの並列チャンク処理- 並列数(
maxConcurrency)の制御 - リトライ + バックオフ(指数バックオフ対応)
- エラーごとの再試行可否を
shouldRetryで制御 - 結果のマージ処理を
mergeResultsで自由に定義 - TypeScript 型定義同梱(
index.d.ts)
インストール
npm レジストリから
npm i @dshahi468/nd-dataset-split使い方
1. シンプルに使う(processInChunks)
import { processInChunks } from "@dshahi468/nd-dataset-split";
const data = Array.from({ length: 45 }, (_, i) => i + 1);
const result = await processInChunks(
data,
async (chunk) => chunk.map((n) => n * 2),
{ chunkSize: 20 }, // 1〜20
);
console.log(result.length); // 452. 実運用向け(processDatasetInChunks)
import { processDatasetInChunks } from "nd-dataset-split";
const output = await processDatasetInChunks(
events,
async (chunk, chunkIndex, totalChunks) => {
// or you can use your logic here
const res = await fetch("your-api-name", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
events: chunk,
}),
});
const payload = await res.json().catch(() => ({}));
if (!res.ok) {
throw new Error(
`Chunk ${chunkIndex + 1}/${totalChunks}: ${payload?.error || "Failed"}`,
);
}
return payload; // チャンク単位の結果
},
{
chunkSize: 20,
maxConcurrency: 3,
retries: 2,
retryDelayMs: 600,
backoffMultiplier: 1.5,
shouldRetry: (error) => {
const message = String((error as Error)?.message || "");
return message.includes("throttl") || message.includes("timeout");
},
mergeResults: (chunkResults) => chunkResults, // 必要に応じて独自マージ
},
);API
chunkArray(data, chunkSize = 20)
配列をチャンクに分割します。
data: 対象配列chunkSize: 1〜20 の整数- 戻り値:
T[][]
wait(ms)
指定ミリ秒待機するヘルパーです。
ms: 待機時間(ミリ秒)- 戻り値:
Promise<void>
processInChunks(data, processChunk, options?)
シンプルなチャンク処理用APIです。
内部で全チャンクを Promise.all で一括実行します。
data: 対象配列processChunk:(chunk, chunkIndex, totalChunks) => Promise<R[] | R> | (R[] | R)options.chunkSize:- デフォルト
20 - 1〜20 の整数
- デフォルト
options.flatten:- デフォルト
true true: 各チャンク結果を1つの配列にフラット化false: チャンクごとの結果配列をそのまま返却
- デフォルト
processDatasetInChunks(data, processChunk, options?)
実運用向けの拡張APIです。
並列数制御、リトライ、バックオフ、カスタムマージに対応します。
data: 対象配列processChunk:(chunk, chunkIndex, totalChunks) => Promise<R> | Roptions.chunkSize:- デフォルト
20 - 1〜20 の整数
- デフォルト
options.maxConcurrency:- デフォルト
Infinity - 同時実行するチャンク数
- デフォルト
options.retries:- デフォルト
0 - 失敗時の再試行回数
- デフォルト
options.retryDelayMs:- デフォルト
0 - リトライ間隔の基準ミリ秒
- デフォルト
options.backoffMultiplier:- デフォルト
1 - 実遅延:
retryDelayMs * backoffMultiplier^attempt
- デフォルト
options.shouldRetry:- デフォルト
() => true falseを返すと即時失敗
- デフォルト
options.mergeResults:- 指定時は
chunkResultsを任意の最終形式へ変換して返却 - 未指定時は
R[](チャンク結果配列)を返却
- 指定時は
エラーハンドリングの考え方
- API制限(429 / throttling)や一時的なタイムアウトは
shouldRetryで再試行対象にする - 認可エラーや入力不正など恒久エラーは
shouldRetryでfalseを返して即失敗 - エラーメッセージに
chunkIndex + 1とtotalChunksを入れると追跡しやすい
注意点
chunkSizeは 20 を超えられません(仕様)maxConcurrencyを上げすぎると、相手API側のレート制限にかかりやすくなりますmergeResultsで重複排除・集計・並び替えなどを実装すると、呼び出し側コードを薄くできます
ライセンス
ISC
