@robertkirk/jessie

v0.6.0

Simple, Type-safe, Observable, & Concurrent pipeline framework. Best for IO bottlenecked workflows.

jessie

A type-safe, observable pipeline framework with dependency injection for building robust data processing workflows. Best suited to I/O-bound workflows, such as calling external APIs.

Features

  • 🔒 Type-safe: Full TypeScript support with context inference across pipeline steps
  • Observable: Comprehensive logging hooks for monitoring and debugging
  • ⏱️ Timeout Support: Per-step timeout configuration with automatic enforcement
  • 🚀 Enhanced Queue Processing: Full-featured queue management with priority, pause/resume, and statistics
  • 🧩 Composable: Support for sub-pipelines with parent-child relationships
  • 💉 Dependency Injected: No hard dependencies - bring your own logger, database, etc.
  • 📊 Metrics Ready: Built-in timing, failure reporting, and queue statistics

When to use Jessie

  • You have a workflow that is I/O-bound (Jessie is not multithreaded)
  • You want a simple way to build a pipeline
  • You want logging and observability
  • You want type safety
  • You want a simple solution with no dependencies (no Redis or other external services)

Installation

npm install @robertkirk/jessie
# or
yarn add @robertkirk/jessie
# or
bun add @robertkirk/jessie

Quick Start

import { 
  Pipeline, 
  Queue,
  PipelineOptions,
  createConsoleLogger
} from '@robertkirk/jessie';

// Define your context type (only your data, no framework boilerplate)
interface MyContext {
  userId: string;
}

// Create pipeline options
const opts: PipelineOptions = {
  idGen: () => crypto.randomUUID(),
  logger: createConsoleLogger()
};

// Build your pipeline
const pipeline = new Pipeline<MyContext>('data-processing', opts)
  .step('fetchUser', async (ctx) => {
    // Your async logic here
    return { id: ctx.userId, name: 'John Doe' };
  }, { timeoutMs: 5000 })
  .step('processData', async (ctx) => {
    // Access previous step result via ctx.fetchUser
    return `Processed: ${ctx.fetchUser.name}`;
  })
  .step('saveResult', async (ctx) => {
    // Save the processed data
    console.log('Saving:', ctx.processData);
    return 'saved-id-123';
  });

// Execute the pipeline - only provide your data
const result = await pipeline.run({ userId: 'user-123' });
console.log('Final result:', result.saveResult);

// Or process multiple items with enhanced queue management
const queue = new Queue(pipeline, { concurrency: 2 });
queue.add({ userId: 'user-1' }, 5); // High priority
queue.add({ userId: 'user-2' }, 1); // Low priority
await queue.start();

Core Concepts

Pipeline

A Pipeline is a sequence of typed steps that build up a context object. Each step receives the accumulated context from previous steps and adds its result to the context.

const opts: PipelineOptions = {
  idGen: () => crypto.randomUUID(),
  logger: createConsoleLogger()
};

const pipeline = new Pipeline<{ value: number }>('my-pipeline', opts)
  .step('step1', async (ctx) => 'result1')
  .step('step2', async (ctx) => ctx.step1.toUpperCase()) // TypeScript knows ctx.step1 exists
  .step('step3', async (ctx) => `${ctx.step1}-${ctx.step2}`); // And ctx.step2
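
To illustrate how a fluent builder can carry step results forward in its type, here is a minimal sketch. MiniPipeline, StoredStep, and StepFn are hypothetical names invented for this example and are not part of Jessie. The real Pipeline also adds runId and log to the context, enforces timeouts, and logs each step, none of which is modeled here.

```typescript
// Illustrative only: a stand-in for the idea behind Pipeline.step(), not Jessie's code.
type StepFn<C, V> = (ctx: Readonly<C>) => Promise<V>;
type StoredStep = { key: string; fn: StepFn<any, unknown> };

class MiniPipeline<I extends Record<string, any>, C extends Record<string, any> = I> {
  private readonly steps: ReadonlyArray<StoredStep>;

  private constructor(steps: ReadonlyArray<StoredStep>) {
    this.steps = steps;
  }

  // I is the initial context type; C grows as steps are added.
  static create<I extends Record<string, any>>(): MiniPipeline<I, I> {
    return new MiniPipeline<I, I>([]);
  }

  // Returns a new pipeline whose context type also contains { [key]: V }.
  step<K extends string, V>(key: K, fn: StepFn<C, V>): MiniPipeline<I, C & Record<K, V>> {
    return new MiniPipeline<I, C & Record<K, V>>([...this.steps, { key, fn }]);
  }

  // Runs the steps in order, merging each result into a fresh context object.
  async run(initial: I): Promise<C> {
    let ctx: Record<string, any> = { ...initial };
    for (const { key, fn } of this.steps) {
      ctx = { ...ctx, [key]: await fn(ctx) };
    }
    return ctx as C;
  }
}

const demo = MiniPipeline.create<{ userId: string }>()
  .step('fetchUser', async (ctx) => ({ id: ctx.userId, name: 'John Doe' }))
  // ctx.fetchUser is typed here because the previous step() widened the context type.
  .step('greeting', async (ctx) => `Hello, ${ctx.fetchUser.name}`);
```

Because each step() call returns a new builder type, misspelling ctx.fetchUser or reading a step that has not run yet is a compile-time error in this sketch.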

Pipeline Visualization and Introspection

Pipelines support detailed introspection for visualization and debugging:

// Add steps with descriptions for better visualization
const pipeline = new Pipeline<{ userId: string }>('user-pipeline', opts)
  .step('authenticate', async (ctx) => ({ valid: true, role: 'user' }), 
        { 
          timeoutMs: 3000, 
          description: 'Authenticates the user and determines their role'
        })
  .step('fetchData', async (ctx) => getUserData(ctx.userId), 
        { description: 'Retrieves user data from the database' })
  .step('transform', async (ctx) => processData((ctx as any).fetchData))
  .step('validate', async (ctx) => (ctx as any).transform.length > 0, 
        { 
          concurrency: 1, 
          description: 'Ensures we have valid data to process'
        });

// Get detailed step information for visualization
const stepOrder = pipeline.getStepOrder();
console.log(`Pipeline: ${stepOrder.pipelineName}`);
console.log(`Total steps: ${stepOrder.totalSteps}`);

stepOrder.steps.forEach(step => {
  console.log(`Step ${step.index}: ${step.key}`);
  console.log(`  Input context: ${step.inputContext.join(', ')}`);
  console.log(`  Output type: ${step.outputType}`);
  if (step.description) {
    console.log(`  Description: ${step.description}`);
  }
  if (step.options) {
    console.log(`  Options: ${JSON.stringify(step.options)}`);
  }
});

// Get enhanced type information with sample data
const typedOrder = pipeline.getStepOrderWithTypes({ userId: 'user-123' });
console.log('Initial context keys:', typedOrder.initialContextKeys);
console.log('Estimated types:', typedOrder.estimatedTypes);

Automatic Pipeline Logging

Pipelines automatically log every step through the standard pipeline logger methods, which helps with debugging, monitoring, and understanding pipeline execution:

const pipeline = new Pipeline<{ data: string }>('debug-pipeline', opts)
  .step('process', async (ctx) => {
    // Some complex processing
    return processData(ctx.data);
  }, { 
    description: 'Processes the input data with complex logic'
  });

Automatic pipeline logging provides:

  • Complete step visibility: Every step logs start, completion, and context information
  • Performance insights: Execution times for all steps
  • Context tracking: See what data is available at each step
  • Error details: Comprehensive error information when steps fail
  • Timeout monitoring: Automatic logging when timeouts are configured

User Logging Methods

For custom logging inside pipeline steps, call the logger's verbose and debug methods through ctx.log. The example uses optional chaining because a logger implementation may omit them:

const pipeline = new Pipeline<{ userId: string }>('user-pipeline', opts)
  .step('fetchUser', async (ctx) => {
    ctx.log.verbose?.('Fetching user details', ctx.runId);
    const user = await getUserById(ctx.userId);
    ctx.log.debug?.('User data retrieved', ctx.runId);
    return user;
  }, { description: 'Retrieves user from database' });

PipelineOptions

Every pipeline requires options that specify:

  • idGen: Function to generate unique run IDs
  • logger: Logger instance for the pipeline

PipelineRunOptions

When calling pipeline.run(), you can optionally provide:

  • runId: Override the auto-generated run ID
  • parentName: Specify parent pipeline name for sub-pipeline tracking

const result = await pipeline.run(context, {
  runId: 'custom-id',
  parentName: 'parent-pipeline'
});

Context

Every pipeline step receives a readonly context that includes:

  • Your custom data
  • runId: Unique identifier for this pipeline execution (added automatically)
  • log: Logger instance for the pipeline (added automatically)
  • Results from all previous steps (automatically typed)

Logging & Observability

Implement the PipelineLogger interface to capture pipeline events:

interface PipelineLogger {
  logi(step: string, msg: string): void;
  logw(step: string, msg: string): void;
  loge(step: string, msg: string): void;
  onStepStart?(step: string, runId: string, meta?: PipelineMeta): void;
  onStepEnd?(step: string, runId: string, ms: number, error?: Error, meta?: PipelineMeta): void;
  onFailure?(step: string, runId: string, error: Error, meta?: PipelineMeta): void;
}
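
As one possible implementation of this interface, the sketch below records every event in memory, which can be handy in tests. It redeclares PipelineLogger locally so it compiles on its own. createMemoryLogger and LogEvent are hypothetical names, and PipelineMeta is replaced by unknown because its shape is not documented in this README.

```typescript
// Stand-in: the real PipelineMeta shape is not shown in this README.
type PipelineMeta = unknown;

// Local copy of the interface above so this sketch is self-contained.
interface PipelineLogger {
  logi(step: string, msg: string): void;
  logw(step: string, msg: string): void;
  loge(step: string, msg: string): void;
  onStepStart?(step: string, runId: string, meta?: PipelineMeta): void;
  onStepEnd?(step: string, runId: string, ms: number, error?: Error, meta?: PipelineMeta): void;
  onFailure?(step: string, runId: string, error: Error, meta?: PipelineMeta): void;
}

interface LogEvent {
  kind: 'info' | 'warn' | 'error' | 'start' | 'end' | 'failure';
  step: string;
  detail: string;
}

// Hypothetical helper (not part of Jessie): collects events instead of printing them.
function createMemoryLogger(): PipelineLogger & { events: LogEvent[] } {
  const events: LogEvent[] = [];
  const push = (kind: LogEvent['kind'], step: string, detail: string): void => {
    events.push({ kind, step, detail });
  };
  return {
    events,
    logi: (step, msg) => push('info', step, msg),
    logw: (step, msg) => push('warn', step, msg),
    loge: (step, msg) => push('error', step, msg),
    onStepStart: (step, runId) => push('start', step, runId),
    onStepEnd: (step, runId, ms, error) =>
      push('end', step, `${runId} took ${ms}ms${error ? ' (failed)' : ''}`),
    onFailure: (step, runId, error) => push('failure', step, `${runId}: ${error.message}`),
  };
}
```

Such a logger could be passed as the logger field of PipelineOptions, after which a test can assert on logger.events rather than parsing console output.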

Sub-pipelines

Create reusable sub-pipelines and execute them within main pipelines:

const subOptions: PipelineOptions = {
  idGen: () => crypto.randomUUID(),
  logger: createConsoleLogger()
};

const subPipeline = new Pipeline<MyContext & { data: any }>('transform', subOptions)
  .step('validate', async (ctx) => validateData(ctx.data))
  .step('clean', async (ctx) => cleanData(ctx.validate));

const mainOptions: PipelineOptions = {
  idGen: () => crypto.randomUUID(),
  logger: createConsoleLogger()
};

const mainPipeline = new Pipeline<MyContext>('main', mainOptions)
  .step('fetchData', async (ctx) => getData(ctx.userId))
  .step('transform', async (ctx) => {
    const result = await subPipeline.run({
      ...ctx,
      data: ctx.fetchData
    }, { parentName: 'main' }); // Specify parent for logging
    return result.clean;
  });

Queue Processing

Jessie provides two approaches to queue processing:

Simple Queue Processing

For basic use cases, use the runQueue function:

import { runQueue, createArrayQueue } from '@robertkirk/jessie';

const items = [
  { userId: 'user1' },
  { userId: 'user2' },
  { userId: 'user3' }
];

const queue = createArrayQueue(items);
await runQueue(queue, pipeline, { concurrency: 2 });
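
The API reference lists createArrayQueue as returning a function of type () => Promise<T | null>, so any source with that shape should be usable with runQueue. The sketch below is an assumption-laden illustration, not Jessie's implementation: createIterableSource and drain are hypothetical helpers, and null is assumed to mean that the source is exhausted.

```typescript
// Hypothetical helper: adapts any iterable to the getNext shape used by runQueue.
function createIterableSource<T>(items: Iterable<T>): () => Promise<T | null> {
  const iterator = items[Symbol.iterator]();
  return async () => {
    const next = iterator.next();
    return next.done ? null : next.value; // assumption: null signals "no more items"
  };
}

// Illustrative drain loop: `concurrency` workers pull items until the source returns null.
// Everything runs on one thread, so only awaited I/O actually overlaps.
async function drain<T>(
  getNext: () => Promise<T | null>,
  handle: (item: T) => Promise<void>,
  concurrency: number
): Promise<void> {
  const worker = async (): Promise<void> => {
    for (let item = await getNext(); item !== null; item = await getNext()) {
      await handle(item);
    }
  };
  await Promise.all(Array.from({ length: concurrency }, () => worker()));
}
```

With a source like this, items can come from a generator or a paged API cursor instead of a pre-built array.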

Enhanced Queue Management

For advanced use cases, use the Queue class with full queue management capabilities:

import { Queue, Pipeline } from '@robertkirk/jessie';

// Create a queue with your pipeline
const queue = new Queue(pipeline, { concurrency: 3 });

// Add items with optional priority (higher numbers = higher priority)
const taskId1 = queue.add({ userId: 'user1' }, 1);
const taskId2 = queue.add({ userId: 'user2' }, 5); // High priority
const taskId3 = queue.add({ userId: 'user3' }, 3);

// Add multiple items at once
const taskIds = queue.addMany([
  { userId: 'user4' },
  { userId: 'user5' }
], 2);

// Manage queue items
queue.remove(taskId1);                    // Remove specific item
queue.reorder(taskId2, 0);               // Move item to front
queue.setPriority(taskId3, 10);          // Change priority
queue.clear();                           // Remove all pending items

// Monitor queue status
const status = queue.getStatus();
console.log(`${status.pending} pending, ${status.processing} processing`);

// Get detailed statistics
const stats = queue.getStats();
console.log(`Processed: ${stats.totalProcessed}, Failed: ${stats.totalFailed}`);
console.log(`Avg time: ${stats.averageProcessingTime}ms`);
console.log(`Throughput: ${stats.throughputPerSecond} items/sec`);

// Control processing
await queue.start();                     // Start processing
queue.pause();                          // Pause processing
queue.resume();                         // Resume processing
await queue.stop();                     // Stop gracefully

// Dynamic concurrency control
queue.setConcurrency(5);                // Change concurrency on the fly
const currentConcurrency = queue.getConcurrency(); // Get current setting

// Pipeline timeout control
queue.setTimeoutMs(30000);              // Set overall pipeline timeout
const timeout = queue.getTimeoutMs();   // Get current timeout setting

// Wait for completion
await queue.waitUntilEmpty();           // Wait until all items processed

Queue Features

  • Priority Processing: Items with higher priority values are processed first
  • Dynamic Management: Add, remove, and reorder items while processing
  • Dynamic Concurrency: Change concurrency limits on the fly with graceful scaling
  • Pipeline Timeout: Set overall timeout limits for the entire queue processing
  • Pause/Resume: Temporarily halt processing without losing progress
  • Real-time Statistics: Monitor processing speed, success/failure rates
  • Graceful Shutdown: Stop processing while allowing current items to complete
  • Status Monitoring: Track pending, processing, completed, and failed items
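
The priority rule above can be sketched as a comparator. This is an illustration only: SketchItem and orderPending are hypothetical names, and breaking ties by insertion order is an assumption, since the README does not say how equal priorities are ordered.

```typescript
interface SketchItem<T> {
  id: string;
  data: T;
  priority: number; // higher values are processed first, as described above
  seq: number;      // hypothetical insertion counter, used only to break ties
}

// Returns a new array: highest priority first, then oldest first (assumed tie-break).
function orderPending<T>(pending: ReadonlyArray<SketchItem<T>>): SketchItem<T>[] {
  return [...pending].sort((a, b) => b.priority - a.priority || a.seq - b.seq);
}

const ordered = orderPending([
  { id: 'a', data: { userId: 'user1' }, priority: 1, seq: 0 },
  { id: 'b', data: { userId: 'user2' }, priority: 5, seq: 1 },
  { id: 'c', data: { userId: 'user3' }, priority: 3, seq: 2 },
  { id: 'd', data: { userId: 'user4' }, priority: 5, seq: 3 },
]);
console.log(ordered.map((item) => item.id).join(', ')); // b, d, c, a
```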

Error Handling & Timeouts

const opts: PipelineOptions = {
  idGen: () => crypto.randomUUID(),
  logger: createConsoleLogger()
};

const pipeline = new Pipeline<MyContext>('error-handling', opts)
  .step('riskyOperation', async (ctx) => {
    // This step will timeout after 1 second
    await longRunningOperation();
    return 'success';
  }, { timeoutMs: 1000 })
  .step('cleanup', async (ctx) => {
    // This step only runs if the previous step succeeded
    return 'cleaned';
  });

try {
  await pipeline.run({ userId: 'user-123' });
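} catch (error) {
  // Handle TimeoutError or other failures.
  // `error` is `unknown` under strict TypeScript, so narrow it before reading `.message`.
  const message = error instanceof Error ? error.message : String(error);
  console.error('Pipeline failed:', message);
}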
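
For readers curious how a per-step timeout can be enforced, one common technique is to race the step's promise against a timer. The sketch below mirrors the withTimeout signature from the API reference but is not Jessie's implementation. raceWithTimeout and StepTimeoutError are hypothetical names used so they are not mistaken for the library's own exports.

```typescript
// Hypothetical error type for this sketch; Jessie exports its own TimeoutError.
class StepTimeoutError extends Error {
  readonly stepName: string;
  readonly timeoutMs: number;

  constructor(stepName: string, timeoutMs: number) {
    super(`Step "${stepName}" timed out after ${timeoutMs}ms`);
    this.name = 'StepTimeoutError';
    this.stepName = stepName;
    this.timeoutMs = timeoutMs;
    // Keeps instanceof working when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Rejects with StepTimeoutError if `promise` has not settled within `timeoutMs`.
function raceWithTimeout<T>(promise: Promise<T>, timeoutMs: number, stepName: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new StepTimeoutError(stepName, timeoutMs)), timeoutMs);
  });
  // Clear the timer either way so it does not keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```

In this sketch the race only stops waiting. The underlying operation keeps running unless it supports cancellation itself, which is worth keeping in mind when a timed-out step has side effects.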

API Reference

Classes

Pipeline<C extends Record<string, any>>

Main pipeline class for building and executing step sequences.

Constructor:

new Pipeline<C>(name: string, options: PipelineOptions, parentName?: string)

Methods:

  • step<K, V>(key: K, fn: StepFunction<C, V>, opts?: StepOptions): Pipeline<C & { [K]: V }>
  • run(ctx: C, options?: PipelineRunOptions): Promise<C>
  • getMeta(): PipelineMeta
  • getStepCount(): number
  • getStepNames(): string[]
  • getStepOrder(): PipelineStepOrder - Get detailed step information for visualization
  • getStepOrderWithTypes<T>(sampleContext?: T): PipelineStepOrder & { initialContextKeys: string[]; estimatedTypes: Record<string, string>; } - Get enhanced step information with type details

Queue<T extends Record<string, any>>

Enhanced queue management with full control over processing.

Constructor:

new Queue<T>(pipeline: Pipeline<T>, options?: QueueOptions)

Methods:

  • add(data: T, priority?: number): string - Add item with optional priority
  • addMany(items: T[], priority?: number): string[] - Add multiple items
  • remove(id: string): boolean - Remove item by ID
  • clear(): void - Remove all pending items
  • reorder(id: string, newIndex: number): boolean - Reorder item position
  • setPriority(id: string, priority: number): boolean - Update item priority
  • getStatus(): QueueStatus - Get current queue status
  • getStats(): QueueStats - Get processing statistics
  • getPendingItems(): QueueItem<T>[] - Get all pending items
  • getItem(id: string): QueueItem<T> | null - Get item by ID
  • start(): Promise<void> - Start processing
  • stop(): Promise<void> - Stop processing gracefully
  • pause(): void - Pause processing
  • resume(): void - Resume processing
  • isRunning(): boolean - Check if processing
  • isPaused(): boolean - Check if paused
  • isStopped(): boolean - Check if stopped
  • waitUntilEmpty(): Promise<void> - Wait for completion
  • getConcurrency(): number - Get current concurrency setting
  • setConcurrency(concurrency: number): void - Change concurrency dynamically
  • getTimeoutMs(): number | undefined - Get pipeline timeout setting
  • setTimeoutMs(timeoutMs?: number): void - Set pipeline timeout

QueueProcessor<T extends Record<string, any>>

Legacy queue processor with basic start/stop controls.

Functions

  • runQueue<T>(getNext: () => Promise<T | null>, pipeline: Pipeline<T>, options?: QueueOptions): Promise<void>
  • createArrayQueue<T>(items: T[]): () => Promise<T | null>
  • createConsoleLogger(): PipelineLogger
  • createNoOpLogger(): PipelineLogger
  • createTimer(): StepTimer
  • withTimeout<T>(promise: Promise<T>, timeoutMs: number, stepName: string): Promise<T>

Types

  • PipelineOptions: Configuration for creating pipelines
  • PipelineLogger: Interface for logging implementations
  • StepOptions: Configuration for individual steps (timeoutMs, concurrency, description)
  • StepFunction<C, V>: Function signature for pipeline steps
  • QueueOptions: Configuration for queue processing (concurrency, timeoutMs)
  • QueueItem<T>: Queue item with metadata (id, data, priority, addedAt, attempts, lastError)
  • QueueStatus: Current queue status (total, pending, processing, completed, failed, isRunning, isPaused)
  • QueueStats: Processing statistics (totalProcessed, totalFailed, averageProcessingTime, throughputPerSecond, startedAt, lastProcessedAt)
  • StepInfo: Individual step information for visualization (key, index, inputContext, outputType, options, description)
  • PipelineStepOrder: Complete pipeline step order information (pipelineName, totalSteps, steps, contextFlow)
  • TimeoutError: Error thrown when steps exceed timeout

License

MIT

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.