@stardyn/angular-emitter
v2.0.11
Angular Emitter Package - Lightweight promise queue and cancellable promise utilities for Angular applications
Lightweight promise queue and cancellable promise utilities for Angular and TypeScript applications. Provides advanced concurrency control, priority queuing, timeout handling, and cancellation support.
Features
- Promise Queue with Concurrency Control: Manage promise execution with configurable concurrency limits
- Priority Queue: Execute promises based on priority levels
- Cancellable Promises: Cancel promises with proper cleanup handling
- Timeout Support: Automatic timeout handling for long-running operations
- Sequential Queue: Simple FIFO queue for guaranteed sequential execution
- Event-Driven: Built-in event system for monitoring queue state
- TypeScript First: Full TypeScript support with comprehensive type definitions
- Minimal Dependencies: Small footprint with EventEmitter3 as the only dependency
Installation
npm install @stardyn/angular-emitter
Quick Start
Basic Promise Queue
import { XQueue } from '@stardyn/angular-emitter';
const queue = new XQueue({ concurrency: 3 });
// Add tasks to the queue
const result1 = await queue.add(async () => {
  const response = await fetch('/api/data');
  return response.json();
});
const result2 = await queue.add(async () => {
  return 'Task completed';
});
Priority Queue
import { XQueue, XPriorityQueue } from '@stardyn/angular-emitter';
const queue = new XQueue({
  concurrency: 2,
  queueClass: XPriorityQueue
});
// High priority task
await queue.add(
  async () => 'Important task',
  { priority: 10 }
);
// Low priority task
await queue.add(
  async () => 'Regular task',
  { priority: 1 }
);
Sequential Queue
import { XSimpleSequentialQueue } from '@stardyn/angular-emitter';
const sequentialQueue = new XSimpleSequentialQueue();
// Tasks execute one by one in order
const result1 = await sequentialQueue.add(async () => 'First');
const result2 = await sequentialQueue.add(async () => 'Second');
const result3 = await sequentialQueue.add(async () => 'Third');
Cancellable Promises
import { XCancelable } from '@stardyn/angular-emitter';
const cancelableTask = new XCancelable(async (resolve, reject, onCancel) => {
  const timeout = setTimeout(() => resolve('Completed'), 5000);
  // Cleanup that runs when the task is cancelled
  onCancel(() => {
    clearTimeout(timeout);
    console.log('Task was cancelled');
  });
});
// Cancel after 2 seconds
setTimeout(() => cancelableTask.cancel('User cancelled'), 2000);
try {
  const result = await cancelableTask;
} catch (error) {
  // Caught values are typed as unknown under strict TypeScript settings
  console.log('Task cancelled:', (error as Error).message);
}
Timeout Handling
import { xTimeout } from '@stardyn/angular-emitter';
async function fetchWithTimeout() {
  const fetchPromise = fetch('/api/slow-endpoint');
  try {
    const response = await xTimeout(fetchPromise, {
      milliseconds: 5000,
      message: 'Request timed out after 5 seconds'
    });
    return response.json();
  } catch (error) {
    // Caught values are typed as unknown under strict TypeScript settings
    console.log('Request failed:', (error as Error).message);
  }
}
API Reference
XQueue
Main queue class with concurrency control and advanced features.
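To make "concurrency control" concrete, here is a minimal standalone sketch of the general technique. It is not the XQueue implementation, and TinyLimiter is a hypothetical name used only for illustration: at most `concurrency` tasks run at once, and a finishing task hands its slot directly to the oldest waiting task.

```typescript
// Standalone illustration of concurrency limiting; NOT the package's code.
type LimitedTask<T> = () => Promise<T>;

class TinyLimiter {
  private running = 0;
  private readonly waiting: Array<() => void> = [];
  private readonly concurrency: number;

  constructor(concurrency: number) {
    this.concurrency = concurrency;
  }

  async add<T>(task: LimitedTask<T>): Promise<T> {
    if (this.running >= this.concurrency) {
      // No free slot: wait until a finishing task hands its slot over.
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    } else {
      this.running++;
    }
    try {
      return await task();
    } finally {
      this.release();
    }
  }

  private release(): void {
    const next = this.waiting.shift();
    if (next) {
      next(); // the slot passes to the next task, so `running` is unchanged
    } else {
      this.running--;
    }
  }
}
```

Passing the slot directly, instead of decrementing and re-incrementing the counter, avoids a window in which a newly added task could slip in ahead of a waiting one and exceed the limit.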
Constructor Options
interface Options {
  concurrency?: number;                // Max concurrent tasks (default: Infinity)
  autoStart?: boolean;                 // Auto-start tasks (default: true)
  queueClass?: QueueClass;             // Custom queue implementation
  intervalCap?: number;                // Max tasks per interval
  interval?: number;                   // Interval duration in ms
  carryoverConcurrencyCount?: boolean; // Carry over running tasks to next interval
  timeout?: number;                    // Default timeout for tasks
  throwOnTimeout?: boolean;            // Throw error on timeout (default: false)
}
Methods
// Add a task to the queue
async add<T>(task: Task<T>, options?: QueueAddOptions): Promise<T>
// Add multiple tasks
async addAll<T>(tasks: Task<T>[], options?: QueueAddOptions): Promise<T[]>
// Queue control
start(): XQueue // Start/resume queue
pause(): void // Pause queue
clear(): void // Clear pending tasks
// Wait for events
async onEmpty(): Promise<void> // Wait until queue is empty
async onIdle(): Promise<void> // Wait until queue is idle
// Properties
get size(): number // Number of tasks waiting in the queue (not yet started)
get pending(): number // Number of tasks currently running
get isPaused(): boolean // Queue pause state
get concurrency(): number // Current concurrency limit
set concurrency(value: number) // Update concurrency limit
Events
queue.on('add', () => console.log('Task added'));
queue.on('next', () => console.log('Task completed'));
queue.on('active', () => console.log('Task started'));
queue.on('idle', () => console.log('Queue is idle'));
queue.on('empty', () => console.log('Queue is empty'));
queue.on('error', (error) => console.log('Task failed:', error));
queue.on('completed', (result) => console.log('Task completed:', result));
XPriorityQueue
Priority-based queue implementation.
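As a rough illustration of priority ordering (a standalone sketch, not the XPriorityQueue source; TinyPriorityList is a hypothetical name), entries can be kept sorted on insertion so that higher numbers are dequeued first. Keeping equal priorities in insertion order is an assumption of this sketch.

```typescript
// Standalone illustration of priority ordering; NOT the package's code.
interface PriorityEntry<T> {
  priority: number;
  item: T;
}

class TinyPriorityList<T> {
  private readonly entries: PriorityEntry<T>[] = [];

  enqueue(item: T, priority = 0): void {
    // Insert before the first entry with a strictly lower priority,
    // so entries with equal priority keep their insertion (FIFO) order.
    let index = this.entries.findIndex((e) => e.priority < priority);
    if (index === -1) {
      index = this.entries.length;
    }
    this.entries.splice(index, 0, { priority, item });
  }

  dequeue(): T | undefined {
    return this.entries.shift()?.item;
  }

  get size(): number {
    return this.entries.length;
  }
}

const list = new TinyPriorityList<string>();
list.enqueue('regular', 1);
list.enqueue('important', 10);
console.log(list.dequeue()); // the higher-priority entry comes out first
```

With the package itself, the same ordering is requested through the priority option: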
const priorityQueue = new XQueue({
  queueClass: XPriorityQueue,
  concurrency: 3
});
await priorityQueue.add(task, { priority: 10 }); // High priority
await priorityQueue.add(task, { priority: 1 }); // Low priority
XSimpleSequentialQueue
Lightweight sequential queue for FIFO execution.
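One common way to get strict FIFO execution is to chain each task onto the previous one. The sketch below shows that idea in isolation; it is not the XSimpleSequentialQueue source, and TinySequentialQueue is a hypothetical name. Continuing after a failed task is a design choice of this sketch, not a documented behavior of the package.

```typescript
// Standalone illustration of sequential execution via promise chaining;
// NOT the package's code.
class TinySequentialQueue {
  // Always settles successfully, so one failed task cannot block later ones.
  private tail: Promise<unknown> = Promise.resolve();

  add<T>(task: () => Promise<T>): Promise<T> {
    // Start this task only after the previous one has settled.
    const result = this.tail.then(() => task());
    // Swallow the rejection on the internal chain only;
    // the caller still sees it through `result`.
    this.tail = result.catch(() => undefined);
    return result;
  }
}
```

The package's own queue is used like this: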
const sequentialQueue = new XSimpleSequentialQueue<string>();
await sequentialQueue.add(async () => 'Task 1');
await sequentialQueue.add(async () => 'Task 2');
// Wait for all tasks to complete
await sequentialQueue.onIdle();
console.log(sequentialQueue.stats); // Get queue statistics
XCancelable
Cancellable promise implementation.
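The (resolve, reject, onCancel) executor pattern can be sketched in a few lines. This is a standalone illustration of the technique, not the XCancelable source; makeCancelable and TinyCancelError are hypothetical names. Cancelling runs the registered cleanup handlers and then rejects the promise.

```typescript
// Standalone illustration of a cancellable promise; NOT the package's code.
class TinyCancelError extends Error {
  constructor(reason = 'Promise was cancelled') {
    super(reason);
    this.name = 'TinyCancelError';
  }
}

interface TinyCancelable<T> {
  promise: Promise<T>;
  cancel(reason?: string): void;
}

function makeCancelable<T>(
  executor: (
    resolve: (value: T) => void,
    reject: (error: unknown) => void,
    onCancel: (handler: () => void) => void,
  ) => void,
): TinyCancelable<T> {
  const handlers: Array<() => void> = [];
  let settled = false;
  let rejectOuter: (error: unknown) => void = () => {};

  const promise = new Promise<T>((resolve, reject) => {
    rejectOuter = reject;
    executor(
      (value) => { settled = true; resolve(value); },
      (error) => { settled = true; reject(error); },
      (handler) => { handlers.push(handler); },
    );
  });

  return {
    promise,
    cancel(reason?: string): void {
      if (settled) return; // cancelling a finished promise is a no-op
      settled = true;
      for (const handler of handlers) handler(); // run cleanup first
      rejectOuter(new TinyCancelError(reason));
    },
  };
}
```

In the package, the equivalent usage looks like this: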
const cancelable = new XCancelable<string>((resolve, reject, onCancel) => {
  // Setup your async operation
  const operation = startAsyncOperation();
  // Register cleanup handler
  onCancel(() => operation.abort());
  operation.then(resolve).catch(reject);
});
// Cancel the operation
cancelable.cancel('Reason for cancellation');
xTimeout
Promise timeout utility.
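A promise timeout is typically built by racing the original promise against a timer. The following standalone sketch shows that technique under simplified assumptions (no fallback or AbortSignal handling); it is not the xTimeout source, and withTimeout is a hypothetical name.

```typescript
// Standalone illustration of a promise timeout via Promise.race;
// NOT the package's code.
async function withTimeout<T>(
  promise: Promise<T>,
  milliseconds: number,
  message = `Timed out after ${milliseconds} ms`,
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(message)), milliseconds);
  });
  try {
    // Whichever settles first decides the outcome.
    return await Promise.race([promise, timeout]);
  } finally {
    // Always clear the timer so it does not keep the process alive.
    clearTimeout(timer);
  }
}
```

Note that a race like this only stops waiting; the underlying operation keeps running unless it is cancelled separately. The package's utility takes an options object instead: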
const result = await xTimeout(promise, {
  milliseconds: 5000,                 // Timeout duration
  message: 'Custom timeout message',  // Error message
  fallback: () => 'Default value',    // Fallback value on timeout
  signal: abortController.signal      // AbortSignal support
});
Advanced Examples
Rate Limited API Calls
import { XQueue } from '@stardyn/angular-emitter';
const apiQueue = new XQueue({
  concurrency: 1,
  intervalCap: 10, // Max 10 requests
  interval: 60000  // Per minute
});
async function makeApiCall(endpoint: string) {
  return apiQueue.add(async () => {
    const response = await fetch(endpoint);
    return response.json();
  });
}
Task Prioritization
import { XQueue, XPriorityQueue } from '@stardyn/angular-emitter';
const taskQueue = new XQueue({
  concurrency: 2,
  queueClass: XPriorityQueue
});
// Critical tasks (highest priority)
await taskQueue.add(criticalTask, { priority: 100 });
// Normal tasks
await taskQueue.add(normalTask, { priority: 50 });
// Background tasks (lowest priority)
await taskQueue.add(backgroundTask, { priority: 1 });
Cancellation with Cleanup
import { XCancelable, CancelError } from '@stardyn/angular-emitter';
function createCancellableUpload(file: File) {
  return new XCancelable(async (resolve, reject, onCancel) => {
    const xhr = new XMLHttpRequest();
    // Abort the request if the upload is cancelled
    onCancel(() => {
      xhr.abort();
      console.log('Upload cancelled');
    });
    xhr.upload.onprogress = (e) => {
      // e.total is only meaningful when the length is computable
      if (e.lengthComputable) {
        console.log(`Progress: ${(e.loaded / e.total) * 100}%`);
      }
    };
    xhr.onload = () => resolve(xhr.response);
    xhr.onerror = () => reject(new Error('Upload failed'));
    xhr.open('POST', '/upload');
    xhr.send(file);
  });
}
// `file` is assumed to be a File obtained elsewhere, e.g. from a file input
const uploadTask = createCancellableUpload(file);
// Cancel the upload if it has not finished after 10 seconds
setTimeout(() => uploadTask.cancel(), 10000);
try {
  await uploadTask;
} catch (error) {
  if (error instanceof CancelError) {
    console.log('Upload was cancelled');
  }
}
TypeScript Support
Full TypeScript support with comprehensive type definitions:
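import { XQueue } from '@stardyn/angular-emitter';
import type {
  QueueAddOptions,
  TaskOptions,
  QueueConfiguration
} from '@stardyn/angular-emitter';
interface MyTaskOptions extends QueueAddOptions {
  customOption?: string;
}
// MyQueue is a placeholder for your own queue class; it is not defined in this snippet
const queue = new XQueue<MyQueue, MyTaskOptions>({
  concurrency: 3
});
Error Handling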
import { TimeoutError, CancelError, AbortError } from '@stardyn/angular-emitter';
try {
  const result = await queue.add(async () => {
    // Your async task
  }, { timeout: 5000 });
} catch (error) {
  // throwOnTimeout defaults to false (see Constructor Options),
  // so a TimeoutError is only expected here when it is enabled
  if (error instanceof TimeoutError) {
    console.log('Task timed out');
  } else if (error instanceof CancelError) {
    console.log('Task was cancelled');
  } else if (error instanceof AbortError) {
    console.log('Task was aborted');
  } else {
    console.log('Task failed:', error);
  }
}
Performance Tips
- Choose the Right Queue: Use XSimpleSequentialQueue for simple FIFO operations
- Set Appropriate Concurrency: Balance between throughput and resource usage
- Use Priorities Wisely: Only use priority queue when needed
- Handle Timeouts: Set reasonable timeout values for network operations
- Clean Up Resources: Always handle cancellation properly
Browser Support
- Chrome 60+
- Firefox 55+
- Safari 12+
- Edge 79+
- Node.js 16+
Dependencies
eventemitter3: ^5.0.1 (for event handling)
License
MIT License - see LICENSE file for details.
Repository
https://github.com/stardyn/angular-emitter
