web-queue
v0.2.1
Published
A browser and Node.js compatible queue library with advanced features like priority, delay, retry, and persistence
Maintainers
Readme
web-queue
一个同时支持浏览器和Node.js环境的队列库,使用TypeScript开发。
功能特性
- 基本队列功能:支持FIFO(先进先出)操作,包括入队、出队、查看队首元素等。
- 主题队列:基于BroadcastChannel API实现跨标签页/窗口的消息广播和订阅。
- 高级队列功能:
- 消息优先级:支持为消息设置优先级,高优先级消息优先处理。
- 延迟消息:支持延迟投递消息,消息将在指定时间后进入队列。
- 幂等性:支持自定义消息ID,确保消息的唯一性。
- 重试机制:消息处理失败后自动重试,可配置最大重试次数和重试间隔。
- 死信队列:处理失败且超过最大重试次数的消息将进入死信队列。
- 持久化:支持将队列状态持久化到内存、localStorage或IndexedDB。
- 存储驱动:支持多种存储后端,包括内存、localStorage和IndexedDB。
安装
npm install web-queue
# 或
yarn add web-queue使用方法
基本用法
import { Queue } from 'web-queue';
// 创建一个队列
const queue = new Queue<number>();
// 添加元素
queue.enqueue(1);
queue.enqueue(2);
queue.enqueue(3);
// 获取队列大小
console.log(queue.size()); // 3
// 查看队首元素但不移除
console.log(queue.peek()); // 1
// 移除并返回队首元素
console.log(queue.dequeue()); // 1
// 检查队列是否为空
console.log(queue.isEmpty()); // false
// 清空队列
queue.clear();
console.log(queue.isEmpty()); // true转换为数组
const queue = new Queue<string>();
queue.enqueue('a');
queue.enqueue('b');
queue.enqueue('c');
const array = queue.toArray();
console.log(array); // ['a', 'b', 'c']使用主题队列(TopicQueue)
TopicQueue 使用 BroadcastChannel API 实现基于主题的消息传递,适用于浏览器环境。
import { TopicQueue } from 'web-queue';
// 创建一个基于主题的队列
const topic = new TopicQueue<string>('myChannel');
// 订阅消息
const unsubscribe = topic.subscribe(message => {
console.log('收到消息:', message);
});
// 发送消息(同时添加到队列并广播)
topic.enqueue('Hello World');
// 取消订阅
unsubscribe();
// 关闭通道
topic.close();在多个标签页/窗口之间通信
// 在标签页 A 中
const topicA = new TopicQueue<string>('sharedChannel');
topicA.enqueue('来自标签页 A 的消息');
// 在标签页 B 中
const topicB = new TopicQueue<string>('sharedChannel');
topicB.subscribe(message => {
console.log('标签页 B 收到:', message); // 将显示 "来自标签页 A 的消息"
});高级队列功能(AdvancedQueue)
AdvancedQueue 提供了更多高级功能,如消息幂等性、优先级、延迟投递、重试机制和持久化。
import { AdvancedQueue, MessageStatus } from 'web-queue';
// 创建高级队列,配置最大重试次数和持久化
const queue = new AdvancedQueue<{ taskId: string, data: any }>({
maxRetries: 3,
retryDelay: 1000,
persistenceEnabled: true,
persistenceDriver: 'localstorage',
persistenceInterval: 5000
});
// 添加消息,支持自定义ID(幂等性)、优先级和延迟
const message = queue.enqueue(
{ taskId: 'task-123', data: { value: 42 } },
{
id: 'unique-message-id', // 自定义ID,确保幂等性
priority: 10, // 优先级,数字越大优先级越高
delay: 5000 // 延迟5秒后才可处理
}
);
// 获取下一个待处理的消息
const nextMessage = queue.dequeue();
if (nextMessage) {
try {
// 处理消息...
processMessage(nextMessage.data);
// 标记消息处理成功
queue.complete(nextMessage.id);
} catch (error) {
// 标记消息处理失败,会自动重试
queue.fail(nextMessage.id, error.message);
}
}
// 获取延迟队列中的消息
const delayedMessages = queue.getDelayedMessages();
// 取消延迟消息
queue.cancelDelayed('message-id');
// 获取死信队列中的消息
const deadLetterMessages = queue.getDeadLetterMessages();
// 重试死信队列中的消息
queue.retryDeadLetter('message-id');
// 清理资源
queue.dispose();高级主题队列(AdvancedTopicQueue)
AdvancedTopicQueue 结合了 AdvancedQueue 的高级功能和 TopicQueue 的广播能力。
import { AdvancedTopicQueue } from 'web-queue';
// 创建高级主题队列
const topicQueue = new AdvancedTopicQueue<{ event: string, payload: any }>(
'notifications',
{ maxRetries: 2, persistenceEnabled: true }
);
// 订阅消息
const unsubscribe = topicQueue.subscribe(message => {
console.log(`收到消息 ${message.id}:`, message.data);
});
// 发送高优先级消息
topicQueue.enqueue(
{ event: 'user.login', payload: { userId: 123 } },
{ priority: 10 }
);
// 发送延迟消息(不会立即广播)
topicQueue.enqueue(
{ event: 'maintenance.reminder', payload: { time: '2小时后' } },
{ delay: 7200000 } // 2小时后
);
// 取消订阅并关闭
unsubscribe();
topicQueue.close();存储驱动
web-queue 支持多种存储驱动来持久化队列数据:
import {
AdvancedQueue,
MemoryStorageDriver,
LocalStorageDriver,
IndexedDBStorageDriver
} from 'web-queue';
// 使用内存存储(默认)
const memoryQueue = new AdvancedQueue({
persistenceEnabled: true,
persistenceDriver: 'memory'
});
// 使用 localStorage 存储
const localStorageQueue = new AdvancedQueue({
persistenceEnabled: true,
persistenceDriver: 'localstorage'
});
// 使用 IndexedDB 存储
const indexedDBQueue = new AdvancedQueue({
persistenceEnabled: true,
persistenceDriver: 'indexeddb'
});存储驱动选项
MemoryStorageDriver:
- 适用于临时数据,不持久化到磁盘。
- 适合测试或短期使用的场景。
LocalStorageDriver:
- 基于
localStorageAPI,适合存储小量数据(通常限制为 5MB)。 - 支持跨会话持久化,但仅限同源页面。
- 基于
IndexedDBStorageDriver:
- 基于
IndexedDBAPI,适合存储大量结构化数据。 - 支持异步操作,适合高性能要求的场景。
- 基于
示例
项目包含多个交互式示例,展示了各种功能的使用方法:
examples/advanced-queue-demo.html- 高级队列基本功能演示examples/advanced-topic-queue-demo.html- 高级主题队列和多标签页通信examples/storage-drivers-demo.html- 不同存储驱动的使用examples/idempotent-messages-demo.html- 消息幂等性功能examples/retry-dead-letter-demo.html- 重试机制和死信队列examples/delayed-queue-demo.html- 延迟队列和定时消息
要运行这些示例,只需在浏览器中打开对应的HTML文件。
API
Queue
enqueue(item: T): void- 将元素添加到队列末尾dequeue(): T | undefined- 移除并返回队首元素peek(): T | undefined- 返回队首元素但不移除size(): number- 返回队列中的元素数量isEmpty(): boolean- 检查队列是否为空clear(): void- 清空队列toArray(): T[]- 将队列转换为数组
TopicQueue
TopicQueue 继承自 Queue,并添加了以下方法:
constructor(topicName?: string)- 创建一个基于主题的队列subscribe(callback: (item: T) => void): () => void- 订阅主题消息,返回取消订阅的函数close(): void- 关闭广播通道
AdvancedQueue
constructor(options?: Partial<QueueOptions>)- 创建高级队列,可配置重试、持久化等选项enqueue(data: T, options?: { priority?: number; delay?: number; id?: string }): Message<T>- 添加消息,支持优先级、延迟和自定义IDdequeue(): Message<T> | undefined- 获取并标记为处理中的下一个消息peek(): Message<T> | undefined- 查看下一个待处理消息但不移除complete(messageId: string): boolean- 标记消息处理成功fail(messageId: string, reason?: string): boolean- 标记消息处理失败findMessageById(id: string): Message<T> | undefined- 通过ID查找消息cancelDelayed(messageId: string): boolean- 取消延迟消息retryDeadLetter(messageId: string): boolean- 重试死信队列中的消息getDelayedMessages(): Message<T>[]- 获取所有延迟消息getDeadLetterMessages(): Message<T>[]- 获取所有死信消息getAllMessages(): Message<T>[]- 获取所有消息size(): number- 获取待处理消息数量totalSize(): number- 获取所有消息数量isEmpty(): boolean- 检查是否没有待处理消息clear(): void- 清空所有队列toArray(): Message<T>[]- 将待处理消息转换为数组dispose(): void- 清理资源
AdvancedTopicQueue
AdvancedTopicQueue 继承自 AdvancedQueue,并添加了以下方法:
constructor(topicName?: string, options?: Partial<QueueOptions>)- 创建高级主题队列subscribe(callback: (message: Message<T>) => void): () => void- 订阅消息,返回取消订阅的函数close(): void- 关闭广播通道并清理资源
存储驱动接口
MemoryStorageDriver- 内存存储驱动save<T>(key: string, data: T): Promise<void>- 保存数据到内存load<T>(key: string): Promise<T | null>- 从内存加载数据delete(key: string): Promise<void>- 从内存删除数据clear(): Promise<void>- 清空内存存储
LocalStorageDriver- localStorage 存储驱动save<T>(key: string, data: T): Promise<void>- 保存数据到 localStorageload<T>(key: string): Promise<T | null>- 从 localStorage 加载数据delete(key: string): Promise<void>- 从 localStorage 删除数据clear(): Promise<void>- 清空 localStorage 存储
IndexedDBStorageDriver- IndexedDB 存储驱动save<T>(key: string, data: T): Promise<void>- 保存数据到 IndexedDBload<T>(key: string): Promise<T | null>- 从 IndexedDB 加载数据delete(key: string): Promise<void>- 从 IndexedDB 删除数据clear(): Promise<void>- 清空 IndexedDB 存储
开发
# 安装依赖
npm install
# 开发模式
npm run dev
# 构建
npm run build
# 运行测试
npm test许可证
Apache License Version 2.0
