feat(contribution): 添加定时任务补发未完全解锁的贡献值

每10分钟扫描已认种但解锁状态不完整的账户,检查其直推用户认种情况,
若满足新的解锁条件则自动补发层级贡献值和奖励档位。

- 添加 findAccountsWithIncompleteUnlock 查询方法
- 添加 findPendingLevelByAccountSequence 和 claimLevelRecords 方法
- 实现 processBackfillForAccount 和 claimLevelContributions 补发逻辑
- 添加 processContributionBackfill 定时任务(每10分钟执行)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-28 06:25:57 -08:00
parent 2597d0ef46
commit cec98e9d3e
4 changed files with 479 additions and 2 deletions

View File

@ -2,6 +2,7 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { ContributionCalculationService } from '../services/contribution-calculation.service';
import { SnapshotService } from '../services/snapshot.service';
import { BonusClaimService } from '../services/bonus-claim.service';
import { ContributionRecordRepository } from '../../infrastructure/persistence/repositories/contribution-record.repository';
import { ContributionAccountRepository } from '../../infrastructure/persistence/repositories/contribution-account.repository';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
@ -20,6 +21,7 @@ export class ContributionScheduler implements OnModuleInit {
constructor(
private readonly calculationService: ContributionCalculationService,
private readonly snapshotService: SnapshotService,
private readonly bonusClaimService: BonusClaimService,
private readonly contributionRecordRepository: ContributionRecordRepository,
private readonly contributionAccountRepository: ContributionAccountRepository,
private readonly outboxRepository: OutboxRepository,
@ -232,6 +234,59 @@ export class ContributionScheduler implements OnModuleInit {
}
}
/**
* 10
* /
*/
@Cron('*/10 * * * *')
async processContributionBackfill(): Promise<void> {
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:backfill`, 540); // 9分钟锁
if (!lockValue) {
return;
}
try {
this.logger.log('Starting contribution backfill scan...');
// 查找解锁状态不完整的账户(已认种但层级<15或奖励档位<3
const accounts = await this.contributionAccountRepository.findAccountsWithIncompleteUnlock(100);
if (accounts.length === 0) {
this.logger.debug('No accounts with incomplete unlock status found');
return;
}
this.logger.log(`Found ${accounts.length} accounts with incomplete unlock status`);
let backfilledCount = 0;
let errorCount = 0;
for (const account of accounts) {
try {
const hasBackfill = await this.bonusClaimService.processBackfillForAccount(account.accountSequence);
if (hasBackfill) {
backfilledCount++;
}
} catch (error) {
errorCount++;
this.logger.error(
`Failed to process backfill for account ${account.accountSequence}`,
error,
);
// 继续处理下一个账户
}
}
this.logger.log(
`Contribution backfill completed: ${backfilledCount} accounts backfilled, ${errorCount} errors`,
);
} catch (error) {
this.logger.error('Failed to process contribution backfill', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:backfill`, lockValue);
}
}
/**
* 4
*

View File

@ -7,10 +7,11 @@ import { OutboxRepository } from '../../infrastructure/persistence/repositories/
import { SyncedDataRepository } from '../../infrastructure/persistence/repositories/synced-data.repository';
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
import { ContributionRecordAggregate } from '../../domain/aggregates/contribution-record.aggregate';
import { ContributionSourceType } from '../../domain/aggregates/contribution-account.aggregate';
import { ContributionAccountAggregate, ContributionSourceType } from '../../domain/aggregates/contribution-account.aggregate';
import { ContributionAmount } from '../../domain/value-objects/contribution-amount.vo';
import { DistributionRate } from '../../domain/value-objects/distribution-rate.vo';
import { ContributionRecordSyncedEvent, SystemAccountSyncedEvent } from '../../domain/events';
import { ContributionCalculatorService } from '../../domain/services/contribution-calculator.service';
import { ContributionRecordSyncedEvent, SystemAccountSyncedEvent, ContributionAccountUpdatedEvent } from '../../domain/events';
/**
*
@ -271,4 +272,352 @@ export class BonusClaimService {
aggregateType: 'ContributionAccount',
});
}
// ========== 定时任务补发逻辑 ==========
private readonly domainCalculator = new ContributionCalculatorService();
/**
*
*
* @returns
*/
async processBackfillForAccount(accountSequence: string): Promise<boolean> {
const account = await this.contributionAccountRepository.findByAccountSequence(accountSequence);
if (!account) {
return false;
}
// 重新计算直推认种用户数
const currentDirectReferralAdoptedCount = await this.syncedDataRepository.getDirectReferralAdoptedCount(
accountSequence,
);
// 计算应该解锁的层级深度和奖励档位
const expectedLevelDepth = this.domainCalculator.calculateUnlockedLevelDepth(currentDirectReferralAdoptedCount);
const expectedBonusTiers = this.domainCalculator.calculateUnlockedBonusTiers(
account.hasAdopted,
currentDirectReferralAdoptedCount,
);
let hasBackfill = false;
// 检查是否需要补发层级贡献值
if (expectedLevelDepth > account.unlockedLevelDepth) {
this.logger.log(
`[Backfill] Account ${accountSequence} level unlock: ${account.unlockedLevelDepth} -> ${expectedLevelDepth} ` +
`(directReferralAdoptedCount: ${account.directReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`,
);
await this.unitOfWork.executeInTransaction(async () => {
// 补发层级贡献值
const levelClaimed = await this.claimLevelContributions(
accountSequence,
account.unlockedLevelDepth + 1,
expectedLevelDepth,
);
if (levelClaimed > 0) {
hasBackfill = true;
}
// 更新账户的直推认种数和解锁状态
await this.updateAccountUnlockStatus(
account,
currentDirectReferralAdoptedCount,
expectedLevelDepth,
expectedBonusTiers,
);
});
}
// 检查是否需要补发奖励档位
if (expectedBonusTiers > account.unlockedBonusTiers) {
this.logger.log(
`[Backfill] Account ${accountSequence} bonus unlock: ${account.unlockedBonusTiers} -> ${expectedBonusTiers} ` +
`(directReferralAdoptedCount: ${account.directReferralAdoptedCount} -> ${currentDirectReferralAdoptedCount})`,
);
// 使用现有的 checkAndClaimBonus 方法补发奖励
await this.checkAndClaimBonus(
accountSequence,
account.directReferralAdoptedCount,
currentDirectReferralAdoptedCount,
);
hasBackfill = true;
// 如果只有奖励档位需要补发(层级已经是最新的),也需要更新账户状态
if (expectedLevelDepth <= account.unlockedLevelDepth) {
await this.unitOfWork.executeInTransaction(async () => {
await this.updateAccountUnlockStatus(
account,
currentDirectReferralAdoptedCount,
expectedLevelDepth,
expectedBonusTiers,
);
});
}
}
return hasBackfill;
}
/**
*
* @param accountSequence
* @param minLevel
* @param maxLevel
* @returns
*/
private async claimLevelContributions(
accountSequence: string,
minLevel: number,
maxLevel: number,
): Promise<number> {
// 1. 查询待领取的层级贡献值记录
const pendingRecords = await this.unallocatedContributionRepository.findPendingLevelByAccountSequence(
accountSequence,
minLevel,
maxLevel,
);
if (pendingRecords.length === 0) {
this.logger.debug(`[Backfill] No pending level records for ${accountSequence} (levels ${minLevel}-${maxLevel})`);
return 0;
}
this.logger.log(
`[Backfill] Claiming ${pendingRecords.length} level records for ${accountSequence} (levels ${minLevel}-${maxLevel})`,
);
// 2. 查询原始认种数据,获取 treeCount 和 baseContribution
const adoptionDataMap = new Map<string, { treeCount: number; baseContribution: ContributionAmount }>();
for (const pending of pendingRecords) {
const adoptionIdStr = pending.sourceAdoptionId.toString();
if (!adoptionDataMap.has(adoptionIdStr)) {
const adoption = await this.syncedDataRepository.findSyncedAdoptionByOriginalId(pending.sourceAdoptionId);
if (adoption) {
adoptionDataMap.set(adoptionIdStr, {
treeCount: adoption.treeCount,
baseContribution: new ContributionAmount(adoption.contributionPerTree),
});
} else {
this.logger.warn(`[Backfill] Adoption not found for sourceAdoptionId: ${pending.sourceAdoptionId}`);
adoptionDataMap.set(adoptionIdStr, {
treeCount: 0,
baseContribution: new ContributionAmount(0),
});
}
}
}
// 3. 创建贡献值记录
const contributionRecords: ContributionRecordAggregate[] = [];
for (const pending of pendingRecords) {
const adoptionData = adoptionDataMap.get(pending.sourceAdoptionId.toString())!;
const record = new ContributionRecordAggregate({
accountSequence: accountSequence,
sourceType: ContributionSourceType.TEAM_LEVEL,
sourceAdoptionId: pending.sourceAdoptionId,
sourceAccountSequence: pending.sourceAccountSequence,
treeCount: adoptionData.treeCount,
baseContribution: adoptionData.baseContribution,
distributionRate: DistributionRate.LEVEL_PER,
levelDepth: pending.levelDepth!,
amount: pending.amount,
effectiveDate: pending.effectiveDate,
expireDate: pending.expireDate,
});
contributionRecords.push(record);
}
// 4. 保存贡献值记录
const savedRecords = await this.contributionRecordRepository.saveMany(contributionRecords);
// 5. 更新用户的贡献值账户(按层级分别更新)
for (const pending of pendingRecords) {
await this.contributionAccountRepository.updateContribution(
accountSequence,
ContributionSourceType.TEAM_LEVEL,
pending.amount,
pending.levelDepth,
null,
);
}
// 6. 标记待领取记录为已分配
const pendingIds = pendingRecords.map((r) => r.id);
await this.unallocatedContributionRepository.claimLevelRecords(pendingIds, accountSequence);
// 7. 计算总金额用于从 HEADQUARTERS 扣除
let totalAmount = new ContributionAmount(0);
for (const pending of pendingRecords) {
totalAmount = new ContributionAmount(totalAmount.value.plus(pending.amount.value));
}
// 8. 从 HEADQUARTERS 减少算力并删除明细记录
await this.systemAccountRepository.subtractContribution('HEADQUARTERS', null, totalAmount);
for (const pending of pendingRecords) {
await this.systemAccountRepository.deleteContributionRecordsByAdoption(
'HEADQUARTERS',
null,
pending.sourceAdoptionId,
pending.sourceAccountSequence,
);
}
// 9. 发布 HEADQUARTERS 账户更新事件
const headquartersAccount = await this.systemAccountRepository.findByTypeAndRegion('HEADQUARTERS', null);
if (headquartersAccount) {
const hqEvent = new SystemAccountSyncedEvent(
'HEADQUARTERS',
null,
headquartersAccount.name,
headquartersAccount.contributionBalance.value.toString(),
headquartersAccount.createdAt,
);
await this.outboxRepository.save({
aggregateType: SystemAccountSyncedEvent.AGGREGATE_TYPE,
aggregateId: 'HEADQUARTERS',
eventType: SystemAccountSyncedEvent.EVENT_TYPE,
payload: hqEvent.toPayload(),
});
}
// 10. 发布贡献值记录同步事件
await this.publishLevelClaimEvents(accountSequence, savedRecords, pendingRecords);
this.logger.log(
`[Backfill] Claimed level contributions for ${accountSequence}: ` +
`${pendingRecords.length} records, total amount: ${totalAmount.value.toString()}`,
);
return pendingRecords.length;
}
/**
*
*/
private async updateAccountUnlockStatus(
account: ContributionAccountAggregate,
newDirectReferralAdoptedCount: number,
expectedLevelDepth: number,
expectedBonusTiers: number,
): Promise<void> {
// 增量更新直推认种数
const previousCount = account.directReferralAdoptedCount;
if (newDirectReferralAdoptedCount > previousCount) {
for (let i = previousCount; i < newDirectReferralAdoptedCount; i++) {
account.incrementDirectReferralAdoptedCount();
}
}
await this.contributionAccountRepository.save(account);
// 发布账户更新事件
await this.publishContributionAccountUpdatedEvent(account);
}
/**
*
*/
private async publishLevelClaimEvents(
accountSequence: string,
savedRecords: ContributionRecordAggregate[],
pendingRecords: UnallocatedContribution[],
): Promise<void> {
// 1. 发布贡献值记录同步事件(用于 mining-admin-service CDC
for (const record of savedRecords) {
const event = new ContributionRecordSyncedEvent(
record.id!,
record.accountSequence,
record.sourceType,
record.sourceAdoptionId,
record.sourceAccountSequence,
record.treeCount,
record.baseContribution.value.toString(),
record.distributionRate.value.toString(),
record.levelDepth,
record.bonusTier,
record.amount.value.toString(),
record.effectiveDate,
record.expireDate,
record.isExpired,
record.createdAt,
);
await this.outboxRepository.save({
aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE,
aggregateId: record.id!.toString(),
eventType: ContributionRecordSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
});
}
// 2. 发布补发事件到 mining-wallet-service
const userContributions = savedRecords.map((record) => ({
accountSequence: record.accountSequence,
contributionType: 'TEAM_LEVEL',
amount: record.amount.value.toString(),
levelDepth: record.levelDepth,
effectiveDate: record.effectiveDate.toISOString(),
expireDate: record.expireDate.toISOString(),
sourceAdoptionId: record.sourceAdoptionId.toString(),
sourceAccountSequence: record.sourceAccountSequence,
isBackfill: true, // 标记为补发
}));
const eventId = `level-claim-${accountSequence}-${Date.now()}`;
const payload = {
eventType: 'LevelClaimed',
eventId,
timestamp: new Date().toISOString(),
payload: {
accountSequence,
claimedCount: savedRecords.length,
userContributions,
},
};
await this.outboxRepository.save({
eventType: 'LevelClaimed',
topic: 'contribution.level.claimed',
key: accountSequence,
payload,
aggregateId: accountSequence,
aggregateType: 'ContributionAccount',
});
}
/**
*
*/
private async publishContributionAccountUpdatedEvent(
account: ContributionAccountAggregate,
): Promise<void> {
const totalContribution = account.personalContribution.value
.plus(account.totalLevelPending.value)
.plus(account.totalBonusPending.value);
const event = new ContributionAccountUpdatedEvent(
account.accountSequence,
account.personalContribution.value.toString(),
account.totalLevelPending.value.toString(),
account.totalBonusPending.value.toString(),
totalContribution.toString(),
account.effectiveContribution.value.toString(),
account.hasAdopted,
account.directReferralAdoptedCount,
account.unlockedLevelDepth,
account.unlockedBonusTiers,
account.createdAt,
);
await this.outboxRepository.save({
aggregateType: ContributionAccountUpdatedEvent.AGGREGATE_TYPE,
aggregateId: account.accountSequence,
eventType: ContributionAccountUpdatedEvent.EVENT_TYPE,
payload: event.toPayload(),
});
}
}

View File

@ -233,6 +233,31 @@ export class ContributionAccountRepository implements IContributionAccountReposi
return records.map((r) => this.toDomain(r));
}
/**
*
*
* @param limit
* @returns
*/
async findAccountsWithIncompleteUnlock(limit: number = 100): Promise<ContributionAccountAggregate[]> {
// 查找已认种但未达到满解锁状态的账户:
// - unlockedLevelDepth < 15 或
// - unlockedBonusTiers < 3
const records = await this.client.contributionAccount.findMany({
where: {
hasAdopted: true,
OR: [
{ unlockedLevelDepth: { lt: 15 } },
{ unlockedBonusTiers: { lt: 3 } },
],
},
orderBy: { updatedAt: 'asc' }, // 优先处理最久未更新的
take: limit,
});
return records.map((r) => this.toDomain(r));
}
/**
*
*/

View File

@ -192,6 +192,54 @@ export class UnallocatedContributionRepository {
return records.map((r) => this.toDomain(r));
}
/**
*
* @param accountSequence
* @param minLevel
* @param maxLevel
*/
async findPendingLevelByAccountSequence(
accountSequence: string,
minLevel: number,
maxLevel: number,
): Promise<UnallocatedContribution[]> {
const records = await this.client.unallocatedContribution.findMany({
where: {
wouldBeAccountSequence: accountSequence,
unallocType: 'LEVEL_OVERFLOW',
levelDepth: {
gte: minLevel,
lte: maxLevel,
},
status: 'PENDING',
},
orderBy: { levelDepth: 'asc' },
});
return records.map((r) => this.toDomain(r));
}
/**
* -
* @param ids ID列表
* @param accountSequence
*/
async claimLevelRecords(ids: bigint[], accountSequence: string): Promise<void> {
if (ids.length === 0) return;
await this.client.unallocatedContribution.updateMany({
where: {
id: { in: ids },
status: 'PENDING',
},
data: {
status: 'ALLOCATED_TO_USER',
allocatedAt: new Date(),
allocatedToAccountSequence: accountSequence,
},
});
}
/**
*
*/