fix(contribution): 算力分配时序保证 + bonus补发stale-read修复

4处改动确保部署清库重新同步后100%可靠:

1. contribution.scheduler.ts - CDC就绪门控
   注入CDCConsumerService,processUnprocessedAdoptions/publishRecentlyUpdatedAccounts/
   processContributionBackfill三个调度方法开头加isCdcReady()检查,
   确保用户+推荐+认种三阶段CDC同步全部完成后才开始处理。

2. contribution-calculation.service.ts - 推荐数据防护
   calculateForAdoption()中,userReferral为null时warn并return,
   不标记distributed,调度器下次重试。覆盖continuous mode下
   认种事件先于推荐事件到达的竞态场景。

3. bonus-claim.service.ts - bonus补发stale-read修复
   processBackfillForAccount()中,level事务的updateAccountUnlockStatus
   通过incrementDirectReferralAdoptedCount()同时修改unlockedLevelDepth
   和unlockedBonusTiers,导致bonus分支条件永远为false。
   修复:保存originalDirectReferralAdoptedCount和originalUnlockedBonusTiers,
   bonus分支使用原始值判断和传参。

4. config.controller.ts - mining-admin同步检查增强
   isSynced新增allAdoptionsProcessed条件(unprocessedAdoptions===0),
   确保所有认种分配+补发完成后才允许激活挖矿。
   修复data变量作用域问题(原在if块内声明,外部引用会报错)。

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-30 05:54:02 -08:00
parent 83384acdac
commit 817b7d3a9f
4 changed files with 90 additions and 10 deletions

View File

@ -8,6 +8,7 @@ import { ContributionAccountRepository } from '../../infrastructure/persistence/
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { KafkaProducerService } from '../../infrastructure/kafka/kafka-producer.service';
import { RedisService } from '../../infrastructure/redis/redis.service';
import { CDCConsumerService } from '../../infrastructure/kafka/cdc-consumer.service';
import { ContributionAccountUpdatedEvent } from '../../domain/events';
/**
@ -27,10 +28,18 @@ export class ContributionScheduler implements OnModuleInit {
private readonly outboxRepository: OutboxRepository,
private readonly kafkaProducer: KafkaProducerService,
private readonly redis: RedisService,
private readonly cdcConsumer: CDCConsumerService,
) {}
/**
* CDC
*/
private isCdcReady(): boolean {
return this.cdcConsumer.getSyncStatus().allPhasesCompleted;
}
async onModuleInit() {
this.logger.log('Contribution scheduler initialized');
this.logger.log('Contribution scheduler initialized, waiting for CDC initial sync to complete...');
}
/**
@ -38,6 +47,11 @@ export class ContributionScheduler implements OnModuleInit {
*/
@Cron(CronExpression.EVERY_MINUTE)
async processUnprocessedAdoptions(): Promise<void> {
if (!this.isCdcReady()) {
this.logger.debug('[CDC-Gate] processUnprocessedAdoptions skipped: CDC initial sync not yet completed');
return;
}
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:process`, 55);
if (!lockValue) {
return; // 其他实例正在处理
@ -186,6 +200,11 @@ export class ContributionScheduler implements OnModuleInit {
*/
@Cron('*/10 * * * *')
async publishRecentlyUpdatedAccounts(): Promise<void> {
if (!this.isCdcReady()) {
this.logger.debug('[CDC-Gate] publishRecentlyUpdatedAccounts skipped: CDC initial sync not yet completed');
return;
}
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:incremental-sync`, 540); // 9分钟锁
if (!lockValue) {
return;
@ -240,6 +259,11 @@ export class ContributionScheduler implements OnModuleInit {
*/
@Cron('*/10 * * * *')
async processContributionBackfill(): Promise<void> {
if (!this.isCdcReady()) {
this.logger.debug('[CDC-Gate] processContributionBackfill skipped: CDC initial sync not yet completed');
return;
}
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:backfill`, 540); // 9分钟锁
if (!lockValue) {
return;

View File

@ -300,13 +300,26 @@ export class BonusClaimService {
currentDirectReferralAdoptedCount,
);
// 保存原始值level 事务中 updateAccountUnlockStatus 会通过 incrementDirectReferralAdoptedCount
// 同时修改 unlockedLevelDepth 和 unlockedBonusTiers导致 bonus 分支条件失效)
const originalDirectReferralAdoptedCount = account.directReferralAdoptedCount;
const originalUnlockedBonusTiers = account.unlockedBonusTiers;
this.logger.log(
`[Backfill] Checking account ${accountSequence}: ` +
`hasAdopted=${account.hasAdopted}, ` +
`directReferralAdoptedCount=${originalDirectReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount}, ` +
`unlockedLevelDepth=${account.unlockedLevelDepth} (expected=${expectedLevelDepth}), ` +
`unlockedBonusTiers=${originalUnlockedBonusTiers} (expected=${expectedBonusTiers})`,
);
let hasBackfill = false;
// 检查是否需要补发层级贡献值
if (expectedLevelDepth > account.unlockedLevelDepth) {
this.logger.log(
`[Backfill] Account ${accountSequence} level unlock: ${account.unlockedLevelDepth} -> ${expectedLevelDepth} ` +
`(directReferralAdoptedCount: ${account.directReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`,
`(directReferralAdoptedCount: ${originalDirectReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`,
);
await this.unitOfWork.executeInTransaction(async () => {
@ -329,19 +342,30 @@ export class BonusClaimService {
expectedBonusTiers,
);
});
this.logger.log(
`[Backfill] Account ${accountSequence} level backfill transaction completed. ` +
`After mutation: directReferralAdoptedCount=${account.directReferralAdoptedCount}, ` +
`unlockedLevelDepth=${account.unlockedLevelDepth}, unlockedBonusTiers=${account.unlockedBonusTiers}`,
);
}
// 检查是否需要补发奖励档位
if (expectedBonusTiers > account.unlockedBonusTiers) {
// 检查是否需要补发奖励档位(使用原始值,因为 level 分支的 updateAccountUnlockStatus
// 会同时把 unlockedBonusTiers 更新到 expectedBonusTiers导致此条件永远为 false
this.logger.debug(
`[Backfill] Account ${accountSequence} bonus check: ` +
`expectedBonusTiers(${expectedBonusTiers}) > originalUnlockedBonusTiers(${originalUnlockedBonusTiers}) = ${expectedBonusTiers > originalUnlockedBonusTiers}`,
);
if (expectedBonusTiers > originalUnlockedBonusTiers) {
this.logger.log(
`[Backfill] Account ${accountSequence} bonus unlock: ${account.unlockedBonusTiers} -> ${expectedBonusTiers} ` +
`(directReferralAdoptedCount: ${account.directReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`,
`[Backfill] Account ${accountSequence} bonus unlock: ${originalUnlockedBonusTiers} -> ${expectedBonusTiers} ` +
`(directReferralAdoptedCount: ${originalDirectReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`,
);
// 使用现有的 checkAndClaimBonus 方法补发奖励
// 使用原始直推认种数,确保 checkAndClaimBonus 能正确判断需要解锁的档位
await this.checkAndClaimBonus(
accountSequence,
account.directReferralAdoptedCount,
originalDirectReferralAdoptedCount,
currentDirectReferralAdoptedCount,
);
hasBackfill = true;
@ -359,6 +383,9 @@ export class BonusClaimService {
}
}
this.logger.log(
`[Backfill] Account ${accountSequence} backfill result: hasBackfill=${hasBackfill}`,
);
return hasBackfill;
}

View File

@ -58,6 +58,20 @@ export class ContributionCalculationService {
// 获取认种用户的引荐关系
const userReferral = await this.syncedDataRepository.findSyncedReferralByAccountSequence(adoption.accountSequence);
// 推荐数据未同步时跳过(不标记 distributed调度器下次重试
if (!userReferral) {
this.logger.warn(
`[Referral-Guard] Deferring adoption ${originalAdoptionId}: ` +
`referral for ${adoption.accountSequence} not yet synced, will retry on next scheduler tick`,
);
return;
}
this.logger.debug(
`[Referral-Guard] Referral found for ${adoption.accountSequence}: ` +
`referrer=${userReferral.referrerAccountSequence || 'NONE (root)'}`,
);
// 获取上线链条最多15级
let ancestorChain: SyncedReferral[] = [];
if (userReferral?.referrerAccountSequence) {

View File

@ -63,6 +63,7 @@ export class ConfigController {
let networkTotalContribution: string | null = null;
let userEffectiveContribution: string | null = null;
let systemAccountsContribution: string | null = null;
let unprocessedAdoptions = -1;
if (contributionResponse && contributionResponse.ok) {
const contributionResult = await contributionResponse.json();
@ -72,6 +73,8 @@ export class ConfigController {
networkTotalContribution = data.networkTotalContribution || null;
// 用户有效算力
userEffectiveContribution = data.totalContribution || null;
// 未处理认种数
unprocessedAdoptions = data.unprocessedAdoptions ?? -1;
// 系统账户算力
const systemAccounts = data.systemAccounts || [];
const systemTotal = systemAccounts
@ -86,14 +89,23 @@ export class ConfigController {
const miningUserTotal = miningData.totalContribution || '0';
// 判断算力是否同步完成
// 核心条件全网理论算力已同步mining-service 的 networkTotalContribution 与 contribution-service 相近)
// 条件1全网理论算力已同步mining-service 的 networkTotalContribution 与 contribution-service 相近)
// 全网理论算力是挖矿分母,必须同步后才能正确计算挖矿比例
const networkSynced = networkTotalContribution !== null &&
parseFloat(networkTotalContribution) > 0 &&
parseFloat(miningNetworkTotal) > 0 &&
Math.abs(parseFloat(miningNetworkTotal) - parseFloat(networkTotalContribution)) / parseFloat(networkTotalContribution) < 0.001;
const isSynced = networkSynced;
// 条件2所有认种已处理完成无未分配的认种记录
const allAdoptionsProcessed = unprocessedAdoptions === 0;
const isSynced = networkSynced && allAdoptionsProcessed;
this.logger.log(
`[SyncCheck] networkSynced=${networkSynced} (contribution=${networkTotalContribution}, mining=${miningNetworkTotal}), ` +
`allAdoptionsProcessed=${allAdoptionsProcessed} (unprocessedAdoptions=${unprocessedAdoptions}), ` +
`isSynced=${isSynced}`,
);
return {
...miningData,
@ -107,6 +119,8 @@ export class ConfigController {
miningUserTotal,
// 系统账户算力
systemAccountsContribution: systemAccountsContribution || '0',
// 未处理认种数
unprocessedAdoptions,
// 兼容旧字段
miningTotal: miningUserTotal,
contributionTotal: userEffectiveContribution || '0',
@ -125,6 +139,7 @@ export class ConfigController {
userEffectiveContribution: '0',
miningUserTotal: '0',
systemAccountsContribution: '0',
unprocessedAdoptions: -1,
miningTotal: '0',
contributionTotal: '0',
},