# @rogerchi/ddb-stream-router

Express-like routing for DynamoDB Streams with type-safe handlers, validation, attribute filtering, batch processing, and SQS deferral.
A TypeScript library providing Express-like routing for DynamoDB Stream events. Register type-safe handlers for INSERT, MODIFY, and REMOVE operations using discriminator functions or schema validators like Zod. Defer heavy processing to SQS with a simple .defer(handlerId) chain to keep your stream processing fast and reliable.
## Installation

```bash
npm install @rogerchi/ddb-stream-router
# or
yarn add @rogerchi/ddb-stream-router
```

Peer dependencies:

```bash
npm install @aws-sdk/util-dynamodb @aws-sdk/client-sqs
```

## Features
- Express-like API - Familiar `.onInsert()`, `.onModify()`, `.onRemove()`, `.onTTLRemove()`, and `.use()` methods
- Type Safety - Full TypeScript inference from discriminators and parsers
- Flexible Matching - Use type guards or schema validators (Zod, etc.)
- Attribute Filtering - React to specific attribute changes in MODIFY events
- Batch Processing - Group records and process them together
- Middleware Support - Intercept records before handlers
- Deferred Processing - Automatically enqueue to SQS for async processing
- Global Tables - Filter records by region
- Partial Batch Failures - Lambda retry support via `reportBatchItemFailures`
- TTL Removal Handling - Separate handlers for TTL-triggered vs user-initiated deletions
## Quick Start

```typescript
import { StreamRouter } from '@rogerchi/ddb-stream-router';
import type { DynamoDBStreamHandler } from 'aws-lambda';

interface User {
  pk: string;
  sk: string;
  name: string;
  email: string;
}

// Type guard discriminator
const isUser = (record: unknown): record is User =>
  typeof record === 'object' &&
  record !== null &&
  'pk' in record &&
  (record as { pk: string }).pk.startsWith('USER#');

const router = new StreamRouter();

router
  .onInsert(isUser, async (newUser, ctx) => {
    console.log(`User created: ${newUser.name}`);
  })
  .onModify(isUser, async (oldUser, newUser, ctx) => {
    console.log(`User updated: ${oldUser.name} -> ${newUser.name}`);
  })
  .onRemove(isUser, async (deletedUser, ctx) => {
    console.log(`User deleted: ${deletedUser.name}`);
  });

// Simplified export with built-in batch failure support
export const handler = router.streamHandler;

// Or with custom options:
// export const handler: DynamoDBStreamHandler = async (event) => {
//   return router.process(event, { reportBatchItemFailures: true });
// };
```

## Discriminator Matching
The discriminator/parser is matched against different images based on event type:
| Event Type | Image Used for Matching |
|------------|------------------------|
| INSERT | newImage |
| MODIFY | newImage |
| REMOVE | oldImage |
For MODIFY events, the newImage is used for matching because you typically want to route based on the current state of the record. If a record's type changed (e.g., pk prefix changed from USER# to ADMIN#), the handler for the new type will be invoked.
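The selection rule in the table above can be sketched as a small helper. This is illustrative only; `imageForMatching` is a hypothetical function, not a library export.

```typescript
// Illustrative sketch of the matching table above; imageForMatching is a
// hypothetical helper, not part of @rogerchi/ddb-stream-router.
type EventName = 'INSERT' | 'MODIFY' | 'REMOVE';

function imageForMatching<T>(
  eventName: EventName,
  oldImage: T | undefined,
  newImage: T | undefined,
): T | undefined {
  // INSERT and MODIFY match against the new image; REMOVE has no new image,
  // so it matches against the old one.
  return eventName === 'REMOVE' ? oldImage : newImage;
}

// A record whose pk prefix changed from USER# to ADMIN# routes by its new state:
const matched = imageForMatching('MODIFY', { pk: 'USER#1' }, { pk: 'ADMIN#1' });
console.log(matched?.pk); // 'ADMIN#1'
```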
Validation Target: Use the `validationTarget` option to control which image(s) are validated:

- `"newImage"` (default) - Validates the new image
- `"oldImage"` - Validates the old image (useful for processing based on previous state)
- `"both"` - Both images must match (useful for type migration validation)
```typescript
// Only process if BOTH old and new images match
router.onModify(isUser, async (oldUser, newUser, ctx) => {
  // Both states are guaranteed to be valid Users
}, { validationTarget: "both" });
```

## Using Zod Schemas
```typescript
import { z } from 'zod';
import { StreamRouter } from '@rogerchi/ddb-stream-router';

const UserSchema = z.object({
  pk: z.string().startsWith('USER#'),
  sk: z.string(),
  name: z.string(),
  email: z.string(),
});

type User = z.infer<typeof UserSchema>;

const router = new StreamRouter();

// Schema validates and parses data before handler receives it
router.onInsert(UserSchema, async (newUser: User, ctx) => {
  // newUser is guaranteed to match the schema
  console.log(`Valid user: ${newUser.email}`);
});
```

## Attribute Change Filtering
React only when specific attributes change:
```typescript
// Only trigger when email changes
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    await sendEmailVerification(newUser.email);
  },
  { attribute: 'email', changeType: 'changed_attribute' }
);

// Nested attributes with dot notation
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    console.log(`Theme: ${oldUser.preferences.theme} -> ${newUser.preferences.theme}`);
  },
  { attribute: 'preferences.theme', changeType: 'changed_attribute' }
);

// Watching a parent attribute catches all nested changes
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    // Triggers when preferences.theme OR preferences.notifications changes
    console.log('Any preference changed');
  },
  { attribute: 'preferences' }
);

// Trigger when tags are added to a collection
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    console.log('New tags added');
  },
  { attribute: 'tags', changeType: 'new_item_in_collection' }
);

// Multiple change types (OR logic)
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    console.log('Tags modified');
  },
  { attribute: 'tags', changeType: ['new_item_in_collection', 'remove_item_from_collection'] }
);
```

Change types:
- `new_attribute` - Attribute added
- `remove_attribute` - Attribute removed
- `changed_attribute` - Attribute value changed
- `new_item_in_collection` - Item added to a List or Set (SS/NS/BS)
- `remove_item_from_collection` - Item removed from a List or Set
- `changed_item_in_collection` - Items both added and removed in a List/Set
Supported collection types:
- Arrays (DynamoDB Lists)
- Sets (DynamoDB String Sets, Number Sets, Binary Sets)
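As a rough sketch, a List/Set diff maps onto the collection change types like this. This is illustrative only; the library's internal diffing logic may differ, and `classifyCollectionChange` is not a library export.

```typescript
// Hypothetical sketch of how a collection diff maps to the change types
// listed above. Not part of @rogerchi/ddb-stream-router.
type CollectionChange =
  | 'new_item_in_collection'
  | 'remove_item_from_collection'
  | 'changed_item_in_collection'
  | null;

function classifyCollectionChange(oldItems: string[], newItems: string[]): CollectionChange {
  const added = newItems.some((i) => !oldItems.includes(i));
  const removed = oldItems.some((i) => !newItems.includes(i));
  if (added && removed) return 'changed_item_in_collection';
  if (added) return 'new_item_in_collection';
  if (removed) return 'remove_item_from_collection';
  return null; // collections are identical
}

console.log(classifyCollectionChange(['a'], ['a', 'b'])); // 'new_item_in_collection'
```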
## TTL Removal Events
DynamoDB TTL automatically deletes expired items, creating REMOVE events with special userIdentity metadata. The router lets you handle TTL removals separately from user-initiated deletions.
```typescript
// Handle TTL-triggered removals only
router.onTTLRemove(isSession, async (oldImage, ctx) => {
  console.log(`Session expired via TTL: ${oldImage.sessionId}`);
  await cleanupExpiredSession(oldImage);
});

// Handle user-initiated deletions only (exclude TTL)
router.onRemove(isSession, async (oldImage, ctx) => {
  console.log(`User logged out: ${oldImage.userId}`);
  await handleUserLogout(oldImage);
}, { excludeTTL: true });

// Default: onRemove receives both TTL and user-initiated removals
router.onRemove(isUser, async (oldImage, ctx) => {
  console.log(`User removed: ${oldImage.userId}`);
});
```

TTL removals are identified by:
- `userIdentity.type === "Service"`
- `userIdentity.principalId === "dynamodb.amazonaws.com"`
See `examples/ttl-removal.ts` for more examples.
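The two identification conditions above can be expressed as a standalone predicate. This is a sketch for clarity; the router applies an equivalent check internally via `onTTLRemove`/`excludeTTL`, and `isTTLRemoval` is not a library export.

```typescript
// Sketch of the TTL identification rule above; not a library export.
interface UserIdentity {
  type?: string;
  principalId?: string;
}

function isTTLRemoval(userIdentity?: UserIdentity): boolean {
  // TTL deletes carry a service identity stamped by DynamoDB itself.
  return (
    userIdentity?.type === 'Service' &&
    userIdentity?.principalId === 'dynamodb.amazonaws.com'
  );
}

console.log(isTTLRemoval({ type: 'Service', principalId: 'dynamodb.amazonaws.com' })); // true
console.log(isTTLRemoval(undefined)); // false (user-initiated deletes have no userIdentity)
```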
## Middleware
```typescript
// Logging middleware
router.use(async (record, next) => {
  console.log(`Processing ${record.eventName}: ${record.eventID}`);
  await next();
});

// Skip certain records
router.use(async (record, next) => {
  if (record.eventSourceARN?.includes('test-table')) {
    return; // Don't call next() to skip
  }
  await next();
});

// Error handling
router.use(async (record, next) => {
  try {
    await next();
  } catch (error) {
    await recordMetric('stream.error', 1);
    throw error;
  }
});
```

## Batch Processing
Process multiple records together:
```typescript
// All matching records in one handler call
router.onInsert(
  isInventoryChange,
  async (records) => {
    console.log(`Processing ${records.length} changes`);
    for (const { newImage, ctx } of records) {
      // process each record
    }
  },
  { batch: true }
);

// Group by attribute value
router.onInsert(
  isAuditLog,
  async (records) => {
    const userId = records[0].newImage.userId;
    console.log(`${records.length} logs for user ${userId}`);
  },
  { batch: true, batchKey: 'userId' }
);

// Group by primary key
router.onModify(
  isItem,
  async (records) => {
    // All records for the same pk+sk
  },
  { batch: true, batchKey: { partitionKey: 'pk', sortKey: 'sk' } }
);
```

## Deferred Processing (SQS)
Offload heavy processing to SQS to keep stream processing fast:
```typescript
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { StreamRouter, createSQSClient } from '@rogerchi/ddb-stream-router';

const router = new StreamRouter({
  deferQueue: process.env.DEFER_QUEUE_URL,
  sqsClient: createSQSClient(new SQSClient({}), SendMessageCommand),
});

// Immediate handler
router.onInsert(isOrder, async (order, ctx) => {
  console.log('Order received');
});

// Deferred handler - enqueues to SQS instead of executing immediately
router
  .onInsert(isOrder, async (order, ctx) => {
    await sendConfirmationEmail(order);
    await generateInvoice(order);
  })
  .defer('order-email-handler', { delaySeconds: 30 });

// Stream handler
export const streamHandler = router.streamHandler;

// SQS handler (can be the same or a different Lambda function)
export const sqsHandler = router.sqsHandler;
```

The handler ID in `.defer(id)` is required to match records when processing from SQS. It ensures stable routing across deployments and supports cross-function processing.
## Stream View Types
Handler signatures adapt based on your DynamoDB stream configuration:
| Stream View Type | INSERT | MODIFY | REMOVE |
|-----------------|--------|--------|--------|
| KEYS_ONLY | (keys, ctx) | (keys, ctx) | (keys, ctx) |
| NEW_IMAGE | (newImage, ctx) | (undefined, newImage, ctx) | (undefined, ctx) |
| OLD_IMAGE | (undefined, ctx) | (oldImage, undefined, ctx) | (oldImage, ctx) |
| NEW_AND_OLD_IMAGES | (newImage, ctx) | (oldImage, newImage, ctx) | (oldImage, ctx) |
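For the image-bearing view types, the MODIFY row can be sketched as a pure function over the configured view (KEYS_ONLY handlers receive only the key attributes). This is illustrative; `resolveModifyArgs` is a hypothetical helper, not a library export.

```typescript
// Hypothetical sketch of the MODIFY column above - which (old, new) arguments
// a MODIFY handler receives under each image-bearing stream view type.
// Not part of @rogerchi/ddb-stream-router.
type ImageViewType = 'NEW_IMAGE' | 'OLD_IMAGE' | 'NEW_AND_OLD_IMAGES';

function resolveModifyArgs<T>(
  view: ImageViewType,
  oldImage: T,
  newImage: T,
): [T | undefined, T | undefined] {
  switch (view) {
    case 'NEW_IMAGE':
      return [undefined, newImage]; // stream carries no old image
    case 'OLD_IMAGE':
      return [oldImage, undefined]; // stream carries no new image
    default:
      return [oldImage, newImage]; // both full images are available
  }
}
```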
## Global Tables (Region Filtering)
Process only records from the current region:
```typescript
const router = new StreamRouter({ sameRegionOnly: true });
```

## Configuration Options
```typescript
const router = new StreamRouter({
  // Stream view type (default: 'NEW_AND_OLD_IMAGES')
  streamViewType: 'NEW_AND_OLD_IMAGES',

  // Auto-unmarshall DynamoDB JSON (default: true)
  unmarshall: true,

  // Only process same-region records (default: false)
  sameRegionOnly: false,

  // Default SQS queue for deferred handlers
  deferQueue: 'https://sqs...',

  // SQS client for deferred processing
  sqsClient: { sendMessage: async (params) => { ... } },

  // Return batchItemFailures format for streamHandler/sqsHandler (default: true)
  reportBatchItemFailures: true,

  // Optional logger for trace logging (e.g., console, pino, Powertools Logger)
  logger: console,
});
```

## Processing Results
```typescript
const result = await router.process(event);
// { processed: 10, succeeded: 9, failed: 1, errors: [...] }

// Or with partial batch failures for Lambda retry
const result = await router.process(event, { reportBatchItemFailures: true });
// { batchItemFailures: [{ itemIdentifier: 'sequence-number' }] }
```

## API Reference
### StreamRouter
```typescript
class StreamRouter<V extends StreamViewType = 'NEW_AND_OLD_IMAGES'> {
  constructor(options?: StreamRouterOptions);
  onInsert<T>(matcher, handler, options?): HandlerRegistration;
  onModify<T>(matcher, handler, options?): HandlerRegistration;
  onRemove<T>(matcher, handler, options?): HandlerRegistration;
  onTTLRemove<T>(matcher, handler, options?): HandlerRegistration;
  use(middleware): this;
  process(event, options?): Promise<ProcessingResult | BatchItemFailuresResponse>;
  processDeferred(sqsEvent, options?): Promise<ProcessingResult | BatchItemFailuresResponse>;
}
```

### HandlerRegistration
```typescript
interface HandlerRegistration {
  // id: unique identifier for this deferred handler (used to match records in processDeferred)
  defer(id: string, options?: { queue?: string; delaySeconds?: number }): StreamRouter;
  onInsert(...): HandlerRegistration;
  onModify(...): HandlerRegistration;
  onRemove(...): HandlerRegistration;
  onTTLRemove(...): HandlerRegistration;
}
```

### HandlerContext
```typescript
interface HandlerContext {
  eventName: 'INSERT' | 'MODIFY' | 'REMOVE';
  eventID?: string;
  eventSourceARN?: string;
}
```

## License
Apache-2.0
