rivia-dynamodb
v2.6.0
DynamoDB wrapper with helpers
DynamoDB Wrapper
A simple and efficient wrapper for AWS DynamoDB operations.
Features
- Simplified CRUD operations
- Support for consistent reads
- Batch operations
- Query and scan operations with pagination
- Filter expressions
- Sort key prefix queries
- Sort key range queries (between)
- Consumed capacity tracking
- Configurable primary and sort key field names
- Compressed pagination tokens for reduced payload size
- Global Secondary Index (GSI) support with transparent switching
Installation
npm install @rivia/dynamo
Configuration
The wrapper can be configured through environment variables or programmatically:
Environment Variables
REGION=us-east-1
TABLE_NAME=your-table-name
DYNAMODB_PK_FIELD_NAME=pk
DYNAMODB_SK_FIELD_NAME=sk
DYNAMODB_CONSISTENT_READ=false
DYNAMODB_RETURN_CONSUMED_CAPACITY=false
# GSI Configuration
DYNAMODB_GSI_NAME=your-gsi-index-name
DYNAMODB_GSI_PK_FIELD_NAME=gsi_pk
DYNAMODB_GSI_SK_FIELD_NAME=gsi_sk
Programmatic Configuration
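The environment variables and the options accepted by set() describe the same settings. If you prefer to pass them explicitly, the mapping could look roughly like the sketch below. It is an illustration only: configFromEnv is a hypothetical helper, not part of the package, and the 'pk' / 'sk' fallbacks are assumptions taken from the example values above rather than documented defaults.

```javascript
// Hypothetical helper: not part of @rivia/dynamo.
// Maps the documented environment variables to the option names used by set().
function configFromEnv(env = process.env) {
  // Environment values are strings, so 'false' must not be treated as truthy.
  const toBool = (value) => String(value).toLowerCase() === 'true';

  const config = {
    region: env.REGION,
    tableName: env.TABLE_NAME,
    pkField: env.DYNAMODB_PK_FIELD_NAME || 'pk', // assumed fallback
    skField: env.DYNAMODB_SK_FIELD_NAME || 'sk', // assumed fallback
    consistentRead: toBool(env.DYNAMODB_CONSISTENT_READ),
    returnConsumedCapacity: toBool(env.DYNAMODB_RETURN_CONSUMED_CAPACITY)
  };

  // Only include GSI options when an index name is provided.
  if (env.DYNAMODB_GSI_NAME) {
    config.gsiName = env.DYNAMODB_GSI_NAME;
    config.gsiPkField = env.DYNAMODB_GSI_PK_FIELD_NAME || 'gsi_pk';
    config.gsiSkField = env.DYNAMODB_GSI_SK_FIELD_NAME || 'gsi_sk';
  }
  return config;
}
```

The result can then be handed to the wrapper with dynamo.set(configFromEnv()).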
const Dynamo = require('@rivia/dynamo');
const dynamo = new Dynamo();
dynamo.set({
region: 'us-east-1',
tableName: 'your-table-name',
pkField: 'pk',
skField: 'sk',
consistentRead: true,
returnConsumedCapacity: true,
// GSI Configuration
gsiName: 'your-gsi-index-name',
gsiPkField: 'gsi_pk',
gsiSkField: 'gsi_sk'
});
Usage
Basic Operations
// Insert
await dynamo.insert({ pk: 'user#1', sk: 'profile', name: 'John' });
// Find
const { item, consumedCapacity } = await dynamo.find('user#1', 'profile');
// Update
await dynamo.update('user#1', 'profile', { name: 'John Doe' });
// Remove
await dynamo.remove('user#1', 'profile');
Conditional Operations
The wrapper supports conditional operations using simple object syntax, abstracting DynamoDB expression details:
// Remove item only if it's active
await dynamo.remove('user#1', 'profile', {
conditions: {
field: 'active',
value: true
}
});
// Update item only if version matches
await dynamo.update('user#1', 'profile',
{ lastUpdated: new Date().toISOString() },
{
conditions: {
field: 'version',
value: 5,
operator: '='
}
}
);
// Multiple conditions with nested fields
await dynamo.update('user#1', 'profile',
{ status: 'updated' },
{
conditions: [
{
field: 'active',
value: true
},
{
field: 'user.role',
value: 'admin'
},
{
field: 'loginCount',
value: 5,
operator: '>'
}
]
}
);
// Check if attribute exists before removing
await dynamo.remove('queue:group:all', 'group:123:client:456', {
conditions: {
field: 'pk',
operator: 'attribute_exists'
}
});
// Remove only if item doesn't have deletion timestamp
await dynamo.remove('user#1', 'profile', {
conditions: {
field: 'deletedAt',
operator: 'attribute_not_exists'
}
});
Query Operations
// Query with filters
const { items, count, nextPageId, nextPageHashId, consumedCapacity } = await dynamo.query(
'user#1',
[{ name: 'status', value: 'active' }],
null,
10,
{ consistentRead: true }
);
// Query by prefix
const { items } = await dynamo.findByPrefix(
'user#1',
'order#',
[{ name: 'status', value: 'pending' }]
);
// Query by range (between)
const { items } = await dynamo.findByBetween(
'user#1',
'order#2023-01-01',
'order#2023-12-31',
[{ name: 'status', value: 'completed' }]
);
// Get all items
const { items, count } = await dynamo.getAll('user#1');
Global Secondary Index (GSI) Support
The wrapper supports transparent GSI usage. When gsiName is configured, all query operations (query, findByPrefix, findByBetween) will automatically use the GSI instead of the main table:
// Configure GSI
dynamo.set({
gsiName: 'user-orders-gsi',
gsiPkField: 'user_id',
gsiSkField: 'order_date'
});
// All these operations will use the GSI automatically
const { items } = await dynamo.query('user123');
const { items: prefixItems } = await dynamo.findByPrefix('user123', '2024-');
const { items: rangeItems } = await dynamo.findByBetween('user123', '2024-01-01', '2024-12-31');
// Switch back to main table by removing gsiName
dynamo.set({ gsiName: null });
const { items: tableItems } = await dynamo.query('user123');
GSI Configuration Options
- gsiName: The name of your GSI index (automatically enables GSI when set)
- gsiPkField: The partition key field name in the GSI (default: gsi_pk)
- gsiSkField: The sort key field name in the GSI (default: gsi_sk)
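To make the effect of these options concrete, the sketch below shows one way a query could pick its key fields and target depending on whether gsiName is set. It is not the library's actual implementation: buildQueryParams is a hypothetical function, and the output uses the standard DynamoDB Query parameter names in document-client style (plain values).

```javascript
// Hypothetical sketch: not the library's actual implementation.
// Chooses table or GSI key fields and builds DynamoDB Query parameters.
function buildQueryParams(config, pkValue, skPrefix) {
  const useGsi = Boolean(config.gsiName);
  const pkField = useGsi ? (config.gsiPkField || 'gsi_pk') : (config.pkField || 'pk');
  const skField = useGsi ? (config.gsiSkField || 'gsi_sk') : (config.skField || 'sk');

  const params = {
    TableName: config.tableName,
    KeyConditionExpression: '#pk = :pk',
    ExpressionAttributeNames: { '#pk': pkField },
    ExpressionAttributeValues: { ':pk': pkValue }
  };

  // IndexName is only sent when a GSI is configured.
  if (useGsi) params.IndexName = config.gsiName;

  // Optional sort key prefix, as used by a findByPrefix-style query.
  if (skPrefix !== undefined) {
    params.KeyConditionExpression += ' AND begins_with(#sk, :sk)';
    params.ExpressionAttributeNames['#sk'] = skField;
    params.ExpressionAttributeValues[':sk'] = skPrefix;
  }
  return params;
}
```

One point to keep in mind when switching: DynamoDB does not support strongly consistent reads on global secondary indexes, so combining consistentRead: true with a GSI query is rejected by the service.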
Dynamic GSI Switching
You can switch between main table and GSI at runtime:
// Use GSI for specific operations
dynamo.set({ gsiName: 'my-gsi-index' });
const gsiResults = await dynamo.query('user123');
// Switch back to main table
dynamo.set({ gsiName: null });
const tableResults = await dynamo.query('user123');
Environment Variable Configuration
# Enable GSI usage (automatically enabled when gsiName is set)
DYNAMODB_GSI_NAME=my-gsi-index
DYNAMODB_GSI_PK_FIELD_NAME=gsi_pk
DYNAMODB_GSI_SK_FIELD_NAME=gsi_sk
Pagination with Compressed Tokens
Query operations support both traditional pagination tokens and compressed hash tokens:
// Using traditional pagination token
const page1 = await dynamo.query('user#1', [], null, 10);
const page2 = await dynamo.query('user#1', [], page1.nextPageId, 10);
// Using compressed hash token (recommended for smaller payloads)
const page3 = await dynamo.query('user#1', [], page1.nextPageHashId, 10);
// Both nextPageId and nextPageHashId are returned for compatibility
console.log('Traditional token:', page1.nextPageId);
console.log('Compressed token:', page1.nextPageHashId);
Batch Operations
batchInsert sends items in chunks of 25 (the DynamoDB BatchWriteItem limit) and retries UnprocessedItems with exponential backoff.
// Batch insert
await dynamo.batchInsert([
{ pk: 'user#1', sk: 'order#1', status: 'pending' },
{ pk: 'user#1', sk: 'order#2', status: 'completed' }
]);
// With retry options (default maxRetries: 3)
const result = await dynamo.batchInsert(manyItems, { maxRetries: 5 });
if (Object.keys(result.UnprocessedItems || {}).length > 0) {
// Some items were throttled and not written after retries
}
Scan Operations
// Scan with attributes
const { items, consumedCapacity } = await dynamo.scanTable(
10,
['pk', 'sk', 'status'],
{ consistentRead: true }
);
Consistency and Performance
Consistent Reads
By default, DynamoDB uses eventually consistent reads for better performance. However, you can enable strongly consistent reads when needed:
- Globally through configuration:
dynamo.set({ consistentRead: true });
- Per-operation through options:
const { item } = await dynamo.find('user#1', 'profile', { consistentRead: true });
Consumed Capacity
You can track the consumed capacity of DynamoDB operations:
- Globally through configuration:
dynamo.set({ returnConsumedCapacity: true });
- The consumed capacity will be returned in the operation results:
const { item, consumedCapacity } = await dynamo.find('user#1', 'profile');
console.log('Consumed capacity:', consumedCapacity);
Filter Expressions and Conditions
The wrapper supports various filter expressions and conditional operators:
Basic Operators
// Equality
{ field: 'status', value: 'active' }
// Contains
{ field: 'tags', value: 'important', operator: 'contains' }
// Comparison operators
{ field: 'age', value: 18, operator: '>' }
{ field: 'score', value: 100, operator: '>=' }
{ field: 'version', value: 5, operator: '!=' }
Attribute Existence Operators
// Check if attribute exists
{ field: 'metadata', operator: 'attribute_exists' }
// Check if attribute doesn't exist
{ field: 'deletedAt', operator: 'attribute_not_exists' }
String Operators
// String contains
{ field: 'email', value: '@gmail.com', operator: 'contains' }
// String begins with
{ field: 'name', value: 'Jo', operator: 'begins_with' }
Range and Set Operators
// Between values
{ field: 'age', value: [18, 65], operator: 'between' }
// In set of values
{ field: 'status', value: ['active', 'pending'], operator: 'in' }
Conditional Operations Benefits
Simplicity
No need to know DynamoDB expression syntax:
// Instead of this complex syntax:
{
conditionExpression: 'attribute_exists(#pk) AND #status = :status',
expressionAttributeNames: { '#pk': 'pk', '#status': 'status' },
expressionAttributeValues: { ':status': { S: 'active' } }
}
// Use this simple object syntax:
{
conditions: [
{ field: 'pk', operator: 'attribute_exists' },
{ field: 'status', value: 'active' }
]
}
Nested Fields Support
// Check nested object properties
{
conditions: {
field: 'user.profile.status',
value: 'active'
}
}
Backward Compatibility
The new conditional syntax is fully compatible with existing code. You can:
- Use only the new object syntax (recommended)
- Use only the legacy expression syntax
- Mix both (object conditions take priority)
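For readers curious how the object syntax relates to the legacy expression syntax, the following sketch translates a conditions object (or array) into a DynamoDB condition expression. It is a simplified illustration under stated assumptions, not the wrapper's internals: buildCondition is a hypothetical function, the placeholder naming scheme is invented, multiple conditions are assumed to be joined with AND, and the between and in operators are left out for brevity.

```javascript
// Hypothetical sketch: illustrates the idea, not the library's internals.
function buildCondition(conditions) {
  const list = Array.isArray(conditions) ? conditions : [conditions];
  const names = {};
  const values = {};

  const parts = list.map(({ field, value, operator = '=' }, i) => {
    // 'user.role' becomes '#f0p0.#f0p1' so each path segment is aliased.
    const path = field.split('.').map((segment, j) => {
      const alias = `#f${i}p${j}`;
      names[alias] = segment;
      return alias;
    }).join('.');

    // Existence checks take no value.
    if (operator === 'attribute_exists' || operator === 'attribute_not_exists') {
      return `${operator}(${path})`;
    }

    const placeholder = `:v${i}`;
    values[placeholder] = value;
    if (operator === 'contains' || operator === 'begins_with') {
      return `${operator}(${path}, ${placeholder})`;
    }
    // DynamoDB spells inequality as <>, not !=.
    const op = operator === '!=' ? '<>' : operator;
    return `${path} ${op} ${placeholder}`;
  });

  const result = {
    ConditionExpression: parts.join(' AND '),
    ExpressionAttributeNames: names
  };
  // DynamoDB rejects an empty ExpressionAttributeValues map, so omit it.
  if (Object.keys(values).length > 0) result.ExpressionAttributeValues = values;
  return result;
}
```

Aliasing every path segment avoids clashes with DynamoDB reserved words such as status, which is one reason hand-written expressions tend to be verbose.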
Error Handling
The wrapper throws errors for:
- Invalid value types
- Missing required parameters
- DynamoDB operation failures
try {
await dynamo.insert({ pk: 'user#1' }); // Missing sk
} catch (error) {
console.error('Operation failed:', error);
}
Error: socket hang up on batchInsert / DynamoDB
This usually means the TCP connection was closed before the response arrived. Common causes:
- Timeout – A proxy, load balancer, or client has a timeout shorter than the request duration (e.g. a batch that takes 700 ms or more against a 500 ms timeout).
- Throttling – DynamoDB throttles the request; the call takes longer and may hit a timeout, or the connection can be closed.
What the library does: batchInsert now sends at most 25 items per request (DynamoDB limit) and retries UnprocessedItems with exponential backoff, which reduces payload size and helps with throttling.
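The chunk-and-retry behaviour described above can be sketched as follows. This is an illustration of the technique rather than the library's code: chunk, batchWithRetry, sendBatch, and baseDelayMs are hypothetical names, and sendBatch stands in for the real BatchWriteItem call, resolving to the items that were not written. The default of 3 retries matches the documented maxRetries default.

```javascript
// Hypothetical sketch of chunking plus retry with exponential backoff.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Split items into groups of at most `size` (25 is the BatchWriteItem limit).
function chunk(items, size = 25) {
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// `sendBatch(items)` is a stand-in for the real write call and must resolve
// to the array of items that could not be processed.
async function batchWithRetry(items, sendBatch, { maxRetries = 3, baseDelayMs = 50 } = {}) {
  const unprocessed = [];
  for (const batch of chunk(items)) {
    let pending = batch;
    for (let attempt = 0; pending.length > 0; attempt++) {
      pending = await sendBatch(pending);
      if (pending.length === 0 || attempt >= maxRetries) break;
      // Exponential backoff: baseDelayMs, then 2x, then 4x, ...
      await sleep(baseDelayMs * 2 ** attempt);
    }
    // Anything still pending after the final retry is reported to the caller.
    unprocessed.push(...pending);
  }
  return unprocessed;
}
```

Each chunk gets one initial attempt plus up to maxRetries retries, so a persistently throttled chunk is given up on after a bounded delay instead of holding the connection open.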
If you still see socket hang up: use a DynamoDB client with a higher socket timeout and pass it via set({ client }):
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const { NodeHttpHandler } = require('@aws-sdk/node-http-handler');
const client = new DynamoDBClient({
region: process.env.REGION,
requestHandler: new NodeHttpHandler({
connectionTimeout: 30000,
socketTimeout: 30000 // increase if your env allows longer requests
})
});
dynamo.set({ client, tableName: process.env.TABLE_NAME });
Install the handler if needed: npm i @aws-sdk/node-http-handler.
License
MIT
