fix(mining): 修复 mining-service 订阅错误的 Kafka topic

问题:mining-service 订阅的是 cdc.contribution.outbox (Debezium CDC topic),
但 contribution-service 使用 Outbox Pattern 直接发送到 contribution.{eventType} topic。

修复:
- mining-service 订阅正确的 topic 列表
- 修复消息解析逻辑支持 Outbox Pattern 消息格式
- contribution-service 添加 GET /admin/unallocated-contributions 端点(调试用)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-20 03:03:17 -08:00
parent 0fddd3164a
commit 1c787a22a3
3 changed files with 90 additions and 8 deletions

View File

@ -489,6 +489,51 @@ export class AdminController {
};
}
@Get('unallocated-contributions')
@Public()
@ApiOperation({ summary: '获取所有未分配算力列表,供 mining-service 定时同步' })
async getUnallocatedContributions(): Promise<{
contributions: Array<{
sourceAdoptionId: string;
sourceAccountSequence: string;
wouldBeAccountSequence: string | null;
contributionType: string;
amount: string;
reason: string | null;
effectiveDate: string;
expireDate: string;
}>;
total: number;
}> {
const unallocatedContributions = await this.prisma.unallocatedContribution.findMany({
where: { status: 'PENDING' },
select: {
sourceAdoptionId: true,
sourceAccountSequence: true,
wouldBeAccountSequence: true,
unallocType: true,
amount: true,
reason: true,
effectiveDate: true,
expireDate: true,
},
});
return {
contributions: unallocatedContributions.map((uc) => ({
sourceAdoptionId: uc.sourceAdoptionId.toString(),
sourceAccountSequence: uc.sourceAccountSequence,
wouldBeAccountSequence: uc.wouldBeAccountSequence,
contributionType: uc.unallocType,
amount: uc.amount.toString(),
reason: uc.reason,
effectiveDate: uc.effectiveDate.toISOString(),
expireDate: uc.expireDate.toISOString(),
})),
total: unallocatedContributions.length,
};
}
@Post('unallocated-contributions/publish-all')
@Public()
@ApiOperation({ summary: '发布所有未分配算力事件到 outbox用于同步到 mining-service' })

View File

@ -17,7 +17,16 @@ export class ContributionEventHandler implements OnModuleInit {
async onModuleInit() {
const kafkaBrokers = this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092');
const topic = this.configService.get<string>('CDC_TOPIC_CONTRIBUTION_OUTBOX', 'cdc.contribution.outbox');
// contribution-service 使用 Outbox Pattern 直接发送 Kafka 消息到这些 topic
// topic 名称格式: contribution.{eventType.toLowerCase()}
const topics = [
'contribution.contributionaccountupdated',
'contribution.dailysnapshotcreated',
'contribution.systemaccountsynced',
'contribution.networkprogressupdated',
'contribution.unallocatedcontributionsynced',
];
const kafka = new Kafka({
clientId: 'mining-service',
@ -28,7 +37,11 @@ export class ContributionEventHandler implements OnModuleInit {
try {
await this.consumer.connect();
await this.consumer.subscribe({ topic, fromBeginning: false });
// 订阅多个 topic
for (const topic of topics) {
await this.consumer.subscribe({ topic, fromBeginning: false });
}
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
@ -36,7 +49,7 @@ export class ContributionEventHandler implements OnModuleInit {
},
});
this.logger.log(`Subscribed to ${topic} for contribution sync`);
this.logger.log(`Subscribed to ${topics.length} topics for contribution sync: ${topics.join(', ')}`);
} catch (error) {
this.logger.error('Failed to connect to Kafka for contribution sync', error);
}
@ -44,15 +57,15 @@ export class ContributionEventHandler implements OnModuleInit {
private async handleMessage(payload: EachMessagePayload): Promise<void> {
try {
const { message } = payload;
const { message, topic } = payload;
if (!message.value) return;
const event = JSON.parse(message.value.toString());
// CDC 消息格式:{ after: { event_type, payload, ... } }
const data = event.after || event;
const eventType = data.event_type || data.eventType;
const eventPayload = typeof data.payload === 'string' ? JSON.parse(data.payload) : data.payload;
// Outbox Pattern 直接发送的消息格式payload 本身就是事件数据
// 可以通过 topic 或 eventType 字段判断事件类型
const eventPayload = event.eventType ? event : (event.payload || event);
const eventType = eventPayload.eventType || this.extractEventTypeFromTopic(topic);
if (!eventPayload) return;
@ -112,4 +125,27 @@ export class ContributionEventHandler implements OnModuleInit {
this.logger.error('Failed to handle contribution event', error);
}
}
/**
* topic
* 例如: contribution.unallocatedcontributionsynced -> UnallocatedContributionSynced
*/
private extractEventTypeFromTopic(topic: string): string {
// topic 格式: contribution.{eventtype}
const parts = topic.split('.');
if (parts.length < 2) return '';
const rawType = parts[1]; // e.g., unallocatedcontributionsynced
// 已知的事件类型映射
const typeMap: Record<string, string> = {
contributionaccountupdated: 'ContributionAccountUpdated',
dailysnapshotcreated: 'DailySnapshotCreated',
systemaccountsynced: 'SystemAccountSynced',
networkprogressupdated: 'NetworkProgressUpdated',
unallocatedcontributionsynced: 'UnallocatedContributionSynced',
};
return typeMap[rawType] || rawType;
}
}

View File

@ -168,6 +168,7 @@ export class NetworkSyncService {
});
// 3. 获取最新的 MiningConfig 来返回结果
// 注:未分配算力通过 CDC (Kafka) 实时同步,不需要定时拉取
const config = await this.prisma.miningConfig.findFirst();
return {