diff --git a/backend/services/contribution-service/src/api/controllers/admin.controller.ts b/backend/services/contribution-service/src/api/controllers/admin.controller.ts index ea1ccc99..bdffc3d2 100644 --- a/backend/services/contribution-service/src/api/controllers/admin.controller.ts +++ b/backend/services/contribution-service/src/api/controllers/admin.controller.ts @@ -11,6 +11,7 @@ import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent, SystemAccountSyncedEvent, + UnallocatedContributionSyncedEvent, } from '../../domain/events'; import { Public } from '../../shared/guards/jwt-auth.guard'; @@ -487,4 +488,78 @@ export class AdminController { total: systemAccounts.length, }; } + + @Post('unallocated-contributions/publish-all') + @Public() + @ApiOperation({ summary: '发布所有未分配算力事件到 outbox,用于同步到 mining-service' }) + async publishAllUnallocatedContributions(): Promise<{ + success: boolean; + publishedCount: number; + failedCount: number; + message: string; + }> { + const unallocatedContributions = await this.prisma.unallocatedContribution.findMany({ + where: { status: 'PENDING' }, + select: { + id: true, + sourceAdoptionId: true, + sourceAccountSequence: true, + wouldBeAccountSequence: true, + unallocType: true, + amount: true, + reason: true, + effectiveDate: true, + expireDate: true, + }, + }); + + let publishedCount = 0; + let failedCount = 0; + + const batchSize = 100; + for (let i = 0; i < unallocatedContributions.length; i += batchSize) { + const batch = unallocatedContributions.slice(i, i + batchSize); + + try { + await this.unitOfWork.executeInTransaction(async () => { + const events = batch.map((uc) => { + const event = new UnallocatedContributionSyncedEvent( + uc.sourceAdoptionId, + uc.sourceAccountSequence, + uc.wouldBeAccountSequence, + uc.unallocType, + uc.amount.toString(), + uc.reason, + uc.effectiveDate, + uc.expireDate, + ); + + return { + aggregateType: UnallocatedContributionSyncedEvent.AGGREGATE_TYPE, + aggregateId: `${uc.sourceAdoptionId}-${uc.unallocType}`, + eventType: UnallocatedContributionSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }; + }); + + await this.outboxRepository.saveMany(events); + }); + + publishedCount += batch.length; + this.logger.debug(`Published unallocated contribution batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`); + } catch (error) { + failedCount += batch.length; + this.logger.error(`Failed to publish unallocated contribution batch ${Math.floor(i / batchSize) + 1}`, error); + } + } + + this.logger.log(`Published ${publishedCount} unallocated contribution events, ${failedCount} failed`); + + return { + success: failedCount === 0, + publishedCount, + failedCount, + message: `Published ${publishedCount} events, ${failedCount} failed out of ${unallocatedContributions.length} total`, + }; + } } diff --git a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts index 8e67ce58..89ba5bad 100644 --- a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts +++ b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts @@ -13,7 +13,7 @@ import { SyncedReferral } from '../../domain/repositories/synced-data.repository import { ContributionDistributionPublisherService } from './contribution-distribution-publisher.service'; import { ContributionRateService } from './contribution-rate.service'; import { BonusClaimService } from './bonus-claim.service'; -import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent, ContributionAccountUpdatedEvent, SystemAccountSyncedEvent } from '../../domain/events'; +import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent, ContributionAccountUpdatedEvent, SystemAccountSyncedEvent, UnallocatedContributionSyncedEvent } from '../../domain/events'; /** * 算力计算应用服务 @@ -267,7 +267,7 @@ export class ContributionCalculationService { const effectiveDate = result.personalRecord.effectiveDate; const expireDate = result.personalRecord.expireDate; - // 4. 保存未分配算力 + // 4. 保存未分配算力并发布同步事件 if (result.unallocatedContributions.length > 0) { await this.unallocatedContributionRepository.saveMany( result.unallocatedContributions.map((u) => ({ @@ -278,6 +278,26 @@ export class ContributionCalculationService { expireDate, })), ); + + // 发布未分配算力同步事件(用于 mining-service 同步待解锁算力) + for (const unallocated of result.unallocatedContributions) { + const event = new UnallocatedContributionSyncedEvent( + sourceAdoptionId, + sourceAccountSequence, + unallocated.wouldBeAccountSequence, + unallocated.contributionType, + unallocated.amount.value.toString(), + unallocated.reason, + effectiveDate, + expireDate, + ); + await this.outboxRepository.save({ + aggregateType: UnallocatedContributionSyncedEvent.AGGREGATE_TYPE, + aggregateId: `${sourceAdoptionId}-${unallocated.contributionType}`, + eventType: UnallocatedContributionSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }); + } } // 5. 保存系统账户算力并发布同步事件 diff --git a/backend/services/contribution-service/src/domain/events/index.ts b/backend/services/contribution-service/src/domain/events/index.ts index 028f7e60..a4d3bb68 100644 --- a/backend/services/contribution-service/src/domain/events/index.ts +++ b/backend/services/contribution-service/src/domain/events/index.ts @@ -7,3 +7,4 @@ export * from './adoption-synced.event'; export * from './contribution-record-synced.event'; export * from './network-progress-updated.event'; export * from './system-account-synced.event'; +export * from './unallocated-contribution-synced.event'; diff --git a/backend/services/contribution-service/src/domain/events/unallocated-contribution-synced.event.ts b/backend/services/contribution-service/src/domain/events/unallocated-contribution-synced.event.ts new file mode 100644 index 00000000..31b26bad --- /dev/null +++ b/backend/services/contribution-service/src/domain/events/unallocated-contribution-synced.event.ts @@ -0,0 +1,33 @@ +/** + * 未分配算力同步事件 + * 用于同步待解锁算力到 mining-service + */ +export class UnallocatedContributionSyncedEvent { + static readonly EVENT_TYPE = 'UnallocatedContributionSynced'; + static readonly AGGREGATE_TYPE = 'UnallocatedContribution'; + + constructor( + public readonly sourceAdoptionId: bigint, + public readonly sourceAccountSequence: string, + public readonly wouldBeAccountSequence: string | null, + public readonly contributionType: string, // LEVEL_NO_ANCESTOR, LEVEL_OVERFLOW, BONUS_TIER_1, BONUS_TIER_2, BONUS_TIER_3 + public readonly amount: string, + public readonly reason: string | null, + public readonly effectiveDate: Date, + public readonly expireDate: Date, + ) {} + + toPayload(): Record { + return { + eventType: UnallocatedContributionSyncedEvent.EVENT_TYPE, + sourceAdoptionId: this.sourceAdoptionId.toString(), + sourceAccountSequence: this.sourceAccountSequence, + wouldBeAccountSequence: this.wouldBeAccountSequence, + contributionType: this.contributionType, + amount: this.amount, + reason: this.reason, + effectiveDate: this.effectiveDate.toISOString(), + expireDate: this.expireDate.toISOString(), + }; + } +} diff --git a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts index 891e27fc..bf3defc7 100644 --- a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts +++ b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts @@ -94,6 +94,19 @@ export class ContributionEventHandler implements OnModuleInit { currentContributionPerTree: eventPayload.currentContributionPerTree, nextUnitTreeCount: eventPayload.nextUnitTreeCount, }); + } else if (eventType === 'UnallocatedContributionSynced') { + this.logger.log(`Received UnallocatedContributionSynced: adoption=${eventPayload.sourceAdoptionId}, type=${eventPayload.contributionType}`); + + await this.networkSyncService.handleUnallocatedContributionSynced({ + sourceAdoptionId: eventPayload.sourceAdoptionId, + sourceAccountSequence: eventPayload.sourceAccountSequence, + wouldBeAccountSequence: eventPayload.wouldBeAccountSequence, + contributionType: eventPayload.contributionType, + amount: eventPayload.amount, + reason: eventPayload.reason, + effectiveDate: eventPayload.effectiveDate, + expireDate: eventPayload.expireDate, + }); } } catch (error) { this.logger.error('Failed to handle contribution event', error); diff --git a/backend/services/mining-service/src/application/services/network-sync.service.ts b/backend/services/mining-service/src/application/services/network-sync.service.ts index b9a738c3..895be42c 100644 --- a/backend/services/mining-service/src/application/services/network-sync.service.ts +++ b/backend/services/mining-service/src/application/services/network-sync.service.ts @@ -21,6 +21,17 @@ interface NetworkProgressUpdatedData { nextUnitTreeCount: number; } +interface UnallocatedContributionSyncedData { + sourceAdoptionId: string; + sourceAccountSequence: string; + wouldBeAccountSequence: string | null; + contributionType: string; + amount: string; + reason: string | null; + effectiveDate: string; + expireDate: string; +} + /** * 全网同步服务 * 处理系统账户算力同步和全网理论算力更新 @@ -181,6 +192,54 @@ export class NetworkSyncService { return this.systemAccountRepository.getTotalContribution(); } + /** + * 处理未分配算力同步事件 + * 将待解锁算力同步到 PendingContributionMining 表 + */ + async handleUnallocatedContributionSynced(data: UnallocatedContributionSyncedData): Promise { + try { + const sourceAdoptionId = BigInt(data.sourceAdoptionId); + const effectiveDate = new Date(data.effectiveDate); + const expireDate = new Date(data.expireDate); + + await this.prisma.pendingContributionMining.upsert({ + where: { + sourceAdoptionId_wouldBeAccountSequence_contributionType: { + sourceAdoptionId, + wouldBeAccountSequence: data.wouldBeAccountSequence || '', + contributionType: data.contributionType, + }, + }, + create: { + sourceAdoptionId, + sourceAccountSequence: data.sourceAccountSequence, + wouldBeAccountSequence: data.wouldBeAccountSequence, + contributionType: data.contributionType, + amount: new Decimal(data.amount), + reason: data.reason, + effectiveDate, + expireDate, + isExpired: false, + lastSyncedAt: new Date(), + }, + update: { + amount: new Decimal(data.amount), + reason: data.reason, + effectiveDate, + expireDate, + lastSyncedAt: new Date(), + }, + }); + + this.logger.log( + `Synced unallocated contribution: adoption=${data.sourceAdoptionId}, type=${data.contributionType}, amount=${data.amount}`, + ); + } catch (error) { + this.logger.error(`Failed to sync unallocated contribution: adoption=${data.sourceAdoptionId}`, error); + throw error; + } + } + private mapAccountType(type: string): SystemAccountType | null { switch (type.toUpperCase()) { case 'OPERATION':