From b1607666a05771e9c9b99d9c12f4ccfaf02afcbd Mon Sep 17 00:00:00 2001 From: hailin Date: Fri, 30 Jan 2026 10:12:48 -0800 Subject: [PATCH] =?UTF-8?q?fix(contribution):=20LEVEL=5FOVERFLOW=20?= =?UTF-8?q?=E5=9B=9E=E6=94=B6=E4=BB=BB=E5=8A=A1=EF=BC=8C=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E5=B7=B2=E8=A7=A3=E9=94=81=E5=B1=82=E7=BA=A7=E7=9A=84=E6=BA=A2?= =?UTF-8?q?=E5=87=BA=E8=AE=B0=E5=BD=95=E6=97=A0=E6=B3=95=E8=A2=AB=E5=9B=9E?= =?UTF-8?q?=E6=94=B6=E7=9A=84=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 当下级认种时上级 unlocked_level_depth 不足,层级奖励进入 LEVEL_OVERFLOW(PENDING)。 上级后续解锁到足够层级后,现有 backfill 因条件 expectedLevel > currentLevel 为 false 而跳过,导致 PENDING 记录永远无法被回收。新增独立调度任务每10分钟扫描并回收。 Co-Authored-By: Claude Opus 4.5 --- .../schedulers/contribution.scheduler.ts | 29 +++++++++ .../services/bonus-claim.service.ts | 60 +++++++++++++++++++ .../unallocated-contribution.repository.ts | 19 ++++++ 3 files changed, 108 insertions(+) diff --git a/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts b/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts index 3d153f1e..bd24ec31 100644 --- a/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts +++ b/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts @@ -253,6 +253,35 @@ export class ContributionScheduler implements OnModuleInit { } } + /** + * 每10分钟回收已解锁层级的 LEVEL_OVERFLOW 记录 + * 处理场景:下级认种时上级 unlocked_level_depth 不足导致 overflow, + * 上级后续解锁后这些 PENDING 记录需要被回收 + */ + @Cron('*/10 * * * *') + async processLevelOverflowReclaim(): Promise { + if (!this.isCdcReady()) { + this.logger.debug('[CDC-Gate] processLevelOverflowReclaim skipped: CDC initial sync not yet completed'); + return; + } + + const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:overflow-reclaim`, 540); // 9分钟锁 + if (!lockValue) { + return; + } + + try { + const reclaimed = await this.bonusClaimService.reclaimLevelOverflows(); + if (reclaimed > 0) { + this.logger.log(`Level overflow reclaim: ${reclaimed} records reclaimed`); + } + } catch (error) { + this.logger.error('Failed to process level overflow reclaim', error); + } finally { + await this.redis.releaseLock(`${this.LOCK_KEY}:overflow-reclaim`, lockValue); + } + } + /** * 每10分钟扫描并补发未完全解锁的贡献值 * 处理因下级先于上级认种导致的层级/奖励档位未能及时分配的情况 diff --git a/backend/services/contribution-service/src/application/services/bonus-claim.service.ts b/backend/services/contribution-service/src/application/services/bonus-claim.service.ts index 4121870e..47f8048c 100644 --- a/backend/services/contribution-service/src/application/services/bonus-claim.service.ts +++ b/backend/services/contribution-service/src/application/services/bonus-claim.service.ts @@ -273,6 +273,66 @@ export class BonusClaimService { }); } + // ========== LEVEL_OVERFLOW 回收逻辑 ========== + + /** + * 回收已解锁层级的 LEVEL_OVERFLOW 记录 + * 处理场景:下级认种时上级 unlocked_level_depth 不足,产生 LEVEL_OVERFLOW; + * 后续上级解锁到足够层级后,这些 PENDING 记录需要被回收分配 + * @param limit 每次扫描的最大账户数 + * @returns 回收的记录总数 + */ + async reclaimLevelOverflows(limit: number = 100): Promise { + // 1. 查找有 PENDING LEVEL_OVERFLOW 记录的账户 + const accountSequences = await this.unallocatedContributionRepository + .findAccountSequencesWithPendingLevelOverflow(limit); + + if (accountSequences.length === 0) { + return 0; + } + + this.logger.log(`[OverflowReclaim] Found ${accountSequences.length} accounts with pending LEVEL_OVERFLOW`); + + let totalReclaimed = 0; + let errorCount = 0; + + for (const accountSequence of accountSequences) { + try { + const account = await this.contributionAccountRepository.findByAccountSequence(accountSequence); + if (!account || account.unlockedLevelDepth === 0) { + continue; + } + + // 只回收已解锁层级范围内的 overflow + await this.unitOfWork.executeInTransaction(async () => { + const claimed = await this.claimLevelContributions( + accountSequence, + 1, + account.unlockedLevelDepth, + ); + + if (claimed > 0) { + totalReclaimed += claimed; + // 重新读取账户(claimLevelContributions 已更新余额),发布更新事件 + const updatedAccount = await this.contributionAccountRepository + .findByAccountSequence(accountSequence); + if (updatedAccount) { + await this.publishContributionAccountUpdatedEvent(updatedAccount); + } + } + }); + } catch (error) { + errorCount++; + this.logger.error(`[OverflowReclaim] Failed for account ${accountSequence}`, error); + } + } + + this.logger.log( + `[OverflowReclaim] Completed: ${totalReclaimed} records reclaimed, ${errorCount} errors`, + ); + return totalReclaimed; + } + // ========== 定时任务补发逻辑 ========== private readonly domainCalculator = new ContributionCalculatorService(); diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts index 63567bd6..5955fd2c 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts @@ -280,6 +280,25 @@ export class UnallocatedContributionRepository { }; } + /** + * 查询有待回收 LEVEL_OVERFLOW 记录的用户账号列表 + * 用于定时任务扫描:当用户已解锁到足够层级,但之前的 overflow 尚未回收时 + */ + async findAccountSequencesWithPendingLevelOverflow(limit: number): Promise { + const records = await this.client.unallocatedContribution.findMany({ + where: { + unallocType: 'LEVEL_OVERFLOW', + status: 'PENDING', + wouldBeAccountSequence: { not: null }, + }, + select: { wouldBeAccountSequence: true }, + distinct: ['wouldBeAccountSequence'], + take: limit, + }); + + return records.map((r) => r.wouldBeAccountSequence!); + } + /** * 获取分档位的未分配奖励统计 */