diff --git a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts index 9885e6a4..a7121846 100644 --- a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts +++ b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts @@ -153,8 +153,12 @@ export class ContributionCalculationService { sourceAdoptionId: bigint, sourceAccountSequence: string, ): Promise { + // 收集所有保存后的记录(带ID)用于发布事件 + const savedRecords: ContributionRecordAggregate[] = []; + // 1. 保存个人算力记录 - await this.contributionRecordRepository.save(result.personalRecord); + const savedPersonalRecord = await this.contributionRecordRepository.save(result.personalRecord); + savedRecords.push(savedPersonalRecord); // 更新个人算力账户 let account = await this.contributionAccountRepository.findByAccountSequence( @@ -168,7 +172,8 @@ export class ContributionCalculationService { // 2. 保存团队层级算力记录 if (result.teamLevelRecords.length > 0) { - await this.contributionRecordRepository.saveMany(result.teamLevelRecords); + const savedLevelRecords = await this.contributionRecordRepository.saveMany(result.teamLevelRecords); + savedRecords.push(...savedLevelRecords); // 更新各上线的算力账户 for (const record of result.teamLevelRecords) { @@ -184,7 +189,8 @@ export class ContributionCalculationService { // 3. 保存团队奖励算力记录 if (result.teamBonusRecords.length > 0) { - await this.contributionRecordRepository.saveMany(result.teamBonusRecords); + const savedBonusRecords = await this.contributionRecordRepository.saveMany(result.teamBonusRecords); + savedRecords.push(...savedBonusRecords); // 更新直接上线的算力账户 for (const record of result.teamBonusRecords) { @@ -233,23 +239,19 @@ export class ContributionCalculationService { } } - // 6. 发布算力记录同步事件(用于 mining-admin-service) - await this.publishContributionRecordEvents(result); + // 6. 发布算力记录同步事件(用于 mining-admin-service)- 使用保存后带 ID 的记录 + await this.publishContributionRecordEvents(savedRecords); } /** * 发布算力记录同步事件 */ private async publishContributionRecordEvents( - result: ContributionDistributionResult, + savedRecords: ContributionRecordAggregate[], ): Promise { - const allRecords: ContributionRecordAggregate[] = [ - result.personalRecord, - ...result.teamLevelRecords, - ...result.teamBonusRecords, - ]; + if (savedRecords.length === 0) return; - const events = allRecords.map((record) => { + const events = savedRecords.map((record) => { const event = new ContributionRecordSyncedEvent( record.id!, record.accountSequence, @@ -276,9 +278,7 @@ export class ContributionCalculationService { }; }); - if (events.length > 0) { - await this.outboxRepository.saveMany(events); - } + await this.outboxRepository.saveMany(events); } /** diff --git a/backend/services/contribution-service/src/domain/repositories/contribution-record.repository.interface.ts b/backend/services/contribution-service/src/domain/repositories/contribution-record.repository.interface.ts index 44455503..ee1cac96 100644 --- a/backend/services/contribution-service/src/domain/repositories/contribution-record.repository.interface.ts +++ b/backend/services/contribution-service/src/domain/repositories/contribution-record.repository.interface.ts @@ -13,7 +13,7 @@ export interface IContributionRecordRepository { /** * 批量保存 */ - saveMany(records: ContributionRecordAggregate[], tx?: any): Promise; + saveMany(records: ContributionRecordAggregate[], tx?: any): Promise; /** * 根据账户序列号查找 diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-record.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-record.repository.ts index 51223534..66170919 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-record.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-record.repository.ts @@ -99,17 +99,22 @@ export class ContributionRecordRepository implements IContributionRecordReposito return this.toDomain(result); } - async saveMany(aggregates: ContributionRecordAggregate[], tx?: any): Promise { - if (aggregates.length === 0) return; + async saveMany(aggregates: ContributionRecordAggregate[], tx?: any): Promise { + if (aggregates.length === 0) return []; const client = tx ?? this.client; - // 使用事务批量插入 - const createData = aggregates.map((a) => a.toPersistence()); - await client.contributionRecord.createMany({ - data: createData, - skipDuplicates: true, - }); + // 逐个创建以获取返回的 ID + const results: ContributionRecordAggregate[] = []; + for (const aggregate of aggregates) { + const data = aggregate.toPersistence(); + const result = await client.contributionRecord.create({ + data, + }); + results.push(this.toDomain(result)); + } + + return results; } async findExpiring(beforeDate: Date, limit?: number): Promise {