@wiseminds/cdc
v0.2.1
Powerful Change Data Capture (CDC) for Node.js — supports MySQL (binlog replication), PostgreSQL (logical replication), and MSSQL (SQL Server CDC) in real-time.
Powerful Change Data Capture for Node.js. Supports MySQL, PostgreSQL, and MSSQL (SQL Server) — capturing row-level changes in real-time with a unified API.
Features
- Multi-database support — MySQL, PostgreSQL, and SQL Server with a unified event model
- Row-level change events — INSERT, UPDATE, DELETE with full before/after row data
- Column name resolution — resolves column names via INFORMATION_SCHEMA
- Schema & table filtering — include/exclude databases and tables
- Crash-safe resume — persists position to disk; restarts from where it left off
- Auto-reconnect — configurable retry logic on connection loss
- DDL detection — emits schema change events (MySQL, PostgreSQL)
- Unified CDCEvent — same event shape regardless of database source
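Because every source emits the same event shape, one handler can keep a local copy of the changed rows. The sketch below is illustrative only: `keyOf` and `applyEvent` are hypothetical helpers (not part of the package), and it assumes `before` and `after` are null or undefined when they do not apply, and that `before` includes the primary key columns. The field names are the ones documented under Events.

```javascript
// Build a string key from the primary-key columns of a row image.
function keyOf(row, primaryKeys) {
  return primaryKeys.map((col) => String(row[col])).join('|');
}

// Apply one change event to a Map of "schema.table" -> Map(key -> row).
// Hypothetical helper: a sketch of consuming events, not library code.
function applyEvent(replica, event) {
  const name = `${event.schema}.${event.table}`;
  if (!replica.has(name)) replica.set(name, new Map());
  const rows = replica.get(name);

  // UPDATE and DELETE carry the old row image; remove it first so that
  // a changed primary key does not leave a stale entry behind.
  if (event.before) rows.delete(keyOf(event.before, event.primaryKeys));

  // INSERT and UPDATE carry the new row image.
  if (event.after) rows.set(keyOf(event.after, event.primaryKeys), event.after);

  return replica;
}
```

It could be wired up with `cdc.on('data', (event) => applyEvent(replica, event))`, where `replica` is a plain `new Map()`.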
MySQL
- Direct binlog streaming — connects as a MySQL replication slave
- All MySQL column types — integers, decimals, dates, JSON, enums, geometry, etc.
- GTID tracking
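When working with binlog coordinates (the `binlogFile` and `binlogPosition` fields documented under Events), it can help to tell which of two positions is later. A minimal sketch, assuming both positions come from the same server and that file names end in a numeric sequence such as `mysql-bin.000042`; `binlogSequence` and `compareBinlogPosition` are hypothetical helpers, not package exports.

```javascript
// Extract the numeric sequence from a binlog file name,
// e.g. 'mysql-bin.000042' -> 42.
function binlogSequence(fileName) {
  const match = /\.(\d+)$/.exec(fileName);
  if (!match) throw new Error(`Unrecognized binlog file name: ${fileName}`);
  return Number(match[1]);
}

// Negative if a is earlier than b, positive if later, 0 if equal.
// Files are compared first, then the byte offset within the file.
function compareBinlogPosition(a, b) {
  const bySequence = binlogSequence(a.binlogFile) - binlogSequence(b.binlogFile);
  return bySequence !== 0 ? bySequence : a.binlogPosition - b.binlogPosition;
}
```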
PostgreSQL
- Logical replication via the built-in pgoutput plugin (PostgreSQL 10+)
- Real-time WAL streaming with automatic standby status updates
- LSN-based position tracking
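A PostgreSQL LSN is written as two hexadecimal numbers separated by a slash (for example `0/16B3748`), representing the high and low 32 bits of a 64-bit WAL position. The sketch below converts that text form to a BigInt so positions can be compared or subtracted. `parsePgLsn` and `pgLsnDistance` are hypothetical helper names, not part of the package.

```javascript
// Convert a textual LSN such as '0/16B3748' into a BigInt WAL position.
function parsePgLsn(lsn) {
  const match = /^([0-9A-Fa-f]{1,8})\/([0-9A-Fa-f]{1,8})$/.exec(lsn);
  if (!match) throw new Error(`Invalid LSN: ${lsn}`);
  const high = BigInt(`0x${match[1]}`);
  const low = BigInt(`0x${match[2]}`);
  return (high << 32n) | low;
}

// Bytes of WAL between two LSNs (positive when `to` is ahead of `from`).
function pgLsnDistance(from, to) {
  return parsePgLsn(to) - parsePgLsn(from);
}
```

A distance like this could be used to estimate how far a consumer lags behind the server's current WAL position.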
MSSQL
- SQL Server CDC polling — uses built-in Change Data Capture feature
- Configurable polling interval
- LSN-based position tracking with before/after row images
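SQL Server LSNs are commonly written as three colon-separated hexadecimal fields, as in the `fromLsn` value `00000025:00000F38:0001` shown under Configuration. A rough sketch for ordering two such values follows; it assumes the LSNs you compare use this text form, and `parseMssqlLsn` and `compareMssqlLsn` are hypothetical helpers rather than package exports.

```javascript
// Split 'VLF:offset:slot' into three numbers parsed as hexadecimal.
function parseMssqlLsn(lsn) {
  const parts = lsn.split(':');
  if (parts.length !== 3 || parts.some((p) => !/^[0-9A-Fa-f]+$/.test(p))) {
    throw new Error(`Invalid LSN: ${lsn}`);
  }
  return parts.map((p) => parseInt(p, 16));
}

// Returns -1 if a is earlier than b, 1 if later, 0 if equal.
// Fields are compared left to right, most significant first.
function compareMssqlLsn(a, b) {
  const pa = parseMssqlLsn(a);
  const pb = parseMssqlLsn(b);
  for (let i = 0; i < 3; i++) {
    if (pa[i] !== pb[i]) return pa[i] < pb[i] ? -1 : 1;
  }
  return 0;
}
```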
Requirements
- Node.js >= 18
MySQL
- MySQL 5.7 / 8.0 / 8.4+ with log_bin=ON, binlog_format=ROW
- A user with REPLICATION SLAVE privilege
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'cdc_pass';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;
PostgreSQL
- PostgreSQL 10+ with wal_level = logical
- A user with REPLICATION privilege
- A publication for target tables
CREATE ROLE cdc_user WITH REPLICATION LOGIN PASSWORD 'cdc_pass';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
CREATE PUBLICATION cdc_pub FOR ALL TABLES;
MSSQL
- SQL Server 2016+ or Azure SQL
- CDC enabled on database and tables
USE mydb;
EXEC sys.sp_cdc_enable_db;
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'users',
    @role_name = NULL,
    @supports_net_changes = 1;
Installation
npm install @wiseminds/cdc
# Install the driver for your database:
npm install mysql2 # for MySQL
npm install pg # for PostgreSQL
npm install mssql # for MSSQL
pg and mssql are optional peer dependencies; install only what you need.
Quick Start
MySQL
import { MySQLCDC } from '@wiseminds/cdc';
const cdc = new MySQLCDC({
  host: '127.0.0.1',
  user: 'cdc_user',
  password: 'cdc_pass',
  includeSchemas: ['mydb'],
});
cdc.on('data', (event) => {
  console.log(event.type, event.schema, event.table);
  console.log(' after:', event.after);
});
await cdc.start();
PostgreSQL
import { PostgresCDC } from '@wiseminds/cdc';
const cdc = new PostgresCDC({
  host: '127.0.0.1',
  user: 'cdc_user',
  password: 'cdc_pass',
  database: 'mydb',
  publicationName: 'cdc_pub',
});
cdc.on('data', (event) => {
  console.log(event.type, event.schema, event.table);
  console.log(' after:', event.after);
  console.log(' lsn:', event.lsn);
});
await cdc.start();
MSSQL
import { MssqlCDC } from '@wiseminds/cdc';
const cdc = new MssqlCDC({
  host: 'localhost',
  user: 'sa',
  password: 'YourStrong!Passw0rd',
  database: 'mydb',
  pollingIntervalMs: 500,
});
cdc.on('data', (event) => {
  console.log(event.type, event.schema, event.table);
  console.log(' after:', event.after);
  console.log(' lsn:', event.lsn);
});
await cdc.start();
Configuration
All CDC classes share common options for filtering, position tracking, and reconnection:
// Common options (available on all CDC classes)
{
  // ── Filtering ──────────────────────────────────────
  includeSchemas: ['mydb'],
  excludeSchemas: ['sys'],
  includeTables: ['mydb.orders'], // "schema.table" format
  excludeTables: ['mydb.audit_log'],
  // ── Position / Resume ──────────────────────────────
  positionStorage: 'file', // 'file' or 'memory'
  positionFilePath: './cdc-position.json',
  positionFlushMs: 5000,
  // ── Reconnection ──────────────────────────────────
  autoReconnect: true,
  reconnectIntervalMs: 3000,
  maxReconnectAttempts: Infinity,
}
MySQL-specific options
new MySQLCDC({
  host,
  port: 3306,
  user,
  password,
  serverId: 12345, // unique replication server ID
  binlogFile: 'mysql-bin.000042', // start from specific file
  binlogPosition: 1234, // start from specific position
  resolveColumns: true, // resolve col_0 → real column names
});
PostgreSQL-specific options
new PostgresCDC({
  host,
  port: 5432,
  user,
  password,
  database: 'mydb', // required
  slotName: 'cdc_slot', // logical replication slot
  publicationName: 'cdc_pub', // publication name
  lsn: '0/16B3748', // resume from specific LSN
});
MSSQL-specific options
new MssqlCDC({
  host,
  port: 1433,
  user,
  password,
  database: 'mydb', // required
  pollingIntervalMs: 1000, // polling interval
  captureInstances: ['dbo_users'], // specific capture instances
  fromLsn: '00000025:00000F38:0001', // resume from specific LSN
  encrypt: true,
  trustServerCertificate: true,
});
Events
data — Row change
Emitted for every INSERT, UPDATE, or DELETE.
cdc.on('data', (event) => {
  event.type; // 'INSERT' | 'UPDATE' | 'DELETE'
  event.source; // 'mysql' | 'postgres' | 'mssql'
  event.schema; // database/schema name
  event.table; // table name
  event.before; // row before change (UPDATE/DELETE)
  event.after; // row after change (INSERT/UPDATE)
  event.primaryKeys; // ['id']
  event.changedColumns; // ['name', 'email'] (UPDATE only)
  event.timestamp; // epoch ms
  // MySQL-specific
  event.binlogFile; // 'mysql-bin.000042'
  event.binlogPosition; // 12345
  event.gtid; // 'uuid:txnId' or null
  event.serverId; // MySQL server ID
  // PostgreSQL / MSSQL
  event.lsn; // WAL/transaction log position
  event.xid; // transaction ID (Postgres)
  event.captureInstance; // capture instance name (MSSQL)
  // Helper methods
  event.getPrimaryKeyValues(); // { id: 42 }
  event.toJSON(); // plain object
});
ddl — Schema change (MySQL, PostgreSQL)
cdc.on('ddl', (event) => {
  event.schema;
  event.query;
  event.timestamp;
});
ready — Connected and streaming
cdc.on('ready', (info) => {
  console.log('Connected:', info);
});
error — Non-fatal error
cdc.on('error', (err) => {
  console.error('CDC error:', err.message);
});
reconnecting
cdc.on('reconnecting', ({ attempt, delayMs }) => {
  console.log(`Reconnecting (attempt ${attempt}) in ${delayMs}ms`);
});
stopped — Cleanly shut down
cdc.on('stopped', (stats) => {
  console.log('Stopped:', stats);
});
Graceful Shutdown
process.on('SIGINT', async () => {
  await cdc.stop(); // flushes position to disk, closes connections
  process.exit(0);
});
Runtime Stats
const stats = cdc.getStats();
// {
//   eventsProcessed: 1042,
//   errorsCount: 0,
//   startedAt: 1700000000000,
//   uptime: 60000,
//   currentPosition: { ... }
// }
API
MySQL
| Method | Description |
| ---------------------- | ----------------------------------------- |
| new MySQLCDC(config) | Create a MySQL CDC instance |
| cdc.start() | Connect and start streaming binlog events |
| cdc.stop() | Stop streaming and flush position |
| cdc.isRunning() | Check if currently running |
| cdc.getStats() | Get runtime statistics |
PostgreSQL
| Method | Description |
| ------------------------- | ------------------------------------- |
| new PostgresCDC(config) | Create a PostgreSQL CDC instance |
| cdc.start() | Connect and start logical replication |
| cdc.stop() | Stop streaming and flush position |
| cdc.isRunning() | Check if currently running |
| cdc.getStats() | Get runtime statistics |
MSSQL
| Method | Description |
| ---------------------- | ------------------------------------- |
| new MssqlCDC(config) | Create an MSSQL CDC instance |
| cdc.start() | Connect and start polling for changes |
| cdc.stop() | Stop polling and flush position |
| cdc.isRunning() | Check if currently running |
| cdc.getStats() | Get runtime statistics |
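Since every class exposes getStats(), a small helper can turn its result into rates for logging or health checks. This is a sketch only: `summarizeStats` is a hypothetical function, and it assumes `uptime` is reported in milliseconds, which the sample values under Runtime Stats suggest but do not state.

```javascript
// Derive simple rates from the object returned by cdc.getStats().
// Assumption: stats.uptime is in milliseconds.
function summarizeStats(stats) {
  const seconds = stats.uptime / 1000;
  return {
    // Average throughput since start; 0 if no time has elapsed yet.
    eventsPerSecond: seconds > 0 ? stats.eventsProcessed / seconds : 0,
    // Errors per processed event; 0 if nothing has been processed.
    errorRate: stats.eventsProcessed > 0 ? stats.errorsCount / stats.eventsProcessed : 0,
  };
}
```

For example, `summarizeStats(cdc.getStats())` could be logged on a timer to watch throughput over time.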
Architecture
All CDC implementations extend BaseCDC, which provides:
- Event filtering (include/exclude schemas and tables)
- Position tracking with crash-safe persistence
- Auto-reconnect with configurable retry logic
- Runtime statistics
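The include/exclude filtering listed above could be evaluated roughly as follows. This is a sketch under stated assumptions, not the package's actual logic: it assumes exclusions take precedence over inclusions, that an empty include list means "everything", and that table entries use the "schema.table" format shown under Configuration. `shouldCapture` is a hypothetical name.

```javascript
// Decide whether a change on schema.table passes the configured filters.
// Assumed rules: excludes win; empty include lists allow everything.
function shouldCapture(schema, table, options = {}) {
  const {
    includeSchemas = [],
    excludeSchemas = [],
    includeTables = [],
    excludeTables = [],
  } = options;
  const qualified = `${schema}.${table}`;

  if (excludeSchemas.includes(schema)) return false;
  if (excludeTables.includes(qualified)) return false;
  if (includeSchemas.length > 0 && !includeSchemas.includes(schema)) return false;
  if (includeTables.length > 0 && !includeTables.includes(qualified)) return false;
  return true;
}
```

Checking the real precedence rules against the package source is advisable before relying on overlapping include and exclude lists.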
BaseCDC (EventEmitter)
├── MySQLCDC → BinlogReader (binlog streaming)
├── PostgresCDC → PgLogicalReader (WAL logical replication)
└── MssqlCDC → MssqlChangeReader (CDC polling)
License
MIT
