feat(identity-service): 增强钱包生成可靠性,确保100%生成成功

核心改进:
- 基于数据库扫描代替Redis扫描,防止状态丢失后无法重试
- 指数退避策略(1分钟→60分钟),无时间限制持续重试
- 分布式锁保护,防止多实例/并发重复触发
- getWalletStatus API 检测失败状态并自动触发重试

修改内容:
- RedisService: 添加 tryLock/unlock 分布式锁方法
- UserAccountRepository: 添加 findUsersWithIncompleteWallets 查询
- getWalletStatus: 增强状态检测,失败/超时时自动触发重试
- WalletRetryTask: 完全重写,基于数据库驱动+指数退避

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-27 09:40:51 -08:00
parent c84516b222
commit 55bb129477
6 changed files with 427 additions and 175 deletions

View File

@ -95,9 +95,11 @@ describe('UserApplicationService - Referral APIs', () => {
getNextAccountSequence: jest.fn(),
findUsers: jest.fn(),
countUsers: jest.fn(),
findByUserIds: jest.fn(),
findByInviterSequence: jest.fn(),
createReferralLink: jest.fn(),
findReferralLinksByUserId: jest.fn(),
findUsersWithIncompleteWallets: jest.fn(),
};
const mockMpcKeyShareRepository: jest.Mocked<MpcKeyShareRepository> = {

View File

@ -1603,6 +1603,11 @@ export class UserApplicationService {
*
* Kafka
*
*
*
* - Redis
* - failed failed
* - 使
*/
async getWalletStatus(
query: GetWalletStatusQuery,
@ -1615,6 +1620,8 @@ export class UserApplicationService {
throw new ApplicationError('用户不存在');
}
const userId = account.userId.value.toString();
// 获取所有钱包地址
const wallets = account.getAllWalletAddresses();
@ -1629,6 +1636,10 @@ export class UserApplicationService {
BigInt(account.userId.value),
);
// 清理 Redis 状态(可选,节省内存)
await this.redisService.delete(`keygen:status:${userId}`);
await this.redisService.delete(`keygen:retry:${userId}`);
return {
status: 'ready',
walletAddresses: {
@ -1640,12 +1651,143 @@ export class UserApplicationService {
};
}
// 钱包还在生成中
// 钱包不完整,检查 Redis 中的生成状态
const redisKey = `keygen:status:${userId}`;
const statusData = await this.redisService.get(redisKey);
let shouldTriggerRetry = false;
let isFailed = false;
let errorMessage = '';
if (statusData) {
try {
const parsed = JSON.parse(statusData);
const status = parsed.status;
const updatedAt = parsed.updatedAt;
// 检查状态是否超时超过60秒未更新
const TIMEOUT_MS = 60 * 1000;
const isTimeout =
updatedAt && Date.now() - new Date(updatedAt).getTime() > TIMEOUT_MS;
if (status === 'failed') {
// 明确失败状态
isFailed = true;
errorMessage = parsed.errorMessage || '钱包生成失败';
shouldTriggerRetry = true;
this.logger.log(
`[WALLET-STATUS] User ${userId} status is 'failed', will trigger retry`,
);
} else if (
(status === 'generating' || status === 'deriving') &&
isTimeout
) {
// 超时状态
isFailed = true;
errorMessage = '钱包生成超时';
shouldTriggerRetry = true;
this.logger.log(
`[WALLET-STATUS] User ${userId} status '${status}' timed out, will trigger retry`,
);
} else if (status === 'completed') {
// Redis 显示 completed 但数据库没有完整地址,状态不一致
shouldTriggerRetry = true;
this.logger.warn(
`[WALLET-STATUS] User ${userId} Redis status is 'completed' but wallet incomplete, will trigger retry`,
);
}
// pending/generating/deriving 且未超时 → 正常等待
} catch (e) {
// JSON 解析失败,触发重试
shouldTriggerRetry = true;
this.logger.warn(
`[WALLET-STATUS] Failed to parse status data for user ${userId}, will trigger retry`,
);
}
} else {
// Redis 无状态但数据库钱包不完整 → 需要触发生成
shouldTriggerRetry = true;
this.logger.log(
`[WALLET-STATUS] User ${userId} has no Redis status but wallet incomplete, will trigger retry`,
);
}
// 如果需要触发重试,使用分布式锁防止并发
if (shouldTriggerRetry) {
const lockKey = `keygen:lock:${userId}`;
const gotLock = await this.redisService.tryLock(lockKey, 60); // 60秒锁
if (gotLock) {
// 异步触发重试,不阻塞响应
this.triggerWalletRetryAsync(userId, account).catch((err) => {
this.logger.error(
`[WALLET-STATUS] Async retry failed for user ${userId}`,
err,
);
});
this.logger.log(
`[WALLET-STATUS] Acquired lock and triggered retry for user ${userId}`,
);
} else {
this.logger.log(
`[WALLET-STATUS] Lock not acquired for user ${userId}, retry already in progress`,
);
}
}
// 如果是失败状态,返回 failed否则返回 generating
if (isFailed) {
return {
status: 'failed',
errorMessage,
};
}
return {
status: 'generating',
};
}
/**
*
*
* getWalletStatus
* UserAccountCreatedEvent MPC
*/
private async triggerWalletRetryAsync(
userId: string,
account: UserAccount,
): Promise<void> {
try {
// 发布事件触发钱包生成
const event = account.createWalletGenerationEvent();
await this.eventPublisher.publish(event);
// 更新 Redis 状态为 pending
const statusData = {
status: 'pending',
userId,
updatedAt: new Date().toISOString(),
};
await this.redisService.set(
`keygen:status:${userId}`,
JSON.stringify(statusData),
60 * 60 * 24, // 24 小时
);
this.logger.log(
`[WALLET-STATUS] Wallet generation retry triggered for user ${userId}`,
);
} catch (error) {
this.logger.error(
`[WALLET-STATUS] Failed to trigger retry for user ${userId}`,
error,
);
throw error;
}
}
/**
*
*

View File

@ -1,18 +1,21 @@
/**
*
* ()
*
*
* 1. Redis
* 2. 60 generating
* 3. failed
* 4. 10 10
* 100%
*
*
* - 1
* - 2 1
* - 3 2
* - ...
* - 10
*
* 1. Redis
* 2. 退1 2 4 ... 60
* 3.
* 4.
*
*
* -
* - Redis failed//
*
* getWalletStatus API
* - API
* - 使 API
*/
import { Injectable, Logger } from '@nestjs/common';
@ -24,13 +27,21 @@ import {
USER_ACCOUNT_REPOSITORY,
} from '@/domain/repositories/user-account.repository.interface';
import { Inject } from '@nestjs/common';
import { UserId } from '@/domain/value-objects';
// Redis key prefix for keygen status
// Redis key prefix
const KEYGEN_STATUS_PREFIX = 'keygen:status:';
const KEYGEN_RETRY_PREFIX = 'keygen:retry:';
const MAX_RETRY_DURATION_MS = 10 * 60 * 1000; // 10 分钟
const KEYGEN_TIMEOUT_MS = 60 * 1000; // 60 秒
const KEYGEN_LOCK_PREFIX = 'keygen:lock:';
// 超时配置
const KEYGEN_TIMEOUT_MS = 60 * 1000; // 60 秒视为超时
// 指数退避配置
const MIN_BACKOFF_MINUTES = 1; // 最小退避时间1分钟
const MAX_BACKOFF_MINUTES = 60; // 最大退避时间60分钟
// 每次扫描的最大用户数
const MAX_USERS_PER_SCAN = 100;
export interface KeygenStatusData {
status: 'pending' | 'generating' | 'deriving' | 'completed' | 'failed';
@ -44,14 +55,14 @@ export interface KeygenStatusData {
export interface RetryRecord {
userId: string;
retryCount: number;
firstFailedAt: string;
firstRetryAt: string;
lastRetryAt: string;
nextRetryAt: string; // 下次允许重试的时间
}
@Injectable()
export class WalletRetryTask {
private readonly logger = new Logger(WalletRetryTask.name);
private isRunning = false;
constructor(
private readonly redisService: RedisService,
@ -62,162 +73,202 @@ export class WalletRetryTask {
/**
*
*
* 使
*/
@Cron(CronExpression.EVERY_MINUTE)
async handleWalletRetry() {
// 防止并发执行
if (this.isRunning) {
this.logger.warn(
'[TASK] Previous task still running, skipping this execution',
const taskLockKey = 'wallet-retry-task:lock';
// 尝试获取任务级别的分布式锁2分钟过期
const gotTaskLock = await this.redisService.tryLock(taskLockKey, 120);
if (!gotTaskLock) {
this.logger.debug(
'[TASK] Another instance is running the task, skipping',
);
return;
}
this.isRunning = true;
this.logger.log('[TASK] Starting wallet retry check...');
this.logger.log('[TASK] Starting wallet retry check (database-driven)...');
try {
// 1. 扫描所有 keygen:status:* keys
const statusKeys = await this.redisService.keys(
`${KEYGEN_STATUS_PREFIX}*`,
);
// 1. 从数据库查询钱包不完整的用户
const incompleteUserIds =
await this.userRepository.findUsersWithIncompleteWallets(
MAX_USERS_PER_SCAN,
);
if (incompleteUserIds.length === 0) {
this.logger.log('[TASK] No users with incomplete wallets found');
return;
}
this.logger.log(
`[TASK] Found ${statusKeys.length} wallet generation records`,
`[TASK] Found ${incompleteUserIds.length} users with incomplete wallets`,
);
for (const key of statusKeys) {
// 2. 逐个检查并处理
let retriedCount = 0;
let skippedCount = 0;
for (const userId of incompleteUserIds) {
try {
await this.checkAndRetry(key);
const retried = await this.checkAndRetryUser(userId.value.toString());
if (retried) {
retriedCount++;
} else {
skippedCount++;
}
} catch (error) {
this.logger.error(`[TASK] Error processing key ${key}: ${error}`);
this.logger.error(
`[TASK] Error processing user ${userId.value}: ${error}`,
);
}
}
this.logger.log('[TASK] Wallet retry check completed');
this.logger.log(
`[TASK] Wallet retry check completed: retried=${retriedCount}, skipped=${skippedCount}`,
);
} catch (error) {
this.logger.error(`[TASK] Wallet retry task failed: ${error}`, error);
} finally {
this.isRunning = false;
// 释放任务锁
await this.redisService.unlock(taskLockKey);
}
}
/**
*
*
*
* @returns true false
*/
private async checkAndRetry(statusKey: string): Promise<void> {
const statusData = await this.redisService.get(statusKey);
if (!statusData) return;
let status: KeygenStatusData;
try {
status = JSON.parse(statusData);
} catch (error) {
this.logger.warn(`[TASK] Invalid status data for ${statusKey}`);
return;
}
const { userId, status: currentStatus, updatedAt } = status;
// 跳过已完成的
if (currentStatus === 'completed') {
private async checkAndRetryUser(userId: string): Promise<boolean> {
// 1. 检查是否在退避期内
const shouldWait = await this.isInBackoffPeriod(userId);
if (shouldWait) {
this.logger.debug(
`[TASK] User ${userId} status is 'completed', skipping`,
);
return;
}
// 检查是否需要重试
const needsRetry = this.shouldRetry(status);
if (!needsRetry) {
return;
}
// 检查重试限制
const canRetry = await this.checkRetryLimit(userId);
if (!canRetry) {
this.logger.warn(
`[TASK] User ${userId} exceeded retry time limit (10 minutes)`,
);
// 更新状态为最终失败
await this.markAsFinalFailure(userId);
return;
}
// 执行重试
await this.retryWalletGeneration(userId);
}
/**
*
*/
private shouldRetry(status: KeygenStatusData): boolean {
const { status: currentStatus, updatedAt } = status;
const updatedTime = new Date(updatedAt).getTime();
const now = Date.now();
const elapsed = now - updatedTime;
// 情况1状态为 failed
if (currentStatus === 'failed') {
this.logger.log(
`[TASK] User ${status.userId} status is 'failed', will retry`,
);
return true;
}
// 情况2状态为 generating 但超过 60 秒
if (currentStatus === 'generating' && elapsed > KEYGEN_TIMEOUT_MS) {
this.logger.log(
`[TASK] User ${status.userId} generating timeout (${Math.floor(elapsed / 1000)}s), will retry`,
);
return true;
}
// 情况3状态为 deriving 但超过 60 秒
if (currentStatus === 'deriving' && elapsed > KEYGEN_TIMEOUT_MS) {
this.logger.log(
`[TASK] User ${status.userId} deriving timeout (${Math.floor(elapsed / 1000)}s), will retry`,
);
return true;
}
return false;
}
/**
* 10
*/
private async checkRetryLimit(userId: string): Promise<boolean> {
const retryKey = `${KEYGEN_RETRY_PREFIX}${userId}`;
const retryData = await this.redisService.get(retryKey);
if (!retryData) {
// 第一次重试
return true;
}
let record: RetryRecord;
try {
record = JSON.parse(retryData);
} catch (error) {
this.logger.warn(`[TASK] Invalid retry record for user ${userId}`);
return true;
}
const firstFailedTime = new Date(record.firstFailedAt).getTime();
const now = Date.now();
const elapsed = now - firstFailedTime;
// 如果超过 10 分钟,不再重试
if (elapsed > MAX_RETRY_DURATION_MS) {
this.logger.warn(
`[TASK] User ${userId} exceeded max retry duration: ${Math.floor(elapsed / 1000 / 60)} minutes`,
`[TASK] User ${userId} is in backoff period, skipping`,
);
return false;
}
// 2. 检查 Redis 状态
const redisKey = `${KEYGEN_STATUS_PREFIX}${userId}`;
const statusData = await this.redisService.get(redisKey);
let needsRetry = false;
if (statusData) {
try {
const parsed = JSON.parse(statusData) as KeygenStatusData;
const status = parsed.status;
const updatedAt = parsed.updatedAt;
// 检查是否超时
const isTimeout =
updatedAt &&
Date.now() - new Date(updatedAt).getTime() > KEYGEN_TIMEOUT_MS;
if (status === 'completed') {
// Redis 显示 completed 但数据库钱包不完整 → 状态不一致,需要重试
this.logger.warn(
`[TASK] User ${userId} Redis status 'completed' but wallet incomplete`,
);
needsRetry = true;
} else if (status === 'failed') {
// 明确失败
needsRetry = true;
} else if (
(status === 'generating' || status === 'deriving') &&
isTimeout
) {
// 超时
this.logger.log(
`[TASK] User ${userId} status '${status}' timed out`,
);
needsRetry = true;
} else if (status === 'pending' && isTimeout) {
// pending 超时
needsRetry = true;
}
// generating/deriving/pending 且未超时 → 正常等待,不重试
} catch (e) {
// JSON 解析失败
needsRetry = true;
}
} else {
// Redis 无状态但数据库钱包不完整 → 需要触发生成
needsRetry = true;
}
if (!needsRetry) {
return false;
}
// 3. 尝试获取用户级别的分布式锁
const lockKey = `${KEYGEN_LOCK_PREFIX}${userId}`;
const gotLock = await this.redisService.tryLock(lockKey, 60);
if (!gotLock) {
this.logger.debug(
`[TASK] User ${userId} lock not acquired, retry in progress`,
);
return false;
}
// 4. 执行重试
await this.retryWalletGeneration(userId);
return true;
}
/**
* 退
*
* 退
* 1 2 4 8 16 32 60
*/
private async isInBackoffPeriod(userId: string): Promise<boolean> {
const retryKey = `${KEYGEN_RETRY_PREFIX}${userId}`;
const retryData = await this.redisService.get(retryKey);
if (!retryData) {
return false; // 第一次重试,无需等待
}
try {
const record = JSON.parse(retryData) as RetryRecord;
const nextRetryTime = new Date(record.nextRetryAt).getTime();
const now = Date.now();
if (now < nextRetryTime) {
// 还在退避期内
const remainingSeconds = Math.floor((nextRetryTime - now) / 1000);
this.logger.debug(
`[TASK] User ${userId} backoff: ${remainingSeconds}s remaining`,
);
return true;
}
return false;
} catch (e) {
return false;
}
}
/**
* 退
*/
private calculateNextRetryTime(retryCount: number): Date {
// 指数退避2^(retryCount-1) 分钟,最大 60 分钟
const backoffMinutes = Math.min(
Math.pow(2, retryCount - 1) * MIN_BACKOFF_MINUTES,
MAX_BACKOFF_MINUTES,
);
const nextRetryTime = new Date(Date.now() + backoffMinutes * 60 * 1000);
return nextRetryTime;
}
/**
*
*
@ -228,7 +279,9 @@ export class WalletRetryTask {
try {
// 1. 获取用户账号信息
const userIdObj = UserId.create(BigInt(userId));
const userIdObj = await import('@/domain/value-objects').then((m) =>
m.UserId.create(BigInt(userId)),
);
const account = await this.userRepository.findById(userIdObj);
if (!account) {
@ -236,20 +289,18 @@ export class WalletRetryTask {
return;
}
// 2. 更新重试记录
await this.updateRetryRecord(userId);
// 2. 更新重试记录(包含指数退避时间)
const retryCount = await this.updateRetryRecord(userId);
// 3. 重新触发钱包生成流程
// 通过重新发布 UserAccountCreatedEvent 来触发
const event = account.createWalletGenerationEvent();
await this.eventPublisher.publish(event);
this.logger.log(
`[TASK] Wallet generation retry triggered for user: ${userId}`,
`[TASK] Wallet generation retry #${retryCount} triggered for user: ${userId}`,
);
// 4. 更新 Redis 状态为 pending(等待重新生成)
// 4. 更新 Redis 状态为 pending
const statusData: KeygenStatusData = {
status: 'pending',
userId,
@ -271,60 +322,53 @@ export class WalletRetryTask {
/**
*
*
* @returns
*/
private async updateRetryRecord(userId: string): Promise<void> {
private async updateRetryRecord(userId: string): Promise<number> {
const retryKey = `${KEYGEN_RETRY_PREFIX}${userId}`;
const retryData = await this.redisService.get(retryKey);
let record: RetryRecord;
let retryCount: number;
if (!retryData) {
// 第一次重试
retryCount = 1;
record = {
userId,
retryCount: 1,
firstFailedAt: new Date().toISOString(),
firstRetryAt: new Date().toISOString(),
lastRetryAt: new Date().toISOString(),
nextRetryAt: this.calculateNextRetryTime(1).toISOString(),
};
} else {
const existing = JSON.parse(retryData) as RetryRecord;
retryCount = existing.retryCount + 1;
record = {
...existing,
retryCount: existing.retryCount + 1,
retryCount,
lastRetryAt: new Date().toISOString(),
nextRetryAt: this.calculateNextRetryTime(retryCount).toISOString(),
};
}
// TTL 设置为 7 天,足够长以跟踪长期重试
await this.redisService.set(
retryKey,
JSON.stringify(record),
60 * 60 * 24, // 24 小时
60 * 60 * 24 * 7, // 7 天
);
const nextRetryMinutes = Math.min(
Math.pow(2, retryCount - 1) * MIN_BACKOFF_MINUTES,
MAX_BACKOFF_MINUTES,
);
this.logger.log(
`[TASK] Updated retry record for user ${userId}: count=${record.retryCount}`,
);
}
/**
*
*/
private async markAsFinalFailure(userId: string): Promise<void> {
const statusData: KeygenStatusData = {
status: 'failed',
userId,
errorMessage: 'Wallet generation failed after 10 minutes of retries',
updatedAt: new Date().toISOString(),
};
await this.redisService.set(
`${KEYGEN_STATUS_PREFIX}${userId}`,
JSON.stringify(statusData),
60 * 60 * 24, // 24 小时
`[TASK] User ${userId} retry #${retryCount}, next retry in ${nextRetryMinutes} minutes`,
);
this.logger.error(
`[TASK] Marked user ${userId} as final failure after retry timeout`,
);
return retryCount;
}
}

View File

@ -71,6 +71,17 @@ export interface UserAccountRepository {
params: CreateReferralLinkParams,
): Promise<ReferralLinkData>;
findReferralLinksByUserId(userId: UserId): Promise<ReferralLinkData[]>;
// 钱包生成相关
/**
*
*
*
* ID
*
* @param limit
*/
findUsersWithIncompleteWallets(limit: number): Promise<UserId[]>;
}
export const USER_ACCOUNT_REPOSITORY = Symbol('USER_ACCOUNT_REPOSITORY');

View File

@ -465,4 +465,31 @@ export class UserAccountRepositoryImpl implements UserAccountRepository {
createdAt: r.createdAt,
}));
}
/**
*
*
* 使 SQL
* - KAVADSTBSC
* - ACTIVE
*/
async findUsersWithIncompleteWallets(limit: number): Promise<UserId[]> {
// 使用原生 SQL 查询,因为 Prisma 不支持 HAVING 子句的复杂聚合
const results = await this.prisma.$queryRaw<Array<{ user_id: bigint }>>`
SELECT u.user_id
FROM user_accounts u
LEFT JOIN wallet_addresses w ON u.user_id = w.user_id
WHERE u.status = 'ACTIVE'
GROUP BY u.user_id
HAVING COUNT(DISTINCT w.chain_type) < 3
ORDER BY u.registered_at ASC
LIMIT ${limit}
`;
this.logger.log(
`[QUERY] Found ${results.length} users with incomplete wallets`,
);
return results.map((r) => UserId.create(r.user_id));
}
}

View File

@ -48,6 +48,32 @@ export class RedisService implements OnModuleDestroy {
return this.client.keys(pattern);
}
/**
*
*
* 使 Redis SET NX EX
* - TTL
* -
* - TTL
*
* @param key key
* @param ttlSeconds
* @returns true false
*/
async tryLock(key: string, ttlSeconds: number): Promise<boolean> {
const result = await this.client.set(key, '1', 'EX', ttlSeconds, 'NX');
return result === 'OK';
}
/**
*
*
* @param key key
*/
async unlock(key: string): Promise<void> {
await this.client.del(key);
}
/**
* keygen
* 使 Lua 脚本确保状态只能向前推进: pending < generating < deriving < completed