# @keystrokehq/dynamodb
Amazon DynamoDB tables, items, transactions, PartiQL, backups, imports, exports, global tables, PITR, resource policies, tags, Kinesis streaming destinations, and DynamoDB Streams triggers for Keystroke workflows.
This package is a single-mode official integration. The package root is intentionally non-canonical; use explicit subpaths instead:
- `@keystrokehq/dynamodb/connection`
- `@keystrokehq/dynamodb/client`
- `@keystrokehq/dynamodb/schemas`
- `@keystrokehq/dynamodb/events`
- `@keystrokehq/dynamodb/triggers`
- `@keystrokehq/dynamodb/tables`
- `@keystrokehq/dynamodb/items`
- `@keystrokehq/dynamodb/batch`
- `@keystrokehq/dynamodb/transactions`
- `@keystrokehq/dynamodb/partiql`
- `@keystrokehq/dynamodb/ttl`
- `@keystrokehq/dynamodb/global-tables`
- `@keystrokehq/dynamodb/backups`
- `@keystrokehq/dynamodb/pitr`
- `@keystrokehq/dynamodb/exports`
- `@keystrokehq/dynamodb/imports`
- `@keystrokehq/dynamodb/kinesis-streaming`
- `@keystrokehq/dynamodb/contributor-insights`
- `@keystrokehq/dynamodb/limits`
- `@keystrokehq/dynamodb/tags`
- `@keystrokehq/dynamodb/resource-policy`
- `@keystrokehq/dynamodb/streams`
## Installation

```sh
pnpm add @keystrokehq/dynamodb
```

## Connection
DynamoDB has no consumer-style OAuth; access is always IAM-based. The public
connection credential set accepts a long-lived access-key pair and, optionally,
an STS AssumeRole configuration that Keystroke uses to swap those credentials
for short-lived scoped ones at request time.
```ts
import { dynamodb } from '@keystrokehq/dynamodb/connection';

dynamodb.resolvedCredentialSetId; // 'keystroke:dynamodb'
```

Required vault keys:
| Key | Required | Description |
|-----|----------|-------------|
| AWS_ACCESS_KEY_ID | yes | Long-lived access key id, or base key for AssumeRole. |
| AWS_SECRET_ACCESS_KEY | yes | Paired secret. |
| AWS_REGION | yes | e.g. us-east-1. |
| AWS_SESSION_TOKEN | no | Provide when passing temporary credentials directly. |
| AWS_DYNAMODB_ENDPOINT | no | Full endpoint override (VPC, FIPS, DynamoDB Local). |
| AWS_ASSUME_ROLE_ARN | no | If set, the base keys are used only to sign STS AssumeRole. |
| AWS_ASSUME_ROLE_SESSION_NAME | no | Defaults to keystroke-dynamodb-{timestamp}. |
| AWS_ASSUME_ROLE_EXTERNAL_ID | no | For cross-account trust. |
| AWS_ASSUME_ROLE_DURATION_SECONDS | no | 900–43200. Default 3600. |
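For cross-account access, these keys compose into a role-assumption setup. A hypothetical vault entry (the account ID, role name, and external ID below are placeholders, not values the package prescribes):

```shell
# Base key pair — only signs sts:AssumeRole when a role ARN is configured
AWS_ACCESS_KEY_ID=AKIAXXXXXXXXXXXXXXXX
AWS_SECRET_ACCESS_KEY=...
AWS_REGION=us-east-1

# Swapped for short-lived, scoped credentials at request time
AWS_ASSUME_ROLE_ARN=arn:aws:iam::111122223333:role/keystroke-dynamodb
AWS_ASSUME_ROLE_EXTERNAL_ID=keystroke-prod
AWS_ASSUME_ROLE_DURATION_SECONDS=3600
```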
The package intentionally uses a Keystroke-owned `fetch` client instead of `@aws-sdk/client-dynamodb` so the IAM token lifecycle, runtime ownership, retry policy, and error normalization stay aligned with the official integration model.
## Using the raw client

Most operations should be authored as Keystroke workflow operations (the subpaths below). When you need to reach DynamoDB directly, `createDynamodbClient` exposes a signed JSON/RPC request entry point that handles SigV4, STS refresh, retries, endpoint resolution, and error normalization.
```ts
import { createDynamodbClient } from '@keystrokehq/dynamodb/client';

const client = createDynamodbClient({
  AWS_ACCESS_KEY_ID: 'AKIA...',
  AWS_SECRET_ACCESS_KEY: '...',
  AWS_REGION: 'us-east-1',
});

const tables = await client.request<{ TableNames: string[] }>('ListTables', {});

const streams = await client.request<{ Streams: unknown[] }>(
  'ListStreams',
  { TableName: 'Orders' },
  { service: 'streams' } // route to the DynamoDB Streams endpoint
);
```

## Tables
```ts
import {
  createTable,
  describeTable,
  listTables,
  updateTable,
} from '@keystrokehq/dynamodb/tables';

await createTable.run({
  TableName: 'Orders',
  BillingMode: 'PAY_PER_REQUEST',
  AttributeDefinitions: [
    { AttributeName: 'pk', AttributeType: 'S' },
    { AttributeName: 'sk', AttributeType: 'S' },
  ],
  KeySchema: [
    { AttributeName: 'pk', KeyType: 'HASH' },
    { AttributeName: 'sk', KeyType: 'RANGE' },
  ],
  Tags: [{ Key: 'env', Value: 'prod' }],
});

const tables = await listTables.run({ Limit: 50 });
```

## Items
```ts
import {
  deleteItem,
  getItem,
  putItem,
  query,
  scan,
  updateItem,
} from '@keystrokehq/dynamodb/items';

await putItem.run({
  TableName: 'Orders',
  Item: { pk: { S: 'user#1' }, sk: { S: 'order#1' }, total: { N: '99.50' } },
});

const page = await query.run({
  TableName: 'Orders',
  KeyConditionExpression: '#pk = :pk',
  ExpressionAttributeNames: { '#pk': 'pk' },
  ExpressionAttributeValues: { ':pk': { S: 'user#1' } },
});
```

## Batch
```ts
import {
  batchGetItem,
  batchWriteItem,
} from '@keystrokehq/dynamodb/batch';

await batchWriteItem.run({
  RequestItems: {
    Orders: [
      { PutRequest: { Item: { pk: { S: 'u#1' }, sk: { S: 'o#1' } } } },
      { DeleteRequest: { Key: { pk: { S: 'u#1' }, sk: { S: 'o#0' } } } },
    ],
  },
});
```

Both batch operations automatically retry `UnprocessedKeys` / `UnprocessedItems`
with jittered backoff so callers receive a single settled response.
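The retry loop itself is internal to the package, but the technique is straightforward to picture. A standalone sketch of the idea (the `send` callback and string item type here are stand-ins, not package APIs):

```typescript
// Sketch of the settle-on-unprocessed technique: re-submit whatever the
// service reported back as unprocessed, sleeping with full jitter between
// attempts so concurrent workers don't retry in lockstep.
type BatchFn = (items: string[]) => Promise<{ unprocessed: string[] }>;

async function settleBatch(
  send: BatchFn,
  items: string[],
  maxAttempts = 5,
): Promise<void> {
  let pending = items;
  for (let attempt = 0; pending.length > 0 && attempt < maxAttempts; attempt++) {
    if (attempt > 0) {
      // Full jitter: random delay up to 100ms * 2^attempt.
      const cap = 100 * 2 ** attempt;
      await new Promise((resolve) => setTimeout(resolve, Math.random() * cap));
    }
    const { unprocessed } = await send(pending);
    pending = unprocessed;
  }
  if (pending.length > 0) {
    throw new Error(`still unprocessed after retries: ${pending.join(', ')}`);
  }
}
```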
## Transactions and PartiQL
```ts
import {
  transactGetItems,
  transactWriteItems,
} from '@keystrokehq/dynamodb/transactions';
import {
  batchExecuteStatement,
  executeStatement,
  executeTransaction,
} from '@keystrokehq/dynamodb/partiql';

await transactWriteItems.run({
  TransactItems: [
    {
      Put: {
        TableName: 'Orders',
        Item: { pk: { S: 'u#1' }, sk: { S: 'o#2' } },
      },
    },
    {
      ConditionCheck: {
        TableName: 'Inventory',
        Key: { sku: { S: 'abc' } },
        ConditionExpression: 'stock > :zero',
        ExpressionAttributeValues: { ':zero': { N: '0' } },
      },
    },
  ],
});

await executeStatement.run({
  Statement: 'SELECT * FROM "Orders" WHERE pk = ?',
  Parameters: [{ S: 'user#1' }],
});
```

`ClientRequestToken` is auto-populated with `crypto.randomUUID()` if callers
omit it, keeping transactional retries idempotent.
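The token behaviour amounts to a small defaulting step; a hypothetical helper sketching it (not the package's internal code — the input shape is reduced to the two relevant fields):

```typescript
import { randomUUID } from 'node:crypto';

// Keep a caller-supplied ClientRequestToken; otherwise generate one, so a
// retried submission replays as the same transaction instead of a new write.
interface TransactWriteInput {
  TransactItems: unknown[];
  ClientRequestToken?: string;
}

function withRequestToken(input: TransactWriteInput): TransactWriteInput {
  return {
    ...input,
    ClientRequestToken: input.ClientRequestToken ?? randomUUID(),
  };
}
```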
## Time-to-live
```ts
import {
  describeTimeToLive,
  updateTimeToLive,
} from '@keystrokehq/dynamodb/ttl';

await updateTimeToLive.run({
  TableName: 'Sessions',
  TimeToLiveSpecification: { Enabled: true, AttributeName: 'expiresAt' },
});
```

## Global tables, backups, and PITR
```ts
import {
  createGlobalTable,
  updateGlobalTable,
} from '@keystrokehq/dynamodb/global-tables';
import {
  createBackup,
  restoreTableFromBackup,
} from '@keystrokehq/dynamodb/backups';
import {
  restoreTableToPointInTime,
  updateContinuousBackups,
} from '@keystrokehq/dynamodb/pitr';

await updateContinuousBackups.run({
  TableName: 'Orders',
  PointInTimeRecoverySpecification: { PointInTimeRecoveryEnabled: true },
});

await restoreTableToPointInTime.run({
  SourceTableName: 'Orders',
  TargetTableName: 'Orders-replay',
  UseLatestRestorableDateTime: true,
});
```

## Exports, imports, and Kinesis streaming
```ts
import {
  describeExport,
  exportTableToPointInTime,
} from '@keystrokehq/dynamodb/exports';
import { importTable } from '@keystrokehq/dynamodb/imports';
import { enableKinesisStreamingDestination } from '@keystrokehq/dynamodb/kinesis-streaming';

await exportTableToPointInTime.run({
  TableArn: 'arn:aws:dynamodb:us-east-1:111122223333:table/Orders',
  S3Bucket: 'keystroke-exports',
  ExportFormat: 'DYNAMODB_JSON',
});

await enableKinesisStreamingDestination.run({
  TableName: 'Orders',
  StreamArn: 'arn:aws:kinesis:us-east-1:111122223333:stream/orders-stream',
});
```

`ExportTableToPointInTime` and `ImportTable` auto-populate `ClientToken` so
retried invocations don't create duplicate jobs.
## Tags, resource policies, and governance
```ts
import {
  listTagsOfResource,
  tagResource,
} from '@keystrokehq/dynamodb/tags';
import {
  getResourcePolicy,
  putResourcePolicy,
} from '@keystrokehq/dynamodb/resource-policy';
import {
  describeContributorInsights,
  listContributorInsights,
} from '@keystrokehq/dynamodb/contributor-insights';
import { describeLimits } from '@keystrokehq/dynamodb/limits';

await tagResource.run({
  ResourceArn: 'arn:aws:dynamodb:us-east-1:111122223333:table/Orders',
  Tags: [{ Key: 'team', Value: 'growth' }],
});

await putResourcePolicy.run({
  ResourceArn: 'arn:aws:dynamodb:us-east-1:111122223333:table/Orders',
  Policy: JSON.stringify({ Version: '2012-10-17', Statement: [] }),
  ConfirmRemoveSelfResourceAccess: false,
});
```

## DynamoDB Streams
The data-plane operations cover the full shard + iterator lifecycle:
```ts
import {
  describeStream,
  getRecords,
  getShardIterator,
  listStreams,
} from '@keystrokehq/dynamodb/streams';

const { Streams = [] } = await listStreams.run({ TableName: 'Orders' });
const streamArn = Streams[0]?.StreamArn;

const { StreamDescription } = await describeStream.run({ StreamArn: streamArn! });

const { ShardIterator } = await getShardIterator.run({
  StreamArn: streamArn!,
  ShardId: StreamDescription!.Shards![0]!.ShardId,
  ShardIteratorType: 'TRIM_HORIZON',
});

const { Records = [] } = await getRecords.run({
  ShardIterator: ShardIterator!,
});
```

## Triggers
```ts
import {
  itemPolled,
  streamRecordInserted,
  streamRecordModified,
  streamRecordRemoved,
  streamShardOpened,
} from '@keystrokehq/dynamodb/triggers';

const newOrders = streamRecordInserted({
  target: { tableName: 'Orders' },
  schedule: '30s',
});

const priceChanges = streamRecordModified({
  target: { tableName: 'Products' },
});

const canceledOrders = streamRecordRemoved({
  target: { tableName: 'Orders' },
});

const reshards = streamShardOpened({
  target: { tableName: 'Orders' },
  schedule: '1m',
});

const fallback = itemPolled({
  tableName: 'LegacyInventory',
  keyAttributes: ['pk', 'sk'],
  schedule: '5m',
});
```

All Streams-backed triggers share a consumer that paginates `DescribeStream`, opens iterators at `TRIM_HORIZON` (or `LATEST` when `fromLatest: true`) on the first visit to a shard and at `AFTER_SEQUENCE_NUMBER` thereafter, drains `GetRecords`, and persists per-shard sequence-number cursors so restarts resume at-least-once.
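The iterator-selection rule can be sketched as a pure function. The cursor-store shape below is hypothetical (the package's persistence format is not documented here); only the decision logic is being illustrated:

```typescript
// First visit to a shard: TRIM_HORIZON (or LATEST when fromLatest is set).
// Subsequent visits: resume just past the checkpointed sequence number.
interface ShardCursor {
  lastSequenceNumber?: string;
}

function iteratorParams(
  shardId: string,
  cursors: Map<string, ShardCursor>,
  fromLatest = false,
): { ShardId: string; ShardIteratorType: string; SequenceNumber?: string } {
  const checkpoint = cursors.get(shardId)?.lastSequenceNumber;
  if (checkpoint) {
    return {
      ShardId: shardId,
      ShardIteratorType: 'AFTER_SEQUENCE_NUMBER',
      SequenceNumber: checkpoint,
    };
  }
  return {
    ShardId: shardId,
    ShardIteratorType: fromLatest ? 'LATEST' : 'TRIM_HORIZON',
  };
}
```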
Curated event payload shapes are exported for workflow authors:
```ts
import {
  streamInsertEventSchema,
  streamModifyEventSchema,
  streamRemoveEventSchema,
  streamShardOpenedEventSchema,
} from '@keystrokehq/dynamodb/events';
```

## Webhooks
DynamoDB does not deliver webhooks. Change-data capture is modelled as polling triggers on top of DynamoDB Streams (above). The package deliberately does not expose a `@keystrokehq/dynamodb/verification` subpath; there is no signature or CRC handshake to verify.
## Caveats

- **At-least-once delivery.** The Streams consumer persists `SequenceNumber`-based checkpoints between polls. Records may replay when the runtime restarts mid-poll; downstream handlers must be idempotent.
- **DynamoDB Streams retention is 24 hours.** If a trigger is paused or disconnected longer than that, older records are trimmed and only new records will arrive on the next resume.
- **Kinesis-destination triggers.** This release ships the Streams-backed triggers plus an `itemPolled` fallback. The three matching Kinesis-mirror triggers (§ 7.2 of `PLAN.md`) are deferred until the shared Kinesis subscription primitive lands; see `IMPLEMENTATION_NOTES.md` § 7 Q2.
- **`itemPolled` is a last-resort fallback.** Scan-based polling can be very expensive on large tables and cannot observe `REMOVE` events reliably. Prefer a Streams-backed trigger whenever possible.
- **Document vs native AttributeValue.** Schemas speak the native `{S: …}`/`{N: …}`/`{M: …}` wire format directly. A document-mode helper layer ships with `@keystrokehq/dynamodb/schemas` for workflow authors who prefer idiomatic JS values; nested objects, sets, `bigint`, `Uint8Array`, and `Set<>` are all round-tripped.
- **Item size cap.** DynamoDB rejects items larger than 400 KB; the client surfaces this as a `DynamodbApiError` with `kind: 'validation'`.
- **`DescribeGlobalTableSettings` is not a real AWS operation.** The v1 Global Tables surface has five operations; see `IMPLEMENTATION_NOTES.md` § 7 Q4.
- **Sim-parity convenience wrappers** (`get`, `put`, `query`, `scan`, `update`, `delete`, `introspect`) are intentionally not shipped. They would alias `getItem`/`putItem`/… without adding capability; workflow authors should call the canonical operations directly.
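The native wire format mentioned in the AttributeValue caveat is mechanical to produce. A minimal marshalling sketch for the common cases — strings, numbers, booleans, null, arrays, and nested objects only (the real `@keystrokehq/dynamodb/schemas` helpers also round-trip sets, `bigint`, and `Uint8Array`):

```typescript
// Minimal document-to-AttributeValue marshaller. Numbers travel as strings
// on the wire; nested objects become M maps and arrays become L lists.
type AttributeValue =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: true }
  | { L: AttributeValue[] }
  | { M: Record<string, AttributeValue> };

function marshal(value: unknown): AttributeValue {
  if (value === null) return { NULL: true };
  switch (typeof value) {
    case 'string': return { S: value };
    case 'number': return { N: String(value) };
    case 'boolean': return { BOOL: value };
  }
  if (Array.isArray(value)) return { L: value.map(marshal) };
  if (typeof value === 'object') {
    const m: Record<string, AttributeValue> = {};
    for (const [k, v] of Object.entries(value as object)) m[k] = marshal(v);
    return { M: m };
  }
  throw new TypeError(`unsupported value: ${String(value)}`);
}
```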
## Hidden / internal subpaths

`@keystrokehq/dynamodb/_official` and `@keystrokehq/dynamodb/_runtime` exist for platform internals (bundle registration, runtime adapters). They are not part of the end-user authoring surface and may change without notice.
## License
MIT.
