npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@tfnick/nestjs-distributed-lock

v1.2.7

Published

NestJS分布式锁组件,基于PostgreSQL原生pg_advisory_lock

Readme

@tfnick/nestjs-distributed-lock

NestJS分布式锁组件,基于PostgreSQL原生pg_advisory_lock

安装

npm install @tfnick/nestjs-distributed-lock

🚀 快速开始

方式一:自动使用TypeORM连接(推荐)

import { DistributedLockModule } from '@tfnick/nestjs-distributed-lock';

@Module({
  imports: [
    // 你的 TypeORM 配置
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: 'localhost',
      port: 5432,
      // ...
    }),
    
    // 分布式锁配置(自动使用TypeORM数据源)
    DistributedLockModule.forRoot({
      defaultTimeout: 30000,  // 30秒超时
      maxRetries: 3,          // 最多重试3次
      retryDelay: 1000,        // 重试间隔1秒
    }),
  ],
})
export class AppModule {}

方式二:自定义数据源

如果需要使用事务性数据源:

import { DistributedLockModule } from '@tfnick/nestjs-distributed-lock';
import { addTransactionalDataSource } from 'typeorm-transactional';

@Module({
  imports: [
    DistributedLockModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (config: ConfigService) => {
        const dataSource = addTransactionalDataSource(new DataSource(config.get('database')));
        
        return {
          dataSource,              // 传入自定义数据源
          defaultTimeout: 30000,
          maxRetries: 3,
          retryDelay: 1000,
        };
      },
    }),
  ],
})
export class AppModule {}

或者直接注入TypeORM的DataSource:

import { DistributedLockModule } from '@tfnick/nestjs-distributed-lock';
import { DataSource } from 'typeorm';

@Module({
  imports: [
    DistributedLockModule.forRootAsync({
      imports: [TypeOrmModule],
      inject: [DataSource],
      useFactory: async (dataSource: DataSource) => ({
        dataSource,              // 使用 TypeORM 最终提供的 DataSource(已代理)
        defaultTimeout: 30000,
        maxRetries: 3,
        retryDelay: 1000,
      }),
    }),
  ],
})
export class AppModule {}

📖 API 使用

在服务中注入

import { Injectable } from '@nestjs/common';
import { DistributedLockService } from '@tfnick/nestjs-distributed-lock';

@Injectable()
export class UserService {
  constructor(private readonly lockService: DistributedLockService) {}

  async updateUser(id: string, data: any) {
    // 方式一:自动管理锁(推荐)
    await this.lockService.withLock(`user:${id}`, async () => {
      // 临界区代码 - 自动获取和释放锁
      console.log('正在更新用户数据...');
      // ... 更新逻辑
    });
  }

  async deleteUser(id: string) {
    // 方式二:手动控制锁
    const lock = await this.lockService.acquireLock(`user:${id}`);
    
    try {
      // 关键操作
      await this.performDeletion(id);
    } finally {
      await lock.release(); // 确保锁被释放
    }
  }
}

新的API:结果驱动的锁获取

@Injectable()
export class OrderService {
  constructor(private readonly lockService: DistributedLockService) {}

  async processPayment(orderId: string, paymentId: string) {
    const lockKey = `order:${orderId}:payment:${paymentId}`;
    
    // 方式一:结果驱动(不抛出异常)
    const result = await this.lockService.acquire(lockKey);
    
    if (result.acquired) {
      try {
        console.log('锁获取成功,执行业务逻辑');
        await this.processPaymentLogic();
      } finally {
        await result.lock.release();
      }
    } else {
      // 根据原因处理失败情况
      switch (result.reason) {
        case 'held':
          console.log('锁已被占用,稍后重试');
          await this.scheduleRetry();
          break;
        case 'timeout':
          console.log('锁获取超时,处理超时逻辑');
          await this.handleTimeout();
          break;
        default:
          console.log('未知错误:', result.error?.message);
          break;
      }
    }
  }

  // 方式二:结果驱动的锁执行(推荐)
  async processOrder(orderId: string) {
    const result = await this.lockService.withLockResult(`order:${orderId}`, async () => {
      console.log('执行订单处理逻辑');
      return await this.processOrderLogic();
    });

    if (result.success) {
      console.log('订单处理成功:', result.result);
      return result.result;
    } else {
      console.log('锁获取失败:', result.error?.message);
      throw result.error;
    }
  }
}

在控制器中使用装饰器

import { Controller, Post } from '@nestjs/common';
import { Lock } from '@tfnick/nestjs-distributed-lock';
import { UserService } from './user.service';

@Controller('users')
export class UserController {
  constructor(private readonly userService: UserService) {}

  @Post(':id/lock')
  @Lock('user-lock-{id}') // 自动锁获取和释放
  async lockUser(id: string) {
    return this.userService.updateUser(id, { locked: true });
  }
}

⚙️ 配置选项

| 选项 | 类型 | 默认值 | 说明 | |------|------|--------|------| | dataSource | DataSource | - | 自定义数据源(支持事务性数据源) | | connectionName | string | - | TypeORM连接名称 | | defaultTimeout | number | 30000 | 锁超时时间(毫秒) | | maxRetries | number | 3 | 最大重试次数 | | retryDelay | number | 1000 | 重试间隔(毫秒) |

🔑 接口定义

LockAcquireResult

interface LockAcquireResult {
  /** 是否成功获取锁 */
  acquired: boolean;
  /** 锁句柄(仅在acquired=true时有效) */
  lock?: LockHandle;
  /** 错误信息(仅在acquired=false时有效) */
  error?: Error;
  /** 失败原因 */
  reason?: 'timeout' | 'held' | 'unknown';
}

withLockResult

// 返回类型:Promise<{ success: boolean; result?: T; error?: Error }>
const result = await lockService.withLockResult('key', async () => {
  return await businessLogic();
});

if (result.success) {
  console.log('执行成功:', result.result);
} else {
  console.log('锁获取失败:', result.error?.message);
}

配置选项

interface DistributedLockOptions {
  /** PostgreSQL连接名称(用于连接多个数据库的情况) */
  connectionName?: string;
  
  /** 自定义数据源(用于支持代理数据源) */
  dataSource?: DataSource;
  
  /** 默认锁超时时间(毫秒) */
  defaultTimeout?: number;
  
  /** 自动重试次数 */
  maxRetries?: number;
  
  /** 重试间隔(毫秒) */
  retryDelay?: number;
}

🧪 测试

运行测试

# 运行所有测试
npm test

# 运行测试并生成覆盖率
npm run test:cov

# 监视模式(开发时使用)
npm run test:watch

测试覆盖

核心功能测试

  • 锁获取和释放
  • 非阻塞锁模式
  • 重试机制
  • 超时处理

集成测试

  • 模块配置
  • 数据源注入
  • 异步配置

边界测试

  • 异常处理
  • 锁状态查询
  • 键生成算法

🔧 高级用法

检查锁状态

async checkLockStatus(key: string) {
  const isLocked = await this.lockService.isLocked(key);
  if (isLocked) {
    console.log(`锁 ${key} 正被持有`);
    return false;
  }
  return true;
}

自定义重试策略

DistributedLockModule.forRoot({
  defaultTimeout: 60000,  // 1分钟超时
  maxRetries: 10,          // 最多重试10次
  retryDelay: 500,         // 重试间隔500ms
})

组合使用装饰器和服务

@Injectable()
export class OrderService {
  constructor(private readonly lockService: DistributedLockService) {}

  @Lock('order-processing') // 方法级锁
  async processOrder(orderId: string) {
    // 装饰器自动处理锁,但也可以使用服务
    const orderLock = await this.lockService.acquire(`order-detail:${orderId}`);
    try {
      await this.processOrderItems(orderId);
    } finally {
      await orderLock.release();
    }
  }
}

🔑 键值映射与唯一性保证

哈希算法设计

本库使用FNV-1a 32位哈希算法确保字符串到数字的唯一性映射:

private generateLockKey(key: string): number {
  // PostgreSQL advisory lock支持64位有符号整数
  const hash = this.fnv1a32(key);
  
  // 确保是正数且在合理范围内
  return Math.abs(hash) % 2147483647; // PostgreSQL最大正整数
}

/**
 * FNV-1a 32位哈希算法
 * 具有良好的分布性和较低的冲突率
 */
private fnv1a32(str: string): number {
  let hash = 0x811c9dc5; // FNV偏移基础值
  
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // FNV质数
  }
  
  // 确保结果在32位范围内
  return hash >>> 0;
}

唯一性保证机制

1. 算法选择

  • FNV-1a哈希:工业标准的字符串哈希算法
  • 良好分布:均匀分布在数值空间中
  • 低冲突率:不同字符串产生相同数值的概率极低

2. 数值范围控制

  • PostgreSQL限制pg_advisory_lock接受64位有符号整数
  • 安全边界:限制在2,147,483,647(最大32位正整数)
  • 符号处理:确保始终为正数

3. 碰撞检测

// 相似键值的哈希分布测试
const keys = ['order:123', 'order:124', 'order:125'];
const hashes = keys.map(key => lockService.generateLockKey(key));

// 检查唯一性
const uniqueHashes = new Set(hashes);
console.log(`唯一性: ${uniqueHashes.size}/${keys.length} keys`);

最佳实践建议

1. 键值命名规范

// ✅ 推荐:有层次结构的命名
await lockService.withLock('order:123:payment', async () => {});
await lockService.withLock('user:456:profile', async () => {});
await lockService.withLock('inventory:789:stock', async () => {});

// ❌ 避免:过于简单或冲突风险高的命名
await lockService.withLock('lock1', async () => {});
await lockService.withLock('abc', async () => {});

2. 避免哈希冲突

// ✅ 使用业务唯一标识符
const lockKey = `order:${orderId}:payment:${paymentId}`;

// ❌ 避免仅使用数字或简单字符
const lockKey = `${orderId}`;

3. 测试验证

// 开发阶段验证哈希唯一性
function testHashUniqueness(keys: string[]) {
  const hashes = keys.map(key => service.generateLockKey(key));
  const unique = new Set(hashes);
  
  console.log(`测试${keys.length}个键值,${unique.size}个唯一哈希`);
  return unique.size === keys.length;
}

⚡ 性能优化

最佳实践

  1. 锁粒度: 使用细粒度锁,避免死锁

    // ❌ 粗粒度锁 - 可能造成死锁
    await this.lockService.withLock('orders', async () => { ... });
       
    // ✅ 细粒度锁 - 更好的并发性
    await this.lockService.withLock(`order:${orderId}`, async () => { ... });
  2. 锁超时: 根据业务逻辑设置合理的超时时间

    // 短时操作
    await this.lockService.withLock('cache-update', updateCache, { timeout: 5000 });
       
    // 长时操作
    await this.lockService.withLock('report-generation', generateReport, { timeout: 300000 });
  3. 重试策略: 避免过度重试

    DistributedLockModule.forRoot({
      maxRetries: 3,      // 适度的重试次数
      retryDelay: 1000,    // 合理的重试间隔
    });

⚠️ 注意事项

依赖关系

  • 🔗 TypeORM依赖: 必须先配置 TypeOrmModule
  • 📦 PostgreSQL: 仅支持PostgreSQL数据库(使用advisory locks)
  • 🔐 数据库权限: 确保用户有执行 pg_advisory_* 函数的权限
  • 🔄 版本兼容: 支持不同TypeORM版本,避免类型冲突

最佳实践

  • 🎯 锁命名: 使用有意义的命名空间,如 user:{id}, order:{id}
  • ⏱️ 超时设置: 根据业务复杂度设置合理超时时间
  • 🔄 重试策略: 避免过度重试,保护数据库性能

🚨 故障排除

常见问题

1. DataSource provider not found

// ❌ 错误配置
DistributedLockModule.forRoot(); // 没有TypeORM连接

// ✅ 正确配置
DistributedLockModule.forRoot({
  defaultTimeout: 30000,
}); // 会自动使用TypeORM的DataSource

2. 自定义数据源问题

// ❌ 直接使用原始数据源
const dataSource = new DataSource(options);
DistributedLockModule.forRoot({ dataSource });

// ✅ 使用事务性数据源
const dataSource = addTransactionalDataSource(new DataSource(options));
DistributedLockModule.forRoot({ dataSource });

3. TypeORM版本冲突

// ❌ 类型不兼容错误(多个TypeORM版本)
// 'DataSourceOptions' 类型不兼容

// ✅ 解决方案1:使用any类型
DistributedLockModule.forRootAsync({
  inject: [DataSource],
  useFactory: async (dataSource: any) => ({
    dataSource,
    defaultTimeout: 30000,
  }),
});

// ✅ 解决方案2:使用类型断言
DistributedLockModule.forRootAsync({
  inject: [DataSource],
  useFactory: async (dataSource: DataSource) => ({
    dataSource: dataSource as any,
    defaultTimeout: 30000,
  }),
});

4. Reflector依赖问题

// ❌ 错误:Nest can't resolve dependencies of DistributedLockInterceptor
// Error: Reflector at index [0] is available in the DistributedLockModule context

// ✅ 解决方案1:Reflector现在是可选依赖
// 拦截器会优雅地处理Reflector不存在的情况
DistributedLockModule.forRootAsync({
  imports: [TypeOrmModule],
  inject: [DataSource],
  useFactory: async (dataSource: DataSource) => ({
    dataSource,
    defaultTimeout: 30000,
    maxRetries: 3,
    retryDelay: 1000,
  }),
});

// ✅ 解决方案2:如果你的应用需要装饰器功能
// 确保在应用的主模块中导入了Reflector
import { Reflector } from '@nestjs/core';

@Module({
  providers: [Reflector], // 在根模块中提供Reflector
})
export class AppModule {}

5. 锁释放问题

// ❌ 问题:release ignored, lock not held
// 原因:锁在queryRunner中获取,但在不同连接中释放

// ✅ 解决方案1:使用withLock(推荐)
await lockService.withLock('resource-key', async () => {
  // 自动管理锁的获取和释放,使用相同连接
  await criticalOperation();
});

// ✅ 解决方案2:正确的手动方式
const lock = await lockService.acquire('resource-key', { wait: true });
try {
  await criticalOperation();
} finally {
  await lock.release(); // 现在使用相同连接释放
}

// ✅ 解决方案3:验证修复
const lock = await lockService.acquire('resource-key', { wait: true });
try {
  console.log('business logic');
} finally {
  await lock.release(); // 日志:release lock success,不再有 lock not held
}

6. 锁冲突

// ❌ 可能造成死锁
await lock1.acquire('resource'); // 进程A
await lock2.acquire('resource'); // 进程B - 可能死锁

// ✅ 使用有序锁获取
await lock1.acquire('resource:step1');
await lock2.acquire('resource:step2'); // 明确顺序

调试技巧

  1. 启用日志:

    DistributedLockModule.forRoot({
      defaultTimeout: 30000,
    });
    // 查看日志输出中的锁操作信息
  2. 测试连接:

    # 测试PostgreSQL连接
    psql -h localhost -U username -d database -c "SELECT pg_advisory_lock(1);"
  3. 监控锁状态:

    -- 查看当前持有的advisory locks
    SELECT * FROM pg_locks WHERE locktype = 'advisory';

版本

当前版本:1.2.0

更新日志

v1.2.0

  • ✅ 添加完整的测试套件
  • ✅ 支持锁获取、释放、重试等核心功能测试
  • ✅ 模块集成测试
  • ✅ 配置接口测试
  • ✅ 异常处理测试

v1.1.0

  • ✅ 修复 connectionName 支持问题
  • ✅ 完善依赖注入机制
  • ✅ 支持事务性数据源

v1.0.8

  • ✅ 修复包发布配置
  • ✅ 添加 files 字段
  • ✅ 正确的导出路径

v1.0.5

  • ✅ 添加作者邮箱
  • ✅ 支持 ConfigService 异步配置