pipeline-processor
A modular Pipeline-Processor pattern library for structured data processing with optional contract validation.
Features
- Modular Architecture: Easily build processing pipelines with reusable processors
- Event-Based Progress: Track progress with built-in event system
- Contract Validation: Optional contract system to validate processor compatibility
- Type Safety: Written in TypeScript with full type definitions
Installation
npm install pipeline-processor
Basic Usage
import { BasePipeline, BaseProcessor } from 'pipeline-processor';
// Create a processor
class MyProcessor extends BaseProcessor<InputType, OutputType> {
public async execute(options: InputType): Promise<OutputType> {
// Implement your processing logic here
this.updateProgress(0.5, 'Processing halfway done');
// Always check if aborted to allow clean cancellation
this.checkAborted();
// Return processed result
return result;
}
}
// Create a pipeline
const pipeline = new BasePipeline<PipelineInput, PipelineOutput>();
// Register processors
pipeline.register(new MyProcessor());
// Add event listeners
pipeline.on('progress', (data) => {
console.log(`Progress: ${data.progress * 100}%`);
});
// Initialize and execute
await pipeline.initialize();
const result = await pipeline.execute(inputData);
Advanced: Contract Validation
For projects where you need to ensure compatibility between processors:
import {
BasePipeline,
BaseProcessor,
GlobalContractManager,
ContractResolveType
} from 'pipeline-processor';
class MyProcessor extends BaseProcessor<InputType, OutputType> {
constructor() {
super();
// Setup contracts for this processor
const inputContracts = [
GlobalContractManager.createRequiredInputCT(
ContractResolveType.OBJECT_HAS_PROPERTIES,
'input_data',
(value, context) => {
// Validate input data
if (!value || !value.requiredProperty) {
context.message = 'Missing required property';
return false;
}
return true;
},
undefined,
'Input must include required property'
)
];
// Register contracts with the contract manager
GlobalContractManager.registerEntityContracts(
this.constructor.name,
'processor',
inputContracts
);
}
public async execute(options: InputType): Promise<OutputType> {
// Implementation
}
}
Documentation
For full API documentation, examples, and guides, visit the documentation site (...)
License
MIT
Usage example
Here is a more complete, real-world example of using the library:
import {
BasePipeline,
BaseProcessor,
ProcessorEvent,
PipelineEvent
} from 'pipeline-processor';
// Define processor input and output types
interface ImageProcessorInput {
imagePath: string;
outputPath: string;
scale?: number;
}
interface ImageProcessorOutput {
outputPath: string;
dimensions: { width: number; height: number };
processingTime: number;
}
// Create a specialized processor
class ImageResizeProcessor extends BaseProcessor<ImageProcessorInput, ImageProcessorOutput> {
public async execute(options: ImageProcessorInput): Promise<ImageProcessorOutput> {
const startTime = Date.now();
this.updateProgress(0, 'Starting image resize process');
// Check for abort signals throughout processing
this.checkAborted();
// Simulate image processing steps
await this.simulateProcessing(0.3, 'Loading image');
this.checkAborted();
await this.simulateProcessing(0.6, 'Resizing image');
this.checkAborted();
await this.simulateProcessing(1.0, 'Saving image');
const processingTime = (Date.now() - startTime) / 1000;
// Return the result
return {
outputPath: options.outputPath,
dimensions: { width: 800, height: 600 },
processingTime
};
}
private async simulateProcessing(progressTarget: number, message: string): Promise<void> {
// Simulate a processing step
const delay = 500; // 500ms
await new Promise(resolve => setTimeout(resolve, delay));
this.updateProgress(progressTarget, message);
}
}
// Create a pipeline for image processing
const pipeline = new BasePipeline<ImageProcessorInput, ImageProcessorOutput>();
// Register the processor
pipeline.register(new ImageResizeProcessor());
// Add pipeline event listeners
pipeline.on(PipelineEvent.PROGRESS, (data) => {
console.log(`Pipeline progress: ${Math.round(data.progress * 100)}%`);
});
pipeline.on(PipelineEvent.PIPELINE_COMPLETE, (data) => {
console.log(`Processing complete. Result:`, data.result);
});
// Initialize and execute the pipeline
const run = async () => {
await pipeline.initialize();
const result = await pipeline.execute({
imagePath: '/path/to/input.jpg',
outputPath: '/path/to/output.jpg',
scale: 0.5
});
console.log('Final result:', result);
};
run().catch(console.error);
Testing
The library includes a comprehensive test suite that covers the core functionality:
# Run all tests
npm test
# Run tests with watcher
npm run test:watch
# Run tests with coverage report
npm run test:coverage
The tests cover:
- Pipeline execution and events
- Processor execution and progress tracking
- Contract validation and resolution
- Integration between components
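For illustration, a minimal test might look like the following sketch. It assumes a Jest-style test runner (the project's actual setup may differ), and DoubleProcessor is a hypothetical processor defined only for this example:
import { BasePipeline, BaseProcessor } from 'pipeline-processor';
// Hypothetical processor used only for this test sketch
class DoubleProcessor extends BaseProcessor<{ value: number }, { value: number }> {
  public async execute(options: { value: number }): Promise<{ value: number }> {
    this.updateProgress(1, 'Doubled value');
    return { value: options.value * 2 };
  }
}
describe('BasePipeline', () => {
  it('executes registered processors and reports progress', async () => {
    const pipeline = new BasePipeline<{ value: number }, { value: number }>();
    pipeline.register(new DoubleProcessor());
    const progressUpdates: number[] = [];
    pipeline.on('progress', (data) => progressUpdates.push(data.progress));
    await pipeline.initialize();
    const result = await pipeline.execute({ value: 21 });
    expect(result.value).toBe(42);
    expect(progressUpdates.length).toBeGreaterThan(0);
  });
});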
Validators
The library includes several built-in validators for common data types:
Basic Validators
import {
objectHasProperties,
valueInRange,
valueType
} from 'pipeline-processor';
// Check if an object has specified properties
const hasRequiredProps = objectHasProperties(['id', 'name', 'type']);
// Check if a number is within a range
const isInRange = valueInRange(0, 100);
// Check if a value is of a specified type
const isString = valueType('string');
const isNumber = valueType('number');
const isBoolean = valueType('boolean');
const isObject = valueType('object');
const isArray = valueType('array');
String Validators
import {
emailFormat,
urlFormat,
stringLength
} from 'pipeline-processor';
// Check if a string is a valid email
const isEmail = emailFormat();
// Check if a string is a valid URL
const isUrl = urlFormat();
// Check if a string has a specific length range
const hasValidLength = stringLength(5, 100);
Array Validators
import {
arrayLength,
arrayItemType
} from 'pipeline-processor';
// Check if an array has a specific length range
const hasValidLength = arrayLength(1, 10);
// Check if all array items are of a specific type
const hasStringItems = arrayItemType('string');
const hasNumberItems = arrayItemType('number');
Number Validators
import {
isInteger,
valueInRange
} from 'pipeline-processor';
// Check if a number is an integer
const isIntegerValue = isInteger();
// Check if a number is within a range
const isInRange = valueInRange(0, 100);
Using Validators with Contracts
import {
GlobalContractManager,
ContractResolveType,
emailFormat
} from 'pipeline-processor';
// Create a contract requiring a valid email
const emailContract = GlobalContractManager.createRequiredInputCT(
ContractResolveType.STRING_FORMAT,
'email',
emailFormat(),
undefined,
'Input must include a valid email'
);
// Register the contract with a processor
GlobalContractManager.registerEntityContracts(
'MyProcessor',
'processor',
[emailContract]
);
Batch Processing for Large Datasets
For processing large datasets efficiently, the library provides batch processing capabilities:
import { BasePipeline, BaseProcessor } from 'pipeline-processor';
// Define a processor for handling chunks of data
class DataProcessor extends BaseProcessor<any, any> {
public async execute(options: { data: any[] }): Promise<any> {
// Process the current batch of data
const processedData = options.data.map(item => {
// Your processing logic here
return { ...item, processed: true };
});
return { ...options, data: processedData };
}
}
// Create and configure the pipeline
const pipeline = new BasePipeline();
pipeline.register(new DataProcessor());
await pipeline.initialize();
// Large dataset to process
const largeDataset = Array.from({ length: 10000 }, (_, i) => ({ id: i, value: `Item ${i}` }));
// Process in batches of 500 items
const result = await pipeline.executeBatch(
{ data: largeDataset },
500,
(batchResult, batchIndex) => {
console.log(`Processed batch ${batchIndex + 1}`);
}
);
console.log(`Processed ${result.data.length} items in total`);
Batch Processing Benefits
- Memory Efficient: Processes large datasets in manageable chunks
- Progress Tracking: Provides detailed progress updates per batch
- Cancellation: Can be aborted at any batch boundary
- Customizable: Override the combineBatchResults method to customize how batch results are merged (see Custom Result Merging below)
Custom Result Merging
You can extend BasePipeline to customize how batch results are combined:
class CustomPipeline extends BasePipeline<any, any> {
protected combineBatchResults(batchResults: any[]): any {
// Custom logic to combine batch results
const allData = batchResults.flatMap(result => result.data || []);
// Calculate aggregated statistics
const total = allData.reduce((sum, item) => sum + item.value, 0);
const average = total / allData.length;
return {
data: allData,
stats: {
total,
average,
count: allData.length
}
};
}
}
Advanced Performance Optimizations
The library includes several advanced features for high-performance processing of large datasets:
Memory-Optimized Batch Processing
For extremely large datasets where memory usage is a concern:
// Process a large dataset with memory optimization
const result = await pipeline.executeBatch(
{ data: largeDataset },
500,
(batchResult, batchIndex) => {
console.log(`Processed batch ${batchIndex + 1}`);
},
{
memoryOptimized: true // Only keep final result, discard intermediate batch results
}
);
Parallel Batch Processing
For faster processing on multi-core systems:
// Process batches in parallel using multiple CPU cores
const result = await pipeline.executeBatch(
{ data: largeDataset },
500,
undefined, // No batch callback needed
{
parallelBatches: 4, // Process 4 batches in parallel
memoryOptimized: true // Optional memory optimization
}
);
Early Termination
For cases where you might need to stop processing once certain conditions are met:
// Stop processing early if a condition is met
const result = await pipeline.executeBatch(
{ data: largeDataset },
500,
undefined,
{
earlyTermination: true,
terminationCheck: (batchResult) => {
// Check if this batch result meets termination criteria
// For example, if we found what we're looking for
return batchResult.data.some(item => item.isSpecialItem);
}
}
);
Streaming Data Processing
For processing data as it arrives, without loading the entire dataset into memory:
// Create an async data source
async function* generateData() {
for (let i = 0; i < 10000; i++) {
// Simulate data arriving over time
await new Promise(resolve => setTimeout(resolve, 10));
yield { id: i, value: `Item ${i}` };
}
}
// Process data in a streaming fashion
const processingStream = pipeline.createProcessingStream(generateData(), 100);
// Process results as they become available
for await (const result of processingStream) {
console.log(`Processed batch with ${result.data.length} items`);
// You can do something with each batch result immediately
await saveResults(result.data);
}
This streaming approach is ideal for:
- Processing data from network sources in real-time
- Handling infinitely large datasets
- Minimizing memory consumption
- Displaying incremental results as they become available
Parallel Processing
The library supports parallel execution of independent processors, which can significantly improve performance for complex processing pipelines:
import { BasePipeline, BaseProcessor } from 'pipeline-processor';
// Create processors
class PreprocessorA extends BaseProcessor<any, any> {
// implementation
}
class PreprocessorB extends BaseProcessor<any, any> {
// implementation
}
class Combiner extends BaseProcessor<any, any> {
// implementation that uses results from both preprocessors
}
// Create pipeline with parallel processing
const pipeline = new BasePipeline({ parallelProcessingEnabled: true });
// First two processors can run in parallel (no dependencies)
pipeline.register(new PreprocessorA()); // index 0
pipeline.register(new PreprocessorB()); // index 1
// Combiner depends on both preprocessors
pipeline.register(new Combiner(), {}, [0, 1]); // depends on indices 0 and 1
// Alternative way to enable parallel processing
pipeline.setParallelProcessing(true);
// Execute the pipeline
await pipeline.initialize();
const result = await pipeline.execute(inputData);
How Parallel Processing Works
- By default, processors execute sequentially in the order they are registered
- When parallel processing is enabled, the pipeline analyzes processor dependencies
- Processors with the same dependencies will execute in parallel when possible
- Dependencies can be specified when registering a processor (see the sketch after this list)
- If no dependencies are specified, a processor depends on the previous processor
- Circular dependencies are detected and will cause an error
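To make these rules concrete, here is a sketch of a diamond-shaped dependency graph. Loader, EnricherA, EnricherB, and Merger are hypothetical placeholder processors; the registration calls follow the same API as the example above:
import { BasePipeline, BaseProcessor } from 'pipeline-processor';
// Hypothetical no-op processors used only to illustrate the dependency graph
class Loader extends BaseProcessor<any, any> {
  public async execute(options: any): Promise<any> { return options; }
}
class EnricherA extends BaseProcessor<any, any> {
  public async execute(options: any): Promise<any> { return { ...options, a: true }; }
}
class EnricherB extends BaseProcessor<any, any> {
  public async execute(options: any): Promise<any> { return { ...options, b: true }; }
}
class Merger extends BaseProcessor<any, any> {
  public async execute(options: any): Promise<any> { return options; }
}
const pipeline = new BasePipeline({ parallelProcessingEnabled: true });
pipeline.register(new Loader());             // index 0, no explicit dependencies
pipeline.register(new EnricherA(), {}, [0]); // index 1, depends on Loader
pipeline.register(new EnricherB(), {}, [0]); // index 2, depends on Loader
pipeline.register(new Merger(), {}, [1, 2]); // index 3, waits for both enrichers
// EnricherA and EnricherB share the same dependency, so they may run in parallel;
// Merger executes only after both have completed.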
When to Use Parallel Processing
Parallel processing is most beneficial when:
- You have multiple independent preprocessing steps
- Some processors are CPU-intensive but don't depend on each other
- Your pipeline has natural points of parallelism (e.g., data enrichment from multiple sources)
For simple linear pipelines, sequential processing may be more appropriate.
Nested Pipelines
The library supports nested pipelines, allowing you to use a pipeline as a processor within another pipeline. This enables building complex processing workflows with reusable components:
import {
BasePipeline,
BaseProcessor,
PipelineProcessor
} from 'pipeline-processor';
// Create an inner pipeline
const innerPipeline = new BasePipeline<InnerInputType, InnerOutputType>();
innerPipeline.register(new ProcessorA());
innerPipeline.register(new ProcessorB());
await innerPipeline.initialize();
// Wrap the inner pipeline as a processor
const pipelineProcessor = new PipelineProcessor(innerPipeline);
// Create an outer pipeline
const outerPipeline = new BasePipeline<OuterInputType, OuterOutputType>();
outerPipeline.register(new PreProcessor());
outerPipeline.register(pipelineProcessor); // Use the pipeline as a processor
outerPipeline.register(new PostProcessor());
await outerPipeline.initialize();
// Execute the outer pipeline, which will also execute the inner pipeline
const result = await outerPipeline.execute(inputData);
Benefits of Nested Pipelines
Nested pipelines offer several advantages:
- Reusability: Create reusable pipeline modules for common processing tasks
- Encapsulation: Hide the complexity of multi-step processes behind a single processor
- Maintainability: Break down complex workflows into smaller, manageable units
- Composability: Compose complex workflows from simpler building blocks
Event Propagation in Nested Pipelines
Events from inner pipelines are automatically forwarded to the wrapping PipelineProcessor (a sketch follows this list):
- Progress events are forwarded as processor progress updates
- Error events are forwarded as processor errors
- Completion events are forwarded as processor completion
- Abort signals propagate to nested pipelines
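For example, continuing the nested pipeline example above, forwarded progress surfaces through the outer pipeline just like progress from any other processor (a sketch, assuming the event names shown earlier in this README):
import { PipelineEvent } from 'pipeline-processor';
// Progress reported inside innerPipeline is forwarded through pipelineProcessor
// and appears here as regular outer-pipeline progress
outerPipeline.on(PipelineEvent.PROGRESS, (data) => {
  console.log(`Outer pipeline progress: ${Math.round(data.progress * 100)}%`);
});
outerPipeline.on(PipelineEvent.PIPELINE_COMPLETE, (data) => {
  console.log('Outer pipeline (including the nested pipeline) completed:', data.result);
});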
Enhanced Type Inference
The library has been enhanced with improved TypeScript type inference to provide better type safety across processor chains.
Type-Safe Processors and Pipelines
import {
TypedBaseProcessor,
TypedBasePipeline,
TypedPipelineBuilder,
ProcessorEvent,
PipelineEvent
} from 'pipeline-processor';
// Define your data types
interface InputData {
id: string;
data: string[];
}
interface EnrichedData extends InputData {
metadata: Record<string, any>;
}
interface OutputData {
result: string;
processedAt: Date;
}
// Create strongly-typed processors
class EnrichmentProcessor extends TypedBaseProcessor<InputData, EnrichedData> {
async execute(input: InputData): Promise<EnrichedData> {
// Implementation...
return {
...input,
metadata: { processedBy: 'EnrichmentProcessor' }
};
}
}
class TransformProcessor extends TypedBaseProcessor<EnrichedData, OutputData> {
async execute(input: EnrichedData): Promise<OutputData> {
// Implementation...
return {
result: input.data.join(','),
processedAt: new Date()
};
}
}
// Method 1: Create a pipeline with type checking across processor chain
const pipeline = new TypedBasePipeline<InputData, OutputData>();
pipeline.register(new EnrichmentProcessor());
pipeline.register(new TransformProcessor());
// Type-safe event handling
pipeline.on(PipelineEvent.PIPELINE_COMPLETE, (data) => {
// TypeScript knows data.result is OutputData
console.log(`Completed at: ${data.result.processedAt}`);
});
// Method 2: Use the fluent builder API with type inference
const pipeline2 = TypedPipelineBuilder
.create<InputData>()
.withFirstProcessor(new EnrichmentProcessor())
.pipe(new TransformProcessor())
.build();
// Execute with properly typed input and output
const result = await pipeline.execute({
id: '123',
data: ['a', 'b', 'c']
});
// Result is properly typed as OutputData
console.log(result.result); // Type-safe access
Benefits of Enhanced Type Inference
- Catch Errors Earlier: TypeScript catches type incompatibilities between processors at compile time
- Self-Documenting Code: Types clearly show the data flow through the pipeline
- IDE Support: Get proper code completion and hover information
- Refactoring Safety: When changing processor interfaces, TypeScript identifies all impacted areas
- Type-Safe Events: Event handlers receive properly typed event data
Type Utilities
The library provides several utility types to help with type inference:
import {
ProcessorInput,
ProcessorOutput,
CanChain,
ChainOutput,
EnsureCompatibleChain
} from 'pipeline-processor';
// Get the input type of a processor
type InputType = ProcessorInput<MyProcessor>;
// Get the output type of a processor
type OutputType = ProcessorOutput<MyProcessor>;
// Check if processors can be chained
type CanBeChained = CanChain<ProcessorA, ProcessorB>; // true or false
// Get the final output type from a chain of processors
type FinalOutputType = ChainOutput<[ProcessorA, ProcessorB, ProcessorC]>;
// Check if an entire chain is compatible
type IsChainValid = EnsureCompatibleChain<[ProcessorA, ProcessorB, ProcessorC]>;
Schema-Based Validation
The library now supports schema-based validation for complex data structures using JSON Schema and composable validators:
import {
jsonSchema,
complexObject,
allOf,
anyOf,
conditional,
stringLength,
emailFormat,
arrayLength,
valueType,
GlobalContractManager,
ContractResolveType
} from 'pipeline-processor';
import { JSONSchemaType } from 'ajv';
// Define a schema for your data type
interface User {
id: string;
name: string;
email: string;
age: number;
}
const userSchema: JSONSchemaType<User> = {
type: 'object',
properties: {
id: { type: 'string' },
name: { type: 'string', minLength: 2 },
email: { type: 'string', format: 'email' },
age: { type: 'number', minimum: 18 }
},
required: ['id', 'name', 'email', 'age'],
additionalProperties: false
};
// Use in a processor
class UserProcessor extends BaseProcessor<any, any> {
constructor() {
super();
// Create a contract using JSON Schema
const schemaContract = GlobalContractManager.createRequiredInputCT(
ContractResolveType.JSON_SCHEMA,
'user',
jsonSchema(userSchema),
undefined,
'Input must include a valid user object'
);
// Create complex validation with nested property validation
const complexValidation = GlobalContractManager.createRequiredInputCT(
ContractResolveType.COMPLEX_OBJECT,
'data',
complexObject({
'user.name': stringLength(2, 50),
'user.email': emailFormat(),
'metadata.tags': arrayLength(1, 10)
}),
undefined,
'Complex validation for user data'
);
// Use conditional validation
const conditionalValidation = GlobalContractManager.createRequiredInputCT(
ContractResolveType.CONDITIONAL,
'conditionalData',
conditional(
value => value.type === 'premium',
// For premium users
complexObject({
'subscription.level': valueType('number'),
'subscription.expiresAt': (value) => value instanceof Date
}),
// For regular users
valueType('object')
),
undefined,
'Conditional validation based on user type'
);
// Register all contracts
GlobalContractManager.registerEntityContracts(
this.constructor.name,
'processor',
[schemaContract, complexValidation, conditionalValidation]
);
}
// Implementation...
}
Validator Composition
You can compose validators to create complex validation logic:
- allOf([validator1, validator2, ...]): All validators must pass
- anyOf([validator1, validator2, ...]): At least one validator must pass
- conditional(predicate, thenValidator, elseValidator?): Apply different validators based on a condition
- complexObject({ 'prop.path': validator }): Apply different validators to different object properties
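For example, here is a brief sketch that combines the built-in validators shown earlier:
import {
  allOf,
  anyOf,
  conditional,
  valueType,
  stringLength,
  emailFormat,
  urlFormat
} from 'pipeline-processor';
// A value passes only if it is a string between 5 and 100 characters long
const strictString = allOf([valueType('string'), stringLength(5, 100)]);
// A value passes if it is either a valid email address or a valid URL
const contactField = anyOf([emailFormat(), urlFormat()]);
// Validate differently depending on the shape of the value
const flexibleId = conditional(
  (value) => typeof value === 'number',
  valueType('number'),  // numeric IDs must be numbers
  stringLength(1, 36)   // otherwise expect a short string ID
);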
JSON Schema Integration
Full JSON Schema validation is supported through Ajv:
const validator = jsonSchema({
type: 'object',
properties: {
name: { type: 'string' },
age: { type: 'number', minimum: 0 }
},
required: ['name', 'age']
});
This enables:
- Complex nested validation with a declarative approach
- Type constraints, string formats, patterns, and ranges
- Custom formats and keywords
- Detailed validation error messages
Advanced Error Recovery
The library includes advanced error recovery mechanisms to build resilient processing pipelines:
Retry Mechanisms
Automatically retry failed operations with configurable attempts and backoff:
import {
BaseProcessor,
withRetry,
createNetworkRetryConfig
} from 'pipeline-processor';
// Create a processor that might fail
class NetworkProcessor extends BaseProcessor<any, any> {
// implementation
}
// Add retry capabilities with exponential backoff
const processor = withRetry(
new NetworkProcessor(),
createNetworkRetryConfig(3) // Retry up to 3 times with exponential backoff
);
// Standard retry with custom options
const customRetryProcessor = withRetry(
new NetworkProcessor(),
{
maxAttempts: 5,
delayMs: 1000,
useExponentialBackoff: true,
backoffMultiplier: 2,
maxBackoffDelayMs: 30000,
isRetryable: (error) => {
// Custom logic to determine if an error is retryable
return error.message.includes('timeout');
}
}
);
Fallback Processors
Specify fallback data sources when primary processors fail:
import {
BaseProcessor,
withFallback
} from 'pipeline-processor';
// Primary and fallback processors
class PrimaryDataSource extends BaseProcessor<any, any> {
// implementation that might fail
}
class CachedDataSource extends BaseProcessor<any, any> {
// implementation providing fallback data
}
// Create processor with fallback capabilities
const processor = withFallback(
new PrimaryDataSource(),
new CachedDataSource(),
(error) => {
// Optional: Only use fallback for specific errors
return error.message.includes('network');
}
);
Circuit Breakers
Prevent cascading failures by automatically "opening the circuit" when error thresholds are reached:
import {
BaseProcessor,
withCircuitBreaker,
createCircuitBreakerConfig
} from 'pipeline-processor';
// Create processor with circuit breaker
const processor = withCircuitBreaker(
new UnreliableServiceProcessor(),
createCircuitBreakerConfig({
failureThreshold: 5, // Open after 5 failures
failureWindowMs: 60000, // Within a 1-minute window
resetTimeoutMs: 30000 // Try again after 30 seconds
})
);
// Listen for circuit state changes
processor.on('circuitStateChange', (data) => {
console.log(`Circuit changed from ${data.from} to ${data.to}`);
});
Partial Completion for Batch Operations
Allow batch operations to succeed partially when some items fail:
import {
BaseProcessor,
withPartialCompletion
} from 'pipeline-processor';
// Create a processor for batch operations
class BatchProcessor extends BaseProcessor<{data: any[]}, {data: any[]}> {
// implementation that processes multiple items
}
// Add partial completion capabilities
const processor = withPartialCompletion(
new BatchProcessor(),
{
allowPartialCompletion: true,
minCompletionThreshold: 0.7, // Require at least 70% success
isItemSuccessful: (item, error) => {
// Custom logic to determine if an item with error should be considered successful
return error?.message.includes('non-critical');
},
combinePartialResults: (results) => {
// Custom logic to combine partial results
return {
success: true,
data: results,
partialSuccess: true
};
}
}
);
Combining Recovery Strategies
Recovery mechanisms can be combined for sophisticated error handling:
import {
withRetry,
withFallback,
withCircuitBreaker
} from 'pipeline-processor';
// Start with base processor
const baseProcessor = new DataProcessor();
// Add retry capabilities
const withRetries = withRetry(baseProcessor, {
maxAttempts: 3,
delayMs: 1000
});
// Add circuit breaker
const withCircuit = withCircuitBreaker(withRetries, {
failureThreshold: 5,
resetTimeoutMs: 30000
});
// Finally add fallback
const resilientProcessor = withFallback(
withCircuit,
new FallbackProcessor()
);
// The result is a processor with all three recovery mechanisms
pipeline.register(resilientProcessor);
These error recovery mechanisms help build robust pipelines that can handle real-world failure scenarios gracefully.
Roadmap
- [x] ISS_001: Fixing event recursion issues in abort functionality when an event listener calls abort() while processing pipeline events
- [x] TASK_001: Performance optimization for large datasets
- [x] TASK_002: Improved TypeScript type inference across processor chains
- [x] TASK_003: Expanding contract validation testing for complex validation scenarios and schema-based validation
- [ ] TASK_004: Enhancing documentation and usage examples
- [ ] TASK_005: Contract visualization and documentation generation tooling
- [x] FEAT_001: Parallel processing support for independent processors
- [x] FEAT_002: Advanced error recovery mechanisms (retries, fallbacks, partial completion, circuit breakers)
- [x] FEAT_003: Nested pipeline support (using pipelines as processors in larger pipelines)
- [ ] FEAT_004: Conditional branching to support different processing paths based on data
- [ ] FEAT_005: Comprehensive caching layer with content-addressable caching and tiered storage options
- [ ] FEAT_006: Pipeline serialization/deserialization with checkpointing and cross-environment execution
- [ ] FEAT_007: Flexible custom batch result merging strategies
- [ ] FEAT_008: Enhanced monitoring with detailed metrics, telemetry, and execution graph tracing
- [x] FEAT_009: Stream-based processing for continuous data flows
- [ ] FEAT_010: Web Worker integration for browser environments
- [ ] FEAT_011: Pipeline visualization tools for debugging complex workflows
- [ ] FEAT_012: Enhanced data transformation framework with standardized mapping patterns
- [ ] FEAT_013: Standardized plugin architecture with processor discovery and registration
- [ ] FEAT_014: Smart resource management with processor pooling and adaptive concurrency
- [ ] FEAT_015: Format conversion processors for common data formats (JSON, XML, CSV, binary)
- [ ] FEAT_016: Dependency injection system for processor configuration and composition
- [ ] FEAT_017: Partial success handling with graceful degradation policies
- [ ] FEAT_018: Resource quotas and fairness mechanisms for multi-tenant environments
- [ ] FEAT_019: Runtime type checking with detailed error reporting
- [ ] FEAT_020: Event subscription system for external monitoring and integration
