@awesomeniko/kafka-trail

v0.2.2

A Node.js library for managing message queues with Kafka

Downloads

2,181

Kafka-trail - MessageQueue Library

A Node.js library for managing message queues with Kafka, designed to simplify creating, using, and managing Kafka topics with producers and consumers.

Based on KafkaJS.


Features

  • Written entirely in TypeScript.
  • Branded types.
  • Connect to Kafka brokers easily.
  • Create or use existing Kafka topics with specified partitions.
  • Initialize the message queue with minimal setup.
  • Set up consumer handlers.
  • Compression (see "Compression codec" below).
  • Custom encoders/decoders.
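
"Branded types" refers to a TypeScript pattern in which a primitive such as a string carries a compile-time tag, so a topic name cannot be passed where a client ID is expected. The sketch below only illustrates the pattern and is not the library's implementation; `Brand`, `TopicName`, `ClientId`, `topicNameFromString`, `clientIdFromString` and `describeTopic` are hypothetical names.

```typescript
// Hypothetical sketch of the branded-type pattern; not kafka-trail's actual code.
type Brand<T, Name extends string> = T & { readonly __brand: Name };

type TopicName = Brand<string, "TopicName">;
type ClientId = Brand<string, "ClientId">;

// The factory functions are the only place where the cast happens,
// so validation can be centralised here.
function topicNameFromString(raw: string): TopicName {
  if (raw.trim().length === 0) {
    throw new Error("Topic name must not be empty");
  }
  return raw as TopicName;
}

function clientIdFromString(raw: string): ClientId {
  return raw as ClientId;
}

// Accepts only a TopicName, never a plain string or a ClientId.
function describeTopic(topic: TopicName): string {
  return `topic=${topic}`;
}

const topic = topicNameFromString("test.example");
const clientId = clientIdFromString("hostname");

const description = describeTopic(topic);
const clientLabel = `client=${clientId}`;
// describeTopic(clientId) would fail to compile: ClientId is not assignable to TopicName.
```

At runtime the branded values are ordinary strings; the tag exists only in the type system.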

Installation

Install the library using npm or Yarn:

npm install @awesomeniko/kafka-trail

Or with Yarn:

yarn add @awesomeniko/kafka-trail

Usage

Here’s an example of how to use the @awesomeniko/kafka-trail library in your project.

If you only need a producer:

import {
  CreateKTTopic,
  KafkaClientId,
  KafkaMessageKey,
  KafkaTopicName,
  KTMessageQueue
} from "@awesomeniko/kafka-trail";

// Define your Kafka broker URLs
const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue();

// Start producer
await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
  },
  pureConfig: {},
})

// Create topic fn
const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Works if batchConsuming = true
  createDLQ: false,
})

// Create or use topic
await messageQueue.initTopics([
  TestExampleTopic,
])

// Publish a single message with publishSingleMessage
const payload = TestExampleTopic({
  fieldForPayload: 1,
}, {
  messageKey: KafkaMessageKey.NULL, // If you don't want to specify message key
  meta: {},
})

await messageQueue.publishSingleMessage(payload)

If you only need a consumer:

import type pino from "pino";

import {
  KTHandler,
  CreateKTTopic,
  KafkaClientId,
  KafkaTopicName,
  KTMessageQueue
} from "@awesomeniko/kafka-trail";

// Another dependency example
class DatabaseClass {
  #client: string
  constructor () {
    this.#client = 'test-client'
  }

  getClient() {
    return this.#client
  }
}

const dbClass = new DatabaseClass()

const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue({
  // Optional: pass a context that will be available in handlers
  ctx: () => {
    return {
      dbClass,
    }
  },
});

export const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Works if batchConsuming = true
  createDLQ: false,
})

// Create topic handler
const testExampleTopicHandler = KTHandler({
  topic: TestExampleTopic,
  run: async (payload, ctx: {logger: pino.Logger, dbClass: typeof dbClass}) => {
    // TypeScript infers the type of `payload` from `TestExampleTopic`
    // `ctx` is the value returned by KTMessageQueue({ctx: () => {...}})

    const [data] = payload

    if (!data) {
      return Promise.resolve()
    }

    const logger = ctx.logger.child({
      payload: data.fieldForPayload,
    })

    logger.info(ctx.dbClass.getClient())

    return Promise.resolve()
  },
})

messageQueue.registerHandlers([
  testExampleTopicHandler,
])

// Start consumer
await messageQueue.initConsumer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    consumerGroupId: 'consumer-group-id',
    batchConsuming: true // default false
  },
  pureConfig: {},
})

For both consumer and producer:

import {
  KTHandler,
  CreateKTTopic,
  KafkaClientId,
  KafkaMessageKey,
  KafkaTopicName,
  KTMessageQueue
} from "@awesomeniko/kafka-trail";

const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue();

// Create topic fn
const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Works if batchConsuming = true
  createDLQ: false,
})

// Required because the handler publishes data
await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
  },
  pureConfig: {},
})

// Create or use topic
await messageQueue.initTopics([
  TestExampleTopic,
])

// Create topic handler
const testExampleTopicHandler = KTHandler({
  topic: TestExampleTopic,
  run: async (payload, _, publisher, { resolveOffset }) => { // resolveOffset available for batchConsuming = true only
    // TypeScript infers the type of `payload` from `TestExampleTopic`

    const [data] = payload

    if (!data) {
      return Promise.resolve()
    }

    const newPayload = TestExampleTopic({
      fieldForPayload: data.fieldForPayload + 1,
    }, {
      messageKey: KafkaMessageKey.NULL,
      meta: {},
    })

    await publisher.publishSingleMessage(newPayload)

    if (resolveOffset) {
      // optional manual offset control when needed
    }
  },
})

messageQueue.registerHandlers([
  testExampleTopicHandler,
])

// Start consumer
await messageQueue.initConsumer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    consumerGroupId: 'consumer-group-id',
    batchConsuming: true // default false
  },
  pureConfig: {},
})

Call destroyAll to perform a graceful shutdown:

const messageQueue = new KTMessageQueue();

process.on("SIGINT", async () => {
  await messageQueue.destroyAll()
});

process.on("SIGTERM", async () => {
  await messageQueue.destroyAll()
});
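
Because SIGINT and SIGTERM can both arrive during the same shutdown, it may help to make the cleanup idempotent. The sketch below is one possible approach, not part of the library: `createShutdown` is a hypothetical helper, and `fakeDestroyAll` stands in for `messageQueue.destroyAll()` so the example runs without Kafka.

```typescript
// Hypothetical helper: wraps an async cleanup so repeated signals trigger it only once.
function createShutdown(cleanup: () => Promise<void>): () => Promise<void> {
  let inFlight: Promise<void> | undefined;
  return () => {
    if (!inFlight) {
      // First call starts the cleanup; later calls reuse the same promise.
      inFlight = cleanup();
    }
    return inFlight;
  };
}

// Stand-in for messageQueue.destroyAll(); counts how many times it actually runs.
let destroyCalls = 0;
const fakeDestroyAll = async (): Promise<void> => {
  destroyCalls += 1;
};

const shutdown = createShutdown(fakeDestroyAll);

// Simulate SIGINT followed by SIGTERM.
const firstCall = shutdown();
const secondCall = shutdown();
```

In an application, the same `shutdown` function could be registered for both signals, with `cleanup` set to `() => messageQueue.destroyAll()`.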

Configurations

Compression codec

By default, the library uses the LZ4 codec to compress and decompress data. You can override it via the compressionCodec field of the KTKafkaSettings type. Be careful: the producer and consumer must use the same codec. See the KafkaJS documentation on compression codecs for reference. Example:

import { KafkaClientId, KTMessageQueue } from "@awesomeniko/kafka-trail";
import { CompressionTypes } from "kafkajs";
import lz4 from "lz4";

// Instantiate messageQueue
const kafkaBrokerUrls = ["localhost:19092"];

const messageQueue = new KTMessageQueue();

await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    compressionCodec: {
      codecType: CompressionTypes.LZ4,
      codecFn: {
        compress(encoder: Buffer) {
          return lz4.encode(encoder);
        },

        decompress<T>(buffer: Buffer) {
          return lz4.decode(buffer) as T;
        },
      },
    },
  },
  pureConfig: {},
})

Data encoding / decoding

You can provide custom encoders / decoders for sending / receiving data. Example:

import { CreateKTTopic, KafkaTopicName } from "@awesomeniko/kafka-trail";

type MyModel = {
  fieldForPayload: number
}

const { BaseTopic: TestExampleTopic } = CreateKTTopic<MyModel>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Works if batchConsuming = true
  createDLQ: false,
}, {
  encode: (data) => {
    return JSON.stringify(data)
  },
  decode: (data: string | Buffer) => {
    if (Buffer.isBuffer(data)) {
      data = data.toString()
    }

    return JSON.parse(data) as MyModel
  },
})
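
A quick way to check a custom codec is to round-trip a value through encode and decode before wiring it into a topic. The sketch below reuses the JSON approach from the example and runs without Kafka. `jsonCodec` is a hypothetical standalone object, and `Uint8Array` with `TextDecoder` is used in place of `Buffer` so the snippet has no Node-specific types (a Node.js `Buffer` is a `Uint8Array`, so it is accepted as well).

```typescript
type MyModel = {
  fieldForPayload: number;
};

// Hypothetical standalone codec with the same encode/decode shape as the example above.
const jsonCodec = {
  encode: (data: MyModel): string => JSON.stringify(data),
  decode: (data: string | Uint8Array): MyModel => {
    // Binary input is converted to text before parsing.
    const text = typeof data === "string" ? data : new TextDecoder().decode(data);
    return JSON.parse(text) as MyModel;
  },
};

const original: MyModel = { fieldForPayload: 42 };
const encoded = jsonCodec.encode(original);

// Decode from a string and from bytes; both should yield the original value.
const fromString = jsonCodec.decode(encoded);
const fromBytes = jsonCodec.decode(new TextEncoder().encode(encoded));
```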

AJV schema adapter

Use createAjvCodecFromSchema when your payload contract is JSON Schema and you want runtime validation via AJV.

import { Ajv } from "ajv";
import { CreateKTTopic, KafkaTopicName, createAjvCodecFromSchema } from "@awesomeniko/kafka-trail";

type UserEvent = {
  id: number
}

const ajv = new Ajv()

const codec = createAjvCodecFromSchema<UserEvent>({
  ajv,
  schema: {
    $id: "user-event-id",
    title: "user-event",
    type: "object",
    properties: {
      id: {
        type: "number",
      },
    },
    required: ["id"],
    additionalProperties: false,
  },
})

const { BaseTopic } = CreateKTTopic<UserEvent>({
  topic: KafkaTopicName.fromString('test.ajv.topic'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10,
  createDLQ: false,
}, codec)

Zod schema adapter

Use createZodCodec when the schema is defined in application code with Zod.

import { z } from "zod";
import { CreateKTTopic, KafkaTopicName, createZodCodec } from "@awesomeniko/kafka-trail";

type UserEvent = {
  id: number
}

const userEventSchema = z.object({
  id: z.number(),
}).meta({
  id: "user-event",
  schemaVersion: "1",
})

const codec = createZodCodec<UserEvent>(userEventSchema)

const { BaseTopic } = CreateKTTopic<UserEvent>({
  topic: KafkaTopicName.fromString('test.zod.topic'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10,
  createDLQ: false,
}, codec)

Sending batch messages

You can send messages in batches instead of one by one, but this requires slightly different usage. Example:

// Create topic fn
const { BaseTopic: TestExampleTopic } = CreateKTTopicBatch({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10,
  createDLQ: false,
})

// Create or use topic
await messageQueue.initTopics([
  TestExampleTopic,
])

// Publish the messages with publishBatchMessages
const payload = TestExampleTopic([{
  value: {
    test: 1,
    test2: 2,
  },
  key: '1',
}, {
  value: {
    test: 3,
    test2: 4,
  },
  key: '2',
}, {
  value: {
    test: 5,
    test2: 6,
  },
  key: '3',
}])

await messageQueue.publishBatchMessages(payload)
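
When the data already exists as an array of domain objects, the `{ value, key }` entries can be built with a small mapping step. The sketch below is a hypothetical helper, not part of the library: `BatchEntry` and `toBatchEntries` are assumed names, and only the entry shape is taken from the example above.

```typescript
// Hypothetical helper; the { value, key } entry shape follows the example above.
type BatchEntry<T> = { value: T; key: string };

// Map each item to a batch entry, deriving the key from the item and its index.
function toBatchEntries<T>(
  items: T[],
  keyOf: (item: T, index: number) => string,
): BatchEntry<T>[] {
  return items.map((item, index) => ({ value: item, key: keyOf(item, index) }));
}

const readings = [
  { test: 1, test2: 2 },
  { test: 3, test2: 4 },
  { test: 5, test2: 6 },
];

// Keys "1", "2", "3" mirror the example above.
const entries = toBatchEntries(readings, (_item, index) => String(index + 1));
```

The resulting array has the same form as the argument passed to `TestExampleTopic([...])` above.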

Dead Letter Queue (DLQ)

Automatically route failed messages to DLQ topics for later analysis and reprocessing.

initProducer must be called before initConsumer when at least one registered handler uses createDLQ: true, otherwise ProducerInitRequiredForDLQError is thrown.

// DLQ topics are automatically created with 'dlq.' prefix
const { BaseTopic: TestExampleTopic, DLQTopic: DLQTestExampleTopic } = CreateKTTopic<MyPayload>({
  topic: KafkaTopicName.fromString('my.topic'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10,
  createDLQ: true, // Enables DLQ
})

// Create or use topic
await messageQueue.initTopics([
  TestExampleTopic,
  DLQTestExampleTopic
])

// Failed messages are automatically sent to dlq.my.topic with the following shape:
{
  originalOffset: "123",
  originalTopic: "my.topic",
  originalPartition: 0,
  key: '["user123","user456"]',
  value: [
    { userId: "user123", action: "login" },
    { userId: "user456", action: "logout" }
  ],
  errorMessage: "Database connection failed",
  failedAt: 1703123456789
}
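
The record shape above can be written down as a type, which is useful when building a consumer that reprocesses DLQ messages. The sketch below is an illustration only: `DlqRecord`, `FailedBatch` and `buildDlqRecord` are hypothetical names, and it assumes, as the example record suggests, that `key` holds the original message keys as a JSON array.

```typescript
// Hypothetical types modelled on the example record above.
type DlqRecord<T> = {
  originalOffset: string;
  originalTopic: string;
  originalPartition: number;
  key: string;
  value: T[];
  errorMessage: string;
  failedAt: number;
};

type FailedBatch<T> = {
  topic: string;
  partition: number;
  offset: string;
  messages: { key: string; value: T }[];
};

// Assemble a DLQ record from a failed batch and the error that caused the failure.
function buildDlqRecord<T>(
  batch: FailedBatch<T>,
  error: unknown,
  now: number = Date.now(),
): DlqRecord<T> {
  return {
    originalOffset: batch.offset,
    originalTopic: batch.topic,
    originalPartition: batch.partition,
    // Assumption: keys are stored as a JSON array, as in the example above.
    key: JSON.stringify(batch.messages.map((m) => m.key)),
    value: batch.messages.map((m) => m.value),
    errorMessage: error instanceof Error ? error.message : String(error),
    failedAt: now,
  };
}

const record = buildDlqRecord(
  {
    topic: "my.topic",
    partition: 0,
    offset: "123",
    messages: [
      { key: "user123", value: { userId: "user123", action: "login" } },
      { key: "user456", value: { userId: "user456", action: "logout" } },
    ],
  },
  new Error("Database connection failed"),
  1703123456789,
);
```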

AWS Glue Schema Registry (with in-memory cache)

You can create a codec from AWS Glue Schema Registry and reuse it in CreateKTTopic / CreateKTTopicBatch. The codec is initialized asynchronously (schema is fetched before codec creation), then works synchronously at runtime.

  1. Create a native AWS Glue adapter (IAM/default credentials):
import { Ajv } from "ajv";
import {
  createAwsGlueCodec,
  createAwsGlueSchemaRegistryAdapter,
  clearAwsGlueSchemaCache,
} from "@awesomeniko/kafka-trail";

type UserEvent = {
  id: number
}

const ajv = new Ajv()
const glueAdapter = await createAwsGlueSchemaRegistryAdapter({
  region: "eu-central-1",
  preload: {
    schemas: [{
      registryName: "my-registry",
      schemaName: "user-events",
      schemaVersionId: "schema-version-id",
    }],
  },
})

const codec = await createAwsGlueCodec<UserEvent>({
  ajv,
  glue: glueAdapter,
  schema: {
    registryName: "my-registry",
    schemaName: "user-events",
    schemaVersionId: "schema-version-id",
  },
})

// clearAwsGlueSchemaCache() // optional manual cache reset
  2. Static AWS keys (instead of IAM/default chain):
const glueAdapter = await createAwsGlueSchemaRegistryAdapter({
  region: "eu-central-1",
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
    sessionToken: process.env.AWS_SESSION_TOKEN,
  },
})
  3. Zod mode (same Glue adapter, no manual getSchema):
import { z } from "zod";
import { createAwsGlueCodec, createAwsGlueSchemaRegistryAdapter } from "@awesomeniko/kafka-trail";

type UserEvent = {
  id: number
}

const glueAdapter = await createAwsGlueSchemaRegistryAdapter({
  region: "eu-central-1",
})

const codec = await createAwsGlueCodec<UserEvent>({
  validator: "zod",
  glue: glueAdapter,
  schema: {
    registryName: "my-registry",
    schemaName: "user-events",
  },
  zodSchemaFactory: ({ schema }) => {
    // Build your zod schema using Glue JSON schema payload
    return z.object({
      id: z.number(),
    })
  },
})

Notes:

  • cache is in-memory and enabled by default per process;
  • cache key is based on registry + schema identifiers and is shared for AJV/Zod modes;
  • unsupported Glue data formats (for example AVRO/PROTOBUF) are rejected in this version;
  • call glueAdapter.destroy() on shutdown if you want to close the AWS SDK client explicitly;
  • call clearAwsGlueSchemaCache() if you need to invalidate cached schemas manually.
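
The caching behaviour described in the notes can be pictured as a map keyed by registry and schema identifiers. The sketch below is a hypothetical illustration, not the library's implementation: `SchemaRef`, `cacheKey`, `getCachedSchema`, `clearSchemaCache` and the `"latest"` fallback are all assumptions, and `fakeFetch` stands in for a call to AWS Glue.

```typescript
// Hypothetical sketch of a per-process schema cache; not kafka-trail's implementation.
type SchemaRef = {
  registryName: string;
  schemaName: string;
  schemaVersionId?: string;
};

const schemaCache = new Map<string, Promise<string>>();

// Build a cache key from the registry and schema identifiers.
function cacheKey(ref: SchemaRef): string {
  return [ref.registryName, ref.schemaName, ref.schemaVersionId ?? "latest"].join("/");
}

// Return the cached promise if present; otherwise start one fetch and remember it.
// Caching the promise (not the resolved value) also deduplicates concurrent requests.
function getCachedSchema(
  ref: SchemaRef,
  fetchSchema: (ref: SchemaRef) => Promise<string>,
): Promise<string> {
  const key = cacheKey(ref);
  let pending = schemaCache.get(key);
  if (!pending) {
    pending = fetchSchema(ref);
    schemaCache.set(key, pending);
  }
  return pending;
}

// Manual invalidation, analogous in spirit to clearAwsGlueSchemaCache().
function clearSchemaCache(): void {
  schemaCache.clear();
}

// Usage with a fake fetcher that counts how often the "registry" is contacted.
let fetchCount = 0;
const fakeFetch = (ref: SchemaRef): Promise<string> => {
  fetchCount += 1;
  return Promise.resolve(`{"title":"${ref.schemaName}"}`);
};

const ref: SchemaRef = { registryName: "my-registry", schemaName: "user-events" };
const first = getCachedSchema(ref, fakeFetch);
const second = getCachedSchema(ref, fakeFetch);
```

Because the key does not depend on the validator, a cache of this kind would be shared by AJV and Zod modes. A production version would probably also evict entries whose fetch was rejected; this sketch omits that.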

Deprecated topic creators

KTTopic(...) and KTTopicBatch(...) were deprecated in a previous version. Current versions intentionally throw runtime errors if these APIs are invoked (for teams that have not migrated yet). They are planned to be removed in the next version:

  • KTTopic(...): deprecated, use CreateKTTopic(...)
  • KTTopicBatch(...): deprecated, use CreateKTTopicBatch(...)

Contributing

Contributions are welcome! If you’d like to improve this library:

  1. Fork the repository.
  2. Create a new branch.
  3. Make your changes and submit a pull request.

License

This library is open-source and licensed under the MIT License.