@kosta-electreon/native-aws-kcl-ts
v3.0.7
Native TypeScript implementation of AWS Kinesis Client Library (KCL) with async/await support
native-aws-kcl-ts
A native TypeScript implementation of the AWS Kinesis Client Library (KCL) for consuming Kinesis streams. It provides the core consumer functionality of the Java-based KCL, written entirely in TypeScript and built around an async/await concurrency model.
Features
- Shard Discovery: Automatically discovers and tracks Kinesis shards
- Lease Management: Coordinates shard-to-worker assignment using pluggable storage (DynamoDB, Redis, S3, PostgreSQL, MySQL, MongoDB)
- Lease Renewal: Maintains heartbeats to prevent lease expiration
- Failover & Rebalancing: Automatically handles worker failures and redistributes shards
- Checkpoint Coordination: Persists processing progress with pluggable storage backends
- Graceful Shutdown: Handles shutdown signals and shard end events
- Enhanced Fan-Out (EFO): Optional dedicated throughput mode with 2 MB/s per consumer per shard
- Exponential Backoff with Jitter: Handles throttling gracefully following AWS best practices
- Pluggable Logging: Configure your own logger factory or use the default console logger
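One common form of exponential backoff with jitter is the "full jitter" strategy described by AWS: the delay cap doubles with each attempt (up to a maximum), and the actual wait is drawn uniformly between zero and that cap, so throttled workers do not retry in lockstep. The sketch below illustrates the calculation; `fullJitterDelay`, `withBackoff`, and the `baseMs`/`maxMs` parameters are hypothetical names, not this library's API, and its exact formula may differ.

```typescript
// Hypothetical sketch of "full jitter" exponential backoff.
// Not part of native-aws-kcl-ts; names and defaults are assumptions.
function fullJitterDelay(
  attempt: number,                    // 0-based retry attempt
  baseMs: number,                     // delay cap for the first retry
  maxMs: number,                      // upper bound on any single delay
  random: () => number = Math.random, // injectable for deterministic tests
): number {
  // Exponential cap: baseMs * 2^attempt, clamped to maxMs.
  const cap = Math.min(maxMs, baseMs * 2 ** attempt);
  // Full jitter: wait a uniformly random time in [0, cap).
  return Math.floor(random() * cap);
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Retries `op` while `isRetryable` classifies the error as transient
// (for example a throttling error), for at most `maxAttempts` attempts.
async function withBackoff<T>(
  op: () => Promise<T>,
  isRetryable: (err: unknown) => boolean,
  maxAttempts = 5,
  baseMs = 100,
  maxMs = 10_000,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await op();
    } catch (err) {
      // Give up on the last attempt or on non-transient errors.
      if (attempt + 1 >= maxAttempts || !isRetryable(err)) throw err;
      await sleep(fullJitterDelay(attempt, baseMs, maxMs));
    }
  }
}
```

Injecting `random` keeps the delay calculation deterministic in tests while production code uses `Math.random`.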
Installation
Base Installation (DynamoDB - Default)
pnpm add native-aws-kcl-ts
With Optional Persistence Backends
Install the base package plus the driver for your chosen persistence backend:
# S3 persistence
pnpm add native-aws-kcl-ts @aws-sdk/client-s3
# Redis persistence
pnpm add native-aws-kcl-ts ioredis
# PostgreSQL persistence
pnpm add native-aws-kcl-ts pg
# MySQL persistence
pnpm add native-aws-kcl-ts mysql2
# MongoDB persistence
pnpm add native-aws-kcl-ts mongodb
Note: The persistence drivers (ioredis, pg, mysql2, mongodb, @aws-sdk/client-s3) are optional peer dependencies; install only the one for the backend you use. If you select a persistence type whose driver is not installed, the library reports an error naming the package to install.
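An error like that is typically produced by wrapping the driver load and translating Node's "module not found" failure into an install hint. The sketch below is a hypothetical illustration of the pattern, not this library's actual loader; `loadOptionalDriver`, `MissingDriverError`, and the message text are assumptions.

```typescript
// Hypothetical sketch: load an optional peer dependency and turn a
// "module not found" failure into an actionable error message.
class MissingDriverError extends Error {
  readonly packageName: string;

  constructor(packageName: string, backend: string) {
    super(
      `The '${backend}' persistence backend requires the optional package ` +
        `'${packageName}'. Install it with: pnpm add ${packageName}`,
    );
    this.name = 'MissingDriverError';
    this.packageName = packageName;
  }
}

// `load` is injected (e.g. () => require('ioredis')) so the sketch does not
// depend on a particular module system.
function loadOptionalDriver<T>(backend: string, packageName: string, load: () => T): T {
  try {
    return load();
  } catch (err) {
    // Node reports a missing package with code MODULE_NOT_FOUND (CommonJS)
    // or ERR_MODULE_NOT_FOUND (ESM); any other error is rethrown unchanged.
    const code = (err as { code?: unknown } | null)?.code;
    if (code === 'MODULE_NOT_FOUND' || code === 'ERR_MODULE_NOT_FOUND') {
      throw new MissingDriverError(packageName, backend);
    }
    throw err;
  }
}
```

Rethrowing unrelated errors unchanged matters: a driver that is installed but fails during its own initialization should not be reported as "not installed".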
Quick Start
Basic Usage
import {
  Scheduler,
  ConfigBuilder,
  BaseRecordProcessor,
  processorFactory,
  type KinesisClientRecord,
} from 'native-aws-kcl-ts';
// Create a custom record processor
class MyProcessor extends BaseRecordProcessor {
  protected async processRecord(record: KinesisClientRecord): Promise<void> {
    // Decode the raw payload bytes as UTF-8 text
    const data = Buffer.from(record.data).toString('utf-8');
    console.log('Processing record:', data);
    // Your processing logic here
  }
}
// Build configuration
const config = new ConfigBuilder()
  .withAwsRegion('us-east-1')
  .withStreamName('my-stream')
  .withApplicationName('my-app')
  .withProcessorFactory(processorFactory(MyProcessor))
  .withInitialPosition('TRIM_HORIZON')
  .build();
// Create the scheduler
const scheduler = new Scheduler(config);
// Graceful shutdown: register the handler before awaiting run(),
// which may not resolve until the scheduler stops
process.on('SIGTERM', async () => {
  await scheduler.shutdown();
});
// Start processing
await scheduler.run();
Configuring a Custom Logger
The library uses a simple console logger by default. You can configure a custom logger factory:
import { configureNativeKclLibrary, Scheduler } from 'native-aws-kcl-ts';
import winston from 'winston';
const myWinston = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});
// Configure the library once at startup, before creating the Scheduler
configureNativeKclLibrary({
  loggerFactory: (name: string) => myWinston.child({ context: name }),
});
// The library's classes now log through your Winston logger
// (`config` is built with ConfigBuilder as in Basic Usage)
const scheduler = new Scheduler(config);
Core Concepts
Scheduler
The Scheduler (called Worker in KCL 1.x) is the main entry point. It coordinates the following components:
- LeaseCoordinator: Manages shard leases for distributed processing
- ShardDetector: Discovers shards in the stream
- ShardSyncer: Synchronizes shards with leases
- ShardConsumers: Process records from individual shards
Record Processors
Record processors implement the ShardRecordProcessor interface to handle records from a shard. You can either:
- Extend BaseRecordProcessor for a simplified interface
- Implement ShardRecordProcessor directly for full control
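Whichever option you choose, the processor also decides how often to record its progress. Checkpointing after every record is simple but costs one storage write per record; a common alternative is to checkpoint after N records or T milliseconds, whichever comes first. The sketch below shows that policy in isolation; `CheckpointPolicy` and its thresholds are hypothetical and not part of this library's API.

```typescript
// Hypothetical checkpoint policy: checkpoint after `maxRecords` records or
// `maxIntervalMs` milliseconds since the last checkpoint, whichever comes first.
class CheckpointPolicy {
  private readonly maxRecords: number;
  private readonly maxIntervalMs: number;
  private readonly now: () => number;
  private pending = 0;
  private lastSequence: string | undefined = undefined;
  private lastCheckpointAt: number;

  constructor(maxRecords: number, maxIntervalMs: number, now: () => number = Date.now) {
    this.maxRecords = maxRecords;
    this.maxIntervalMs = maxIntervalMs;
    this.now = now;
    this.lastCheckpointAt = now();
  }

  // Call after `sequenceNumber` was processed successfully. Returns the
  // sequence number to checkpoint at, or undefined if it is not time yet.
  recordProcessed(sequenceNumber: string): string | undefined {
    this.pending++;
    this.lastSequence = sequenceNumber;
    const due =
      this.pending >= this.maxRecords ||
      this.now() - this.lastCheckpointAt >= this.maxIntervalMs;
    return due ? this.flush() : undefined;
  }

  // Returns the latest unsaved position (e.g. on graceful shutdown),
  // or undefined if nothing was processed since the last checkpoint.
  flush(): string | undefined {
    if (this.pending === 0) return undefined;
    this.pending = 0;
    this.lastCheckpointAt = this.now();
    return this.lastSequence;
  }
}
```

Inside `processRecord`, you would call `recordProcessed(record.sequenceNumber)` and checkpoint whenever it returns a value; the `sequenceNumber` field and the checkpointer call are assumptions about the record and processor APIs, so check the library's type definitions.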
Lease Management
Leases coordinate which worker processes which shard. The library supports multiple storage backends:
- DynamoDB (default)
- Redis
- S3
- PostgreSQL
- MySQL
- MongoDB
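Regardless of backend, lease coordination reduces to a few rules: a lease whose owner has not renewed it within a failover window counts as expired and may be taken, and each worker aims to hold about `ceil(leases / activeWorkers)` leases. The sketch below illustrates that target-based selection over a simplified in-memory model; `SimpleLease`, `failoverMs`, and the selection rule are assumptions loosely modeled on the Java KCL's approach, not this library's exact algorithm, and stealing leases from overloaded workers is omitted.

```typescript
// Hypothetical, simplified lease model for illustration only.
interface SimpleLease {
  shardId: string;
  owner?: string;        // undefined = never assigned
  lastRenewedAt: number; // epoch millis of the owner's last heartbeat
}

// A lease is takeable if it has no owner or its owner stopped renewing it.
function isExpired(lease: SimpleLease, now: number, failoverMs: number): boolean {
  return lease.owner === undefined || now - lease.lastRenewedAt > failoverMs;
}

// Chooses which expired leases `workerId` should try to take this round.
function chooseLeasesToTake(
  leases: SimpleLease[],
  workerId: string,
  now: number,
  failoverMs: number,
): string[] {
  const live = leases.filter((l) => !isExpired(l, now, failoverMs));
  const expired = leases.filter((l) => isExpired(l, now, failoverMs));

  // Workers holding at least one live lease count as active; include ourselves.
  const workers = new Set(live.map((l) => l.owner as string));
  workers.add(workerId);

  // Fair share per worker, and how many more this worker should acquire.
  const target = Math.ceil(leases.length / workers.size);
  const held = live.filter((l) => l.owner === workerId).length;
  const needed = Math.max(0, target - held);

  // Take expired leases first (stealing from busy workers is not shown).
  return expired.slice(0, needed).map((l) => l.shardId);
}
```

In a real deployment each take must still be an atomic conditional write against the lease store, because several workers can run this selection at the same time and pick the same leases.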
Enhanced Fan-Out (EFO) Mode
Enhanced Fan-Out provides dedicated throughput of 2 MB/s per consumer per shard, compared to the 2 MB/s per shard shared by all consumers in polling mode.
Comparison of Retrieval Modes
| Feature | Polling (DEFAULT) | Enhanced Fan-Out (FANOUT) |
|---------|-------------------|---------------------------|
| Throughput | Shared 2 MB/s per shard | Dedicated 2 MB/s per consumer per shard |
| Latency | ~200ms+ | ~70ms |
| API | GetRecords | SubscribeToShard (HTTP/2 push) |
| Consumers per shard | Limited by shared throughput | Up to 20 (or 50 with EFO Advantage) |
| Cost | Standard Kinesis pricing | Additional per-consumer charges |
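The throughput row is usually what decides between the two modes. In polling mode, all applications reading a shard share its 2 MB/s read limit (and its limit of five GetRecords calls per second), so each of N consumers gets roughly 2/N MB/s; with EFO, each registered consumer gets its own 2 MB/s. A back-of-the-envelope helper, hypothetical and not part of the library:

```typescript
// Retrieval modes as named in this README's configuration.
type RetrievalMode = 'DEFAULT' | 'FANOUT';

// Kinesis read limit per shard, in MB/s.
const SHARD_READ_LIMIT_MBPS = 2;

// Approximate read throughput (MB/s) available to each consumer of one shard.
function perConsumerThroughputMBps(mode: RetrievalMode, consumers: number): number {
  if (consumers < 1) throw new RangeError('consumers must be >= 1');
  return mode === 'FANOUT'
    ? SHARD_READ_LIMIT_MBPS              // dedicated pipe per EFO consumer
    : SHARD_READ_LIMIT_MBPS / consumers; // split across all polling consumers
}

// Four applications reading the same stream:
// polling gives 0.5 MB/s each per shard, FANOUT gives 2 MB/s each per shard.
```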
Enabling Enhanced Fan-Out
const config = new ConfigBuilder()
  .withAwsRegion('us-east-1')
  .withStreamName('my-stream')
  .withApplicationName('my-app')
  .withProcessorFactory(processorFactory(MyProcessor))
  .withRetrievalMode('FANOUT') // Enable EFO
  .build();
Pluggable Persistence Layer
By default, the library uses DynamoDB for storing lease and checkpoint information. However, the persistence layer is abstracted behind the LeaseRefresher interface, allowing you to implement custom storage backends.
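The key requirement for a custom backend is atomic conditional updates, so two workers can never both believe they own a shard. The Java KCL's DynamoDB lease table achieves this with conditional writes on a lease counter. The sketch below is a hypothetical in-memory store illustrating that compare-and-set idea; `InMemoryLeaseStore`, `StoredLease`, and the method names are assumptions and do not reflect the actual `LeaseRefresher` interface, whose methods you should take from the library's type definitions.

```typescript
// Hypothetical lease record; field names are illustrative only.
interface StoredLease {
  leaseKey: string;     // shard ID
  leaseOwner?: string;  // worker currently holding the lease
  leaseCounter: number; // incremented on every successful take/renew
  checkpoint?: string;  // last checkpointed sequence number
}

// In-memory compare-and-set store. JavaScript's single thread makes each
// method atomic here; a real backend needs conditional writes instead.
class InMemoryLeaseStore {
  private readonly leases = new Map<string, StoredLease>();

  createIfNotExists(leaseKey: string): boolean {
    if (this.leases.has(leaseKey)) return false;
    this.leases.set(leaseKey, { leaseKey, leaseCounter: 0 });
    return true;
  }

  get(leaseKey: string): StoredLease | undefined {
    const lease = this.leases.get(leaseKey);
    return lease ? { ...lease } : undefined; // hand out a copy
  }

  // Succeeds only if nobody modified the lease since `expectedCounter` was read.
  take(leaseKey: string, newOwner: string, expectedCounter: number): boolean {
    const lease = this.leases.get(leaseKey);
    if (!lease || lease.leaseCounter !== expectedCounter) return false;
    lease.leaseOwner = newOwner;
    lease.leaseCounter++;
    return true;
  }

  // Heartbeat: only the current owner holding an up-to-date counter may renew.
  renew(leaseKey: string, owner: string, expectedCounter: number): boolean {
    const lease = this.leases.get(leaseKey);
    if (!lease || lease.leaseOwner !== owner || lease.leaseCounter !== expectedCounter) {
      return false;
    }
    lease.leaseCounter++;
    return true;
  }

  // Simplified: only the owner may record progress.
  checkpoint(leaseKey: string, owner: string, sequenceNumber: string): boolean {
    const lease = this.leases.get(leaseKey);
    if (!lease || lease.leaseOwner !== owner) return false;
    lease.checkpoint = sequenceNumber;
    return true;
  }
}
```

A worker that fails to renew (because its counter is stale) should treat the lease as lost and stop processing that shard.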
Supported Storage Backends
| Backend | Status | Config Value | Extra Package |
|---------|--------|--------------|---------------|
| DynamoDB | ✅ Default | dynamodb | None (built-in) |
| S3 | ✅ Supported | s3 | @aws-sdk/client-s3 |
| Redis | ✅ Supported | redis | ioredis |
| PostgreSQL | ✅ Supported | postgresql | pg |
| MySQL | ✅ Supported | mysql | mysql2 |
| MongoDB | ✅ Supported | mongodb | mongodb |
| Custom | ✅ Supported | N/A | User-provided |
Using a Custom Persistence Backend
import {
  LeaseRefresher,
  Lease,
  KinesisConsumerConfig,
  Scheduler,
  processorFactory,
} from 'native-aws-kcl-ts';
// Implement the LeaseRefresher interface
class MyCustomLeaseRefresher implements LeaseRefresher {
  // ... implement all required methods (they read and write Lease objects)
}
// Use in configuration
const config: KinesisConsumerConfig = {
  aws: { region: 'us-east-1' },
  applicationName: 'my-app',
  streamName: 'my-stream',
  recordProcessorFactory: processorFactory(MyProcessor), // MyProcessor as in Basic Usage
  persistence: {
    leaseRefresher: new MyCustomLeaseRefresher(),
  },
};
const scheduler = new Scheduler(config);
await scheduler.run();
Metrics
The library supports publishing application metrics to AWS CloudWatch (or a custom monitoring backend). By default, metrics are disabled (level: NONE).
Metrics Levels
| Level | Description |
|-------|-------------|
| NONE | No metrics are reported (default) |
| SUMMARY | Aggregated metrics: RecordsProcessed, MillisBehindLatest, LeasesHeld |
| DETAILED | All SUMMARY metrics plus per-shard and per-operation metrics |
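The levels are cumulative: each metric can be thought of as tagged with the least verbose level that includes it, and it is published only when the configured level is at least that verbose. A hypothetical sketch of that check (the names are illustrative, not the library's `MetricsLevel` API):

```typescript
type MetricsLevelName = 'NONE' | 'SUMMARY' | 'DETAILED';

// Higher number = more verbose.
const LEVEL_ORDER: Record<MetricsLevelName, number> = { NONE: 0, SUMMARY: 1, DETAILED: 2 };

// A metric tagged `metricLevel` is emitted only if the configured level is at
// least as verbose; with NONE configured, nothing is emitted.
function shouldEmit(
  configured: MetricsLevelName,
  metricLevel: Exclude<MetricsLevelName, 'NONE'>,
): boolean {
  return LEVEL_ORDER[configured] >= LEVEL_ORDER[metricLevel];
}

// A per-shard metric tagged DETAILED is dropped under SUMMARY:
// shouldEmit('SUMMARY', 'DETAILED') is false, shouldEmit('DETAILED', 'SUMMARY') is true.
```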
Enabling Metrics
const config = new ConfigBuilder()
  .withAwsRegion('us-east-1')
  .withStreamName('my-stream')
  .withApplicationName('my-app')
  .withProcessorFactory(processorFactory(MyProcessor))
  .withMetrics('DETAILED', 'MyApp/KCL')
  .build();
Custom Metrics Backend
You can plug in your own metrics backend (e.g., Prometheus, Datadog) by implementing the IMetricsFactory interface:
import {
  IMetricsFactory,
  IMetricsScope,
  MetricsLevel,
  KinesisConsumerConfig,
} from 'native-aws-kcl-ts';
// MyPrometheusScope is your own IMetricsScope implementation (not shown)
class MyPrometheusMetricsFactory implements IMetricsFactory {
  createScope(operation: string): IMetricsScope {
    return new MyPrometheusScope(operation);
  }
  getMetricsLevel(): MetricsLevel {
    return MetricsLevel.DETAILED;
  }
  isEnabled(): boolean {
    return true;
  }
  isDetailedMetricsEnabled(): boolean {
    return true;
  }
  async shutdown(): Promise<void> {
    // Flush buffered metrics before the process exits
  }
}
// Use the custom factory
const config: KinesisConsumerConfig = {
  // ... other config
  metrics: {
    level: 'DETAILED',
    customFactory: new MyPrometheusMetricsFactory(),
  },
};
API Reference
Scheduler
The main entry point for the KCL.
declare class Scheduler {
  constructor(config: KinesisConsumerConfig);
  run(): Promise<void>;
  shutdown(): Promise<void>;
  getState(): SchedulerState;
  getActiveConsumerCount(): number;
}
ConfigBuilder
A fluent builder for creating KinesisConsumerConfig:
const config = new ConfigBuilder()
  .withAwsRegion('us-east-1')
  .withStreamName('my-stream')
  .withApplicationName('my-app')
  .withProcessorFactory(processorFactory(MyProcessor))
  .withInitialPosition('TRIM_HORIZON')
  .withRetrievalMode('DEFAULT')
  .withMetrics('SUMMARY', 'MyApp/KCL')
  .build();
BaseRecordProcessor
A base class that simplifies implementing record processors:
import {
  BaseRecordProcessor,
  type KinesisClientRecord,
  type InitializationInput,
  type LeaseLostInput,
  type ShardEndedInput,
} from 'native-aws-kcl-ts';
class MyProcessor extends BaseRecordProcessor {
  protected async processRecord(record: KinesisClientRecord): Promise<void> {
    // Process individual records
  }
  async initialize(input: InitializationInput): Promise<void> {
    // Optional: custom initialization
    await super.initialize(input);
  }
  async leaseLost(input: LeaseLostInput): Promise<void> {
    // Optional: handle lease loss
  }
  async shardEnded(input: ShardEndedInput): Promise<void> {
    // Optional: handle shard end
    await super.shardEnded(input); // The default implementation checkpoints at SHARD_END
  }
}
License
UNLICENSED - Proprietary
