diff --git a/backend/services/identity-service/src/application/services/user-application.service.referral.spec.ts b/backend/services/identity-service/src/application/services/user-application.service.referral.spec.ts index 2792c684..4913ab90 100644 --- a/backend/services/identity-service/src/application/services/user-application.service.referral.spec.ts +++ b/backend/services/identity-service/src/application/services/user-application.service.referral.spec.ts @@ -95,9 +95,11 @@ describe('UserApplicationService - Referral APIs', () => { getNextAccountSequence: jest.fn(), findUsers: jest.fn(), countUsers: jest.fn(), + findByUserIds: jest.fn(), findByInviterSequence: jest.fn(), createReferralLink: jest.fn(), findReferralLinksByUserId: jest.fn(), + findUsersWithIncompleteWallets: jest.fn(), }; const mockMpcKeyShareRepository: jest.Mocked = { diff --git a/backend/services/identity-service/src/application/services/user-application.service.ts b/backend/services/identity-service/src/application/services/user-application.service.ts index 1e2f8854..0991b241 100644 --- a/backend/services/identity-service/src/application/services/user-application.service.ts +++ b/backend/services/identity-service/src/application/services/user-application.service.ts @@ -1603,6 +1603,11 @@ export class UserApplicationService { * * 钱包通过 Kafka 事件异步生成,此接口用于轮询查询状态 * 首次查询时会返回助记词(未备份状态),之后不再返回 + * + * 增强功能: + * - 检查 Redis 中的实际生成状态 + * - 如果状态是 failed 或超时,返回 failed 状态 + * - 同时自动触发重试(使用分布式锁防止重复触发) */ async getWalletStatus( query: GetWalletStatusQuery, @@ -1615,6 +1620,8 @@ export class UserApplicationService { throw new ApplicationError('用户不存在'); } + const userId = account.userId.value.toString(); + // 获取所有钱包地址 const wallets = account.getAllWalletAddresses(); @@ -1629,6 +1636,10 @@ export class UserApplicationService { BigInt(account.userId.value), ); + // 清理 Redis 状态(可选,节省内存) + await this.redisService.delete(`keygen:status:${userId}`); + await this.redisService.delete(`keygen:retry:${userId}`); + return { status: 'ready', walletAddresses: { @@ -1640,12 +1651,143 @@ export class UserApplicationService { }; } - // 钱包还在生成中 + // 钱包不完整,检查 Redis 中的生成状态 + const redisKey = `keygen:status:${userId}`; + const statusData = await this.redisService.get(redisKey); + + let shouldTriggerRetry = false; + let isFailed = false; + let errorMessage = ''; + + if (statusData) { + try { + const parsed = JSON.parse(statusData); + const status = parsed.status; + const updatedAt = parsed.updatedAt; + + // 检查状态是否超时(超过60秒未更新) + const TIMEOUT_MS = 60 * 1000; + const isTimeout = + updatedAt && Date.now() - new Date(updatedAt).getTime() > TIMEOUT_MS; + + if (status === 'failed') { + // 明确失败状态 + isFailed = true; + errorMessage = parsed.errorMessage || '钱包生成失败'; + shouldTriggerRetry = true; + this.logger.log( + `[WALLET-STATUS] User ${userId} status is 'failed', will trigger retry`, + ); + } else if ( + (status === 'generating' || status === 'deriving') && + isTimeout + ) { + // 超时状态 + isFailed = true; + errorMessage = '钱包生成超时'; + shouldTriggerRetry = true; + this.logger.log( + `[WALLET-STATUS] User ${userId} status '${status}' timed out, will trigger retry`, + ); + } else if (status === 'completed') { + // Redis 显示 completed 但数据库没有完整地址,状态不一致 + shouldTriggerRetry = true; + this.logger.warn( + `[WALLET-STATUS] User ${userId} Redis status is 'completed' but wallet incomplete, will trigger retry`, + ); + } + // pending/generating/deriving 且未超时 → 正常等待 + } catch (e) { + // JSON 解析失败,触发重试 + shouldTriggerRetry = true; + this.logger.warn( + `[WALLET-STATUS] Failed to parse status data for user ${userId}, will trigger retry`, + ); + } + } else { + // Redis 无状态但数据库钱包不完整 → 需要触发生成 + shouldTriggerRetry = true; + this.logger.log( + `[WALLET-STATUS] User ${userId} has no Redis status but wallet incomplete, will trigger retry`, + ); + } + + // 如果需要触发重试,使用分布式锁防止并发 + if (shouldTriggerRetry) { + const lockKey = `keygen:lock:${userId}`; + const gotLock = await this.redisService.tryLock(lockKey, 60); // 60秒锁 + + if (gotLock) { + // 异步触发重试,不阻塞响应 + this.triggerWalletRetryAsync(userId, account).catch((err) => { + this.logger.error( + `[WALLET-STATUS] Async retry failed for user ${userId}`, + err, + ); + }); + this.logger.log( + `[WALLET-STATUS] Acquired lock and triggered retry for user ${userId}`, + ); + } else { + this.logger.log( + `[WALLET-STATUS] Lock not acquired for user ${userId}, retry already in progress`, + ); + } + } + + // 如果是失败状态,返回 failed;否则返回 generating + if (isFailed) { + return { + status: 'failed', + errorMessage, + }; + } + return { status: 'generating', }; } + /** + * 异步触发钱包生成重试 + * + * 内部方法,由 getWalletStatus 调用 + * 重新发布 UserAccountCreatedEvent 触发 MPC 生成流程 + */ + private async triggerWalletRetryAsync( + userId: string, + account: UserAccount, + ): Promise { + try { + // 发布事件触发钱包生成 + const event = account.createWalletGenerationEvent(); + await this.eventPublisher.publish(event); + + // 更新 Redis 状态为 pending + const statusData = { + status: 'pending', + userId, + updatedAt: new Date().toISOString(), + }; + + await this.redisService.set( + `keygen:status:${userId}`, + JSON.stringify(statusData), + 60 * 60 * 24, // 24 小时 + ); + + this.logger.log( + `[WALLET-STATUS] Wallet generation retry triggered for user ${userId}`, + ); + } catch (error) { + this.logger.error( + `[WALLET-STATUS] Failed to trigger retry for user ${userId}`, + error, + ); + throw error; + } + } + /** * 手动重试钱包生成 * diff --git a/backend/services/identity-service/src/application/tasks/wallet-retry.task.ts b/backend/services/identity-service/src/application/tasks/wallet-retry.task.ts index f6162a3d..53f42819 100644 --- a/backend/services/identity-service/src/application/tasks/wallet-retry.task.ts +++ b/backend/services/identity-service/src/application/tasks/wallet-retry.task.ts @@ -1,18 +1,21 @@ /** - * 钱包生成重试定时任务 + * 钱包生成重试定时任务 (增强版) * - * 功能: - * 1. 每分钟扫描 Redis 中的钱包生成状态 - * 2. 检测超过 60 秒仍在 generating 状态的账号 - * 3. 检测状态为 failed 的账号 - * 4. 自动触发重试,最多重试 10 分钟(约 10 次) + * 设计目标:100% 确保每个用户的钱包最终能生成成功 * - * 重试策略: - * - 第 1 次失败:立即重试 - * - 第 2 次失败:1 分钟后重试 - * - 第 3 次失败:2 分钟后重试 - * - ... - * - 最多 10 分钟内持续重试 + * 核心改进: + * 1. 基于数据库扫描:不依赖 Redis 状态,直接查询钱包不完整的用户 + * 2. 指数退避:1分钟 → 2分钟 → 4分钟 → ... → 最大60分钟 + * 3. 无时间限制:持续重试直到成功,不会放弃 + * 4. 分布式锁:防止多实例重复触发同一用户的重试 + * + * 重试触发条件: + * - 数据库中三链地址不全的用户 + * - Redis 状态为 failed/超时/无状态 + * + * 与 getWalletStatus API 的配合: + * - API 调用时也会触发重试(用户主动轮询) + * - 定时任务作为兜底,确保即使用户不调用 API 也能重试 */ import { Injectable, Logger } from '@nestjs/common'; @@ -24,13 +27,21 @@ import { USER_ACCOUNT_REPOSITORY, } from '@/domain/repositories/user-account.repository.interface'; import { Inject } from '@nestjs/common'; -import { UserId } from '@/domain/value-objects'; -// Redis key prefix for keygen status +// Redis key prefix const KEYGEN_STATUS_PREFIX = 'keygen:status:'; const KEYGEN_RETRY_PREFIX = 'keygen:retry:'; -const MAX_RETRY_DURATION_MS = 10 * 60 * 1000; // 10 分钟 -const KEYGEN_TIMEOUT_MS = 60 * 1000; // 60 秒 +const KEYGEN_LOCK_PREFIX = 'keygen:lock:'; + +// 超时配置 +const KEYGEN_TIMEOUT_MS = 60 * 1000; // 60 秒视为超时 + +// 指数退避配置 +const MIN_BACKOFF_MINUTES = 1; // 最小退避时间:1分钟 +const MAX_BACKOFF_MINUTES = 60; // 最大退避时间:60分钟 + +// 每次扫描的最大用户数 +const MAX_USERS_PER_SCAN = 100; export interface KeygenStatusData { status: 'pending' | 'generating' | 'deriving' | 'completed' | 'failed'; @@ -44,14 +55,14 @@ export interface KeygenStatusData { export interface RetryRecord { userId: string; retryCount: number; - firstFailedAt: string; + firstRetryAt: string; lastRetryAt: string; + nextRetryAt: string; // 下次允许重试的时间 } @Injectable() export class WalletRetryTask { private readonly logger = new Logger(WalletRetryTask.name); - private isRunning = false; constructor( private readonly redisService: RedisService, @@ -62,162 +73,202 @@ export class WalletRetryTask { /** * 每分钟执行一次检查 + * + * 使用分布式锁保护整个任务,防止多实例并发执行 */ @Cron(CronExpression.EVERY_MINUTE) async handleWalletRetry() { - // 防止并发执行 - if (this.isRunning) { - this.logger.warn( - '[TASK] Previous task still running, skipping this execution', + const taskLockKey = 'wallet-retry-task:lock'; + + // 尝试获取任务级别的分布式锁(2分钟过期) + const gotTaskLock = await this.redisService.tryLock(taskLockKey, 120); + if (!gotTaskLock) { + this.logger.debug( + '[TASK] Another instance is running the task, skipping', ); return; } - this.isRunning = true; - this.logger.log('[TASK] Starting wallet retry check...'); + this.logger.log('[TASK] Starting wallet retry check (database-driven)...'); try { - // 1. 扫描所有 keygen:status:* keys - const statusKeys = await this.redisService.keys( - `${KEYGEN_STATUS_PREFIX}*`, - ); + // 1. 从数据库查询钱包不完整的用户 + const incompleteUserIds = + await this.userRepository.findUsersWithIncompleteWallets( + MAX_USERS_PER_SCAN, + ); + + if (incompleteUserIds.length === 0) { + this.logger.log('[TASK] No users with incomplete wallets found'); + return; + } + this.logger.log( - `[TASK] Found ${statusKeys.length} wallet generation records`, + `[TASK] Found ${incompleteUserIds.length} users with incomplete wallets`, ); - for (const key of statusKeys) { + // 2. 逐个检查并处理 + let retriedCount = 0; + let skippedCount = 0; + + for (const userId of incompleteUserIds) { try { - await this.checkAndRetry(key); + const retried = await this.checkAndRetryUser(userId.value.toString()); + if (retried) { + retriedCount++; + } else { + skippedCount++; + } } catch (error) { - this.logger.error(`[TASK] Error processing key ${key}: ${error}`); + this.logger.error( + `[TASK] Error processing user ${userId.value}: ${error}`, + ); } } - this.logger.log('[TASK] Wallet retry check completed'); + this.logger.log( + `[TASK] Wallet retry check completed: retried=${retriedCount}, skipped=${skippedCount}`, + ); } catch (error) { this.logger.error(`[TASK] Wallet retry task failed: ${error}`, error); } finally { - this.isRunning = false; + // 释放任务锁 + await this.redisService.unlock(taskLockKey); } } /** - * 检查单个钱包生成状态并决定是否重试 + * 检查单个用户并决定是否重试 + * + * @returns true 如果触发了重试,false 如果跳过 */ - private async checkAndRetry(statusKey: string): Promise { - const statusData = await this.redisService.get(statusKey); - if (!statusData) return; - - let status: KeygenStatusData; - try { - status = JSON.parse(statusData); - } catch (error) { - this.logger.warn(`[TASK] Invalid status data for ${statusKey}`); - return; - } - - const { userId, status: currentStatus, updatedAt } = status; - - // 跳过已完成的 - if (currentStatus === 'completed') { + private async checkAndRetryUser(userId: string): Promise { + // 1. 检查是否在退避期内 + const shouldWait = await this.isInBackoffPeriod(userId); + if (shouldWait) { this.logger.debug( - `[TASK] User ${userId} status is 'completed', skipping`, - ); - return; - } - - // 检查是否需要重试 - const needsRetry = this.shouldRetry(status); - if (!needsRetry) { - return; - } - - // 检查重试限制 - const canRetry = await this.checkRetryLimit(userId); - if (!canRetry) { - this.logger.warn( - `[TASK] User ${userId} exceeded retry time limit (10 minutes)`, - ); - // 更新状态为最终失败 - await this.markAsFinalFailure(userId); - return; - } - - // 执行重试 - await this.retryWalletGeneration(userId); - } - - /** - * 判断是否应该重试 - */ - private shouldRetry(status: KeygenStatusData): boolean { - const { status: currentStatus, updatedAt } = status; - const updatedTime = new Date(updatedAt).getTime(); - const now = Date.now(); - const elapsed = now - updatedTime; - - // 情况1:状态为 failed - if (currentStatus === 'failed') { - this.logger.log( - `[TASK] User ${status.userId} status is 'failed', will retry`, - ); - return true; - } - - // 情况2:状态为 generating 但超过 60 秒 - if (currentStatus === 'generating' && elapsed > KEYGEN_TIMEOUT_MS) { - this.logger.log( - `[TASK] User ${status.userId} generating timeout (${Math.floor(elapsed / 1000)}s), will retry`, - ); - return true; - } - - // 情况3:状态为 deriving 但超过 60 秒 - if (currentStatus === 'deriving' && elapsed > KEYGEN_TIMEOUT_MS) { - this.logger.log( - `[TASK] User ${status.userId} deriving timeout (${Math.floor(elapsed / 1000)}s), will retry`, - ); - return true; - } - - return false; - } - - /** - * 检查重试限制(10 分钟内) - */ - private async checkRetryLimit(userId: string): Promise { - const retryKey = `${KEYGEN_RETRY_PREFIX}${userId}`; - const retryData = await this.redisService.get(retryKey); - - if (!retryData) { - // 第一次重试 - return true; - } - - let record: RetryRecord; - try { - record = JSON.parse(retryData); - } catch (error) { - this.logger.warn(`[TASK] Invalid retry record for user ${userId}`); - return true; - } - - const firstFailedTime = new Date(record.firstFailedAt).getTime(); - const now = Date.now(); - const elapsed = now - firstFailedTime; - - // 如果超过 10 分钟,不再重试 - if (elapsed > MAX_RETRY_DURATION_MS) { - this.logger.warn( - `[TASK] User ${userId} exceeded max retry duration: ${Math.floor(elapsed / 1000 / 60)} minutes`, + `[TASK] User ${userId} is in backoff period, skipping`, ); return false; } + // 2. 检查 Redis 状态 + const redisKey = `${KEYGEN_STATUS_PREFIX}${userId}`; + const statusData = await this.redisService.get(redisKey); + + let needsRetry = false; + + if (statusData) { + try { + const parsed = JSON.parse(statusData) as KeygenStatusData; + const status = parsed.status; + const updatedAt = parsed.updatedAt; + + // 检查是否超时 + const isTimeout = + updatedAt && + Date.now() - new Date(updatedAt).getTime() > KEYGEN_TIMEOUT_MS; + + if (status === 'completed') { + // Redis 显示 completed 但数据库钱包不完整 → 状态不一致,需要重试 + this.logger.warn( + `[TASK] User ${userId} Redis status 'completed' but wallet incomplete`, + ); + needsRetry = true; + } else if (status === 'failed') { + // 明确失败 + needsRetry = true; + } else if ( + (status === 'generating' || status === 'deriving') && + isTimeout + ) { + // 超时 + this.logger.log( + `[TASK] User ${userId} status '${status}' timed out`, + ); + needsRetry = true; + } else if (status === 'pending' && isTimeout) { + // pending 超时 + needsRetry = true; + } + // generating/deriving/pending 且未超时 → 正常等待,不重试 + } catch (e) { + // JSON 解析失败 + needsRetry = true; + } + } else { + // Redis 无状态但数据库钱包不完整 → 需要触发生成 + needsRetry = true; + } + + if (!needsRetry) { + return false; + } + + // 3. 尝试获取用户级别的分布式锁 + const lockKey = `${KEYGEN_LOCK_PREFIX}${userId}`; + const gotLock = await this.redisService.tryLock(lockKey, 60); + + if (!gotLock) { + this.logger.debug( + `[TASK] User ${userId} lock not acquired, retry in progress`, + ); + return false; + } + + // 4. 执行重试 + await this.retryWalletGeneration(userId); return true; } + /** + * 检查用户是否在退避期内 + * + * 指数退避策略:每次重试后等待时间翻倍 + * 1分钟 → 2分钟 → 4分钟 → 8分钟 → 16分钟 → 32分钟 → 60分钟(上限) + */ + private async isInBackoffPeriod(userId: string): Promise { + const retryKey = `${KEYGEN_RETRY_PREFIX}${userId}`; + const retryData = await this.redisService.get(retryKey); + + if (!retryData) { + return false; // 第一次重试,无需等待 + } + + try { + const record = JSON.parse(retryData) as RetryRecord; + const nextRetryTime = new Date(record.nextRetryAt).getTime(); + const now = Date.now(); + + if (now < nextRetryTime) { + // 还在退避期内 + const remainingSeconds = Math.floor((nextRetryTime - now) / 1000); + this.logger.debug( + `[TASK] User ${userId} backoff: ${remainingSeconds}s remaining`, + ); + return true; + } + + return false; + } catch (e) { + return false; + } + } + + /** + * 计算下次重试时间(指数退避) + */ + private calculateNextRetryTime(retryCount: number): Date { + // 指数退避:2^(retryCount-1) 分钟,最大 60 分钟 + const backoffMinutes = Math.min( + Math.pow(2, retryCount - 1) * MIN_BACKOFF_MINUTES, + MAX_BACKOFF_MINUTES, + ); + + const nextRetryTime = new Date(Date.now() + backoffMinutes * 60 * 1000); + return nextRetryTime; + } + /** * 执行钱包生成重试 * @@ -228,7 +279,9 @@ export class WalletRetryTask { try { // 1. 获取用户账号信息 - const userIdObj = UserId.create(BigInt(userId)); + const userIdObj = await import('@/domain/value-objects').then((m) => + m.UserId.create(BigInt(userId)), + ); const account = await this.userRepository.findById(userIdObj); if (!account) { @@ -236,20 +289,18 @@ export class WalletRetryTask { return; } - // 2. 更新重试记录 - await this.updateRetryRecord(userId); + // 2. 更新重试记录(包含指数退避时间) + const retryCount = await this.updateRetryRecord(userId); // 3. 重新触发钱包生成流程 - // 通过重新发布 UserAccountCreatedEvent 来触发 const event = account.createWalletGenerationEvent(); - await this.eventPublisher.publish(event); this.logger.log( - `[TASK] Wallet generation retry triggered for user: ${userId}`, + `[TASK] Wallet generation retry #${retryCount} triggered for user: ${userId}`, ); - // 4. 更新 Redis 状态为 pending(等待重新生成) + // 4. 更新 Redis 状态为 pending const statusData: KeygenStatusData = { status: 'pending', userId, @@ -271,60 +322,53 @@ export class WalletRetryTask { /** * 更新重试记录 + * + * @returns 当前重试次数 */ - private async updateRetryRecord(userId: string): Promise { + private async updateRetryRecord(userId: string): Promise { const retryKey = `${KEYGEN_RETRY_PREFIX}${userId}`; const retryData = await this.redisService.get(retryKey); let record: RetryRecord; + let retryCount: number; if (!retryData) { // 第一次重试 + retryCount = 1; record = { userId, retryCount: 1, - firstFailedAt: new Date().toISOString(), + firstRetryAt: new Date().toISOString(), lastRetryAt: new Date().toISOString(), + nextRetryAt: this.calculateNextRetryTime(1).toISOString(), }; } else { const existing = JSON.parse(retryData) as RetryRecord; + retryCount = existing.retryCount + 1; record = { ...existing, - retryCount: existing.retryCount + 1, + retryCount, lastRetryAt: new Date().toISOString(), + nextRetryAt: this.calculateNextRetryTime(retryCount).toISOString(), }; } + // TTL 设置为 7 天,足够长以跟踪长期重试 await this.redisService.set( retryKey, JSON.stringify(record), - 60 * 60 * 24, // 24 小时 + 60 * 60 * 24 * 7, // 7 天 + ); + + const nextRetryMinutes = Math.min( + Math.pow(2, retryCount - 1) * MIN_BACKOFF_MINUTES, + MAX_BACKOFF_MINUTES, ); this.logger.log( - `[TASK] Updated retry record for user ${userId}: count=${record.retryCount}`, - ); - } - - /** - * 标记为最终失败(超过重试时间限制) - */ - private async markAsFinalFailure(userId: string): Promise { - const statusData: KeygenStatusData = { - status: 'failed', - userId, - errorMessage: 'Wallet generation failed after 10 minutes of retries', - updatedAt: new Date().toISOString(), - }; - - await this.redisService.set( - `${KEYGEN_STATUS_PREFIX}${userId}`, - JSON.stringify(statusData), - 60 * 60 * 24, // 24 小时 + `[TASK] User ${userId} retry #${retryCount}, next retry in ${nextRetryMinutes} minutes`, ); - this.logger.error( - `[TASK] Marked user ${userId} as final failure after retry timeout`, - ); + return retryCount; } } diff --git a/backend/services/identity-service/src/domain/repositories/user-account.repository.interface.ts b/backend/services/identity-service/src/domain/repositories/user-account.repository.interface.ts index ec8ef29a..6455f1ff 100644 --- a/backend/services/identity-service/src/domain/repositories/user-account.repository.interface.ts +++ b/backend/services/identity-service/src/domain/repositories/user-account.repository.interface.ts @@ -71,6 +71,17 @@ export interface UserAccountRepository { params: CreateReferralLinkParams, ): Promise; findReferralLinksByUserId(userId: UserId): Promise; + + // 钱包生成相关 + /** + * 查找钱包地址不完整的用户(三链地址不全) + * + * 用于定时任务扫描需要重试钱包生成的用户 + * 返回所有三链地址不全的用户 ID 列表 + * + * @param limit 最大返回数量,防止一次查询过多 + */ + findUsersWithIncompleteWallets(limit: number): Promise; } export const USER_ACCOUNT_REPOSITORY = Symbol('USER_ACCOUNT_REPOSITORY'); diff --git a/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts b/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts index 8ff2670f..bfb6371f 100644 --- a/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts +++ b/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts @@ -465,4 +465,31 @@ export class UserAccountRepositoryImpl implements UserAccountRepository { createdAt: r.createdAt, })); } + + /** + * 查找钱包地址不完整的用户 + * + * 使用 SQL 查询找出三链地址不全的用户: + * - 需要 KAVA、DST、BSC 三种链都有地址才算完整 + * - 只返回 ACTIVE 状态的用户 + */ + async findUsersWithIncompleteWallets(limit: number): Promise { + // 使用原生 SQL 查询,因为 Prisma 不支持 HAVING 子句的复杂聚合 + const results = await this.prisma.$queryRaw>` + SELECT u.user_id + FROM user_accounts u + LEFT JOIN wallet_addresses w ON u.user_id = w.user_id + WHERE u.status = 'ACTIVE' + GROUP BY u.user_id + HAVING COUNT(DISTINCT w.chain_type) < 3 + ORDER BY u.registered_at ASC + LIMIT ${limit} + `; + + this.logger.log( + `[QUERY] Found ${results.length} users with incomplete wallets`, + ); + + return results.map((r) => UserId.create(r.user_id)); + } } diff --git a/backend/services/identity-service/src/infrastructure/redis/redis.service.ts b/backend/services/identity-service/src/infrastructure/redis/redis.service.ts index 78306bd6..a26cd361 100644 --- a/backend/services/identity-service/src/infrastructure/redis/redis.service.ts +++ b/backend/services/identity-service/src/infrastructure/redis/redis.service.ts @@ -48,6 +48,32 @@ export class RedisService implements OnModuleDestroy { return this.client.keys(pattern); } + /** + * 尝试获取分布式锁 + * + * 使用 Redis SET NX EX 原子操作实现分布式锁 + * - 如果锁不存在,获取成功并设置 TTL + * - 如果锁已存在,获取失败 + * - TTL 到期后锁自动释放,防止死锁 + * + * @param key 锁的 key + * @param ttlSeconds 锁的过期时间(秒) + * @returns true 如果获取锁成功,false 如果锁已被持有 + */ + async tryLock(key: string, ttlSeconds: number): Promise { + const result = await this.client.set(key, '1', 'EX', ttlSeconds, 'NX'); + return result === 'OK'; + } + + /** + * 释放分布式锁 + * + * @param key 锁的 key + */ + async unlock(key: string): Promise { + await this.client.del(key); + } + /** * 原子更新 keygen 状态 * 使用 Lua 脚本确保状态只能向前推进: pending < generating < deriving < completed