@zinbrox/taskgroup
v1.0.0
Published
A Python-style async task scheduler with priorities, concurrency control, retries, timeouts, and cancellation
Maintainers
Readme
@zinbrox/taskgroup
A robust Python-style async task scheduler for TypeScript/JavaScript with advanced concurrency control, priority queuing, and fault tolerance features.
Perfect for batch processing, API rate limiting, resource-intensive operations, and building resilient async workflows.
Features
- Concurrency Control - Limit parallel task execution
- Priority Queuing - Higher priority tasks run first (FIFO tie-breaking)
- Smart Retries - Exponential backoff with jitter
- Timeouts & Cancellation - Per-task timeouts with AbortSignal support
- Rate Limiting - Prevent API throttling
- Circuit Breaker - Fail fast when services are down
- Streaming Results - Process results as they complete
- Rich Instrumentation - Built-in logging and lifecycle hooks
Quick Start
npm install @zinbrox/taskgroupimport { TaskGroup } from '@zinbrox/taskgroup';
// Create a task group
const group = new TaskGroup('api-calls');
// Add tasks with different priorities
group.add(async ({ signal }) => {
const response = await fetch('/api/critical-data', { signal });
return response.json();
}, { name: 'critical-fetch', priority: 10 });
group.add(async ({ signal }) => {
const response = await fetch('/api/background-data', { signal });
return response.json();
}, { name: 'background-fetch', priority: 1 });
// Execute with concurrency control
const results = await group.run({
concurrency: 3,
rateLimit: { maxPerInterval: 10, intervalMs: 1000 }
});
console.log(results);
// [
// {
// id: 1,
// name: "critical-fetch",
// status: "fulfilled",
// value: { ... },
// priority: 10,
// attempts: 1
// },
// { ... }
// ]API Reference
TaskGroup Constructor
const group = new TaskGroup(name?: string)Create a new task group instance with an optional name for debugging.
Core Methods
add(fn, options?): number
Add a task to the group. Returns a unique task ID.
const taskId = group.add(async ({ signal, name, groupName }) => {
// Your async work here
return result;
}, {
name: 'my-task', // Optional task name
priority: 5, // Higher = runs earlier (default: 0)
critical: false, // If true, prevents failFast (default: false)
timeoutMs: 5000, // Task timeout (default: none)
retry: { // Retry configuration
retries: 3,
factor: 2,
minDelayMs: 100,
maxDelayMs: 5000
}
});run(options?): Promise<TaskResult[]>
Execute all tasks and return results ordered by task ID.
runIterator(options?): AsyncGenerator<TaskResult>
Streaming API that yields results as tasks complete.
Task Control
pause()- Pause scheduling new tasksresume()- Resume schedulingcancel(id: number): boolean- Cancel a specific taskclose()- Prevent adding new tasks
Run Options
interface RunOptions {
concurrency?: number; // Max parallel tasks (default: 1)
failFast?: boolean; // Stop on first error (default: false)
signal?: AbortSignal; // External cancellation signal
// Rate limiting
rateLimit?: {
maxPerInterval?: number; // Max tasks per interval
intervalMs?: number; // Interval duration
};
// Circuit breaker
circuitBreaker?: {
errorThreshold?: number; // Failures before opening (default: 5)
resetTimeoutMs?: number; // Reset attempt delay (default: 60000)
};
// Queue management
queue?: {
queueTimeoutMs?: number; // Drop tasks waiting too long
};
// Lifecycle hooks
onTaskStart?: (task: TaskInfo) => void;
onTaskEnd?: (result: TaskResult) => void;
logger?: (event: LogEvent) => void;
}Examples
Batch API Calls with Rate Limiting
const group = new TaskGroup('api-batch');
// Add multiple API calls
const userIds = [1, 2, 3, 4, 5];
userIds.forEach(id => {
group.add(async ({ signal }) => {
const response = await fetch(`/api/users/${id}`, { signal });
return response.json();
}, { name: `fetch-user-${id}` });
});
// Execute with rate limiting
const users = await group.run({
concurrency: 2,
rateLimit: { maxPerInterval: 10, intervalMs: 1000 }
});Resilient Tasks with Retries
const group = new TaskGroup('resilient-tasks');
group.add(async ({ signal }) => {
// Potentially flaky operation
const result = await unreliableApiCall(signal);
return result;
}, {
name: 'flaky-api',
retry: {
retries: 3,
factor: 2,
minDelayMs: 100,
maxDelayMs: 5000
},
timeoutMs: 10000
});
await group.run();Streaming Results for Real-time Processing
const group = new TaskGroup('stream-processor');
// Add many tasks
for (let i = 0; i < 100; i++) {
group.add(async () => processItem(i), { name: `item-${i}` });
}
// Process results as they complete
for await (const result of group.runIterator({ concurrency: 5 })) {
if (result.status === 'fulfilled') {
console.log(`✅ ${result.name}: ${result.value}`);
} else {
console.error(`❌ ${result.name}: ${result.reason}`);
}
}Priority-based Task Execution
const group = new TaskGroup('priority-tasks');
// Critical tasks (run first)
group.add(criticalTask, { name: 'critical', priority: 10, critical: true });
// Normal tasks
group.add(normalTask, { name: 'normal', priority: 5 });
// Background tasks (run last)
group.add(backgroundTask, { name: 'background', priority: 1 });
await group.run({ concurrency: 2, failFast: true });Circuit Breaker for External Services
const group = new TaskGroup('external-api');
// Add tasks that call external service
for (let i = 0; i < 20; i++) {
group.add(async ({ signal }) => {
return await externalApiCall(signal);
}, { name: `api-call-${i}` });
}
// Use circuit breaker to fail fast when service is down
await group.run({
concurrency: 3,
circuitBreaker: {
errorThreshold: 5, // Open after 5 failures
resetTimeoutMs: 30000 // Try again after 30s
}
});Task Cancellation
const group = new TaskGroup();
const taskId = group.add(async ({ signal }) => {
// Long-running operation that respects cancellation
for (let i = 0; i < 1000; i++) {
if (signal.aborted) throw new Error('Cancelled');
await processChunk(i);
}
}, { name: 'long-task' });
// Cancel after 5 seconds
setTimeout(() => {
group.cancel(taskId);
}, 5000);
await group.run();Development
# Install dependencies
npm install
# Build the project
npm run build
# Run tests
npm test
# Run tests in watch mode
npm run test:watch
# Lint code
npm run lintContributing
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
For bugs and feature requests, please open an issue.
License
MIT © zinbrox
