fix(contribution): LEVEL_OVERFLOW 回收任务,修复已解锁层级的溢出记录无法被回收的 bug

当下级认种时上级 unlocked_level_depth 不足,层级奖励进入 LEVEL_OVERFLOW(PENDING)。
上级后续解锁到足够层级后,现有 backfill 因条件 expectedLevel > currentLevel 为 false
而跳过,导致 PENDING 记录永远无法被回收。新增独立调度任务每10分钟扫描并回收。

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-30 10:12:48 -08:00
parent ca4e5393be
commit b1607666a0
3 changed files with 108 additions and 0 deletions

View File

@ -253,6 +253,35 @@ export class ContributionScheduler implements OnModuleInit {
}
}
/**
* 10 LEVEL_OVERFLOW
* unlocked_level_depth overflow
* PENDING
*/
@Cron('*/10 * * * *')
async processLevelOverflowReclaim(): Promise<void> {
if (!this.isCdcReady()) {
this.logger.debug('[CDC-Gate] processLevelOverflowReclaim skipped: CDC initial sync not yet completed');
return;
}
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:overflow-reclaim`, 540); // 9分钟锁
if (!lockValue) {
return;
}
try {
const reclaimed = await this.bonusClaimService.reclaimLevelOverflows();
if (reclaimed > 0) {
this.logger.log(`Level overflow reclaim: ${reclaimed} records reclaimed`);
}
} catch (error) {
this.logger.error('Failed to process level overflow reclaim', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:overflow-reclaim`, lockValue);
}
}
/**
* 10
* /

View File

@ -273,6 +273,66 @@ export class BonusClaimService {
});
}
// ========== LEVEL_OVERFLOW 回收逻辑 ==========
/**
* LEVEL_OVERFLOW
* unlocked_level_depth LEVEL_OVERFLOW
* PENDING
* @param limit
* @returns
*/
async reclaimLevelOverflows(limit: number = 100): Promise<number> {
// 1. 查找有 PENDING LEVEL_OVERFLOW 记录的账户
const accountSequences = await this.unallocatedContributionRepository
.findAccountSequencesWithPendingLevelOverflow(limit);
if (accountSequences.length === 0) {
return 0;
}
this.logger.log(`[OverflowReclaim] Found ${accountSequences.length} accounts with pending LEVEL_OVERFLOW`);
let totalReclaimed = 0;
let errorCount = 0;
for (const accountSequence of accountSequences) {
try {
const account = await this.contributionAccountRepository.findByAccountSequence(accountSequence);
if (!account || account.unlockedLevelDepth === 0) {
continue;
}
// 只回收已解锁层级范围内的 overflow
await this.unitOfWork.executeInTransaction(async () => {
const claimed = await this.claimLevelContributions(
accountSequence,
1,
account.unlockedLevelDepth,
);
if (claimed > 0) {
totalReclaimed += claimed;
// 重新读取账户claimLevelContributions 已更新余额),发布更新事件
const updatedAccount = await this.contributionAccountRepository
.findByAccountSequence(accountSequence);
if (updatedAccount) {
await this.publishContributionAccountUpdatedEvent(updatedAccount);
}
}
});
} catch (error) {
errorCount++;
this.logger.error(`[OverflowReclaim] Failed for account ${accountSequence}`, error);
}
}
this.logger.log(
`[OverflowReclaim] Completed: ${totalReclaimed} records reclaimed, ${errorCount} errors`,
);
return totalReclaimed;
}
// ========== 定时任务补发逻辑 ==========
private readonly domainCalculator = new ContributionCalculatorService();

View File

@ -280,6 +280,25 @@ export class UnallocatedContributionRepository {
};
}
/**
* LEVEL_OVERFLOW
* overflow
*/
async findAccountSequencesWithPendingLevelOverflow(limit: number): Promise<string[]> {
const records = await this.client.unallocatedContribution.findMany({
where: {
unallocType: 'LEVEL_OVERFLOW',
status: 'PENDING',
wouldBeAccountSequence: { not: null },
},
select: { wouldBeAccountSequence: true },
distinct: ['wouldBeAccountSequence'],
take: limit,
});
return records.map((r) => r.wouldBeAccountSequence!);
}
/**
*
*/