lifion-kinesis
v2.0.0
Lifion client for Amazon Kinesis Data streams
Lifion's Node.js client for Amazon Kinesis Data Streams.
Upgrading from v1? v2 requires Node.js 22.12+ and ships as an ES module. See the migration guide for the full list of changes.
Getting Started
To install the module:
npm install lifion-kinesis --save
The main module export is a Kinesis class that instantiates as a readable stream.
import Kinesis from 'lifion-kinesis';
const kinesis = new Kinesis({
streamName: 'sample-stream'
/* plus any AWS SDK v3 client options */
});
kinesis.on('data', (data) => {
console.log('Incoming data:', data);
});
kinesis.startConsumer();
To take advantage of back-pressure, the client can be piped to a writable stream:
import { Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import Kinesis from 'lifion-kinesis';
const kinesis = new Kinesis({
streamName: 'sample-stream'
/* plus any AWS SDK v3 client options */
});
pipeline(
kinesis,
new Writable({
objectMode: true,
write(data, encoding, callback) {
console.log(data);
callback();
}
})
).catch(console.error);
kinesis.startConsumer();
Credentials
Starting with v2, lifion-kinesis runs on the AWS SDK for JavaScript v3. In most setups you don't pass any credentials: the SDK resolves them from its default provider chain, which reads environment variables, shared config files, web identity tokens, and the IAM role attached to your ECS task or EC2 instance. That covers the same sources the v1 client relied on.
To run with specific credentials, pass a credentials object or an AWS credential provider:
import Kinesis from 'lifion-kinesis';
import { fromIni } from '@aws-sdk/credential-providers';
const kinesis = new Kinesis({
streamName: 'sample-stream',
credentials: fromIni({ profile: 'my-profile' })
});
Any AWS SDK v3 client option (region, endpoint, credentials, and so on) can be set at the top level for the Kinesis client, and under the dynamoDb and s3 options for those services.
Upgrading from v1? The top-level accessKeyId, secretAccessKey, and sessionToken options are no longer read; wrap them in a credentials object instead. See the migration guide for this and the other changes in v2.
Consuming records
The client is an object-mode readable stream. Each data event hands you an object with a batch of records and some context about where they came from:
kinesis.on('data', ({ records, shardId, streamName, millisBehindLatest }) => {
for (const record of records) {
console.log(record.sequenceNumber, record.partitionKey, record.data);
}
});
Every record in records carries sequenceNumber, partitionKey, approximateArrivalTimestamp, encryptionType, and data (the decoded payload, parsed as JSON when it looks like JSON).
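With shouldParseJson left at "auto", record.data can therefore arrive either as a parsed value or as a plain string. The sketch below approximates that kind of "looks like JSON" check so it's clear why a handler should cope with both shapes. `looksLikeJson`, `parseAuto`, and `describeData` are hypothetical names, and the heuristic is an assumption, not the client's actual parsing rule.

```javascript
// Hypothetical approximation of an "auto" JSON parse: only attempt
// JSON.parse when the text is wrapped like a JSON object or array.
// This is an illustration, NOT lifion-kinesis's internal logic.
function looksLikeJson(text) {
  const trimmed = text.trim();
  return (
    (trimmed.startsWith('{') && trimmed.endsWith('}')) ||
    (trimmed.startsWith('[') && trimmed.endsWith(']'))
  );
}

function parseAuto(text) {
  if (!looksLikeJson(text)) return text; // leave non-JSON payloads as strings
  try {
    return JSON.parse(text);
  } catch {
    return text; // looked like JSON but wasn't valid: keep the raw string
  }
}

// A data handler written defensively for both shapes of record.data.
function describeData(data) {
  return typeof data === 'string'
    ? `text(${data.length})`
    : `object(${Object.keys(data).join(',')})`;
}
```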
Batch sizes and limit
In polling mode, limit (default 10000) maps to the Limit parameter of the Kinesis GetRecords API. It's an upper bound on how many records a single call can return, so a data event often carries fewer records than limit even when the shard still has plenty waiting. Kinesis returns whatever happens to be in the next batch, capped by limit or by 10 MB, whichever it reaches first. The client keeps polling and delivers the rest in later data events, so you still receive every record over time. To gauge how far behind you are, millisBehindLatest reports the lag in milliseconds; it trends toward 0 as you catch up to the tip of the shard.
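Because a data event can legitimately carry fewer records than limit, batch sizes say little about progress; millisBehindLatest is the better signal. Here's a minimal sketch of a per-shard tracker you could register as a data listener (for example `kinesis.on('data', tracker.onData)`). `createLagTracker` and its 5-second "caught up" threshold are hypothetical choices, not part of the client.

```javascript
// Hypothetical helper: remembers the latest millisBehindLatest reported for
// each shard and counts the records received, so you can tell when a shard
// has caught up no matter how many records each data event carried.
function createLagTracker({ caughtUpThresholdMs = 5000 } = {}) {
  const shards = new Map(); // shardId -> { received, millisBehindLatest }

  return {
    // Usable as a 'data' listener; relies on the closure, not on `this`.
    onData({ shardId, records, millisBehindLatest }) {
      const entry = shards.get(shardId) ?? { received: 0, millisBehindLatest: undefined };
      entry.received += records.length;
      entry.millisBehindLatest = millisBehindLatest;
      shards.set(shardId, entry);
    },
    // True once the last reported lag is at or below the threshold.
    isCaughtUp(shardId) {
      const lag = shards.get(shardId)?.millisBehindLatest;
      return typeof lag === 'number' && lag <= caughtUpThresholdMs;
    },
    // Total records seen so far for a shard (0 if none yet).
    received(shardId) {
      return shards.get(shardId)?.received ?? 0;
    }
  };
}
```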
Manual checkpoints and paused polling
setCheckpoint and continuePolling show up as properties on that same data payload, so they're easy to miss if you go looking for them on the client itself:
- With useAutoCheckpoints: false, each data event includes setCheckpoint(sequenceNumber). Call it once you've processed up to a record to store that sequence number as the shard's checkpoint. This works in both polling and enhanced fan-out mode (useEnhancedFanOut: true).
- With usePausedPolling: true, each data event includes continuePolling(). The client holds off on the next batch until you call it, which gives you room to finish processing first. Paused polling applies to polling mode only (useEnhancedFanOut: false).
const kinesis = new Kinesis({
streamName: 'sample-stream',
useAutoCheckpoints: false,
usePausedPolling: true
});
kinesis.on('data', async ({ records, setCheckpoint, continuePolling }) => {
for (const record of records) {
await handle(record); // handle() stands in for your own processing logic
}
await setCheckpoint(records[records.length - 1].sequenceNumber);
continuePolling();
});
kinesis.startConsumer();
Reading specific shards
By default the client reads from every shard, either by distributing them across the consumer group (useAutoShardAssignment: true) or by reading them all from a single client (useAutoShardAssignment: false). To read only a subset, pass shardIds:
const kinesis = new Kinesis({
streamName: 'sample-stream',
shardIds: ['shardId-000000000000', 'shardId-000000000002']
});
kinesis.startConsumer();
With shardIds set, the client reads exactly those shards on its own and stays out of the group's automatic shard assignment, so useAutoShardAssignment doesn't apply. Shard IDs that aren't in the stream are logged and skipped.
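If you run several standalone clients and want to split the shards between them by hand, a deterministic split keeps each shard with exactly one worker. The sketch assumes each process knows its own index and the total worker count (for example from hypothetical WORKER_INDEX and WORKER_COUNT environment variables) and that you already have the full list of shard IDs, for instance from listShards. `pickShards` is a hypothetical helper. A static list like this won't include child shards created by a later reshard.

```javascript
// Hypothetical helper: deterministically assigns shards to one of
// `workerCount` standalone processes by sorting the IDs and taking every
// `workerCount`-th shard, starting at `workerIndex`.
function pickShards(allShardIds, workerIndex, workerCount) {
  if (!Number.isInteger(workerCount) || workerCount < 1) {
    throw new RangeError('workerCount must be a positive integer');
  }
  if (!Number.isInteger(workerIndex) || workerIndex < 0 || workerIndex >= workerCount) {
    throw new RangeError('workerIndex must be in [0, workerCount)');
  }
  // Zero-padded shard IDs sort correctly as plain strings.
  return [...allShardIds]
    .sort()
    .filter((_, position) => position % workerCount === workerIndex);
}

// Usage sketch (assumes the lifion-kinesis import and a shard ID list):
// const kinesis = new Kinesis({
//   streamName: 'sample-stream',
//   shardIds: pickShards(allShardIds, Number(process.env.WORKER_INDEX), Number(process.env.WORKER_COUNT))
// });
```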
Inspecting shard assignments
When several consumers share a group, the client spreads the stream's shards across them. getShardAssignments() reports who currently owns what, keyed by consumer ID, so you can see how the work is distributed without querying the DynamoDB state table yourself. The consumer has to be started first.
const kinesis = new Kinesis({ streamName: 'sample-stream' });
await kinesis.startConsumer();
const assignments = await kinesis.getShardAssignments();
// {
// 'consumer-a': { host, pid, isActive, shards: ['shardId-000000000000', …], … },
// 'consumer-b': { … }
// }
Each entry carries the consumer's appName, host, pid, startedOn, heartbeat, isActive, and isStandalone, along with the sorted shards it's assigned.
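Because getShardAssignments() resolves to plain data, it's easy to post-process. Here's a minimal sketch that counts shards per consumer and lists shards nobody currently holds. It assumes you pass in the resolved map (shaped as described above) plus every shard ID in the stream. `summarizeAssignments` is a hypothetical helper.

```javascript
// Hypothetical helper: given the map resolved by getShardAssignments()
// (consumer ID -> { shards: [...], ... }) and every shard ID in the stream,
// report how many shards each consumer holds and which shards are unowned.
function summarizeAssignments(assignments, allShardIds) {
  const perConsumer = {};
  const assigned = new Set();
  for (const [consumerId, { shards = [] }] of Object.entries(assignments)) {
    perConsumer[consumerId] = shards.length;
    for (const shardId of shards) assigned.add(shardId);
  }
  const unassigned = allShardIds.filter((shardId) => !assigned.has(shardId));
  return { perConsumer, unassigned };
}
```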
Features
- Standard Node.js stream abstraction of Kinesis streams.
- Node.js implementation of the enhanced fan-out feature.
- Optional auto-creation, encryption, and tagging of Kinesis streams.
- Support for a polling mode, using the GetRecords API, with automatic checkpointing.
- Support for multiple concurrent consumers through automatic assignment of shards.
- Support for sending messages to streams, with auto-retries.
Enhanced fan-out over HTTP/1.1
The enhanced fan-out consumer reads SubscribeToShard over HTTP/1.1. It streams the response as a chunked application/vnd.amazon.eventstream body and parses the binary frames itself with lifion-aws-event-stream, rather than going through the AWS SDK's HTTP/2 client.
That can be surprising, since AWS announced and documents enhanced fan-out as an HTTP/2 push API. In practice the Kinesis data endpoint doesn't negotiate h2 over the usual TLS ALPN handshake, so SubscribeToShard arrives as an HTTP/1.1 stream carrying AWS's own event-stream frames. @eaviles reverse-engineered that wire format for the v1 client, and the HTTP/1.1 path has run in production since. Other clients have hit the same thing (see the references below), so if you're considering a move to HTTP/2 here, it's worth knowing the endpoint won't ALPN-negotiate it today (last checked 2026-06-04).
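For a feel of what those frames look like, here's a minimal decoder for a single AWS event-stream message held in a Buffer. It follows the publicly documented layout: a 12-byte prelude (total length, headers length, prelude CRC), then headers, then the payload, then a trailing 4-byte message CRC. It handles only string-typed headers and skips CRC verification, so it's an illustration rather than a stand-in for lifion-aws-event-stream. `decodeEventStreamMessage` is a hypothetical name.

```javascript
// Minimal, illustrative decoder for one AWS event-stream message.
// Layout: [total length u32][headers length u32][prelude CRC u32]
//         [headers ...][payload ...][message CRC u32]
// CRC checks are intentionally skipped; a real decoder must verify them.
const PRELUDE_LENGTH = 12;
const MESSAGE_CRC_LENGTH = 4;
const STRING_HEADER_TYPE = 7;

function decodeEventStreamMessage(buffer) {
  const totalLength = buffer.readUInt32BE(0);
  const headersLength = buffer.readUInt32BE(4);
  if (buffer.length < totalLength) throw new Error('Incomplete message');

  // Each header: name length (u8), name, value type (u8), value.
  const headers = {};
  const headersEnd = PRELUDE_LENGTH + headersLength;
  let offset = PRELUDE_LENGTH;
  while (offset < headersEnd) {
    const nameLength = buffer.readUInt8(offset);
    offset += 1;
    const name = buffer.toString('utf8', offset, offset + nameLength);
    offset += nameLength;
    const type = buffer.readUInt8(offset);
    offset += 1;
    if (type !== STRING_HEADER_TYPE) throw new Error(`Unsupported header type ${type}`);
    const valueLength = buffer.readUInt16BE(offset); // string values: u16 length prefix
    offset += 2;
    headers[name] = buffer.toString('utf8', offset, offset + valueLength);
    offset += valueLength;
  }

  // The payload sits between the headers and the trailing message CRC.
  const payload = buffer.subarray(headersEnd, totalLength - MESSAGE_CRC_LENGTH);
  return { headers, payload };
}
```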
References:
- Amazon Kinesis Data Streams Adds Enhanced Fan-Out and HTTP/2, the original announcement, which presents the feature as HTTP/2.
- SubscribeToShard API reference, which also describes it as establishing an HTTP/2 connection.
- aws-sdk-cpp #3115 and #3118, where others observe SubscribeToShard going over HTTP/1.1 with batchy, high-latency delivery.
Deregistering idle enhanced consumers
The client pre-registers up to maxEnhancedConsumers enhanced fan-out consumers, and AWS bills for each registered consumer whether or not it's reading. If your consumer group scales down, the extra consumers sit idle and keep costing money. Set enhancedConsumerIdleTimeout (in milliseconds) to have the client deregister consumers that have stayed unused for that long, keeping at least one. They get re-registered as the group scales back up, which takes a little while since AWS has to make each one active again, so pick a timeout comfortably larger than your lease and heartbeat cycles. It defaults to 0, which leaves every registered consumer in place.
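The selection rule described above can be illustrated with a small pure function: consumers that nobody is using and that have been idle for at least the timeout are candidates, but one always stays registered. This is a sketch of the rule as described, not the client's implementation. The `pickIdleConsumers` name and the `lastUsedAt` timestamp are hypothetical; `lastUsedAt` isn't among the documented state-table fields.

```javascript
// Hypothetical illustration of the idle-deregistration rule.
// consumers: [{ name, isUsedBy: string | null, lastUsedAt: epoch ms }]
// Returns the names that would be deregistered. A timeout of 0 disables it.
function pickIdleConsumers(consumers, idleTimeoutMs, now = Date.now()) {
  if (idleTimeoutMs <= 0) return [];
  const isIdle = (c) => c.isUsedBy === null && now - c.lastUsedAt >= idleTimeoutMs;
  const idle = consumers.filter(isIdle);
  // At least one non-idle consumer remains, so every idle one can go.
  if (idle.length < consumers.length) return idle.map((c) => c.name);
  // Everything is idle: keep the most recently used one, drop the rest.
  const byRecency = [...idle].sort((a, b) => b.lastUsedAt - a.lastUsedAt);
  return byRecency.slice(1).map((c) => c.name);
}
```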
State table (DynamoDB)
The client stores its consumer state (shard leases and checkpoints) in a DynamoDB table, and it creates and manages that table for you. You don't have to create it ahead of time.
By default the table is named lifion-kinesis-state. You can change that with the dynamoDb.tableName option. The client creates it the first time it's needed, with server-side encryption enabled and on-demand billing (PAY_PER_REQUEST). If you'd rather use provisioned capacity, pass dynamoDb.provisionedThroughput with readCapacityUnits and writeCapacityUnits, and the table is created in provisioned mode instead.
Key schema
| Attribute | Type | Key |
| --- | --- | --- |
| consumerGroup | String (S) | Partition key (HASH) |
| streamName | String (S) | Sort key (RANGE) |
Those two attributes are the only ones DynamoDB needs declared. Everything else lives inside a single item per consumer group and stream.
What's in an item
The client keeps all of its state for a given consumer group and stream in one item, and updates pieces of it with conditional writes. A version token (a short UUID) guards each piece so consumers competing for the same lease don't overwrite one another. An item looks roughly like this:
{
"consumerGroup": "my-app", // partition key
"streamName": "my-stream", // sort key
"streamCreatedOn": "2024-01-02T03:04:05.000Z", // see the note below
"version": "abc123", // optimistic-concurrency token for the item
"consumers": { /* consumerId -> consumer record */ },
"enhancedConsumers": { /* name -> enhanced fan-out record */ },
"shards": { /* shardId -> shard record, when shards are auto-assigned */ }
}
consumers[consumerId], one entry per running consumer in the group:
| Field | Type | Meaning |
| --- | --- | --- |
| appName | String | Name of the host application |
| host | String | Hostname of the machine running the consumer |
| pid | Number | Process ID |
| startedOn | String | ISO timestamp of when the process started |
| heartbeat | String | ISO timestamp refreshed while the consumer is alive |
| isActive | Boolean | Whether the consumer counts toward lease distribution |
| isStandalone | Boolean | true when automatic shard assignment is off |
| shards | Map | Per-consumer shard state (only when polling in standalone mode) |
enhancedConsumers[name], one entry per registered enhanced fan-out consumer:
| Field | Type | Meaning |
| --- | --- | --- |
| arn | String | ARN of the enhanced fan-out consumer |
| isUsedBy | String / null | ID of the consumer currently holding it, or null |
| isStandalone | Boolean | true when automatic shard assignment is off |
| version | String | Token for locking the consumer to one reader |
| shards | Map | Per-consumer shard state (only in standalone mode) |
Shard records, keyed by shard ID. Depending on how the consumer reads, these live under the item's top-level shards (automatic shard assignment), under consumers[id].shards (standalone polling), or under enhancedConsumers[name].shards (standalone enhanced fan-out):
| Field | Type | Meaning |
| --- | --- | --- |
| checkpoint | String / null | Sequence number to resume from |
| approximateArrivalTimestamp | String / null | Arrival time of the checkpointed record |
| leaseOwner | String / null | ID of the consumer holding the lease |
| leaseExpiration | String / null | ISO timestamp when the lease lapses |
| depleted | Boolean | true once the shard has been fully read |
| parent | String / null | Parent shard ID, used to order resharding |
| version | String | Token for locking the lease |
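The version-token guard described earlier maps naturally onto a DynamoDB conditional update. The sketch builds input for `UpdateCommand` from `@aws-sdk/lib-dynamodb` (plain JavaScript values) that takes a shard lease under the item's top-level shards map, but only if the shard's version still matches the one last read. The attribute names follow the tables above. The expressions themselves, the full-UUID token (the client uses a short one), and the `buildLeaseUpdate` helper are assumptions, not the client's actual queries. If another consumer changed the version first, DynamoDB rejects the write with ConditionalCheckFailedException.

```javascript
import { randomUUID } from 'node:crypto';

// Hypothetical builder for a conditional lease update. Only the attribute
// names come from the documented state layout; the rest is illustrative.
function buildLeaseUpdate({
  tableName, consumerGroup, streamName, shardId,
  consumerId, leaseDurationMs, expectedVersion,
  newVersion = randomUUID(), now = Date.now()
}) {
  return {
    TableName: tableName,
    Key: { consumerGroup, streamName },
    UpdateExpression:
      'SET #shards.#shardId.#leaseOwner = :owner, ' +
      '#shards.#shardId.#leaseExpiration = :expiration, ' +
      '#shards.#shardId.#version = :newVersion',
    // The write only succeeds if nobody bumped the version since we read it.
    ConditionExpression: '#shards.#shardId.#version = :expectedVersion',
    ExpressionAttributeNames: {
      '#shards': 'shards',
      '#shardId': shardId, // shard IDs contain dashes, so use a placeholder
      '#leaseOwner': 'leaseOwner',
      '#leaseExpiration': 'leaseExpiration',
      '#version': 'version'
    },
    ExpressionAttributeValues: {
      ':owner': consumerId,
      ':expiration': new Date(now + leaseDurationMs).toISOString(),
      ':newVersion': newVersion,
      ':expectedVersion': expectedVersion
    }
  };
}
```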
The client records the stream's creation time in streamCreatedOn. If it finds an item whose timestamp doesn't match the current stream (for example, the stream was deleted and recreated under the same name), it deletes the stale item and starts the state fresh.
Provisioning the table yourself
If you'd rather provision the table yourself, for example with infrastructure-as-code, create it with the key schema above and pass its name as dynamoDb.tableName. The non-key attributes don't need to be declared.
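If you provision the table from code, the key schema and the documented defaults translate into CreateTable input roughly like this. The sketch builds input for the AWS SDK v3 `CreateTableCommand`. `stateTableInput` is a hypothetical helper that mirrors the documented defaults (on-demand billing, server-side encryption, optional provisioned capacity); it doesn't reproduce the client's own call.

```javascript
// Hypothetical helper that builds CreateTableCommand input matching the
// documented state-table layout: consumerGroup (HASH) + streamName (RANGE),
// server-side encryption on, and on-demand billing unless provisioned
// throughput is given (same shape as dynamoDb.provisionedThroughput).
function stateTableInput({ tableName = 'lifion-kinesis-state', provisionedThroughput } = {}) {
  const input = {
    TableName: tableName,
    AttributeDefinitions: [
      { AttributeName: 'consumerGroup', AttributeType: 'S' },
      { AttributeName: 'streamName', AttributeType: 'S' }
    ],
    KeySchema: [
      { AttributeName: 'consumerGroup', KeyType: 'HASH' },
      { AttributeName: 'streamName', KeyType: 'RANGE' }
    ],
    SSESpecification: { Enabled: true }
  };
  if (provisionedThroughput) {
    const { readCapacityUnits, writeCapacityUnits } = provisionedThroughput;
    input.BillingMode = 'PROVISIONED';
    input.ProvisionedThroughput = {
      ReadCapacityUnits: readCapacityUnits,
      WriteCapacityUnits: writeCapacityUnits
    };
  } else {
    input.BillingMode = 'PAY_PER_REQUEST';
  }
  return input;
}

// Usage sketch:
// import { DynamoDBClient, CreateTableCommand } from '@aws-sdk/client-dynamodb';
// await new DynamoDBClient({}).send(new CreateTableCommand(stateTableInput()));
```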
IAM permissions
The credentials the client runs with need DynamoDB access to the table: CreateTable and DescribeTable while the table is being created, and GetItem, PutItem, UpdateItem, and DeleteItem for normal operation. Add TagResource and ListTagsOfResource if you pass dynamoDb.tags.
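Those permissions can be scoped to the state table itself. The sketch builds a matching IAM policy document. It assumes the default table name and the standard `aws` partition, and you substitute your own region and account ID. `statePolicy` is a hypothetical helper, and it covers only the DynamoDB side, not the Kinesis (or, with useS3ForLargeItems, S3) permissions the client also needs.

```javascript
// Hypothetical helper: an IAM policy document granting the DynamoDB actions
// listed above on the state table only. Tagging actions are added when the
// table is tagged through dynamoDb.tags. Assumes the standard `aws` partition.
function statePolicy({ region, accountId, tableName = 'lifion-kinesis-state', withTags = false }) {
  const actions = [
    'dynamodb:CreateTable',
    'dynamodb:DescribeTable',
    'dynamodb:GetItem',
    'dynamodb:PutItem',
    'dynamodb:UpdateItem',
    'dynamodb:DeleteItem'
  ];
  if (withTags) actions.push('dynamodb:TagResource', 'dynamodb:ListTagsOfResource');
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Action: actions,
        Resource: `arn:aws:dynamodb:${region}:${accountId}:table/${tableName}`
      }
    ]
  };
}

// e.g. JSON.stringify(statePolicy({ region: 'us-east-1', accountId: '123456789012' }), null, 2)
```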
API Reference
- lifion-kinesis
- Kinesis ⇐ PassThrough ⏏
- new Kinesis(options)
- instance
- .startConsumer() ⇒ Promise
- .stopConsumer()
- .putRecord(params) ⇒ Promise
- .listShards(params) ⇒ Promise
- .putRecords(params) ⇒ Promise
- .getStats() ⇒ Object
- .getShardAssignments() ⇒ Promise
- static
- .getStats() ⇒ Object
- Kinesis ⇐ PassThrough ⏏
Kinesis ⇐ PassThrough ⏏
A pass-through stream class specialization implementing a consumer
of Kinesis Data Streams using the AWS SDK for JavaScript. Incoming
data can be retrieved either through the data event or by piping the instance to other streams.
Kind: Exported class
Extends: PassThrough
new Kinesis(options)
Initializes a new instance of the Kinesis client.
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| options | Object | | The initialization options. In addition to the options below, it can also contain any of the AWS SDK v3 Kinesis client options. |
| [options.compression] | string | | The kind of data compression to use with records. The currently available compression options are either "LZ-UTF8" or none. |
| [options.consumerGroup] | string | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. |
| [options.createStreamIfNeeded] | boolean | true | Whether the Kinesis stream should be created automatically if it doesn't exist upon connection. |
| [options.dynamoDb] | Object | {} | The initialization options for the DynamoDB client used to store the state of the consumers. In addition to tableName, tags, and provisionedThroughput, it can also contain any of the AWS SDK v3 DynamoDB client options. |
| [options.dynamoDb.tableName] | string | | The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state". |
| [options.dynamoDb.tags] | Object | | If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged. |
| [options.encryption] | Object | | The encryption options to enforce in the stream. |
| [options.encryption.type] | string | | The encryption type to use. |
| [options.encryption.keyId] | string | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". |
| [options.enhancedConsumerIdleTimeout] | number | 0 | When greater than 0 and useEnhancedFanOut is true, enhanced fan-out consumers that have stayed unused for at least this many milliseconds are deregistered from AWS (so they stop incurring charges), keeping at least one registered. They are re-registered as the consumer group scales back up, which takes time as AWS makes them active. Set this comfortably above the lease and heartbeat cycles to avoid removing consumers that are briefly idle. Defaults to 0, which keeps every registered consumer in place. |
| [options.initialPositionInStream] | string | "LATEST" | The location in the shard from which the consumer starts fetching records when the application starts for the first time and there is no checkpoint for the shard. Set to LATEST to fetch new data only. Set to TRIM_HORIZON to start from the oldest available data record. |
| [options.leaseAcquisitionInterval] | number | 20000 | The interval in milliseconds for how often to attempt lease acquisitions. |
| [options.leaseAcquisitionRecoveryInterval] | number | 5000 | The interval in milliseconds for how often to re-attempt lease acquisitions when an error is returned from AWS. |
| [options.limit] | number | 10000 | The maximum number of records to request in a single GetRecords call (only applicable when useEnhancedFanOut is set to false). Kinesis may return fewer records than this; the client keeps polling to deliver the rest. |
| [options.logger] | Object | | An object with the warn, debug, and error functions that will be used for logging purposes. If not provided, logging will be omitted. |
| [options.maxEnhancedConsumers] | number | 5 | An option to set the number of enhanced fan-out consumer ARNs that the module should initialize. Defaults to 5. Providing a number above the AWS limit (20) or below 1 will result in using the default. |
| [options.noRecordsPollDelay] | number | 1000 | The delay in milliseconds before attempting to get more records when there were none in the previous attempt (only applicable when useEnhancedFanOut is set to false). |
| [options.pollDelay] | number | 250 | When the usePausedPolling option is false, this option defines the delay in milliseconds between poll requests for more records (only applicable when useEnhancedFanOut is set to false). |
| [options.retryOptions] | Object | {} | The retry options, as in async-retry, applied to the calls made to the Kinesis API. By default, calls are retried forever with exponential backoff; provide e.g. { forever: false, retries: 0 } to limit or disable retries. |
| [options.s3] | Object | {} | The initialization options for the S3 client used to store large items in buckets. In addition to bucketName, largeItemThreshold, nonS3Keys, and tags, it can also contain any of the AWS SDK v3 S3 client options. |
| [options.s3.bucketName] | string | | The name of the bucket in which to store large messages. If not provided, it defaults to the name of the Kinesis stream. |
| [options.s3.largeItemThreshold] | number | 900 | The size in KB above which an item should automatically be stored in S3. |
| [options.s3.nonS3Keys] | Array.<string> | [] | If the useS3ForLargeItems option is set to true, the nonS3Keys option lists the keys that will be sent normally on the Kinesis record. |
| [options.s3.tags] | Object | | If provided, the client will ensure that the S3 bucket is tagged with these tags. If the bucket already has tags, they will be merged. |
| [options.shardCount] | number | 1 | The number of shards that the newly-created stream will use (if the createStreamIfNeeded option is set) |
| [options.shardIds] | Array.<string> | | When provided, the client consumes only these specific shards instead of every shard in the stream. Setting this puts the client in standalone mode (it reads the listed shards directly and does not take part in the consumer group's automatic shard assignment, so useAutoShardAssignment is ignored). Shard IDs that aren't found in the stream are logged and skipped. |
| [options.shouldDeaggregate] | string \| boolean | "auto" | Whether the method retrieving the records should expect aggregated records and deaggregate them appropriately. |
| [options.shouldParseJson] | string \| boolean | "auto" | Whether retrieved records' data should be parsed as JSON. Set to "auto" to only attempt parsing if the data looks like JSON. Set to true to force parsing. |
| [options.statsInterval] | number | 30000 | The interval in milliseconds for how often to emit the "stats" event. The event is only available while the consumer is running. |
| options.streamName | string | | The name of the stream to consume data from (required) |
| [options.supressThroughputWarnings] | boolean | false | Set to true to make the client log ProvisionedThroughputExceededException as debug rather than warning. |
| [options.tags] | Object | | If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them. |
| [options.useAutoCheckpoints] | boolean | true | Set to true to make the client automatically store shard checkpoints using the sequence number of the most-recently received record. If set to false consumers can use the setCheckpoint() function, provided on the data event payload, to store any sequence number as the checkpoint for the shard. |
| [options.useAutoShardAssignment] | boolean | true | Set to true to automatically assign the stream shards to the active consumers in the same group (so only one client reads from one shard at the same time). Set to false to make the client read from all shards. |
| [options.useEnhancedFanOut] | boolean | false | Set to true to make the client use enhanced fan-out consumers to read from shards. |
| [options.usePausedPolling] | boolean | false | Set to true to make the client hold off on polling for more records until the consumer calls continuePolling(), a function provided on the data event payload. This option is useful when consumers want to make sure the records are fully processed before receiving more (only applicable when useEnhancedFanOut is set to false). |
| [options.useS3ForLargeItems] | boolean | false | Whether to automatically use an S3 bucket to store large items or not. |
kinesis.startConsumer() ⇒ Promise
Starts the stream consumer by ensuring that the stream exists, is ready, and is configured as requested. The internal managers that deal with heartbeats, state, and consumers are also started.
Kind: instance method of Kinesis
Fulfil: undefined - Once the consumer has successfully started.
Reject: Error - On any unexpected error while trying to start.
kinesis.stopConsumer()
Stops the stream consumer. The internal managers will also be stopped.
Kind: instance method of Kinesis
kinesis.putRecord(params) ⇒ Promise
Writes a single data record into a stream.
Kind: instance method of Kinesis
Fulfil: Object - The de-serialized data returned from the request.
Reject: Error - On any unexpected error while writing to the stream.
| Param | Type | Description |
| --- | --- | --- |
| params | Object | The parameters. |
| params.data | * | The data to put into the record. |
| [params.explicitHashKey] | string | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash. |
| [params.partitionKey] | string | Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data. |
| [params.sequenceNumberForOrdering] | string | Set this to the sequence number obtained from the last put record operation to guarantee strictly increasing sequence numbers, for puts from the same client and to the same partition key. If omitted, records are coarsely ordered based on arrival time. |
| [params.streamName] | string | If provided, the record will be put into the specified stream instead of the stream name provided during the consumer instantiation. |
kinesis.listShards(params) ⇒ Promise
Lists the shards of a stream.
Kind: instance method of Kinesis
Fulfil: Object - The de-serialized data returned from the request.
Reject: Error - On any unexpected error while listing the shards.
| Param | Type | Description |
| --- | --- | --- |
| params | Object | The parameters. |
| [params.streamName] | string | If provided, the method will list the shards of the specified stream instead of the stream name provided during the consumer instantiation. |
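Kinesis routes a record by taking the MD5 hash of its partition key as an unsigned 128-bit integer and matching it against each shard's hash key range (an explicitHashKey skips the hashing). Combined with shard data from listShards, that lets you predict where a key lands. The sketch assumes the shard objects carry `HashKeyRange.StartingHashKey` and `EndingHashKey` as decimal strings, as in the ListShards API response. `hashKeyFor` and `shardForPartitionKey` are hypothetical helpers. After a reshard, filter out closed parent shards (those whose SequenceNumberRange has an EndingSequenceNumber) first, since their ranges overlap their children's.

```javascript
import { createHash } from 'node:crypto';

// Maps a partition key to a 128-bit integer as Kinesis documents it:
// MD5 over the key's UTF-8 bytes, read as an unsigned big-endian number.
function hashKeyFor(partitionKey) {
  const hex = createHash('md5').update(partitionKey, 'utf8').digest('hex');
  return BigInt(`0x${hex}`);
}

// Finds the open shard whose hash key range contains the key's hash.
// `shards` is assumed to look like the Shards array from ListShards.
function shardForPartitionKey(shards, partitionKey) {
  const hash = hashKeyFor(partitionKey);
  return shards.find(({ HashKeyRange: { StartingHashKey, EndingHashKey } }) =>
    hash >= BigInt(StartingHashKey) && hash <= BigInt(EndingHashKey)
  );
}
```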
kinesis.putRecords(params) ⇒ Promise
Writes multiple data records into a stream in a single call.
Kind: instance method of Kinesis
Fulfil: Object - The de-serialized data returned from the request.
Reject: Error - On any unexpected error while writing to the stream.
| Param | Type | Description |
| --- | --- | --- |
| params | Object | The parameters. |
| params.records | Array.<Object> | The records associated with the request. |
| params.records[].data | * | The record data. |
| [params.records[].explicitHashKey] | string | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash. |
| [params.records[].partitionKey] | string | Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data. |
| [params.streamName] | string | If provided, the records will be put into the specified stream instead of the stream name provided during the consumer instantiation. |
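A single PutRecords request accepts at most 500 records (and also caps the total request size), so larger sets need to be split before calling putRecords. Here's a minimal sketch that chunks by record count only. `chunkRecords` is a hypothetical helper; it doesn't check request byte size or handle per-record failures reported in the response.

```javascript
// Hypothetical helper: splits records into groups of at most `maxPerRequest`
// (500 is the PutRecords per-request record limit) so each group can be sent
// with its own putRecords call. Request byte size is not checked here.
function chunkRecords(records, maxPerRequest = 500) {
  const chunks = [];
  for (let start = 0; start < records.length; start += maxPerRequest) {
    chunks.push(records.slice(start, start + maxPerRequest));
  }
  return chunks;
}

// Usage sketch (assumes a configured Kinesis instance):
// for (const records of chunkRecords(allRecords)) {
//   await kinesis.putRecords({ records });
// }
```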
kinesis.getStats() ⇒ Object
Returns statistics for the instance of the client.
Kind: instance method of Kinesis
Returns: Object - An object with the statistics.
kinesis.getShardAssignments() ⇒ Promise
Returns the shards assigned to each consumer in the same group, so it's possible to inspect
how the stream shards are currently distributed across the consumers sharing a group. The
consumer must be started before calling this (see startConsumer).
Kind: instance method of Kinesis
Fulfil: Object - A map keyed by consumer ID, where each entry has the consumer details and
a sorted array with the IDs of the shards assigned to that consumer.
Reject: Error - If the consumer hasn't been started yet.
Kinesis.getStats() ⇒ Object
Returns the aggregated statistics of all the instances of the client.
Kind: static method of Kinesis
Returns: Object - An object with the statistics.
