diff --git a/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts b/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts index 899b851a..b515e21d 100644 --- a/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts +++ b/backend/services/contribution-service/src/application/schedulers/contribution.scheduler.ts @@ -2,6 +2,7 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { ContributionCalculationService } from '../services/contribution-calculation.service'; import { SnapshotService } from '../services/snapshot.service'; +import { BonusClaimService } from '../services/bonus-claim.service'; import { ContributionRecordRepository } from '../../infrastructure/persistence/repositories/contribution-record.repository'; import { ContributionAccountRepository } from '../../infrastructure/persistence/repositories/contribution-account.repository'; import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; @@ -20,6 +21,7 @@ export class ContributionScheduler implements OnModuleInit { constructor( private readonly calculationService: ContributionCalculationService, private readonly snapshotService: SnapshotService, + private readonly bonusClaimService: BonusClaimService, private readonly contributionRecordRepository: ContributionRecordRepository, private readonly contributionAccountRepository: ContributionAccountRepository, private readonly outboxRepository: OutboxRepository, @@ -232,6 +234,59 @@ export class ContributionScheduler implements OnModuleInit { } } + /** + * 每10分钟扫描并补发未完全解锁的贡献值 + * 处理因下级先于上级认种导致的层级/奖励档位未能及时分配的情况 + */ + @Cron('*/10 * * * *') + async processContributionBackfill(): Promise { + const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:backfill`, 540); // 9分钟锁 + if (!lockValue) { + return; + } + + try { + this.logger.log('Starting contribution backfill scan...'); + + // 查找解锁状态不完整的账户(已认种但层级<15或奖励档位<3) + const accounts = await this.contributionAccountRepository.findAccountsWithIncompleteUnlock(100); + + if (accounts.length === 0) { + this.logger.debug('No accounts with incomplete unlock status found'); + return; + } + + this.logger.log(`Found ${accounts.length} accounts with incomplete unlock status`); + + let backfilledCount = 0; + let errorCount = 0; + + for (const account of accounts) { + try { + const hasBackfill = await this.bonusClaimService.processBackfillForAccount(account.accountSequence); + if (hasBackfill) { + backfilledCount++; + } + } catch (error) { + errorCount++; + this.logger.error( + `Failed to process backfill for account ${account.accountSequence}`, + error, + ); + // 继续处理下一个账户 + } + } + + this.logger.log( + `Contribution backfill completed: ${backfilledCount} accounts backfilled, ${errorCount} errors`, + ); + } catch (error) { + this.logger.error('Failed to process contribution backfill', error); + } finally { + await this.redis.releaseLock(`${this.LOCK_KEY}:backfill`, lockValue); + } + } + /** * 每天凌晨4点全量发布所有贡献值账户更新事件 * 作为数据一致性的最终兜底保障 diff --git a/backend/services/contribution-service/src/application/services/bonus-claim.service.ts b/backend/services/contribution-service/src/application/services/bonus-claim.service.ts index 187e366a..28f4bb29 100644 --- a/backend/services/contribution-service/src/application/services/bonus-claim.service.ts +++ b/backend/services/contribution-service/src/application/services/bonus-claim.service.ts @@ -7,10 +7,11 @@ import { OutboxRepository } from '../../infrastructure/persistence/repositories/ import { SyncedDataRepository } from '../../infrastructure/persistence/repositories/synced-data.repository'; import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; import { ContributionRecordAggregate } from '../../domain/aggregates/contribution-record.aggregate'; -import { ContributionSourceType } from '../../domain/aggregates/contribution-account.aggregate'; +import { ContributionAccountAggregate, ContributionSourceType } from '../../domain/aggregates/contribution-account.aggregate'; import { ContributionAmount } from '../../domain/value-objects/contribution-amount.vo'; import { DistributionRate } from '../../domain/value-objects/distribution-rate.vo'; -import { ContributionRecordSyncedEvent, SystemAccountSyncedEvent } from '../../domain/events'; +import { ContributionCalculatorService } from '../../domain/services/contribution-calculator.service'; +import { ContributionRecordSyncedEvent, SystemAccountSyncedEvent, ContributionAccountUpdatedEvent } from '../../domain/events'; /** * 奖励补发服务 @@ -271,4 +272,352 @@ export class BonusClaimService { aggregateType: 'ContributionAccount', }); } + + // ========== 定时任务补发逻辑 ========== + + private readonly domainCalculator = new ContributionCalculatorService(); + + /** + * 处理单个账户的补发逻辑 + * 检查是否有新解锁的层级或奖励档位,并进行补发 + * @returns 是否有补发 + */ + async processBackfillForAccount(accountSequence: string): Promise { + const account = await this.contributionAccountRepository.findByAccountSequence(accountSequence); + if (!account) { + return false; + } + + // 重新计算直推认种用户数 + const currentDirectReferralAdoptedCount = await this.syncedDataRepository.getDirectReferralAdoptedCount( + accountSequence, + ); + + // 计算应该解锁的层级深度和奖励档位 + const expectedLevelDepth = this.domainCalculator.calculateUnlockedLevelDepth(currentDirectReferralAdoptedCount); + const expectedBonusTiers = this.domainCalculator.calculateUnlockedBonusTiers( + account.hasAdopted, + currentDirectReferralAdoptedCount, + ); + + let hasBackfill = false; + + // 检查是否需要补发层级贡献值 + if (expectedLevelDepth > account.unlockedLevelDepth) { + this.logger.log( + `[Backfill] Account ${accountSequence} level unlock: ${account.unlockedLevelDepth} -> ${expectedLevelDepth} ` + + `(directReferralAdoptedCount: ${account.directReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`, + ); + + await this.unitOfWork.executeInTransaction(async () => { + // 补发层级贡献值 + const levelClaimed = await this.claimLevelContributions( + accountSequence, + account.unlockedLevelDepth + 1, + expectedLevelDepth, + ); + + if (levelClaimed > 0) { + hasBackfill = true; + } + + // 更新账户的直推认种数和解锁状态 + await this.updateAccountUnlockStatus( + account, + currentDirectReferralAdoptedCount, + expectedLevelDepth, + expectedBonusTiers, + ); + }); + } + + // 检查是否需要补发奖励档位 + if (expectedBonusTiers > account.unlockedBonusTiers) { + this.logger.log( + `[Backfill] Account ${accountSequence} bonus unlock: ${account.unlockedBonusTiers} -> ${expectedBonusTiers} ` + + `(directReferralAdoptedCount: ${account.directReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`, + ); + + // 使用现有的 checkAndClaimBonus 方法补发奖励 + await this.checkAndClaimBonus( + accountSequence, + account.directReferralAdoptedCount, + currentDirectReferralAdoptedCount, + ); + hasBackfill = true; + + // 如果只有奖励档位需要补发(层级已经是最新的),也需要更新账户状态 + if (expectedLevelDepth <= account.unlockedLevelDepth) { + await this.unitOfWork.executeInTransaction(async () => { + await this.updateAccountUnlockStatus( + account, + currentDirectReferralAdoptedCount, + expectedLevelDepth, + expectedBonusTiers, + ); + }); + } + } + + return hasBackfill; + } + + /** + * 补发层级贡献值 + * @param accountSequence 用户账号 + * @param minLevel 最小层级(包含) + * @param maxLevel 最大层级(包含) + * @returns 补发的记录数 + */ + private async claimLevelContributions( + accountSequence: string, + minLevel: number, + maxLevel: number, + ): Promise { + // 1. 查询待领取的层级贡献值记录 + const pendingRecords = await this.unallocatedContributionRepository.findPendingLevelByAccountSequence( + accountSequence, + minLevel, + maxLevel, + ); + + if (pendingRecords.length === 0) { + this.logger.debug(`[Backfill] No pending level records for ${accountSequence} (levels ${minLevel}-${maxLevel})`); + return 0; + } + + this.logger.log( + `[Backfill] Claiming ${pendingRecords.length} level records for ${accountSequence} (levels ${minLevel}-${maxLevel})`, + ); + + // 2. 查询原始认种数据,获取 treeCount 和 baseContribution + const adoptionDataMap = new Map(); + for (const pending of pendingRecords) { + const adoptionIdStr = pending.sourceAdoptionId.toString(); + if (!adoptionDataMap.has(adoptionIdStr)) { + const adoption = await this.syncedDataRepository.findSyncedAdoptionByOriginalId(pending.sourceAdoptionId); + if (adoption) { + adoptionDataMap.set(adoptionIdStr, { + treeCount: adoption.treeCount, + baseContribution: new ContributionAmount(adoption.contributionPerTree), + }); + } else { + this.logger.warn(`[Backfill] Adoption not found for sourceAdoptionId: ${pending.sourceAdoptionId}`); + adoptionDataMap.set(adoptionIdStr, { + treeCount: 0, + baseContribution: new ContributionAmount(0), + }); + } + } + } + + // 3. 创建贡献值记录 + const contributionRecords: ContributionRecordAggregate[] = []; + for (const pending of pendingRecords) { + const adoptionData = adoptionDataMap.get(pending.sourceAdoptionId.toString())!; + const record = new ContributionRecordAggregate({ + accountSequence: accountSequence, + sourceType: ContributionSourceType.TEAM_LEVEL, + sourceAdoptionId: pending.sourceAdoptionId, + sourceAccountSequence: pending.sourceAccountSequence, + treeCount: adoptionData.treeCount, + baseContribution: adoptionData.baseContribution, + distributionRate: DistributionRate.LEVEL_PER, + levelDepth: pending.levelDepth!, + amount: pending.amount, + effectiveDate: pending.effectiveDate, + expireDate: pending.expireDate, + }); + contributionRecords.push(record); + } + + // 4. 保存贡献值记录 + const savedRecords = await this.contributionRecordRepository.saveMany(contributionRecords); + + // 5. 更新用户的贡献值账户(按层级分别更新) + for (const pending of pendingRecords) { + await this.contributionAccountRepository.updateContribution( + accountSequence, + ContributionSourceType.TEAM_LEVEL, + pending.amount, + pending.levelDepth, + null, + ); + } + + // 6. 标记待领取记录为已分配 + const pendingIds = pendingRecords.map((r) => r.id); + await this.unallocatedContributionRepository.claimLevelRecords(pendingIds, accountSequence); + + // 7. 计算总金额用于从 HEADQUARTERS 扣除 + let totalAmount = new ContributionAmount(0); + for (const pending of pendingRecords) { + totalAmount = new ContributionAmount(totalAmount.value.plus(pending.amount.value)); + } + + // 8. 从 HEADQUARTERS 减少算力并删除明细记录 + await this.systemAccountRepository.subtractContribution('HEADQUARTERS', null, totalAmount); + for (const pending of pendingRecords) { + await this.systemAccountRepository.deleteContributionRecordsByAdoption( + 'HEADQUARTERS', + null, + pending.sourceAdoptionId, + pending.sourceAccountSequence, + ); + } + + // 9. 发布 HEADQUARTERS 账户更新事件 + const headquartersAccount = await this.systemAccountRepository.findByTypeAndRegion('HEADQUARTERS', null); + if (headquartersAccount) { + const hqEvent = new SystemAccountSyncedEvent( + 'HEADQUARTERS', + null, + headquartersAccount.name, + headquartersAccount.contributionBalance.value.toString(), + headquartersAccount.createdAt, + ); + await this.outboxRepository.save({ + aggregateType: SystemAccountSyncedEvent.AGGREGATE_TYPE, + aggregateId: 'HEADQUARTERS', + eventType: SystemAccountSyncedEvent.EVENT_TYPE, + payload: hqEvent.toPayload(), + }); + } + + // 10. 发布贡献值记录同步事件 + await this.publishLevelClaimEvents(accountSequence, savedRecords, pendingRecords); + + this.logger.log( + `[Backfill] Claimed level contributions for ${accountSequence}: ` + + `${pendingRecords.length} records, total amount: ${totalAmount.value.toString()}`, + ); + + return pendingRecords.length; + } + + /** + * 更新账户的解锁状态 + */ + private async updateAccountUnlockStatus( + account: ContributionAccountAggregate, + newDirectReferralAdoptedCount: number, + expectedLevelDepth: number, + expectedBonusTiers: number, + ): Promise { + // 增量更新直推认种数 + const previousCount = account.directReferralAdoptedCount; + if (newDirectReferralAdoptedCount > previousCount) { + for (let i = previousCount; i < newDirectReferralAdoptedCount; i++) { + account.incrementDirectReferralAdoptedCount(); + } + } + + await this.contributionAccountRepository.save(account); + + // 发布账户更新事件 + await this.publishContributionAccountUpdatedEvent(account); + } + + /** + * 发布层级补发事件 + */ + private async publishLevelClaimEvents( + accountSequence: string, + savedRecords: ContributionRecordAggregate[], + pendingRecords: UnallocatedContribution[], + ): Promise { + // 1. 发布贡献值记录同步事件(用于 mining-admin-service CDC) + for (const record of savedRecords) { + const event = new ContributionRecordSyncedEvent( + record.id!, + record.accountSequence, + record.sourceType, + record.sourceAdoptionId, + record.sourceAccountSequence, + record.treeCount, + record.baseContribution.value.toString(), + record.distributionRate.value.toString(), + record.levelDepth, + record.bonusTier, + record.amount.value.toString(), + record.effectiveDate, + record.expireDate, + record.isExpired, + record.createdAt, + ); + + await this.outboxRepository.save({ + aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE, + aggregateId: record.id!.toString(), + eventType: ContributionRecordSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }); + } + + // 2. 发布补发事件到 mining-wallet-service + const userContributions = savedRecords.map((record) => ({ + accountSequence: record.accountSequence, + contributionType: 'TEAM_LEVEL', + amount: record.amount.value.toString(), + levelDepth: record.levelDepth, + effectiveDate: record.effectiveDate.toISOString(), + expireDate: record.expireDate.toISOString(), + sourceAdoptionId: record.sourceAdoptionId.toString(), + sourceAccountSequence: record.sourceAccountSequence, + isBackfill: true, // 标记为补发 + })); + + const eventId = `level-claim-${accountSequence}-${Date.now()}`; + const payload = { + eventType: 'LevelClaimed', + eventId, + timestamp: new Date().toISOString(), + payload: { + accountSequence, + claimedCount: savedRecords.length, + userContributions, + }, + }; + + await this.outboxRepository.save({ + eventType: 'LevelClaimed', + topic: 'contribution.level.claimed', + key: accountSequence, + payload, + aggregateId: accountSequence, + aggregateType: 'ContributionAccount', + }); + } + + /** + * 发布贡献值账户更新事件 + */ + private async publishContributionAccountUpdatedEvent( + account: ContributionAccountAggregate, + ): Promise { + const totalContribution = account.personalContribution.value + .plus(account.totalLevelPending.value) + .plus(account.totalBonusPending.value); + + const event = new ContributionAccountUpdatedEvent( + account.accountSequence, + account.personalContribution.value.toString(), + account.totalLevelPending.value.toString(), + account.totalBonusPending.value.toString(), + totalContribution.toString(), + account.effectiveContribution.value.toString(), + account.hasAdopted, + account.directReferralAdoptedCount, + account.unlockedLevelDepth, + account.unlockedBonusTiers, + account.createdAt, + ); + + await this.outboxRepository.save({ + aggregateType: ContributionAccountUpdatedEvent.AGGREGATE_TYPE, + aggregateId: account.accountSequence, + eventType: ContributionAccountUpdatedEvent.EVENT_TYPE, + payload: event.toPayload(), + }); + } } diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts index da93d8da..6f478587 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts @@ -233,6 +233,31 @@ export class ContributionAccountRepository implements IContributionAccountReposi return records.map((r) => this.toDomain(r)); } + /** + * 查找解锁状态不完整的账户 + * 用于定时任务补发奖励 + * @param limit 返回的最大数量 + * @returns 解锁状态不完整的账户列表 + */ + async findAccountsWithIncompleteUnlock(limit: number = 100): Promise { + // 查找已认种但未达到满解锁状态的账户: + // - unlockedLevelDepth < 15 或 + // - unlockedBonusTiers < 3 + const records = await this.client.contributionAccount.findMany({ + where: { + hasAdopted: true, + OR: [ + { unlockedLevelDepth: { lt: 15 } }, + { unlockedBonusTiers: { lt: 3 } }, + ], + }, + orderBy: { updatedAt: 'asc' }, // 优先处理最久未更新的 + take: limit, + }); + + return records.map((r) => this.toDomain(r)); + } + /** * 获取详细算力汇总(按类型分解) */ diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts index 8c0cfa01..63567bd6 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts @@ -192,6 +192,54 @@ export class UnallocatedContributionRepository { return records.map((r) => this.toDomain(r)); } + /** + * 查询用户待领取的层级贡献值(按层级范围) + * @param accountSequence 用户账号 + * @param minLevel 最小层级(包含) + * @param maxLevel 最大层级(包含) + */ + async findPendingLevelByAccountSequence( + accountSequence: string, + minLevel: number, + maxLevel: number, + ): Promise { + const records = await this.client.unallocatedContribution.findMany({ + where: { + wouldBeAccountSequence: accountSequence, + unallocType: 'LEVEL_OVERFLOW', + levelDepth: { + gte: minLevel, + lte: maxLevel, + }, + status: 'PENDING', + }, + orderBy: { levelDepth: 'asc' }, + }); + + return records.map((r) => this.toDomain(r)); + } + + /** + * 领取层级贡献值 - 将待领取记录标记为已分配给用户 + * @param ids 记录ID列表 + * @param accountSequence 分配给的用户账号 + */ + async claimLevelRecords(ids: bigint[], accountSequence: string): Promise { + if (ids.length === 0) return; + + await this.client.unallocatedContribution.updateMany({ + where: { + id: { in: ids }, + status: 'PENDING', + }, + data: { + status: 'ALLOCATED_TO_USER', + allocatedAt: new Date(), + allocatedToAccountSequence: accountSequence, + }, + }); + } + /** * 获取分层级的未分配算力统计 */