@zero.core/api-framework-confluent-kafka-plugin
v1.0.0
Published
Confluent Kafka provider plugin for the ZeroCore API framework.
Downloads
157
Readme
ZeroCore Confluent Kafka Provider Plugin
@zero.core/api-framework-confluent-kafka-plugin registers a Confluent Kafka provider with the @zero.core/api-framework pub/sub bus.
Implemented Scope
ConfluentKafkaPubSubProviderConfluentKafkaPubSubPluginConfluentKafkaProviderOptionsConfluentKafkaProviderErrorContextConfluentKafkaProviderErrorHandler- structural Confluent Kafka producer and consumer contracts
This package targets Confluent Kafka JavaScript style producer and consumer contracts. It does not bundle @confluentinc/kafka-javascript; pass clients or thin adapters that implement the exported interfaces. The npm package name is @zero.core/api-framework-confluent-kafka-plugin.
Installation
yarn add @zero.core/api-framework @zero.core/api-framework-confluent-kafka-pluginUsage
import { ApiRouter, MemoryPubSubProvider, PubSubBus, ServiceContainer } from "@zero.core/api-framework";
import { ConfluentKafkaPubSubPlugin } from "@zero.core/api-framework-confluent-kafka-plugin";
const services = new ServiceContainer();
const pubSub = new PubSubBus(new MemoryPubSubProvider("local"), registrations, {
defaultProvider: "confluent-kafka",
services
});
services.addInstance(PubSubBus, pubSub);
new ApiRouter(app, controllers, {
services,
plugins: [
new ConfluentKafkaPubSubPlugin(confluentProducer, confluentConsumer, {
topicPrefix: "zerocore.",
fromBeginning: false,
onError: (error, context) => {
logger.error({ error, context }, "Confluent Kafka pub/sub delivery failed");
}
})
]
}).register();
await pubSub.start();
process.once("SIGTERM", () => {
void pubSub.stop().finally(() => process.exit(0));
});Use provider: "confluent-kafka" in @Subscribe() or pubSub.publish() options to select this provider explicitly.
Structural Client Contract
The plugin expects app-owned Confluent Kafka clients or adapters:
import type {
ConfluentKafkaConsumerClient,
ConfluentKafkaProducerClient,
} from "@zero.core/api-framework-confluent-kafka-plugin";
const producer: ConfluentKafkaProducerClient = {
send: (request) => confluentProducer.send(request),
flush: () => confluentProducer.flush?.(),
disconnect: () => confluentProducer.disconnect(),
};
const consumer: ConfluentKafkaConsumerClient = {
subscribe: (request) => confluentConsumer.subscribe(request),
run: (request) => confluentConsumer.run(request),
stop: () => confluentConsumer.stop?.(),
disconnect: () => confluentConsumer.disconnect(),
};The provider calls consumer.run(...) once and dispatches messages by resolved topic. It calls optional lifecycle hooks from close(), which is invoked when PubSubBus.stop() closes providers.
Verification Stages
yarn typecheck
yarn test:contracts
yarn test:unit
yarn buildRun the full gate:
yarn verify:full
yarn pack:checkStage meaning:
typecheckverifies provider and plugin contracts.test:contractsverifies package metadata, legal files, npm payload intent, Confluent send/subscribe/run translation, replay options, error reporting, lifecycle close behavior, and plugin registration errors.test:unitverifies focused provider writes, header/key normalization, handler error reporting, lifecycle close hooks, and plugin registration behavior.buildemits the publishabledistpackage.pack:checkbuilds and runsnpm pack --dry-runso the payload can be reviewed before publish.
Production Notes
- This package intentionally targets Confluent Kafka contracts, not KafkaJS contracts.
- Your app owns broker connectivity, topic provisioning, consumer group configuration, offsets, and operational policy.
- Set stable topic prefixes and app-level topic names. Consumer group and broker-level settings belong to the Confluent client or adapter you pass in.
- Deserialization and handler failures are reported through
onError. Handler failures are rethrown by default so the broker/client can apply its offset and retry semantics. - Set
throwOnHandlerError: falseonly when your application intentionally wants the provider to report handler failures and continue dispatching later handlers in the same process. close()stops/disconnects the consumer and flushes/disconnects the producer when those lifecycle methods exist.- Keep
PubSubBusregistered in the framework DI container beforeApiRouter.register()soConfluentKafkaPubSubPlugincan add the provider. - The npm package publishes compiled
distoutput plusREADME.md,LICENSE, andNOTICE; TypeScript source and tests stay out of the npm payload.
