kafka-producer-js
v1.0.5
Published
A configurable Kafka producer package for Node.js applications with NFL event examples and AWS MSK integration
Maintainers
Readme
vi-kafka-producer
A configurable Kafka producer package for Node.js applications with NFL event examples and AWS MSK integration. This package provides an easy-to-use interface for sending messages to Apache Kafka with support for authentication, SSL, batching, and multiple topics.
Features
- 🚀 Easy to use API with Promise support
- 🔧 Configurable connection settings with environment variables
- 🔐 Support for SASL authentication and SSL
- 📦 Batch message sending
- 🎯 Multiple topics support in single operation
- ⚡ High-performance message production
- 🛡️ Built-in error handling and retry logic
- 📊 Connection status monitoring
- 🎛️ Customizable producer settings
- 🏈 NFL event examples with real-world data structures
- ☁️ AWS MSK cluster integration ready
Installation
npm install kafka-producer-jsRequirements
- Node.js >= 14.0.0
- Apache Kafka cluster (local or AWS MSK)
- dotenv package for environment configuration
Quick Start
Basic Usage
const { createConnectedProducer } = require('kafka-producer-js');
async function sendMessage() {
// Configuration
const config = {
brokers: ['localhost:9092'],
clientId: 'my-app'
};
try {
// Create and connect producer
const producer = await createConnectedProducer(config);
// Send a message
await producer.send('my-topic', {
key: 'user-123',
value: {
userId: 123,
action: 'login',
timestamp: Date.now()
}
});
// Disconnect when done
await producer.disconnect();
console.log('Message sent successfully!');
} catch (error) {
console.error('Error:', error.message);
}
}
sendMessage();Environment Variable Configuration
Create a .env file in your project root:
KAFKA_BROKERS=your-broker:9092
KAFKA_CLIENT_ID=your-app
KAFKA_LOG_LEVEL=inforequire('dotenv').config();
const config = {
brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
clientId: process.env.KAFKA_CLIENT_ID || 'default-client',
logLevel: process.env.KAFKA_LOG_LEVEL || 'info'
};NFL Event Examples
The package includes comprehensive examples with real NFL event data structures:
User Connection Event
const userEvent = {
key: 'user-175551803607995',
value: {
event: 'user_connected',
user_id: 175551803607995,
source: 'NFL',
timestamp: new Date().toISOString(),
connection: {
user_id: 175551973627120,
source: 'NFL'
}
},
headers: {
source: 'NFL',
version: '1.0'
}
};Batch Events
const batchEvents = [
{
key: 'user-175551803607995',
value: {
event: 'user_connected',
user_id: 175551803607995,
source: 'NFL',
timestamp: new Date().toISOString()
}
},
{
key: 'user-175551803607996',
value: {
event: 'user_disconnected',
user_id: 175551803607996,
source: 'NFL',
timestamp: new Date().toISOString(),
session_duration: 1800
}
}
];Testing and Examples
The package includes comprehensive examples and validation scripts:
Basic Usage Examples
Run the basic functionality examples (requires Kafka cluster):
npm run basic
# or
node examples/basic-usage.jsAdvanced Configuration Examples
Test advanced features and configurations:
npm run advanced
# or
node examples/advanced-usage.jsRun All Examples
Execute all examples in sequence:
npm run examplesExample Files
examples/basic-usage.js- Core functionality demonstrationexamples/advanced-usage.js- Advanced configuration examples
Configuration
Basic Configuration
const config = {
brokers: ['broker1:9092', 'broker2:9092'], // Required: Array of broker addresses
clientId: 'my-app', // Optional: Client identifier
logLevel: 'warn', // Optional: error, warn, info, debug
connectionTimeout: 3000, // Optional: Connection timeout in ms
requestTimeout: 30000, // Optional: Request timeout in ms
retry: { // Optional: Retry configuration
initialRetryTime: 100,
retries: 8
}
};AWS MSK Configuration
const config = {
brokers: [
'b-1.your-cluster.region.amazonaws.com:9094',
'b-2.your-cluster.region.amazonaws.com:9094'
],
clientId: 'aws-msk-producer',
logLevel: 'info'
};API Reference
createConnectedProducer(config)
Creates a producer instance and automatically initializes and connects it.
Parameters:
config(Object): Kafka configuration object
Returns: Promise
KafkaProducer Methods
send(topic, message)
Sends a single message to a topic.
sendBatch(topic, messages)
Sends multiple messages to a single topic.
sendToMultipleTopics(topicMessages)
Sends messages to multiple topics in a single operation.
disconnect()
Disconnects from the Kafka cluster.
isProducerConnected()
Checks if the producer is connected.
Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| KAFKA_BROKERS | Comma-separated broker addresses | localhost:9092 |
| KAFKA_CLIENT_ID | Client identifier | vi-kafka-producer |
| KAFKA_LOG_LEVEL | Log level | info |
| KAFKA_RETRY_INITIAL_TIME | Initial retry delay (ms) | 100 |
| KAFKA_RETRY_COUNT | Maximum retry attempts | 3 |
Best Practices
- Environment Configuration: Use
.envfiles for configuration - Connection Management: Reuse producer instances
- Batch Processing: Use
sendBatch()for multiple messages - Error Handling: Implement proper error handling and retry logic
- Graceful Shutdown: Disconnect producers before application shutdown
- Monitoring: Check connection status using
isProducerConnected()
License
MIT
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
If you encounter any issues or have questions, please file an issue on the GitHub repository.
