media-pipeline v1.6.1: A storage-agnostic file processing pipeline for Node.js

# Media Pipeline

- **Version:** 1.5.11
- **Runtime:** Node.js
- **Purpose:** Storage-agnostic, extensible file validation, transformation, and storage pipeline.
## Overview

Media Pipeline is a modular file processing system in which files flow through a structured pipeline:

```
Input → Validators → Processors → Storage → Output
```

Each stage is fully customizable and extensible via plugins.
## Installation

```bash
npm install media-pipeline
```

## Basic Usage
```js
import {
  createPipeline,
  localStorage,
  maxSize,
  allowedMimeTypes
} from 'media-pipeline';

const pipeline = createPipeline({
  validators: [
    maxSize(5 * 1024 * 1024),
    allowedMimeTypes(['image/jpeg'])
  ],
  storage: localStorage('./uploads')
});

const file = {
  buffer: Buffer.from('data'),
  filename: 'image.jpg',
  mimeType: 'image/jpeg',
  size: 1024
};

const result = await pipeline.process(file);
console.log(result);
```

## Core Concepts
### Pipeline Flow

- **Input**: Raw file object
- **Validators**: Ensure the file meets requirements
- **Processors**: Transform the file
- **Storage**: Persist the file
- **Output**: Result with metadata and trace
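The stage order above can be sketched as a small standalone reduction. This is an illustration of the flow only, not the library's implementation; `runPipeline` and its shape are hypothetical names:

```js
// Minimal sketch of the Input → Validators → Processors → Storage → Output flow.
// `runPipeline` is a hypothetical stand-in, not the library's internals.
async function runPipeline({ validators = [], processors = [], storage }, file) {
  let ctx = { file, metadata: {} };             // Input
  for (const validate of validators) {
    await validate(ctx);                        // Validators: throw to reject
  }
  for (const process of processors) {
    ctx = await process(ctx);                   // Processors: return a new context
  }
  const result = await storage.save(ctx.file);  // Storage: persist the file
  return { ...result, metadata: ctx.metadata }; // Output: result plus metadata
}
```

The real pipeline additionally fires hooks between stages and records a trace event for each step.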
## API Reference

### createPipeline(config)

Creates a new pipeline instance.

#### Config

```ts
type PipelineConfig = {
  validators?: Validator[];
  processors?: Processor[];
  storage: Storage;
  hooks?: PipelineHooks;
};
```

### Built-in Utilities
#### localStorage(basePath)

Stores files on the local filesystem under `basePath`.

#### maxSize(limit)

Validates that the file size does not exceed `limit` bytes.

#### allowedMimeTypes(types)

Validates that the file's MIME type is one of `types`.

#### identityProcessor()

A no-op processor that returns the context unchanged.
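To give a sense of how small these utilities can be, here is one possible implementation of the two validators, following the `Validator` signature used by this library. The actual built-ins may differ; `maxSizeSketch` and `allowedMimeTypesSketch` are illustrative names:

```js
// Possible implementation of a size validator like maxSize(limit).
// Illustrative only; the shipped built-in may differ.
function maxSizeSketch(limit) {
  return (ctx) => {
    if (ctx.file.size > limit) {
      throw new Error(`File too large: ${ctx.file.size} > ${limit} bytes`);
    }
  };
}

// Possible implementation of a MIME-type allowlist like allowedMimeTypes(types).
function allowedMimeTypesSketch(types) {
  return (ctx) => {
    if (!types.includes(ctx.file.mimeType)) {
      throw new Error(`Disallowed MIME type: ${ctx.file.mimeType}`);
    }
  };
}
```

Both follow the same pattern: a factory captures the configuration and returns a validator that throws when the check fails.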
## Types

### PipelineFile

```ts
{
  buffer: Buffer;
  filename: string;
  mimeType: string;
  size: number;
}
```

### PipelineResult

```ts
{
  url: string;
  path: string;
  size: number;
  metadata: Record<string, any>;
  meta: {
    plugins: PluginMeta[];
    trace: TraceEvent[];
  };
}
```

### PipelineContext

```ts
{
  file: PipelineFile;
  metadata: Record<string, any>;
  meta: PipelineMeta;
}
```

## Validators
Validators check conditions and throw errors if invalid.

```ts
type Validator = (ctx: PipelineContext) => void | Promise<void>;
```

### Example

```js
function imageOnlyValidator(ctx) {
  if (!ctx.file.mimeType.startsWith('image/')) {
    throw new Error('Invalid file type');
  }
}
```

## Processors
Processors transform files.

```ts
type Processor = (ctx: PipelineContext) => PipelineContext | Promise<PipelineContext>;
```

### Example

```js
function renameProcessor(ctx) {
  return {
    ...ctx,
    file: {
      ...ctx.file,
      filename: `processed-${ctx.file.filename}`
    }
  };
}
```

## Storage
Storage persists files.

```ts
type Storage = {
  save(file: PipelineFile): Promise<PipelineResult>;
};
```

## Hooks
Hooks allow lifecycle customization.

```ts
type PipelineHooks = {
  onStart?: (ctx) => void | Promise<void>;
  afterValidate?: (ctx) => void | Promise<void>;
  afterProcess?: (ctx) => void | Promise<void>;
  onError?: (error, ctx) => void | Promise<void>;
  onFinish?: (result, ctx) => void | Promise<void>;
};
```

## Plugin System
Plugins extend functionality.

### Plugin Types

**Object Plugin**

```ts
{
  name: string;
  version?: string;
  setup(builder: PipelineBuilder): void;
}
```

**Function Plugin**

```ts
(builder: PipelineBuilder) => void;
```

### Builder API
```js
builder.addValidator(fn);
builder.addProcessor(fn);
builder.mergeHooks(hooks);
builder.setStorage(storage);
```

### Example Plugin (Image Resize)
```js
const sharpPlugin = {
  name: 'sharp-resize',
  setup(builder) {
    builder.addProcessor(async (ctx) => {
      if (ctx.file.mimeType.startsWith('image/')) {
        const sharp = require('sharp');
        const buffer = await sharp(ctx.file.buffer)
          .resize(800, 800)
          .toBuffer();
        return {
          ...ctx,
          file: {
            ...ctx.file,
            buffer,
            size: buffer.length
          }
        };
      }
      return ctx;
    });
  }
};
```

## Execution Flow
1. `onStart`
2. Validators
3. `afterValidate`
4. Processors
5. `afterProcess`
6. Storage
7. `onFinish`

On error:

1. `onError` is called
2. The error is re-thrown
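This ordering, including the error path, can be illustrated with a small driver. `simulateRun` is a hypothetical stand-in for the pipeline's internal loop, not part of the library:

```js
// Sketch of the execution order above, including the error path.
// `simulateRun` is a hypothetical driver for illustration only.
async function simulateRun(hooks) {
  const trace = [];
  const fire = async (name, ...args) => {
    trace.push(name);
    await hooks[name]?.(...args);
  };
  try {
    await fire('onStart');
    trace.push('validators');   // validators run here
    await fire('afterValidate');
    trace.push('processors');   // processors run here
    await fire('afterProcess');
    trace.push('storage');      // storage.save runs here
    await fire('onFinish');
    return trace;
  } catch (err) {
    await fire('onError', err); // onError is called first,
    throw err;                  // then the error is re-thrown
  }
}
```

For a run with no failures, the recorded order is `onStart`, validators, `afterValidate`, processors, `afterProcess`, storage, `onFinish`; a throw anywhere short-circuits to `onError` and the error propagates to the caller.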
## Tracing

Each step is recorded:

```ts
{
  plugin: string;
  stage: 'validator' | 'processor' | 'hook' | 'storage';
  message: string;
  duration?: number;
  timestamp: number;
}
```

## Errors
### Classes

- `PipelineError`
- `ValidationError`
- `ProcessorError`
- `StorageError`
### Example Handling

```js
try {
  await pipeline.process(file);
} catch (err) {
  if (err instanceof ValidationError) {
    console.log(err.details);
  }
}
```

## Best Practices
- Use named functions for better tracing
- Always handle errors
- Keep processors pure
- Use plugins for reusable logic
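For instance, "keep processors pure" means returning a new context rather than mutating the one passed in (illustrative sketch):

```js
// Impure: mutates the incoming context, which makes runs harder to trace and replay.
function renameImpure(ctx) {
  ctx.file.filename = 'processed-' + ctx.file.filename;
  return ctx;
}

// Pure: returns a fresh context and leaves the input untouched.
function renamePure(ctx) {
  return {
    ...ctx,
    file: { ...ctx.file, filename: 'processed-' + ctx.file.filename }
  };
}
```

With the pure version, the original `ctx` is still intact after the processor runs, so retries and debugging from trace events stay predictable.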
## Limitations
- No streaming support
- Sequential execution only
- Trace accumulation across runs
- No automatic error wrapping
## Advanced Usage

### Using Plugins

```js
pipeline.use(sharpPlugin);
```

### Custom Storage Example
```js
const memoryStorage = {
  async save(file) {
    return {
      url: 'memory://file',
      path: file.filename,
      size: file.size,
      metadata: {}
    };
  }
};
```

## Design Principles
- Separation of concerns
- Composability
- Extensibility
- Minimal core
## Future Improvements
- Streaming support
- Parallel processing
- Queue system
- Plugin marketplace
## License
MIT
