@farukada/aws-langgraph-dynamodb-ts
v0.3.1
Published
AWS DynamoDB implementation for LangGraph Memory Store and Checkpoint Saver in TypeScript
Downloads
19,078
Maintainers
Readme
@farukada/aws-langgraph-dynamodb-ts
A DynamoDB persistence layer for LangGraph in TypeScript (CommonJS build, consumable from both ESM and CommonJS; Node ≥ 22). It provides three LangGraph/LangChain adapters plus a factory:
DynamoDBSaver— checkpoint + pending-writes persistence (extends BaseCheckpointSaver).DynamoDBStore— long-term memory with optional semantic search (extends BaseStore).DynamoDBChatMessageHistory— multi-session chat history, with a single-session adapter forRunnableWithMessageHistory.DynamoDBFactory— convenience constructors, includingcreateAll(one shared client + adestroy()).
Every adapter supports optional gzip compression, S3 offloading of payloads over DynamoDB's 400 KB item limit, and TTL-based expiry. The store additionally supports vector semantic search — in-DynamoDB by default, or delegated to a pluggable VectorBackend (e.g. OpenSearch / pgvector) for large corpora — via any LangChain Embeddings implementation.
Table of Contents
- Install
- Table schema
- Quick start
- Options
- Features
- Error handling
- Logging
- Infrastructure setup
- IAM permissions
- Migrating from earlier versions
- Testing
- License
Install
npm install @farukada/aws-langgraph-dynamodb-ts \
@aws-sdk/client-dynamodb @aws-sdk/lib-dynamodb \
@langchain/core @langchain/langgraph @langchain/langgraph-checkpointOptional peer dependencies, installed only if you use the matching feature:
# Required only when S3 offloading is enabled
npm install @aws-sdk/client-s3
# Required only for semantic search in the store (any LangChain Embeddings works)
npm install @langchain/aws # e.g. Bedrock Titan embeddingsTable schema
Every adapter uses the same simple key schema: a string partition key PK, a string sort key SK, and an optional Number ttl attribute for expiry. Because the key spaces never collide, a single table can back all three adapters, or you can use a separate table per adapter — your choice via the tableName option.
| Attribute | Type | Role |
| --- | --- | --- |
| PK | String (HASH) | partition key |
| SK | String (RANGE) | sort key |
| ttl | Number | (optional) Unix-epoch-seconds expiry; enable DynamoDB TTL on this attribute |
How each adapter lays out keys (informational — you don't manage this):
- Checkpointer —
PK = <thread_id>;SK=META#<ns>#<checkpoint_id>(metadata),PAYLOAD#<ns>#<checkpoint_id>(checkpoint),WRITE#<ns>#<checkpoint_id>#<task>#<idx>(pending writes). - Store —
PK = <namespace[0]>(the scope root);SK = <namespace[1..]>#<key>. This makes a scoped prefix search a nativeQuery(PK = root AND begins_with(SK, …)); only a rootless "search everything" falls back to aScan. - Chat history —
PK = <sessionId>; one item per message atSK = MSG#<ULID>(ordered, append-only) plus oneSK = SESSIONmetadata item.
Quick start
Checkpointer
import { DynamoDBSaver } from '@farukada/aws-langgraph-dynamodb-ts';
const checkpointer = new DynamoDBSaver({
tableName: 'langgraph',
clientConfig: { region: 'eu-west-1' },
});
const graph = workflow.compile({ checkpointer });
const config = { configurable: { thread_id: 'user-42' } };
await graph.invoke({ messages: [/* ... */] }, config);
// Resume later (even in a new process) — state is loaded from DynamoDB.
const resumed = await graph.invoke({ messages: [/* ... */] }, config);
checkpointer.destroy(); // releases the client this instance createdStore + semantic search
import { DynamoDBStore } from '@farukada/aws-langgraph-dynamodb-ts';
import { BedrockEmbeddings } from '@langchain/aws';
const store = new DynamoDBStore({
tableName: 'langgraph',
clientConfig: { region: 'eu-west-1' },
index: {
dims: 1024,
embeddings: new BedrockEmbeddings({ model: 'amazon.titan-embed-text-v2:0', region: 'eu-west-1' }),
fields: ['text'], // which fields to embed; defaults to the whole document ('$')
},
});
await store.put(['library'], 'doc-1', { text: 'Amazon DynamoDB is a serverless NoSQL database' });
await store.put(['library'], 'doc-2', { text: 'Espresso is a concentrated coffee' });
// Metadata filtering (operators: $eq, $ne, $gt, $gte, $lt, $lte)
await store.search(['library'], { filter: { type: 'note', score: { $gte: 5 } } });
// Semantic search — ranked by cosine similarity to the query embedding
const hits = await store.search(['library'], { query: 'cloud database', limit: 5 });
//=> doc-1 ranks first, with a `score` on each SearchItem
await store.get(['library'], 'doc-1');
await store.delete(['library'], 'doc-1');
await store.listNamespaces({ prefix: ['library'], maxDepth: 1 });Chat history
import { DynamoDBChatMessageHistory } from '@farukada/aws-langgraph-dynamodb-ts';
import { HumanMessage } from '@langchain/core/messages';
const history = new DynamoDBChatMessageHistory({
tableName: 'langgraph',
clientConfig: { region: 'eu-west-1' },
});
await history.addMessages('session-1', [new HumanMessage('Hello!')]);
const messages = await history.getMessages('session-1');
const sessions = await history.listSessions(); // [{ sessionId, title, messageCount, ... }]
await history.clear('session-1');Use it with LangChain's RunnableWithMessageHistory via the single-session adapter:
import { RunnableWithMessageHistory } from '@langchain/core/runnables';
const withHistory = new RunnableWithMessageHistory({
runnable: chain,
getMessageHistory: (sessionId) => history.forSession(sessionId),
inputMessagesKey: 'input',
historyMessagesKey: 'history',
});Factory
createAll builds all three adapters on one shared DynamoDB client and returns a single destroy() that tears everything down.
import { DynamoDBFactory } from '@farukada/aws-langgraph-dynamodb-ts';
const factory = new DynamoDBFactory({ clientConfig: { region: 'eu-west-1' } });
const { saver, store, history, destroy } = factory.createAll({
saver: { tableName: 'langgraph' },
store: { tableName: 'langgraph', index: { dims: 1024, embeddings } },
history: { tableName: 'langgraph' },
});
// ... use saver / store / history ...
destroy(); // closes the one shared clientOptions
All adapters share a common base. Provide either a prebuilt client (which the adapter will not own/close) or clientConfig (the adapter builds and owns the client).
| Option | Type | Applies to | Notes |
| --- | --- | --- | --- |
| tableName | string | all | required |
| client | DynamoDBDocument | all | reuse an existing client; not closed by destroy() |
| clientConfig | DynamoDBClientConfig | all | used to build a client when client is omitted |
| ttl | { days: number } | { seconds: number } | all | expiry written to the ttl attribute |
| logger | Logger | all | per-instance logger (default: silent) |
| compression | CompressionConfig | all | { enabled, minSizeBytes?, level?, maxDecompressedBytes? } |
| s3 | S3OffloadConfig | all | offload large payloads to S3 (see below) |
| serde | SerializerProtocol | all | serializer override (checkpointer defaults to LangGraph's; store/history to JSON) |
| index | IndexConfig | store only | { dims, embeddings, fields? } for semantic search |
| vectorBackend | VectorBackend | store only | delegate similarity search to an external index; DynamoDB keeps the canonical item |
| maxSearchCandidates | number | store only | cap for the in-DB ranker before it errors (default 1000) |
S3OffloadConfig: { bucketName, keyPrefix?, thresholdBytes?, serverSideEncryption?, sseKmsKeyId?, clientConfig? }.
Features
Gzip compression — set compression: { enabled: true }. Payloads at or above minSizeBytes (default 1 KB) are gzipped transparently; decompression auto-detects on read and is guarded against decompression-bomb expansion (maxDecompressedBytes, default 50 MiB).
S3 offloading — set s3: { bucketName }. Any serialized payload at or above thresholdBytes (default 350 KB) is written to S3, with only a reference stored in DynamoDB; reads rehydrate transparently. Requires the optional @aws-sdk/client-s3 peer. When a ttl is also configured the library best-effort installs a matching S3 lifecycle expiration rule (logged, never fatal). Deleting a checkpoint thread / chat session also best-effort deletes its offloaded objects.
TTL expiry — set ttl: { days } or ttl: { seconds }. The ttl attribute is written as a Unix-epoch-seconds timestamp; enable DynamoDB TTL on the ttl attribute for automatic deletion. Chat history anchors a single uniform whole-conversation TTL at session creation (via if_not_exists), shared by every message, so a live session never develops mid-history gaps; expired messages are also filtered out on read.
Semantic search (store) — provide index with a LangChain Embeddings implementation. On put, the configured fields are embedded; on search with a query, results are ranked by cosine similarity. By default the embedding is stored on the item and ranking happens in-process over the scoped candidate set (bounded by maxSearchCandidates, default 1000 — exceeding it throws, steering you to an external index). For large corpora, pass a vectorBackend: the embedding is sent there instead, similarity search is delegated to it, and DynamoDB still holds the canonical item. Per-item indexing can be overridden via the index argument to put (false to skip, or a string[] of fields).
Vector index consistency — when a vectorBackend is configured, DynamoDB holds the canonical item and the backend is a rebuildable index. After each canonical write the embedding is synced to the backend best-effort: a failure is logged (not thrown), so a backend hiccup never fails an otherwise-successful put/delete. To repair drift, call store.reconcileVectorIndex(namespacePrefix) — it re-pushes every live embedding and, when the backend implements the optional listKeys, prunes vectors with no canonical item; it returns { upserted, pruned }. Run it when the namespace is idle. Caveats: reconciliation re-embeds with the store's configured index fields, so per-put field overrides are not reproduced; prune happens only when listKeys is implemented (otherwise reconcile re-pushes only and logs that prune was skipped); the prefix must be a non-empty namespace.
Strong consistency — checkpointer read-your-writes (getTuple) and every store.get use ConsistentRead, so a value written and immediately read back is never served a stale replica. Bulk reads (list, listNamespaces, listSessions) stay eventually consistent for lower cost.
Error handling
All errors thrown by the library extend DynamoDbLangGraphError and carry a stable code from the ErrorCode enum plus a native cause chain. Branch on code:
import { ErrorCode, DynamoDbLangGraphError } from '@farukada/aws-langgraph-dynamodb-ts';
try {
await store.put([''], 'k', { v: 1 });
} catch (error) {
if (error instanceof DynamoDbLangGraphError && error.code === ErrorCode.VALIDATION) {
// bad input
}
}ErrorCode values: VALIDATION, CONDITION_CONFLICT, RETRY_EXHAUSTED, BATCH_WRITE_INCOMPLETE, COMPRESSION_LIMIT, S3_OFFLOAD_FAILED, ABORTED. Typed subclasses are exported where callers commonly branch: ValidationError, ConflictError, RetryExhaustedError, BatchWriteIncompleteError, AbortError.
Logging
Logging is per-instance and silent by default — the library never writes to your console uninvited. Pass any object matching the Logger interface (info/warn/error/debug):
import { redactLogger, type Logger } from '@farukada/aws-langgraph-dynamodb-ts';
const logger: Logger = {
info: (m, ...a) => console.info(m, ...a),
warn: (m, ...a) => console.warn(m, ...a),
error: (m, ...a) => console.error(m, ...a),
debug: () => {},
};
const store = new DynamoDBStore({ tableName: 'langgraph', logger: redactLogger(logger) });redactLogger wraps a logger so secret-looking fields (access keys, tokens, passwords, …) are replaced with [REDACTED] in structured log arguments. redactSecrets exposes the same redaction for arbitrary objects.
Infrastructure setup
One table backs all three adapters. Create it with AWS CDK or Terraform.
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
new dynamodb.Table(this, 'LangGraph', {
tableName: 'langgraph',
partitionKey: { name: 'PK', type: dynamodb.AttributeType.STRING },
sortKey: { name: 'SK', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
timeToLiveAttribute: 'ttl', // optional; only needed if you use the `ttl` option
});resource "aws_dynamodb_table" "langgraph" {
name = "langgraph"
billing_mode = "PAY_PER_REQUEST"
hash_key = "PK"
range_key = "SK"
attribute { name = "PK" type = "S" }
attribute { name = "SK" type = "S" }
ttl {
attribute_name = "ttl"
enabled = true
}
}IAM permissions
Minimum DynamoDB actions on the table:
dynamodb:GetItem
dynamodb:PutItem
dynamodb:DeleteItem
dynamodb:Query
dynamodb:Scan
dynamodb:BatchGetItem
dynamodb:BatchWriteItem
dynamodb:TransactWriteItemsWhen S3 offloading is enabled, on the bucket/objects: s3:GetObject, s3:PutObject, s3:DeleteObject, s3:ListBucket, and — only if TTL-driven lifecycle rules are desired — s3:GetBucketLifecycleConfiguration and s3:PutBucketLifecycleConfiguration. For semantic search via Bedrock embeddings: bedrock:InvokeModel.
Migrating from earlier versions
0.2.x → 0.3.0 is a complete, ground-up rewrite. The public API is similar, but the table schema, on-disk layout, and several options changed, so existing data is not compatible — create a new table.
- Table schema is now
PK/SKstrings (one table for all adapters) instead of per-adapter custom key names. The key attribute names changed, so CDK/Terraform definitions need updating. - Store keys are
PK = namespace[0],SK = namespace[1..]#key(wasPK = full namespace,SK = key). - Chat history is one item per message (
SK = MSG#<ULID>) plus aSESSIONmetadata item, replacing the single per-session item. - Single
tableNameoption per adapter (wascheckpointsTableName/writesTableName, etc.). - One
ttloption —{ days }or{ seconds }— replacesttlDays/ttlSeconds. - S3 config option renamed
s3OffloadConfig→s3. - Per-instance
loggeroption replaces the globalsetGlobalLoggersingleton; default logging is now silent. - Unified error model — all errors extend
DynamoDbLangGraphErrorwith anErrorCode.
Production notes
- Sharing one table across all three adapters is supported — their key spaces don't collide, and table-wide reads filter to their own items. Checkpointer, chat-history, and scoped store reads are all partition-scoped (
Query/GetItem). - Scoped reads are
Querys.store.search/store.listNamespaceswith a concrete namespace prefix andhistory.getMessagesare nativeQuerys. Only a rootlessstore.search([])/ unprefixedlistNamespacesandhistory.listSessionsfall back toScan(cost scales with table size) — keep those rare or use a dedicated table. - Hot partitions. The store's partition key is
namespace[0]and chat history's issessionId. A single partition tops out around ~1000 WCU / 3000 RCU, so avoid funneling very high write throughput through one tenant/session id; spread load across scope roots (e.g. include a tenant id asnamespace[0]). - Very large vector corpora outgrow the in-DB ranker (
maxSearchCandidates). Configure avectorBackend(OpenSearch, pgvector, …) — the library keeps DynamoDB as the source of truth and only delegates similarity ranking. - TTL deletion timing is governed by DynamoDB (typically within 48 h of expiry) and S3 lifecycle expiry is day-granular — the library writes the correct expiry timestamp / lifecycle rule (and filters expired chat messages on read) but does not guarantee instant deletion.
Testing
npm test # unit + static-guard + type tests, 100% coverage
npm run typecheck
npm run lint
npm run buildIntegration and contract tiers run against DynamoDB Local (Docker) and are kept out of the default npm test:
npm run test:integration:up # docker compose up -d (DynamoDB Local)
npm run test:integration # integration flows + LangGraph/LangChain contract conformance
npm run test:integration:downReal-AWS verification scripts live in examples/ (each creates and tears down its own resources):
node examples/verify-checkpointer.mjs # save/resume/writes/list/delete, compression, S3, TTL
node examples/verify-store.mjs # filters, semantic search, S3 offload, TTL
node examples/verify-history.mjs # per-message model, concurrency, RunnableWithMessageHistory agent
node examples/verify-factory.mjs # shared-client createAll across all three adapters
node examples/verify-agents.mjs # real LangGraph agents using the saver + store as memory
node examples/verify-edge-cases.mjs # filter operators, multi-page reads, compression+S3, scaleLicense
MIT © Faruk Ada
