From 1c787a22a393664decaa7d3fbc6403d9fb77e359 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 20 Jan 2026 03:03:17 -0800 Subject: [PATCH] =?UTF-8?q?fix(mining):=20=E4=BF=AE=E5=A4=8D=20mining-serv?= =?UTF-8?q?ice=20=E8=AE=A2=E9=98=85=E9=94=99=E8=AF=AF=E7=9A=84=20Kafka=20t?= =?UTF-8?q?opic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题:mining-service 订阅的是 cdc.contribution.outbox (Debezium CDC topic), 但 contribution-service 使用 Outbox Pattern 直接发送到 contribution.{eventType} topic。 修复: - mining-service 订阅正确的 topic 列表 - 修复消息解析逻辑支持 Outbox Pattern 消息格式 - contribution-service 添加 GET /admin/unallocated-contributions 端点(调试用) Co-Authored-By: Claude Opus 4.5 --- .../src/api/controllers/admin.controller.ts | 45 ++++++++++++++++ .../contribution-event.handler.ts | 52 ++++++++++++++++--- .../services/network-sync.service.ts | 1 + 3 files changed, 90 insertions(+), 8 deletions(-) diff --git a/backend/services/contribution-service/src/api/controllers/admin.controller.ts b/backend/services/contribution-service/src/api/controllers/admin.controller.ts index bdffc3d2..f93abb73 100644 --- a/backend/services/contribution-service/src/api/controllers/admin.controller.ts +++ b/backend/services/contribution-service/src/api/controllers/admin.controller.ts @@ -489,6 +489,51 @@ export class AdminController { }; } + @Get('unallocated-contributions') + @Public() + @ApiOperation({ summary: '获取所有未分配算力列表,供 mining-service 定时同步' }) + async getUnallocatedContributions(): Promise<{ + contributions: Array<{ + sourceAdoptionId: string; + sourceAccountSequence: string; + wouldBeAccountSequence: string | null; + contributionType: string; + amount: string; + reason: string | null; + effectiveDate: string; + expireDate: string; + }>; + total: number; + }> { + const unallocatedContributions = await this.prisma.unallocatedContribution.findMany({ + where: { status: 'PENDING' }, + select: { + sourceAdoptionId: true, + sourceAccountSequence: true, + wouldBeAccountSequence: true, + unallocType: true, + amount: true, + reason: true, + effectiveDate: true, + expireDate: true, + }, + }); + + return { + contributions: unallocatedContributions.map((uc) => ({ + sourceAdoptionId: uc.sourceAdoptionId.toString(), + sourceAccountSequence: uc.sourceAccountSequence, + wouldBeAccountSequence: uc.wouldBeAccountSequence, + contributionType: uc.unallocType, + amount: uc.amount.toString(), + reason: uc.reason, + effectiveDate: uc.effectiveDate.toISOString(), + expireDate: uc.expireDate.toISOString(), + })), + total: unallocatedContributions.length, + }; + } + @Post('unallocated-contributions/publish-all') @Public() @ApiOperation({ summary: '发布所有未分配算力事件到 outbox,用于同步到 mining-service' }) diff --git a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts index bf3defc7..198ba8f3 100644 --- a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts +++ b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts @@ -17,7 +17,16 @@ export class ContributionEventHandler implements OnModuleInit { async onModuleInit() { const kafkaBrokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092'); - const topic = this.configService.get('CDC_TOPIC_CONTRIBUTION_OUTBOX', 'cdc.contribution.outbox'); + + // contribution-service 使用 Outbox Pattern 直接发送 Kafka 消息到这些 topic + // topic 名称格式: contribution.{eventType.toLowerCase()} + const topics = [ + 'contribution.contributionaccountupdated', + 'contribution.dailysnapshotcreated', + 'contribution.systemaccountsynced', + 'contribution.networkprogressupdated', + 'contribution.unallocatedcontributionsynced', + ]; const kafka = new Kafka({ clientId: 'mining-service', @@ -28,7 +37,11 @@ export class ContributionEventHandler implements OnModuleInit { try { await this.consumer.connect(); - await this.consumer.subscribe({ topic, fromBeginning: false }); + + // 订阅多个 topic + for (const topic of topics) { + await this.consumer.subscribe({ topic, fromBeginning: false }); + } await this.consumer.run({ eachMessage: async (payload: EachMessagePayload) => { @@ -36,7 +49,7 @@ export class ContributionEventHandler implements OnModuleInit { }, }); - this.logger.log(`Subscribed to ${topic} for contribution sync`); + this.logger.log(`Subscribed to ${topics.length} topics for contribution sync: ${topics.join(', ')}`); } catch (error) { this.logger.error('Failed to connect to Kafka for contribution sync', error); } @@ -44,15 +57,15 @@ export class ContributionEventHandler implements OnModuleInit { private async handleMessage(payload: EachMessagePayload): Promise { try { - const { message } = payload; + const { message, topic } = payload; if (!message.value) return; const event = JSON.parse(message.value.toString()); - // CDC 消息格式:{ after: { event_type, payload, ... } } - const data = event.after || event; - const eventType = data.event_type || data.eventType; - const eventPayload = typeof data.payload === 'string' ? JSON.parse(data.payload) : data.payload; + // Outbox Pattern 直接发送的消息格式:payload 本身就是事件数据 + // 可以通过 topic 或 eventType 字段判断事件类型 + const eventPayload = event.eventType ? event : (event.payload || event); + const eventType = eventPayload.eventType || this.extractEventTypeFromTopic(topic); if (!eventPayload) return; @@ -112,4 +125,27 @@ export class ContributionEventHandler implements OnModuleInit { this.logger.error('Failed to handle contribution event', error); } } + + /** + * 从 topic 名称提取事件类型 + * 例如: contribution.unallocatedcontributionsynced -> UnallocatedContributionSynced + */ + private extractEventTypeFromTopic(topic: string): string { + // topic 格式: contribution.{eventtype} + const parts = topic.split('.'); + if (parts.length < 2) return ''; + + const rawType = parts[1]; // e.g., unallocatedcontributionsynced + + // 已知的事件类型映射 + const typeMap: Record = { + contributionaccountupdated: 'ContributionAccountUpdated', + dailysnapshotcreated: 'DailySnapshotCreated', + systemaccountsynced: 'SystemAccountSynced', + networkprogressupdated: 'NetworkProgressUpdated', + unallocatedcontributionsynced: 'UnallocatedContributionSynced', + }; + + return typeMap[rawType] || rawType; + } } diff --git a/backend/services/mining-service/src/application/services/network-sync.service.ts b/backend/services/mining-service/src/application/services/network-sync.service.ts index 756c772a..9cbe3cba 100644 --- a/backend/services/mining-service/src/application/services/network-sync.service.ts +++ b/backend/services/mining-service/src/application/services/network-sync.service.ts @@ -168,6 +168,7 @@ export class NetworkSyncService { }); // 3. 获取最新的 MiningConfig 来返回结果 + // 注:未分配算力通过 CDC (Kafka) 实时同步,不需要定时拉取 const config = await this.prisma.miningConfig.findFirst(); return {