kafka-typegen
kafka-typegen generates a type-safe TypeScript Kafka client from topic definitions, Avro schemas, and Schema Registry-aware event contracts.
It is built for teams using Kafka, TypeScript, Avro, KafkaJS, Platformatic, and Confluent-compatible Schema Registry who want event contracts to feel like a real application API instead of a loose collection of topic names, JSON blobs, and hand-maintained wrapper code.
Define topics, events, and schema files once, and kafka-typegen will:
- generate payload interfaces derived from Avro
- generate typed producer helpers grouped by topic and event
- generate typed consumer helpers grouped by topic and event
- generate topic-level consumer subscriptions for multi-event topics
- compose a client API on top of a small runtime abstraction
- plan (dry-run) and apply Schema Registry subject creation plus Kafka topic provisioning from the same config
- surface schema drift and conservative schema evolution hints before you apply changes
It supports both single-event and multi-event topics.
Why Teams Use It
kafka-typegen is for the gap between "we have Avro schemas" and "our application code is actually safe to use."
It helps when you want to:
- generate TypeScript types from Avro schemas
- ship typed KafkaJS or Platformatic producers and consumers
- keep topic names, event names, and payload types aligned
- plan Schema Registry subject changes before applying them
- make schema evolution visible in CI instead of discovering breakage at runtime
Core Value
Many Kafka toolchains stop at schema parsing. kafka-typegen goes further by treating your Kafka contract as both application code and infrastructure intent.
That means one config can drive:
- generated TypeScript client code for producers and consumers
- Kafka topic planning and creation
- Schema Registry subject creation
- dry-run reporting for drift and schema evolution concerns
The result is a workflow that is easier to review, easier to automate, and easier to trust in a TypeScript codebase.
Stability
kafka-typegen 1.0.0 defines its stable surface in docs/stability-policy.md.
For 1.0.0, the main compatibility targets are:
- the validated config shape and current defaults
- the default generated minimal API built around createProducer(...), createConsumer(...), and createClient(...)
- the documented runtime entrypoints under kafka-typegen/runtime*
- the documented CLI and sync behavior
Areas that are still intentionally limited are called out explicitly in the stability policy, especially low-level internals and full TLS certificate-wiring support.
What It Generates
Given a config like:
import { defineConfig } from 'kafka-typegen';
export default defineConfig({
outputDir: './generated',
sources: {
rootDir: './schemas'
},
topics: [
{
name: 'user.events',
events: [
{
name: 'user.created',
schemaPath: './user-created.avsc'
}
]
}
]
});
kafka-typegen generates a TypeScript module with a shape similar to:
import type {
RuntimeClient,
RuntimeConsumer,
RuntimeProducer
} from 'kafka-typegen/runtime';
export interface UserCreatedPayload {
id: string;
email: string;
isAdmin: boolean;
}
export interface UserCreatedPayloadMessage {
event: 'user.created';
topic: 'user.events';
payload: UserCreatedPayload;
}
export function createProducer(runtimeProducer: RuntimeProducer) { /* ... */ }
export function createConsumer(runtimeConsumer: RuntimeConsumer) { /* ... */ }
export function createClient(runtime: RuntimeClient) { /* ... */ }
That generated module gives you:
- producer.userEvents.userCreated.send(payload)
- consumer.userEvents.userCreated.on(handler)
- consumer.userEvents.on(handler) when user.events has multiple generated events
- optional native transport options as the last argument, for example producer.userEvents.userCreated.send(payload, { acks: -1 })
- createClient(runtime) to bind producer and consumer together
Schema Evolution Workflow
kafka-typegen does not try to rebrand Schema Registry compatibility as a generic migration system. A better description is that it helps with schema evolution workflows around Kafka topics and Schema Registry subjects.
In practice, that means you can:
- review planned Kafka topic and Schema Registry changes with kafka-typegen sync
- apply missing topics and subjects with kafka-typegen sync --apply
- surface detected drift for already-existing resources
- get conservative hints for common schema evolution risks before a registry rejection surprises you
Schema Registry remains the source of truth for compatibility enforcement. kafka-typegen helps you plan, review, and automate those changes from the same config that generates your typed client.
Generated Imports
The generated client is application-specific code, so it is not exported from the published kafka-typegen package itself. Generate it into your source tree and import it directly from the generated file or the generated index.ts re-export.
Example config:
import { defineConfig } from 'kafka-typegen';
export default defineConfig({
outputDir: './src/generated/kafka',
sources: {
rootDir: './schemas'
},
topics: [
{
name: 'user.events',
events: [
{
name: 'user.created',
schemaPath: './user-created.avsc'
}
]
}
]
});
The generator emits:
- your generated client file
- an index.ts re-export file
That lets your application code import the generated API directly:
import {
createClient,
createProducer
} from './generated/kafka/index.js';
Install
pnpm add kafka-typegen
Common search terms this package fits:
- TypeScript Kafka code generation
- Avro to TypeScript Kafka client
- KafkaJS typed producer and consumer
- Schema Registry schema evolution tooling
- Confluent Schema Registry with TypeScript
If you want to use the first-party Platformatic adapter:
pnpm add kafka-typegen @platformatic/kafka
Release Pipeline
The repository includes .github/workflows/publish.yml, which runs on pushes to version tags like v1.0.0 and:
- installs dependencies
- runs pnpm lint, pnpm typecheck, pnpm test, and pnpm build
- verifies that the pushed Git tag matches package.json
- publishes that exact package version to npm
Stable releases are therefore tag-driven rather than canary-on-main. To publish a release:
- update package.json and CHANGELOG.md
- commit the release
- create and push a matching tag such as v1.0.0
To enable publishing, set the NPM_TOKEN GitHub Actions secret for the repository.
See CHANGELOG.md for release history.
For release discipline and upgrade planning, see docs/stability-policy.md.
Quick Start
1. Create a config file
Create kafka-typegen.config.mjs in your project root:
import { defineConfig } from 'kafka-typegen';
export default defineConfig({
outputDir: './generated',
sources: {
rootDir: './schemas'
},
topics: [
{
name: 'user.events',
events: [
{
name: 'user.created',
schemaPath: './user-created.avsc'
},
{
name: 'user.updated',
schemaPath: './user-updated.avsc'
}
]
}
]
});
2. Add your Avro schemas
Example schemas/user-created.avsc:
{
"type": "record",
"name": "UserCreated",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "isAdmin", "type": "boolean" }
]
}
3. Generate the client
pnpm kafka-typegen
Or point at a specific config file:
pnpm kafka-typegen --config ./kafka-typegen.config.mjs
The generated file is written to outputDir. By default the filename is kafka-client.ts.
4. Plan or apply infrastructure sync
kafka-typegen can now plan or apply Kafka topic creation and Schema Registry subject creation from the same config:
pnpm kafka-typegen sync
pnpm kafka-typegen sync --apply
pnpm kafka-typegen sync --target kafka
The default sync mode is a dry-run. It reports what will be created and any detected drift for existing resources.
Config Reference
The config is intentionally explicit. Important fields:
interface KafkaTypegenConfig {
outputDir: string;
sources?: {
rootDir?: string;
};
schemaRegistry?: {
url: string;
auth?: {
username?: string;
password?: string;
token?: string;
};
subjectStrategy?: 'event-name' | 'topic-name' | 'topic-event';
};
runtime?: {
transport?: 'kafkajs' | '@platformatic/kafka';
module?: string;
};
sync?: {
kafka?: {
brokers: string[];
clientId?: string;
ssl?: boolean;
sasl?: {
mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
username: string;
password: string;
};
failOnDrift?: boolean;
};
schemaRegistry?: {
onDrift?: 'register' | 'fail' | 'ignore';
compatibility?:
| 'BACKWARD'
| 'BACKWARD_TRANSITIVE'
| 'FORWARD'
| 'FORWARD_TRANSITIVE'
| 'FULL'
| 'FULL_TRANSITIVE'
| 'NONE';
};
};
generation?: {
apiMode?: 'minimal' | 'advanced';
avroExternalTypes?: Record<string, string>;
avroSemanticMode?: 'default' | 'safe';
typesFileName?: string;
};
naming?: {
eventTypeSuffix?: string;
topicTypeSuffix?: string;
};
topics: Array<{
name: string;
keySchemaPath?: string;
subjectStrategy?: 'event-name' | 'topic-name' | 'topic-event';
sync?: {
partitions: number;
replicationFactor: number;
cleanupPolicy?: 'delete' | 'compact' | 'compact,delete';
compressionType?:
| 'producer'
| 'uncompressed'
| 'gzip'
| 'snappy'
| 'lz4'
| 'zstd';
retentionMs?: number;
retentionBytes?: number;
maxMessageBytes?: number;
minCompactionLagMs?: number;
};
events: Array<{
name: string;
schemaPath: string;
keySchemaPath?: string;
subject?: string;
}>;
}>;
}
Notes
- sources.rootDir is the base directory used to resolve schema paths.
- Topic and event ordering is normalized deterministically before generation.
- Event names must be unique across all topics.
- runtime.module controls which runtime package the generated file imports its runtime types from.
- If runtime.module is omitted:
  - kafkajs defaults to kafka-typegen/runtime/kafkajs
  - @platformatic/kafka defaults to kafka-typegen/runtime/platformatic
- generation.typesFileName, when set, controls the generated client filename. The generator also emits index.ts unless the filename is already index.ts.
- generation.apiMode defaults to minimal. Use advanced only if you explicitly want the older metadata-heavy generated surface.
- generation.avroExternalTypes maps Avro named types to TypeScript type expressions for references that are not generated in the same catalog run. Values can be bare type names or expressions such as import('./shared-types.js').Address.
- generation.avroSemanticMode defaults to default. Use safe to render plain Avro long values as bigint while keeping the existing logical-type aliases.
- sync.kafka config is used only by the sync CLI command.
- schemaRegistry is the single source of truth for Schema Registry connection details. sync.schemaRegistry controls registry sync policy only.
- sync.schemaRegistry.onDrift defaults to register, so sync --apply registers a new subject version when an existing schema changes.
- sync.schemaRegistry.compatibility, when set, updates each subject's Schema Registry compatibility policy before registration. If Schema Registry rejects an incompatible schema, sync --apply fails with that registry error.
- Legacy sync.schemaRegistry.failOnDrift: true is still accepted as an alias for onDrift: 'fail', but new configs should use onDrift.
- If sync.kafka is configured, every topic must provide sync.partitions and sync.replicationFactor.
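As an illustration, a config that opts into the safe semantic mode and a custom generated filename could look like the sketch below. It only uses fields from the reference above; the specific values are illustrative, not recommendations.
import { defineConfig } from 'kafka-typegen';
export default defineConfig({
  outputDir: './src/generated/kafka',
  sources: {
    rootDir: './schemas'
  },
  generation: {
    apiMode: 'minimal',        // default; 'advanced' restores the older metadata-heavy surface
    avroSemanticMode: 'safe',  // plain Avro long fields become bigint
    typesFileName: 'client.ts' // default filename is kafka-client.ts
  },
  topics: [
    {
      name: 'user.events',
      events: [
        {
          name: 'user.created',
          schemaPath: './user-created.avsc'
        }
      ]
    }
  ]
});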
Avro Type Support
Supported schema constructs:
- top-level named record, enum, and fixed schemas
- primitive types: null, boolean, int, long, float, double, bytes, string
- unions, including nullable unions like ['null', 'string']
- arrays and maps
- inline nested records
- named nested record, enum, and fixed declarations, including references by short or fully-qualified name inside the same schema file
- cross-file references to top-level named records, including short and fully-qualified names when the referenced schema is part of the same generated catalog
- recursive references to a top-level record from its own fields
- logical types:
  - uuid -> string
  - date -> AvroDate
  - time-millis -> AvroTimeMillis
  - timestamp-millis -> AvroTimestampMillis
  - timestamp-micros -> AvroTimestampMicros
  - decimal -> AvroDecimal
Current limitations:
- cross-file references require the referenced schema to be included in the same config/catalog run unless an explicit generation.avroExternalTypes mapping is configured
- long is represented as number in the default semantic mode and bigint in generation.avroSemanticMode: 'safe'
- AvroDate, AvroTimeMillis, AvroTimestampMillis, and AvroTimestampMicros are numeric aliases, not Date objects
- AvroDecimal is currently a Uint8Array alias
- unsupported or malformed schema shapes fail generation with an explicit error instead of falling back to unknown
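As a rough sketch of how those representations surface in application code: the alias names come from the list above, but the payload and its fields below are hypothetical, and the aliases are declared inline here for illustration rather than imported from the generated file.
type AvroDate = number;            // days since epoch, not a Date object
type AvroTimestampMillis = number; // epoch milliseconds, not a Date object
type AvroDecimal = Uint8Array;     // raw decimal bytes
interface PaymentCapturedPayload { // hypothetical event payload
  paymentId: string;               // Avro string with logicalType uuid
  capturedOn: AvroDate;            // logicalType date
  capturedAt: AvroTimestampMillis; // logicalType timestamp-millis
  amount: AvroDecimal;             // logicalType decimal
  sequence: bigint;                // plain long under avroSemanticMode: 'safe' (number by default)
}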
External Avro Type Mappings
Use generation.avroExternalTypes when a schema references a named Avro type that is intentionally not part of the current generation run:
import { defineConfig } from 'kafka-typegen';
export default defineConfig({
outputDir: './generated',
generation: {
avroExternalTypes: {
'com.external.Address': "import('./shared-types.js').Address"
}
},
topics: [
{
name: 'user.events',
events: [
{
name: 'user.created',
schemaPath: './user-created.avsc'
}
]
}
]
});
This is a TypeScript type-resolution feature only. It does not load remote Avro schemas or fetch types from Schema Registry.
Sync Config Example
import { defineConfig } from 'kafka-typegen';
export default defineConfig({
outputDir: './generated',
schemaRegistry: {
url: process.env.SCHEMA_REGISTRY_URL,
auth: {
username: process.env.SCHEMA_REGISTRY_USERNAME,
password: process.env.SCHEMA_REGISTRY_PASSWORD
}
},
sync: {
kafka: {
brokers: [process.env.KAFKA_BROKER ?? 'localhost:9092'],
clientId: 'kafka-typegen-sync'
},
schemaRegistry: {
onDrift: 'register',
compatibility: 'BACKWARD'
}
},
sources: {
rootDir: './schemas'
},
topics: [
{
name: 'user.events',
sync: {
partitions: 3,
replicationFactor: 2,
cleanupPolicy: 'delete',
retentionMs: 86_400_000
},
events: [
{
name: 'user.created',
schemaPath: './user-created.avsc'
}
]
}
]
});
Runtime Usage
The generated client is transport-agnostic. It talks to a runtime interface, and this package ships generic, KafkaJS, and Platformatic runtime entrypoints.
Observability
Runtime and sync entrypoints accept optional observability hooks:
type KafkaTypegenLogger = {
debug(message: string, context?: Record<string, unknown>): void;
info(message: string, context?: Record<string, unknown>): void;
warn(message: string, context?: Record<string, unknown>): void;
error(message: string, context?: Record<string, unknown>): void;
};
type KafkaTypegenObserver = {
onEvent(event: KafkaTypegenObservedEvent): void | Promise<void>;
};
- logger is used for diagnostics and internal failures. If omitted, kafka-typegen falls back to console.
- observer receives structured runtime and sync events such as send/handle start-success-failure and sync start-complete-failure.
- These hooks are additive. Operations still throw or fail through their normal control-flow APIs.
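As a sketch of how the hooks might be wired, assuming they are passed alongside the other runtime options (the logger here simply forwards to console, and the observer just logs each structured event):
import { Kafka } from 'kafkajs';
import { createKafkaJsRuntimeClient } from 'kafka-typegen/runtime';
const kafka = new Kafka({ brokers: ['localhost:9092'], clientId: 'demo-app' });
const runtime = createKafkaJsRuntimeClient({
  producer: kafka.producer(),
  consumer: kafka.consumer({ groupId: 'demo-app-group' }),
  schemaRegistry: { url: 'http://localhost:8081' },
  logger: {
    debug: (message, context) => console.debug(message, context),
    info: (message, context) => console.info(message, context),
    warn: (message, context) => console.warn(message, context),
    error: (message, context) => console.error(message, context)
  },
  observer: {
    onEvent(event) {
      // forward structured send/handle/sync events to your metrics or tracing pipeline
      console.log('kafka-typegen event', event);
    }
  }
});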
Generic runtime
Import from kafka-typegen/runtime for the generic runtime constructors, and from kafka-typegen/runtime/advanced for the low-level transport interfaces when you want to provide your own transport adapters:
import { createRuntimeClient } from 'kafka-typegen/runtime';
import type {
RuntimeTransportConsumer,
RuntimeTransportProducer
} from 'kafka-typegen/runtime/advanced';
You provide:
- producerTransport.send(message)
- consumerTransport.onTopic(topicName, handler)
- either serialization.serialize(metadata, payload) / serialization.deserialize(metadata, message)
- or schemaRegistry, which can be either:
  - direct Confluent Schema Registry connection options
  - an already-created registry client that satisfies the runtime registry interface
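Put together, a bare-bones generic runtime might look like the sketch below. The transport bodies are stubs you would connect to your own broker client, and the JSON serialization mirrors the Platformatic examples later in this README.
import { createRuntimeClient } from 'kafka-typegen/runtime';
import { createClient } from './generated/kafka/index.js';
const runtime = createRuntimeClient({
  producerTransport: {
    async send(message) {
      // hand the already-serialized message to your broker client here
    }
  },
  consumerTransport: {
    async onTopic(topicName, handler) {
      // subscribe to topicName on your broker client and invoke handler per received message
    }
  },
  serialization: {
    async serialize(_metadata, payload) {
      return {
        value: new TextEncoder().encode(JSON.stringify(payload))
      };
    },
    async deserialize(_metadata, message) {
      return JSON.parse(new TextDecoder().decode(message.value));
    }
  }
});
const client = createClient(runtime);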
KafkaJS runtime
KafkaJS helpers are available from both kafka-typegen/runtime and kafka-typegen/runtime/kafkajs.
import { Kafka } from 'kafkajs';
import { createKafkaJsRuntimeClient } from 'kafka-typegen/runtime';
import { createClient } from './generated/kafka-client.js';
const kafka = new Kafka({
brokers: ['localhost:9092'],
clientId: 'demo-app'
});
const runtime = createKafkaJsRuntimeClient({
producer: kafka.producer(),
consumer: kafka.consumer({
groupId: 'demo-app-group'
}),
schemaRegistry: {
url: 'http://localhost:8081'
},
runOptions: {
autoCommit: false
},
onError(error) {
console.error('KafkaJS consumer failure', error);
}
});
const client = createClient(runtime);
await client.producer.connect();
await client.consumer.connect();
await client.consumer.userEvents.userCreated.on(async (message) => {
console.log(message.payload.email);
}, {
fromBeginning: true
});
await client.consumer.run();
await client.producer.userEvents.userCreated.send({
id: 'user_1',
email: '[email protected]',
isAdmin: true
}, {
acks: -1
});
await client.consumer.close();
await client.producer.disconnect();
KafkaJS-specific behavior:
- register generated consumer handlers before calling consumer.run()
- repeated subscriptions to the same topic must use the same fromBeginning option
- subscribing a new topic after consumer.run() has started is rejected
- native KafkaJS methods like connect, disconnect, stop, commitOffsets, pause, resume, and on remain available on the generated wrapper
- consumer.close() stops active consumption and disconnects the underlying KafkaJS consumer
Platformatic runtime
Platformatic helpers are available from both kafka-typegen/runtime and kafka-typegen/runtime/platformatic. The shorter import is supported:
import { Producer, Consumer } from '@platformatic/kafka';
import {
createPlatformaticRuntimeClient
} from 'kafka-typegen/runtime';
Example:
import { Consumer, Producer } from '@platformatic/kafka';
import { createPlatformaticRuntimeClient } from 'kafka-typegen/runtime';
import { createClient } from './generated/kafka-client.js';
const producer = new Producer({
clientId: 'app-producer',
bootstrapBrokers: ['localhost:9092']
});
const consumer = new Consumer({
clientId: 'app-consumer',
groupId: 'app-group',
bootstrapBrokers: ['localhost:9092']
});
const runtime = createPlatformaticRuntimeClient({
producer,
consumer,
serialization: {
async serialize(_metadata, payload) {
return {
value: new TextEncoder().encode(JSON.stringify(payload))
};
},
async deserialize(_metadata, message) {
return JSON.parse(new TextDecoder().decode(message.value));
}
}
});
const client = createClient(runtime);
Consumer stream errors and rejected async handlers are surfaced through onError:
const runtime = createPlatformaticRuntimeClient({
producer,
consumer,
onError(error) {
console.error('Kafka consumer failure', error);
},
serialization: {
async serialize(_metadata, payload) {
return {
value: new TextEncoder().encode(JSON.stringify(payload))
};
},
async deserialize(_metadata, message) {
return JSON.parse(new TextDecoder().decode(message.value));
}
}
});
Schema Evolution Workflow
When you edit an existing .avsc file:
- Run kafka-typegen generate --config ./kafka-typegen.config.mjs.
- Run kafka-typegen sync --config ./kafka-typegen.config.mjs to review the planned Schema Registry update.
- Run kafka-typegen sync --config ./kafka-typegen.config.mjs --apply to register a new subject version.
Recommended Avro evolution pattern:
- To add a field, prefer a nullable field with a default, or a required field with a valid default value.
- To remove a required field, first make it optional/nullable and deploy that transition before removing it later.
- To change optional to required, first ensure all producers always populate the field, then tighten the schema in a later version if your compatibility mode allows it.
Schema Registry compatibility modes such as BACKWARD and FULL are enforced by Schema Registry itself. kafka-typegen sync --apply does not bypass those checks; it surfaces the registry error if a new schema version is rejected.
When drift is detected, kafka-typegen sync also emits conservative evolution hints for common breaking changes such as adding a field without a default, removing a required field, changing an optional field to required, removing enum symbols, or changing a field type. These hints are guidance, not a replacement for Schema Registry compatibility checks.
Recommended compatibility policy:
- Use BACKWARD for most Kafka event streams. That fits the common rollout order of deploying consumers first, then producers, and allows additive schema changes when new fields are nullable or have defaults.
- Use FULL only when you need both old consumers and old producers to remain compatible across schema versions.
- Avoid NONE outside local/dev experiments, because it allows breaking schema changes without protection.
- If sync.schemaRegistry.compatibility is omitted, kafka-typegen does not change the subject policy and Schema Registry keeps its existing compatibility setting.
For a given topic, repeated generated subscriptions must use the same consume options. Conflicting options are rejected instead of being ignored.
Consumer Shutdown
The generated consumer exposes a transport-aware lifecycle API:
process.once('SIGINT', async () => {
await client.consumer.close();
process.exit(0);
});
process.once('SIGTERM', async () => {
await client.consumer.close();
process.exit(0);
});
Use await client.consumer.stop() when you only want to stop active consumption but keep the native client available.
For Platformatic, client.consumer.close() first closes active topic streams, then closes the native consumer. If the native client still refuses to leave the consumer group because a stream is active, the wrapper retries with a forced close. You can also request that directly:
await client.consumer.close({ force: true });
Runtime Schema Registry Support
Runtime helpers now support two mutually exclusive serialization modes:
- serialization
- schemaRegistry
You must provide exactly one of them. Passing both is rejected as ambiguous, and passing neither is rejected because the runtime cannot encode or decode payloads without one.
The schemaRegistry path supports two forms:
- direct Confluent-compatible options:
schemaRegistry: {
url: 'http://localhost:8081',
auth: {
username: '...',
password: '...'
}
}
- an explicit runtime registry client:
schemaRegistry: createConfluentSchemaRegistryRuntimeClient({
url: 'http://localhost:8081'
})
The direct config form is the simplest default and is the recommended path unless you already need to manage the registry client yourself.
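If you do manage the client yourself, any object that satisfies the runtime registry interface shown further below (getLatestSchema and getSchemaById) can be passed as schemaRegistry. As a sketch, a minimal hand-rolled client against the standard Confluent REST endpoints could look like this, with auth and detailed error handling trimmed:
const registryUrl = 'http://localhost:8081';
const customRegistryClient = {
  async getLatestSchema(subjectName: string) {
    // GET /subjects/{subject}/versions/latest returns the schema plus its global id
    const response = await fetch(
      `${registryUrl}/subjects/${encodeURIComponent(subjectName)}/versions/latest`
    );
    if (!response.ok) {
      throw new Error(`Schema lookup failed for ${subjectName}: ${response.status}`);
    }
    const body = await response.json();
    return { schemaId: body.id, schema: body.schema, subjectName };
  },
  async getSchemaById(schemaId: number) {
    // GET /schemas/ids/{id} returns the schema string for a known id
    const response = await fetch(`${registryUrl}/schemas/ids/${schemaId}`);
    if (!response.ok) {
      throw new Error(`Schema lookup failed for id ${schemaId}: ${response.status}`);
    }
    const body = await response.json();
    return { schemaId, schema: body.schema };
  }
};
You would then pass schemaRegistry: customRegistryClient to any of the runtime constructors instead of the direct connection options.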
At runtime, the library uses generated internal event metadata such as subjectName, eventName, and topicName to:
- resolve the latest schema for a subject on the producer side
- encode payloads with Avro
- prepend Confluent wire format with the schema id
- read schema ids from incoming messages on the consumer side
- resolve schemas by id
- decode Avro payloads back into the typed generated message payload
The runtime keeps internal caches for subject and schema-id lookups so repeated sends and receives do not refetch the same schema information on every message.
The expected registry client shape is intentionally small and runtime-focused:
interface SchemaRegistryRuntimeClient {
getLatestSchema(subjectName: string): Promise<{
schemaId: number;
schema: string | Record<string, unknown>;
subjectName?: string;
}>;
getSchemaById(schemaId: number): Promise<{
schemaId: number;
schema: string | Record<string, unknown>;
subjectName?: string;
}>;
}
Generic runtime example:
import { createRuntimeClient } from 'kafka-typegen/runtime';
import { SchemaRegistryConfig } from './generated/kafka/index.js';
const runtime = createRuntimeClient({
producerTransport,
consumerTransport,
schemaRegistry: {
...SchemaRegistryConfig,
auth: {
token: process.env.SCHEMA_REGISTRY_TOKEN
}
}
});
Platformatic example:
import { Consumer, Producer } from '@platformatic/kafka';
import { createPlatformaticRuntimeClient } from 'kafka-typegen/runtime';
import { SchemaRegistryConfig, createClient } from './generated/kafka/index.js';
const runtime = createPlatformaticRuntimeClient({
producer: new Producer({
clientId: 'app-producer',
bootstrapBrokers: ['localhost:9092']
}),
consumer: new Consumer({
clientId: 'app-consumer',
groupId: 'app-group',
bootstrapBrokers: ['localhost:9092']
}),
schemaRegistry: {
...SchemaRegistryConfig,
auth: {
username: process.env.SCHEMA_REGISTRY_USERNAME,
password: process.env.SCHEMA_REGISTRY_PASSWORD
}
}
});
const client = createClient(runtime);
The Platformatic adapter:
- wraps producer.send({ messages: [...] })
- wraps consumer.consume({ topics: [...] })
- creates at most one consume stream per topic
- fans messages out to all registered handlers for that topic
- exposes stop() to close active topic streams without closing the native consumer
- exposes close() to close active topic streams and then close the native consumer
- retries the native consumer close with a forced close when Platformatic reports an active-stream shutdown conflict
- still expects application code to decide when producer and consumer shutdown should happen
runtime.transport in kafka-typegen.config.mjs is still useful even if you import Platformatic helpers manually. It controls which runtime module path the generated client uses for its type imports. If you omit it, generated code defaults to kafka-typegen/runtime; if you set transport: '@platformatic/kafka', generated code defaults to kafka-typegen/runtime/platformatic.
If you import from kafka-typegen/runtime directly, you can omit runtime.transport; set an explicit runtime.module only when you want to override the generated import path.
Producer-only and consumer-only runtime helpers
If your application only needs one side of the API, you do not need to build a full runtime client first.
Generic runtime:
import { createRuntimeConsumer, createRuntimeProducer } from 'kafka-typegen/runtime';
import { createConsumer, createProducer } from './generated/kafka/index.js';
const runtimeProducer = createRuntimeProducer({
producerTransport: {
async send(message) {
// send to your transport
}
},
serialization: {
async serialize(_metadata, payload) {
return {
value: new TextEncoder().encode(JSON.stringify(payload))
};
},
async deserialize() {
throw new Error('Not used by producer-only runtime.');
}
}
});
const producer = createProducer(runtimeProducer);
await producer.userEvents.userCreated.send({
id: 'user_1',
email: '[email protected]',
isAdmin: true
}, {
acks: -1
});
The same helpers also support Schema Registry directly:
const runtimeProducer = createRuntimeProducer({
producerTransport,
schemaRegistry: SchemaRegistryConfig
});
Platformatic runtime:
import { Producer } from '@platformatic/kafka';
import { createPlatformaticRuntimeProducer } from 'kafka-typegen/runtime';
import { createProducer } from './generated/kafka/index.js';
const runtimeProducer = createPlatformaticRuntimeProducer({
producer: new Producer({
clientId: 'app-producer',
bootstrapBrokers: ['localhost:9092']
}),
serialization: {
async serialize(_metadata, payload) {
return {
value: new TextEncoder().encode(JSON.stringify(payload))
};
},
async deserialize() {
throw new Error('Not used by producer-only runtime.');
}
}
});
const producer = createProducer(runtimeProducer);
await producer.userEvents.userCreated.send({
id: 'user_1',
email: '[email protected]',
isAdmin: true
});
And the consumer-only path works the same way:
import { Consumer } from '@platformatic/kafka';
import { createPlatformaticRuntimeConsumer } from 'kafka-typegen/runtime';
import { SchemaRegistryConfig, createConsumer } from './generated/kafka/index.js';
const runtimeConsumer = createPlatformaticRuntimeConsumer({
consumer: new Consumer({
clientId: 'app-consumer',
groupId: 'app-group',
bootstrapBrokers: ['localhost:9092']
}),
schemaRegistry: SchemaRegistryConfig
});
const consumer = createConsumer(runtimeConsumer);
await consumer.userEvents.userCreated.on(async (message) => {
message.payload.isAdmin;
}, {
autocommit: false
});
CLI
The package exposes a CLI binary:
kafka-typegen
Supported behavior:
- default config discovery via kafka-typegen.config.mjs
- explicit config loading via --config <path>
- generation output written to the configured outputDir
- actionable validation and loading errors
- sync command with dry-run by default
- sync --apply to create missing Kafka topics and Schema Registry subjects
- sync --apply to register a new Schema Registry subject version when registry drift is detected and sync.schemaRegistry.onDrift resolves to register
- sync --target kafka|registry|all
- sync --json for machine-readable sync output
Examples:
kafka-typegen
kafka-typegen generate --config ./kafka-typegen.config.mjs
kafka-typegen sync --config ./kafka-typegen.config.mjs
kafka-typegen sync --apply --target registry
Demo App
A standalone usage example lives in examples/demo-app. It is not part of the published library package because the package files list only ships dist/.
The demo shows:
- a local kafka-typegen.config.mjs
- an Avro schema under schemas/
- generated client output written to src/generated/kafka when you run pnpm demo or pnpm generate
- a small application that uses createPlatformaticRuntimeClient(...) from kafka-typegen/runtime/platformatic
Run it with:
cd examples/demo-app
pnpm install
pnpm demo
pnpm consume
# in another terminal
pnpm produce
Package Exports
kafka-typegen
- config helpers
- schema loading and catalog builder interfaces
- generator interfaces
- generic runtime interfaces
kafka-typegen/runtime
- generic runtime client and runtime types
- KafkaJS and Platformatic high-level runtime helpers
kafka-typegen/runtime/advanced
- low-level transport adapter interfaces
- KafkaJS and Platformatic transport adapter builders
kafka-typegen/runtime/kafkajs
- KafkaJS runtime adapter
- generic runtime types re-exported for generated imports
kafka-typegen/runtime/platformatic
- Platformatic runtime adapter
- generic runtime types re-exported for generated imports
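As a quick orientation, typical imports per entrypoint look roughly like this; all names are taken from the examples elsewhere in this README.
import { defineConfig } from 'kafka-typegen';
import {
  createRuntimeClient,
  createKafkaJsRuntimeClient,
  createPlatformaticRuntimeClient
} from 'kafka-typegen/runtime';
import type {
  RuntimeTransportConsumer,
  RuntimeTransportProducer
} from 'kafka-typegen/runtime/advanced';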
Development
Useful commands in this repository:
pnpm test
pnpm typecheck
pnpm build
For Docker-backed end-to-end coverage with real Kafka and Schema Registry:
pnpm build
pnpm test:integration
pnpm test:integration:secure
pnpm test:integration requires a running Docker daemon. pnpm test stays fast and does not run the Testcontainers suite.
pnpm test:integration:secure is a separate Kafka SASL/SCRAM suite that validates authenticated sync --target kafka and KafkaJS runtime usage against a secured broker. It does not currently cover end-to-end TLS certificate wiring because the public sync config only exposes ssl: boolean, not CA/key material.
Current Scope
What this package does today:
- validates and normalizes Kafka typegen config
- loads Avro record schemas from disk
- builds a deterministic internal event catalog
- generates typed producer, consumer, and client APIs
- supports a generic runtime abstraction
- ships a first-party @platformatic/kafka runtime adapter
- can plan or create Kafka topics through the sync command
- can plan or create Schema Registry subjects through the sync command
- can register a new Schema Registry subject version when schema drift is detected and the configured drift policy allows registration
What it does not do automatically:
- manage runtime client lifecycle for you
- generate multiple output files per config
- mutate existing Kafka topics to reconcile drift
- mutate existing Schema Registry subjects when drift policy is configured to fail or ignore instead of register
Status
This repository is now on the stable 1.x line. The compatibility guarantees for that line are documented in docs/stability-policy.md.
