@flowmonkey/express
v0.0.1
Published
Express integration for FlowMonkey workflows
Maintainers
Readme
@flowmonkey/express
Express integration for FlowMonkey workflow engine.
This package provides a complete REST API for managing FlowMonkey workflows, including a dependency injection container, pre-built route handlers, middleware, and a fluent builder API.
Table of Contents
- Installation
- Quick Start
- FlowMonkeyExpress Builder
- Available Routes
- Service Container
- Middleware
- Custom Routes
- API Reference
Installation
pnpm add @flowmonkey/express @flowmonkey/core @flowmonkey/postgresQuick Start
import express from 'express';
import { Pool } from 'pg';
import { FlowMonkeyExpress } from '@flowmonkey/express';
import { httpHandler, delayHandler } from '@flowmonkey/handlers';
const app = express();
app.use(express.json());
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
// Build FlowMonkey integration
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.database(pool)
.handler(httpHandler)
.handler(delayHandler)
.flow({
id: 'my-workflow',
version: '1.0.0',
name: 'My Workflow',
initialStepId: 'start',
steps: {
start: {
id: 'start',
type: 'http',
config: { url: 'https://api.example.com/data' },
input: { type: 'static', value: {} },
outputKey: 'data',
transitions: { onSuccess: null },
},
},
})
.build();
app.listen(3000, () => {
console.log('FlowMonkey server running on port 3000');
});This sets up a complete REST API with routes for managing executions, flows, and health checks.
FlowMonkeyExpress Builder
The builder provides a fluent API for configuring FlowMonkey integration.
Basic Configuration
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app) // Express application (required)
.database(pool) // PostgreSQL pool (recommended)
.build();Database Configuration
For production use, provide a PostgreSQL connection pool:
import { Pool } from 'pg';
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 20,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
});
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.database(pool)
.build();The builder automatically:
- Creates
PgExecutionStorefor execution persistence - Creates
PgFlowStorefor flow definitions - Creates
PgJobStorefor background jobs - Creates
PgEventStorefor event logging - Applies the database schema if needed
For custom stores:
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.stateStore(customStateStore)
.flowRegistry(customFlowRegistry)
.handlerRegistry(customHandlerRegistry)
.build();Registering Handlers and Flows
Add handlers and flows through the builder:
import { httpHandler, delayHandler } from '@flowmonkey/handlers';
import { HttpHandler, TransformHandler } from '@flowmonkey/handlers/class';
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.database(pool)
// Function-based handlers
.handler(httpHandler)
.handler(delayHandler)
// Class-based handlers
.handler(new HttpHandler())
.handler(new TransformHandler())
// Flows
.flow(orderWorkflow)
.flow(notificationWorkflow)
.build();You can also add handlers and flows after building:
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.database(pool)
.build();
// Add later
flowmonkey.registerHandler(customHandler);
flowmonkey.registerFlow(newWorkflow);Route Configuration
Control which routes are registered:
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.routes({
executions: true, // POST /flows/:flowId/start, GET/POST /executions/:id
resumeTokens: true, // POST /tokens/:token/resume
admin: false, // GET /admin/flows, /admin/handlers (disabled)
health: true, // GET /health, /ready
})
.build();Add a prefix to all routes:
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.prefix('/api/v1')
.build();
// Routes become:
// POST /api/v1/flows/:flowId/start
// GET /api/v1/executions/:executionId
// etc.Middleware
Add middleware applied to all FlowMonkey routes:
import rateLimit from 'express-rate-limit';
import helmet from 'helmet';
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.use(helmet())
.use(rateLimit({ windowMs: 15 * 60 * 1000, max: 100 }))
.use((req, res, next) => {
console.log(`FlowMonkey request: ${req.method} ${req.path}`);
next();
})
.build();Context Extraction
Extract tenant, user, and metadata from requests:
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.context({
// Extract tenant ID from header or JWT
getTenantId: (req) => {
return req.headers['x-tenant-id'] as string
|| (req as any).user?.tenantId;
},
// Extract user ID from authenticated user
getUserId: (req) => {
return (req as any).user?.id;
},
// Add custom metadata to executions
getMetadata: (req) => ({
userAgent: req.headers['user-agent'],
ip: req.ip,
requestId: req.headers['x-request-id'],
}),
})
.build();This data is automatically attached to executions created via the API.
Lifecycle Hooks
Hook into FlowMonkey lifecycle events:
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.hooks({
// Called when container is configured
onContainerReady: (container) => {
console.log('Container ready');
// Register additional services
container.registerFactory(
Symbol.for('my:Logger'),
() => new Logger()
);
},
// Called after routes are registered
onRoutesRegistered: (app) => {
console.log('Routes registered');
},
// Called when an execution starts
onExecutionStart: (executionId) => {
metrics.increment('executions.started');
},
// Called when an execution completes
onExecutionComplete: (executionId, success) => {
if (success) {
metrics.increment('executions.completed');
} else {
metrics.increment('executions.failed');
}
},
})
.build();Available Routes
Execution Routes
Start Execution
POST /api/flows/:flowId/start
Content-Type: application/json
{
"context": {
"user": { "id": "123", "name": "Alice" },
"order": { "total": 99.99 }
},
"options": {
"idempotencyKey": "unique-request-id",
"idempotencyTTL": 86400000
}
}Response:
{
"execution": {
"id": "exec_abc123",
"flowId": "my-workflow",
"status": "pending",
"createdAt": 1706500000000
},
"created": true
}Get Execution
GET /api/executions/:executionIdResponse:
{
"id": "exec_abc123",
"flowId": "my-workflow",
"flowVersion": "1.0.0",
"currentStepId": "process-data",
"status": "running",
"context": {
"user": { "id": "123" },
"data": { "fetched": true }
},
"stepCount": 2,
"createdAt": 1706500000000,
"updatedAt": 1706500100000
}Cancel Execution
POST /api/executions/:executionId/cancel
Content-Type: application/json
{
"reason": "User requested cancellation"
}Response:
{
"cancelled": true
}Resume Token Routes
Resume with Token
POST /api/tokens/:token/resume
Content-Type: application/json
{
"data": {
"approved": true,
"comment": "Looks good"
}
}Response:
{
"execution": {
"id": "exec_abc123",
"status": "running"
}
}Admin Routes
List Flows
GET /api/admin/flowsResponse:
{
"flows": [
{
"id": "order-workflow",
"version": "1.0.0",
"name": "Order Processing"
},
{
"id": "notification-workflow",
"version": "2.1.0",
"name": "Notification Service"
}
]
}List Handlers
GET /api/admin/handlersResponse:
{
"handlers": [
{
"type": "http",
"name": "HTTP Request",
"category": "external",
"stateful": false
},
{
"type": "delay",
"name": "Delay",
"category": "utility",
"stateful": false
}
]
}Health Routes
Health Check
GET /healthResponse:
{
"status": "ok",
"timestamp": 1706500000000
}Readiness Check
GET /readyResponse (healthy):
{
"status": "ready",
"checks": {
"database": "ok",
"handlers": "ok"
}
}Response (unhealthy):
{
"status": "not ready",
"checks": {
"database": "error: connection refused",
"handlers": "ok"
}
}Service Container
The service container manages dependencies and allows access to FlowMonkey internals.
Service Tokens
import { ServiceTokens } from '@flowmonkey/express';
// Available tokens
ServiceTokens.ExecutionEngine // Engine instance
ServiceTokens.FlowRegistry // Flow registry
ServiceTokens.HandlerRegistry // Handler registry
ServiceTokens.EventBus // Event bus
ServiceTokens.StateStore // Execution store
ServiceTokens.ContextStorage // Context storage
ServiceTokens.JobStore // Job queue
ServiceTokens.EventStore // Event log
ServiceTokens.ResumeTokenManager // Resume tokens
ServiceTokens.VaultProvider // Secrets vault
ServiceTokens.JobRunner // Job processor
ServiceTokens.DatabasePool // PostgreSQL pool
ServiceTokens.ExpressApp // Express appResolving Services
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.database(pool)
.build();
// Get container
const container = flowmonkey.getContainer();
// Resolve services
const engine = container.resolve(ServiceTokens.ExecutionEngine);
const flows = container.resolve(ServiceTokens.FlowRegistry);
const handlers = container.resolve(ServiceTokens.HandlerRegistry);
// Shortcut methods
const engine = flowmonkey.getEngine();Registering Custom Services
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.hooks({
onContainerReady: (container) => {
// Register instance
container.registerInstance(
Symbol.for('my:Config'),
{ apiKey: process.env.API_KEY }
);
// Register factory (lazy creation)
container.registerFactory(
Symbol.for('my:ApiClient'),
(c) => {
const config = c.resolve(Symbol.for('my:Config'));
return new ApiClient(config.apiKey);
}
);
},
})
.build();
// Later: resolve custom service
const apiClient = flowmonkey.resolve(Symbol.for('my:ApiClient'));Middleware
Context Middleware
Extracts tenant/user information and attaches it to the request:
import { createContextMiddleware } from '@flowmonkey/express';
const contextMiddleware = createContextMiddleware(container, {
getTenantId: (req) => req.headers['x-tenant-id'] as string,
getUserId: (req) => (req as any).user?.id,
});
app.use('/api', contextMiddleware);Error Handler
Handles errors and returns appropriate HTTP responses:
import { createErrorHandler } from '@flowmonkey/express';
const errorHandler = createErrorHandler();
// Apply after routes
app.use(errorHandler);Error types and status codes:
| Error Type | Status Code |
|------------|-------------|
| ValidationError | 400 |
| UnauthorizedError | 401 |
| NotFoundError | 404 |
| Other errors | 500 |
Async Handler
Wraps async route handlers to catch errors:
import { asyncHandler } from '@flowmonkey/express';
app.get('/custom', asyncHandler(async (req, res) => {
const data = await fetchData();
res.json(data);
}));Custom Routes
Add custom routes that access FlowMonkey services:
const flowmonkey = await FlowMonkeyExpress.builder()
.app(app)
.database(pool)
.hooks({
onRoutesRegistered: (expressApp) => {
const container = flowmonkey.getContainer();
const engine = container.resolve(ServiceTokens.ExecutionEngine);
// Custom route to run and wait for completion
expressApp.post('/api/flows/:flowId/run-sync', async (req, res, next) => {
try {
const { execution } = await engine.create(req.params.flowId, req.body);
const result = await engine.run(execution.id);
res.json(result);
} catch (error) {
next(error);
}
});
// Custom metrics endpoint
expressApp.get('/api/metrics', async (req, res) => {
const store = container.resolve(ServiceTokens.StateStore);
const running = await store.findByStatus('running', 1000);
const waiting = await store.findByStatus('waiting', 1000);
res.json({
executions: {
running: running.length,
waiting: waiting.length,
},
});
});
},
})
.build();API Reference
FlowMonkeyExpress
class FlowMonkeyExpress {
// Create builder
static builder(): FlowMonkeyExpressBuilder;
// Get service container
getContainer(): ServiceContainer;
// Resolve service by token
resolve<T>(token: ServiceToken): T;
// Register handler after build
registerHandler(handler: StepHandler): void;
// Register flow after build
registerFlow(flow: Flow): void;
// Get engine instance
getEngine(): Engine;
}FlowMonkeyExpressBuilder
class FlowMonkeyExpressBuilder {
// Required: Express app
app(app: Application): this;
// PostgreSQL pool
database(pool: Pool): this;
// Custom stores
stateStore(store: StateStore): this;
handlerRegistry(registry: HandlerRegistry): this;
flowRegistry(registry: FlowRegistry): this;
eventBus(bus: EventBus): this;
// Vault for secrets
vault(provider: VaultProvider): this;
// Route configuration
routes(config: RouteConfig): this;
prefix(prefix: string): this;
// Middleware
use(...middleware: RequestHandler[]): this;
// Context extraction
context(options: ContextMiddlewareOptions): this;
// Add handler
handler(handler: StepHandler): this;
// Add flow
flow(flow: Flow): this;
// Lifecycle hooks
hooks(hooks: LifecycleHooks): this;
// Build instance
build(): Promise<FlowMonkeyExpress>;
}ServiceContainer
class ServiceContainer {
// Register instance
registerInstance<T>(token: ServiceToken, instance: T): this;
// Register factory
registerFactory<T>(
token: ServiceToken,
factory: (container: ServiceContainer) => T,
singleton?: boolean
): this;
// Resolve service
resolve<T>(token: ServiceToken): T;
// Try to resolve (returns undefined if not found)
tryResolve<T>(token: ServiceToken): T | undefined;
// Check if registered
has(token: ServiceToken): boolean;
// Get all registered tokens
getRegisteredTokens(): ServiceToken[];
// Clear all services
clear(): void;
}Route Types
interface RouteConfig {
executions?: boolean; // Default: true
resumeTokens?: boolean; // Default: true
admin?: boolean; // Default: true
health?: boolean; // Default: true
}
interface ContextMiddlewareOptions {
getTenantId?: (req: Request) => string | undefined;
getUserId?: (req: Request) => string | undefined;
getMetadata?: (req: Request) => Record<string, unknown>;
}
interface LifecycleHooks {
onContainerReady?: (container: ServiceContainer) => void | Promise<void>;
onRoutesRegistered?: (app: Application) => void | Promise<void>;
onExecutionStart?: (executionId: string) => void;
onExecutionComplete?: (executionId: string, success: boolean) => void;
}License
MIT
