
kafka-db-tool

v1.0.2

A Node.js CLI tool for bidirectional data flow between Kafka and a local PostgreSQL database

Kafka-DB Data Flow Tool

License: MIT

🚀 A powerful Node.js CLI tool for seamless bidirectional data synchronization between Apache Kafka and PostgreSQL databases. Perfect for real-time data pipelines, database replication, and event-driven architectures, with built-in support for Confluent Cloud.

Features

  • Bidirectional Sync: Transfer data seamlessly between Kafka and PostgreSQL
  • Batch Processing: Optimized performance with configurable batch sizes and intervals
  • Robust Error Handling: Comprehensive error recovery with detailed logging
  • Enterprise Security: Built-in support for SASL authentication and SSL encryption
  • Cloud-Ready: Native integration with Confluent Cloud
  • Production Monitoring: Detailed logging with Winston for better observability
  • Graceful Recovery: Automatic handling of connection issues and process signals

Prerequisites

Before you begin, ensure you have the following installed:

  • Node.js 18.x or later
  • PostgreSQL 12.x or later
  • Access to a Kafka cluster (local or Confluent Cloud)
  • npm 8.x or later

🚀 Quick Start

Installation

# Install globally (recommended for CLI usage)
npm install -g kafka-db-tool

# Or install locally in your project
npm install kafka-db-tool

Environment Setup

Create a .env file with your credentials:

# Kafka Configuration
KAFKA_BROKERS=your-broker.confluent.cloud:9092
KAFKA_SASL_USERNAME=your-api-key
KAFKA_SASL_PASSWORD=your-api-secret

# PostgreSQL Configuration
PGHOST=localhost
PGPORT=5432
PGDATABASE=your_database
PGUSER=your_username
PGPASSWORD=your_password

Quick Example

# Start producing data from PostgreSQL to Kafka
kafka-db-tool produce users-topic "SELECT id, name, email FROM users LIMIT 10"

# Consume data from Kafka to PostgreSQL
kafka-db-tool consume users-topic users_backup

Verify Installation

# Check if the tool is properly installed
kafka-db-tool --version

# View available commands
kafka-db-tool --help

Configuration

Kafka Configuration

The tool supports both Confluent Cloud and local Kafka deployments with comprehensive security options.

Confluent Cloud Setup

  1. Get your credentials from Confluent Cloud:

    • Go to your Confluent Cloud cluster
    • Navigate to "API keys" section
    • Create a new API key or use an existing one
    • Note down the Bootstrap server
  2. Configure environment variables:

# Required Confluent Cloud Configuration
KAFKA_BROKERS=<your-cluster>.confluent.cloud:9092
KAFKA_SASL_USERNAME=<your-api-key>
KAFKA_SASL_PASSWORD=<your-api-secret>

# Performance Tuning (Optional)
KAFKA_CONNECTION_TIMEOUT=3000
KAFKA_REQUEST_TIMEOUT=25000
KAFKA_RETRY_COUNT=8
KAFKA_MAX_RETRY_TIME=30000

# Security Configuration (Recommended)
KAFKA_SSL_REJECT_UNAUTHORIZED=true  # Always true in production

Local Development Setup

  1. Start local Kafka using the provided Docker Compose file (a minimal example compose file is sketched after this list):
docker-compose up -d
  2. Configure environment for local development:
# Local Development Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_CONNECTION_TIMEOUT=3000
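
The repository includes its own Docker Compose file; if you need a reference, a minimal single-broker setup along these lines (an illustrative sketch using the standard Confluent images, not necessarily the file shipped with the repo) exposes Kafka on localhost:9092:

# docker-compose.yml (example only; the repo's own file may differ)
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1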

Security Best Practices

  • Always use SASL authentication in production
  • Keep SSL verification enabled (KAFKA_SSL_REJECT_UNAUTHORIZED=true)
  • Rotate API keys regularly
  • Use separate API keys for development and production
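
A simple way to follow the last two points, assuming the tool reads its configuration from a .env file in the working directory as shown above, is to keep one credentials file per environment and copy the right one into place (a general shell pattern, not a kafka-db-tool feature):

# Keep .env.development and .env.production out of version control,
# then select one before running the tool
cp .env.production .env    # or: cp .env.development .env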

PostgreSQL Configuration

PGHOST=localhost
PGPORT=5432
PGDATABASE=your_database
PGUSER=your_username
PGPASSWORD=your_password

Usage Guide

The tool provides two main operations: producing messages from database queries and consuming messages into database tables.

Producer Mode (Database → Kafka)

Send data from PostgreSQL queries to Kafka topics:

kafka-db-tool produce <topic> <query> [options]

Options:
  -b, --batch-size <size>   Batch size for queries (default: "100")
  -i, --interval <ms>       Interval between batches (default: "1000")

Producer Examples

  1. Real-time Updates:
# Stream recent user activities
kafka-db-tool produce user-activities \
  "SELECT user_id, action, created_at FROM activities WHERE created_at > NOW() - INTERVAL '1 hour'" \
  --batch-size 50 \
  --interval 5000
  2. Incremental Data Loading:
# Load orders with pagination
kafka-db-tool produce orders-stream \
  "SELECT * FROM orders WHERE processed = false ORDER BY created_at" \
  --batch-size 100
  3. Filtered Data Sync:
# Sync specific order statuses
kafka-db-tool produce pending-orders \
  "SELECT id, customer_id, total, status FROM orders WHERE status IN ('pending', 'processing')" \
  --batch-size 75 \
  --interval 3000

Consumer Mode (Kafka → Database)

Write messages from Kafka topics to PostgreSQL tables:

kafka-db-tool consume <topic> <table> [options]

Options:
  -g, --group <id>         Consumer group ID (default: "kafka-db-tool")
  -b, --batch-size <size>  Batch size for writes (default: "100")

Consumer Examples

  1. Basic Consumption:
# Write messages to backup table
kafka-db-tool consume orders-topic orders_archive \
  --batch-size 200
  2. Multiple Consumers:
# Run parallel consumers with different group IDs
kafka-db-tool consume user-events user_logs \
  --group user-logger-1 \
  --batch-size 150

kafka-db-tool consume user-events user_logs \
  --group user-logger-2 \
  --batch-size 150
  3. Data Warehouse Loading:
# Load analytics data with larger batches
kafka-db-tool consume metrics-stream analytics_raw \
  --batch-size 500 \
  --group analytics-loader

Best Practices

  1. Batch Size Tuning:

    • Start with smaller batches (50-100)
    • Increase if throughput is needed and memory allows
    • Monitor database and Kafka broker load
  2. Consumer Groups:

    • Use meaningful group IDs for tracking
    • Separate groups for different purposes
    • Consider including the hostname or environment in the group ID (see the example after this list)
  3. Error Handling:

    • The tool automatically handles connection issues
    • Failed messages are logged in error.log
    • Use monitoring for production deployments
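
For example, a group ID that encodes both the environment and the host, built with ordinary shell substitution around the documented --group flag (the naming scheme here is just an illustration):

# Make parallel deployments easy to tell apart when monitoring consumer lag
kafka-db-tool consume user-events user_logs \
  --group "user-logger-${NODE_ENV:-dev}-$(hostname)" \
  --batch-size 100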

Development Setup

  1. Start local Kafka cluster:
docker-compose up -d
  2. Create test database:
createdb kafka_db_test
  3. Set up environment variables:
cp .env.example .env
# Edit .env with your configurations
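
When working from a clone of the repository rather than the globally installed package, the CLI can also be run directly through Node; this assumes src/index.js is the entry point, which may differ in your checkout:

# Run the CLI straight from the source tree
node src/index.js --help

# Same subcommands as the installed binary, e.g. a quick smoke test
node src/index.js produce test-topic "SELECT 1 AS ping"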

🔧 Troubleshooting

Common Issues

Kafka Connection Issues

Error: Could not reach Kafka cluster

Solutions:

  • Verify your KAFKA_BROKERS URL is correct
  • Check if your network allows the connection
  • For Confluent Cloud, ensure you're using the correct port (9092)
  • Verify SSL/TLS settings if using secure connections
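
Two quick network-level checks that often narrow this down (standard command-line tools, independent of kafka-db-tool):

# Is the broker host/port reachable at all?
nc -vz your-broker.confluent.cloud 9092

# Does the broker complete a TLS handshake? (Confluent Cloud serves TLS on 9092)
openssl s_client -connect your-broker.confluent.cloud:9092 </dev/null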

Error: Invalid API key or secret

Solutions:

  • Double-check your KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD
  • Ensure API key has appropriate permissions
  • For Confluent Cloud, verify API key is active

Database Connection Issues

Error: Connection terminated unexpectedly

Solutions:

  • Verify PostgreSQL is running and accessible
  • Check database credentials in your .env file
  • Ensure database port is not blocked by firewall
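
The database settings can be verified outside the tool with the standard psql client, using the same values you put in .env:

# Confirm the connection details work before involving Kafka at all
psql "host=localhost port=5432 dbname=your_database user=your_username" -c '\conninfo'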

Error: Relation does not exist

Solutions:

  • Confirm table exists in the specified database
  • Check user permissions for the table
  • Verify correct database name in connection string

Getting Help

  • Run kafka-db-tool --help for command usage
  • Check logs in error.log and combined.log for detailed error messages
  • Submit issues on GitHub for bug reports

Environment Variables Reference

Kafka Configuration

  • KAFKA_BROKERS: Comma-separated list of Kafka brokers
  • KAFKA_SASL_USERNAME: SASL username (API key)
  • KAFKA_SASL_PASSWORD: SASL password (API secret)
  • KAFKA_SSL_REJECT_UNAUTHORIZED: SSL verification (default: true)
  • KAFKA_CONNECTION_TIMEOUT: Connection timeout in ms (default: 3000)
  • KAFKA_REQUEST_TIMEOUT: Request timeout in ms (default: 25000)

PostgreSQL Configuration

  • PGHOST: Database host
  • PGPORT: Database port
  • PGDATABASE: Database name
  • PGUSER: Database user
  • PGPASSWORD: Database password

Logging

Logs are written to:

  • error.log: Error-level logs
  • combined.log: All logs
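
To watch activity while a transfer is running (plain shell, nothing tool-specific):

# Follow all logs, or only errors, in a second terminal
tail -f combined.log
tail -f error.log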

Contributing

  1. Fork the repository
  2. Create your feature branch
  3. Commit your changes
  4. Push to the branch
  5. Create a new Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.
