# drizzle-bulk-load

High-throughput bulk load utilities for Drizzle ORM + PostgreSQL using the COPY protocol.
Inspired by django-bulk-load.
## Why

Drizzle ORM doesn't natively support bulk operations that scale to hundreds of thousands of rows. Multi-row INSERT statements quickly exhaust PostgreSQL's bind-parameter limit (~34k parameters per statement), and row-by-row operations are slow.
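To see why plain INSERTs cap out, note that each row binds one parameter per column, so the ceiling on rows per statement is roughly the parameter budget divided by the column count. A quick sketch (the budget and column count here are illustrative figures, not exact values):

```typescript
// Each row in a multi-row INSERT binds one parameter per column,
// so the rows-per-statement ceiling is budget / columns.
const PARAM_BUDGET = 34_000; // approximate protocol limit cited above
const COLUMN_COUNT = 8;      // example table width

const maxRowsPerInsert = Math.floor(PARAM_BUDGET / COLUMN_COUNT);
console.log(maxRowsPerInsert); // 4250
```

A 500k-row load would need over a hundred such statements; COPY streams it in one pass instead.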
This library uses PostgreSQL's `COPY FROM STDIN` to stream data into a temporary table, then executes SQL queries (insert, update, upsert, etc.) against the destination table — all within a single transaction.
## Install

```shell
npm install @trovohealth/drizzle-bulk-load
```

Driver support: `drizzle-orm/node-postgres` and `drizzle-orm/postgres-js`.
## Usage

All functions accept a Drizzle PostgreSQL database instance (`db`) created via `drizzle()`.
Node-postgres:

```typescript
import { drizzle } from 'drizzle-orm/node-postgres';
import { Pool } from 'pg';

const pool = new Pool({ connectionString: '...' });
const db = drizzle(pool);
```

Postgres.js:

```typescript
import { drizzle } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';

const sql = postgres(process.env.DATABASE_URL!);
const db = drizzle(sql);
```

When backed by a pooled client (`pg` `Pool` or postgres.js root client), the library automatically acquires and releases a dedicated connection for each bulk operation.
## returnResults behavior

All mutation functions support `returnResults: true`. Instead of returning full models, they return an array aligned 1:1 with the input rows, in the same order:

```typescript
[
  { pk: { id: 1 }, operation: 'none' },
  { pk: { id: 2 }, operation: 'updated' },
  { pk: { id: 3 }, operation: 'inserted' },
]
```

`pk` contains the resolved primary key fields for the destination row. `operation` is one of `'inserted'`, `'updated'`, or `'none'`.
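For example, to act on just the newly inserted rows, the results array can be partitioned by operation (the type below is written out for illustration; the library's own exported types may differ):

```typescript
// Shape of each entry in a returnResults array, as described above.
type BulkResult = {
  pk: Record<string, unknown>;
  operation: 'inserted' | 'updated' | 'none';
};

const results: BulkResult[] = [
  { pk: { id: 1 }, operation: 'none' },
  { pk: { id: 2 }, operation: 'updated' },
  { pk: { id: 3 }, operation: 'inserted' },
];

// Collect the primary keys of rows that were actually inserted.
const insertedPks = results
  .filter((r) => r.operation === 'inserted')
  .map((r) => r.pk);

console.log(insertedPks); // [ { id: 3 } ]
```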
## bulkInsertModels

Insert rows using COPY, optionally ignoring conflicts.

```typescript
import { bulkInsertModels } from '@trovohealth/drizzle-bulk-load';
import { users } from './schema';

await bulkInsertModels({
  db,
  table: users,
  rows: [
    { name: 'Alice', email: '[email protected]' },
    { name: 'Bob', email: '[email protected]' },
  ],
  ignoreConflicts: true, // optional — adds ON CONFLICT DO NOTHING
  matchFields: [users.id], // optional — required to resolve existing PKs for ignored conflicts
  returnResults: true, // optional — returns ordered { pk, operation } results
});
```

## bulkUpdateModels
Update existing rows by matching on primary key.

```typescript
import { bulkUpdateModels } from '@trovohealth/drizzle-bulk-load';

await bulkUpdateModels({
  db,
  table: users,
  rows: [
    { id: 1, name: 'Alice Updated' },
    { id: 2, name: 'Bob Updated' },
  ],
  updateFields: [users.name], // optional — defaults to all fields present in rows
  pkFields: [users.id], // optional — defaults to table PKs
  updateIfNullFields: [users.deletedAt], // optional — only update when source or dest is NULL
  returnResults: true, // optional — returns ordered { pk, operation } results
});
```

## bulkUpsertModels
Insert new rows and update existing ones in a single operation.

```typescript
import { bulkUpsertModels } from '@trovohealth/drizzle-bulk-load';

await bulkUpsertModels({
  db,
  table: users,
  rows: [
    { id: 1, name: 'Alice', email: '[email protected]' },
    { id: 99, name: 'New User', email: '[email protected]' },
  ],
  pkFields: [users.id],
  insertOnlyFields: [users.createdAt], // optional — fields only set on insert, not update
  returnResults: true, // optional — returns ordered { pk, operation } results
});
```

## bulkInsertChangedModels
Append-only insert: a row is inserted only when its tracked fields differ from the latest existing row for the same key (ordered by `orderField`).

```typescript
import { bulkInsertChangedModels } from '@trovohealth/drizzle-bulk-load';

await bulkInsertChangedModels({
  db,
  table: priceHistory,
  rows: newPrices,
  pkFields: [priceHistory.productId],
  compareFields: [priceHistory.price, priceHistory.currency],
  orderField: priceHistory.id, // optional — defaults to PK
  returnResults: true, // optional — returns ordered { pk, operation } results
});
```

## bulkSelectModelDicts
Select rows by composite filter keys using parameterized VALUES clauses. Large filter sets are automatically batched to stay within PostgreSQL's parameter limit.

```typescript
import { bulkSelectModelDicts } from '@trovohealth/drizzle-bulk-load';

const results = await bulkSelectModelDicts({
  db,
  table: users,
  filterFields: [users.id],
  selectFields: [users.id, users.name, users.email],
  filterData: [[1], [2], [3]],
  selectForUpdate: false, // optional — adds FOR UPDATE (requires dedicated connection)
});
```

Note: `selectForUpdate` requires a dedicated connection. For `pg`, pass a `PoolClient`-backed drizzle instance. For postgres.js, use a transaction/reserved client.
## bulkLoadRowsWithQueries

Low-level function: loads rows into a temp table via COPY, then runs arbitrary SQL queries against it. All higher-level functions are built on this.

```typescript
import { bulkLoadRowsWithQueries, generateLoadingTableName } from '@trovohealth/drizzle-bulk-load';

const loadingTableName = generateLoadingTableName('users');

const rows = await bulkLoadRowsWithQueries({
  db,
  table: users,
  rows: data,
  loadingTableName,
  loadQueries: [
    `INSERT INTO "users" (name, email) SELECT name, email FROM "${loadingTableName}"`,
  ],
  returnRows: true,
});
```

`loadQueries` also accepts object entries:
```typescript
{
  text: 'SELECT * FROM "users" WHERE id = $1',
  values: [123],
  returnsRows: true, // optional hint for drivers that do not expose field metadata
}
```

## Custom update conditions
Use `updateWhere` to control when updates are applied:

```typescript
import { bulkUpdateModels, generateGreaterThanCondition } from '@trovohealth/drizzle-bulk-load';

await bulkUpdateModels({
  db,
  table: users,
  rows: updates,
  updateWhere: (updateColumns, srcTable, destTable) =>
    generateGreaterThanCondition(srcTable, destTable, 'version'),
});
```

Helper functions for building WHERE clauses:
- `generateDistinctCondition` — `IS DISTINCT FROM` (null-safe inequality)
- `generateGreaterThanCondition` — source column > destination column
- `generateJoinCondition` — equality join on multiple columns
- `quoteIdent` — safely quote SQL identifiers
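For a sense of what these helpers emit, here is a hypothetical re-implementation of a null-safe inequality condition in the spirit of `generateDistinctCondition` — not the library's actual code, and with deliberately simplified quoting:

```typescript
// Build `src.col IS DISTINCT FROM dest.col` with quoted identifiers.
// Doubling embedded double quotes mirrors what an identifier quoter
// such as quoteIdent must do; this sketch is illustrative only.
function distinctCondition(src: string, dest: string, column: string): string {
  const q = (id: string) => `"${id.replace(/"/g, '""')}"`;
  return `${q(src)}.${q(column)} IS DISTINCT FROM ${q(dest)}.${q(column)}`;
}

console.log(distinctCondition('loading_users', 'users', 'email'));
// "loading_users"."email" IS DISTINCT FROM "users"."email"
```

Unlike `!=`, `IS DISTINCT FROM` treats two NULLs as equal and a NULL vs. non-NULL pair as different, which is what you want when diffing a staging table against the destination.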
## How it works

- A temporary table is created (with `ON COMMIT DROP`) matching the destination table's column types
- Rows are serialized to CSV and streamed via `COPY FROM STDIN`
- SQL queries (insert/update/upsert) run against the temp table within the same transaction
- The temp table is automatically dropped when the transaction ends
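As a rough sketch of the steps above for a plain bulk insert (table names are illustrative; the exact DDL the library emits may differ):

```sql
BEGIN;

-- 1. Temp table mirroring the destination's columns, dropped at commit
CREATE TEMPORARY TABLE "users_loading_abc123" (LIKE "users") ON COMMIT DROP;

-- 2. Stream the rows in as CSV
COPY "users_loading_abc123" (name, email) FROM STDIN WITH (FORMAT csv);

-- 3. Move rows into the destination table
INSERT INTO "users" (name, email)
SELECT name, email FROM "users_loading_abc123";

COMMIT; -- temp table is dropped here
```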
## License

MIT
