npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

n8n-nodes-kafka-batch-consumer

v1.0.25

Published

N8N node for consuming Kafka messages in batches

Readme

N8N Kafka Batch Consumer Node

A custom N8N node for consuming Kafka messages in batches using KafkaJS.

Features

  • Batch Message Consumption: Collect a configurable number of messages before processing
  • Flexible Authentication: Support for SASL (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512) and SSL/TLS
  • Comprehensive Error Handling: Graceful error handling with proper resource cleanup
  • JSON Parsing: Automatic JSON parsing with fallback to string
  • Timeout Management: Configurable read timeout with partial batch support
  • N8N Integration: Standard N8N node with credential support

Installation

npm install
npm run build

Configuration Parameters

Required Parameters

  • Brokers: Comma-separated list of Kafka broker addresses (e.g., localhost:9092)
  • Client ID: Unique identifier for this Kafka client
  • Group ID: Consumer group identifier
  • Topic: Kafka topic to consume from
  • Batch Size: Number of messages to consume in a batch

Optional Parameters

  • From Beginning: Whether to read from the beginning of the topic (default: false)
  • Session Timeout: Session timeout in milliseconds (default: 30000)

Options

  • Read Timeout: Maximum time to wait for messages in milliseconds (default: 60000)
  • Parse JSON: Whether to parse message values as JSON (default: true)

Understanding Timeouts

The node uses two different timeout configurations that serve distinct purposes:

Session Timeout (Kafka/Broker-side)

  • Purpose: Manages the connection between the consumer and the Kafka broker
  • Function: The broker uses this to determine if the consumer is still "alive" and part of the consumer group
  • Behavior:
    • The consumer must send heartbeats to the broker within this time
    • If the broker doesn't receive heartbeats for sessionTimeout milliseconds, it considers the consumer "dead" and triggers a rebalancing (reassigning partitions to other consumers in the group)
  • Typical values: 6000-300000 ms (6-300 seconds). Minimum 6000ms enforced by the broker
  • Managed by: Kafka broker and KafkaJS consumer (automatic background heartbeats)

Read Timeout (Application-side)

  • Purpose: Controls how long the n8n node waits to collect batch messages
  • Function: Prevents the workflow from blocking indefinitely if insufficient messages arrive to complete the batch
  • Behavior:
    • If batchSize messages arrive before the timeout → returns immediately
    • If the timeout expires first → returns collected messages (partial batch)
  • Typical values: 60000 ms (60 seconds) by default, user-configurable
  • Managed by: Application code (setTimeout in the execute method)

Why Both Are Needed

  • Session Timeout alone: The consumer would stay connected to the broker, but the n8n workflow would block forever if messages don't arrive
  • Read Timeout alone: The workflow would complete correctly, but the broker might disconnect the consumer during long waits if heartbeats aren't maintained

Best Practice: Keep Session Timeout ≥ Read Timeout to avoid broker disconnections while waiting for messages. However, KafkaJS sends heartbeats automatically in the background, so the consumer stays alive even during longer Read Timeouts.

Credentials

The node supports optional Kafka credentials with the following features:

SASL Authentication

  • PLAIN: Simple username/password authentication
  • SCRAM-SHA-256: Salted Challenge Response Authentication Mechanism with SHA-256
  • SCRAM-SHA-512: Salted Challenge Response Authentication Mechanism with SHA-512

SSL/TLS Configuration

  • Reject Unauthorized: Whether to reject unauthorized SSL certificates
  • CA Certificate: Certificate Authority certificate
  • Client Certificate: Client certificate for mutual TLS
  • Client Key: Client private key for mutual TLS

Usage Example

  1. Add the "Kafka Batch Consumer" node to your workflow
  2. Configure the broker addresses and topic
  3. Set the desired batch size
  4. Optionally configure credentials for authentication
  5. Run the workflow to consume messages

Output Format

Each message is returned as an INodeExecutionData object with the following structure:

{
  json: {
    topic: string,
    partition: number,
    offset: string,
    key: string | null,
    value: any,
    timestamp: string,
    headers: Record<string, any>
  }
}

Testing

The project includes comprehensive Jest tests covering:

  • Credential handling (SASL, SSL, combinations)
  • Connection management
  • Message collection and batching
  • JSON parsing
  • Timeout handling
  • Error handling
  • Output format validation

Run tests:

npm test

Run tests with coverage:

npm run test:coverage

Coverage target: 80% minimum

Development

Build

npm run build

Lint

npm run lint

Test

npm test

License

MIT