@xcons/emitter
v2.0.5
Published
XCons Emitter - Promise Queue and Utilities Package with Concurrency Control
Maintainers
Readme
@xcons/emitter
Promise queue and utilities package with concurrency control
🚀 Quick Start
Basic HTML Usage
<!DOCTYPE html>
<html>
<head>
<title>My App</title>
<!-- Load via CDN -->
<script src="https://unpkg.com/@xcons/emitter@latest/core.js"></script>
</head>
<body>
<script>
const { XQueue } = XConsEmitter;
// Create queue with 3 concurrent tasks
const queue = new XQueue({ concurrency: 3 });
// Add task to queue
queue.add(async () => {
const response = await fetch('https://api.example.com');
return response.json();
}).then(data => {
console.log('Data received:', data);
});
</script>
</body>
</html>NPM/Node.js Usage
npm install @xcons/emitterimport { XQueue, xTimeout, XCancelable } from '@xcons/emitter';
// Queue with concurrency limit
const queue = new XQueue({ concurrency: 5 });
// Add timeout to promise
const result = await xTimeout(fetchData(), { milliseconds: 5000 });
// Cancelable promise
const cancelable = new XCancelable((resolve, reject, onCancel) => {
const timer = setTimeout(() => resolve('Done!'), 1000);
onCancel(() => clearTimeout(timer));
});✨ Features
- 🔄 Promise Queue - Async operation management with concurrency control
- 🎯 Priority Queue - Execute tasks based on priority levels
- 📋 Sequential Queue - FIFO queue for guaranteed sequential execution
- ❌ Cancelable Promises - Cancel running promises with built-in error handling
- ⏱️ Timeout Utilities - Add timeout functionality to any promise
- 🎨 TypeScript Support - Full TypeScript type definitions included
- 🌐 Universal Compatibility - Works in browser and Node.js
- 📊 Event System - Listen to queue events (active, idle, empty, etc.)
📦 Installation
Via CDN (Browser)
<!-- Latest version -->
<script src="https://unpkg.com/@xcons/emitter@latest/core.js"></script>
<!-- Specific version -->
<script src="https://unpkg.com/@xcons/[email protected]/core.js"></script>
<!-- ES Module -->
<script type="module">
import { XQueue } from 'https://unpkg.com/@xcons/emitter@latest/core.js';
</script>Via NPM
# npm
npm install @xcons/emitter
# yarn
yarn add @xcons/emitter
# pnpm
pnpm add @xcons/emitter🎯 Basic Usage
XQueue - Promise Queue
import { XQueue } from '@xcons/emitter';
// Basic queue
const queue = new XQueue({
concurrency: 5, // Max concurrent tasks
autoStart: true, // Auto start
timeout: 30000, // Per-task timeout (ms)
intervalCap: 10, // Rate limiting
interval: 1000 // Rate limit interval
});
// Add task
await queue.add(async () => {
const response = await fetch('/api/data');
return response.json();
});
// Add multiple tasks
const results = await queue.addAll([
() => fetch('/api/user'),
() => fetch('/api/posts'),
() => fetch('/api/comments')
]);Priority Tasks
import { XQueue } from '@xcons/emitter';
const queue = new XQueue({ concurrency: 3 });
// High priority task
await queue.add(async () => {
return await fetchCriticalData();
}, { priority: 10 });
// Normal priority task
await queue.add(async () => {
return await fetchRegularData();
}, { priority: 5 });
// Low priority task
await queue.add(async () => {
return await fetchBackgroundData();
}, { priority: 1 });
// Update priority
queue.setPriority('task-id', 15);Queue Events
const queue = new XQueue({ concurrency: 3 });
// When task is added
queue.on('add', () => {
console.log('New task added');
});
// When task starts
queue.on('active', () => {
console.log('Task running');
});
// When task completes
queue.on('completed', (result) => {
console.log('Task completed:', result);
});
// On error
queue.on('error', (error) => {
console.error('Task error:', error);
});
// When queue becomes empty
queue.on('empty', () => {
console.log('Queue is empty');
});
// When all tasks are done
queue.on('idle', () => {
console.log('Queue is idle');
});
// Queue status
console.log(queue.stats);
// { size: 0, pending: 0, isPaused: false, concurrency: 3 }Queue Control
const queue = new XQueue({ concurrency: 5 });
// Pause queue
queue.pause();
// Start/resume queue
queue.start();
// Clear queue
queue.clear();
// Wait until queue is empty
await queue.onEmpty();
// Wait until size is less than limit
await queue.onSizeLessThan(5);
// Wait until all tasks are done
await queue.onIdle();
// Check queue status
if (queue.isPaused) {
console.log('Queue is paused');
}
console.log(`Pending tasks: ${queue.size}`);
console.log(`Running tasks: ${queue.pending}`);XSimpleSequentialQueue - Sequential Queue
import { XSimpleSequentialQueue } from '@xcons/emitter';
const queue = XSimpleSequentialQueue.create();
// Tasks run sequentially
await queue.add(() => task1());
await queue.add(() => task2());
await queue.add(() => task3());
// Add multiple tasks
const results = await queue.addAll([
() => processStep1(),
() => processStep2(),
() => processStep3()
]);
// Wait until queue is empty
await queue.onIdle();
// Queue statistics
console.log(queue.stats);
// {
// size: 0,
// isProcessing: false,
// totalProcessed: 15,
// totalFailed: 2
// }
// Clear queue
queue.clear();xTimeout - Promise Timeout
import { xTimeout, TimeoutError } from '@xcons/emitter';
// Basic timeout
try {
const result = await xTimeout(fetchData(), {
milliseconds: 5000,
message: 'Operation timed out'
});
} catch (error) {
if (error instanceof TimeoutError) {
console.error('Timeout!');
}
}
// With fallback value
const result = await xTimeout(fetchData(), {
milliseconds: 3000,
fallback: () => ({ default: 'data' })
});
// With AbortSignal
const controller = new AbortController();
const promise = xTimeout(fetchData(), {
milliseconds: 5000,
signal: controller.signal
});
// Cancel
controller.abort();
// Clear timeout
const timedPromise = xTimeout(longRunning(), {
milliseconds: 10000
});
timedPromise.clear(); // Cancel timeoutXCancelable - Cancelable Promise
import { XCancelable, CancelError } from '@xcons/emitter';
// Create cancelable promise
const cancelable = new XCancelable((resolve, reject, onCancel) => {
const timer = setTimeout(() => {
resolve('Operation completed!');
}, 5000);
// Cleanup logic
onCancel(() => {
clearTimeout(timer);
console.log('Operation canceled');
});
});
// Cancel
try {
setTimeout(() => cancelable.cancel('User canceled'), 2000);
const result = await cancelable;
} catch (error) {
if (error instanceof CancelError) {
console.log('Canceled:', error.message);
}
}
// Use as function
const fetchWithCancel = XCancelable.fn(async (url, onCancel) => {
const controller = new AbortController();
onCancel(() => controller.abort());
const response = await fetch(url, {
signal: controller.signal
});
return response.json();
});
const promise = fetchWithCancel('https://api.example.com');
setTimeout(() => promise.cancel(), 3000); // Cancel after 3 seconds💼 Real-world Examples
Batch Data Processing
import { XQueue } from '@xcons/emitter';
async function processBatchData(items) {
const queue = new XQueue({
concurrency: 5,
timeout: 30000
});
// Progress tracking
let completed = 0;
queue.on('completed', () => {
completed++;
console.log(`Progress: ${completed}/${items.length}`);
});
// Error handling
queue.on('error', (error) => {
console.error('Processing error:', error);
});
// Add all items to queue
const results = await queue.addAll(
items.map(item => async () => {
const response = await fetch('/api/process', {
method: 'POST',
body: JSON.stringify(item)
});
return response.json();
})
);
return results;
}Rate Limiting
import { XQueue } from '@xcons/emitter';
// Limit to 10 requests per second
const apiQueue = new XQueue({
concurrency: 3,
intervalCap: 10, // 10 requests per second
interval: 1000 // 1 second
});
// API calls are automatically rate limited
for (let i = 0; i < 100; i++) {
apiQueue.add(async () => {
return await fetch(`/api/items/${i}`);
});
}
// Wait for all requests to complete
await apiQueue.onIdle();Priority Task System
import { XQueue } from '@xcons/emitter';
const taskQueue = new XQueue({ concurrency: 2 });
// Critical task
await taskQueue.add(async () => {
await processPayment();
}, { priority: 100 });
// Normal task
await taskQueue.add(async () => {
await sendEmail();
}, { priority: 50 });
// Background task
await taskQueue.add(async () => {
await cleanupCache();
}, { priority: 1 });
// Tasks run in priority orderParallel Data Fetching
import { XQueue, xTimeout } from '@xcons/emitter';
async function fetchAllData() {
const queue = new XQueue({ concurrency: 10 });
const endpoints = [
'/api/users',
'/api/posts',
'/api/comments',
'/api/analytics'
];
const results = await queue.addAll(
endpoints.map(url => async () => {
// 5 second timeout per request
return await xTimeout(
fetch(url).then(r => r.json()),
{ milliseconds: 5000 }
);
})
);
return {
users: results[0],
posts: results[1],
comments: results[2],
analytics: results[3]
};
}File Upload Queue
import { XQueue } from '@xcons/emitter';
class FileUploader {
constructor() {
this.queue = new XQueue({
concurrency: 3,
timeout: 60000 // 1 minute timeout
});
this.setupEventHandlers();
}
setupEventHandlers() {
this.queue.on('active', () => {
console.log('Uploading file...');
});
this.queue.on('completed', (result) => {
console.log('File uploaded:', result.filename);
});
this.queue.on('error', (error) => {
console.error('Upload error:', error);
});
}
async uploadFiles(files) {
return await this.queue.addAll(
Array.from(files).map(file => async () => {
const formData = new FormData();
formData.append('file', file);
const response = await fetch('/api/upload', {
method: 'POST',
body: formData
});
return {
filename: file.name,
url: await response.text()
};
})
);
}
get uploadProgress() {
return {
total: this.queue.size + this.queue.pending,
pending: this.queue.size,
uploading: this.queue.pending
};
}
}
// Usage
const uploader = new FileUploader();
const files = document.querySelector('input[type="file"]').files;
const results = await uploader.uploadFiles(files);🔧 Advanced Features
Task with Timeout
const queue = new XQueue({
concurrency: 3,
timeout: 10000, // Default 10 second timeout
throwOnTimeout: false // Don't throw on timeout
});
// Task with custom timeout
await queue.add(async () => {
return await longRunningTask();
}, {
timeout: 30000, // 30 seconds for this task
throwOnTimeout: true // Throw error for this task
});AbortSignal Support
const controller = new AbortController();
const queue = new XQueue({ concurrency: 3 });
// Task with AbortSignal
const promise = queue.add(async ({ signal }) => {
const response = await fetch('/api/data', { signal });
return response.json();
}, { signal: controller.signal });
// Cancel task
controller.abort();Queue Filtering
const queue = new XQueue({ concurrency: 5 });
// Filter by priority
const highPriorityCount = queue.sizeBy({ priority: 10 });
console.log(`High priority tasks: ${highPriorityCount}`);📊 Error Handling
import {
TimeoutError,
CancelError,
AbortError
} from '@xcons/emitter';
const queue = new XQueue({ concurrency: 3 });
try {
const result = await queue.add(async () => {
// Task logic
});
} catch (error) {
if (error instanceof TimeoutError) {
console.error('Task timed out');
} else if (error instanceof CancelError) {
console.error('Task was canceled');
} else if (error instanceof AbortError) {
console.error('Task was aborted');
} else {
console.error('Unknown error:', error);
}
}🌍 Browser Support
- Chrome 60+
- Firefox 55+
- Safari 12+
- Edge 79+
🔧 Node.js Support
- Node.js 16+
📚 API Reference
XQueue
Constructor Options
interface Options {
concurrency?: number; // Concurrent task limit (default: Infinity)
autoStart?: boolean; // Auto start (default: true)
timeout?: number; // Per-task timeout (ms)
throwOnTimeout?: boolean; // Throw on timeout (default: false)
intervalCap?: number; // Max tasks in interval
interval?: number; // Rate limit interval (ms)
carryoverConcurrencyCount?: boolean; // Carry over tasks at interval end
queueClass?: QueueClass; // Custom queue class
}Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| add(task, options?) | async add<T>(task: Task<T>, options?: Partial<QueueAddOptions>): Promise<T \| void> | Add task to queue |
| addAll(tasks, options?) | async addAll<T>(tasks: Task<T>[], options?: Partial<QueueAddOptions>): Promise<(T \| void)[]> | Add multiple tasks |
| start() | start(): this | Start/resume queue |
| pause() | pause(): void | Pause queue |
| clear() | clear(): void | Clear queue |
| setPriority(id, priority) | setPriority(id: string, priority: number): void | Update task priority |
| onEmpty() | async onEmpty(): Promise<void> | Wait until queue is empty |
| onSizeLessThan(limit) | async onSizeLessThan(limit: number): Promise<void> | Wait until size is less than limit |
| onIdle() | async onIdle(): Promise<void> | Wait until all tasks are done |
| sizeBy(options) | sizeBy(options: Partial<QueueAddOptions>): number | Filtered queue size |
Properties
| Property | Type | Description |
|----------|------|-------------|
| size | number | Pending task count |
| pending | number | Running task count |
| isPaused | boolean | Is queue paused |
| concurrency | number | Concurrency limit (get/set) |
| timeout | number \| undefined | Per-task timeout (ms) |
| stats | QueueStats | Queue statistics |
QueueAddOptions
interface QueueAddOptions {
priority?: number; // Task priority (higher = runs first)
id?: string; // Unique task ID
signal?: AbortSignal; // Abort signal
timeout?: number; // Timeout for this task
throwOnTimeout?: boolean; // Throw on timeout
}Events
| Event | Parameter | Description |
|-------|-----------|-------------|
| add | - | When task is added |
| active | - | When task starts |
| completed | result: any | When task completes |
| error | error: Error | On error |
| next | - | When moving to next task |
| empty | - | When queue becomes empty |
| idle | - | When all tasks are done |
XSimpleSequentialQueue
Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| add(task) | async add<T>(task: () => Promise<T>): Promise<T> | Add sequential task |
| addAll(tasks) | async addAll<T>(tasks: (() => Promise<T>)[]): Promise<T[]> | Add multiple sequential tasks |
| waitUntilEmpty(pollInterval?) | async waitUntilEmpty(pollInterval?: number): Promise<void> | Wait until queue is empty |
| onIdle() | async onIdle(): Promise<void> | Wait until queue is idle |
| clear(rejectPending?) | clear(rejectPending?: boolean): void | Clear queue |
Properties
| Property | Type | Description |
|----------|------|-------------|
| size | number | Pending task count |
| isEmpty | boolean | Is queue empty |
| isProcessing | boolean | Is processing task |
| stats | SequentialQueueStats | Queue statistics |
SequentialQueueStats
interface SequentialQueueStats {
size: number; // Pending task count
isProcessing: boolean; // Processing status
totalProcessed: number; // Total processed tasks
totalFailed: number; // Total failed tasks
}Static Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| create<T>() | static create<T>(): XSimpleSequentialQueue<T> | Create typed queue |
xTimeout
Function Signature
function xTimeout<T>(
promise: Promise<T>,
options: XTimeoutOptions
): XTimeoutPromise<T>XTimeoutOptions
interface XTimeoutOptions {
milliseconds: number; // Timeout duration (ms) - required
message?: string | Error | false; // Error message (false = no error)
fallback?: () => any; // Fallback value on timeout
customTimers?: CustomTimers; // Custom timer functions
signal?: AbortSignal; // Abort signal
}XTimeoutPromise
interface XTimeoutPromise<T> extends Promise<T> {
clear(): void; // Cancel timeout
}Examples
// Basic usage
const result = await xTimeout(promise, { milliseconds: 5000 });
// Custom message
await xTimeout(promise, {
milliseconds: 3000,
message: 'Operation took too long'
});
// No error throwing
await xTimeout(promise, {
milliseconds: 3000,
message: false // Returns undefined instead of throwing
});
// Fallback value
const result = await xTimeout(promise, {
milliseconds: 3000,
fallback: () => ({ default: true })
});
// Manual cleanup
const timedPromise = xTimeout(promise, { milliseconds: 5000 });
timedPromise.clear(); // Timeout canceledXCancelable
Constructor
class XCancelable<T> implements Promise<T> {
constructor(
executor: (
resolve: (value: T | PromiseLike<T>) => void,
reject: (reason?: any) => void,
onCancel: OnCancelWithProperties
) => void
)
}OnCancelWithProperties
interface OnCancelWithProperties {
(handler: () => void): void;
shouldReject: boolean; // Should reject on cancel
}Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| cancel(reason?) | cancel(reason?: string): void | Cancel promise |
| then(onFulfilled?, onRejected?) | then<T1, T2>(...): Promise<T1 \| T2> | Promise then |
| catch(onRejected?) | catch<T>(onRejected?): Promise<T> | Promise catch |
| finally(onFinally?) | finally(onFinally?): Promise<T> | Promise finally |
Properties
| Property | Type | Description |
|----------|------|-------------|
| isCanceled | boolean | Is canceled |
Static Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| fn<TArgs, TReturn>(userFn) | static fn<TArgs, TReturn>(fn): (...args: TArgs) => XCancelable<TReturn> | Create cancelable function |
Examples
// Basic usage
const cancelable = new XCancelable((resolve, reject, onCancel) => {
const timer = setTimeout(() => resolve('OK'), 5000);
onCancel(() => clearTimeout(timer));
});
cancelable.cancel('Canceled');
// shouldReject control
const cancelable = new XCancelable((resolve, reject, onCancel) => {
onCancel.shouldReject = false; // Don't reject on cancel
const timer = setTimeout(() => resolve('OK'), 5000);
onCancel(() => {
clearTimeout(timer);
resolve('Canceled but no error'); // Return your own result
});
});
// Function wrapper
const fetchWithCancel = XCancelable.fn(async (url, onCancel) => {
const controller = new AbortController();
onCancel(() => controller.abort());
const response = await fetch(url, { signal: controller.signal });
return response.json();
});
const promise = fetchWithCancel('https://api.example.com');
promise.cancel(); // CancelXPriorityQueue
Methods
| Method | Signature | Description |
|--------|-----------|-------------|
| enqueue(run, options?) | enqueue(run: RunFunction, options?: Partial<PriorityQueueOptions>): void | Add priority task |
| dequeue() | dequeue(): RunFunction \| undefined | Get highest priority task |
| setPriority(id, priority) | setPriority(id: string, priority: number): void | Update task priority |
| filter(options) | filter(options: Partial<PriorityQueueOptions>): RunFunction[] | Filter by priority |
| peek() | peek(): RunFunction \| undefined | View next task |
| clear() | clear(): void | Clear queue |
| isEmpty() | isEmpty(): boolean | Is queue empty |
Properties
| Property | Type | Description |
|----------|------|-------------|
| size | number | Task count in queue |
PriorityQueueOptions
interface PriorityQueueOptions {
priority?: number; // Priority value (higher = runs first)
id?: string; // Unique task ID
}Error Classes
TimeoutError
class TimeoutError extends Error {
name: 'TimeoutError';
constructor(message?: string);
}CancelError
class CancelError extends Error {
name: 'CancelError';
readonly isCanceled: true;
constructor(reason?: string);
}AbortError
class AbortError extends Error {
name: 'AbortError';
constructor(message?: string);
}Package Information
// Package version and build info
const { VERSION, BUILD_TIME, PACKAGE_INFO } = XConsEmitter;
console.log(PACKAGE_INFO);
// {
// name: '@xcons/emitter',
// version: '2.0.4',
// buildTime: '2024-01-15T10:30:00.000Z',
// description: 'XCons Emitter - Promise Queue and Utilities Package',
// author: 'XCON Studio'
// }📄 License
MIT © XCon Studio
🔗 Links
🤝 Contributing
Contributions, issues and feature requests are welcome!
⭐ Support
Give a ⭐️ if this project helped you!
Made with ❤️ by XCon Studio
