@awesomeniko/kafka-trail v0.2.2
# Kafka-trail - Message Queue Library
A Node.js library for managing message queues with Kafka, designed to simplify creating, using, and managing Kafka topics with producers and consumers. Built on kafkajs.
## Features
- Written entirely in TypeScript
- Branded types for identifiers such as topic names and client IDs
- Connect to Kafka brokers easily
- Create or use existing Kafka topics with a specified number of partitions
- Initialize the message queue with minimal setup
- Set up consumer handlers
- Message compression (LZ4 by default, see the Compression codec section)
- Custom encoders/decoders
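The branded-types item refers to the general TypeScript technique of tagging primitives at the type level, which is how identifiers like KafkaTopicName.fromString(...) avoid mixing up plain strings. A minimal standalone sketch of the technique (illustrative names, not the library's actual implementation):

```typescript
// A branded type is a primitive plus a compile-time-only tag.
// At runtime the value is still a plain string; only the type narrows.
type Brand<T, B extends string> = T & { readonly __brand: B }

type TopicName = Brand<string, "TopicName">

// A smart constructor is the only sanctioned way to obtain the brand.
function topicNameFromString(raw: string): TopicName {
  if (raw.length === 0) throw new Error("topic name must not be empty")
  return raw as TopicName
}

function describe(topic: TopicName): string {
  return `topic: ${topic}`
}

const t = topicNameFromString("test.example")
console.log(describe(t))
// describe("test.example") without going through the constructor
// would be a compile-time error, because a plain string lacks the brand.
```

This is why passing an unvalidated string where the library expects a KafkaTopicName fails at compile time rather than at the broker.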
## Installation

Install the library using npm or Yarn:

```shell
npm install @awesomeniko/kafka-trail
```

Or with Yarn:

```shell
yarn add @awesomeniko/kafka-trail
```

## Usage
Here’s an example of how to use the @awesomeniko/kafka-trail library in your project.
If you only want a producer:

```ts
import {
  CreateKTTopic,
  KafkaClientId,
  KafkaMessageKey,
  KafkaTopicName,
  KTMessageQueue
} from "@awesomeniko/kafka-trail";

// Define your Kafka broker URLs
const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue();

// Start the producer
await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
  },
  pureConfig: {},
})

// Define the topic
const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Takes effect only when batchConsuming = true
  createDLQ: false,
})

// Create the topic (or reuse it if it already exists)
await messageQueue.initTopics([
  TestExampleTopic,
])

// Use publishSingleMessage to publish a message
const payload = TestExampleTopic({
  fieldForPayload: 1,
}, {
  messageKey: KafkaMessageKey.NULL, // Use if you don't want to specify a message key
  meta: {},
})

await messageQueue.publishSingleMessage(payload)
```

If you only want a consumer:
```ts
import type pino from "pino";
import {
  KTHandler,
  CreateKTTopic,
  KafkaClientId,
  KafkaTopicName,
  KTMessageQueue
} from "@awesomeniko/kafka-trail";

// An example of an extra dependency you may want available in handlers
class DatabaseClass {
  #client: string

  constructor () {
    this.#client = 'test-client'
  }

  getClient() {
    return this.#client
  }
}

const dbClass = new DatabaseClass()
const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue({
  // Pass a context factory to make extra dependencies available in handlers
  ctx: () => {
    return {
      dbClass,
    }
  },
});

export const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Takes effect only when batchConsuming = true
  createDLQ: false,
})

// Create a topic handler
const testExampleTopicHandler = KTHandler({
  topic: TestExampleTopic,
  run: async (payload, ctx: { logger: pino.Logger, dbClass: typeof dbClass }) => {
    // TypeScript infers the right type for `payload` from `TestExampleTopic`.
    // `ctx` is the value returned by the factory passed to KTMessageQueue({ ctx: () => { ... } }).
    const [data] = payload
    if (!data) {
      return Promise.resolve()
    }
    const logger = ctx.logger.child({
      payload: data.fieldForPayload,
    })
    logger.info(ctx.dbClass.getClient())
    return Promise.resolve()
  },
})

messageQueue.registerHandlers([
  testExampleTopicHandler,
])

// Start the consumer
await messageQueue.initConsumer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    consumerGroupId: 'consumer-group-id',
    batchConsuming: true // default: false
  },
  pureConfig: {},
})
```

For both a consumer and a producer:
```ts
import {
  KTHandler,
  CreateKTTopic,
  KafkaClientId,
  KafkaMessageKey,
  KafkaTopicName,
  KTMessageQueue
} from "@awesomeniko/kafka-trail";

const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue();

// Define the topic
const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Takes effect only when batchConsuming = true
  createDLQ: false,
})

// Required here, because the handler below publishes data
await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
  },
  pureConfig: {},
})

// Create the topic (or reuse it if it already exists)
await messageQueue.initTopics([
  TestExampleTopic,
])

// Create a topic handler
const testExampleTopicHandler = KTHandler({
  topic: TestExampleTopic,
  run: async (payload, _, publisher, { resolveOffset }) => { // resolveOffset is available only when batchConsuming = true
    // TypeScript infers the right type for `payload` from `TestExampleTopic`.
    const [data] = payload
    if (!data) {
      return Promise.resolve()
    }
    const newPayload = TestExampleTopic({
      fieldForPayload: data.fieldForPayload + 1,
    }, {
      messageKey: KafkaMessageKey.NULL,
      meta: {},
    })
    await publisher.publishSingleMessage(newPayload)
    if (resolveOffset) {
      // Optional manual offset control when needed
    }
  },
})

messageQueue.registerHandlers([
  testExampleTopicHandler,
])

// Start the consumer
await messageQueue.initConsumer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    consumerGroupId: 'consumer-group-id',
    batchConsuming: true // default: false
  },
  pureConfig: {},
})
```

Destroying everything helps you perform a graceful shutdown:
```ts
const messageQueue = new KTMessageQueue();

process.on("SIGINT", async () => {
  await messageQueue.destroyAll()
});

process.on("SIGTERM", async () => {
  await messageQueue.destroyAll()
});
```

## Configurations
### Compression codec

By default, the library uses the LZ4 codec to compress and decompress data. You can override it via the KTKafkaSettings type. Be careful: the producer and consumer must use the same codec. See the kafkajs documentation on compression for reference. Example:
```ts
import { KafkaClientId, KTMessageQueue } from "@awesomeniko/kafka-trail";
import { CompressionTypes } from "kafkajs";
import lz4 from "lz4";

// Instantiate the message queue
const kafkaBrokerUrls = ["localhost:19092"];
const messageQueue = new KTMessageQueue();

await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    compressionCodec: {
      codecType: CompressionTypes.LZ4,
      codecFn: {
        compress(encoder: Buffer) {
          return lz4.encode(encoder);
        },
        decompress<T>(buffer: Buffer) {
          return lz4.decode(buffer) as T;
        },
      },
    },
  },
  pureConfig: {},
})
```

### Data encoding / decoding
You can provide custom encoders/decoders for sending and receiving data. Example:
```ts
import { CreateKTTopic, KafkaTopicName } from "@awesomeniko/kafka-trail";

type MyModel = {
  fieldForPayload: number
}

const { BaseTopic: TestExampleTopic } = CreateKTTopic<MyModel>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Takes effect only when batchConsuming = true
  createDLQ: false,
}, {
  encode: (data) => {
    return JSON.stringify(data)
  },
  decode: (data: string | Buffer) => {
    if (Buffer.isBuffer(data)) {
      data = data.toString()
    }
    return JSON.parse(data) as MyModel
  },
})
```

### AJV schema adapter
Use createAjvCodecFromSchema when your payload contract is JSON Schema and you want runtime validation via AJV.
```ts
import { Ajv } from "ajv";
import { CreateKTTopic, KafkaTopicName, createAjvCodecFromSchema } from "@awesomeniko/kafka-trail";

type UserEvent = {
  id: number
}

const ajv = new Ajv()

const codec = createAjvCodecFromSchema<UserEvent>({
  ajv,
  schema: {
    $id: "user-event-id",
    title: "user-event",
    type: "object",
    properties: {
      id: {
        type: "number",
      },
    },
    required: ["id"],
    additionalProperties: false,
  },
})

const { BaseTopic } = CreateKTTopic<UserEvent>({
  topic: KafkaTopicName.fromString('test.ajv.topic'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10,
  createDLQ: false,
}, codec)
```

### Zod schema adapter
Use createZodCodec when the schema is defined in application code with Zod.
```ts
import { z } from "zod";
import { CreateKTTopic, KafkaTopicName, createZodCodec } from "@awesomeniko/kafka-trail";

type UserEvent = {
  id: number
}

const userEventSchema = z.object({
  id: z.number(),
}).meta({
  id: "user-event",
  schemaVersion: "1",
})

const codec = createZodCodec<UserEvent>(userEventSchema)

const { BaseTopic } = CreateKTTopic<UserEvent>({
  topic: KafkaTopicName.fromString('test.zod.topic'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10,
  createDLQ: false,
}, codec)
```

## Sending batch messages
You can send messages in batches instead of one by one, but this requires slightly different usage. Example:
```ts
// Define the batch topic
const { BaseTopic: TestExampleTopic } = CreateKTTopicBatch({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10,
  createDLQ: false,
})

// Create the topic (or reuse it if it already exists)
await messageQueue.initTopics([
  TestExampleTopic,
])

// Use publishBatchMessages to publish several messages at once
const payload = TestExampleTopic([{
  value: {
    test: 1,
    test2: 2,
  },
  key: '1',
}, {
  value: {
    test: 3,
    test2: 4,
  },
  key: '2',
}, {
  value: {
    test: 5,
    test2: 6,
  },
  key: '3',
}])

await messageQueue.publishBatchMessages(payload)
```

## Dead Letter Queue (DLQ)
Failed messages are automatically routed to DLQ topics for later analysis and reprocessing.

initProducer must be called before initConsumer when at least one registered handler uses createDLQ: true; otherwise, ProducerInitRequiredForDLQError is thrown.
```ts
// DLQ topics are automatically created with the 'dlq.' prefix
const { BaseTopic: TestExampleTopic, DLQTopic: DLQTestExampleTopic } = CreateKTTopic<MyPayload>({
  topic: KafkaTopicName.fromString('my.topic'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10,
  createDLQ: true, // Enables the DLQ
})

// Create the topics (or reuse them if they already exist)
await messageQueue.initTopics([
  TestExampleTopic,
  DLQTestExampleTopic
])
```

Failed messages are automatically sent to dlq.my.topic with the following shape:

```ts
{
  originalOffset: "123",
  originalTopic: "user.events",
  originalPartition: 0,
  key: '["user123","user456"]',
  value: [
    { userId: "user123", action: "login" },
    { userId: "user456", action: "logout" }
  ],
  errorMessage: "Database connection failed",
  failedAt: 1703123456789
}
```

## AWS Glue Schema Registry (with in-memory cache)
You can create a codec from the AWS Glue Schema Registry and reuse it in CreateKTTopic / CreateKTTopicBatch. The codec is initialized asynchronously (the schema is fetched before codec creation) and then works synchronously at runtime.

- Create a native AWS Glue adapter (IAM/default credentials):
```ts
import { Ajv } from "ajv";
import {
  createAwsGlueCodec,
  createAwsGlueSchemaRegistryAdapter,
  clearAwsGlueSchemaCache,
} from "@awesomeniko/kafka-trail";

type UserEvent = {
  id: number
}

const ajv = new Ajv()

const glueAdapter = await createAwsGlueSchemaRegistryAdapter({
  region: "eu-central-1",
  preload: {
    schemas: [{
      registryName: "my-registry",
      schemaName: "user-events",
      schemaVersionId: "schema-version-id",
    }],
  },
})

const codec = await createAwsGlueCodec<UserEvent>({
  ajv,
  glue: glueAdapter,
  schema: {
    registryName: "my-registry",
    schemaName: "user-events",
    schemaVersionId: "schema-version-id",
  },
})

// clearAwsGlueSchemaCache() // optional manual cache reset
```

- Static AWS keys (instead of the IAM/default chain):
```ts
const glueAdapter = await createAwsGlueSchemaRegistryAdapter({
  region: "eu-central-1",
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
    sessionToken: process.env.AWS_SESSION_TOKEN,
  },
})
```

- Zod mode (same Glue adapter, no manual getSchema):
```ts
import { z } from "zod";
import { createAwsGlueCodec, createAwsGlueSchemaRegistryAdapter } from "@awesomeniko/kafka-trail";

type UserEvent = {
  id: number
}

const glueAdapter = await createAwsGlueSchemaRegistryAdapter({
  region: "eu-central-1",
})

const codec = await createAwsGlueCodec<UserEvent>({
  validator: "zod",
  glue: glueAdapter,
  schema: {
    registryName: "my-registry",
    schemaName: "user-events",
  },
  zodSchemaFactory: ({ schema }) => {
    // Build your Zod schema from the Glue JSON schema payload
    return z.object({
      id: z.number(),
    })
  },
})
```

Notes:
- The cache is in-memory and enabled by default, per process.
- The cache key is based on the registry and schema identifiers and is shared between AJV and Zod modes.
- Unsupported Glue data formats (for example, AVRO or PROTOBUF) are rejected in this version.
- Call glueAdapter.destroy() on shutdown if you want to close the AWS SDK client explicitly.
- Call clearAwsGlueSchemaCache() if you need to invalidate cached schemas manually.
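The caching behaviour described in the notes can be pictured as a plain map keyed by the registry and schema identifiers. A standalone sketch of that idea (names here are hypothetical, not the library's internals):

```typescript
// Illustrative sketch of an in-memory schema cache keyed by
// registry + schema identifiers, as the notes above describe.
type SchemaRef = {
  registryName: string
  schemaName: string
  schemaVersionId?: string
}

const schemaCache = new Map<string, object>()

function cacheKey(ref: SchemaRef): string {
  // Registry + schema identifiers form the key; the same key is
  // shared regardless of which validator mode consumes the schema.
  return [ref.registryName, ref.schemaName, ref.schemaVersionId ?? "latest"].join("/")
}

function getOrFetch(ref: SchemaRef, fetch: (ref: SchemaRef) => object): object {
  const key = cacheKey(ref)
  const hit = schemaCache.get(key)
  if (hit) return hit
  const schema = fetch(ref) // a network call in a real registry adapter
  schemaCache.set(key, schema)
  return schema
}

function clearSchemaCache(): void {
  // Mirrors the role of clearAwsGlueSchemaCache(): drop everything
  // so the next lookup fetches fresh schemas.
  schemaCache.clear()
}
```

With this shape, repeated codec creation for the same registry/schema pair costs one fetch per process until the cache is cleared.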
## Deprecated topic creators

KTTopic(...) and KTTopicBatch(...) were deprecated in a previous version. Current versions intentionally throw runtime errors when these APIs are invoked (for teams that have not migrated yet). They are planned for removal in the next version:

- Deprecated. Use CreateKTTopic(...)
- Deprecated. Use CreateKTTopicBatch(...)
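The hard-deprecation behaviour described above (old names still exported, but any call fails fast with a migration hint) follows a common pattern. A generic sketch with hypothetical function names, assuming only the error messages quoted above:

```typescript
// Generic hard-deprecation sketch: keeping the old entry points exported
// means imports don't break at load time, while any actual call throws
// immediately with a pointer to the replacement API.
function KTTopicDeprecated(): never {
  throw new Error("Deprecated. Use CreateKTTopic(...)")
}

function KTTopicBatchDeprecated(): never {
  throw new Error("Deprecated. Use CreateKTTopicBatch(...)")
}
```

Failing at call time rather than import time lets partially migrated codebases keep compiling while surfacing the exact call sites that still need updating.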
## Contributing
Contributions are welcome! If you’d like to improve this library:
- Fork the repository.
- Create a new branch.
- Make your changes and submit a pull request.
## License
This library is open-source and licensed under the MIT License.
