rwadurian/backend/services/identity-service/src/application/tasks/wallet-retry.task.ts

396 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

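/**
 * Wallet generation retry scheduled task (enhanced version)
 *
 * Design goal: 100% ensure that every user's wallet is eventually generated successfully.
 *
 * Key improvements:
 * 1. Database-driven scan: does not rely on Redis state; directly queries users whose wallets are incomplete
 * 2. Exponential backoff: 1 minute → 2 minutes → 4 minutes → ... → capped at 60 minutes
 * 3. No time limit: keeps retrying until success and never gives up
 * 4. Distributed lock: prevents multiple instances from triggering a retry for the same user twice
 *
 * Retry trigger conditions:
 * - Users whose three chain addresses are incomplete in the database
 * - Redis status is failed / timed out / absent
 *
 * Interaction with the getWalletStatus API:
 * - Calling the API also triggers a retry (user-driven polling)
 * - The scheduled task is the fallback, ensuring retries happen even if the user never calls the API
 */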
import { randomUUID } from 'node:crypto';
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { RedisService } from '@/infrastructure/redis/redis.service';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import {
  UserAccountRepository,
  USER_ACCOUNT_REPOSITORY,
} from '@/domain/repositories/user-account.repository.interface';
import { Inject } from '@nestjs/common';
import { UserId } from '@/domain/value-objects';
import { MpcKeygenRequestedEvent } from '@/domain/events';
// Redis key prefixes. The user id is appended to each prefix to form the full key.
const KEYGEN_STATUS_PREFIX = 'keygen:status:'; // keygen status JSON
const KEYGEN_RETRY_PREFIX = 'keygen:retry:'; // retry/backoff bookkeeping JSON
const KEYGEN_LOCK_PREFIX = 'keygen:lock:'; // per-user retry lock

// Timeout: an in-progress keygen status older than this (milliseconds) counts as timed out.
const KEYGEN_TIMEOUT_MS = 1000 * 60; // 60 seconds

// Exponential backoff bounds, in minutes.
const MIN_BACKOFF_MINUTES = 1; // delay after the first retry
const MAX_BACKOFF_MINUTES = 60; // upper cap on the delay

// Maximum number of users fetched from the database per scan.
const MAX_USERS_PER_SCAN = 100;

// Maximum number of retries triggered per task run (keeps the MPC service from being overloaded).
const MAX_RETRIES_PER_RUN = 10;
/** Lifecycle states a keygen status entry can carry. */
type KeygenStatus =
  | 'pending'
  | 'generating'
  | 'deriving'
  | 'completed'
  | 'failed';

/**
 * JSON payload stored in Redis under `keygen:status:<userId>`.
 */
export interface KeygenStatusData {
  /** Current keygen state. */
  status: KeygenStatus;
  /** Id of the user the status belongs to, as a string. */
  userId: string;
  /** Not read or written in this file — presumably set by other writers of the status key; confirm. */
  mpcSessionId?: string;
  /** Not read or written in this file — presumably set by other writers of the status key; confirm. */
  publicKey?: string;
  /** Not read or written in this file — presumably accompanies a 'failed' status; confirm. */
  errorMessage?: string;
  /** Time of the last status change; this file writes it with `Date#toISOString()`. */
  updatedAt: string;
}
/**
 * JSON payload stored in Redis under `keygen:retry:<userId>`, tracking how
 * often wallet generation has been retried and when the next retry is allowed.
 */
export interface RetryRecord {
  /** Id of the user the record belongs to, as a string. */
  userId: string;
  /** Number of retries triggered so far; starts at 1 on the first retry. */
  retryCount: number;
  /** ISO-8601 timestamp of the first retry. */
  firstRetryAt: string;
  /** ISO-8601 timestamp of the most recent retry. */
  lastRetryAt: string;
  /** ISO-8601 timestamp before which no further retry is allowed (backoff deadline). */
  nextRetryAt: string;
}
/**
 * Scheduled task that re-triggers MPC wallet generation for users whose
 * wallets are reported as incomplete by the user repository.
 *
 * Every minute it scans the repository, consults the per-user keygen status
 * and retry record stored in Redis, and publishes a new
 * `MpcKeygenRequestedEvent` for users that need another attempt. Retries are
 * spaced with exponential backoff and never stop until the wallet is complete.
 */
@Injectable()
export class WalletRetryTask {
  private readonly logger = new Logger(WalletRetryTask.name);

  constructor(
    private readonly redisService: RedisService,
    private readonly eventPublisher: EventPublisherService,
    @Inject(USER_ACCOUNT_REPOSITORY)
    private readonly userRepository: UserAccountRepository,
  ) {}

  /**
   * Runs once per minute.
   *
   * The whole run is guarded by a task-level distributed lock so that only
   * one instance processes the scan at a time. At most
   * `MAX_RETRIES_PER_RUN` retries are triggered per run.
   */
  @Cron(CronExpression.EVERY_MINUTE)
  async handleWalletRetry(): Promise<void> {
    const taskLockKey = 'wallet-retry-task:lock';

    // Task-level distributed lock, expiring after 2 minutes.
    // NOTE(review): if a run ever exceeds that expiry, the unlock in `finally`
    // may release a lock another instance has since acquired — confirm
    // whether RedisService.unlock checks ownership.
    const gotTaskLock = await this.redisService.tryLock(taskLockKey, 120);
    if (!gotTaskLock) {
      this.logger.debug(
        '[TASK] Another instance is running the task, skipping',
      );
      return;
    }

    this.logger.log('[TASK] Starting wallet retry check (database-driven)...');

    try {
      // 1. Ask the repository for users whose wallets are incomplete.
      const incompleteUserIds =
        await this.userRepository.findUsersWithIncompleteWallets(
          MAX_USERS_PER_SCAN,
        );

      if (incompleteUserIds.length === 0) {
        this.logger.log('[TASK] No users with incomplete wallets found');
        return;
      }

      this.logger.log(
        `[TASK] Found ${incompleteUserIds.length} users with incomplete wallets`,
      );

      // 2. Check the users one by one.
      let retriedCount = 0;
      let skippedCount = 0;

      for (const userId of incompleteUserIds) {
        // Cap the retries triggered per run so the MPC service is not overloaded.
        if (retriedCount >= MAX_RETRIES_PER_RUN) {
          this.logger.log(
            `[TASK] Reached max retries per run (${MAX_RETRIES_PER_RUN}), stopping`,
          );
          break;
        }

        try {
          const retried = await this.checkAndRetryUser(userId.value.toString());
          if (retried) {
            retriedCount++;
          } else {
            skippedCount++;
          }
        } catch (error) {
          // A failure for one user must not abort the rest of the scan.
          this.logger.error(
            `[TASK] Error processing user ${userId.value}: ${error}`,
          );
        }
      }

      this.logger.log(
        `[TASK] Wallet retry check completed: retried=${retriedCount}, skipped=${skippedCount}`,
      );
    } catch (error) {
      this.logger.error(`[TASK] Wallet retry task failed: ${error}`, error);
    } finally {
      // Release the task lock on every exit path, including the early return.
      await this.redisService.unlock(taskLockKey);
    }
  }

  /**
   * Checks a single user and decides whether to retry wallet generation.
   *
   * @param userId user id as a string
   * @returns true if a retry attempt was started, false if the user was
   *   skipped (in backoff, still in progress, or locked by another retry)
   */
  private async checkAndRetryUser(userId: string): Promise<boolean> {
    // 1. Respect the exponential backoff window.
    if (await this.isInBackoffPeriod(userId)) {
      this.logger.debug(
        `[TASK] User ${userId} is in backoff period, skipping`,
      );
      return false;
    }

    // 2. Inspect the keygen status stored in Redis.
    const statusData = await this.redisService.get(
      `${KEYGEN_STATUS_PREFIX}${userId}`,
    );
    if (!this.statusRequiresRetry(userId, statusData)) {
      return false;
    }

    // 3. Per-user distributed lock. It is not released here and simply
    //    expires through its TTL (60).
    const lockKey = `${KEYGEN_LOCK_PREFIX}${userId}`;
    const gotLock = await this.redisService.tryLock(lockKey, 60);
    if (!gotLock) {
      this.logger.debug(
        `[TASK] User ${userId} lock not acquired, retry in progress`,
      );
      return false;
    }

    // 4. Trigger the retry.
    await this.retryWalletGeneration(userId);
    return true;
  }

  /**
   * Decides from the raw Redis status payload whether a retry is needed.
   *
   * The caller only passes users whose wallet is incomplete in the database,
   * so any terminal, stale, or unreadable status means a retry is required.
   * Only a fresh, non-terminal status means "keep waiting".
   */
  private statusRequiresRetry(
    userId: string,
    rawStatus: string | null | undefined,
  ): boolean {
    // No status in Redis although the wallet is incomplete → start generation.
    if (!rawStatus) {
      return true;
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(rawStatus);
    } catch {
      // Unparsable status payload.
      return true;
    }
    if (typeof parsed !== 'object' || parsed === null) {
      return true;
    }

    const { status, updatedAt } = parsed as Partial<KeygenStatusData>;

    if (status === 'completed') {
      // Redis says completed but the wallet is incomplete → inconsistent state.
      this.logger.warn(
        `[TASK] User ${userId} Redis status 'completed' but wallet incomplete`,
      );
      return true;
    }
    if (status === 'failed') {
      return true;
    }

    // Non-terminal status: retry only once it is stale. A missing or
    // unparsable timestamp counts as stale; otherwise such an entry would
    // block retries for as long as the status key exists.
    const updatedAtMs = updatedAt ? new Date(updatedAt).getTime() : Number.NaN;
    const isStale =
      Number.isNaN(updatedAtMs) ||
      Date.now() - updatedAtMs > KEYGEN_TIMEOUT_MS;
    if (!isStale) {
      // Still within the timeout window → wait, do not retry.
      return false;
    }

    if (status === 'generating' || status === 'deriving') {
      this.logger.log(`[TASK] User ${userId} status '${status}' timed out`);
    }
    return true;
  }

  /**
   * Checks whether the user is still inside the backoff window.
   *
   * The window doubles after every retry:
   * 1 → 2 → 4 → 8 → 16 → 32 → 60 minutes (cap).
   *
   * @returns true if the next allowed retry time has not been reached yet;
   *   false if there is no retry record, it is unreadable, or it has elapsed
   */
  private async isInBackoffPeriod(userId: string): Promise<boolean> {
    const retryKey = `${KEYGEN_RETRY_PREFIX}${userId}`;
    const retryData = await this.redisService.get(retryKey);
    if (!retryData) {
      return false; // First retry: nothing to wait for.
    }

    try {
      const record = JSON.parse(retryData) as RetryRecord;
      const nextRetryTime = new Date(record.nextRetryAt).getTime();
      const now = Date.now();

      if (now < nextRetryTime) {
        const remainingSeconds = Math.floor((nextRetryTime - now) / 1000);
        this.logger.debug(
          `[TASK] User ${userId} backoff: ${remainingSeconds}s remaining`,
        );
        return true;
      }
      return false;
    } catch {
      // An unreadable record never blocks a retry.
      return false;
    }
  }

  /**
   * Backoff delay in minutes for the given retry number:
   * 2^(retryCount-1) * MIN_BACKOFF_MINUTES, capped at MAX_BACKOFF_MINUTES.
   */
  private backoffMinutesFor(retryCount: number): number {
    return Math.min(
      2 ** (retryCount - 1) * MIN_BACKOFF_MINUTES,
      MAX_BACKOFF_MINUTES,
    );
  }

  /**
   * Computes the earliest time the next retry is allowed (exponential backoff).
   *
   * @param retryCount number of the retry that is being recorded (1-based)
   */
  private calculateNextRetryTime(retryCount: number): Date {
    const backoffMs = this.backoffMinutesFor(retryCount) * 60 * 1000;
    return new Date(Date.now() + backoffMs);
  }

  /**
   * Performs one wallet generation retry for the user.
   *
   * Publishes an `MpcKeygenRequestedEvent` (the original comment states this
   * is the same event type the registration flow uses). Errors are logged and
   * not rethrown, so one failing user does not abort the scan.
   */
  private async retryWalletGeneration(userId: string): Promise<void> {
    this.logger.log(`[TASK] Retrying wallet generation for user: ${userId}`);

    try {
      // 1. Load the user account.
      const userIdObj = UserId.create(BigInt(userId));
      const account = await this.userRepository.findById(userIdObj);
      if (!account) {
        this.logger.error(`[TASK] User ${userId} not found, cannot retry`);
        return;
      }

      // 2. Record the retry, which also sets the next backoff deadline.
      const retryCount = await this.updateRetryRecord(userId);

      // 3. Mark the status as pending BEFORE publishing, so this write cannot
      //    overwrite a newer status written once the event is consumed.
      const statusData: KeygenStatusData = {
        status: 'pending',
        userId,
        updatedAt: new Date().toISOString(),
      };
      await this.redisService.set(
        `${KEYGEN_STATUS_PREFIX}${userId}`,
        JSON.stringify(statusData),
        60 * 60 * 24, // 24 hours
      );

      // 4. Publish the keygen request. randomUUID is imported explicitly
      //    because the global `crypto` object is not defined by default on
      //    Node versions before 19.
      const sessionId = randomUUID();
      await this.eventPublisher.publish(
        new MpcKeygenRequestedEvent({
          sessionId,
          userId: account.userId.toString(),
          accountSequence: account.accountSequence.value,
          username: `user_${account.accountSequence.value}`,
          threshold: 2,
          totalParties: 3,
          requireDelegate: true,
        }),
      );

      this.logger.log(
        `[TASK] Wallet generation retry #${retryCount} triggered for user: ${userId}, sessionId=${sessionId}`,
      );
    } catch (error) {
      this.logger.error(
        `[TASK] Failed to retry wallet generation for user ${userId}: ${error}`,
        error,
      );
    }
  }

  /**
   * Parses a stored retry record.
   *
   * @returns the record, or null when there is none or it is unusable
   *   (invalid JSON, not an object, or `retryCount` not a positive integer)
   */
  private parseRetryRecord(
    userId: string,
    raw: string | null | undefined,
  ): RetryRecord | null {
    if (!raw) {
      return null;
    }

    try {
      const parsed: unknown = JSON.parse(raw);
      if (typeof parsed === 'object' && parsed !== null) {
        const candidate = parsed as Partial<RetryRecord>;
        if (
          typeof candidate.retryCount === 'number' &&
          Number.isInteger(candidate.retryCount) &&
          candidate.retryCount >= 1
        ) {
          return parsed as RetryRecord;
        }
      }
    } catch {
      // Fall through to the warning below.
    }

    this.logger.warn(
      `[TASK] User ${userId} retry record is corrupted, restarting retry count`,
    );
    return null;
  }

  /**
   * Creates or advances the user's retry record in Redis.
   *
   * A missing or corrupted record starts again at retry #1 instead of
   * throwing, so a bad Redis value cannot block retries for that user.
   *
   * @returns the current retry count (1 for the first retry)
   */
  private async updateRetryRecord(userId: string): Promise<number> {
    const retryKey = `${KEYGEN_RETRY_PREFIX}${userId}`;
    const existing = this.parseRetryRecord(
      userId,
      await this.redisService.get(retryKey),
    );

    const retryCount = existing ? existing.retryCount + 1 : 1;
    const nowIso = new Date().toISOString();
    const nextRetryAt = this.calculateNextRetryTime(retryCount).toISOString();

    const record: RetryRecord = existing
      ? { ...existing, retryCount, lastRetryAt: nowIso, nextRetryAt }
      : {
          userId,
          retryCount,
          firstRetryAt: nowIso,
          lastRetryAt: nowIso,
          nextRetryAt,
        };

    // 7-day TTL: long enough to keep tracking long-running retries.
    await this.redisService.set(
      retryKey,
      JSON.stringify(record),
      60 * 60 * 24 * 7,
    );

    const nextRetryMinutes = this.backoffMinutesFor(retryCount);
    this.logger.log(
      `[TASK] User ${userId} retry #${retryCount}, next retry in ${nextRetryMinutes} minutes`,
    );

    return retryCount;
  }
}