Kafka-DB Data Flow Tool
🚀 A powerful Node.js CLI tool for seamless bidirectional data synchronization between Apache Kafka and PostgreSQL databases. Perfect for real-time data pipelines, database replication, and event-driven architectures, with built-in support for Confluent Cloud.
Features
- Bidirectional Sync: Transfer data seamlessly between Kafka and PostgreSQL
- Batch Processing: Optimized performance with configurable batch sizes and intervals
- Robust Error Handling: Comprehensive error recovery with detailed logging
- Enterprise Security: Built-in support for SASL authentication and SSL encryption
- Cloud-Ready: Native integration with Confluent Cloud
- Production Monitoring: Detailed logging with Winston for better observability
- Graceful Recovery: Automatic handling of connection issues and process signals
Prerequisites
Before you begin, ensure you have the following installed:
- Node.js 18.x or later
- PostgreSQL 12.x or later
- Access to a Kafka cluster (local or Confluent Cloud)
- npm 8.x or later
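You can confirm the installed versions with standard commands, for example:
node --version   # expect 18.x or later
npm --version    # expect 8.x or later
psql --version   # client version; the PostgreSQL server should be 12.x or later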
🚀 Quick Start
Installation
# Install globally (recommended for CLI usage)
npm install -g kafka-db-tool
# Or install locally in your project
npm install kafka-db-tool
Environment Setup
Create a .env file with your credentials:
# Kafka Configuration
KAFKA_BROKERS=your-broker.confluent.cloud:9092
KAFKA_SASL_USERNAME=your-api-key
KAFKA_SASL_PASSWORD=your-api-secret
# PostgreSQL Configuration
PGHOST=localhost
PGPORT=5432
PGDATABASE=your_database
PGUSER=your_username
PGPASSWORD=your_password
Quick Example
# Start producing data from PostgreSQL to Kafka
kafka-db-tool produce users-topic "SELECT id, name, email FROM users LIMIT 10"
# Consume data from Kafka to PostgreSQL
kafka-db-tool consume users-topic users_backup
Verify Installation
# Check if the tool is properly installed
kafka-db-tool --version
# View available commands
kafka-db-tool --help
Configuration
1. Kafka Configuration
The tool supports both Confluent Cloud and local Kafka deployments with comprehensive security options.
Confluent Cloud Setup
Get your credentials from Confluent Cloud:
- Go to your Confluent Cloud cluster
- Navigate to "API keys" section
- Create a new API key or use an existing one
- Note down the Bootstrap server
Configure environment variables:
# Required Confluent Cloud Configuration
KAFKA_BROKERS=<your-cluster>.confluent.cloud:9092
KAFKA_SASL_USERNAME=<your-api-key>
KAFKA_SASL_PASSWORD=<your-api-secret>
# Performance Tuning (Optional)
KAFKA_CONNECTION_TIMEOUT=3000
KAFKA_REQUEST_TIMEOUT=25000
KAFKA_RETRY_COUNT=8
KAFKA_MAX_RETRY_TIME=30000
# Security Configuration (Recommended)
KAFKA_SSL_REJECT_UNAUTHORIZED=true # Always true in production
Local Development Setup
- Start local Kafka using the provided Docker Compose file:
docker-compose up -d
- Configure environment for local development:
# Local Development Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_CONNECTION_TIMEOUT=3000
Security Best Practices
- Always use SASL authentication in production
- Keep SSL verification enabled (KAFKA_SSL_REJECT_UNAUTHORIZED=true)
- Rotate API keys regularly
- Use separate API keys for development and production
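One simple way to keep development and production keys separate is to maintain one credential file per environment and copy the relevant one into place before running; the file names below are only an illustration:
cp .env.production .env    # switch to production credentials
cp .env.development .env   # or switch back to development credentials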
2. PostgreSQL Configuration
PGHOST=localhost
PGPORT=5432
PGDATABASE=your_database
PGUSER=your_username
PGPASSWORD=your_password
Usage Guide
The tool provides two main operations: producing messages from database queries and consuming messages into database tables.
Producer Mode (Database → Kafka)
Send data from PostgreSQL queries to Kafka topics:
kafka-db-tool produce <topic> <query> [options]
Options:
-b, --batch-size <size> Batch size for queries (default: "100")
-i, --interval <ms>      Interval between batches (default: "1000")
Producer Examples
- Real-time Updates:
# Stream recent user activities
kafka-db-tool produce user-activities \
"SELECT user_id, action, created_at FROM activities WHERE created_at > NOW() - INTERVAL '1 hour'" \
--batch-size 50 \
--interval 5000
- Incremental Data Loading:
# Load orders with pagination
kafka-db-tool produce orders-stream \
"SELECT * FROM orders WHERE processed = false ORDER BY created_at" \
--batch-size 100
- Filtered Data Sync:
# Sync specific order statuses
kafka-db-tool produce pending-orders \
"SELECT id, customer_id, total, status FROM orders WHERE status IN ('pending', 'processing')" \
--batch-size 75 \
--interval 3000
Consumer Mode (Kafka → Database)
Write messages from Kafka topics to PostgreSQL tables:
kafka-db-tool consume <topic> <table> [options]
Options:
-g, --group <id> Consumer group ID (default: "kafka-db-tool")
-b, --batch-size <size>  Batch size for writes (default: "100")
Consumer Examples
- Basic Consumption:
# Write messages to backup table
kafka-db-tool consume orders-topic orders_archive \
--batch-size 200
- Multiple Consumers:
# Run parallel consumers with different group IDs
kafka-db-tool consume user-events user_logs \
--group user-logger-1 \
--batch-size 150
kafka-db-tool consume user-events user_logs \
--group user-logger-2 \
--batch-size 150
- Data Warehouse Loading:
# Load analytics data with larger batches
kafka-db-tool consume metrics-stream analytics_raw \
--batch-size 500 \
--group analytics-loader
Best Practices
Batch Size Tuning:
- Start with smaller batches (50-100)
- Increase if throughput is needed and memory allows
- Monitor database and Kafka broker load
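For example, start near the default and only raise the batch size once throughput and memory headroom are confirmed (the topic and table names below are placeholders):
# Conservative starting point
kafka-db-tool consume metrics-stream analytics_raw --batch-size 50
# Larger batches once the database and brokers handle the load comfortably
kafka-db-tool consume metrics-stream analytics_raw --batch-size 500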
Consumer Groups:
- Use meaningful group IDs for tracking
- Separate groups for different purposes
- Consider using hostname or environment in group ID
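For instance, embedding the hostname in the group ID makes each consumer easy to identify in logs and monitoring (topic and table names are placeholders):
kafka-db-tool consume user-events user_logs --group "user-logger-$(hostname)"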
Error Handling:
- The tool automatically handles connection issues
- Failed messages are logged in error.log
- Use monitoring for production deployments
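During a run you can keep an eye on failures with standard tools, for example:
# Follow new errors as they are written
tail -f error.log
# Search the full log for a specific topic
grep "orders-topic" combined.log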
Development Setup
- Start local Kafka cluster:
docker-compose up -d
- Create test database:
createdb kafka_db_test
- Set up environment variables:
cp .env.example .env
# Edit .env with your configurations
🔧 Troubleshooting
Common Issues
Kafka Connection Issues
Error: Could not reach Kafka cluster
✅ Solutions:
- Verify your KAFKA_BROKERS URL is correct
- Check if your network allows the connection
- For Confluent Cloud, ensure you're using the correct port (9092)
- Verify SSL/TLS settings if using secure connections
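Standard networking tools (not part of kafka-db-tool) can help confirm the broker is reachable at all; substitute your own bootstrap server below:
# Check basic TCP reachability to the bootstrap server
nc -vz your-cluster.confluent.cloud 9092
# Inspect the TLS handshake when SSL settings are in doubt
openssl s_client -connect your-cluster.confluent.cloud:9092 </dev/null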
Error: Invalid API key or secret
✅ Solutions:
- Double-check your KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD
- Ensure the API key has appropriate permissions
- For Confluent Cloud, verify API key is active
Database Connection Issues
Error: Connection terminated unexpectedly
✅ Solutions:
- Verify PostgreSQL is running and accessible
- Check database credentials in your .env file
- Ensure the database port is not blocked by a firewall
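A quick way to verify the database side is to connect with psql using the same values as your .env file (export them into the shell first, or substitute the literal values):
# Should return a single row if host, port, and credentials are correct
psql -h "$PGHOST" -p "$PGPORT" -U "$PGUSER" -d "$PGDATABASE" -c 'SELECT 1;'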
Error: Relation does not exist
✅ Solutions:
- Confirm table exists in the specified database
- Check user permissions for the table
- Verify correct database name in connection string
Getting Help
- Run kafka-db-tool --help for command usage
- Check logs in error.log and combined.log for detailed error messages
- Submit issues on GitHub for bug reports
Environment Variables Reference
Kafka Configuration
- KAFKA_BROKERS: Comma-separated list of Kafka brokers
- KAFKA_SASL_USERNAME: SASL username (API key)
- KAFKA_SASL_PASSWORD: SASL password (API secret)
- KAFKA_SSL_REJECT_UNAUTHORIZED: SSL verification (default: true)
- KAFKA_CONNECTION_TIMEOUT: Connection timeout in ms (default: 3000)
- KAFKA_REQUEST_TIMEOUT: Request timeout in ms (default: 25000)
PostgreSQL Configuration
- PGHOST: Database host
- PGPORT: Database port
- PGDATABASE: Database name
- PGUSER: Database user
- PGPASSWORD: Database password
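Putting the two groups together, a complete .env might look like the following (all values are placeholders; the optional Kafka timeouts fall back to the defaults above when omitted):
# Kafka
KAFKA_BROKERS=broker-1.example.com:9092,broker-2.example.com:9092
KAFKA_SASL_USERNAME=your-api-key
KAFKA_SASL_PASSWORD=your-api-secret
KAFKA_SSL_REJECT_UNAUTHORIZED=true
KAFKA_CONNECTION_TIMEOUT=3000
KAFKA_REQUEST_TIMEOUT=25000
# PostgreSQL
PGHOST=localhost
PGPORT=5432
PGDATABASE=your_database
PGUSER=your_username
PGPASSWORD=your_password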
Logging
Logs are written to:
- error.log: Error-level logs
- combined.log: All logs
Contributing
- Fork the repository
- Create your feature branch
- Commit your changes
- Push to the branch
- Create a new Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.