
kafka-console v3.1.11

Kafka Console CLI

A command-line interface for Apache Kafka operations.


Installation

npm install -g kafka-console

Or run without installing:

npx kafka-console [command]

Quick Start

# List topics
kafka-console list -b localhost:9092

# Consume messages
kafka-console consume my-topic -b localhost:9092 --from 0 --count 10

# Produce a message
echo '{"key":"k1","value":"hello"}' | kafka-console produce my-topic -b localhost:9092

Global Options

These options apply to all commands:

| Option | Description | Default | Env |
|--------|-------------|---------|-----|
| `-b, --brokers <brokers>` | Comma-separated broker addresses | `localhost:9092` | `KAFKA_BROKERS` |
| `-t, --timeout <ms>` | Operation timeout in milliseconds | `0` (no timeout) | `KAFKA_TIMEOUT` |
| `--ssl` | Enable TLS connection | `false` | |
| `--insecure` | Disable TLS certificate verification (requires `--ssl`) | `false` | |
| `--mechanism <mech>` | SASL mechanism: `plain`, `scram-sha-256`, `scram-sha-512`, `oauthbearer` | | `KAFKA_MECHANISM` |
| `--username <user>` | SASL username (for `plain`/`scram`) | | `KAFKA_USERNAME` |
| `--password <pass>` | SASL password (for `plain`/`scram`) | | `KAFKA_PASSWORD` |
| `--oauth-bearer <token>` | SASL OAuth bearer token | | `KAFKA_OAUTH_BEARER` |
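
Every option with an entry in the Env column can be supplied through the environment instead of a flag. A minimal sketch (the broker addresses are hypothetical, and the assumption that an explicit flag overrides its environment variable follows common CLI convention rather than anything stated here):

```shell
# Set connection defaults once for the whole shell session.
export KAFKA_BROKERS="broker1:9092,broker2:9092"
export KAFKA_TIMEOUT=5000

# kafka-console list                    # would read KAFKA_BROKERS / KAFKA_TIMEOUT
# kafka-console list -b localhost:9092  # assumed: -b takes precedence over the env var

echo "$KAFKA_BROKERS"
```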

Commands

consume

Consume messages from a Kafka topic. Outputs one JSON object per line (JSONL).

kafka-console consume <topic> [options]

Each output line contains: partition, offset, timestamp, headers, key, value, ahead.

| Option | Description | Default |
|--------|-------------|---------|
| `-g, --group <group>` | Consumer group name | auto-generated |
| `-f, --from <from>` | Start position: `0` (beginning), timestamp in ms, or ISO 8601 date | latest |
| `-c, --count <count>` | Maximum number of messages to consume | unlimited |
| `-s, --skip <skip>` | Number of messages to skip before outputting | `0` |
| `-o, --output <file>` | Write output to a file instead of stdout | stdout |
| `-d, --data-format <fmt>` | Value format: `json`, `raw`, or path to a custom formatter | `json` |
| `--snapshot` | Consume all existing messages and exit (records the high watermark on start) | `false` |
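
Because each record is one JSON object per line, the stream is easy to filter with standard text tools. A sketch using fabricated records (the field names match the list above; every value is illustrative, and a real stream would come from `kafka-console consume my-topic`):

```shell
# Two sample consumed records, one per line (values are made up).
records='{"partition":0,"offset":"41","timestamp":"1717200000000","headers":{},"key":"k1","value":{"level":"INFO"},"ahead":0}
{"partition":0,"offset":"42","timestamp":"1717200000001","headers":{},"key":"k2","value":{"level":"ERROR"},"ahead":0}'

# Keep only ERROR-level records, a plain-grep version of the jq filtering
# shown in the examples below.
printf '%s\n' "$records" | grep '"level":"ERROR"'
```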

Examples:

# Consume from the beginning, stop after 10 messages
kafka-console consume my-topic --from 0 --count 10

# Snapshot: dump all existing messages to a file and exit
kafka-console consume my-topic --snapshot -o dump.jsonl

# Consume from a specific timestamp
kafka-console consume my-topic --from "2024-06-01T00:00:00Z"

# Skip first 100 messages, take next 50
kafka-console consume my-topic --from 0 --skip 100 --count 50

# Pipe to jq for filtering
kafka-console consume my-topic | jq 'select(.value.level == "ERROR")'

# Use raw format for non-JSON message values
kafka-console consume my-topic -d raw

produce

Produce messages to a Kafka topic. Reads JSONL from stdin or a JSON array from a file.

kafka-console produce <topic> [options]

Each input line must be a JSON object with a value field. Optional fields: key, headers.
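
A minimal produce-ready record, built by hand: the field names (`value`, plus optional `key` and `headers`) come from the spec above, while the key, value, and header contents are purely illustrative. Piping this line into `kafka-console produce my-topic` would send it as one message.

```shell
# One JSONL record: required "value", optional "key" and "headers".
msg='{"key":"order-7","value":{"total":19.99},"headers":{"source":"cli"}}'
printf '%s\n' "$msg"
# printf '%s\n' "$msg" | kafka-console produce my-topic   # would send it
```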

| Option | Description | Default |
|--------|-------------|---------|
| `-d, --data-format <fmt>` | Value format: `json`, `raw`, or path to a custom formatter | `json` |
| `-i, --input <file>` | Read messages from a JSON array file instead of stdin | stdin |
| `-w, --wait <ms>` | Delay between messages in milliseconds | `0` |
| `-H, --header <header>` | Static header added to every message (`key:value`), repeatable | none |

Examples:

# Produce a single message
echo '{"key":"user-1","value":{"name":"Alice"}}' | kafka-console produce my-topic

# Produce with static headers
echo '{"value":"data"}' | kafka-console produce my-topic -H source:cli -H env:prod

# Produce from a file (JSON array of messages)
kafka-console produce my-topic --input messages.json

# Produce raw text values
echo '{"value":"plain text"}' | kafka-console produce my-topic -d raw

# Pipe from another topic
kafka-console consume source --snapshot | kafka-console produce dest

list

List topic names. Outputs one topic name per line (JSONL).

kafka-console list [options]
kafka-console ls [options]

| Option | Description | Default |
|--------|-------------|---------|
| `-a, --all` | Include internal topics (e.g. `__consumer_offsets`) | `false` |

metadata

Display cluster metadata: brokers, controller, topics, partitions, replicas, and ISR.

kafka-console metadata

Output is a single JSON object (printed on one line) with brokers, clusterId, controllerId, and topicMetadata fields.
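
A skeletal version of that object, handy for seeing where each documented field sits. Only the four top-level field names come from the description above; the nested values are fabricated for illustration, and real output comes from `kafka-console metadata`.

```shell
# Fabricated metadata object with the documented top-level fields.
meta='{"brokers":[{"nodeId":1,"host":"localhost","port":9092}],"clusterId":"demo-cluster","controllerId":1,"topicMetadata":[]}'

# Pull a single field out of the one-line JSON with grep.
printf '%s\n' "$meta" | grep -o '"clusterId":"[^"]*"'
```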

config

Describe the configuration of a Kafka resource.

kafka-console config [options]

| Option | Description |
|--------|-------------|
| `-r, --resource <type>` | Resource type: `topic`, `broker`, or `broker_logger` (required) |
| `-n, --resourceName <name>` | Resource name: topic name or broker ID (required) |

Examples:

kafka-console config -r topic -n my-topic
kafka-console config -r broker -n 1

topic:create

Create a new Kafka topic.

kafka-console topic:create <topic> [options]

| Option | Description | Default |
|--------|-------------|---------|
| `-p, --partitions <n>` | Number of partitions | `1` |
| `-r, --replicas <n>` | Replication factor | `1` |

topic:delete

Delete a Kafka topic.

kafka-console topic:delete <topic>

topic:offsets

Show topic partition offsets.

kafka-console topic:offsets <topic> [timestamp] [options]

Without arguments: shows high/low watermarks per partition. With a timestamp (ms or ISO 8601): shows offsets at that point in time. With --group: shows committed offsets for a consumer group.

| Option | Description |
|--------|-------------|
| `-g, --group <group>` | Show committed offsets for this consumer group |

Examples:

# High/low watermarks
kafka-console topic:offsets my-topic

# Offsets at a specific time
kafka-console topic:offsets my-topic "2024-06-01T00:00:00Z"

# Consumer group committed offsets
kafka-console topic:offsets my-topic -g my-consumer-group

Authentication

# TLS (certificate verification enabled by default)
kafka-console consume my-topic -b broker:9093 --ssl

# TLS with self-signed certificate (skip verification)
kafka-console consume my-topic -b broker:9093 --ssl --insecure

# SASL/PLAIN
kafka-console consume my-topic -b broker:9093 --ssl \
  --mechanism plain --username myuser --password mypass

# SASL/SCRAM-SHA-256
kafka-console consume my-topic -b broker:9093 --ssl \
  --mechanism scram-sha-256 --username myuser --password mypass

# OAuth Bearer
kafka-console consume my-topic -b broker:9093 --ssl \
  --mechanism oauthbearer --oauth-bearer "eyJhbG..."

Message Formats

- `json` (default): values are JSON-serialized on produce and JSON-parsed on consume.

- `raw`: values are passed as plain text strings, with no serialization.

- Custom formatter: provide a path to a module exporting `encode` and `decode` functions:

// formatter/custom.js
// encode: message value -> Buffer written to Kafka
// decode: Buffer read from Kafka -> message value
module.exports = {
  encode: (value) => Buffer.from(JSON.stringify(value)),
  decode: (buffer) => JSON.parse(buffer.toString())
};
kafka-console consume my-topic -d ./formatter/custom.js

License

The MIT License