

pipeline-processor

A modular Pipeline-Processor pattern library for structured data processing with optional contract validation.

Features

  • Modular Architecture: Easily build processing pipelines with reusable processors
  • Event-Based Progress: Track progress with built-in event system
  • Contract Validation: Optional contract system to validate processor compatibility
  • Type Safety: Written in TypeScript with full type definitions

Installation

npm install pipeline-processor

Basic Usage

import { BasePipeline, BaseProcessor } from 'pipeline-processor';

// Create a processor (InputType and OutputType stand in for your own types)
class MyProcessor extends BaseProcessor<InputType, OutputType> {
  public async execute(options: InputType): Promise<OutputType> {
    // Implement your processing logic here
    this.updateProgress(0.5, 'Processing halfway done');
    
    // Always check if aborted to allow clean cancellation
    this.checkAborted();
    
    // Return the processed result (placeholder for your real output)
    return result;
  }
}

// Create a pipeline
const pipeline = new BasePipeline<PipelineInput, PipelineOutput>();

// Register processors
pipeline.register(new MyProcessor());

// Add event listeners
pipeline.on('progress', (data) => {
  console.log(`Progress: ${data.progress * 100}%`);
});

// Initialize and execute
await pipeline.initialize();
const result = await pipeline.execute(inputData);

Advanced: Contract Validation

For projects where you need to ensure compatibility between processors:

import { 
  BasePipeline, 
  BaseProcessor, 
  GlobalContractManager,
  ContractResolveType 
} from 'pipeline-processor';

class MyProcessor extends BaseProcessor<InputType, OutputType> {
  constructor() {
    super();
    
    // Setup contracts for this processor
    const inputContracts = [
      GlobalContractManager.createRequiredInputCT(
        ContractResolveType.OBJECT_HAS_PROPERTIES,
        'input_data',
        (value, context) => {
          // Validate input data
          if (!value || !value.requiredProperty) {
            context.message = 'Missing required property';
            return false;
          }
          return true;
        },
        undefined,
        'Input must include required property'
      )
    ];
    
    // Register contracts with the contract manager
    GlobalContractManager.registerEntityContracts(
      this.constructor.name,
      'processor',
      inputContracts
    );
  }
  
  public async execute(options: InputType): Promise<OutputType> {
    // Implementation
  }
}

Documentation

For full API documentation, examples, and guides, visit the documentation site (...)

License

MIT

Usage example

The following real-world example shows the library in action:

import { 
    BasePipeline, 
    BaseProcessor, 
    ProcessorEvent, 
    PipelineEvent 
} from 'pipeline-processor';

// Define processor input and output types
interface ImageProcessorInput {
  imagePath: string;
  outputPath: string;
  scale?: number;
}

interface ImageProcessorOutput {
  outputPath: string;
  dimensions: { width: number; height: number };
  processingTime: number;
}

// Create a specialized processor
class ImageResizeProcessor extends BaseProcessor<ImageProcessorInput, ImageProcessorOutput> {
  public async execute(options: ImageProcessorInput): Promise<ImageProcessorOutput> {
    const startTime = Date.now();
    this.updateProgress(0, 'Starting image resize process');
    
    // Check for abort signals throughout processing
    this.checkAborted();
    
    // Simulate image processing steps
    await this.simulateProcessing(0.3, 'Loading image');
    this.checkAborted();
    
    await this.simulateProcessing(0.6, 'Resizing image');
    this.checkAborted();
    
    await this.simulateProcessing(1.0, 'Saving image');
    
    const processingTime = (Date.now() - startTime) / 1000;
    
    // Return the result
    return {
      outputPath: options.outputPath,
      dimensions: { width: 800, height: 600 },
      processingTime
    };
  }
  
  private async simulateProcessing(progressTarget: number, message: string): Promise<void> {
    // Simulate a processing step
    const delay = 500; // 500ms
    await new Promise(resolve => setTimeout(resolve, delay));
    this.updateProgress(progressTarget, message);
  }
}

// Create a pipeline for image processing
const pipeline = new BasePipeline<ImageProcessorInput, ImageProcessorOutput>();

// Register the processor
pipeline.register(new ImageResizeProcessor());

// Add pipeline event listeners
pipeline.on(PipelineEvent.PROGRESS, (data) => {
  console.log(`Pipeline progress: ${Math.round(data.progress * 100)}%`);
});

pipeline.on(PipelineEvent.PIPELINE_COMPLETE, (data) => {
  console.log(`Processing complete. Result:`, data.result);
});

// Initialize and execute the pipeline
const run = async () => {
  await pipeline.initialize();
  
  const result = await pipeline.execute({
    imagePath: '/path/to/input.jpg',
    outputPath: '/path/to/output.jpg',
    scale: 0.5
  });
  
  console.log('Final result:', result);
};

run().catch(console.error);

Testing

The library includes a comprehensive test suite that covers the core functionality:

# Run all tests
npm test

# Run tests with watcher
npm run test:watch 

# Run tests with coverage report
npm run test:coverage

The tests cover the following areas (a minimal example test is sketched after the list):

  • Pipeline execution and events
  • Processor execution and progress tracking
  • Contract validation and resolution
  • Integration between components
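
As a rough illustration of what such a test might look like, here is a hypothetical sketch assuming a Jest-style runner; the package's actual test suite may be organized differently:

import { BasePipeline, BaseProcessor, PipelineEvent } from 'pipeline-processor';

// Hypothetical processor that doubles every number in its input
class DoubleProcessor extends BaseProcessor<{ values: number[] }, { values: number[] }> {
  public async execute(options: { values: number[] }): Promise<{ values: number[] }> {
    return { values: options.values.map(v => v * 2) };
  }
}

describe('BasePipeline', () => {
  it('executes registered processors and emits completion', async () => {
    const pipeline = new BasePipeline<{ values: number[] }, { values: number[] }>();
    pipeline.register(new DoubleProcessor());

    const onComplete = jest.fn();
    pipeline.on(PipelineEvent.PIPELINE_COMPLETE, onComplete);

    await pipeline.initialize();
    const result = await pipeline.execute({ values: [1, 2, 3] });

    expect(result.values).toEqual([2, 4, 6]);
    expect(onComplete).toHaveBeenCalled();
  });
});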

Validators

The library includes several built-in validators for common data types:

Basic Validators

import { 
  objectHasProperties, 
  valueInRange, 
  valueType 
} from 'pipeline-processor';

// Check if an object has specified properties
const hasRequiredProps = objectHasProperties(['id', 'name', 'type']);

// Check if a number is within a range
const isInRange = valueInRange(0, 100);

// Check if a value is of a specified type
const isString = valueType('string');
const isNumber = valueType('number');
const isBoolean = valueType('boolean');
const isObject = valueType('object');
const isArray = valueType('array');

String Validators

import { 
  emailFormat, 
  urlFormat, 
  stringLength 
} from 'pipeline-processor';

// Check if a string is a valid email
const isEmail = emailFormat();

// Check if a string is a valid URL
const isUrl = urlFormat();

// Check if a string has a specific length range
const hasValidLength = stringLength(5, 100);

Array Validators

import { 
  arrayLength, 
  arrayItemType 
} from 'pipeline-processor';

// Check if an array has a specific length range
const hasValidLength = arrayLength(1, 10);

// Check if all array items are of a specific type
const hasStringItems = arrayItemType('string');
const hasNumberItems = arrayItemType('number');

Number Validators

import { 
  isInteger, 
  valueInRange 
} from 'pipeline-processor';

// Check if a number is an integer
const isIntegerValue = isInteger();

// Check if a number is within a range
const isInRange = valueInRange(0, 100);

Using Validators with Contracts

import { 
  GlobalContractManager, 
  ContractResolveType,
  emailFormat
} from 'pipeline-processor';

// Create a contract requiring a valid email
const emailContract = GlobalContractManager.createRequiredInputCT(
  ContractResolveType.STRING_FORMAT,
  'email',
  emailFormat(),
  undefined,
  'Input must include a valid email'
);

// Register the contract with a processor
GlobalContractManager.registerEntityContracts(
  'MyProcessor',
  'processor',
  [emailContract]
);

Batch Processing for Large Datasets

For processing large datasets efficiently, the library provides batch processing capabilities:

import { BasePipeline, BaseProcessor } from 'pipeline-processor';

// Define a processor for handling chunks of data
class DataProcessor extends BaseProcessor<any, any> {
  public async execute(options: { data: any[] }): Promise<any> {
    // Process the current batch of data
    const processedData = options.data.map(item => {
      // Your processing logic here
      return { ...item, processed: true };
    });
    
    return { ...options, data: processedData };
  }
}

// Create and configure the pipeline
const pipeline = new BasePipeline();
pipeline.register(new DataProcessor());
await pipeline.initialize();

// Large dataset to process
const largeDataset = Array.from({ length: 10000 }, (_, i) => ({ id: i, value: `Item ${i}` }));

// Process in batches of 500 items
const result = await pipeline.executeBatch(
  { data: largeDataset }, 
  500, 
  (batchResult, batchIndex) => {
    console.log(`Processed batch ${batchIndex + 1}`);
  }
);

console.log(`Processed ${result.data.length} items in total`);

Batch Processing Benefits

  • Memory Efficient: Processes large datasets in manageable chunks
  • Progress Tracking: Provides detailed progress updates per batch
  • Cancellation: Can be aborted at any batch boundary (see the sketch after this list)
  • Customizable: Override combineBatchResults method to customize how batch results are merged
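
As a rough sketch of cancellation, reusing the pipeline and largeDataset from the example above and assuming the pipeline exposes an abort() method (referenced in the roadmap item ISS_001); the exact API may differ:

// Hypothetical sketch: stop batch processing from the per-batch callback.
// Assumes pipeline.abort() exists, as referenced in the roadmap (ISS_001).
const partialResult = await pipeline.executeBatch(
  { data: largeDataset },
  500,
  (batchResult, batchIndex) => {
    if (batchIndex + 1 >= 10) {
      // Request cancellation; remaining batches are skipped at the next boundary
      pipeline.abort();
    }
  }
);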

Custom Result Merging

You can extend BasePipeline to customize how batch results are combined:

class CustomPipeline extends BasePipeline<any, any> {
  protected combineBatchResults(batchResults: any[]): any {
    // Custom logic to combine batch results
    const allData = batchResults.flatMap(result => result.data || []);
    
    // Calculate aggregated statistics
    const total = allData.reduce((sum, item) => sum + item.value, 0);
    const average = total / allData.length;
    
    return {
      data: allData,
      stats: {
        total,
        average,
        count: allData.length
      }
    };
  }
}

Advanced Performance Optimizations

The library includes several advanced features for high-performance processing of large datasets:

Memory-Optimized Batch Processing

For extremely large datasets where memory usage is a concern:

// Process a large dataset with memory optimization
const result = await pipeline.executeBatch(
  { data: largeDataset }, 
  500,
  (batchResult, batchIndex) => {
    console.log(`Processed batch ${batchIndex + 1}`);
  },
  {
    memoryOptimized: true // Only keep final result, discard intermediate batch results
  }
);

Parallel Batch Processing

For faster processing on multi-core systems:

// Process batches in parallel using multiple CPU cores
const result = await pipeline.executeBatch(
  { data: largeDataset }, 
  500,
  undefined, // No batch callback needed
  {
    parallelBatches: 4, // Process 4 batches in parallel
    memoryOptimized: true // Optional memory optimization
  }
);

Early Termination

For cases where you might need to stop processing once certain conditions are met:

// Stop processing early if a condition is met
const result = await pipeline.executeBatch(
  { data: largeDataset }, 
  500,
  undefined,
  {
    earlyTermination: true,
    terminationCheck: (batchResult) => {
      // Check if this batch result meets termination criteria
      // For example, if we found what we're looking for
      return batchResult.data.some(item => item.isSpecialItem);
    }
  }
);

Streaming Data Processing

For processing data as it arrives, without loading the entire dataset into memory:

// Create an async data source
async function* generateData() {
  for (let i = 0; i < 10000; i++) {
    // Simulate data arriving over time
    await new Promise(resolve => setTimeout(resolve, 10));
    yield { id: i, value: `Item ${i}` };
  }
}

// Process data in a streaming fashion
const processingStream = pipeline.createProcessingStream(generateData(), 100);

// Process results as they become available
for await (const result of processingStream) {
  console.log(`Processed batch with ${result.data.length} items`);
  
  // Handle each batch result immediately, e.g. persist it
  // (saveResults here stands in for your own persistence function)
  await saveResults(result.data);
}

This streaming approach is ideal for:

  • Processing data from network sources in real-time
  • Handling infinitely large datasets
  • Minimizing memory consumption
  • Displaying incremental results as they become available

Parallel Processing

The library supports parallel execution of independent processors, which can significantly improve performance for complex processing pipelines:

import { BasePipeline, BaseProcessor } from 'pipeline-processor';

// Create processors
class PreprocessorA extends BaseProcessor<any, any> {
  // implementation
}

class PreprocessorB extends BaseProcessor<any, any> {
  // implementation
}

class Combiner extends BaseProcessor<any, any> {
  // implementation that uses results from both preprocessors
}

// Create pipeline with parallel processing
const pipeline = new BasePipeline({ parallelProcessingEnabled: true });

// First two processors can run in parallel (no dependencies)
pipeline.register(new PreprocessorA());  // index 0
pipeline.register(new PreprocessorB());  // index 1

// Combiner depends on both preprocessors
pipeline.register(new Combiner(), {}, [0, 1]);  // depends on indices 0 and 1

// Alternative way to enable parallel processing
pipeline.setParallelProcessing(true);

// Execute the pipeline
await pipeline.initialize();
const result = await pipeline.execute(inputData);

How Parallel Processing Works

  1. By default, processors execute sequentially in the order they are registered
  2. When parallel processing is enabled, the pipeline analyzes processor dependencies
  3. Processors with the same dependencies will execute in parallel when possible
  4. Dependencies can be specified when registering a processor
  5. If no dependencies are specified, a processor depends on the previous processor
  6. Circular dependencies are detected and will cause an error

When to Use Parallel Processing

Parallel processing is most beneficial when:

  • You have multiple independent preprocessing steps
  • Some processors are CPU-intensive but don't depend on each other
  • Your pipeline has natural points of parallelism (e.g., data enrichment from multiple sources)

For simple linear pipelines, sequential processing may be more appropriate.

Nested Pipelines

The library supports nested pipelines, allowing you to use a pipeline as a processor within another pipeline. This enables building complex processing workflows with reusable components:

import { 
    BasePipeline, 
    BaseProcessor, 
    PipelineProcessor 
} from 'pipeline-processor';

// Create an inner pipeline
const innerPipeline = new BasePipeline<InnerInputType, InnerOutputType>();
innerPipeline.register(new ProcessorA());
innerPipeline.register(new ProcessorB());
await innerPipeline.initialize();

// Wrap the inner pipeline as a processor
const pipelineProcessor = new PipelineProcessor(innerPipeline);

// Create an outer pipeline
const outerPipeline = new BasePipeline<OuterInputType, OuterOutputType>();
outerPipeline.register(new PreProcessor());
outerPipeline.register(pipelineProcessor); // Use the pipeline as a processor
outerPipeline.register(new PostProcessor());
await outerPipeline.initialize();

// Execute the outer pipeline, which will also execute the inner pipeline
const result = await outerPipeline.execute(inputData);

Benefits of Nested Pipelines

Nested pipelines offer several advantages:

  1. Reusability: Create reusable pipeline modules for common processing tasks
  2. Encapsulation: Hide the complexity of multi-step processes behind a single processor
  3. Maintainability: Break down complex workflows into smaller, manageable units
  4. Composability: Compose complex workflows from simpler building blocks

Event Propagation in Nested Pipelines

Events from inner pipelines are automatically forwarded to the wrapping PipelineProcessor (a short sketch follows the list):

  • Progress events are forwarded as processor progress updates
  • Error events are forwarded as processor errors
  • Completion events are forwarded as processor completion
  • Abort signals propagate to nested pipelines
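
For example, a single progress listener on the outer pipeline should also reflect work done inside the nested pipeline, because the wrapping PipelineProcessor re-emits that progress as its own. A minimal sketch, reusing outerPipeline from the example above:

import { PipelineEvent } from 'pipeline-processor';

// Sketch: outerPipeline is the pipeline built in the example above.
// Progress from processors inside innerPipeline surfaces through the
// wrapping PipelineProcessor, so one listener observes the whole workflow.
outerPipeline.on(PipelineEvent.PROGRESS, (data) => {
  console.log(`Overall progress: ${Math.round(data.progress * 100)}%`);
});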

Enhanced Type Inference

The library includes improved TypeScript type inference that provides better type safety across processor chains.

Type-Safe Processors and Pipelines

import { 
    TypedBaseProcessor, 
    TypedBasePipeline,
    TypedPipelineBuilder,
    ProcessorEvent,
    PipelineEvent
} from 'pipeline-processor';

// Define your data types
interface InputData {
    id: string;
    data: string[];
}

interface EnrichedData extends InputData {
    metadata: Record<string, any>;
}

interface OutputData {
    result: string;
    processedAt: Date;
}

// Create strongly-typed processors
class EnrichmentProcessor extends TypedBaseProcessor<InputData, EnrichedData> {
    async execute(input: InputData): Promise<EnrichedData> {
        // Implementation...
        return {
            ...input,
            metadata: { processedBy: 'EnrichmentProcessor' }
        };
    }
}

class TransformProcessor extends TypedBaseProcessor<EnrichedData, OutputData> {
    async execute(input: EnrichedData): Promise<OutputData> {
        // Implementation...
        return {
            result: input.data.join(','),
            processedAt: new Date()
        };
    }
}

// Method 1: Create a pipeline with type checking across processor chain
const pipeline = new TypedBasePipeline<InputData, OutputData>();
pipeline.register(new EnrichmentProcessor());
pipeline.register(new TransformProcessor());

// Type-safe event handling
pipeline.on(PipelineEvent.PIPELINE_COMPLETE, (data) => {
    // TypeScript knows data.result is OutputData
    console.log(`Completed at: ${data.result.processedAt}`);
});

// Method 2: Use the fluent builder API with type inference
const pipeline2 = TypedPipelineBuilder
    .create<InputData>()
    .withFirstProcessor(new EnrichmentProcessor())
    .pipe(new TransformProcessor())
    .build();

// Execute with properly typed input and output
const result = await pipeline.execute({
    id: '123',
    data: ['a', 'b', 'c']
});

// Result is properly typed as OutputData
console.log(result.result); // Type-safe access

Benefits of Enhanced Type Inference

  • Catch Errors Earlier: TypeScript catches type incompatibilities between processors at compile time
  • Self-Documenting Code: Types clearly show the data flow through the pipeline
  • IDE Support: Get proper code completion and hover information
  • Refactoring Safety: When changing processor interfaces, TypeScript identifies all impacted areas
  • Type-Safe Events: Event handlers receive properly typed event data

Type Utilities

The library provides several utility types to help with type inference:

import { 
    ProcessorInput, 
    ProcessorOutput,
    CanChain,
    ChainOutput,
    EnsureCompatibleChain
} from 'pipeline-processor';

// Get the input type of a processor
type InputType = ProcessorInput<MyProcessor>;

// Get the output type of a processor
type OutputType = ProcessorOutput<MyProcessor>;

// Check if processors can be chained
type CanBeChained = CanChain<ProcessorA, ProcessorB>; // true or false

// Get the final output type from a chain of processors
type FinalOutputType = ChainOutput<[ProcessorA, ProcessorB, ProcessorC]>;

// Check if an entire chain is compatible
type IsChainValid = EnsureCompatibleChain<[ProcessorA, ProcessorB, ProcessorC]>;

Schema-Based Validation

The library now supports schema-based validation for complex data structures using JSON Schema and composable validators:

import { 
  BaseProcessor,
  jsonSchema, 
  complexObject, 
  allOf, 
  anyOf, 
  conditional,
  stringLength,
  emailFormat,
  arrayLength,
  valueType,
  GlobalContractManager,
  ContractResolveType
} from 'pipeline-processor';
import { JSONSchemaType } from 'ajv';

// Define a schema for your data type
interface User {
  id: string;
  name: string;
  email: string;
  age: number;
}

const userSchema: JSONSchemaType<User> = {
  type: 'object',
  properties: {
    id: { type: 'string' },
    name: { type: 'string', minLength: 2 },
    email: { type: 'string', format: 'email' },
    age: { type: 'number', minimum: 18 }
  },
  required: ['id', 'name', 'email', 'age'],
  additionalProperties: false
};

// Use in a processor
class UserProcessor extends BaseProcessor<any, any> {
  constructor() {
    super();
    
    // Create a contract using JSON Schema
    const schemaContract = GlobalContractManager.createRequiredInputCT(
      ContractResolveType.JSON_SCHEMA,
      'user',
      jsonSchema(userSchema),
      undefined,
      'Input must include a valid user object'
    );
    
    // Create complex validation with nested property validation
    const complexValidation = GlobalContractManager.createRequiredInputCT(
      ContractResolveType.COMPLEX_OBJECT,
      'data',
      complexObject({
        'user.name': stringLength(2, 50),
        'user.email': emailFormat(),
        'metadata.tags': arrayLength(1, 10)
      }),
      undefined,
      'Complex validation for user data'
    );
    
    // Use conditional validation
    const conditionalValidation = GlobalContractManager.createRequiredInputCT(
      ContractResolveType.CONDITIONAL,
      'conditionalData',
      conditional(
        value => value.type === 'premium',
        // For premium users
        complexObject({
          'subscription.level': valueType('number'),
          'subscription.expiresAt': (value) => value instanceof Date
        }),
        // For regular users
        valueType('object')
      ),
      undefined,
      'Conditional validation based on user type'
    );
    
    // Register all contracts
    GlobalContractManager.registerEntityContracts(
      this.constructor.name,
      'processor',
      [schemaContract, complexValidation, conditionalValidation]
    );
  }
  
  // Implementation...
}

Validator Composition

You can compose validators to create complex validation logic (a short example is sketched after this list):

  • allOf([validator1, validator2, ...]): All validators must pass
  • anyOf([validator1, validator2, ...]): At least one validator must pass
  • conditional(predicate, thenValidator, elseValidator?): Apply different validators based on a condition
  • complexObject({ 'prop.path': validator }): Apply different validators to different object properties
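
As a minimal sketch, composing the built-in validators shown earlier (assuming a composed validator can be passed anywhere a plain validator function is accepted, e.g. to createRequiredInputCT):

import { 
  allOf, 
  anyOf, 
  conditional, 
  complexObject, 
  valueType, 
  valueInRange, 
  stringLength, 
  emailFormat 
} from 'pipeline-processor';

// A score must be a number between 0 and 100
const validScore = allOf([valueType('number'), valueInRange(0, 100)]);

// An identifier may be either a short string or a number
const validIdentifier = anyOf([stringLength(1, 32), valueType('number')]);

// Require an email only for objects flagged as subscribed
const validContact = conditional(
  value => value.subscribed === true,
  complexObject({ 'contact.email': emailFormat() }),
  valueType('object')
);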

JSON Schema Integration

Full JSON Schema validation is supported through Ajv:

const validator = jsonSchema({
  type: 'object',
  properties: {
    name: { type: 'string' },
    age: { type: 'number', minimum: 0 }
  },
  required: ['name', 'age']
});

This enables:

  • Complex nested validation with a declarative approach
  • Type constraints, string formats, patterns, and ranges
  • Custom formats and keywords
  • Detailed validation error messages

Advanced Error Recovery

The library includes advanced error recovery mechanisms to build resilient processing pipelines:

Retry Mechanisms

Automatically retry failed operations with configurable attempts and backoff:

import { 
    BaseProcessor, 
    withRetry, 
    createNetworkRetryConfig 
} from 'pipeline-processor';

// Create a processor that might fail
class NetworkProcessor extends BaseProcessor<any, any> {
    // implementation
}

// Add retry capabilities with exponential backoff
const processor = withRetry(
    new NetworkProcessor(),
    createNetworkRetryConfig(3) // Retry up to 3 times with exponential backoff
);

// Standard retry with custom options
const customRetryProcessor = withRetry(
    new NetworkProcessor(),
    {
        maxAttempts: 5,
        delayMs: 1000,
        useExponentialBackoff: true,
        backoffMultiplier: 2,
        maxBackoffDelayMs: 30000,
        isRetryable: (error) => {
            // Custom logic to determine if an error is retryable
            return error.message.includes('timeout');
        }
    }
);

Fallback Processors

Specify fallback data sources when primary processors fail:

import { 
    BaseProcessor, 
    withFallback 
} from 'pipeline-processor';

// Primary and fallback processors
class PrimaryDataSource extends BaseProcessor<any, any> {
    // implementation that might fail
}

class CachedDataSource extends BaseProcessor<any, any> {
    // implementation providing fallback data
}

// Create processor with fallback capabilities
const processor = withFallback(
    new PrimaryDataSource(),
    new CachedDataSource(),
    (error) => {
        // Optional: Only use fallback for specific errors
        return error.message.includes('network');
    }
);

Circuit Breakers

Prevent cascading failures by automatically "opening the circuit" when error thresholds are reached:

import { 
    BaseProcessor, 
    withCircuitBreaker, 
    createCircuitBreakerConfig 
} from 'pipeline-processor';

// Create processor with circuit breaker
const processor = withCircuitBreaker(
    new UnreliableServiceProcessor(),
    createCircuitBreakerConfig({
        failureThreshold: 5,       // Open after 5 failures
        failureWindowMs: 60000,    // Within a 1-minute window
        resetTimeoutMs: 30000      // Try again after 30 seconds
    })
);

// Listen for circuit state changes
processor.on('circuitStateChange', (data) => {
    console.log(`Circuit changed from ${data.from} to ${data.to}`);
});

Partial Completion for Batch Operations

Allow batch operations to succeed partially when some items fail:

import { 
    BaseProcessor, 
    withPartialCompletion 
} from 'pipeline-processor';

// Create a processor for batch operations
class BatchProcessor extends BaseProcessor<{data: any[]}, {data: any[]}> {
    // implementation that processes multiple items
}

// Add partial completion capabilities
const processor = withPartialCompletion(
    new BatchProcessor(),
    {
        allowPartialCompletion: true,
        minCompletionThreshold: 0.7,  // Require at least 70% success
        isItemSuccessful: (item, error) => {
            // Custom logic to determine if an item with error should be considered successful
            return error?.message.includes('non-critical');
        },
        combinePartialResults: (results) => {
            // Custom logic to combine partial results
            return {
                success: true,
                data: results,
                partialSuccess: true
            };
        }
    }
);

Combining Recovery Strategies

Recovery mechanisms can be combined for sophisticated error handling:

import { 
    withRetry, 
    withFallback, 
    withCircuitBreaker 
} from 'pipeline-processor';

// Start with base processor
const baseProcessor = new DataProcessor();

// Add retry capabilities
const withRetries = withRetry(baseProcessor, {
    maxAttempts: 3,
    delayMs: 1000
});

// Add circuit breaker
const withCircuit = withCircuitBreaker(withRetries, {
    failureThreshold: 5,
    resetTimeoutMs: 30000
});

// Finally add fallback
const resilientProcessor = withFallback(
    withCircuit,
    new FallbackProcessor()
);

// The result is a processor with all three recovery mechanisms
pipeline.register(resilientProcessor);

These error recovery mechanisms help build robust pipelines that can handle real-world failure scenarios gracefully.


Roadmap

  • [x] ISS_001: Fixing event recursion issues in abort functionality when an event listener calls abort() while processing pipeline events
  • [x] TASK_001: Performance optimization for large datasets
  • [x] TASK_002: Improved TypeScript type inference across processor chains
  • [x] TASK_003: Expanding contract validation testing for complex validation scenarios and schema-based validation
  • [ ] TASK_004: Enhancing documentation and usage examples
  • [ ] TASK_005: Contract visualization and documentation generation tooling
  • [x] FEAT_001: Parallel processing support for independent processors
  • [x] FEAT_002: Advanced error recovery mechanisms (retries, fallbacks, partial completion, circuit breakers)
  • [x] FEAT_003: Nested pipeline support (using pipelines as processors in larger pipelines)
  • [ ] FEAT_004: Conditional branching to support different processing paths based on data
  • [ ] FEAT_005: Comprehensive caching layer with content-addressable caching and tiered storage options
  • [ ] FEAT_006: Pipeline serialization/deserialization with checkpointing and cross-environment execution
  • [ ] FEAT_007: Flexible custom batch result merging strategies
  • [ ] FEAT_008: Enhanced monitoring with detailed metrics, telemetry, and execution graph tracing
  • [x] FEAT_009: Stream-based processing for continuous data flows
  • [ ] FEAT_010: Web Worker integration for browser environments
  • [ ] FEAT_011: Pipeline visualization tools for debugging complex workflows
  • [ ] FEAT_012: Enhanced data transformation framework with standardized mapping patterns
  • [ ] FEAT_013: Standardized plugin architecture with processor discovery and registration
  • [ ] FEAT_014: Smart resource management with processor pooling and adaptive concurrency
  • [ ] FEAT_015: Format conversion processors for common data formats (JSON, XML, CSV, binary)
  • [ ] FEAT_016: Dependency injection system for processor configuration and composition
  • [ ] FEAT_017: Partial success handling with graceful degradation policies
  • [ ] FEAT_018: Resource quotas and fairness mechanisms for multi-tenant environments
  • [ ] FEAT_019: Runtime type checking with detailed error reporting
  • [ ] FEAT_020: Event subscription system for external monitoring and integration