diff --git a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts index 6d0cbb9b..e5e4529c 100644 --- a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts +++ b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts @@ -1,43 +1,80 @@ -import { Injectable, Logger } from '@nestjs/common'; -import { EventPattern, Payload } from '@nestjs/microservices'; +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; import { ContributionSyncService } from '../services/contribution-sync.service'; +import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; @Injectable() -export class ContributionEventHandler { +export class ContributionEventHandler implements OnModuleInit { private readonly logger = new Logger(ContributionEventHandler.name); + private consumer: Consumer; - constructor(private readonly syncService: ContributionSyncService) {} + constructor( + private readonly syncService: ContributionSyncService, + private readonly configService: ConfigService, + ) {} + + async onModuleInit() { + const kafkaBrokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092'); + const topic = this.configService.get('CDC_TOPIC_CONTRIBUTION_OUTBOX', 'cdc.contribution.outbox'); + + const kafka = new Kafka({ + clientId: 'mining-service', + brokers: kafkaBrokers.split(','), + }); + + this.consumer = kafka.consumer({ groupId: 'mining-service-contribution-sync' }); - @EventPattern('contribution.ContributionCalculated') - async handleContributionCalculated(@Payload() message: any): Promise { try { - const { payload } = message.value || message; - this.logger.debug(`Received ContributionCalculated event for ${payload.accountSequence}`); + await this.consumer.connect(); + await this.consumer.subscribe({ topic, fromBeginning: false }); - await this.syncService.handleContributionCalculated({ - accountSequence: payload.accountSequence, - personalContribution: payload.personalContribution, - calculatedAt: payload.calculatedAt, + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, }); + + this.logger.log(`Subscribed to ${topic} for contribution sync`); } catch (error) { - this.logger.error('Failed to handle ContributionCalculated event', error); + this.logger.error('Failed to connect to Kafka for contribution sync', error); } } - @EventPattern('contribution.DailySnapshotCreated') - async handleDailySnapshotCreated(@Payload() message: any): Promise { + private async handleMessage(payload: EachMessagePayload): Promise { try { - const { payload } = message.value || message; - this.logger.log(`Received DailySnapshotCreated event for ${payload.snapshotDate}`); + const { message } = payload; + if (!message.value) return; - await this.syncService.handleDailySnapshotCreated({ - snapshotId: payload.snapshotId, - snapshotDate: payload.snapshotDate, - totalContribution: payload.totalContribution, - activeAccounts: payload.activeAccounts, - }); + const event = JSON.parse(message.value.toString()); + + // CDC 消息格式:{ after: { event_type, payload, ... } } + const data = event.after || event; + const eventType = data.event_type || data.eventType; + const eventPayload = typeof data.payload === 'string' ? JSON.parse(data.payload) : data.payload; + + if (!eventPayload) return; + + if (eventType === 'ContributionAccountUpdated') { + this.logger.debug(`Received ContributionAccountUpdated for ${eventPayload.accountSequence}`); + + // 使用 effectiveContribution 作为挖矿算力 + await this.syncService.handleContributionCalculated({ + accountSequence: eventPayload.accountSequence, + personalContribution: eventPayload.effectiveContribution || eventPayload.totalContribution || '0', + calculatedAt: eventPayload.createdAt || new Date().toISOString(), + }); + } else if (eventType === 'DailySnapshotCreated') { + this.logger.log(`Received DailySnapshotCreated for ${eventPayload.snapshotDate}`); + + await this.syncService.handleDailySnapshotCreated({ + snapshotId: eventPayload.snapshotId, + snapshotDate: eventPayload.snapshotDate, + totalContribution: eventPayload.totalContribution, + activeAccounts: eventPayload.activeAccounts, + }); + } } catch (error) { - this.logger.error('Failed to handle DailySnapshotCreated event', error); + this.logger.error('Failed to handle contribution event', error); } } }