@rogerchi/ddb-stream-router

v0.0.10

Express-like routing for DynamoDB Streams with type-safe handlers, validation, attribute filtering, batch processing, and SQS deferral

A TypeScript library providing Express-like routing for DynamoDB Stream events. Register type-safe handlers for INSERT, MODIFY, and REMOVE operations using discriminator functions or schema validators like Zod. Defer heavy processing to SQS with a simple .defer(handlerId) chain to keep your stream processing fast and reliable.

Installation

npm install @rogerchi/ddb-stream-router
# or
yarn add @rogerchi/ddb-stream-router

Peer dependencies:

npm install @aws-sdk/util-dynamodb @aws-sdk/client-sqs

Features

  • Express-like API - Familiar .onInsert(), .onModify(), .onRemove(), .onTTLRemove(), .use() methods
  • Type Safety - Full TypeScript inference from discriminators and parsers
  • Flexible Matching - Use type guards or schema validators (Zod, etc.)
  • Attribute Filtering - React to specific attribute changes in MODIFY events
  • Batch Processing - Group records and process them together
  • Middleware Support - Intercept records before handlers
  • Deferred Processing - Automatically enqueue to SQS for async processing
  • Global Tables - Filter records by region
  • Partial Batch Failures - Lambda retry support via reportBatchItemFailures
  • TTL Removal Handling - Separate handlers for TTL-triggered vs user-initiated deletions

Quick Start

import { StreamRouter } from '@rogerchi/ddb-stream-router';
import type { DynamoDBStreamHandler } from 'aws-lambda';

interface User {
  pk: string;
  sk: string;
  name: string;
  email: string;
}

// Type guard discriminator
const isUser = (record: unknown): record is User =>
  typeof record === 'object' &&
  record !== null &&
  'pk' in record &&
  (record as { pk: string }).pk.startsWith('USER#');

const router = new StreamRouter();

router
  .onInsert(isUser, async (newUser, ctx) => {
    console.log(`User created: ${newUser.name}`);
  })
  .onModify(isUser, async (oldUser, newUser, ctx) => {
    console.log(`User updated: ${oldUser.name} -> ${newUser.name}`);
  })
  .onRemove(isUser, async (deletedUser, ctx) => {
    console.log(`User deleted: ${deletedUser.name}`);
  });

// Simplified export with built-in batch failure support
export const handler = router.streamHandler;

// Or with custom options:
// export const handler: DynamoDBStreamHandler = async (event) => {
//   return router.process(event, { reportBatchItemFailures: true });
// };

Discriminator Matching

The discriminator/parser is matched against different images based on event type:

| Event Type | Image Used for Matching |
|------------|------------------------|
| INSERT | newImage |
| MODIFY | newImage |
| REMOVE | oldImage |

For MODIFY events, the newImage is used for matching because you typically want to route based on the current state of the record. If a record's type changed (e.g., pk prefix changed from USER# to ADMIN#), the handler for the new type will be invoked.

Validation Target: Use the validationTarget option to control which image(s) are validated:

  • "newImage" (default) - Validates the new image
  • "oldImage" - Validates the old image (useful for processing based on previous state)
  • "both" - Both images must match (useful for type migration validation)

// Only process if BOTH old and new images match
router.onModify(isUser, async (oldUser, newUser, ctx) => {
  // Both states are guaranteed to be valid Users
}, { validationTarget: "both" });
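
To make the matching rules above concrete, here is a minimal standalone sketch of how the image(s) for matching could be selected. It is not the library's implementation: `imagesToValidate`, `matches`, and `StreamImages` are hypothetical names, and the sketch assumes `validationTarget` only affects MODIFY events.

```typescript
type EventName = 'INSERT' | 'MODIFY' | 'REMOVE';
type ValidationTarget = 'newImage' | 'oldImage' | 'both';

interface StreamImages {
  oldImage?: unknown;
  newImage?: unknown;
}

// Hypothetical helper: returns the image(s) a matcher would be run against.
function imagesToValidate(
  eventName: EventName,
  images: StreamImages,
  validationTarget: ValidationTarget = 'newImage',
): unknown[] {
  if (eventName === 'INSERT') return [images.newImage];
  if (eventName === 'REMOVE') return [images.oldImage];
  // MODIFY: the validationTarget option decides (assumption: MODIFY only).
  switch (validationTarget) {
    case 'oldImage':
      return [images.oldImage];
    case 'both':
      return [images.oldImage, images.newImage];
    default:
      return [images.newImage];
  }
}

// A record matches only if every selected image passes the discriminator.
function matches(
  discriminator: (image: unknown) => boolean,
  eventName: EventName,
  images: StreamImages,
  validationTarget?: ValidationTarget,
): boolean {
  return imagesToValidate(eventName, images, validationTarget).every(discriminator);
}
```

Under these assumptions, a record whose pk moved from USER# to ADMIN# fails a user discriminator with the default target, passes with "oldImage", and fails with "both".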

Using Zod Schemas

import { z } from 'zod';
import { StreamRouter } from '@rogerchi/ddb-stream-router';

const UserSchema = z.object({
  pk: z.string().startsWith('USER#'),
  sk: z.string(),
  name: z.string(),
  email: z.string(),
});

type User = z.infer<typeof UserSchema>;

const router = new StreamRouter();

// Schema validates and parses data before handler receives it
router.onInsert(UserSchema, async (newUser: User, ctx) => {
  // newUser is guaranteed to match the schema
  console.log(`Valid user: ${newUser.email}`);
});
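
The difference between the two matcher styles can be sketched without the library: a type guard only accepts or rejects the original value, while a schema with a Zod-style `safeParse` may also return parsed data. The names `SafeParser`, `Matcher`, and `runMatcher` below are hypothetical, and this is an illustration of the idea rather than how the router dispatches internally.

```typescript
// Structural subset of a Zod-style schema: only safeParse is needed here.
interface SafeParser<T> {
  safeParse(input: unknown): { success: true; data: T } | { success: false; error: unknown };
}

type TypeGuard<T> = (input: unknown) => input is T;
type Matcher<T> = TypeGuard<T> | SafeParser<T>;

type MatchResult<T> = { matched: true; value: T } | { matched: false };

// Hypothetical helper: run either kind of matcher and return a uniform result.
function runMatcher<T>(matcher: Matcher<T>, input: unknown): MatchResult<T> {
  if (typeof matcher === 'function') {
    // Type guard: the original value is passed through unchanged.
    return matcher(input) ? { matched: true, value: input } : { matched: false };
  }
  // Schema: the handler would receive the parsed output, not the raw input.
  const result = matcher.safeParse(input);
  return result.success ? { matched: true, value: result.data } : { matched: false };
}
```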

Attribute Change Filtering

React only when specific attributes change:

// Only trigger when email changes
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    await sendEmailVerification(newUser.email);
  },
  { attribute: 'email', changeType: 'changed_attribute' }
);

// Nested attributes with dot notation
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    console.log(`Theme: ${oldUser.preferences.theme} -> ${newUser.preferences.theme}`);
  },
  { attribute: 'preferences.theme', changeType: 'changed_attribute' }
);

// Watching parent catches all nested changes
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    // Triggers when preferences.theme OR preferences.notifications changes
    console.log('Any preference changed');
  },
  { attribute: 'preferences' }
);

// Trigger when tags are added to a collection
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    console.log('New tags added');
  },
  { attribute: 'tags', changeType: 'new_item_in_collection' }
);

// Multiple change types (OR logic)
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    console.log('Tags modified');
  },
  { attribute: 'tags', changeType: ['new_item_in_collection', 'remove_item_from_collection'] }
);

Change types:

  • new_attribute - Attribute added
  • remove_attribute - Attribute removed
  • changed_attribute - Attribute value changed
  • new_item_in_collection - Item added to List or Set (SS/NS/BS)
  • remove_item_from_collection - Item removed from List or Set
  • changed_item_in_collection - Items both added and removed in List/Set

Supported collection types:

  • Arrays (DynamoDB Lists)
  • Sets (DynamoDB String Sets, Number Sets, Binary Sets)
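
As a rough illustration of the change types listed above, the following standalone sketch classifies the difference between two unmarshalled images for one attribute path. `getPath`, `asCollection`, and `detectChanges` are hypothetical helpers; the sketch assumes sets arrive as JavaScript `Set` instances, compares values by their JSON form, and returns only the most specific change type, which may differ from what the library reports.

```typescript
type ChangeType =
  | 'new_attribute'
  | 'remove_attribute'
  | 'changed_attribute'
  | 'new_item_in_collection'
  | 'remove_item_from_collection'
  | 'changed_item_in_collection';

// Read a dot-notation path such as 'preferences.theme' from an image.
function getPath(image: unknown, path: string): unknown {
  let current: unknown = image;
  for (const key of path.split('.')) {
    if (typeof current !== 'object' || current === null) return undefined;
    current = (current as { [k: string]: unknown })[key];
  }
  return current;
}

// Treat arrays (Lists) and Set instances (SS/NS/BS) as collections.
function asCollection(value: unknown): unknown[] | undefined {
  if (Array.isArray(value)) return value;
  if (value instanceof Set) return Array.from(value);
  return undefined;
}

function detectChanges(oldImage: unknown, newImage: unknown, attribute: string): ChangeType[] {
  const before = getPath(oldImage, attribute);
  const after = getPath(newImage, attribute);
  if (before === undefined && after === undefined) return [];
  if (before === undefined) return ['new_attribute'];
  if (after === undefined) return ['remove_attribute'];

  const oldItems = asCollection(before);
  const newItems = asCollection(after);
  if (oldItems && newItems) {
    // Compare items by JSON form so nested values compare structurally.
    const oldKeys = new Set(oldItems.map((item) => JSON.stringify(item)));
    const newKeys = new Set(newItems.map((item) => JSON.stringify(item)));
    const added = Array.from(newKeys).some((k) => !oldKeys.has(k));
    const removed = Array.from(oldKeys).some((k) => !newKeys.has(k));
    if (added && removed) return ['changed_item_in_collection'];
    if (added) return ['new_item_in_collection'];
    if (removed) return ['remove_item_from_collection'];
    return [];
  }
  // Scalars and maps: any structural difference counts as a changed attribute.
  return JSON.stringify(before) === JSON.stringify(after) ? [] : ['changed_attribute'];
}
```

With this sketch, watching `preferences` reports a change whenever `preferences.theme` differs, because the parent map's JSON form differs too.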

TTL Removal Events

DynamoDB TTL automatically deletes expired items, creating REMOVE events with special userIdentity metadata. The router lets you handle TTL removals separately from user-initiated deletions.

// Handle TTL-triggered removals only
router.onTTLRemove(isSession, async (oldImage, ctx) => {
  console.log(`Session expired via TTL: ${oldImage.sessionId}`);
  await cleanupExpiredSession(oldImage);
});

// Handle user-initiated deletions only (exclude TTL)
router.onRemove(isSession, async (oldImage, ctx) => {
  console.log(`User logged out: ${oldImage.userId}`);
  await handleUserLogout(oldImage);
}, { excludeTTL: true });

// Default: onRemove receives both TTL and user-initiated removals
router.onRemove(isUser, async (oldImage, ctx) => {
  console.log(`User removed: ${oldImage.userId}`);
});

TTL removals are identified by:

  • userIdentity.type === "Service"
  • userIdentity.principalId === "dynamodb.amazonaws.com"

See examples/ttl-removal.ts for more examples.
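
The two identity checks above can be expressed as a small predicate. This is a standalone sketch: `StreamRecordLike` and `isTTLRemoval` are hypothetical names, and the interface covers only the fields the check reads.

```typescript
// Minimal shape of the fields inspected here, not the full stream record type.
interface StreamRecordLike {
  eventName?: 'INSERT' | 'MODIFY' | 'REMOVE';
  userIdentity?: { type?: string; principalId?: string };
}

// True only for REMOVE events issued by the DynamoDB service itself (TTL expiry).
function isTTLRemoval(rec: StreamRecordLike): boolean {
  return (
    rec.eventName === 'REMOVE' &&
    rec.userIdentity?.type === 'Service' &&
    rec.userIdentity?.principalId === 'dynamodb.amazonaws.com'
  );
}
```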

Middleware

// Logging middleware
router.use(async (record, next) => {
  console.log(`Processing ${record.eventName}: ${record.eventID}`);
  await next();
});

// Skip certain records
router.use(async (record, next) => {
  if (record.eventSourceARN?.includes('test-table')) {
    return; // Don't call next() to skip
  }
  await next();
});

// Error handling
router.use(async (record, next) => {
  try {
    await next();
  } catch (error) {
    await recordMetric('stream.error', 1);
    throw error;
  }
});
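
For readers unfamiliar with the `next()` pattern, the following standalone sketch shows one common way such a chain can be composed. `runChain` is a hypothetical helper, not part of the library, and the router's actual ordering and error semantics may differ.

```typescript
type Next = () => Promise<void>;
type Middleware<R> = (record: R, next: Next) => Promise<void>;

// Hypothetical helper: run middlewares in registration order, then the handler.
async function runChain<R>(
  rec: R,
  middlewares: Middleware<R>[],
  finalHandler: (record: R) => Promise<void>,
): Promise<void> {
  const dispatch = async (index: number): Promise<void> => {
    const current = middlewares[index];
    if (current === undefined) {
      // Past the last middleware: invoke the handler itself.
      await finalHandler(rec);
      return;
    }
    // Each middleware decides whether to continue by calling next().
    await current(rec, () => dispatch(index + 1));
  };
  await dispatch(0);
}
```

In this sketch, a middleware that returns without calling `next()` stops the chain, and an error thrown by the handler propagates back through every middleware that awaited `next()`.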

Batch Processing

Process multiple records together:

// All matching records in one handler call
router.onInsert(
  isInventoryChange,
  async (records) => {
    console.log(`Processing ${records.length} changes`);
    for (const { newImage, ctx } of records) {
      // process each record
    }
  },
  { batch: true }
);

// Group by attribute value
router.onInsert(
  isAuditLog,
  async (records) => {
    const userId = records[0].newImage.userId;
    console.log(`${records.length} logs for user ${userId}`);
  },
  { batch: true, batchKey: 'userId' }
);

// Group by primary key
router.onModify(
  isItem,
  async (records) => {
    // All records for the same pk+sk
  },
  { batch: true, batchKey: { partitionKey: 'pk', sortKey: 'sk' } }
);
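
The grouping behaviour can be pictured with a small standalone sketch. `groupByBatchKey` is a hypothetical helper that buckets already-matched images by a single attribute or by a partition/sort key pair; how the library groups records internally is an assumption here.

```typescript
type BatchKey = string | { partitionKey: string; sortKey: string };

// Hypothetical helper: bucket images by the value(s) of the batch key attributes.
function groupByBatchKey<T extends { [k: string]: unknown }>(
  images: T[],
  batchKey?: BatchKey,
): T[][] {
  // Without a batchKey, all matching records form a single batch.
  if (batchKey === undefined) return images.length > 0 ? [images] : [];

  const groups = new Map<string, T[]>();
  for (const image of images) {
    const parts =
      typeof batchKey === 'string'
        ? [image[batchKey]]
        : [image[batchKey.partitionKey], image[batchKey.sortKey]];
    // Serialize the key parts so composite keys can index the Map.
    const groupId = JSON.stringify(parts);
    const bucket = groups.get(groupId);
    if (bucket) bucket.push(image);
    else groups.set(groupId, [image]);
  }
  // Map preserves insertion order, so groups appear in first-seen order.
  return Array.from(groups.values());
}
```

In this picture, each returned group corresponds to one handler invocation.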

Deferred Processing (SQS)

Offload heavy processing to SQS to keep stream processing fast:

import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { StreamRouter, createSQSClient } from '@rogerchi/ddb-stream-router';

const router = new StreamRouter({
  deferQueue: process.env.DEFER_QUEUE_URL,
  sqsClient: createSQSClient(new SQSClient({}), SendMessageCommand),
});

// Immediate handler
router.onInsert(isOrder, async (order, ctx) => {
  console.log('Order received');
});

// Deferred handler - enqueues to SQS instead of executing immediately
router
  .onInsert(isOrder, async (order, ctx) => {
    await sendConfirmationEmail(order);
    await generateInvoice(order);
  })
  .defer('order-email-handler', { delaySeconds: 30 });

// Stream handler
export const streamHandler = router.streamHandler;

// SQS handler (can be same or different Lambda function)
export const sqsHandler = router.sqsHandler;

The handler ID in .defer(id) is required to match records when processing from SQS. It ensures stable routing across deployments and supports cross-function processing.

Stream View Types

Handler signatures adapt based on your DynamoDB stream configuration:

| Stream View Type | INSERT | MODIFY | REMOVE |
|-----------------|--------|--------|--------|
| KEYS_ONLY | (keys, ctx) | (keys, ctx) | (keys, ctx) |
| NEW_IMAGE | (newImage, ctx) | (undefined, newImage, ctx) | (undefined, ctx) |
| OLD_IMAGE | (undefined, ctx) | (oldImage, undefined, ctx) | (oldImage, ctx) |
| NEW_AND_OLD_IMAGES | (newImage, ctx) | (oldImage, newImage, ctx) | (oldImage, ctx) |
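
The MODIFY column of the table above can be restated as a small function. This is only an illustrative sketch: `StreamData` and `modifyArgs` are hypothetical names, and the `ctx` argument is omitted for brevity.

```typescript
type StreamViewType = 'KEYS_ONLY' | 'NEW_IMAGE' | 'OLD_IMAGE' | 'NEW_AND_OLD_IMAGES';

interface StreamData<K, T> {
  keys: K;
  oldImage?: T;
  newImage?: T;
}

// Hypothetical helper: the positional arguments a MODIFY handler would see.
function modifyArgs<K, T>(viewType: StreamViewType, data: StreamData<K, T>): unknown[] {
  switch (viewType) {
    case 'KEYS_ONLY':
      return [data.keys];
    case 'NEW_IMAGE':
      return [undefined, data.newImage];
    case 'OLD_IMAGE':
      return [data.oldImage, undefined];
    case 'NEW_AND_OLD_IMAGES':
      return [data.oldImage, data.newImage];
  }
}
```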

Global Tables (Region Filtering)

Process only records from the current region:

const router = new StreamRouter({ sameRegionOnly: true });

Configuration Options

const router = new StreamRouter({
  // Stream view type (default: 'NEW_AND_OLD_IMAGES')
  streamViewType: 'NEW_AND_OLD_IMAGES',
  
  // Auto-unmarshall DynamoDB JSON (default: true)
  unmarshall: true,
  
  // Only process same-region records (default: false)
  sameRegionOnly: false,
  
  // Default SQS queue for deferred handlers
  deferQueue: 'https://sqs...',
  
  // SQS client for deferred processing
  sqsClient: { sendMessage: async (params) => { ... } },
  
  // Return batchItemFailures format for streamHandler/sqsHandler (default: true)
  reportBatchItemFailures: true,
  
  // Optional logger for trace logging (e.g., console, pino, Powertools Logger)
  logger: console,
});

Processing Results

const result = await router.process(event);
// { processed: 10, succeeded: 9, failed: 1, errors: [...] }

// Or with partial batch failures for Lambda retry
const retryResult = await router.process(event, { reportBatchItemFailures: true });
// { batchItemFailures: [{ itemIdentifier: 'sequence-number' }] }
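
To show how the second shape relates to per-record outcomes, here is a minimal standalone sketch that maps failed records to the partial batch response format Lambda expects. `RecordOutcome` and `toBatchItemFailures` are hypothetical, and the local `BatchItemFailuresResponse` interface is redeclared for self-containment rather than imported from the library.

```typescript
interface RecordOutcome {
  sequenceNumber: string;
  error?: Error;
}

interface BatchItemFailuresResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Hypothetical helper: report only the records whose handlers failed.
function toBatchItemFailures(outcomes: RecordOutcome[]): BatchItemFailuresResponse {
  return {
    batchItemFailures: outcomes
      .filter((outcome) => outcome.error !== undefined)
      .map((outcome) => ({ itemIdentifier: outcome.sequenceNumber })),
  };
}
```

An empty `batchItemFailures` array tells Lambda the whole batch succeeded.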

API Reference

StreamRouter

class StreamRouter<V extends StreamViewType = 'NEW_AND_OLD_IMAGES'> {
  constructor(options?: StreamRouterOptions);
  
  onInsert<T>(matcher, handler, options?): HandlerRegistration;
  onModify<T>(matcher, handler, options?): HandlerRegistration;
  onRemove<T>(matcher, handler, options?): HandlerRegistration;
  onTTLRemove<T>(matcher, handler, options?): HandlerRegistration;
  use(middleware): this;
  
  process(event, options?): Promise<ProcessingResult | BatchItemFailuresResponse>;
  processDeferred(sqsEvent, options?): Promise<ProcessingResult | BatchItemFailuresResponse>;
}

HandlerRegistration

interface HandlerRegistration {
  // id: unique identifier for this deferred handler (used to match records in processDeferred)
  defer(id: string, options?: { queue?: string; delaySeconds?: number }): StreamRouter;
  onInsert(...): HandlerRegistration;
  onModify(...): HandlerRegistration;
  onRemove(...): HandlerRegistration;
  onTTLRemove(...): HandlerRegistration;
}

HandlerContext

interface HandlerContext {
  eventName: 'INSERT' | 'MODIFY' | 'REMOVE';
  eventID?: string;
  eventSourceARN?: string;
}

License

Apache-2.0