fib-flow
v4.1.0
Published
A robust workflow management system for fibjs with task orchestration, state management, and distributed execution capabilities
Maintainers
Readme
fib-flow
A powerful workflow management system built on fibjs for orchestrating complex task dependencies and distributed task execution.
Key Features
- Workflow Management: Parent-child task relationships, automatic state propagation
- Task Types: Async tasks and cron jobs with priorities and delays
- State Management: Comprehensive task lifecycle and state transitions
- Hot Reload: Update or remove handlers at runtime without restarting workers
- Reliability: Automatic retries, timeout protection, transaction safety
- Execution Audit: Persisted task events, attempts, workflow timelines, and handler checkpoints
- Database Support: SQLite/MySQL/PostgreSQL with flexible connection options
- Resource Management: Load balancing and specialized worker support
Installation
fibjs --install fib-flowQuick Start
const { TaskManager } = require('fib-flow');
// Initialize task manager with an explicit backend
const taskManager = new TaskManager({
dbConnection: 'sqlite::memory:'
});
taskManager.db.setup();
// Basic task handler
taskManager.use('sendEmail', async (task) => {
const { to, subject, body } = task.payload;
task.audit('payload_validated', {
message: 'Email payload validated',
metadata: { recipient: to }
});
task.progress('Sending email', {
stage_name: 'delivery',
progress_percent: 80
});
await sendEmail(to, subject, body);
return { sent: true };
});
// Handler with configuration
taskManager.use('processImage', {
handler: async (task) => {
const { path } = task.payload;
await processImage(path);
return { processed: true };
},
timeout: 120, // 2 minutes timeout
max_retries: 2, // Maximum 2 retries
retry_interval: 30 // 30 seconds retry interval
});
// Start processing
taskManager.start();
// Add a task
taskManager.async('sendEmail', {
to: '[email protected]',
subject: 'Hello',
body: 'World'
});
// Query task audit history with pagination metadata
const taskAudit = taskManager.getTaskAudit(1, {
events: { limit: 20, order: 'asc' },
attempts: { limit: 10, order: 'asc' }
});
// Query workflow-level aggregate audit summary
const workflowSummary = taskManager.getWorkflowAuditSummary(1);
// Run explicit retention cleanup when needed
taskManager.runRetention();
console.log(workflowSummary.stage_timings);
console.log(workflowSummary.critical_path);
// Hot update a handler for future claims
taskManager.use('sendEmail', async (task) => {
const { to, subject, body } = task.payload;
await sendEmailV2(to, subject, body);
return { sent: true, provider: 'v2' };
});
// Remove a handler when a flow is unloaded
taskManager.unuse('processImage');Hot reload semantics:
- A task attempt that is already executing keeps the handler version captured when it started.
- A paused or suspended task that resumes later uses the latest registered handler.
- Child tasks created by a running parent resolve against the live handler registry when they are created.
Documentation
Core Concepts
Configuration & Setup
Reference
- API Documentation
- Retention Policy
- Execution Audit Events
- Cron Syntax Guide
- Usage Examples
- Common Use Cases
Example Use Cases
- Background Processing: File processing, report generation
- Scheduled Tasks: Data synchronization, backups
- Complex Workflows: Multi-step data pipelines
- Distributed Systems: Task coordination across services
For detailed examples and implementation guides, see Usage Examples.
License
MIT License
