196 lines
5.8 KiB
TypeScript
196 lines
5.8 KiB
TypeScript
import { Injectable, Logger, Inject } from '@nestjs/common';
|
|
import {
|
|
DEPOSIT_TRANSACTION_REPOSITORY,
|
|
IDepositTransactionRepository,
|
|
} from '@/domain/repositories/deposit-transaction.repository.interface';
|
|
import {
|
|
OUTBOX_EVENT_REPOSITORY,
|
|
IOutboxEventRepository,
|
|
OutboxEventStatus,
|
|
} from '@/domain/repositories/outbox-event.repository.interface';
|
|
import { DepositConfirmedEvent } from '@/domain/events';
|
|
|
|
/**
 * Deposit repair service.
 *
 * Diagnoses and repairs legacy deposit problems:
 * 1. Deposits that are CONFIRMED but have no corresponding Outbox event.
 * 2. Outbox events stuck in the FAILED state.
 * 3. Manually re-emitting a deposit event.
 */
@Injectable()
export class DepositRepairService {
  private readonly logger = new Logger(DepositRepairService.name);

  constructor(
    @Inject(DEPOSIT_TRANSACTION_REPOSITORY)
    private readonly depositRepo: IDepositTransactionRepository,
    @Inject(OUTBOX_EVENT_REPOSITORY)
    private readonly outboxRepo: IOutboxEventRepository,
  ) {}

  /**
   * Diagnose: list every deposit that still needs a notification and
   * summarise how many Outbox events are in each state.
   *
   * @returns The deposits returned by `findPendingNotification()` (flattened
   *   to plain strings) together with Outbox counters. `outboxFailed` is
   *   always 0 for now because the repository cannot query FAILED events.
   */
  async diagnose(): Promise<{
    confirmedNotNotified: Array<{
      id: string;
      txHash: string;
      userId: string;
      accountSequence: string;
      amount: string;
      confirmedAt: string;
    }>;
    outboxPending: number;
    outboxSent: number;
    outboxFailed: number;
  }> {
    // Deposits that are CONFIRMED but have not been notified yet.
    const pendingDeposits = await this.depositRepo.findPendingNotification();

    // Count Outbox events per state. The three lookups are independent, so
    // they run concurrently.
    // NOTE(review): the `1000` arguments look like row limits, which would cap
    // the reported counts at 1000 — confirm against the repository contract.
    const [pending, sent, failed] = await Promise.all([
      this.outboxRepo.findPendingEvents(1000),
      this.outboxRepo.findUnackedEvents(0, 1000), // SENT state
      this.getFailedOutboxCount(),
    ]);

    return {
      confirmedNotNotified: pendingDeposits.map((d) => ({
        id: d.id?.toString() ?? '',
        txHash: d.txHash.toString(),
        userId: d.userId.toString(),
        accountSequence: d.accountSequence,
        amount: d.amount.toFixed(6),
        // NOTE(review): populated from `createdAt`; confirm whether the entity
        // exposes a dedicated confirmation timestamp that should be used.
        confirmedAt: d.createdAt?.toISOString() ?? '',
      })),
      outboxPending: pending.length,
      outboxSent: sent.length,
      outboxFailed: failed,
    };
  }

  /**
   * Repair a single deposit by creating a fresh Outbox event for it.
   *
   * This method only writes the Outbox row; it does not update the deposit
   * itself. Until `notifiedAt` is set elsewhere, calling it again for the
   * same deposit will create another Outbox event.
   *
   * @param depositId - Identifier of the deposit to repair.
   * @returns `success: false` when the deposit does not exist or already has
   *   `notifiedAt` set; `success: true` once the Outbox event was created.
   */
  async repairDeposit(depositId: bigint): Promise<{
    success: boolean;
    message: string;
  }> {
    const deposit = await this.depositRepo.findById(depositId);

    if (!deposit) {
      return { success: false, message: `Deposit ${depositId} not found` };
    }

    if (deposit.notifiedAt) {
      return {
        success: false,
        message: `Deposit ${depositId} already notified at ${deposit.notifiedAt.toISOString()}`,
      };
    }

    // Build the DepositConfirmedEvent from the stored deposit.
    const event = new DepositConfirmedEvent({
      depositId: deposit.id?.toString() ?? '',
      chainType: deposit.chainType.toString(),
      txHash: deposit.txHash.toString(),
      toAddress: deposit.toAddress.toString(),
      amount: deposit.amount.raw.toString(),
      amountFormatted: deposit.amount.toFixed(8),
      confirmations: deposit.confirmations,
      accountSequence: deposit.accountSequence,
      userId: deposit.userId.toString(),
    });

    // Persist the event in the Outbox; the tx hash is the fallback aggregate
    // id when the entity carries no id.
    await this.outboxRepo.create({
      eventType: event.eventType,
      aggregateId: deposit.id?.toString() ?? deposit.txHash.toString(),
      aggregateType: 'DepositTransaction',
      payload: event.toPayload(),
    });

    this.logger.log(`Created repair outbox event for deposit ${depositId}`);

    return {
      success: true,
      message: `Created outbox event for deposit ${depositId}, will be sent in next cycle`,
    };
  }

  /**
   * Batch repair: run {@link repairDeposit} for every deposit returned by
   * `findPendingNotification()`.
   *
   * Deposits are processed one at a time; a failure for one deposit is
   * recorded in `details` and does not abort the rest of the batch.
   *
   * @returns Totals plus a per-deposit result list.
   */
  async repairAll(): Promise<{
    total: number;
    success: number;
    failed: number;
    details: Array<{ id: string; success: boolean; message: string }>;
  }> {
    const pendingDeposits = await this.depositRepo.findPendingNotification();

    this.logger.log(`Starting batch repair for ${pendingDeposits.length} deposits`);

    const results: Array<{ id: string; success: boolean; message: string }> = [];
    let successCount = 0;
    let failedCount = 0;

    for (const deposit of pendingDeposits) {
      const depositId = deposit.id;

      // Nullish check on purpose: the id is a bigint and `0n` is falsy, so a
      // truthiness test would misreport a valid id of 0 as missing.
      if (depositId == null) {
        results.push({ id: 'unknown', success: false, message: 'No deposit ID' });
        failedCount++;
        continue;
      }

      try {
        const result = await this.repairDeposit(depositId);
        results.push({ id: depositId.toString(), ...result });
        if (result.success) {
          successCount++;
        } else {
          failedCount++;
        }
      } catch (error) {
        const message = error instanceof Error ? error.message : 'Unknown error';
        // Keep the stack trace so the failing repository call can be located.
        const stack = error instanceof Error ? error.stack : undefined;
        results.push({ id: depositId.toString(), success: false, message });
        failedCount++;
        this.logger.error(`Failed to repair deposit ${depositId}: ${message}`, stack);
      }
    }

    this.logger.log(`Batch repair completed: ${successCount} success, ${failedCount} failed`);

    return {
      total: pendingDeposits.length,
      success: successCount,
      failed: failedCount,
      details: results,
    };
  }

  /**
   * Reset FAILED Outbox events back to PENDING.
   *
   * Not implemented: the current repository interface cannot query events in
   * the FAILED state, so this method only logs a warning and reports zero
   * resets.
   *
   * @returns Always `{ reset: 0 }` with an explanatory message.
   */
  async resetFailedOutboxEvents(): Promise<{
    reset: number;
    message: string;
  }> {
    // The repository interface has no way to look up FAILED events yet;
    // a `findFailedEvents` method would have to be added to
    // IOutboxEventRepository first.
    this.logger.warn('resetFailedOutboxEvents: Not implemented - interface does not support finding FAILED events');
    return {
      reset: 0,
      message: 'Not implemented - use direct database query to find and reset FAILED events',
    };
  }

  /**
   * Number of Outbox events in the FAILED state.
   *
   * @returns Always 0, because the repository interface does not support
   *   querying FAILED events.
   */
  private async getFailedOutboxCount(): Promise<number> {
    return 0;
  }
}
|