From 817b7d3a9f36670443f4e69a544b1b4ac169fc26 Mon Sep 17 00:00:00 2001 From: hailin Date: Fri, 30 Jan 2026 05:54:02 -0800 Subject: [PATCH] =?UTF-8?q?fix(contribution):=20=E7=AE=97=E5=8A=9B?= =?UTF-8?q?=E5=88=86=E9=85=8D=E6=97=B6=E5=BA=8F=E4=BF=9D=E8=AF=81=20+=20bo?= =?UTF-8?q?nus=E8=A1=A5=E5=8F=91stale-read=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 4处改动确保部署清库重新同步后100%可靠: 1. contribution.scheduler.ts - CDC就绪门控 注入CDCConsumerService,processUnprocessedAdoptions/publishRecentlyUpdatedAccounts/ processContributionBackfill三个调度方法开头加isCdcReady()检查, 确保用户+推荐+认种三阶段CDC同步全部完成后才开始处理。 2. contribution-calculation.service.ts - 推荐数据防护 calculateForAdoption()中,userReferral为null时warn并return, 不标记distributed,调度器下次重试。覆盖continuous mode下 认种事件先于推荐事件到达的竞态场景。 3. bonus-claim.service.ts - bonus补发stale-read修复 processBackfillForAccount()中,level事务的updateAccountUnlockStatus 通过incrementDirectReferralAdoptedCount()同时修改unlockedLevelDepth 和unlockedBonusTiers,导致bonus分支条件永远为false。 修复:保存originalDirectReferralAdoptedCount和originalUnlockedBonusTiers, bonus分支使用原始值判断和传参。 4. config.controller.ts - mining-admin同步检查增强 isSynced新增allAdoptionsProcessed条件(unprocessedAdoptions===0), 确保所有认种分配+补发完成后才允许激活挖矿。 修复data变量作用域问题(原在if块内声明,外部引用会报错)。 Co-Authored-By: Claude Opus 4.5 --- .../schedulers/contribution.scheduler.ts | 26 +++++++++++- .../services/bonus-claim.service.ts | 41 +++++++++++++++---- .../contribution-calculation.service.ts | 14 +++++++ .../src/api/controllers/config.controller.ts | 19 ++++++++- 4 files changed, 90 insertions(+), 10 deletions(-) diff --git a/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts b/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts index b515e21d..3d153f1e 100644 --- a/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts +++ b/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts @@ -8,6 +8,7 @@ import { ContributionAccountRepository } from '../../infrastructure/persistence/ import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; import { KafkaProducerService } from '../../infrastructure/kafka/kafka-producer.service'; import { RedisService } from '../../infrastructure/redis/redis.service'; +import { CDCConsumerService } from '../../infrastructure/kafka/cdc-consumer.service'; import { ContributionAccountUpdatedEvent } from '../../domain/events'; /** @@ -27,10 +28,18 @@ export class ContributionScheduler implements OnModuleInit { private readonly outboxRepository: OutboxRepository, private readonly kafkaProducer: KafkaProducerService, private readonly redis: RedisService, + private readonly cdcConsumer: CDCConsumerService, ) {} + /** + * CDC 初始同步是否完成(用户、推荐、认种三阶段全部完成) + */ + private isCdcReady(): boolean { + return this.cdcConsumer.getSyncStatus().allPhasesCompleted; + } + async onModuleInit() { - this.logger.log('Contribution scheduler initialized'); + this.logger.log('Contribution scheduler initialized, waiting for CDC initial sync to complete...'); } /** @@ -38,6 +47,11 @@ export class ContributionScheduler implements OnModuleInit { */ @Cron(CronExpression.EVERY_MINUTE) async processUnprocessedAdoptions(): Promise { + if (!this.isCdcReady()) { + this.logger.debug('[CDC-Gate] processUnprocessedAdoptions skipped: CDC initial sync not yet completed'); + return; + } + const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:process`, 55); if (!lockValue) { return; // 其他实例正在处理 @@ -186,6 +200,11 @@ export class ContributionScheduler implements OnModuleInit { */ @Cron('*/10 * * * *') async publishRecentlyUpdatedAccounts(): Promise { + if (!this.isCdcReady()) { + this.logger.debug('[CDC-Gate] publishRecentlyUpdatedAccounts skipped: CDC initial sync not yet completed'); + return; + } + const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:incremental-sync`, 540); // 9分钟锁 if (!lockValue) { return; @@ -240,6 +259,11 @@ export class ContributionScheduler implements OnModuleInit { */ @Cron('*/10 * * * *') async processContributionBackfill(): Promise { + if (!this.isCdcReady()) { + this.logger.debug('[CDC-Gate] processContributionBackfill skipped: CDC initial sync not yet completed'); + return; + } + const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:backfill`, 540); // 9分钟锁 if (!lockValue) { return; diff --git a/backend/services/contribution-service/src/application/services/bonus-claim.service.ts b/backend/services/contribution-service/src/application/services/bonus-claim.service.ts index 28f4bb29..4121870e 100644 --- a/backend/services/contribution-service/src/application/services/bonus-claim.service.ts +++ b/backend/services/contribution-service/src/application/services/bonus-claim.service.ts @@ -300,13 +300,26 @@ export class BonusClaimService { currentDirectReferralAdoptedCount, ); + // 保存原始值(level 事务中 updateAccountUnlockStatus 会通过 incrementDirectReferralAdoptedCount + // 同时修改 unlockedLevelDepth 和 unlockedBonusTiers,导致 bonus 分支条件失效) + const originalDirectReferralAdoptedCount = account.directReferralAdoptedCount; + const originalUnlockedBonusTiers = account.unlockedBonusTiers; + + this.logger.log( + `[Backfill] Checking account ${accountSequence}: ` + + `hasAdopted=${account.hasAdopted}, ` + + `directReferralAdoptedCount=${originalDirectReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount}, ` + + `unlockedLevelDepth=${account.unlockedLevelDepth} (expected=${expectedLevelDepth}), ` + + `unlockedBonusTiers=${originalUnlockedBonusTiers} (expected=${expectedBonusTiers})`, + ); + let hasBackfill = false; // 检查是否需要补发层级贡献值 if (expectedLevelDepth > account.unlockedLevelDepth) { this.logger.log( `[Backfill] Account ${accountSequence} level unlock: ${account.unlockedLevelDepth} -> ${expectedLevelDepth} ` + - `(directReferralAdoptedCount: ${account.directReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`, + `(directReferralAdoptedCount: ${originalDirectReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`, ); await this.unitOfWork.executeInTransaction(async () => { @@ -329,19 +342,30 @@ export class BonusClaimService { expectedBonusTiers, ); }); + + this.logger.log( + `[Backfill] Account ${accountSequence} level backfill transaction completed. ` + + `After mutation: directReferralAdoptedCount=${account.directReferralAdoptedCount}, ` + + `unlockedLevelDepth=${account.unlockedLevelDepth}, unlockedBonusTiers=${account.unlockedBonusTiers}`, + ); } - // 检查是否需要补发奖励档位 - if (expectedBonusTiers > account.unlockedBonusTiers) { + // 检查是否需要补发奖励档位(使用原始值,因为 level 分支的 updateAccountUnlockStatus + // 会同时把 unlockedBonusTiers 更新到 expectedBonusTiers,导致此条件永远为 false) + this.logger.debug( + `[Backfill] Account ${accountSequence} bonus check: ` + + `expectedBonusTiers(${expectedBonusTiers}) > originalUnlockedBonusTiers(${originalUnlockedBonusTiers}) = ${expectedBonusTiers > originalUnlockedBonusTiers}`, + ); + if (expectedBonusTiers > originalUnlockedBonusTiers) { this.logger.log( - `[Backfill] Account ${accountSequence} bonus unlock: ${account.unlockedBonusTiers} -> ${expectedBonusTiers} ` + - `(directReferralAdoptedCount: ${account.directReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`, + `[Backfill] Account ${accountSequence} bonus unlock: ${originalUnlockedBonusTiers} -> ${expectedBonusTiers} ` + + `(directReferralAdoptedCount: ${originalDirectReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`, ); - // 使用现有的 checkAndClaimBonus 方法补发奖励 + // 使用原始直推认种数,确保 checkAndClaimBonus 能正确判断需要解锁的档位 await this.checkAndClaimBonus( accountSequence, - account.directReferralAdoptedCount, + originalDirectReferralAdoptedCount, currentDirectReferralAdoptedCount, ); hasBackfill = true; @@ -359,6 +383,9 @@ export class BonusClaimService { } } + this.logger.log( + `[Backfill] Account ${accountSequence} backfill result: hasBackfill=${hasBackfill}`, + ); return hasBackfill; } diff --git a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts index 0f4ca3d6..ba89852e 100644 --- a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts +++ b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts @@ -58,6 +58,20 @@ export class ContributionCalculationService { // 获取认种用户的引荐关系 const userReferral = await this.syncedDataRepository.findSyncedReferralByAccountSequence(adoption.accountSequence); + // 推荐数据未同步时跳过(不标记 distributed,调度器下次重试) + if (!userReferral) { + this.logger.warn( + `[Referral-Guard] Deferring adoption ${originalAdoptionId}: ` + + `referral for ${adoption.accountSequence} not yet synced, will retry on next scheduler tick`, + ); + return; + } + + this.logger.debug( + `[Referral-Guard] Referral found for ${adoption.accountSequence}: ` + + `referrer=${userReferral.referrerAccountSequence || 'NONE (root)'}`, + ); + // 获取上线链条(最多15级) let ancestorChain: SyncedReferral[] = []; if (userReferral?.referrerAccountSequence) { diff --git a/backend/services/mining-admin-service/src/api/controllers/config.controller.ts b/backend/services/mining-admin-service/src/api/controllers/config.controller.ts index 0a4323d8..5a675aa4 100644 --- a/backend/services/mining-admin-service/src/api/controllers/config.controller.ts +++ b/backend/services/mining-admin-service/src/api/controllers/config.controller.ts @@ -63,6 +63,7 @@ export class ConfigController { let networkTotalContribution: string | null = null; let userEffectiveContribution: string | null = null; let systemAccountsContribution: string | null = null; + let unprocessedAdoptions = -1; if (contributionResponse && contributionResponse.ok) { const contributionResult = await contributionResponse.json(); @@ -72,6 +73,8 @@ export class ConfigController { networkTotalContribution = data.networkTotalContribution || null; // 用户有效算力 userEffectiveContribution = data.totalContribution || null; + // 未处理认种数 + unprocessedAdoptions = data.unprocessedAdoptions ?? -1; // 系统账户算力 const systemAccounts = data.systemAccounts || []; const systemTotal = systemAccounts @@ -86,14 +89,23 @@ export class ConfigController { const miningUserTotal = miningData.totalContribution || '0'; // 判断算力是否同步完成 - // 核心条件:全网理论算力已同步(mining-service 的 networkTotalContribution 与 contribution-service 相近) + // 条件1:全网理论算力已同步(mining-service 的 networkTotalContribution 与 contribution-service 相近) // 全网理论算力是挖矿分母,必须同步后才能正确计算挖矿比例 const networkSynced = networkTotalContribution !== null && parseFloat(networkTotalContribution) > 0 && parseFloat(miningNetworkTotal) > 0 && Math.abs(parseFloat(miningNetworkTotal) - parseFloat(networkTotalContribution)) / parseFloat(networkTotalContribution) < 0.001; - const isSynced = networkSynced; + // 条件2:所有认种已处理完成(无未分配的认种记录) + const allAdoptionsProcessed = unprocessedAdoptions === 0; + + const isSynced = networkSynced && allAdoptionsProcessed; + + this.logger.log( + `[SyncCheck] networkSynced=${networkSynced} (contribution=${networkTotalContribution}, mining=${miningNetworkTotal}), ` + + `allAdoptionsProcessed=${allAdoptionsProcessed} (unprocessedAdoptions=${unprocessedAdoptions}), ` + + `isSynced=${isSynced}`, + ); return { ...miningData, @@ -107,6 +119,8 @@ export class ConfigController { miningUserTotal, // 系统账户算力 systemAccountsContribution: systemAccountsContribution || '0', + // 未处理认种数 + unprocessedAdoptions, // 兼容旧字段 miningTotal: miningUserTotal, contributionTotal: userEffectiveContribution || '0', @@ -125,6 +139,7 @@ export class ConfigController { userEffectiveContribution: '0', miningUserTotal: '0', systemAccountsContribution: '0', + unprocessedAdoptions: -1, miningTotal: '0', contributionTotal: '0', },