fix(mining-service): listen to correct CDC topic for contribution sync

Changed event handler to:
- Listen to 'cdc.contribution.outbox' topic (CDC/Debezium format)
- Handle 'ContributionAccountUpdated' events instead of 'ContributionCalculated'
- Use effectiveContribution for mining power calculation

This fixes the issue where mining accounts had zero totalContribution
because they weren't receiving contribution sync events.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-14 05:30:38 -08:00
parent e6d966e89f
commit ce95c40c84
1 changed files with 61 additions and 24 deletions

View File

@ -1,43 +1,80 @@
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices'; import { ConfigService } from '@nestjs/config';
import { ContributionSyncService } from '../services/contribution-sync.service'; import { ContributionSyncService } from '../services/contribution-sync.service';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
@Injectable() @Injectable()
export class ContributionEventHandler { export class ContributionEventHandler implements OnModuleInit {
private readonly logger = new Logger(ContributionEventHandler.name); private readonly logger = new Logger(ContributionEventHandler.name);
private consumer: Consumer;
constructor(private readonly syncService: ContributionSyncService) {} constructor(
private readonly syncService: ContributionSyncService,
private readonly configService: ConfigService,
) {}
async onModuleInit() {
const kafkaBrokers = this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092');
const topic = this.configService.get<string>('CDC_TOPIC_CONTRIBUTION_OUTBOX', 'cdc.contribution.outbox');
const kafka = new Kafka({
clientId: 'mining-service',
brokers: kafkaBrokers.split(','),
});
this.consumer = kafka.consumer({ groupId: 'mining-service-contribution-sync' });
@EventPattern('contribution.ContributionCalculated')
async handleContributionCalculated(@Payload() message: any): Promise<void> {
try { try {
const { payload } = message.value || message; await this.consumer.connect();
this.logger.debug(`Received ContributionCalculated event for ${payload.accountSequence}`); await this.consumer.subscribe({ topic, fromBeginning: false });
await this.syncService.handleContributionCalculated({ await this.consumer.run({
accountSequence: payload.accountSequence, eachMessage: async (payload: EachMessagePayload) => {
personalContribution: payload.personalContribution, await this.handleMessage(payload);
calculatedAt: payload.calculatedAt, },
}); });
this.logger.log(`Subscribed to ${topic} for contribution sync`);
} catch (error) { } catch (error) {
this.logger.error('Failed to handle ContributionCalculated event', error); this.logger.error('Failed to connect to Kafka for contribution sync', error);
} }
} }
@EventPattern('contribution.DailySnapshotCreated') private async handleMessage(payload: EachMessagePayload): Promise<void> {
async handleDailySnapshotCreated(@Payload() message: any): Promise<void> {
try { try {
const { payload } = message.value || message; const { message } = payload;
this.logger.log(`Received DailySnapshotCreated event for ${payload.snapshotDate}`); if (!message.value) return;
await this.syncService.handleDailySnapshotCreated({ const event = JSON.parse(message.value.toString());
snapshotId: payload.snapshotId,
snapshotDate: payload.snapshotDate, // CDC 消息格式:{ after: { event_type, payload, ... } }
totalContribution: payload.totalContribution, const data = event.after || event;
activeAccounts: payload.activeAccounts, const eventType = data.event_type || data.eventType;
}); const eventPayload = typeof data.payload === 'string' ? JSON.parse(data.payload) : data.payload;
if (!eventPayload) return;
if (eventType === 'ContributionAccountUpdated') {
this.logger.debug(`Received ContributionAccountUpdated for ${eventPayload.accountSequence}`);
// 使用 effectiveContribution 作为挖矿算力
await this.syncService.handleContributionCalculated({
accountSequence: eventPayload.accountSequence,
personalContribution: eventPayload.effectiveContribution || eventPayload.totalContribution || '0',
calculatedAt: eventPayload.createdAt || new Date().toISOString(),
});
} else if (eventType === 'DailySnapshotCreated') {
this.logger.log(`Received DailySnapshotCreated for ${eventPayload.snapshotDate}`);
await this.syncService.handleDailySnapshotCreated({
snapshotId: eventPayload.snapshotId,
snapshotDate: eventPayload.snapshotDate,
totalContribution: eventPayload.totalContribution,
activeAccounts: eventPayload.activeAccounts,
});
}
} catch (error) { } catch (error) {
this.logger.error('Failed to handle DailySnapshotCreated event', error); this.logger.error('Failed to handle contribution event', error);
} }
} }
} }