@torixtv/nestjs-inngest
v0.13.0
Published
Modern NestJS integration for Inngest - type-safe, decorator-based event-driven functions
Maintainers
Readme
NestJS Inngest
Modern NestJS integration for Inngest - the durable function platform. Build type-safe, decorator-based event-driven functions with step functions, automatic retries, and comprehensive observability.
Table of Contents
- What is Inngest?
- Key Features
- Installation
- Quick Start
- Core Concepts
- Advanced Features
- Connection Modes
- Real-World Examples
- API Reference
- Testing
- Deployment & Production
- Troubleshooting
- Contributing
What is Inngest?
Inngest is a developer platform for building reliable workflows, background jobs, and scheduled functions. It provides:
- Durable execution - Functions survive server restarts and failures
- Automatic retries - Built-in retry logic with exponential backoff
- Step functions - Break complex workflows into reliable, resumable steps
- Event-driven architecture - Trigger functions with type-safe events
- Observability - Built-in logging, metrics, and tracing
- Local development - Full local development server with UI
This NestJS integration brings Inngest's powerful capabilities to your NestJS applications with familiar decorators and dependency injection.
Key Features
✨ Type-Safe Decorators - @InngestFunction, @InngestEvent, @InngestCron
🔧 Step Functions - Reliable multi-step workflows with step.run(), step.waitForEvent()
⚡ Flow Control - @Throttle, @Debounce, @RateLimit, @Concurrency, @Retries
📊 Observability - OpenTelemetry tracing, health checks, metrics collection
🧪 Testing Support - Comprehensive testing utilities and mocks
🔌 Middleware - Custom middleware with @UseMiddleware
📦 Modular Architecture - Optional health and monitoring modules
🚀 Production Ready - Built for enterprise with monitoring and error handling
Installation
# npm
npm install @torixtv/nestjs-inngest inngest
# yarn
yarn add @torixtv/nestjs-inngest inngest
# pnpm
pnpm add @torixtv/nestjs-inngest inngest
Peer Dependencies
npm install @nestjs/common @nestjs/core reflect-metadata rxjs zod
Optional Dependencies (for advanced features)
# For OpenTelemetry tracing (IMPORTANT: Use compatible versions)
npm install @opentelemetry/api@^1.9.0 @opentelemetry/sdk-node@^0.56.0
# For health checks and monitoring
npm install @nestjs/terminus @nestjs/platform-express
⚠️ OpenTelemetry Version Constraints: Due to Inngest's OpenTelemetry dependencies, you must use compatible versions:
- `@opentelemetry/api@^1.9.0` (latest stable)
- `@opentelemetry/sdk-node@^0.56.0` (matches Inngest v3.40.x)
Using newer versions (e.g., sdk-node v0.205.x) will cause runtime conflicts. These constraints will be lifted when Inngest updates their OpenTelemetry dependencies.
Quick Start
1. Set up the Module
// app.module.ts
import { Module } from '@nestjs/common';
import { InngestModule } from '@torixtv/nestjs-inngest';
import { UserService } from './user.service';
@Module({
imports: [
InngestModule.forRoot({
id: 'my-nestjs-app',
// For development - connects to local Inngest dev server
baseUrl: 'http://localhost:8288',
// Configure custom port and host (for auto-registration)
servePort: 3002,
serveHost: 'localhost',
// For production - remove baseUrl to use Inngest Cloud
signingKey: process.env.INNGEST_SIGNING_KEY,
environment: process.env.NODE_ENV as 'development' | 'production',
}),
],
providers: [UserService],
})
export class AppModule {}
2. Create Your First Function
// user.service.ts
import { Injectable } from '@nestjs/common';
import { InngestEvent, InngestService } from '@torixtv/nestjs-inngest';
@Injectable()
export class UserService {
constructor(private readonly inngestService: InngestService) {}
// Event-triggered function
@InngestEvent('welcome-new-user', 'user.created')
async welcomeNewUser({ event, step }: { event: any; step: any }) {
const { userId, email } = event.data;
// Step 1: Send welcome email
await step.run('send-welcome-email', async () => {
console.log(`Sending welcome email to ${email}`);
// Your email logic here
return { emailSent: true };
});
// Step 2: Create user profile
await step.run('create-user-profile', async () => {
console.log(`Creating profile for user ${userId}`);
// Your profile creation logic here
return { profileCreated: true };
});
// Step 3: Send follow-up event
await step.sendEvent('schedule-follow-up', {
name: 'user.follow-up',
data: { userId, email },
});
return { success: true, userId };
}
// Method to trigger the function
async createUser(email: string) {
const userId = `user-${Date.now()}`;
// Send event to trigger the function
await this.inngestService.send({
name: 'user.created',
data: { userId, email },
});
return { userId, email };
}
}
3. Start the Inngest Dev Server
# Install Inngest CLI
npm install -g inngest-cli
# Start the dev server
inngest dev
4. Run Your NestJS App
npm run start:dev
Your functions will be automatically registered and visible in the Inngest dev UI at http://localhost:8288.
Core Concepts
Event-Driven Functions
@InngestFunction - Full Configuration
import { Injectable } from '@nestjs/common';
import { InngestFunction } from '@torixtv/nestjs-inngest';
@Injectable()
export class OrderService {
@InngestFunction({
id: 'process-order',
trigger: { event: 'order.created' },
concurrency: 10,
retries: 3,
batchEvents: {
maxSize: 10,
timeout: '5m'
}
})
async processOrder({ event, step }: { event: any; step: any }) {
// Your function logic here
}
}
@InngestEvent - Event-Triggered Functions
// Simple event trigger
@InngestEvent('handle-payment', 'payment.completed')
async handlePayment({ event, step }) {
// Triggered when 'payment.completed' event is sent
}
// Event with conditions
@InngestEvent('handle-large-payment', {
event: 'payment.completed',
if: 'event.data.amount > 1000'
})
async handleLargePayment({ event, step }) {
// Only triggered for payments over $1000
}
// Multiple event triggers
@InngestEvent('handle-user-activity', ['user.login', 'user.purchase', 'user.updated'])
async handleUserActivity({ event, step }) {
// Triggered by any of the specified events
}
@InngestCron - Scheduled Functions
// Run daily at 9 AM UTC
@InngestCron('daily-report', '0 9 * * *')
async generateDailyReport({ step }) {
const report = await step.run('generate-report', async () => {
// Generate your report
return { reportId: 'daily-123', generatedAt: new Date() };
});
await step.run('send-report-email', async () => {
// Send the report via email
});
}
// Run every 5 minutes
@InngestCron('health-check', '*/5 * * * *')
async performHealthCheck({ step }) {
// Your health check logic
}
Step Functions & Workflows
Step functions provide durability and reliability by breaking your workflow into discrete, resumable steps.
step.run() - Basic Steps
@InngestEvent('process-order', 'order.created')
async processOrder({ event, step }) {
const { orderId, customerId } = event.data;
// Step 1: Validate the order
const validation = await step.run('validate-order', async () => {
// This step will be retried independently if it fails
const isValid = await this.validateOrder(orderId);
return { valid: isValid, validatedAt: new Date() };
});
if (!validation.valid) {
throw new Error('Invalid order');
}
// Step 2: Process payment
const payment = await step.run('process-payment', async () => {
// If this step fails, validation won't be re-run
const result = await this.processPayment(orderId);
return { transactionId: result.id, amount: result.amount };
});
// Step 3: Update inventory
await step.run('update-inventory', async () => {
await this.updateInventory(orderId);
return { inventoryUpdated: true };
});
return { success: true, orderId, transactionId: payment.transactionId };
}
step.waitForEvent() - Waiting for Events
@InngestEvent('user-onboarding', 'user.registered')
async userOnboarding({ event, step }) {
const { userId, email } = event.data;
// Step 1: Send welcome email
await step.run('send-welcome-email', async () => {
await this.emailService.sendWelcome(email);
return { emailSent: true };
});
// Step 2: Wait for email verification (with 24-hour timeout)
const verification = await step.waitForEvent('wait-for-verification', {
event: 'user.email-verified',
timeout: '24h',
if: `async.data.userId == "${userId}"`,
});
if (!verification) {
// Timeout occurred - send reminder
await step.run('send-reminder-email', async () => {
await this.emailService.sendVerificationReminder(email);
return { reminderSent: true };
});
return { status: 'verification-timeout' };
}
// Step 3: Complete onboarding
await step.run('complete-onboarding', async () => {
await this.userService.markAsVerified(userId);
return { onboardingCompleted: true };
});
return { status: 'completed', userId };
}
step.sendEvent() - Sending Events
@InngestEvent('order-workflow', 'order.submitted')
async orderWorkflow({ event, step }) {
const { orderId } = event.data;
// Process the order
await step.run('process-order', async () => {
return await this.processOrder(orderId);
});
// Send downstream events
await step.sendEvent('notify-fulfillment', {
name: 'fulfillment.order-ready',
data: { orderId, status: 'ready-for-fulfillment' },
});
await step.sendEvent('send-confirmation', {
name: 'email.send-order-confirmation',
data: { orderId, template: 'order-confirmation' },
});
// Send multiple events at once
await step.sendEvent('batch-notifications', [
{
name: 'analytics.order-processed',
data: { orderId, timestamp: new Date() },
},
{
name: 'webhook.order-status-change',
data: { orderId, status: 'processed' },
},
]);
}
step.sleep() and step.sleepUntil()
@InngestEvent('delayed-follow-up', 'user.trial-started')
async delayedFollowUp({ event, step }) {
const { userId } = event.data;
// Wait 7 days before following up
await step.sleep('wait-7-days', '7d');
await step.run('send-follow-up', async () => {
await this.emailService.sendTrialFollowUp(userId);
return { followUpSent: true };
});
// Wait until specific date/time
const reminderDate = new Date(Date.now() + 14 * 24 * 60 * 60 * 1000); // 14 days
await step.sleepUntil('wait-until-reminder', reminderDate);
await step.run('send-trial-ending-reminder', async () => {
await this.emailService.sendTrialEndingReminder(userId);
return { reminderSent: true };
});
}
Middleware & Flow Control
@UseMiddleware - Custom Middleware
// Custom logging middleware
const loggingMiddleware = {
init: () => ({
onFunctionRun: ({ fn }) => {
console.log(`Function ${fn.id} starting...`);
return {
transformOutput: (result) => {
console.log(`Function ${fn.id} completed:`, result);
return result;
},
};
},
}),
};
@Injectable()
export class PaymentService {
@InngestEvent('process-payment', 'payment.requested')
@UseMiddleware(loggingMiddleware)
async processPayment({ event, step }) {
// Your payment logic with automatic logging
}
}
@Concurrency - Limiting Concurrent Executions
// Limit to 5 concurrent executions globally
@InngestEvent('heavy-processing', 'data.process-request')
@Concurrency(5)
async heavyProcessing({ event, step }) {
// Expensive operation
}
// Limit concurrency per user
@InngestEvent('user-specific-task', 'user.task-requested')
@Concurrency(1, { key: 'event.data.userId' })
async userSpecificTask({ event, step }) {
// Only one task per user at a time
}
@RateLimit - Rate Limiting
// Allow 100 executions per hour
@InngestEvent('api-call', 'external.api-request')
@RateLimit(100, '1h')
async makeApiCall({ event, step }) {
// API call logic
}
// Rate limit per customer
@InngestEvent('customer-export', 'export.requested')
@RateLimit(10, '1h', 'event.data.customerId')
async customerExport({ event, step }) {
// Export logic limited per customer
}
@Throttle - Throttling with Burst Support
// Allow burst of 20, then 100 per minute
@InngestEvent('notification-send', 'notification.requested')
@Throttle(100, '1m', { burst: 20 })
async sendNotification({ event, step }) {
// Notification logic with burst capability
}
@Debounce - Preventing Rapid Executions
// Debounce file save operations
@InngestEvent('save-document', 'document.changed')
@Debounce('5s', 'event.data.documentId')
async saveDocument({ event, step }) {
// Only save if no changes for 5 seconds
}
@Retries - Custom Retry Configuration
// Retry up to 5 times instead of the default 3
@InngestEvent('unreliable-task', 'task.execute')
@Retries(5)
async unreliableTask({ event, step }) {
// Task that might fail and need more retries
}
Advanced Features
OpenTelemetry Tracing
Enable distributed tracing to track your functions across your entire system:
Configuration
import { InngestModule } from '@torixtv/nestjs-inngest';
@Module({
imports: [
InngestModule.forRoot({
id: 'my-app',
tracing: {
enabled: true,
serviceName: 'my-nestjs-service',
includeEventData: false, // For privacy
includeStepData: true, // For debugging
defaultAttributes: {
'service.version': '1.0.0',
'deployment.environment': process.env.NODE_ENV,
},
contextInjection: {
enabled: true,
fieldName: 'traceContext', // Where to inject trace context in events
},
},
}),
],
})
export class AppModule {}
Automatic Trace Propagation
@Injectable()
export class OrderService {
@InngestEvent('process-order', 'order.created')
async processOrder({ event, step }) {
// Tracing is automatic - each step becomes a span
const validation = await step.run('validate-order', async () => {
// This step is automatically traced
return await this.validateOrder(event.data.orderId);
});
// Trace context is automatically propagated to sent events
await step.sendEvent('payment-requested', {
name: 'payment.process',
data: {
orderId: event.data.orderId,
amount: validation.amount,
// traceContext automatically injected here
},
});
}
@InngestEvent('process-payment', 'payment.process')
async processPayment({ event, step }) {
// This function will be part of the same distributed trace
// if called from the traced order processing above
}
}
Custom Trace Attributes
import { trace } from '@opentelemetry/api';
@InngestEvent('custom-traced-function', 'custom.event')
async customTracedFunction({ event, step }) {
// Get the current span to add custom attributes
const span = trace.getActiveSpan();
await step.run('custom-step', async () => {
span?.setAttributes({
'custom.user_id': event.data.userId,
'custom.operation_type': 'data_processing',
});
// Your logic here
return { processed: true };
});
}
Health Checks & Monitoring
Enable Health Checks
import { Module } from '@nestjs/common';
import { InngestModule, InngestHealthModule } from '@torixtv/nestjs-inngest';
@Module({
imports: [
InngestModule.forRoot({
id: 'my-app',
health: {
enabled: true,
path: '/health/inngest',
includeDetails: true,
enableMetrics: true,
checkInterval: 30000, // 30 seconds
},
}),
// Add the health module
InngestHealthModule,
],
})
export class AppModule {}
Monitoring with Metrics
import { InngestMonitoringModule } from '@torixtv/nestjs-inngest';
@Module({
imports: [
InngestModule.forRoot({
id: 'my-app',
monitoring: {
enabled: true,
collectMetrics: true,
metricsInterval: 15000, // Collect every 15 seconds
enableTracing: true,
},
}),
InngestMonitoringModule,
],
})
export class AppModule {}
Health Check API
# Basic health check
GET /health/inngest
# Response
{
"status": "ok",
"info": {
"inngest": {
"status": "up",
"functions": 5,
"lastSync": "2024-01-15T10:30:00Z"
}
}
}
# Detailed health check
GET /health/inngest?details=true
# Response with metrics
{
"status": "ok",
"info": {
"inngest": {
"status": "up",
"functions": 5,
"lastSync": "2024-01-15T10:30:00Z"
}
},
"details": {
"memory": {
"used": "45 MB",
"limit": "512 MB",
"percentage": 8.8
},
"functions": [
{
"id": "process-order",
"status": "healthy",
"lastExecution": "2024-01-15T10:29:45Z"
}
]
}
}
@nestjs/terminus Integration
For Kubernetes readiness/liveness probes or existing @nestjs/terminus health check setups, use the InngestHealthIndicator:
# Install @nestjs/terminus (optional peer dependency)
npm install @nestjs/terminus
// health.module.ts
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { InngestHealthIndicator } from '@torixtv/nestjs-inngest';
import { HealthController } from './health.controller';
@Module({
imports: [TerminusModule],
providers: [InngestHealthIndicator],
controllers: [HealthController],
})
export class HealthModule {}
// health.controller.ts
import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService, HealthCheckResult } from '@nestjs/terminus';
import { InngestHealthIndicator } from '@torixtv/nestjs-inngest';
@Controller('health')
export class HealthController {
constructor(
private health: HealthCheckService,
private inngest: InngestHealthIndicator,
) {}
@Get('liveness')
@HealthCheck()
liveness(): Promise<HealthCheckResult> {
return this.health.check([
() => this.inngest.isHealthy('inngest'),
]);
}
@Get('readiness')
@HealthCheck()
readiness(): Promise<HealthCheckResult> {
return this.health.check([
() => this.inngest.isReady('inngest'),
]);
}
}
The health indicator is connection-mode aware:
- Serve mode: Returns healthy if Inngest client is initialized
- Connect mode: Returns healthy only when WebSocket connection is `ACTIVE`
Response includes mode and connectionState fields:
{
"status": "ok",
"info": {
"inngest": {
"status": "up",
"message": "Inngest worker is connected",
"mode": "connect",
"connectionState": "ACTIVE"
}
}
}
Configuration
Environment-Based Configuration
// config/inngest.config.ts
import { InngestModuleOptions } from '@torixtv/nestjs-inngest';
export const getInngestConfig = (): InngestModuleOptions => {
const baseConfig: InngestModuleOptions = {
id: process.env.INNGEST_APP_ID || 'my-app',
eventKey: process.env.INNGEST_EVENT_KEY,
environment: (process.env.NODE_ENV as any) || 'development',
};
if (process.env.NODE_ENV === 'production') {
return {
...baseConfig,
signingKey: process.env.INNGEST_SIGNING_KEY,
baseUrl: undefined, // Use Inngest Cloud
middleware: [], // Add production middleware
monitoring: {
enabled: true,
collectMetrics: true,
metricsInterval: 30000,
enableTracing: true,
},
health: {
enabled: true,
path: '/health/inngest',
includeDetails: false,
enableMetrics: true,
checkInterval: 60000,
},
};
}
return {
...baseConfig,
baseUrl: 'http://localhost:8288', // Local dev server
logger: console, // Enable debug logging
};
};
// app.module.ts
@Module({
imports: [
InngestModule.forRoot(getInngestConfig()),
],
})
export class AppModule {}
Async Configuration with ConfigService
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [
ConfigModule.forRoot(),
InngestModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
id: configService.get<string>('INNGEST_APP_ID'),
eventKey: configService.get<string>('INNGEST_EVENT_KEY'),
signingKey: configService.get<string>('INNGEST_SIGNING_KEY'),
baseUrl: configService.get<string>('INNGEST_BASE_URL'),
environment: configService.get<string>('NODE_ENV') as any,
tracing: {
enabled: configService.get<boolean>('ENABLE_TRACING', false),
serviceName: configService.get<string>('SERVICE_NAME'),
},
monitoring: {
enabled: configService.get<boolean>('ENABLE_MONITORING', true),
collectMetrics: true,
metricsInterval: configService.get<number>('METRICS_INTERVAL', 30000),
},
}),
inject: [ConfigService],
}),
],
})
export class AppModule {}
Understanding Configuration Parameters
Before diving into specific configuration patterns, it's important to understand what the key configuration parameters actually mean and how they work together.
The Three Configuration Concerns
When configuring the Inngest module, you're dealing with three separate concerns:
- Inngest Server Location (`baseUrl`): Where the Inngest server is running
- Your App Location (`serveHost`, `servePort`): Where YOUR NestJS app is accessible
- Endpoint Path (`path`): Where the Inngest functions endpoint is served in your app
Visual Architecture
┌──────────────────────────────┐ ┌──────────────────────────────┐
│ Inngest Dev Server │ │ Your NestJS App │
│ localhost:8288 │◄────────│ localhost:3000 │
│ (baseUrl) │ calls │ (serveHost:servePort) │
│ │ │ │
│ - Function registry │ │ /api/inngest │
│ - Event queue │ │ (path + globalPrefix) │
│ - UI Dashboard │ │ │
└──────────────────────────────┘ └──────────────────────────────┘
How auto-registration works:
- When your NestJS app starts, it creates an endpoint at `path` (default: `/inngest`)
- If `baseUrl` points to a dev server (not `inngest.com`), the module automatically POSTs to `{baseUrl}/fn/register`
- The registration tells Inngest: "My functions are available at `http://{serveHost}:{servePort}/{path}`"
- Inngest dev server then calls YOUR app at that URL when events trigger your functions
Common Confusion Points
"What is serveHost/servePort for?"
- These are NOT Inngest's host/port (that's `baseUrl`)
- These tell Inngest where YOUR app is running
- Think of them as "my app's address" not "Inngest's address"
"Does path respect NestJS global prefix?"
- No, the `@Controller` decorator doesn't know about global prefix at decoration time
- If you use `app.setGlobalPrefix('api')`, you must set `path: 'api/inngest'` manually
- This is consistent with how other NestJS packages work (like `@nestjs/swagger`)
"Do I need to configure servePort if my app runs on the default port?"
- If your app runs on port 3000: No configuration needed (it's the default)
- If your app runs on a different port: Yes, you must set `servePort` to match
- The module defaults to `process.env.PORT || 3000`, so setting the `PORT` env var works too
Configuration Precedence
The module follows this precedence order when determining configuration values:
1. Explicit configuration in forRoot() / forRootAsync()
2. Environment variables (INNGEST_SERVE_PORT, INNGEST_SERVE_HOST, INNGEST_PATH)
3. Standard environment variables (PORT for servePort)
4. Package defaults (servePort: 3000, serveHost: 'localhost', path: 'inngest')
Custom Port & Host Configuration
When your NestJS application runs on a custom port or needs a specific host configuration for auto-registration with the Inngest dev server:
InngestModule.forRoot({
id: 'my-app',
baseUrl: 'http://localhost:8288', // Inngest dev server
// Option 1: Hostname + Port (for local development)
servePort: 3002,
serveHost: 'localhost',
// Option 2: Full URL (for production/custom setups)
serveHost: 'https://myapp.herokuapp.com',
// servePort is ignored when serveHost is a full URL
// Option 3: Environment variables (recommended)
servePort: parseInt(process.env.PORT || '3000'),
serveHost: process.env.INNGEST_SERVE_HOST || 'localhost',
// Option 4: Let environment variables handle it (with new auto-detection)
// servePort auto-reads from INNGEST_SERVE_PORT or PORT
// serveHost auto-reads from INNGEST_SERVE_HOST
// path auto-reads from INNGEST_PATH
})
Common Configuration Patterns
Here are the most common configuration patterns you'll need:
Pattern 1: Default Development Setup
The simplest configuration - all defaults work for standard local development:
// Your app runs on port 3000 with no global prefix
InngestModule.forRoot({
id: 'my-app',
baseUrl: 'http://localhost:8288',
// That's it! Defaults handle the rest:
// - servePort: 3000
// - serveHost: 'localhost'
// - path: 'inngest'
})
// Your functions will be accessible at: http://localhost:3000/inngest
// Inngest dev server will auto-register and call this URL
Pattern 2: Custom Port
When your app runs on a non-standard port:
// Your app runs on port 3002
InngestModule.forRoot({
id: 'my-app',
baseUrl: 'http://localhost:8288',
servePort: 3002, // Must match where your app actually listens
})
// Or better - use environment variable:
// In main.ts:
const port = process.env.PORT || 3002;
await app.listen(port);
// In module config:
InngestModule.forRoot({
id: 'my-app',
baseUrl: 'http://localhost:8288',
servePort: parseInt(process.env.PORT || '3002'),
})
Pattern 3: With NestJS Global Prefix
When using app.setGlobalPrefix(), you must include it in the path:
// In main.ts:
const app = await NestFactory.create(AppModule);
app.setGlobalPrefix('api'); // Global prefix for all routes
await app.listen(3000);
// In module config:
InngestModule.forRoot({
id: 'my-app',
baseUrl: 'http://localhost:8288',
path: 'api/inngest', // MUST include the global prefix manually
})
// Your functions will be at: http://localhost:3000/api/inngest
// NOT at: http://localhost:3000/inngest
Why? The @Controller decorator is applied before setGlobalPrefix() runs, so the module can't auto-detect it. This is standard NestJS behavior.
Pattern 4: Production Deployment (Cloud Platforms)
For platforms like Heroku, Render, AWS, etc.:
InngestModule.forRootAsync({
imports: [ConfigModule],
useFactory: (config: ConfigService) => ({
id: config.get('INNGEST_APP_ID'),
signingKey: config.get('INNGEST_SIGNING_KEY'), // Required for production
eventKey: config.get('INNGEST_EVENT_KEY'),
environment: 'production',
// No baseUrl - uses Inngest Cloud
// For cloud platforms, you typically don't need serveHost/servePort
// because Inngest Cloud reaches you via your public URL
}),
inject: [ConfigService],
})
Pattern 5: Kubernetes Deployment
For Kubernetes with internal service DNS:
InngestModule.forRootAsync({
imports: [ConfigModule],
useFactory: (config: ConfigService) => ({
id: config.get('INNGEST_APP_ID'),
signingKey: config.get('INNGEST_SIGNING_KEY'),
baseUrl: config.get('INNGEST_BASE_URL'), // If using self-hosted Inngest
// Option A: Explicit K8s service DNS
serveHost: config.get('K8S_SERVICE_HOST', 'my-app.default.svc.cluster.local'),
servePort: config.get('SERVICE_PORT', 8080),
// Option B: Use environment variables (recommended)
// Set in your K8s deployment:
// - INNGEST_SERVE_HOST=my-app.default.svc.cluster.local
// - PORT=8080
// Module will auto-read these
}),
inject: [ConfigService],
})
Kubernetes deployment YAML:
env:
- name: INNGEST_SERVE_HOST
value: "my-app-service.default.svc.cluster.local"
- name: PORT
value: "8080"
- name: INNGEST_APP_ID
valueFrom:
configMapKeyRef:
name: inngest-config
key: app-id
Pattern 6: Docker Compose
For local development with Docker Compose:
InngestModule.forRoot({
id: 'my-app',
// Use Docker service name from docker-compose.yml
baseUrl: 'http://inngest:8288',
serveHost: 'app', // Docker service name for your NestJS app
servePort: 3000,
})
docker-compose.yml:
services:
app:
build: .
ports:
- '3000:3000'
environment:
- INNGEST_SERVE_HOST=app
- PORT=3000
inngest:
image: inngest/inngest:latest
ports:
- '8288:8288'
Pattern 7: Manual Registration Control
For advanced scenarios where you need control over when registration happens:
// Disable auto-registration
InngestModule.forRoot({
id: 'my-app',
baseUrl: 'http://localhost:8288',
disableAutoRegistration: true, // Don't register on module init
})
// In main.ts - register manually after app.listen()
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const port = process.env.PORT || 3000;
await app.listen(port);
// Now register with the actual port
const inngestService = app.get(InngestService);
await inngestService.registerWithDevServer({
serveHost: 'localhost',
servePort: port,
});
console.log(`App listening on port ${port}`);
}
When to use this:
- Dynamic port allocation (port 0)
- Complex startup sequences
- Testing scenarios
- When you need to defer registration
When to use the custom port and host options (servePort, serveHost):
- Your app runs on a non-standard port (not 3000)
- You need custom host configuration for Docker/containers
- Multiple NestJS apps with Inngest on different ports
- Load balancers or reverse proxies require specific host settings
- Kubernetes deployments with service DNS
Connection Modes
The module supports two connection modes for communicating with Inngest:
Serve Mode (Default)
Serve mode is the traditional HTTP webhook-based approach. Your NestJS application exposes an HTTP endpoint that Inngest calls to execute functions.
InngestModule.forRoot({
id: 'my-app',
mode: 'serve', // Default - can be omitted
baseUrl: 'http://localhost:8288',
})
Characteristics:
- Uses HTTP webhooks - Inngest calls your endpoint at `path` (default `/inngest`; e.g. `/api/inngest` when you include a global prefix)
- Requires your app to be publicly accessible (or tunneled in development)
- Traditional request-response model
- Good for serverless environments (Vercel, AWS Lambda, etc.)
Connect Mode
Connect mode uses a persistent WebSocket connection. Your application connects to Inngest and pulls work, rather than Inngest pushing work to your app via HTTP.
InngestModule.forRoot({
id: 'my-app',
mode: 'connect',
signingKey: process.env.INNGEST_SIGNING_KEY,
connect: {
instanceId: 'worker-1', // Optional: unique identifier for this worker
maxConcurrency: 10, // Optional: max concurrent function executions
shutdownTimeout: 30000, // Optional: graceful shutdown timeout in ms
handleShutdownSignals: ['SIGTERM', 'SIGINT'], // Optional: signals to handle
},
})
Characteristics:
- Persistent WebSocket connection - your app connects to Inngest
- No need for public HTTP endpoint - works behind firewalls
- Ideal for Kubernetes, Docker, and containerized environments
- Better for long-running workers and high-throughput scenarios
- Automatic reconnection handling
When to Use Each Mode
| Use Case | Recommended Mode |
|----------|------------------|
| Serverless (Vercel, Lambda) | Serve |
| Kubernetes deployment | Connect |
| Behind corporate firewall | Connect |
| Development with Inngest CLI | Serve |
| Long-running workers | Connect |
| Hybrid cloud/on-prem | Connect |
Environment Variable Configuration
You can also set the mode via environment variable:
# Set mode via environment variable
INNGEST_MODE=connect npm run start
# Or in your .env file
INNGEST_MODE=connect
Connect Mode Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| instanceId | string | Auto-generated UUID | Unique identifier for this worker instance |
| maxConcurrency | number | undefined | Maximum concurrent function executions |
| shutdownTimeout | number | 30000 | Time in ms to wait for graceful shutdown |
| handleShutdownSignals | string[] | ['SIGTERM', 'SIGINT'] | Process signals to handle for shutdown |
Connection State API
When using connect mode, you can monitor the connection state:
@Injectable()
export class MyService {
constructor(private readonly inngestService: InngestService) {}
checkConnection() {
// Get current connection state
const state = this.inngestService.getConnectionState();
// Returns: 'ACTIVE' | 'CONNECTING' | 'RECONNECTING' | 'PAUSED' | 'CLOSING' | 'CLOSED' | 'NOT_APPLICABLE'
// Simple connected check
const isConnected = this.inngestService.isConnected();
// Returns true only if state is 'ACTIVE'
console.log(`Connection state: ${state}, connected: ${isConnected}`);
}
}
Connection States:
- `ACTIVE` - Connected and ready to receive work
- `CONNECTING` - Initial connection in progress
- `RECONNECTING` - Reconnecting after disconnect
- `PAUSED` - Connection temporarily paused
- `CLOSING` - Graceful shutdown in progress
- `CLOSED` - Connection closed
- `NOT_APPLICABLE` - Using serve mode (no persistent connection)
Health Checks with Connect Mode
The health service is connection-aware and reports appropriate status:
// In serve mode
GET /health/inngest
{
"status": "ok",
"info": {
"inngest": {
"status": "up",
"mode": "serve",
"functions": 22
}
}
}
// In connect mode
GET /health/inngest
{
"status": "ok",
"info": {
"inngest": {
"status": "up",
"mode": "connect",
"connectionState": "ACTIVE",
"functions": 22
}
}
}
Kubernetes Deployment with Connect Mode
Connect mode is ideal for Kubernetes because your pods don't need to be publicly accessible:
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inngest-worker
spec:
replicas: 3
template:
spec:
containers:
- name: worker
image: my-app:latest
env:
- name: INNGEST_MODE
value: "connect"
- name: INNGEST_SIGNING_KEY
valueFrom:
secretKeyRef:
name: inngest-secrets
key: signing-key
# Each pod gets a unique instance ID from metadata
- name: INNGEST_INSTANCE_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
livenessProbe:
httpGet:
path: /health/inngest
port: 3000
initialDelaySeconds: 30
periodSeconds: 10
// Configuration using K8s environment
InngestModule.forRootAsync({
useFactory: () => ({
id: process.env.INNGEST_APP_ID,
mode: 'connect',
signingKey: process.env.INNGEST_SIGNING_KEY,
connect: {
instanceId: process.env.INNGEST_INSTANCE_ID, // From K8s metadata.name
maxConcurrency: 5,
shutdownTimeout: 60000, // Give K8s time for graceful shutdown
},
}),
})
Graceful Shutdown
Connect mode handles graceful shutdown automatically:
- When a shutdown signal is received (SIGTERM, SIGINT)
- The module stops accepting new work
- Waits for in-progress functions to complete (up to `shutdownTimeout`)
- Closes the WebSocket connection
- Allows NestJS to complete shutdown
To customize shutdown behavior:
InngestModule.forRoot({
id: 'my-app',
mode: 'connect',
connect: {
// Only handle SIGTERM (useful when you handle SIGINT yourself)
handleShutdownSignals: ['SIGTERM'],
// Or disable automatic signal handling entirely
// handleShutdownSignals: [],
// Extended timeout for long-running functions
shutdownTimeout: 120000, // 2 minutes
},
})

Real-World Examples
1. User Onboarding Workflow
A comprehensive user onboarding flow with email verification and follow-ups:
import { Injectable } from '@nestjs/common';
import { InngestEvent, InngestService } from '@torixtv/nestjs-inngest';
interface UserEvents {
'user.registered': {
data: { userId: string; email: string; name: string };
};
'user.email-verified': {
data: { userId: string; verifiedAt: string };
};
}
@Injectable()
export class UserOnboardingService {
constructor(private readonly inngestService: InngestService) {}
/**
 * Onboards a newly registered user: sends the welcome email, waits up to 48
 * hours for the matching `user.email-verified` event, then either sends a
 * reminder (timeout) or finishes profile setup and schedules follow-ups.
 *
 * @param event - `user.registered` event carrying `userId`, `email` and `name`
 * @param step - Inngest step tools
 * @returns `{ status: 'verification-timeout', userId }` when no verification
 *   arrived in time, otherwise `{ status: 'completed', userId, verifiedAt }`
 */
@InngestEvent('user-onboarding-flow', 'user.registered')
async userOnboardingFlow({ event, step }: { event: UserEvents['user.registered']; step: any }) {
  const { userId, email, name } = event.data;
  const DAY_MS = 24 * 60 * 60 * 1000;
  const daysFromNow = (days: number) => new Date(Date.now() + days * DAY_MS);

  // Step 1: Send welcome email with verification link
  await step.run('send-welcome-email', async () => {
    const token = await this.generateVerificationToken(userId);
    const verificationLink = `https://app.example.com/verify?token=${token}`;
    await this.emailService.sendWelcomeEmail({ email, name, verificationLink });
    return { emailSent: true, token };
  });

  // Step 2: Wait for email verification (48-hour timeout)
  const verified = await step.waitForEvent('wait-for-email-verification', {
    event: 'user.email-verified',
    timeout: '48h',
    if: `async.data.userId == "${userId}"`,
  });

  // Email not verified in time - send reminder and mark as unverified
  if (!verified) {
    await step.run('send-verification-reminder', async () => {
      await this.emailService.sendVerificationReminder(email, name);
      await this.userService.markAsUnverified(userId);
      return { reminderSent: true };
    });
    return { status: 'verification-timeout', userId };
  }

  // Step 3: Email verified - set up user profile
  await step.run('setup-user-profile', async () => {
    await this.userService.markAsVerified(userId);
    await this.userService.createDefaultProfile(userId);
    return { profileCreated: true };
  });

  // Step 4: Send onboarding completion email
  await step.run('send-completion-email', async () => {
    await this.emailService.sendOnboardingComplete(email, name);
    return { completionEmailSent: true };
  });

  // Step 5: Schedule follow-up sequences (tips after 7 days, feedback after 30)
  const tipsSeries = {
    name: 'user.schedule-tips-series',
    data: { userId, email, startDate: daysFromNow(7) },
  };
  const feedbackRequest = {
    name: 'user.schedule-feedback-request',
    data: { userId, email, requestDate: daysFromNow(30) },
  };
  await step.sendEvent('schedule-follow-ups', [tipsSeries, feedbackRequest]);

  const { verifiedAt } = verified.data;
  return { status: 'completed', userId, verifiedAt };
}
// Follow-up tip series
/**
 * Sends the user's tips one at a time; every tip (including the first) is
 * preceded by a 3-day sleep.
 *
 * @param event - `user.schedule-tips-series` event carrying `userId` and `email`
 * @param step - Inngest step tools
 * @returns The number of tips sent and the user ID
 */
@InngestEvent('send-tip-series', 'user.schedule-tips-series')
async sendTipSeries({ event, step }: { event: any; step: any }) {
  const { userId, email } = event.data;

  // Load the tips inside a step so the list is memoized. Inngest re-executes the
  // handler as steps complete; fetching outside a step could yield a different
  // list on a later execution and desynchronise the index-based step IDs below.
  const tips: string[] = await step.run('load-tips', async () => this.getTipsForUser(userId));

  for (const [index, tip] of tips.entries()) {
    await step.sleep(`wait-between-tips-${index}`, '3d'); // Wait 3 days between tips
    await step.run(`send-tip-${index + 1}`, async () => {
      await this.emailService.sendTip(email, tip);
      return { tipSent: true, tipIndex: index + 1 };
    });
  }

  return { tipsSent: tips.length, userId };
}
/**
 * Builds a verification token of the form `verify_<userId>_<timestamp>`.
 * Example placeholder: a real implementation would generate and store the token.
 */
private async generateVerificationToken(userId: string): Promise<string> {
  const issuedAt = Date.now();
  return ['verify', userId, issuedAt].join('_');
}
/**
 * Returns the tips to send to a user.
 * Example placeholder: always yields the same three tips and ignores `userId`.
 */
private async getTipsForUser(userId: string): Promise<string[]> {
  const placeholderTips: string[] = ['tip1', 'tip2', 'tip3'];
  return placeholderTips;
}
}

2. E-commerce Order Processing
A robust order processing workflow with payment, inventory, and fulfillment:
import { Injectable } from '@nestjs/common';
import { InngestEvent, Concurrency, Retries } from '@torixtv/nestjs-inngest';
@Injectable()
export class OrderProcessingService {
/**
 * Processes a submitted order end to end: validates it and checks inventory,
 * charges the customer, reserves inventory, creates the fulfillment order,
 * emails a confirmation and emits downstream events.
 *
 * If any step throws, the failure handler releases the reservation, marks the
 * order as failed, notifies the customer and then re-throws the error.
 *
 * @param event - `order.submitted` event; `data` carries `orderId`, `customerId`, `items` and `paymentMethod`
 * @param step - Inngest step tools
 * @returns Summary containing the order, transaction and fulfillment order IDs
 * @throws Re-throws the error that made a step fail, after the failure handler ran
 */
@InngestEvent('process-order', 'order.submitted')
@Concurrency(10) // Process up to 10 orders concurrently
@Retries(3) // Retry failed orders up to 3 times
async processOrder({ event, step }: { event: any; step: any }) {
  const { orderId, customerId, items, paymentMethod } = event.data;

  // Declared outside `try` so the failure handler can still reach it; stays
  // undefined when validation itself is the step that failed.
  let customerEmail: string | undefined;

  try {
    // Step 1: Validate order and check inventory
    const validation = await step.run('validate-order', async () => {
      const order = await this.orderService.getOrder(orderId);
      const inventoryCheck = await this.inventoryService.checkAvailability(items);
      if (!inventoryCheck.available) {
        throw new Error(`Insufficient inventory: ${inventoryCheck.unavailableItems.join(', ')}`);
      }
      return {
        order,
        totalAmount: order.totalAmount,
        inventoryReserved: inventoryCheck.reservationId,
      };
    });
    customerEmail = validation.order.customerEmail;

    // Step 2: Process payment
    const payment = await step.run('process-payment', async () => {
      const paymentResult = await this.paymentService.processPayment({
        amount: validation.totalAmount,
        customerId,
        paymentMethod,
        orderId,
      });
      if (!paymentResult.success) {
        throw new Error(`Payment failed: ${paymentResult.error}`);
      }
      return {
        transactionId: paymentResult.transactionId,
        paidAmount: paymentResult.amount,
        paidAt: new Date(),
      };
    });

    // Step 3: Reserve inventory
    await step.run('reserve-inventory', async () => {
      await this.inventoryService.reserveItems(orderId, items);
      return { inventoryReserved: true };
    });

    // Step 4: Create fulfillment order
    const fulfillment = await step.run('create-fulfillment-order', async () => {
      const fulfillmentOrder = await this.fulfillmentService.createOrder({
        orderId,
        customerId,
        items,
        shippingAddress: validation.order.shippingAddress,
      });
      return {
        fulfillmentOrderId: fulfillmentOrder.id,
        estimatedShipping: fulfillmentOrder.estimatedShipping,
      };
    });

    // Step 5: Send confirmation email
    await step.run('send-order-confirmation', async () => {
      await this.emailService.sendOrderConfirmation({
        email: customerEmail,
        orderId,
        transactionId: payment.transactionId,
        estimatedShipping: fulfillment.estimatedShipping,
      });
      return { confirmationEmailSent: true };
    });

    // Step 6: Send downstream events
    await step.sendEvent('order-processed-events', [
      {
        name: 'analytics.order-completed',
        data: {
          orderId,
          customerId,
          amount: payment.paidAmount,
          timestamp: new Date(),
        },
      },
      {
        name: 'fulfillment.order-ready',
        data: {
          orderId,
          fulfillmentOrderId: fulfillment.fulfillmentOrderId,
          priority: validation.order.priority || 'standard',
        },
      },
      {
        name: 'customer.purchase-completed',
        data: {
          customerId,
          orderId,
          amount: payment.paidAmount,
          items: items.length,
        },
      },
    ]);

    return {
      success: true,
      orderId,
      transactionId: payment.transactionId,
      fulfillmentOrderId: fulfillment.fulfillmentOrderId,
    };
  } catch (error: unknown) {
    // Catch variables are `unknown` under strict mode; narrow before reading `.message`
    const reason = error instanceof Error ? error.message : String(error);

    // Handle failures - release any reserved inventory
    await step.run('handle-order-failure', async () => {
      await this.inventoryService.releaseReservation(orderId);
      await this.orderService.markAsFailed(orderId, reason);
      // Send failure notification
      await this.emailService.sendOrderFailureNotification({
        email: customerEmail,
        orderId,
        reason,
      });
      return { failureHandled: true, reason };
    });

    throw error; // Re-throw to trigger Inngest's retry mechanism
  }
}
// Handle order cancellations
/**
 * Cancels an order: refunds the payment when one was completed, releases the
 * inventory reservation, cancels the fulfillment order when one exists and
 * emails a cancellation confirmation.
 *
 * @param event - `order.cancelled` event carrying `orderId` and `reason`
 * @param step - Inngest step tools
 * @returns `{ success: true, orderId, cancelled: true }`
 */
@InngestEvent('cancel-order', 'order.cancelled')
async cancelOrder({ event, step }: { event: any; step: any }) {
  const { orderId, reason } = event.data;

  // Step 1: Get order details
  const order = await step.run('get-order-details', async () => this.orderService.getOrder(orderId));
  const wasPaid = order.paymentStatus === 'completed';

  // Step 2: Process refund if payment was processed
  if (wasPaid) {
    await step.run('process-refund', async () => {
      const refund = await this.paymentService.processRefund({
        transactionId: order.transactionId,
        amount: order.totalAmount,
        reason,
      });
      return { refundId: refund.id, refundAmount: refund.amount };
    });
  }

  // Step 3: Release inventory
  await step.run('release-inventory', async () => {
    await this.inventoryService.releaseReservation(orderId);
    return { inventoryReleased: true };
  });

  // Step 4: Cancel fulfillment if exists
  const { fulfillmentOrderId } = order;
  if (fulfillmentOrderId) {
    await step.run('cancel-fulfillment', async () => {
      await this.fulfillmentService.cancelOrder(fulfillmentOrderId);
      return { fulfillmentCancelled: true };
    });
  }

  // Step 5: Send cancellation confirmation
  await step.run('send-cancellation-email', async () => {
    const refundAmount = wasPaid ? order.totalAmount : 0;
    await this.emailService.sendCancellationConfirmation({
      email: order.customerEmail,
      orderId,
      refundAmount,
      reason,
    });
    return { cancellationEmailSent: true };
  });

  return { success: true, orderId, cancelled: true };
}
}

3. Scheduled Data Cleanup Job
A comprehensive data cleanup job that runs daily:
import { Injectable } from '@nestjs/common';
import { InngestCron } from '@torixtv/nestjs-inngest';
@Injectable()
export class DataCleanupService {
// Run daily at 2 AM UTC
/**
 * Daily cleanup job: removes expired sessions and temporary files, archives
 * audit logs older than six months, cleans up orphaned records, refreshes
 * database statistics and emails a summary report.
 *
 * @param step - Inngest step tools
 * @returns The cleanup report that was also emailed to the administrators
 */
@InngestCron('daily-data-cleanup', '0 2 * * *')
async dailyDataCleanup({ step }: { step: any }) {
  // Capture the start time inside a step so it is memoized. Inngest re-executes
  // the handler as steps complete; a plain `new Date()` here would be
  // re-evaluated each time and the reported duration would not cover the run.
  const startedAt: string = await step.run('record-start-time', async () =>
    new Date().toISOString(),
  );

  // Step 1: Clean up expired sessions
  const sessionCleanup = await step.run('cleanup-expired-sessions', async () => {
    const expiredSessions = await this.sessionService.getExpiredSessions();
    const deletedCount = await this.sessionService.deleteExpiredSessions();
    return {
      expiredSessionsFound: expiredSessions.length,
      deletedSessions: deletedCount,
    };
  });

  // Step 2: Clean up temporary files
  const fileCleanup = await step.run('cleanup-temporary-files', async () => {
    const tempFiles = await this.fileService.getTemporaryFiles();
    let deletedFiles = 0;
    for (const file of tempFiles) {
      try {
        await this.fileService.deleteFile(file.id);
        deletedFiles += 1;
      } catch (error) {
        // Best effort: a file that cannot be deleted is logged and skipped
        console.warn(`Failed to delete file ${file.id}:`, error);
      }
    }
    return {
      temporaryFilesFound: tempFiles.length,
      deletedFiles,
    };
  });

  // Step 3: Archive old audit logs
  const logArchiving = await step.run('archive-old-audit-logs', async () => {
    const cutoffDate = new Date();
    cutoffDate.setMonth(cutoffDate.getMonth() - 6); // Archive logs older than 6 months
    const oldLogs = await this.auditService.getLogsOlderThan(cutoffDate);
    const archivedCount = await this.auditService.archiveLogs(oldLogs.map(log => log.id));
    return {
      oldLogsFound: oldLogs.length,
      archivedLogs: archivedCount,
    };
  });

  // Step 4: Clean up orphaned database records
  const dbCleanup = await step.run('cleanup-orphaned-records', async () => {
    const orphanedRecords = await this.databaseService.findOrphanedRecords();
    const cleanedTables = [];
    for (const [tableName, records] of Object.entries(orphanedRecords)) {
      if (records.length > 0) {
        const deletedCount = await this.databaseService.cleanupOrphanedRecords(tableName);
        cleanedTables.push({ tableName, deletedCount });
      }
    }
    return { cleanedTables };
  });

  // Step 5: Update database statistics
  await step.run('update-database-stats', async () => {
    await this.databaseService.updateTableStatistics();
    return { statisticsUpdated: true };
  });

  // Step 6: Generate cleanup report
  const report = await step.run('generate-cleanup-report', async () => {
    const durationMs = Date.now() - new Date(startedAt).getTime();
    return {
      date: startedAt.split('T')[0],
      duration: `${Math.round(durationMs / 1000)}s`,
      sessionsCleanup: sessionCleanup,
      filesCleanup: fileCleanup,
      logsArchiving: logArchiving,
      databaseCleanup: dbCleanup,
    };
  });

  // Step 7: Send report to administrators
  await step.run('send-cleanup-report', async () => {
    await this.emailService.sendCleanupReport({
      recipients: ['admin@example.com', 'ops@example.com'],
      report,
    });
    return { reportSent: true };
  });

  return report;
}
// Weekly comprehensive cleanup (Sundays at 3 AM UTC)
/**
 * Weekly maintenance job: optimizes database indexes, deletes backups older
 * than 30 days and vacuums the database.
 *
 * @param step - Inngest step tools
 * @returns `{ success: true, cleanupType: 'weekly-deep-cleanup' }`
 */
@InngestCron('weekly-deep-cleanup', '0 3 * * 0')
async weeklyDeepCleanup({ step }: { step: any }) {
  const BACKUP_RETENTION_DAYS = 30;

  // Step 1: Optimize database indexes
  await step.run('optimize-database-indexes', async () => {
    await this.databaseService.optimizeIndexes();
    return { indexesOptimized: true };
  });

  // Step 2: Clean up old backups
  await step.run('cleanup-old-backups', async () => {
    const staleBackups = await this.backupService.getOldBackups(BACKUP_RETENTION_DAYS);
    const deletedBackups = await this.backupService.deleteBackups(staleBackups);
    return { oldBackupsFound: staleBackups.length, deletedBackups };
  });

  // Step 3: Vacuum database
  await step.run('vacuum-database', async () => {
    await this.databaseService.vacuum();
    return { databaseVacuumed: true };
  });

  return { success: true, cleanupType: 'weekly-deep-cleanup' };
}
}

4. Event-Driven Microservice Communication
Cross-service communication using events for a distributed e-commerce system:
// Order Service
/**
 * Keeps the order record in sync with payment, inventory and fulfillment
 * events, and emits the event that triggers the next stage of the pipeline.
 * The `updateOrder*Status` persistence helpers are not shown in this example.
 */
@Injectable()
export class OrderService {
  /** Records the completed payment, then requests an inventory reservation. */
  @InngestEvent('handle-payment-completed', 'payment.completed')
  async handlePaymentCompleted({ event, step }: { event: any; step: any }) {
    const payload = event.data;
    const { orderId, paymentId } = payload;

    await step.run('update-order-payment-status', async () => {
      await this.updateOrderPaymentStatus(orderId, 'completed', paymentId);
      return { orderUpdated: true };
    });

    // Trigger inventory reservation
    const reservationRequest = {
      name: 'inventory.reserve-requested',
      data: { orderId, items: payload.items },
    };
    await step.sendEvent('request-inventory-reservation', reservationRequest);

    return { success: true, orderId };
  }

  /** Records the inventory reservation, then hands the order to fulfillment. */
  @InngestEvent('handle-inventory-reserved', 'inventory.reserved')
  async handleInventoryReserved({ event, step }: { event: any; step: any }) {
    const { orderId, reservationId } = event.data;

    await step.run('update-order-inventory-status', async () => {
      await this.updateOrderInventoryStatus(orderId, 'reserved', reservationId);
      return { orderUpdated: true };
    });

    // Trigger fulfillment
    const fulfillmentRequest = {
      name: 'fulfillment.order-ready',
      data: { orderId, reservationId },
    };
    await step.sendEvent('request-fulfillment', fulfillmentRequest);
  }

  /** Records the shipment, then requests the customer shipping notification. */
  @InngestEvent('handle-fulfillment-shipped', 'fulfillment.shipped')
  async handleFulfillmentShipped({ event, step }: { event: any; step: any }) {
    const { orderId, trackingNumber, shippedAt } = event.data;

    await step.run('update-order-shipping-status', async () => {
      await this.updateOrderShippingStatus(orderId, 'shipped', trackingNumber, shippedAt);
      return { orderUpdated: true };
    });

    // Send customer notification
    const shippingNotice = {
      name: 'notification.shipping-confirmation',
      data: { orderId, trackingNumber, shippedAt },
    };
    await step.sendEvent('send-shipping-notification', shippingNotice);
  }
}
// Inventory Service
/**
 * Reserves and releases stock in response to order events and confirms each
 * outcome with a follow-up event. The `checkAvailability`, `reserveItems` and
 * `releaseReservation` helpers are not shown in this example.
 */
@Injectable()
export class InventoryService {
  /**
   * Reserves the requested items and confirms with `inventory.reserved`.
   * The reservation step throws when any item is unavailable.
   */
  @InngestEvent('reserve-inventory', 'inventory.reserve-requested')
  async reserveInventory({ event, step }: { event: any; step: any }) {
    const { orderId, items } = event.data;

    const reservation = await step.run('check-and-reserve', async () => {
      const availability = await this.checkAvailability(items);
      if (!availability.available) {
        throw new Error(`Items not available: ${availability.unavailableItems.join(', ')}`);
      }
      return { reservationId: await this.reserveItems(orderId, items), items };
    });
    const { reservationId } = reservation;

    // Confirm reservation to order service
    await step.sendEvent('confirm-reservation', {
      name: 'inventory.reserved',
      data: { orderId, reservationId, items: reservation.items },
    });

    return { success: true, reservationId };
  }

  /** Releases a reservation and confirms with `inventory.released`. */
  @InngestEvent('release-inventory', 'inventory.release-requested')
  async releaseInventory({ event, step }: { event: any; step: any }) {
    const { orderId, reservationId } = event.data;

    await step.run('release-reservation', async () => {
      await this.releaseReservation(reservationId);
      return { released: true };
    });

    // Confirm release
    const releaseConfirmation = {
      name: 'inventory.released',
      data: { orderId, reservationId },
    };
    await step.sendEvent('confirm-release', releaseConfirmation);
  }
}
// Fulfillment Service
/**
 * Turns a ready order into a shipment: creates the fulfillment record, waits
 * for the warehouse to pack it, ships it and notifies the order service.
 * The helpers it calls (`getOrderDetails`, `createFulfillmentRecord`,
 * `shipOrder`) are not shown in this example.
 */
@Injectable()
export class FulfillmentService {
  /**
   * Handles `fulfillment.order-ready`.
   *
   * @param event - Event whose `data` carries the `orderId`
   * @param step - Inngest step tools
   * @returns `{ success: true, trackingNumber }` once shipped, or
   *   `{ success: false, orderId, reason: 'packing-timeout' }` when the
   *   warehouse did not confirm packing within 24 hours
   */
  @InngestEvent('create-fulfillment-order', 'fulfillment.order-ready')
  async createFulfillmentOrder({ event, step }: { event: any; step: any }) {
    const { orderId } = event.data;

    const fulfillmentOrder = await step.run('create-fulfillment', async () => {
      const orderDetails = await this.getOrderDetails(orderId);
      // Must not be named like this handler: calling `this.createFulfillmentOrder`
      // here would re-enter the Inngest handler with the wrong argument shape.
      const fulfillmentId = await this.createFulfillmentRecord(orderDetails);
      return { fulfillmentId, orderDetails };
    });

    // Wait for warehouse to pick and pack
    const packedEvent = await step.waitForEvent('wait-for-packed', {
      event: 'warehouse.packed',
      timeout: '24h',
      if: `async.data.fulfillmentId == "${fulfillmentOrder.fulfillmentId}"`,
    });

    // waitForEvent resolves to null on timeout; do not ship an unpacked order
    if (!packedEvent) {
      return { success: false, orderId, reason: 'packing-timeout' };
    }

    // Ship the order
    const shipping = await step.run('ship-order', async () => {
      const trackingInfo = await this.shipOrder(fulfillmentOrder.fulfillmentId);
      return {
        trackingNumber: trackingInfo.trackingNumber,
        carrier: trackingInfo.carrier,
        shippedAt: new Date(),
      };
    });

    // Notify order service
    await step.sendEvent('notify-shipped', {
      name: 'fulfillment.shipped',
      data: {
        orderId,
        fulfillmentId: fulfillmentOrder.fulfillmentId,
        trackingNumber: shipping.trackingNumber,
        carrier: shipping.carrier,
        shippedAt: shipping.shippedAt,
      },
    });

    return { success: true, trackingNumber: shipping.trackingNumber };
  }
}
// Notification Service
@Injectable()
export class NotificationService {
/**
 * Notifies the customer that their order shipped: always by email, and by SMS
 * when the order has SMS notifications enabled and a phone number on file.
 *
 * @param event - `notification.shipping-confirmation` event carrying `orderId`, `trackingNumber` and `shippedAt`
 * @param step - Inngest step tools
 * @returns `{ success: true, orderId, notificationsSent: true }`
 */
@InngestEvent('send-shipping-confirmation', 'notification.shipping-confirmation')
@Throttle(100, '1h') // Prevent spam
async sendShippingConfirmation({ event, step }: { event: any; step: any }) {
  const { orderId, trackingNumber, shippedAt } = event.data;

  const order = await step.run('get-order-details', async () => this.getOrderDetails(orderId));

  await step.run('send-email', async () => {
    await this.emailService.sendShippingConfirmation({
      email: order.customerEmail,
      customerName: order.customerName,
      orderId,
      trackingNumber,
      shippedAt,
      items: order.items,
    });
    return { emailSent: true };
  });

  await step.run('send-sms', async () => {
    // Skip SMS unless the customer opted in and a number is available
    if (!(order.smsNotifications && order.phoneNumber)) {
      return { smsSent: false, reason: 'sms-not-enabled' };
    }
    await this.smsService.sendShippingNotification({
      phoneNumber: order.phoneNumber,
      orderId,
      trackingNumber,
    });
    return { smsSent: true };
  });

  return { success: true, orderId, notificationsSent: true };
}
}

API Reference
Configuration Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| id | string | Required | Your Inngest app ID |
| eventKey | string | undefined | Event key for sending events |
| baseUrl | string | undefined | Inngest server URL (omit for cloud) |
| signingKey | string | undefined | Webhook signing key for production |
| isGlobal | boolean | false | Make module available globally |
| mode | 'serve' \| 'connect' | 'serve' | Connection mode: HTTP webhooks or WebSocket |
| connect | InngestConnectOptions | {} | Connect mode configuration (see Connection Modes) |
| path | string | 'inngest' | API endpoint path (serve mode only) |
| servePort | number | process.env.PORT \|\| 3000 | Port where your app runs (for auto-registration) |
| serveHost | string | 'localhost' | Host/URL where your app runs. Can be hostname ('localhost') or full URL ('https://myapp.com') |
| environment | string | 'development' | Environment name |
| middleware | InngestMiddleware[] | [] | Global middleware |
| logger | any | undefined | Custom logger |
| tracing | InngestTracingConfig | {} | Tracing configuration |
| monitoring | InngestMonitoringConfig | {} | Monitoring configuration |
| health | InngestHealthConfig | {} | Health check configuration |
Decorators
@InngestFunction(config)
interface InngestFunctionConfig {
id: string; // Unique function ID
trigger: TriggerConfig; // Event trigger or cron schedule
concurrency?: number | ConcurrencyConfig;
retries?: number;
batchEvents?: BatchConfig;
cancelOn?: CancelConfig[];
rateLimit?: RateLimit;
throttle?: ThrottleConfig;
debounce?: DebounceConfig;
}

@InngestEvent(id, event, options?)
Shorthand for event-triggered functions.
// Simple event
@InngestEvent('function-id', 'event.name')
// Event with conditions
@InngestEvent('function-id', {
event: 'event.name',
if: 'event.data.amount > 100'
})
// Multiple events
@InngestEvent('function-id', ['event.one', 'event.two'])

@InngestCron(id, cron, options?)
Shorthand for scheduled functions.
@InngestCron('daily-job', '0 9 * * *') // Daily at 9 AM
@InngestCron('hourly-job', '0 * * * *') // Every hour
@InngestCron('weekly-job', '0 9 * * 1') // Mondays at 9 AM

Middleware Decorators
@UseMiddleware(...middleware) // Custom middleware
@Concurrency(limit, options?) // Concurrency control
@RateLimit(limit, period, key?) // Rate limiting
@Throttle(limit, period, options?) // Throttling with burst
@Debounce(period, key?) // Debouncing
@Retries(count) // Retry configuration

InngestService Methods
class InngestService {
// Send single event
send(event: EventPayload): Promise<void>
// Send multiple events
send(events: EventPayload[]): Promise<void>
// Get Inngest client instance
getClient(): Inngest
// Get current connection state (connect mode only)
getConnectionState(): 'ACTIVE' | 'CONNECTING' | 'RECONNECTING' | 'PAUSED' | 'CLOSING' | 'CLOSED' | 'NOT_APPLICABLE'
// Check if actively connected (connect mode only)
isConnected(): boolean
// Get module options
getOptions(): InngestModuleOptions
// Get registered functions
getFunctions(): InngestFunction[]
}

InngestHealthIndicator Methods
For use with @nestjs/terminus health checks:
class InngestHealthIndicator {
// Check if Inngest is healthy
// - Serve mode: client is initialized
// - Connect mode: WebSocket connection is ACTIVE
isHealthy(key: string): Promise<HealthIndicatorResult>
// Check if Inngest is ready (includes function registration check)
isReady(key: string): Promise<HealthIndicatorResult>
}

Step Functions API
interface StepTools {
// Run a step
run<T>(id: string, fn: () => Promise<T>): Promise<T>
// Wait for an event
waitForEvent(id: string, config: {
event: string;
timeout: string;
if?: string;
match?: string;
}): Promise<EventPayload | null>
// Send event(s)
sendEvent(id: string, event: EventPayload): Promise<void>
sendEvent(id: string, events: EventPayload[]): Promise<void>
// Sleep for a duration
sleep(id: string, duration: string): Promise<void>
// Sleep until a specific time
sleepUntil(id: string, date: Date): Promise<void>
}

Testing Utilities
// Create testing module
createInngestTestingModule(
config: InngestModuleOptions,
providers: Provider[]
): Promise<TestingModule>
// Mock service
class MockInngestService {
send(event: EventPayload | EventPayload[]): Promise<void>
getEvents(): EventPayload[]
clearEvents(): void
getClient(): Inngest
}
// Create mock context
createMockInngestContext(overrides?: Partial<Context>): MockContext

Testing
Unit Testing with Mocks
import { Test, TestingModule } from '@nestjs/testing';
import {
  InngestService,
  MockInngestService,
  createMockInngestContext,
} from '@torixtv/nestjs-inngest';
// Service under test - adjust the path to match your project layout
import { UserService } from './user.service';

// Unit tests for UserService with InngestService replaced by the in-memory mock
describe('UserService', () => {
  let service: UserService;
  let mockInngestService: MockInngestService;

  beforeEach(async () => {
    mockInngestService = new MockInngestService();
    const module: TestingModule = await Test.createTestingModule({
      providers: [
        UserService,
        {
          // Swap the real service for the mock so sent events can be inspected
          provide: InngestService,
          useValue: mockInngestService,
        },
      ],
    }).compile();
    service = module.get<UserService>(UserService);
  });

  it('should send user.created event', async () => {
    await service.createUser('test@example.com');
    const events = mockInngestService.getEvents();
    expect(events).toHaveLength(1);
    expect(events[0].name).toBe('user.created');
  });

  it('should test function handler directly', async () => {
    const mockContext = createMockInngestContext({
      event: {
        name: 'user.created',
        data: { userId: 'test-123', email: 'test@example.com' },
      },
    });
    const result = await service.welcomeNewUser(mockContext);
    expect(result.success).toBe(true);
    expect(mockContext.step.run).toHaveBeenCalledWith(
      'send-welcome-email',
      expect.any(Function)
    );
  });
});

Integration Testing
import { createInngestTestingModule } from '@torixtv/nestjs-inngest';
