@lucaapp/kafka-client
v0.1.2
Kafka orchestration utility for Luca backend services
@lucaapp/kafka-client
A Kafka messaging orchestration utility for Luca backend services. It provides JWT-based message signing, optional encryption, Prometheus metrics, and type-safe handling for 15+ business event types, including payments, operators, locations, and reservations.
🚀 Features
- 🔐 Security-First Design: Built-in JWT signature verification using the ES256 algorithm and optional message encryption
- 📊 Comprehensive Monitoring: Integrated Prometheus metrics for message production, consumption, errors, and performance tracking
- 🎯 Type-Safe Event Handling: Strongly-typed event system supporting 15+ predefined event types
- 🔄 Reliable Message Processing: Automatic topic creation, consumer group management, and graceful error handling
- ⚙️ Environment-Aware: Multi-environment support (local, staging, production) with environment-specific topic naming
- 🏗️ Service Integration: Seamless integration with Luca's service identity system and logging infrastructure
📦 Installation
npm install @lucaapp/kafka-client
# or
yarn add @lucaapp/kafka-client
🔧 Quick Start
Basic Setup
import { LucaKafkaClient } from '@lucaapp/kafka-client';
import { Environment, ServiceIdentity } from '@lucaapp/service-utils';
import logger from './logger';
// `serviceIdentity` is assumed to be a ServiceIdentity instance your service has already created
const kafkaClient = new LucaKafkaClient.Client(
logger,
Environment.LOCAL,
serviceIdentity,
);
const topic = LucaKafkaClient.Events.KafkaTopic.PAYMENTS;
const eventType = LucaKafkaClient.Types.KafkaEvent;
await kafkaClient.connect();
🔄 Direct Imports
import { KafkaClient, KafkaTopic } from '@lucaapp/kafka-client';
import { Environment, ServiceIdentity } from '@lucaapp/service-utils';
import logger from './logger';
// Initialize the client
const kafkaClient = new KafkaClient(logger, Environment.LOCAL, serviceIdentity);
// Connect to Kafka
await kafkaClient.connect();
Available Namespace Properties
- LucaKafkaClient.Client - Main KafkaClient class
- LucaKafkaClient.Types - All type definitions
- LucaKafkaClient.Events - Event types, topics, and issuers
- LucaKafkaClient.Validation - Validation utilities
📋 Supported Event Types
The package defines the following topics, grouped by business area:
- Financial: PAYMENTS, RESERVATION_FEES, RESERVATION_PRE_PAYMENT, PAYMENT_SYNC_ERRORS
- User Management: CONSUMERS, OPERATORS, OPERATOR_DEVICES, OPERATORS_PAY
- Location Management: LOCATIONS, LOCATION_GROUPS, LOCATION_GROUP_EMPLOYEES, OPERATOR_LOCATION_GROUPS
- Reservations: RESERVATIONS
- Real-time Communication: WS_EVENT_backend, WS_EVENT_backend_pay, WS_EVENT_backend_pos
- Notifications: CONSUMER_PUSH_NOTIFICATION
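A minimal sketch of how a topic-to-payload mapping can make event handling type-safe. The `EntityByTopic` interface, its payload fields, the `update`/`delete` event types, and the `describeEvent` helper are hypothetical illustrations; the package's real type definitions are not shown in this README.

```typescript
// Hypothetical payload shapes; the real entity types live in the package.
interface EntityByTopic {
  PAYMENTS: { amount: number; currency: string };
  RESERVATIONS: { guests: number };
}

type Topic = keyof EntityByTopic;

// Envelope modelled on the produce example in this README (id, type, entity).
interface KafkaEvent<T extends Topic> {
  id: string;
  type: 'create' | 'update' | 'delete'; // only 'create' appears in this README
  entity: EntityByTopic[T];
}

// The topic argument fixes the payload type at compile time, so passing a
// reservation entity together with 'PAYMENTS' would not compile.
function describeEvent<T extends Topic>(topic: T, event: KafkaEvent<T>): string {
  return `${topic}:${event.type}:${event.id}`;
}

const label = describeEvent('PAYMENTS', {
  id: 'payment-123',
  type: 'create',
  entity: { amount: 1000, currency: 'EUR' },
});
console.log(label);
```

Keying the payload type on the topic means a mismatch between topic and entity is caught by the compiler rather than at runtime.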
Producing Messages
import { KafkaTopic } from '@lucaapp/kafka-client';
// Produce a payment event
await kafkaClient.produce(KafkaTopic.PAYMENTS, 'payment-123', {
id: 'payment-123',
type: 'create',
entity: {
amount: 1000,
currency: 'EUR',
// ... other payment fields
},
});
Consuming Messages
// Subscribe to payment events
const consumer = await kafkaClient.consume(
KafkaTopic.PAYMENTS,
async message => {
console.log('Received payment event:', message.value);
// Process the payment event
},
);
⚙️ Configuration
Environment Variables
# Kafka Configuration
KAFKA_BROKER=kafka:9092
KAFKA_USERNAME=your-username
KAFKA_PASSWORD=your-password
KAFKA_SSL=true
KAFKA_ENCRYPTION_ENABLED=true
# Topic Secrets (for encryption)
KAFKA_TOPIC_SECRET_PAYMENTS=your-secret-key
KAFKA_TOPIC_SECRET_CONSUMERS=your-secret-key
# ... other topic secrets
Configuration Object
interface KafkaConfiguration {
environment: Environment;
broker: string;
clientId: string;
username?: string;
password?: string;
ssl?: boolean;
encryptionEnabled?: boolean;
}
🔒 Security
Message Encryption
When encryptionEnabled is true, messages are encrypted using JWE with:
- Key management algorithm: A256GCMKW (key wrapping with AES-GCM using a 256-bit key)
- Content encryption: A256GCM (AES-GCM using a 256-bit key)
- Per-topic secrets: Each topic uses its own encryption key
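The content-encryption step and the per-topic key idea can be sketched with Node's built-in `crypto` module. This is not the package's JWE implementation: it skips the A256GCMKW key-wrapping layer and the JWE serialization, and the SHA-256 key derivation in `keyForTopic` is an assumption made only so the example has a 32-byte key. The names `keyForTopic`, `seal`, and `unseal` are hypothetical.

```typescript
import { createCipheriv, createDecipheriv, createHash, randomBytes } from 'node:crypto';

// Assumption: derive a 32-byte key from the topic secret with SHA-256.
// How the package turns a topic secret into a key is not documented here.
function keyForTopic(secret: string): Buffer {
  return createHash('sha256').update(secret, 'utf8').digest();
}

interface Sealed {
  iv: Buffer;
  tag: Buffer;
  ciphertext: Buffer;
}

// Encrypt with AES-256-GCM, the cipher behind the A256GCM identifier.
function seal(plaintext: string, key: Buffer): Sealed {
  const iv = randomBytes(12); // 96-bit IV, fresh for every message
  const cipher = createCipheriv('aes-256-gcm', key, iv);
  const ciphertext = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]);
  return { iv, tag: cipher.getAuthTag(), ciphertext };
}

// Decrypt and authenticate; final() throws if the key or the data is wrong.
function unseal(sealed: Sealed, key: Buffer): string {
  const decipher = createDecipheriv('aes-256-gcm', key, sealed.iv);
  decipher.setAuthTag(sealed.tag);
  return Buffer.concat([decipher.update(sealed.ciphertext), decipher.final()]).toString('utf8');
}

const paymentsKey = keyForTopic('payments-secret');
const consumersKey = keyForTopic('consumers-secret');

const sealed = seal(JSON.stringify({ id: 'payment-123' }), paymentsKey);
const roundTrip = unseal(sealed, paymentsKey);

// A key belonging to another topic cannot open the message.
let wrongKeyRejected = false;
try {
  unseal(sealed, consumersKey);
} catch {
  wrongKeyRejected = true;
}
```

Because GCM is authenticated, a consumer holding another topic's secret gets an error instead of garbage plaintext.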
Message Authentication
All messages are signed using:
- Algorithm: ES256 (ECDSA using P-256 and SHA-256)
- JWT-based signatures: Verified against remote JWKS
- Service identity verification: Ensures message authenticity
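To illustrate what an ES256 signature over a compact JWT involves, here is a sketch using only Node's built-in `crypto` module. It assumes a locally generated key pair instead of the remote JWKS lookup described above, and `signCompact` and `verifyCompact` are hypothetical helpers, not part of the package API.

```typescript
import { generateKeyPairSync, sign, verify, KeyObject } from 'node:crypto';

const encode = (text: string): string => Buffer.from(text, 'utf8').toString('base64url');

// Build header.payload.signature, signing with ECDSA P-256 + SHA-256.
// JWS expects the raw r||s signature form, hence dsaEncoding 'ieee-p1363'.
function signCompact(payload: object, privateKey: KeyObject): string {
  const header = encode(JSON.stringify({ alg: 'ES256', typ: 'JWT' }));
  const signingInput = `${header}.${encode(JSON.stringify(payload))}`;
  const signature = sign('sha256', Buffer.from(signingInput, 'utf8'), {
    key: privateKey,
    dsaEncoding: 'ieee-p1363',
  });
  return `${signingInput}.${signature.toString('base64url')}`;
}

// Returns true only if the header declares ES256 and the signature matches.
function verifyCompact(token: string, publicKey: KeyObject): boolean {
  const [header, payload, signature] = token.split('.');
  if (!header || !payload || !signature) return false;
  try {
    const parsed = JSON.parse(Buffer.from(header, 'base64url').toString('utf8'));
    if (parsed.alg !== 'ES256') return false;
  } catch {
    return false;
  }
  return verify(
    'sha256',
    Buffer.from(`${header}.${payload}`, 'utf8'),
    { key: publicKey, dsaEncoding: 'ieee-p1363' },
    Buffer.from(signature, 'base64url'),
  );
}

// 'prime256v1' is OpenSSL's name for the NIST P-256 curve.
const { publicKey, privateKey } = generateKeyPairSync('ec', { namedCurve: 'prime256v1' });
const token = signCompact({ id: 'payment-123', type: 'create' }, privateKey);
const valid = verifyCompact(token, publicKey);
```

In a real deployment the verifier would select the public key from the issuer's JWKS rather than hold it locally.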
📊 Monitoring
The client provides Prometheus metrics:
- kafka_message_produce_size_bytes: Message size histogram
- kafka_message_produce_error_count: Production error counter
- kafka_message_consume_count: Consumption counter
- kafka_message_consume_error_count: Consumption error counter
- kafka_message_acknowledged_count: Acknowledgment counter
🧪 Testing
# Run tests
yarn test
# Run tests with coverage
yarn test:coverage
# Run tests in watch mode
yarn test:watch
🏗️ Development
# Install dependencies
yarn install
# Build the package
yarn build
# Run type checking
yarn ts:check
# Lint code
yarn lint
# Format code
yarn format
📄 API Reference
KafkaClient
Constructor
constructor(
parentLogger: Logger,
environment: Environment,
serviceIdentity: ServiceIdentity
)
Methods
- connect(): Promise<void> - Connect to Kafka
- produce<T>(topic: T, key: string, value: KafkaEvent<T>): Promise<void> - Produce a message
- consume<T>(topic: T, handler: EventPayloadHandler<T>, fromBeginning?: boolean): Promise<Consumer> - Consume messages
- shutdown(): Promise<void> - Gracefully shut down all connections
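The four methods suggest a connect, consume, produce, shutdown lifecycle. The sketch below shows one way to order those calls so that `shutdown()` always runs. `KafkaClientLike` is a simplified structural view of the documented signatures (topics as plain strings, untyped payloads), and `InMemoryClient` is a hypothetical stand-in so the example runs without a broker; neither is part of the package.

```typescript
type Handler = (message: { value: unknown }) => Promise<void>;

// Simplified view of the documented methods; the real ones are generic over the topic.
interface KafkaClientLike {
  connect(): Promise<void>;
  produce(topic: string, key: string, value: unknown): Promise<void>;
  consume(topic: string, handler: Handler): Promise<unknown>;
  shutdown(): Promise<void>;
}

// Hypothetical in-memory stand-in: produce() hands the value straight to the handler.
class InMemoryClient implements KafkaClientLike {
  readonly calls: string[] = [];
  private readonly handlers = new Map<string, Handler>();

  async connect(): Promise<void> {
    this.calls.push('connect');
  }

  async consume(topic: string, handler: Handler): Promise<unknown> {
    this.calls.push(`consume:${topic}`);
    this.handlers.set(topic, handler);
    return undefined;
  }

  async produce(topic: string, key: string, value: unknown): Promise<void> {
    this.calls.push(`produce:${topic}:${key}`);
    await this.handlers.get(topic)?.({ value });
  }

  async shutdown(): Promise<void> {
    this.calls.push('shutdown');
  }
}

// Connect first, register the consumer before producing, and always shut down.
async function run(client: KafkaClientLike): Promise<unknown[]> {
  const received: unknown[] = [];
  await client.connect();
  try {
    await client.consume('PAYMENTS', async message => {
      received.push(message.value);
    });
    await client.produce('PAYMENTS', 'payment-123', { id: 'payment-123', type: 'create' });
  } finally {
    await client.shutdown(); // release connections even if a step above throws
  }
  return received;
}
```

In a long-running service the `shutdown()` call would more likely sit in a `SIGTERM` handler than in a `finally` block; the ordering concern is the same.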
🤝 Contributing
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'feat: add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
📝 License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
🏢 About Luca
This package is part of the Luca platform ecosystem, providing secure and reliable messaging infrastructure for distributed microservices in the hospitality and event management industry.
Culture4Life - Building the future of digital experiences
