feat(identity-service): 添加钱包生成自动重试机制

功能:
- 新增 WalletRetryTask 定时任务,每分钟扫描一次
- 自动检测超过 60 秒仍在 generating/deriving 状态的钱包
- 自动检测状态为 failed 的钱包生成
- 幂等重试机制,最多 10 分钟内持续重试
- 记录重试次数和时间戳

技术实现:
- 使用 @nestjs/schedule 的 Cron 装饰器
- 在 UserAccount 聚合根中添加 createWalletGenerationEvent() 方法
- 在 RedisService 中添加 keys() 方法支持模式匹配扫描
- 通过重新发布 UserAccountCreatedEvent 触发幂等重试

相关需求:
- 用户手机号验证成功后立即创建账号
- 钱包生成在后台异步进行
- 失败后自动重试,无需用户感知

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-20 19:20:55 -08:00
parent 4906fa1815
commit 959fe93092
5 changed files with 341 additions and 2 deletions

View File

@ -261,7 +261,16 @@
"Bash(npx next:*)",
"Bash(npx prisma validate:*)",
"Bash(dir /s /b \"c:\\Users\\dong\\Desktop\\rwadurian\\backend\\services\\admin-service\\Dockerfile*\")",
"Bash(dir /b \"c:\\Users\\dong\\Desktop\\rwadurian\\frontend\")"
"Bash(dir /b \"c:\\Users\\dong\\Desktop\\rwadurian\\frontend\")",
"Bash(git commit -m \"$\\(cat <<''EOF''\nfix\\(reporting-service\\): 启动 Kafka 微服务消费者以记录真实活动\n\n- 在 main.ts 添加 Kafka 微服务连接配置\n- 调用 startAllMicroservices\\(\\) 启动事件消费\n- 支持消费 identity/authorization/planting 服务的事件\n- 实现仪表板\"最近活动\"显示真实数据\n\n🤖 Generated with [Claude Code]\\(https://claude.com/claude-code\\)\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
"Bash(git commit -m \"$\\(cat <<''EOF''\nfeat\\(admin-web\\): 添加 redux-persist 实现登录状态持久化\n\n- 安装 redux-persist 依赖\n- 配置 persistReducer 持久化 auth slice 到 localStorage\n- 添加 PersistGate 确保 rehydration 完成后再渲染\n- 处理 REHYDRATE action 恢复认证状态\n\n🤖 Generated with [Claude Code]\\(https://claude.com/claude-code\\)\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
"Bash(git commit -m \"$\\(cat <<''EOF''\nfix\\(mobile-app\\): 修复 deposit_usdt_page 中未定义的 _loadWalletData 方法\n\n将错误的方法名 _loadWalletData 改为正确的 _loadData\n\n🤖 Generated with [Claude Code]\\(https://claude.com/claude-code\\)\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
"Bash(git commit -m \"$\\(cat <<''EOF''\nfix\\(admin-web\\): 修复 authSlice 的 REHYDRATE 类型错误\n\n使用 addMatcher 替代 addCase 处理 REHYDRATE action\n\n🤖 Generated with [Claude Code]\\(https://claude.com/claude-code\\)\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
"Bash(docker compose logs:*)",
"Bash(set:*)",
"Bash(npx prisma migrate:*)",
"Bash($env:DATABASE_URL=\"postgresql://postgres:password@localhost:5432/rwa_identity?schema=public\")",
"Bash(docker cp:*)"
],
"deny": [],
"ask": []

View File

@ -2,6 +2,7 @@ import { Module, Global } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { JwtModule } from '@nestjs/jwt';
import { HttpModule } from '@nestjs/axios';
import { ScheduleModule } from '@nestjs/schedule';
import { APP_FILTER, APP_INTERCEPTOR, APP_GUARD } from '@nestjs/core';
// Config
@ -20,6 +21,7 @@ import { TokenService } from '@/application/services/token.service';
import { TotpService } from '@/application/services/totp.service';
import { BlockchainWalletHandler } from '@/application/event-handlers/blockchain-wallet.handler';
import { MpcKeygenCompletedHandler } from '@/application/event-handlers/mpc-keygen-completed.handler';
import { WalletRetryTask } from '@/application/tasks/wallet-retry.task';
// Domain Services
import {
@ -102,7 +104,7 @@ export class DomainModule {}
// ============ Application Module ============
@Module({
imports: [DomainModule, InfrastructureModule],
imports: [DomainModule, InfrastructureModule, ScheduleModule.forRoot()],
providers: [
UserApplicationService,
TokenService,
@ -110,6 +112,8 @@ export class DomainModule {}
// Event Handlers - 通过注入到 UserApplicationService 来确保它们被初始化
BlockchainWalletHandler,
MpcKeygenCompletedHandler,
// Tasks - 定时任务
WalletRetryTask,
],
exports: [UserApplicationService, TokenService, TotpService],
})

View File

@ -0,0 +1,299 @@
/**
*
*
*
* 1. Redis
* 2. 60 generating
* 3. failed
* 4. 10 10
*
*
* - 1
* - 2 1
* - 3 2
* - ...
* - 10
*/
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { RedisService } from '@/infrastructure/redis/redis.service';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import { UserAccountRepository, USER_ACCOUNT_REPOSITORY } from '@/domain/repositories/user-account.repository.interface';
import { Inject } from '@nestjs/common';
import { UserId } from '@/domain/value-objects';
// Redis key prefix for keygen status
const KEYGEN_STATUS_PREFIX = 'keygen:status:';
const KEYGEN_RETRY_PREFIX = 'keygen:retry:';
const MAX_RETRY_DURATION_MS = 10 * 60 * 1000; // 10 分钟
const KEYGEN_TIMEOUT_MS = 60 * 1000; // 60 秒
export interface KeygenStatusData {
status: 'pending' | 'generating' | 'deriving' | 'completed' | 'failed';
userId: string;
mpcSessionId?: string;
publicKey?: string;
errorMessage?: string;
updatedAt: string;
}
export interface RetryRecord {
userId: string;
retryCount: number;
firstFailedAt: string;
lastRetryAt: string;
}
@Injectable()
export class WalletRetryTask {
private readonly logger = new Logger(WalletRetryTask.name);
private isRunning = false;
constructor(
private readonly redisService: RedisService,
private readonly eventPublisher: EventPublisherService,
@Inject(USER_ACCOUNT_REPOSITORY)
private readonly userRepository: UserAccountRepository,
) {}
/**
*
*/
@Cron(CronExpression.EVERY_MINUTE)
async handleWalletRetry() {
// 防止并发执行
if (this.isRunning) {
this.logger.warn('[TASK] Previous task still running, skipping this execution');
return;
}
this.isRunning = true;
this.logger.log('[TASK] Starting wallet retry check...');
try {
// 1. 扫描所有 keygen:status:* keys
const statusKeys = await this.redisService.keys(`${KEYGEN_STATUS_PREFIX}*`);
this.logger.log(`[TASK] Found ${statusKeys.length} wallet generation records`);
for (const key of statusKeys) {
try {
await this.checkAndRetry(key);
} catch (error) {
this.logger.error(`[TASK] Error processing key ${key}: ${error}`);
}
}
this.logger.log('[TASK] Wallet retry check completed');
} catch (error) {
this.logger.error(`[TASK] Wallet retry task failed: ${error}`, error);
} finally {
this.isRunning = false;
}
}
/**
*
*/
private async checkAndRetry(statusKey: string): Promise<void> {
const statusData = await this.redisService.get(statusKey);
if (!statusData) return;
let status: KeygenStatusData;
try {
status = JSON.parse(statusData);
} catch (error) {
this.logger.warn(`[TASK] Invalid status data for ${statusKey}`);
return;
}
const { userId, status: currentStatus, updatedAt } = status;
// 跳过已完成的
if (currentStatus === 'completed') {
return;
}
// 检查是否需要重试
const needsRetry = this.shouldRetry(status);
if (!needsRetry) {
return;
}
// 检查重试限制
const canRetry = await this.checkRetryLimit(userId);
if (!canRetry) {
this.logger.warn(`[TASK] User ${userId} exceeded retry time limit (10 minutes)`);
// 更新状态为最终失败
await this.markAsFinalFailure(userId);
return;
}
// 执行重试
await this.retryWalletGeneration(userId);
}
/**
*
*/
private shouldRetry(status: KeygenStatusData): boolean {
const { status: currentStatus, updatedAt } = status;
const updatedTime = new Date(updatedAt).getTime();
const now = Date.now();
const elapsed = now - updatedTime;
// 情况1状态为 failed
if (currentStatus === 'failed') {
this.logger.log(`[TASK] User ${status.userId} status is 'failed', will retry`);
return true;
}
// 情况2状态为 generating 但超过 60 秒
if (currentStatus === 'generating' && elapsed > KEYGEN_TIMEOUT_MS) {
this.logger.log(`[TASK] User ${status.userId} generating timeout (${Math.floor(elapsed / 1000)}s), will retry`);
return true;
}
// 情况3状态为 deriving 但超过 60 秒
if (currentStatus === 'deriving' && elapsed > KEYGEN_TIMEOUT_MS) {
this.logger.log(`[TASK] User ${status.userId} deriving timeout (${Math.floor(elapsed / 1000)}s), will retry`);
return true;
}
return false;
}
/**
* 10
*/
private async checkRetryLimit(userId: string): Promise<boolean> {
const retryKey = `${KEYGEN_RETRY_PREFIX}${userId}`;
const retryData = await this.redisService.get(retryKey);
if (!retryData) {
// 第一次重试
return true;
}
let record: RetryRecord;
try {
record = JSON.parse(retryData);
} catch (error) {
this.logger.warn(`[TASK] Invalid retry record for user ${userId}`);
return true;
}
const firstFailedTime = new Date(record.firstFailedAt).getTime();
const now = Date.now();
const elapsed = now - firstFailedTime;
// 如果超过 10 分钟,不再重试
if (elapsed > MAX_RETRY_DURATION_MS) {
this.logger.warn(`[TASK] User ${userId} exceeded max retry duration: ${Math.floor(elapsed / 1000 / 60)} minutes`);
return false;
}
return true;
}
/**
*
*
* UserAccountCreatedEvent MPC
*/
private async retryWalletGeneration(userId: string): Promise<void> {
this.logger.log(`[TASK] Retrying wallet generation for user: ${userId}`);
try {
// 1. 获取用户账号信息
const userIdObj = UserId.create(BigInt(userId));
const account = await this.userRepository.findById(userIdObj);
if (!account) {
this.logger.error(`[TASK] User ${userId} not found, cannot retry`);
return;
}
// 2. 更新重试记录
await this.updateRetryRecord(userId);
// 3. 重新触发钱包生成流程
// 通过重新发布 UserAccountCreatedEvent 来触发
const event = account.createWalletGenerationEvent();
await this.eventPublisher.publish(event);
this.logger.log(`[TASK] Wallet generation retry triggered for user: ${userId}`);
// 4. 更新 Redis 状态为 pending等待重新生成
const statusData: KeygenStatusData = {
status: 'pending',
userId,
updatedAt: new Date().toISOString(),
};
await this.redisService.set(
`${KEYGEN_STATUS_PREFIX}${userId}`,
JSON.stringify(statusData),
60 * 60 * 24, // 24 小时
);
} catch (error) {
this.logger.error(`[TASK] Failed to retry wallet generation for user ${userId}: ${error}`, error);
}
}
/**
*
*/
private async updateRetryRecord(userId: string): Promise<void> {
const retryKey = `${KEYGEN_RETRY_PREFIX}${userId}`;
const retryData = await this.redisService.get(retryKey);
let record: RetryRecord;
if (!retryData) {
// 第一次重试
record = {
userId,
retryCount: 1,
firstFailedAt: new Date().toISOString(),
lastRetryAt: new Date().toISOString(),
};
} else {
const existing = JSON.parse(retryData) as RetryRecord;
record = {
...existing,
retryCount: existing.retryCount + 1,
lastRetryAt: new Date().toISOString(),
};
}
await this.redisService.set(
retryKey,
JSON.stringify(record),
60 * 60 * 24, // 24 小时
);
this.logger.log(`[TASK] Updated retry record for user ${userId}: count=${record.retryCount}`);
}
/**
*
*/
private async markAsFinalFailure(userId: string): Promise<void> {
const statusData: KeygenStatusData = {
status: 'failed',
userId,
errorMessage: 'Wallet generation failed after 10 minutes of retries',
updatedAt: new Date().toISOString(),
};
await this.redisService.set(
`${KEYGEN_STATUS_PREFIX}${userId}`,
JSON.stringify(statusData),
60 * 60 * 24, // 24 小时
);
this.logger.error(`[TASK] Marked user ${userId} as final failure after retry timeout`);
}
}

View File

@ -330,4 +330,27 @@ export class UserAccount {
clearDomainEvents(): void {
this._domainEvents = [];
}
/**
*
*
* UserAccountCreatedEvent MPC
*
*/
createWalletGenerationEvent(): UserAccountCreatedEvent {
// 获取第一个设备的信息
const firstDevice = this._devices.values().next().value as DeviceInfo | undefined;
return new UserAccountCreatedEvent({
userId: this._userId.toString(),
accountSequence: this._accountSequence.value,
referralCode: this._referralCode.value,
phoneNumber: this._phoneNumber?.value || null,
initialDeviceId: firstDevice?.deviceId || 'retry-unknown',
deviceName: firstDevice?.deviceName || 'retry-device',
deviceInfo: firstDevice?.deviceInfo || null,
inviterReferralCode: null, // 重试时不需要
createdAt: new Date(),
});
}
}

View File

@ -44,6 +44,10 @@ export class RedisService implements OnModuleDestroy {
await this.client.expire(key, seconds);
}
async keys(pattern: string): Promise<string[]> {
return this.client.keys(pattern);
}
/**
* keygen
* 使 Lua 脚本确保状态只能向前推进: pending < generating < deriving < completed