npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@electreonwireless/native-aws-kcl-ts

v3.0.2

Published

Native TypeScript implementation of AWS Kinesis Client Library (KCL) with async/await support

Downloads

222

Readme

@electreonwireless/native-aws-kcl-ts

A native TypeScript implementation of the AWS Kinesis Client Library (KCL) for consuming Kinesis streams. This library provides the same functionality as the Java-based KCL but is written entirely in TypeScript with an async/await concurrency model.

Features

  • Shard Discovery: Automatically discovers and tracks Kinesis shards
  • Lease Management: Coordinates shard-to-worker assignment using pluggable storage (DynamoDB, Redis, S3, PostgreSQL, MySQL, MongoDB)
  • Lease Renewal: Maintains heartbeats to prevent lease expiration
  • Failover & Rebalancing: Automatically handles worker failures and redistributes shards
  • Checkpoint Coordination: Persists processing progress with pluggable storage backends
  • Graceful Shutdown: Handles shutdown signals and shard end events
  • Enhanced Fan-Out (EFO): Optional dedicated throughput mode with 2MB/s per consumer per shard
  • Exponential Backoff with Jitter: Handles throttling gracefully following AWS best practices
  • Pluggable Logging: Configure your own logger factory or use the default console logger

Installation

Base Installation (DynamoDB - Default)

pnpm add @electreonwireless/native-aws-kcl-ts

With Optional Persistence Backends

Install the base package plus the driver for your chosen persistence backend:

# S3 persistence
pnpm add @electreonwireless/native-aws-kcl-ts @aws-sdk/client-s3

# Redis persistence
pnpm add @electreonwireless/native-aws-kcl-ts ioredis

# PostgreSQL persistence
pnpm add @electreonwireless/native-aws-kcl-ts pg

# MySQL persistence
pnpm add @electreonwireless/native-aws-kcl-ts mysql2

# MongoDB persistence
pnpm add @electreonwireless/native-aws-kcl-ts mongodb

Note: The persistence drivers (ioredis, pg, mysql2, mongodb, @aws-sdk/client-s3) are optional peer dependencies. Only install the one you need. If you don't install a driver and try to use that persistence type, you'll get a clear error message telling you which package to install.

Quick Start

Basic Usage

import { 
  Scheduler, 
  ConfigBuilder,
  BaseRecordProcessor,
  processorFactory,
} from '@electreonwireless/native-aws-kcl-ts';

// Create a custom record processor
class MyProcessor extends BaseRecordProcessor {
  protected async processRecord(record: KinesisClientRecord): Promise<void> {
    const data = Buffer.from(record.data).toString('utf-8');
    console.log('Processing record:', data);
    // Your processing logic here
  }
}

// Build configuration
const config = new ConfigBuilder()
  .withAwsRegion('us-east-1')
  .withStreamName('my-stream')
  .withApplicationName('my-app')
  .withProcessorFactory(processorFactory(MyProcessor))
  .withInitialPosition('TRIM_HORIZON')
  .build();

// Create and run the scheduler
const scheduler = new Scheduler(config);
await scheduler.run();

// Graceful shutdown
process.on('SIGTERM', async () => {
  await scheduler.shutdown();
});

Configuring a Custom Logger

The library uses a simple console logger by default. You can configure a custom logger factory:

import { configureNativeKclLibrary } from '@electreonwireless/native-aws-kcl-ts';
import winston from 'winston';

const myWinston = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});

// Configure the library once at startup
configureNativeKclLibrary({
  loggerFactory: (name: string) => myWinston.child({ context: name })
});

// Now all classes in the library will use your Winston logger
const scheduler = new Scheduler(config);

Core Concepts

Scheduler

The Scheduler (also known as Worker in KCL) is the main entry point. It coordinates all components:

  • LeaseCoordinator: Manages shard leases for distributed processing
  • ShardDetector: Discovers shards in the stream
  • ShardSyncer: Synchronizes shards with leases
  • ShardConsumers: Process records from individual shards

Record Processors

Record processors implement the ShardRecordProcessor interface to handle records from a shard. You can either:

  • Extend BaseRecordProcessor for a simplified interface
  • Implement ShardRecordProcessor directly for full control

Lease Management

Leases coordinate which worker processes which shard. The library supports multiple storage backends:

  • DynamoDB (default)
  • Redis
  • S3
  • PostgreSQL
  • MySQL
  • MongoDB

Enhanced Fan-Out (EFO) Mode

Enhanced Fan-Out provides dedicated throughput of 2 MB/s per consumer per shard, compared to the shared 2 MB/s per shard with polling mode.

Comparison of Retrieval Modes

| Feature | Polling (DEFAULT) | Enhanced Fan-Out (FANOUT) | |---------|-------------------|---------------------------| | Throughput | Shared 2 MB/s per shard | Dedicated 2 MB/s per consumer per shard | | Latency | ~200ms+ | ~70ms | | API | GetRecords | SubscribeToShard (HTTP/2 push) | | Consumers per shard | Limited by shared throughput | Up to 20 (or 50 with EFO Advantage) | | Cost | Standard Kinesis pricing | Additional per-consumer charges |

Enabling Enhanced Fan-Out

const config = new ConfigBuilder()
  .withAwsRegion('us-east-1')
  .withStreamName('my-stream')
  .withApplicationName('my-app')
  .withProcessorFactory(processorFactory(MyProcessor))
  .withRetrievalMode('FANOUT') // Enable EFO
  .build();

Pluggable Persistence Layer

By default, the library uses DynamoDB for storing lease and checkpoint information. However, the persistence layer is abstracted behind the LeaseRefresher interface, allowing you to implement custom storage backends.

Supported Storage Backends

| Backend | Status | Config Value | Extra Package | |---------|--------|--------------|---------------| | DynamoDB | ✅ Default | dynamodb | None (built-in) | | S3 | ✅ Supported | s3 | @aws-sdk/client-s3 | | Redis | ✅ Supported | redis | ioredis | | PostgreSQL | ✅ Supported | postgresql | pg | | MySQL | ✅ Supported | mysql | mysql2 | | MongoDB | ✅ Supported | mongodb | mongodb | | Custom | ✅ Supported | N/A | User-provided |

Using a Custom Persistence Backend

import { 
  LeaseRefresher, 
  Lease,
  KinesisConsumerConfig,
  Scheduler,
} from '@electreonwireless/native-aws-kcl-ts';

// Implement the LeaseRefresher interface
class MyCustomLeaseRefresher implements LeaseRefresher {
  // ... implement all required methods
}

// Use in configuration
const config: KinesisConsumerConfig = {
  aws: { region: 'us-east-1' },
  applicationName: 'my-app',
  streamName: 'my-stream',
  recordProcessorFactory: myFactory,
  persistence: {
    leaseRefresher: new MyCustomLeaseRefresher(),
  },
};

const scheduler = new Scheduler(config);
await scheduler.run();

Metrics

The library supports publishing application metrics to AWS CloudWatch (or a custom monitoring backend). By default, metrics are disabled (level: NONE).

Metrics Levels

| Level | Description | |-------|-------------| | NONE | No metrics are reported (default) | | SUMMARY | Aggregated metrics: RecordsProcessed, MillisBehindLatest, LeasesHeld | | DETAILED | All SUMMARY metrics plus per-shard and per-operation metrics |

Enabling Metrics

const config = new ConfigBuilder()
  .withAwsRegion('us-east-1')
  .withStreamName('my-stream')
  .withApplicationName('my-app')
  .withProcessorFactory(processorFactory(MyProcessor))
  .withMetrics('DETAILED', 'MyApp/KCL')
  .build();

Custom Metrics Backend

You can implement your own metrics backend (e.g., Prometheus, Datadog) by implementing the IMetricsFactory interface:

import { 
  IMetricsFactory, 
  IMetricsScope,
  MetricsLevel,
} from '@electreonwireless/native-aws-kcl-ts';

class MyPrometheusMetricsFactory implements IMetricsFactory {
  createScope(operation: string): IMetricsScope {
    return new MyPrometheusScope(operation);
  }
  
  getMetricsLevel(): MetricsLevel {
    return MetricsLevel.DETAILED;
  }
  
  isEnabled(): boolean {
    return true;
  }
  
  isDetailedMetricsEnabled(): boolean {
    return true;
  }
  
  async shutdown(): Promise<void> {
    // Flush metrics
  }
}

// Use the custom factory
const config: KinesisConsumerConfig = {
  // ... other config
  metrics: {
    level: 'DETAILED',
    customFactory: new MyPrometheusMetricsFactory(),
  },
};

API Reference

Scheduler

The main entry point for the KCL.

class Scheduler {
  constructor(config: KinesisConsumerConfig);
  async run(): Promise<void>;
  async shutdown(): Promise<void>;
  getState(): SchedulerState;
  getActiveConsumerCount(): number;
}

ConfigBuilder

A fluent builder for creating KinesisConsumerConfig:

const config = new ConfigBuilder()
  .withAwsRegion('us-east-1')
  .withStreamName('my-stream')
  .withApplicationName('my-app')
  .withProcessorFactory(processorFactory(MyProcessor))
  .withInitialPosition('TRIM_HORIZON')
  .withRetrievalMode('DEFAULT')
  .withMetrics('SUMMARY', 'MyApp/KCL')
  .build();

BaseRecordProcessor

A base class that simplifies implementing record processors:

class MyProcessor extends BaseRecordProcessor {
  protected async processRecord(record: KinesisClientRecord): Promise<void> {
    // Process individual records
  }
  
  async initialize(input: InitializationInput): Promise<void> {
    // Optional: Custom initialization
    await super.initialize(input);
  }
  
  async leaseLost(input: LeaseLostInput): Promise<void> {
    // Optional: Handle lease loss
  }
  
  async shardEnded(input: ShardEndedInput): Promise<void> {
    // Optional: Handle shard end
    await super.shardEnded(input); // Default checkpoints at SHARD_END
  }
}

License

UNLICENSED - Proprietary