From 2a79c83715b95e6fd59c9b5b5effd409c9f5e884 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 13 Jan 2026 23:58:54 -0800 Subject: [PATCH] feat(contribution): implement TEAM_BONUS backfill when unlock conditions met When a user's direct referral count reaches 2 or 4, the system now automatically backfills previously pending TEAM_BONUS (T2/T3) contributions that were allocated to headquarters while waiting for unlock conditions. - Add BonusClaimService for handling bonus backfill logic - Add findPendingBonusByAccountSequence and claimBonusRecords to repository - Integrate bonus claim into updateReferrerUnlockStatus flow - Add BonusClaimed event consumer in mining-wallet-service - Generate ledger records for backfilled contributions Co-Authored-By: Claude Opus 4.5 --- .../src/application/application.module.ts | 2 + .../services/bonus-claim.service.ts | 210 ++++++++++++++++++ .../contribution-calculation.service.ts | 16 +- .../unallocated-contribution.repository.ts | 68 +++++- .../contribution-distribution.consumer.ts | 61 +++++ .../events/contribution-distribution.event.ts | 30 +++ 6 files changed, 382 insertions(+), 5 deletions(-) create mode 100644 backend/services/contribution-service/src/application/services/bonus-claim.service.ts diff --git a/backend/services/contribution-service/src/application/application.module.ts b/backend/services/contribution-service/src/application/application.module.ts index 2e1d44ed..e375374d 100644 --- a/backend/services/contribution-service/src/application/application.module.ts +++ b/backend/services/contribution-service/src/application/application.module.ts @@ -12,6 +12,7 @@ import { CDCEventDispatcher } from './event-handlers/cdc-event-dispatcher'; import { ContributionCalculationService } from './services/contribution-calculation.service'; import { ContributionDistributionPublisherService } from './services/contribution-distribution-publisher.service'; import { ContributionRateService } from './services/contribution-rate.service'; +import { BonusClaimService } from './services/bonus-claim.service'; import { SnapshotService } from './services/snapshot.service'; // Queries @@ -38,6 +39,7 @@ import { ContributionScheduler } from './schedulers/contribution.scheduler'; ContributionCalculationService, ContributionDistributionPublisherService, ContributionRateService, + BonusClaimService, SnapshotService, // Queries diff --git a/backend/services/contribution-service/src/application/services/bonus-claim.service.ts b/backend/services/contribution-service/src/application/services/bonus-claim.service.ts new file mode 100644 index 00000000..42009670 --- /dev/null +++ b/backend/services/contribution-service/src/application/services/bonus-claim.service.ts @@ -0,0 +1,210 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { UnallocatedContributionRepository, UnallocatedContribution } from '../../infrastructure/persistence/repositories/unallocated-contribution.repository'; +import { ContributionAccountRepository } from '../../infrastructure/persistence/repositories/contribution-account.repository'; +import { ContributionRecordRepository } from '../../infrastructure/persistence/repositories/contribution-record.repository'; +import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; +import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; +import { ContributionRecordAggregate } from '../../domain/aggregates/contribution-record.aggregate'; +import { ContributionSourceType } from '../../domain/aggregates/contribution-account.aggregate'; +import { ContributionAmount } from '../../domain/value-objects/contribution-amount.vo'; +import { DistributionRate } from '../../domain/value-objects/distribution-rate.vo'; +import { ContributionRecordSyncedEvent } from '../../domain/events'; + +/** + * 奖励补发服务 + * 当用户解锁新的奖励档位时,补发之前所有认种对应的奖励 + */ +@Injectable() +export class BonusClaimService { + private readonly logger = new Logger(BonusClaimService.name); + + constructor( + private readonly unallocatedContributionRepository: UnallocatedContributionRepository, + private readonly contributionAccountRepository: ContributionAccountRepository, + private readonly contributionRecordRepository: ContributionRecordRepository, + private readonly outboxRepository: OutboxRepository, + private readonly unitOfWork: UnitOfWork, + ) {} + + /** + * 检查并处理奖励补发 + * 当用户的直推认种人数变化时调用 + * @param accountSequence 用户账号 + * @param previousCount 之前的直推认种人数 + * @param newCount 新的直推认种人数 + */ + async checkAndClaimBonus( + accountSequence: string, + previousCount: number, + newCount: number, + ): Promise { + // 检查是否达到新的解锁条件 + const tiersToClaimList: number[] = []; + + // T2: 直推≥2人认种时解锁 + if (previousCount < 2 && newCount >= 2) { + tiersToClaimList.push(2); + } + + // T3: 直推≥4人认种时解锁 + if (previousCount < 4 && newCount >= 4) { + tiersToClaimList.push(3); + } + + if (tiersToClaimList.length === 0) { + return; + } + + this.logger.log( + `User ${accountSequence} unlocked bonus tiers: ${tiersToClaimList.join(', ')} ` + + `(directReferralAdoptedCount: ${previousCount} -> ${newCount})`, + ); + + // 在事务中处理补发 + await this.unitOfWork.executeInTransaction(async () => { + for (const tier of tiersToClaimList) { + await this.claimBonusTier(accountSequence, tier); + } + }); + } + + /** + * 补发指定档位的奖励 + */ + private async claimBonusTier(accountSequence: string, bonusTier: number): Promise { + // 1. 查询待领取的记录 + const pendingRecords = await this.unallocatedContributionRepository.findPendingBonusByAccountSequence( + accountSequence, + bonusTier, + ); + + if (pendingRecords.length === 0) { + this.logger.debug(`No pending T${bonusTier} bonus records for ${accountSequence}`); + return; + } + + this.logger.log( + `Claiming ${pendingRecords.length} T${bonusTier} bonus records for ${accountSequence}`, + ); + + // 2. 创建贡献值记录 + const contributionRecords: ContributionRecordAggregate[] = []; + for (const pending of pendingRecords) { + const record = new ContributionRecordAggregate({ + accountSequence: accountSequence, + sourceType: ContributionSourceType.TEAM_BONUS, + sourceAdoptionId: pending.sourceAdoptionId, + sourceAccountSequence: pending.sourceAccountSequence, + treeCount: 0, // 补发记录不记录树数 + baseContribution: new ContributionAmount(0), + distributionRate: DistributionRate.BONUS_PER, + bonusTier: bonusTier, + amount: pending.amount, + effectiveDate: pending.effectiveDate, + expireDate: pending.expireDate, + }); + contributionRecords.push(record); + } + + // 3. 保存贡献值记录 + const savedRecords = await this.contributionRecordRepository.saveMany(contributionRecords); + + // 4. 更新用户的贡献值账户 + let totalAmount = new ContributionAmount(0); + for (const pending of pendingRecords) { + totalAmount = new ContributionAmount(totalAmount.value.plus(pending.amount.value)); + } + + await this.contributionAccountRepository.updateContribution( + accountSequence, + ContributionSourceType.TEAM_BONUS, + totalAmount, + null, + bonusTier, + ); + + // 5. 标记待领取记录为已分配 + const pendingIds = pendingRecords.map((r) => r.id); + await this.unallocatedContributionRepository.claimBonusRecords(pendingIds, accountSequence); + + // 6. 发布事件到 Kafka(通过 Outbox) + await this.publishBonusClaimEvents(accountSequence, savedRecords, pendingRecords); + + this.logger.log( + `Claimed T${bonusTier} bonus for ${accountSequence}: ` + + `${pendingRecords.length} records, total amount: ${totalAmount.value.toString()}`, + ); + } + + /** + * 发布补发事件 + */ + private async publishBonusClaimEvents( + accountSequence: string, + savedRecords: ContributionRecordAggregate[], + pendingRecords: UnallocatedContribution[], + ): Promise { + // 1. 发布贡献值记录同步事件(用于 mining-admin-service CDC) + for (const record of savedRecords) { + const event = new ContributionRecordSyncedEvent( + record.id!, + record.accountSequence, + record.sourceType, + record.sourceAdoptionId, + record.sourceAccountSequence, + record.treeCount, + record.baseContribution.value.toString(), + record.distributionRate.value.toString(), + record.levelDepth, + record.bonusTier, + record.amount.value.toString(), + record.effectiveDate, + record.expireDate, + record.isExpired, + record.createdAt, + ); + + await this.outboxRepository.save({ + aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE, + aggregateId: record.id!.toString(), + eventType: ContributionRecordSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }); + } + + // 2. 发布补发事件到 mining-wallet-service + const userContributions = savedRecords.map((record, index) => ({ + accountSequence: record.accountSequence, + contributionType: 'TEAM_BONUS', + amount: record.amount.value.toString(), + bonusTier: record.bonusTier, + effectiveDate: record.effectiveDate.toISOString(), + expireDate: record.expireDate.toISOString(), + sourceAdoptionId: record.sourceAdoptionId.toString(), + sourceAccountSequence: record.sourceAccountSequence, + isBackfill: true, // 标记为补发 + })); + + const eventId = `bonus-claim-${accountSequence}-${Date.now()}`; + const payload = { + eventType: 'BonusClaimed', + eventId, + timestamp: new Date().toISOString(), + payload: { + accountSequence, + bonusTier: savedRecords[0]?.bonusTier, + claimedCount: savedRecords.length, + userContributions, + }, + }; + + await this.outboxRepository.save({ + eventType: 'BonusClaimed', + topic: 'contribution.bonus.claimed', + key: accountSequence, + payload, + aggregateId: accountSequence, + aggregateType: 'ContributionAccount', + }); + } +} diff --git a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts index dca61bc0..cb349104 100644 --- a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts +++ b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts @@ -12,6 +12,7 @@ import { ContributionRecordAggregate } from '../../domain/aggregates/contributio import { SyncedReferral } from '../../domain/repositories/synced-data.repository.interface'; import { ContributionDistributionPublisherService } from './contribution-distribution-publisher.service'; import { ContributionRateService } from './contribution-rate.service'; +import { BonusClaimService } from './bonus-claim.service'; import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent, ContributionAccountUpdatedEvent } from '../../domain/events'; /** @@ -33,6 +34,7 @@ export class ContributionCalculationService { private readonly unitOfWork: UnitOfWork, private readonly distributionPublisher: ContributionDistributionPublisherService, private readonly contributionRateService: ContributionRateService, + private readonly bonusClaimService: BonusClaimService, ) {} /** @@ -330,6 +332,7 @@ export class ContributionCalculationService { /** * 更新上线的解锁状态(直推用户认种后) + * 如果解锁了新的奖励档位,会触发补发逻辑 */ private async updateReferrerUnlockStatus(referrerAccountSequence: string): Promise { const account = await this.contributionAccountRepository.findByAccountSequence(referrerAccountSequence); @@ -341,10 +344,10 @@ export class ContributionCalculationService { ); // 更新解锁状态 - const currentCount = account.directReferralAdoptedCount; - if (directReferralAdoptedCount > currentCount) { + const previousCount = account.directReferralAdoptedCount; + if (directReferralAdoptedCount > previousCount) { // 需要增量更新 - for (let i = currentCount; i < directReferralAdoptedCount; i++) { + for (let i = previousCount; i < directReferralAdoptedCount; i++) { account.incrementDirectReferralAdoptedCount(); } await this.contributionAccountRepository.save(account); @@ -355,6 +358,13 @@ export class ContributionCalculationService { this.logger.debug( `Updated referrer ${referrerAccountSequence} unlock status: level=${account.unlockedLevelDepth}, bonus=${account.unlockedBonusTiers}`, ); + + // 检查并处理奖励补发(T2: 直推≥2人, T3: 直推≥4人) + await this.bonusClaimService.checkAndClaimBonus( + referrerAccountSequence, + previousCount, + directReferralAdoptedCount, + ); } } diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts index 6b00e835..026effa3 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/unallocated-contribution.repository.ts @@ -7,14 +7,16 @@ export interface UnallocatedContribution { unallocType: string; wouldBeAccountSequence: string | null; levelDepth: number | null; + bonusTier: number | null; amount: ContributionAmount; reason: string | null; sourceAdoptionId: bigint; sourceAccountSequence: string; effectiveDate: Date; expireDate: Date; - allocatedToHeadquarters: boolean; + status: string; allocatedAt: Date | null; + allocatedToAccountSequence: string | null; createdAt: Date; } @@ -130,20 +132,82 @@ export class UnallocatedContributionRepository { }; } + /** + * 查询用户待领取的奖励档位贡献值 + * @param accountSequence 用户账号 + * @param bonusTier 奖励档位 (2 或 3) + */ + async findPendingBonusByAccountSequence( + accountSequence: string, + bonusTier: number, + ): Promise { + const records = await this.client.unallocatedContribution.findMany({ + where: { + wouldBeAccountSequence: accountSequence, + unallocType: `BONUS_TIER_${bonusTier}`, + status: 'PENDING', + }, + orderBy: { createdAt: 'asc' }, + }); + + return records.map((r) => this.toDomain(r)); + } + + /** + * 领取奖励档位 - 将待领取记录标记为已分配给用户 + * @param ids 记录ID列表 + * @param accountSequence 分配给的用户账号 + */ + async claimBonusRecords(ids: bigint[], accountSequence: string): Promise { + if (ids.length === 0) return; + + await this.client.unallocatedContribution.updateMany({ + where: { + id: { in: ids }, + status: 'PENDING', + }, + data: { + status: 'ALLOCATED_TO_USER', + allocatedAt: new Date(), + allocatedToAccountSequence: accountSequence, + }, + }); + } + + /** + * 查询用户所有待领取的奖励(所有档位) + */ + async findAllPendingBonusByAccountSequence( + accountSequence: string, + ): Promise { + const records = await this.client.unallocatedContribution.findMany({ + where: { + wouldBeAccountSequence: accountSequence, + unallocType: { startsWith: 'BONUS_TIER_' }, + status: 'PENDING', + }, + orderBy: { createdAt: 'asc' }, + }); + + return records.map((r) => this.toDomain(r)); + } + private toDomain(record: any): UnallocatedContribution { return { id: record.id, unallocType: record.unallocType, wouldBeAccountSequence: record.wouldBeAccountSequence, levelDepth: record.levelDepth, + bonusTier: record.bonusTier, amount: new ContributionAmount(record.amount), reason: record.reason, sourceAdoptionId: record.sourceAdoptionId, sourceAccountSequence: record.sourceAccountSequence, effectiveDate: record.effectiveDate, expireDate: record.expireDate, - allocatedToHeadquarters: record.allocatedToHeadquarters, + status: record.status, allocatedAt: record.allocatedAt, + allocatedToAccountSequence: record.allocatedToAccountSequence, createdAt: record.createdAt, }; } diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts index 745f4a85..9534d497 100644 --- a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts @@ -9,6 +9,8 @@ import { SystemAccountService } from '../../../application/services/system-accou import { ContributionDistributionCompletedEvent, ContributionDistributionPayload, + BonusClaimedEvent, + BonusClaimedPayload, } from '../events/contribution-distribution.event'; // 4小时 TTL(秒) @@ -114,6 +116,65 @@ export class ContributionDistributionConsumer implements OnModuleInit { } } + /** + * 处理奖励补发事件 + * 当用户解锁新的奖励档位时,补发之前所有认种对应的奖励 + */ + @EventPattern('contribution.bonus.claimed') + async handleBonusClaimed(@Payload() message: any): Promise { + const event: BonusClaimedEvent = message.value || message; + const eventId = event.eventId || message.eventId; + + if (!eventId) { + this.logger.warn('Received BonusClaimed event without eventId, skipping'); + return; + } + + this.logger.debug(`Processing bonus claim event: ${eventId}`); + + // 幂等性检查 + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`Event ${eventId} already processed, skipping`); + return; + } + + try { + await this.processBonusClaim(event.payload); + + // 标记为已处理 + await this.markEventProcessed(eventId, event.eventType); + + this.logger.log( + `Bonus claim for ${event.payload.accountSequence} T${event.payload.bonusTier} processed: ` + + `${event.payload.claimedCount} records`, + ); + } catch (error) { + this.logger.error( + `Failed to process bonus claim for ${event.payload.accountSequence}`, + error instanceof Error ? error.stack : error, + ); + throw error; // 让 Kafka 重试 + } + } + + /** + * 处理奖励补发 + */ + private async processBonusClaim(payload: BonusClaimedPayload): Promise { + for (const contrib of payload.userContributions) { + await this.contributionWalletService.creditContribution({ + accountSequence: contrib.accountSequence, + amount: new Decimal(contrib.amount), + contributionType: contrib.contributionType, + bonusTier: contrib.bonusTier, + effectiveDate: new Date(contrib.effectiveDate), + expireDate: new Date(contrib.expireDate), + sourceAdoptionId: contrib.sourceAdoptionId, + sourceAccountSequence: contrib.sourceAccountSequence, + }); + } + } + /** * 幂等性检查 - Redis + DB 双重检查,4小时去重窗口 */ diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/events/contribution-distribution.event.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/events/contribution-distribution.event.ts index 03059a60..7969b5fb 100644 --- a/backend/services/mining-wallet-service/src/infrastructure/kafka/events/contribution-distribution.event.ts +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/events/contribution-distribution.event.ts @@ -54,6 +54,36 @@ export interface UnallocatedContributionItem { bonusTier?: number; } +/** + * 奖励补发事件 + * 来自 contribution-service,当用户解锁新的奖励档位时触发 + */ +export interface BonusClaimedEvent { + eventType: 'BonusClaimed'; + eventId: string; + timestamp: string; + payload: BonusClaimedPayload; +} + +export interface BonusClaimedPayload { + accountSequence: string; + bonusTier: number; + claimedCount: number; + userContributions: BonusClaimedContributionItem[]; +} + +export interface BonusClaimedContributionItem { + accountSequence: string; + contributionType: 'TEAM_BONUS'; + amount: string; + bonusTier: number; + effectiveDate: string; + expireDate: string; + sourceAdoptionId: string; + sourceAccountSequence: string; + isBackfill: boolean; +} + /** * 用户注册事件 * 来自 auth-service