kafka-console v2.1.30
# Kafka Console CLI

A powerful, easy-to-use command-line interface for Apache Kafka operations.
## Table of Contents

- Features
- Installation
- Quick Start
- Commands
- Authentication
- Message Formats
- Environment Variables
- Common Use Cases
- Troubleshooting
- License
## Features
- ✅ Consumer & Producer - Full support for consuming and producing messages
- ✅ Multiple Authentication Methods - Plain, SCRAM-SHA-256/512, AWS IAM, OAuth Bearer
- ✅ Flexible Message Formats - JSON, JavaScript, raw text, or custom formatters
- ✅ Consumer Groups - Full consumer group support with offset management
- ✅ Time-based Consumption - Read messages from specific timestamps
- ✅ SSL/TLS Support - Secure connections to Kafka clusters
- ✅ Topic Management - Create, delete, and inspect topics
- ✅ Headers Support - Read and write message headers
- ✅ GZIP Compression - Automatic compression support
- ✅ TypeScript - Full TypeScript support
## Installation

### Global Installation (Recommended)

```sh
npm install -g kafka-console
```

### Local Installation

```sh
npm install kafka-console
```

### Using without Installation

```sh
npx kafka-console [command]
```

## Quick Start

1. List all topics:

   ```sh
   kafka-console list --brokers localhost:9092
   ```

2. Consume messages from a topic:

   ```sh
   kafka-console consume my-topic --brokers localhost:9092
   ```

3. Produce a message to a topic:

   ```sh
   echo '{"message": "Hello Kafka!"}' | kafka-console produce my-topic --brokers localhost:9092
   ```

## Commands
### Consuming Messages

```sh
kafka-console consume <topic> [options]
```

#### Options

| Option | Description | Default |
|--------|-------------|---------|
| `-g, --group <group>` | Consumer group name | `kafka-console-consumer-{timestamp}` |
| `-f, --from <from>` | Start position (timestamp, ISO date, or `0` for beginning) | latest |
| `-c, --count <count>` | Number of messages to read | unlimited |
| `-s, --skip <skip>` | Number of messages to skip | `0` |
| `-o, --output <file>` | Write output to file | stdout |
| `-d, --data-format <format>` | Message format (`json`/`js`/`raw`/custom) | `json` |
| `-p, --pretty` | Pretty-print JSON output | `false` |
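The `--from` option takes either a timestamp or an ISO date. A small Node sketch for generating both forms (this assumes epoch milliseconds are accepted as the timestamp form, as `0` for the beginning suggests):

```js
// Compute values usable with --from: an epoch-millisecond timestamp
// or the equivalent ISO 8601 date string.
const oneHourMs = 60 * 60 * 1000;
const fromEpochMs = Date.now() - oneHourMs;          // one hour ago
const fromIso = new Date(fromEpochMs).toISOString(); // same instant as ISO 8601

console.log(fromEpochMs);
console.log(fromIso);
```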
#### Examples

Consume from the beginning and pretty-print:

```sh
kafka-console consume my-topic --from 0 --pretty
```

Consume last 10 messages:

```sh
kafka-console consume my-topic --count 10
```

Consume from a specific timestamp:

```sh
kafka-console consume my-topic --from "2024-01-01T00:00:00Z"
```

Consume with a specific consumer group:

```sh
kafka-console consume my-topic --group my-consumer-group
```

Save output to a file:

```sh
kafka-console consume my-topic --output messages.json
```

Extract specific fields with jq:

```sh
kafka-console consume my-topic | jq '.value.userId'
```

### Producing Messages
```sh
kafka-console produce <topic> [options]
```

#### Options

| Option | Description | Default |
|--------|-------------|---------|
| `-i, --input <file>` | Read input from file | stdin |
| `-d, --data-format <format>` | Message format (`json`/`js`/`raw`/custom) | `json` |
| `-h, --header <header>` | Add message header (format: `key:value`) | none |
| `-w, --wait <ms>` | Wait time between messages | `0` |
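When piping many messages at once, the producer reads one JSON document per line (as the `jq -c '.[]'` example below illustrates). A minimal Node sketch that emits such newline-delimited JSON; the `events` array is a hypothetical payload:

```js
// make-events.js - emit one JSON document per line (NDJSON),
// suitable for piping into `kafka-console produce`.
// The `events` array is a hypothetical example payload.
const events = [
  { user: 'alice', action: 'login' },
  { user: 'bob', action: 'logout' },
];

process.stdout.write(events.map((e) => JSON.stringify(e)).join('\n') + '\n');
```

Usage: `node make-events.js | kafka-console produce my-topic`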
#### Examples

Produce a single message:

```sh
echo '{"user": "john", "action": "login"}' | kafka-console produce my-topic
```

Produce from a file:

```sh
kafka-console produce my-topic --input messages.json
```

Produce with headers:

```sh
echo '{"data": "test"}' | kafka-console produce my-topic --header "source:api" --header "version:1.0"
```

Produce multiple messages from a JSON array:

```sh
cat users.json | jq -c '.[]' | kafka-console produce my-topic
```

Produce with a key (for partitioning):

```sh
echo '{"key": "user123", "value": {"name": "John"}}' | kafka-console produce my-topic
```

### Topic Management
#### Create Topic

```sh
kafka-console topic:create my-new-topic
```

#### Delete Topic

```sh
kafka-console topic:delete old-topic
```

#### Show Topic Offsets

```sh
kafka-console topic:offsets my-topic
```

#### Show Topic Offsets for a Specific Timestamp

```sh
kafka-console topic:offsets my-topic "2024-01-01T00:00:00Z"
```

### Cluster Information

#### List All Topics

```sh
kafka-console list
```

#### List Including Internal Topics

```sh
kafka-console list --all
```

#### Show Cluster Metadata

```sh
kafka-console metadata
```

#### Show Topic Configuration

```sh
kafka-console config --resource topic --resourceName my-topic
```

## Authentication
### SSL/TLS Connection

```sh
kafka-console consume my-topic \
  --brokers broker1:9093,broker2:9093 \
  --ssl
```

### SASL/PLAIN

```sh
kafka-console consume my-topic \
  --brokers broker:9093 \
  --ssl \
  --mechanism plain \
  --username myuser \
  --password mypassword
```

### SASL/SCRAM-SHA-256

```sh
kafka-console consume my-topic \
  --brokers broker:9093 \
  --ssl \
  --mechanism scram-sha-256 \
  --username myuser \
  --password mypassword
```

### AWS IAM

```sh
kafka-console consume my-topic \
  --brokers broker:9093 \
  --ssl \
  --mechanism aws \
  --access-key-id AKIAXXXXXXXX \
  --secret-access-key XXXXXXXXXX \
  --session-token XXXXXXXXXX
```

### OAuth Bearer

```sh
kafka-console consume my-topic \
  --brokers broker:9093 \
  --ssl \
  --mechanism oauthbearer \
  --oauth-bearer "eyJhbGciOiJIUzI1NiIs..."
```

## Message Formats
### JSON Format (Default)

Messages are parsed as JSON:

```sh
echo '{"name": "Alice", "age": 30}' | kafka-console produce my-topic
```

### Raw Format

Messages are sent as plain text:

```sh
echo "Plain text message" | kafka-console produce my-topic --data-format raw
```

### JavaScript Format

Messages can contain JavaScript exports:

```sh
echo 'module.exports = { timestamp: Date.now() }' | kafka-console produce my-topic --data-format js
```

### Custom Formatter

Create a custom formatter module:

```js
// formatter/custom.js
module.exports = {
  encode: (value) => Buffer.from(JSON.stringify(value)),
  decode: (buffer) => JSON.parse(buffer.toString()),
};
```

Use the custom formatter:

```sh
kafka-console consume my-topic --data-format ./formatter/custom.js
```

## Environment Variables
Set environment variables to avoid repeating common options:

```sh
export KAFKA_BROKERS=broker1:9092,broker2:9092
export KAFKA_USERNAME=myuser
export KAFKA_PASSWORD=mypassword
export KAFKA_MECHANISM=plain
export KAFKA_TIMEOUT=30000
```

All supported environment variables:

- `KAFKA_BROKERS` - Comma-separated list of brokers
- `KAFKA_TIMEOUT` - Operation timeout in milliseconds
- `KAFKA_MECHANISM` - SASL mechanism
- `KAFKA_USERNAME` - SASL username
- `KAFKA_PASSWORD` - SASL password
- `KAFKA_AUTH_ID` - AWS authorization identity
- `KAFKA_ACCESS_KEY_ID` - AWS access key ID
- `KAFKA_SECRET_ACCESS_KEY` - AWS secret access key
- `KAFKA_SESSION_TOKEN` - AWS session token
- `KAFKA_OAUTH_BEARER` - OAuth bearer token
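A wrapper script can consume the same variables. A minimal Node sketch of how a comma-separated `KAFKA_BROKERS` and a numeric `KAFKA_TIMEOUT` are typically parsed; this is illustrative only, not the tool's actual resolution logic:

```js
// Illustrative parsing of the environment variables above; kafka-console's
// internal defaults and precedence rules may differ.
function resolveConfig(env = process.env) {
  return {
    brokers: (env.KAFKA_BROKERS || 'localhost:9092').split(','),
    timeoutMs: Number(env.KAFKA_TIMEOUT || 30000),
    mechanism: env.KAFKA_MECHANISM, // undefined means no SASL
  };
}

const cfg = resolveConfig({
  KAFKA_BROKERS: 'broker1:9092,broker2:9092',
  KAFKA_TIMEOUT: '30000',
});
console.log(cfg.brokers, cfg.timeoutMs);
```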
## Common Use Cases

### Monitor a Topic in Real Time

```sh
kafka-console consume logs --group monitor-group --pretty
```

### Replay Messages from Yesterday

Requires GNU `date` (on macOS, use `date -v-1d` instead):

```sh
kafka-console consume events --from "$(date -d yesterday --iso-8601)"
```

### Copy Messages Between Topics

```sh
kafka-console consume source-topic | kafka-console produce destination-topic
```

### Filter Messages

```sh
kafka-console consume all-events | jq 'select(.value.type == "ERROR")'
```

### Count Messages in a Topic

```sh
kafka-console consume my-topic --from 0 | wc -l
```

### Sample Messages

```sh
kafka-console consume large-topic --count 100 --pretty
```

### Debug Message Headers

```sh
kafka-console consume my-topic | jq '.headers'
```

## Troubleshooting
### Connection Issues

**Problem:** Cannot connect to a Kafka broker

```
Error: KafkaJSConnectionError: Connection timeout
```

**Solution:**

- Verify the broker addresses are correct
- Check network connectivity:

  ```sh
  telnet broker-host 9092
  ```

- Ensure security groups/firewalls allow the connection
- For Docker: use the host network or proper port mapping
### Authentication Failures

**Problem:** Authentication failed

```
Error: KafkaJSProtocolError: SASL authentication failed
```

**Solution:**

- Verify the credentials are correct
- Check that the SASL mechanism matches the broker configuration
- Ensure SSL is enabled if required: pass `--ssl`
### Consumer Group Issues

**Problem:** Not receiving messages

**Solution:**

- Check the consumer group offset:

  ```sh
  kafka-console topic:offsets my-topic --group my-group
  ```

- Reset to the beginning with `--from 0`
- Use a new consumer group name
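Switching to a fresh consumer group is often the quickest fix. A one-line Node sketch that mirrors the tool's default `kafka-console-consumer-{timestamp}` naming scheme (`debug-group` is an arbitrary prefix):

```js
// Generate a unique consumer group name so consumption starts with
// clean offsets, mirroring the default {timestamp}-suffix scheme.
const groupName = `debug-group-${Date.now()}`;
console.log(groupName);
// Usage: kafka-console consume my-topic --group "$(node group-name.js)"
```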
### Message Format Errors

**Problem:** JSON parsing errors

```
SyntaxError: Unexpected token...
```

**Solution:**

- Verify the message format matches the specified `--data-format`
- Use `--data-format raw` for non-JSON messages
- Check for malformed JSON with:

  ```sh
  jq . < input.json
  ```
### Performance Issues

**Problem:** Slow message consumption

**Solution:**

- Increase the batch size in the consumer configuration
- Run multiple consumer instances with the same group
- Check network latency to the brokers
### SSL/TLS Issues

**Problem:** SSL handshake failed

**Solution:**

- Ensure the `--ssl` flag is used
- Verify the broker SSL port (usually 9093)
- Check certificate validity
## License

MIT License. Copyright (c) 2024 Ivan Zakharchanka.
