feat(contribution/mining): sync unallocated contributions to mining-service

- Add UnallocatedContributionSyncedEvent in contribution-service
- Add event handler in mining-service's contribution-event.handler.ts
- Add handleUnallocatedContributionSynced in network-sync.service.ts
- Add admin endpoint to publish all unallocated contributions
- Sync pending/unallocated contributions to PendingContributionMining table

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-16 05:29:28 -08:00
parent 130bf57842
commit 0241930011
6 changed files with 203 additions and 2 deletions

View File

@ -11,6 +11,7 @@ import {
ContributionRecordSyncedEvent,
NetworkProgressUpdatedEvent,
SystemAccountSyncedEvent,
UnallocatedContributionSyncedEvent,
} from '../../domain/events';
import { Public } from '../../shared/guards/jwt-auth.guard';
@ -487,4 +488,78 @@ export class AdminController {
total: systemAccounts.length,
};
}
@Post('unallocated-contributions/publish-all')
@Public()
@ApiOperation({ summary: '发布所有未分配算力事件到 outbox用于同步到 mining-service' })
async publishAllUnallocatedContributions(): Promise<{
success: boolean;
publishedCount: number;
failedCount: number;
message: string;
}> {
const unallocatedContributions = await this.prisma.unallocatedContribution.findMany({
where: { status: 'PENDING' },
select: {
id: true,
sourceAdoptionId: true,
sourceAccountSequence: true,
wouldBeAccountSequence: true,
unallocType: true,
amount: true,
reason: true,
effectiveDate: true,
expireDate: true,
},
});
let publishedCount = 0;
let failedCount = 0;
const batchSize = 100;
for (let i = 0; i < unallocatedContributions.length; i += batchSize) {
const batch = unallocatedContributions.slice(i, i + batchSize);
try {
await this.unitOfWork.executeInTransaction(async () => {
const events = batch.map((uc) => {
const event = new UnallocatedContributionSyncedEvent(
uc.sourceAdoptionId,
uc.sourceAccountSequence,
uc.wouldBeAccountSequence,
uc.unallocType,
uc.amount.toString(),
uc.reason,
uc.effectiveDate,
uc.expireDate,
);
return {
aggregateType: UnallocatedContributionSyncedEvent.AGGREGATE_TYPE,
aggregateId: `${uc.sourceAdoptionId}-${uc.unallocType}`,
eventType: UnallocatedContributionSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
});
publishedCount += batch.length;
this.logger.debug(`Published unallocated contribution batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`);
} catch (error) {
failedCount += batch.length;
this.logger.error(`Failed to publish unallocated contribution batch ${Math.floor(i / batchSize) + 1}`, error);
}
}
this.logger.log(`Published ${publishedCount} unallocated contribution events, ${failedCount} failed`);
return {
success: failedCount === 0,
publishedCount,
failedCount,
message: `Published ${publishedCount} events, ${failedCount} failed out of ${unallocatedContributions.length} total`,
};
}
}

View File

@ -13,7 +13,7 @@ import { SyncedReferral } from '../../domain/repositories/synced-data.repository
import { ContributionDistributionPublisherService } from './contribution-distribution-publisher.service';
import { ContributionRateService } from './contribution-rate.service';
import { BonusClaimService } from './bonus-claim.service';
import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent, ContributionAccountUpdatedEvent, SystemAccountSyncedEvent } from '../../domain/events';
import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent, ContributionAccountUpdatedEvent, SystemAccountSyncedEvent, UnallocatedContributionSyncedEvent } from '../../domain/events';
/**
*
@ -267,7 +267,7 @@ export class ContributionCalculationService {
const effectiveDate = result.personalRecord.effectiveDate;
const expireDate = result.personalRecord.expireDate;
// 4. 保存未分配算力
// 4. 保存未分配算力并发布同步事件
if (result.unallocatedContributions.length > 0) {
await this.unallocatedContributionRepository.saveMany(
result.unallocatedContributions.map((u) => ({
@ -278,6 +278,26 @@ export class ContributionCalculationService {
expireDate,
})),
);
// 发布未分配算力同步事件(用于 mining-service 同步待解锁算力)
for (const unallocated of result.unallocatedContributions) {
const event = new UnallocatedContributionSyncedEvent(
sourceAdoptionId,
sourceAccountSequence,
unallocated.wouldBeAccountSequence,
unallocated.contributionType,
unallocated.amount.value.toString(),
unallocated.reason,
effectiveDate,
expireDate,
);
await this.outboxRepository.save({
aggregateType: UnallocatedContributionSyncedEvent.AGGREGATE_TYPE,
aggregateId: `${sourceAdoptionId}-${unallocated.contributionType}`,
eventType: UnallocatedContributionSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
});
}
}
// 5. 保存系统账户算力并发布同步事件

View File

@ -7,3 +7,4 @@ export * from './adoption-synced.event';
export * from './contribution-record-synced.event';
export * from './network-progress-updated.event';
export * from './system-account-synced.event';
export * from './unallocated-contribution-synced.event';

View File

@ -0,0 +1,33 @@
/**
*
* mining-service
*/
export class UnallocatedContributionSyncedEvent {
static readonly EVENT_TYPE = 'UnallocatedContributionSynced';
static readonly AGGREGATE_TYPE = 'UnallocatedContribution';
constructor(
public readonly sourceAdoptionId: bigint,
public readonly sourceAccountSequence: string,
public readonly wouldBeAccountSequence: string | null,
public readonly contributionType: string, // LEVEL_NO_ANCESTOR, LEVEL_OVERFLOW, BONUS_TIER_1, BONUS_TIER_2, BONUS_TIER_3
public readonly amount: string,
public readonly reason: string | null,
public readonly effectiveDate: Date,
public readonly expireDate: Date,
) {}
toPayload(): Record<string, any> {
return {
eventType: UnallocatedContributionSyncedEvent.EVENT_TYPE,
sourceAdoptionId: this.sourceAdoptionId.toString(),
sourceAccountSequence: this.sourceAccountSequence,
wouldBeAccountSequence: this.wouldBeAccountSequence,
contributionType: this.contributionType,
amount: this.amount,
reason: this.reason,
effectiveDate: this.effectiveDate.toISOString(),
expireDate: this.expireDate.toISOString(),
};
}
}

View File

@ -94,6 +94,19 @@ export class ContributionEventHandler implements OnModuleInit {
currentContributionPerTree: eventPayload.currentContributionPerTree,
nextUnitTreeCount: eventPayload.nextUnitTreeCount,
});
} else if (eventType === 'UnallocatedContributionSynced') {
this.logger.log(`Received UnallocatedContributionSynced: adoption=${eventPayload.sourceAdoptionId}, type=${eventPayload.contributionType}`);
await this.networkSyncService.handleUnallocatedContributionSynced({
sourceAdoptionId: eventPayload.sourceAdoptionId,
sourceAccountSequence: eventPayload.sourceAccountSequence,
wouldBeAccountSequence: eventPayload.wouldBeAccountSequence,
contributionType: eventPayload.contributionType,
amount: eventPayload.amount,
reason: eventPayload.reason,
effectiveDate: eventPayload.effectiveDate,
expireDate: eventPayload.expireDate,
});
}
} catch (error) {
this.logger.error('Failed to handle contribution event', error);

View File

@ -21,6 +21,17 @@ interface NetworkProgressUpdatedData {
nextUnitTreeCount: number;
}
interface UnallocatedContributionSyncedData {
sourceAdoptionId: string;
sourceAccountSequence: string;
wouldBeAccountSequence: string | null;
contributionType: string;
amount: string;
reason: string | null;
effectiveDate: string;
expireDate: string;
}
/**
*
*
@ -181,6 +192,54 @@ export class NetworkSyncService {
return this.systemAccountRepository.getTotalContribution();
}
/**
*
* PendingContributionMining
*/
async handleUnallocatedContributionSynced(data: UnallocatedContributionSyncedData): Promise<void> {
try {
const sourceAdoptionId = BigInt(data.sourceAdoptionId);
const effectiveDate = new Date(data.effectiveDate);
const expireDate = new Date(data.expireDate);
await this.prisma.pendingContributionMining.upsert({
where: {
sourceAdoptionId_wouldBeAccountSequence_contributionType: {
sourceAdoptionId,
wouldBeAccountSequence: data.wouldBeAccountSequence || '',
contributionType: data.contributionType,
},
},
create: {
sourceAdoptionId,
sourceAccountSequence: data.sourceAccountSequence,
wouldBeAccountSequence: data.wouldBeAccountSequence,
contributionType: data.contributionType,
amount: new Decimal(data.amount),
reason: data.reason,
effectiveDate,
expireDate,
isExpired: false,
lastSyncedAt: new Date(),
},
update: {
amount: new Decimal(data.amount),
reason: data.reason,
effectiveDate,
expireDate,
lastSyncedAt: new Date(),
},
});
this.logger.log(
`Synced unallocated contribution: adoption=${data.sourceAdoptionId}, type=${data.contributionType}, amount=${data.amount}`,
);
} catch (error) {
this.logger.error(`Failed to sync unallocated contribution: adoption=${data.sourceAdoptionId}`, error);
throw error;
}
}
private mapAccountType(type: string): SystemAccountType | null {
switch (type.toUpperCase()) {
case 'OPERATION':