@robertkirk/jessie
v0.6.0
jessie
A type-safe, observable pipeline framework with dependency injection for building robust data processing workflows. Best suited to IO-bottlenecked workflows, such as calling external APIs.
Features
- 🔒 Type-safe: Full TypeScript support with context inference across pipeline steps
- ⚡ Observable: Comprehensive logging hooks for monitoring and debugging
- ⏱️ Timeout Support: Per-step timeout configuration with automatic enforcement
- 🚀 Enhanced Queue Processing: Full-featured queue management with priority, pause/resume, and statistics
- 🧩 Composable: Support for sub-pipelines with parent-child relationships
- 💉 Dependency Injected: No hard dependencies - bring your own logger, database, etc.
- 📊 Metrics Ready: Built-in timing, failure reporting, and queue statistics
When to use Jessie
- You have a workflow that is IO bottlenecked (Jessie is not multithreaded)
- You want a simple way to build a pipeline
- You want logging and observability
- You want type safety
- You want a simple solution with no dependencies (no Redis or other external services)
Installation
npm install @robertkirk/jessie
# or
yarn add @robertkirk/jessie
# or
bun add @robertkirk/jessie
Quick Start
import {
Pipeline,
Queue,
PipelineOptions,
createConsoleLogger
} from '@robertkirk/jessie';
// Define your context type (only your data, no framework boilerplate)
interface MyContext {
userId: string;
}
// Create pipeline options
const opts: PipelineOptions = {
idGen: () => crypto.randomUUID(),
logger: createConsoleLogger()
};
// Build your pipeline
const pipeline = new Pipeline<MyContext>('data-processing', opts)
.step('fetchUser', async (ctx) => {
// Your async logic here
return { id: ctx.userId, name: 'John Doe' };
}, { timeoutMs: 5000 })
.step('processData', async (ctx) => {
// Access previous step result via ctx.fetchUser
return `Processed: ${ctx.fetchUser.name}`;
})
.step('saveResult', async (ctx) => {
// Save the processed data
console.log('Saving:', ctx.processData);
return 'saved-id-123';
});
// Execute the pipeline - only provide your data
const result = await pipeline.run({ userId: 'user-123' });
console.log('Final result:', result.saveResult);
// Or process multiple items with enhanced queue management
const queue = new Queue(pipeline, { concurrency: 2 });
queue.add({ userId: 'user-1' }, 5); // High priority
queue.add({ userId: 'user-2' }, 1); // Low priority
await queue.start();
Core Concepts
Pipeline
A Pipeline is a sequence of typed steps that build up a context object. Each step receives the accumulated context from previous steps and adds its result to the context.
const opts: PipelineOptions = {
idGen: () => crypto.randomUUID(),
logger: createConsoleLogger()
};
const pipeline = new Pipeline<{ value: number }>('my-pipeline', opts)
.step('step1', async (ctx) => 'result1')
.step('step2', async (ctx) => ctx.step1.toUpperCase()) // TypeScript knows ctx.step1 exists
.step('step3', async (ctx) => `${ctx.step1}-${ctx.step2}`); // And ctx.step2
Pipeline Visualization and Introspection
Pipelines support detailed introspection for visualization and debugging:
// Add steps with descriptions for better visualization
const pipeline = new Pipeline<{ userId: string }>('user-pipeline', opts)
.step('authenticate', async (ctx) => ({ valid: true, role: 'user' }),
{
timeoutMs: 3000,
description: 'Authenticates the user and determines their role'
})
.step('fetchData', async (ctx) => getUserData(ctx.userId),
{ description: 'Retrieves user data from the database' })
.step('transform', async (ctx) => processData(ctx.fetchData))
.step('validate', async (ctx) => ctx.transform.length > 0,
{
concurrency: 1,
description: 'Ensures we have valid data to process'
});
// Get detailed step information for visualization
const stepOrder = pipeline.getStepOrder();
console.log(`Pipeline: ${stepOrder.pipelineName}`);
console.log(`Total steps: ${stepOrder.totalSteps}`);
stepOrder.steps.forEach(step => {
console.log(`Step ${step.index}: ${step.key}`);
console.log(` Input context: ${step.inputContext.join(', ')}`);
console.log(` Output type: ${step.outputType}`);
if (step.description) {
console.log(` Description: ${step.description}`);
}
if (step.options) {
console.log(` Options: ${JSON.stringify(step.options)}`);
}
});
// Get enhanced type information with sample data
const typedOrder = pipeline.getStepOrderWithTypes({ userId: 'user-123' });
console.log('Initial context keys:', typedOrder.initialContextKeys);
console.log('Estimated types:', typedOrder.estimatedTypes);
Automatic Pipeline Logging
Pipelines automatically provide detailed logging for all steps using the standard pipeline logger methods. This comprehensive logging helps with debugging, monitoring, and understanding pipeline execution:
const pipeline = new Pipeline<{ data: string }>('debug-pipeline', opts)
.step('process', async (ctx) => {
// Some complex processing
return processData(ctx.data);
}, {
description: 'Processes the input data with complex logic'
});
Automatic pipeline logging provides:
- Complete step visibility: Every step logs start, completion, and context information
- Performance insights: Execution times for all steps
- Context tracking: See what data is available at each step
- Error details: Comprehensive error information when steps fail
- Timeout monitoring: Automatic logging when timeouts are configured
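These same events can be captured by a hand-rolled logger. The sketch below restates the PipelineLogger interface locally (with the optional PipelineMeta parameters omitted) so it stands alone; in real code you would import the interface from '@robertkirk/jessie' and pass the logger in your PipelineOptions.

```typescript
// Minimal sketch of a collecting logger: records per-step durations and
// failures instead of printing them. PipelineMeta parameters are omitted
// for brevity; the real interface ships with '@robertkirk/jessie'.
interface PipelineLogger {
  logi(step: string, msg: string): void;
  logw(step: string, msg: string): void;
  loge(step: string, msg: string): void;
  onStepStart?(step: string, runId: string): void;
  onStepEnd?(step: string, runId: string, ms: number, error?: Error): void;
  onFailure?(step: string, runId: string, error: Error): void;
}

function createCollectingLogger() {
  const timings: Array<{ step: string; ms: number }> = [];
  const failures: Array<{ step: string; error: Error }> = [];
  const logger: PipelineLogger = {
    logi: () => {},
    logw: () => {},
    loge: () => {},
    // Called by the pipeline when a step finishes, with its duration.
    onStepEnd: (step, _runId, ms) => timings.push({ step, ms }),
    // Called when a step fails; keep the error for later inspection.
    onFailure: (step, _runId, error) => failures.push({ step, error }),
  };
  return { logger, timings, failures };
}
```

The collected `timings` and `failures` arrays can then feed dashboards or test assertions.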
User Logging Methods
For custom logging within your pipeline steps, use the verbose and debug methods available on the logger:
const pipeline = new Pipeline<{ userId: string }>('user-pipeline', opts)
.step('fetchUser', async (ctx) => {
ctx.log.verbose?.('Fetching user details', ctx.runId);
const user = await getUserById(ctx.userId);
ctx.log.debug?.('User data retrieved', ctx.runId);
return user;
}, { description: 'Retrieves user from database' });
PipelineOptions
Every pipeline requires options that specify:
- idGen: Function to generate unique run IDs
- logger: Logger instance for the pipeline
PipelineRunOptions
When calling pipeline.run(), you can optionally provide:
- runId: Override the auto-generated run ID
- parentName: Specify parent pipeline name for sub-pipeline tracking
const result = await pipeline.run(context, {
runId: 'custom-id',
parentName: 'parent-pipeline'
});
Context
Every pipeline step receives a readonly context that includes:
- Your custom data
- runId: Unique identifier for this pipeline execution (added automatically)
- log: Logger instance for the pipeline (added automatically)
- Results from all previous steps (automatically typed)
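Conceptually, each step's return value is merged into the context under that step's key before the next step runs. The mini-runner below is a hypothetical illustration of this accumulation, not the library's actual implementation:

```typescript
// Hypothetical illustration of context accumulation -- not the library's
// internals. Each step sees everything produced so far, and its result is
// stored under the step's key.
type Step = { key: string; fn: (ctx: Record<string, any>) => Promise<any> };

async function runSteps(
  initial: Record<string, any>,
  steps: Step[]
): Promise<Record<string, any>> {
  let ctx: Record<string, any> = { ...initial };
  for (const step of steps) {
    const result = await step.fn(ctx);     // step receives accumulated context
    ctx = { ...ctx, [step.key]: result };  // result merged in under step key
  }
  return ctx;
}
```

In the real library the same pattern is tracked at the type level, which is why `ctx.fetchUser` is known to exist in steps added after `fetchUser`.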
Logging & Observability
Implement the PipelineLogger interface to capture pipeline events:
interface PipelineLogger {
logi(step: string, msg: string): void;
logw(step: string, msg: string): void;
loge(step: string, msg: string): void;
onStepStart?(step: string, runId: string, meta?: PipelineMeta): void;
onStepEnd?(step: string, runId: string, ms: number, error?: Error, meta?: PipelineMeta): void;
onFailure?(step: string, runId: string, error: Error, meta?: PipelineMeta): void;
}
Sub-pipelines
Create reusable sub-pipelines and execute them within main pipelines:
const subOptions: PipelineOptions = {
idGen: () => crypto.randomUUID(),
logger: createConsoleLogger()
};
const subPipeline = new Pipeline<MyContext & { data: any }>('transform', subOptions)
.step('validate', async (ctx) => validateData(ctx.data))
.step('clean', async (ctx) => cleanData(ctx.validate));
const mainOptions: PipelineOptions = {
idGen: () => crypto.randomUUID(),
logger: createConsoleLogger()
};
const mainPipeline = new Pipeline<MyContext>('main', mainOptions)
.step('fetchData', async (ctx) => getData(ctx.userId))
.step('transform', async (ctx) => {
const result = await subPipeline.run({
...ctx,
data: ctx.fetchData
}, { parentName: 'main' }); // Specify parent for logging
return result.clean;
});
Queue Processing
Jessie provides two approaches to queue processing:
Simple Queue Processing
For basic use cases, use the runQueue function:
import { runQueue, createArrayQueue } from '@robertkirk/jessie';
const items = [
{ userId: 'user1' },
{ userId: 'user2' },
{ userId: 'user3' }
];
const queue = createArrayQueue(items);
await runQueue(queue, pipeline, { concurrency: 2 });
Enhanced Queue Management
For advanced use cases, use the Queue class with full queue management capabilities:
import { Queue, Pipeline } from '@robertkirk/jessie';
// Create a queue with your pipeline
const queue = new Queue(pipeline, { concurrency: 3 });
// Add items with optional priority (higher numbers = higher priority)
const taskId1 = queue.add({ userId: 'user1' }, 1);
const taskId2 = queue.add({ userId: 'user2' }, 5); // High priority
const taskId3 = queue.add({ userId: 'user3' }, 3);
// Add multiple items at once
const taskIds = queue.addMany([
{ userId: 'user4' },
{ userId: 'user5' }
], 2);
// Manage queue items
queue.remove(taskId1); // Remove specific item
queue.reorder(taskId2, 0); // Move item to front
queue.setPriority(taskId3, 10); // Change priority
queue.clear(); // Remove all pending items
// Monitor queue status
const status = queue.getStatus();
console.log(`${status.pending} pending, ${status.processing} processing`);
// Get detailed statistics
const stats = queue.getStats();
console.log(`Processed: ${stats.totalProcessed}, Failed: ${stats.totalFailed}`);
console.log(`Avg time: ${stats.averageProcessingTime}ms`);
console.log(`Throughput: ${stats.throughputPerSecond} items/sec`);
// Control processing
await queue.start(); // Start processing
queue.pause(); // Pause processing
queue.resume(); // Resume processing
await queue.stop(); // Stop gracefully
// Dynamic concurrency control
queue.setConcurrency(5); // Change concurrency on the fly
const currentConcurrency = queue.getConcurrency(); // Get current setting
// Pipeline timeout control
queue.setTimeoutMs(30000); // Set overall pipeline timeout
const timeout = queue.getTimeoutMs(); // Get current timeout setting
// Wait for completion
await queue.waitUntilEmpty(); // Wait until all items processed
Queue Features
- Priority Processing: Items with higher priority values are processed first
- Dynamic Management: Add, remove, and reorder items while processing
- Dynamic Concurrency: Change concurrency limits on the fly with graceful scaling
- Pipeline Timeout: Set overall timeout limits for the entire queue processing
- Pause/Resume: Temporarily halt processing without losing progress
- Real-time Statistics: Monitor processing speed, success/failure rates
- Graceful Shutdown: Stop processing while allowing current items to complete
- Status Monitoring: Track pending, processing, completed, and failed items
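The priority rule above ("higher numbers = higher priority") can be illustrated with a plain descending sort. This is a sketch of the ordering contract only, not the Queue class's internal code:

```typescript
// Illustration of the priority ordering contract: items with higher
// priority values are dequeued first. Not the Queue's actual internals.
interface PendingItem<T> { id: string; data: T; priority: number }

function nextToProcess<T>(pending: PendingItem<T>[]): PendingItem<T> | undefined {
  // Copy-sort descending by priority, then take the head.
  return [...pending].sort((a, b) => b.priority - a.priority)[0];
}
```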
Error Handling & Timeouts
const opts: PipelineOptions = {
idGen: () => crypto.randomUUID(),
logger: createConsoleLogger()
};
const pipeline = new Pipeline<MyContext>('error-handling', opts)
.step('riskyOperation', async (ctx) => {
// This step will timeout after 1 second
await longRunningOperation();
return 'success';
}, { timeoutMs: 1000 })
.step('cleanup', async (ctx) => {
// This step only runs if the previous step succeeded
return 'cleaned';
});
try {
await pipeline.run({ userId: 'user-123' });
} catch (error) {
// Handle TimeoutError or other failures
console.error('Pipeline failed:', error.message);
}
API Reference
Classes
Pipeline<C extends Record<string, any>>
Main pipeline class for building and executing step sequences.
Constructor:
new Pipeline<C>(name: string, options: PipelineOptions, parentName?: string)
Methods:
- step<K, V>(key: K, fn: StepFunction<C, V>, opts?: StepOptions): Pipeline<C & { [K]: V }>
- run(ctx: C, options?: PipelineRunOptions): Promise<C>
- getMeta(): PipelineMeta
- getStepCount(): number
- getStepNames(): string[]
- getStepOrder(): PipelineStepOrder - Get detailed step information for visualization
- getStepOrderWithTypes<T>(sampleContext?: T): PipelineStepOrder & { initialContextKeys: string[]; estimatedTypes: Record<string, string>; } - Get enhanced step information with type details
Queue<T extends Record<string, any>>
Enhanced queue management with full control over processing.
Constructor:
new Queue<T>(pipeline: Pipeline<T>, options?: QueueOptions)
Methods:
- add(data: T, priority?: number): string - Add item with optional priority
- addMany(items: T[], priority?: number): string[] - Add multiple items
- remove(id: string): boolean - Remove item by ID
- clear(): void - Remove all pending items
- reorder(id: string, newIndex: number): boolean - Reorder item position
- setPriority(id: string, priority: number): boolean - Update item priority
- getStatus(): QueueStatus - Get current queue status
- getStats(): QueueStats - Get processing statistics
- getPendingItems(): QueueItem<T>[] - Get all pending items
- getItem(id: string): QueueItem<T> | null - Get item by ID
- start(): Promise<void> - Start processing
- stop(): Promise<void> - Stop processing gracefully
- pause(): void - Pause processing
- resume(): void - Resume processing
- isRunning(): boolean - Check if processing
- isPaused(): boolean - Check if paused
- isStopped(): boolean - Check if stopped
- waitUntilEmpty(): Promise<void> - Wait for completion
- getConcurrency(): number - Get current concurrency setting
- setConcurrency(concurrency: number): void - Change concurrency dynamically
- getTimeoutMs(): number | undefined - Get pipeline timeout setting
- setTimeoutMs(timeoutMs?: number): void - Set pipeline timeout
QueueProcessor<T extends Record<string, any>>
Legacy queue processor with basic start/stop controls.
Functions
- runQueue<T>(getNext: () => Promise<T | null>, pipeline: Pipeline<T>, options?: QueueOptions): Promise<void>
- createArrayQueue<T>(items: T[]): () => Promise<T | null>
- createConsoleLogger(): PipelineLogger
- createNoOpLogger(): PipelineLogger
- createTimer(): StepTimer
- withTimeout<T>(promise: Promise<T>, timeoutMs: number, stepName: string): Promise<T>
Types
- PipelineOptions: Configuration for creating pipelines
- PipelineLogger: Interface for logging implementations
- StepOptions: Configuration for individual steps (timeoutMs, concurrency, description)
- StepFunction<C, V>: Function signature for pipeline steps
- QueueOptions: Configuration for queue processing (concurrency, timeoutMs)
- QueueItem<T>: Queue item with metadata (id, data, priority, addedAt, attempts, lastError)
- QueueStatus: Current queue status (total, pending, processing, completed, failed, isRunning, isPaused)
- QueueStats: Processing statistics (totalProcessed, totalFailed, averageProcessingTime, throughputPerSecond, startedAt, lastProcessedAt)
- StepInfo: Individual step information for visualization (key, index, inputContext, outputType, options, description)
- PipelineStepOrder: Complete pipeline step order information (pipelineName, totalSteps, steps, contextFlow)
- TimeoutError: Error thrown when steps exceed timeout
License
MIT
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
