fix(contribution-service): auto-publish contribution records on calculation

- Change saveMany to return saved records with IDs
- Update saveDistributionResult to use saved records for event publishing
- Contribution records are now automatically synced to mining-admin-service

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 07:32:50 -08:00
parent 04fd7b946a
commit 52c573d507
3 changed files with 29 additions and 24 deletions

View File

@ -153,8 +153,12 @@ export class ContributionCalculationService {
sourceAdoptionId: bigint, sourceAdoptionId: bigint,
sourceAccountSequence: string, sourceAccountSequence: string,
): Promise<void> { ): Promise<void> {
// 收集所有保存后的记录带ID用于发布事件
const savedRecords: ContributionRecordAggregate[] = [];
// 1. 保存个人算力记录 // 1. 保存个人算力记录
await this.contributionRecordRepository.save(result.personalRecord); const savedPersonalRecord = await this.contributionRecordRepository.save(result.personalRecord);
savedRecords.push(savedPersonalRecord);
// 更新个人算力账户 // 更新个人算力账户
let account = await this.contributionAccountRepository.findByAccountSequence( let account = await this.contributionAccountRepository.findByAccountSequence(
@ -168,7 +172,8 @@ export class ContributionCalculationService {
// 2. 保存团队层级算力记录 // 2. 保存团队层级算力记录
if (result.teamLevelRecords.length > 0) { if (result.teamLevelRecords.length > 0) {
await this.contributionRecordRepository.saveMany(result.teamLevelRecords); const savedLevelRecords = await this.contributionRecordRepository.saveMany(result.teamLevelRecords);
savedRecords.push(...savedLevelRecords);
// 更新各上线的算力账户 // 更新各上线的算力账户
for (const record of result.teamLevelRecords) { for (const record of result.teamLevelRecords) {
@ -184,7 +189,8 @@ export class ContributionCalculationService {
// 3. 保存团队奖励算力记录 // 3. 保存团队奖励算力记录
if (result.teamBonusRecords.length > 0) { if (result.teamBonusRecords.length > 0) {
await this.contributionRecordRepository.saveMany(result.teamBonusRecords); const savedBonusRecords = await this.contributionRecordRepository.saveMany(result.teamBonusRecords);
savedRecords.push(...savedBonusRecords);
// 更新直接上线的算力账户 // 更新直接上线的算力账户
for (const record of result.teamBonusRecords) { for (const record of result.teamBonusRecords) {
@ -233,23 +239,19 @@ export class ContributionCalculationService {
} }
} }
// 6. 发布算力记录同步事件(用于 mining-admin-service // 6. 发布算力记录同步事件(用于 mining-admin-service- 使用保存后带 ID 的记录
await this.publishContributionRecordEvents(result); await this.publishContributionRecordEvents(savedRecords);
} }
/** /**
* *
*/ */
private async publishContributionRecordEvents( private async publishContributionRecordEvents(
result: ContributionDistributionResult, savedRecords: ContributionRecordAggregate[],
): Promise<void> { ): Promise<void> {
const allRecords: ContributionRecordAggregate[] = [ if (savedRecords.length === 0) return;
result.personalRecord,
...result.teamLevelRecords,
...result.teamBonusRecords,
];
const events = allRecords.map((record) => { const events = savedRecords.map((record) => {
const event = new ContributionRecordSyncedEvent( const event = new ContributionRecordSyncedEvent(
record.id!, record.id!,
record.accountSequence, record.accountSequence,
@ -276,9 +278,7 @@ export class ContributionCalculationService {
}; };
}); });
if (events.length > 0) { await this.outboxRepository.saveMany(events);
await this.outboxRepository.saveMany(events);
}
} }
/** /**

View File

@ -13,7 +13,7 @@ export interface IContributionRecordRepository {
/** /**
* *
*/ */
saveMany(records: ContributionRecordAggregate[], tx?: any): Promise<void>; saveMany(records: ContributionRecordAggregate[], tx?: any): Promise<ContributionRecordAggregate[]>;
/** /**
* *

View File

@ -99,17 +99,22 @@ export class ContributionRecordRepository implements IContributionRecordReposito
return this.toDomain(result); return this.toDomain(result);
} }
async saveMany(aggregates: ContributionRecordAggregate[], tx?: any): Promise<void> { async saveMany(aggregates: ContributionRecordAggregate[], tx?: any): Promise<ContributionRecordAggregate[]> {
if (aggregates.length === 0) return; if (aggregates.length === 0) return [];
const client = tx ?? this.client; const client = tx ?? this.client;
// 使用事务批量插入
const createData = aggregates.map((a) => a.toPersistence());
await client.contributionRecord.createMany({ // 逐个创建以获取返回的 ID
data: createData, const results: ContributionRecordAggregate[] = [];
skipDuplicates: true, for (const aggregate of aggregates) {
}); const data = aggregate.toPersistence();
const result = await client.contributionRecord.create({
data,
});
results.push(this.toDomain(result));
}
return results;
} }
async findExpiring(beforeDate: Date, limit?: number): Promise<ContributionRecordAggregate[]> { async findExpiring(beforeDate: Date, limit?: number): Promise<ContributionRecordAggregate[]> {