reentrant
v1.0.0
Published
[](https://nodejs.org/) [](https://nodejs.org/api/async_hooks.html)
Readme
Node.js Async ReentrantLock
基于 async_hooks 的异步安全可重入锁实现,适用于 Node.js 异步并发环境。
特性
✅ 异步上下文感知 - 自动跟踪异步调用链
✅ 可重入支持 - 同一上下文可多次获取锁
✅ 公平队列 - FIFO 顺序保证锁获取公平性
✅ 错误安全 - 强制上下文绑定和所有权验证
✅ 嵌套调用 - 支持多层异步锁操作
🚫 未经严格测试,生产环境禁用
安装
直接复制以下文件到您的项目:
npm i reentrant
快速开始
const { ReentrantLock, contextStorage } = require('./ReentrantLock');
async function main() {
const lock = new ReentrantLock();
async function criticalTask(id) {
await contextStorage.run(id, async () => {
await lock.acquire();
try {
console.log(`Task ${id} entered critical section`);
// 嵌套调用示例
await nestedOperation(id);
} finally {
lock.release();
}
});
}
async function nestedOperation(id) {
await lock.acquire(); // 可重入获取
try {
console.log(`Task ${id} nested access`);
} finally {
lock.release();
}
}
await Promise.all([criticalTask(1), criticalTask(2)]);
}
main();输出示例:
Task 1 entered critical section
Task 1 nested access
Task 2 entered critical section
Task 2 nested accessAPI 文档
ReentrantLock
方法
方法 参数 返回 描述
acquire()`` 无 Promise 获取锁(支持异步等待)
release() 无 void 释放锁
contextStorage
AsyncLocalStorage 实例,用于创建异步上下文:
await contextStorage.run(contextId, async () => { // 在此执行需要锁的操作 });
实现原理
核心机制
上下文跟踪
使用 AsyncLocalStorage 跟踪异步调用链,每个独立任务通过 run() 方法创建唯一上下文
锁状态管理
owner: 当前锁持有者的上下文ID count: 重入计数器 queue: 等待队列(Promise解析器 + 上下文ID) 重入逻辑 同一上下文多次获取时仅增加计数器,释放时递减直到归零
流程图 graph TD A[acquire()] --> B{当前上下文是否持有锁?} B -->|是| C[增加计数] B -->|否| D{锁是否空闲?} D -->|是| E[获取锁] D -->|否| F[加入等待队列] E --> G[执行临界区代码] G --> H[release()] H --> I{计数归零?} I -->|是| J[唤醒队列下一个] I -->|否| K[保持锁状态] 使用场景 数据库事务管理
async function updateUserBalance(userId, amount) {
await contextStorage.run(`tx_${userId}`, async () => {
await lock.acquire();
try {
const balance = await db.getBalance(userId);
await db.updateBalance(userId, balance + amount);
await audit.logTransaction(userId, amount);
} finally {
lock.release();
}
});
}文件系统原子操作
async function appendToLog(filePath, content) {
await lock.acquire();
try {
const fd = await fs.promises.open(filePath, 'a');
await fd.appendFile(content);
await fd.close();
} finally {
lock.release();
}
}
分布式任务调度
taskQueue.process(async (job) => {
await contextStorage.run(job.id, async () => {
await lock.acquire();
try {
await processJob(job);
await updateJobStatus(job.id, 'completed');
} finally {
lock.release();
}
});
});注意事项 ⚠️ 强制上下文 必须通过 contextStorage.run() 创建执行上下文,否则会抛出错误
🔒 释放匹配 必须保证每个 acquire() 都有对应的 release()(建议使用 try/finally)
⏳ 队列公平性 等待队列采用严格 FIFO 顺序,长时间持有的锁可能导致队尾饥饿
🚫 禁止跨上下文
// 错误示例!
async function wrongUsage() {
await lock.acquire();
setTimeout(() => {
lock.release(); // 在错误上下文中释放
}, 1000);
}性能建议 减少临界区耗时 保持锁内操作尽可能简短
上下文复用 对相关操作使用相同上下文ID
队列监控 定期检查 lock.queue.length 预防任务堆积
超时机制扩展
async function acquireWithTimeout(timeout) {
return Promise.race([
lock.acquire(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), timeout)
)
]);
}许可证
MIT License
