@platformatic/kafka-opentelemetry
v0.2.0
OpenTelemetry instrumentation for @platformatic/kafka
Features
- Tracing: Comprehensive tracing for Kafka producers and wrapped consumer processors.
- Semantic Conventions: Follows OpenTelemetry semantic conventions for messaging systems.
- Zero Configuration: Works out of the box with minimal setup.
- Performance Optimized: Low-overhead instrumentation designed for production use.
- Type Safety: Full TypeScript support with strong typing.
Installation
npm install @platformatic/kafka-opentelemetry
Getting Started
Basic Usage
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'
import { registerInstrumentations } from '@opentelemetry/instrumentation'
import { KafkaInstrumentation, processWithTracing } from '@platformatic/kafka-opentelemetry'
import { forEach } from 'hwp'
// Initialize OpenTelemetry
const provider = new NodeTracerProvider()
provider.register()
// Register the Kafka instrumentation
registerInstrumentations({
  instrumentations: [new KafkaInstrumentation()]
})
import { Producer, Consumer } from '@platformatic/kafka'
const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092']
})
const consumer = new Consumer({
  groupId: 'my-consumer-group',
  clientId: 'my-consumer',
  bootstrapBrokers: ['localhost:9092']
})
const stream = await consumer.consume({ topics: ['my-topic'] })
// 1. Sync message processing
for await (const message of stream) {
  processWithTracing(message, message => {
    // Process the message here. Spans created in this function are children of the process span.
    // If this function returns a promise and you don't await processWithTracing, messages are processed in parallel.
  })
}
// 2. Async message processing
for await (const message of stream) {
  await processWithTracing(message, async message => {
    // Process the message here. Spans created in this function are children of the process span.
  })
}
// 3. Callback-based message processing
stream.on('data', message => {
  processWithTracing(
    message,
    message => {
      // Process the message here. Spans created in this function are children of the process span.
    },
    () => {
      // Be notified when processing ends.
    }
  )
})
// 4. Concurrent message processing
await forEach(
  stream,
  async message => {
    return processWithTracing(message, async message => {
      // Process the message here. Spans created in this function are children of the process span.
    })
  },
  16
) // 16 is the concurrency level
Configuration Options
| Property                | Type       | Default     | Description |
| ----------------------- | ---------- | ----------- | ----------- |
| enabled                 | boolean    | true        | Whether the instrumentation is enabled. |
| producedKeySerializer   | Function   | undefined   | A function to serialize produced keys before tracing them. This function must be synchronous. |
| consumedKeySerializer   | Function   | undefined   | A function to serialize consumed keys before tracing them. This function must be synchronous. |
| beforeProduce           | Function   | undefined   | A function to customize a message before sending it to Kafka. This function must be synchronous. |
| beforeProcess           | Function   | undefined   | A function to analyze a consumed message before it is pushed to the messages stream. This function must be synchronous. |
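The table does not spell out the serializer signature, so the following is only a sketch under assumptions: it supposes a key serializer receives the raw key (a string, raw bytes, or nothing) and returns a printable string for the span attribute. The `KafkaKey` type and `serializeKey` name are hypothetical, and passing the options to the `KafkaInstrumentation` constructor is assumed from the usual OpenTelemetry instrumentation pattern rather than confirmed here.

```typescript
// Hypothetical key type: assumes keys arrive as strings, raw bytes, or are absent.
type KafkaKey = string | Uint8Array | undefined

// Synchronous serializer (no promises, no I/O), as the options table requires.
function serializeKey (key: KafkaKey): string | undefined {
  if (key === undefined) return undefined
  if (typeof key === 'string') return key
  // Hex-encode raw bytes so the traced value is always printable.
  let hex = ''
  for (let i = 0; i < key.length; i++) {
    hex += (key[i] < 16 ? '0' : '') + key[i].toString(16)
  }
  return hex
}

// Assumed wiring (not executed here; option names are taken from the table above):
// new KafkaInstrumentation({
//   producedKeySerializer: serializeKey,
//   consumedKeySerializer: serializeKey
// })
```

Because Node.js `Buffer` instances are `Uint8Array`s, the same function would also accept buffers. Keeping the serializer pure and cheap matters since it would run on every traced message.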
Producer Trace
- Span Kind: PRODUCER
- Attributes:
  - messaging.system: kafka
  - messaging.operation.name: send
  - messaging.operation.type: send
  - messaging.destination.name: Topic name
  - messaging.destination.partition.id: Partition number (if specified)
  - messaging.kafka.message.key: Message key (if present)
  - messaging.kafka.message.tombstone: true (if no value is present)
  - error.type: Error code (if present)
Consumer Traces
- Span Kind: CONSUMER
- Attributes:
  - messaging.system: kafka
  - messaging.operation.name: process
  - messaging.operation.type: process
  - messaging.destination.name: Topic name
  - messaging.destination.partition.id: Partition number (if specified)
  - messaging.kafka.message.key: Message key (if present)
  - messaging.kafka.message.tombstone: true (if no value is present)
  - error.type: Error code (if present)
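The producer and consumer attribute sets listed above differ only in the operation name and type. As an illustration of how the conditional attributes combine, here is a minimal sketch. It is not the library's internal code: `MessageLike` and `buildMessagingAttributes` are hypothetical names, and storing the partition ID as a string is an assumption.

```typescript
type Operation = 'send' | 'process'

// Hypothetical minimal message shape, for illustration only.
interface MessageLike {
  topic: string
  partition?: number
  key?: string
  value?: string
}

type Attributes = Record<string, string | boolean>

function buildMessagingAttributes (
  operation: Operation,
  message: MessageLike,
  errorType?: string
): Attributes {
  // Attributes that are always present.
  const attributes: Attributes = {
    'messaging.system': 'kafka',
    'messaging.operation.name': operation,
    'messaging.operation.type': operation,
    'messaging.destination.name': message.topic
  }
  // Conditional attributes, mirroring the "(if ...)" notes in the lists above.
  if (message.partition !== undefined) {
    attributes['messaging.destination.partition.id'] = String(message.partition) // string form is an assumption
  }
  if (message.key !== undefined) {
    attributes['messaging.kafka.message.key'] = message.key
  }
  if (message.value === undefined) {
    attributes['messaging.kafka.message.tombstone'] = true
  }
  if (errorType !== undefined) {
    attributes['error.type'] = errorType
  }
  return attributes
}

// A consumed tombstone (key but no value) on partition 2 of "my-topic".
const tombstoneAttributes = buildMessagingAttributes('process', {
  topic: 'my-topic',
  partition: 2,
  key: 'user-1'
})
```

In this sketch a message without a value is flagged as a tombstone, and `error.type` is omitted entirely when no error occurred.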
Metrics
This instrumentation automatically exports OpenTelemetry metrics to provide observability into Kafka operations.
messaging.client.sent.messages
- Type: Counter
- Description: Number of messages sent by Kafka producers
- Attributes:
  - messaging.system: kafka
  - messaging.operation.name: send
  - messaging.destination.name: Topic name
  - messaging.destination.partition.id: Partition ID (when specified)
  - error.type: Error code (only present when sending fails)
messaging.client.consumed.messages
- Type: Counter
- Description: Number of messages consumed by Kafka consumers
- Attributes:
  - messaging.system: kafka
  - messaging.operation.name: process
  - messaging.destination.name: Topic name
  - messaging.destination.partition.id: Partition ID
  - error.type: Error code (only present when processing fails)
Duration Histograms
messaging.client.operation.duration
- Type: Histogram
- Description: Duration of Kafka client API operations (ApiVersions, Metadata, Produce, etc.)
- Unit: Seconds
- Bucket Boundaries: [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]
- Attributes:
  - messaging.system: kafka
  - messaging.operation.name: API operation name (e.g., ApiVersions, Metadata, Produce)
  - server.address: Kafka broker host
  - server.port: Kafka broker port
messaging.process.duration
- Type: Histogram
- Description: Duration of message processing operations in consumers
- Unit: Seconds
- Bucket Boundaries: [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]
- Attributes:
  - messaging.system: kafka
  - messaging.operation.name: process
  - error.type: Error code (only present when processing fails)
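To make the bucket boundaries shared by both histograms concrete, the sketch below shows how a duration in seconds maps to a bucket. It assumes the standard OpenTelemetry explicit-bucket rule, where each boundary is an inclusive upper bound and values above the last boundary fall into a final overflow bucket. `bucketIndex` is a hypothetical helper written for this illustration, not part of the package.

```typescript
// Boundaries in seconds, copied from the histogram descriptions above.
const BOUNDARIES = [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]

// Returns the index of the bucket a duration falls into.
// Bucket i covers (BOUNDARIES[i - 1], BOUNDARIES[i]]; the last index is the overflow bucket.
function bucketIndex (seconds: number, boundaries: number[] = BOUNDARIES): number {
  for (let i = 0; i < boundaries.length; i++) {
    if (seconds <= boundaries[i]) return i
  }
  return boundaries.length
}

const fastProcess = bucketIndex(0.03) // 30 ms falls in (0.025, 0.05], index 3
const onBoundary = bucketIndex(0.005) // exactly 5 ms falls in the first bucket, index 0
const verySlow = bucketIndex(12) // above 10 s falls in the overflow bucket, index 14
```

Since the unit is seconds, a duration measured in milliseconds would need to be divided by 1000 before comparing it against these boundaries.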
Requirements
- Node.js >= 22.14.0
License
Apache-2.0 - See LICENSE for more information.
