/**
 * Transfer-ownership event handler (purely additive).
 *
 * Consumes the `planting.ownership.removed` and `planting.ownership.added`
 * events and calls TransferAdjustmentService to perform the 88%
 * computing-power adjustment.
 *
 * Consumption model: native kafkajs consumers (same architecture as
 * CDCConsumerService).
 * Idempotency: ProcessedEvent table, keyed by (sourceService + eventId).
 *
 * Rollback: delete this file and remove its reference from
 * application.module.ts.
 */
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import {
  TransferAdjustmentService,
  OwnershipRemovedEvent,
  OwnershipAddedEvent,
} from '../services/transfer-adjustment.service';

/** Value written to / matched against ProcessedEvent.sourceService. */
const SOURCE_SERVICE = 'planting-service';

/** Log tags, one per consumed topic. */
const REMOVED_TAG = 'TRANSFER-REMOVED';
const ADDED_TAG = 'TRANSFER-ADDED';

/**
 * Returns true when `value` is a non-null, non-array object.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

/**
 * Returns true when `value` is an object carrying a usable `transferOrderNo`
 * (a non-empty string or a number). Deliberately typed as a plain `boolean`
 * rather than a type predicate so callers keep their own event typing.
 */
function hasTransferOrderNo(value: unknown): boolean {
  if (!isRecord(value)) return false;
  const orderNo = value.transferOrderNo;
  return (typeof orderNo === 'string' && orderNo.length > 0) || typeof orderNo === 'number';
}

@Injectable()
export class TransferOwnershipHandler implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(TransferOwnershipHandler.name);
  private readonly kafka: Kafka;
  private readonly removedConsumer: Consumer;
  private readonly addedConsumer: Consumer;

  /**
   * Builds the Kafka client and one consumer per topic. Nothing connects here;
   * connections are opened in {@link onModuleInit}.
   */
  constructor(
    private readonly configService: ConfigService,
    private readonly prisma: PrismaService,
    private readonly transferAdjustmentService: TransferAdjustmentService,
  ) {
    // KAFKA_BROKERS is a comma-separated list; tolerate spaces and stray commas.
    const brokers = this.configService
      .get<string>('KAFKA_BROKERS', 'localhost:9092')
      .split(',')
      .map((broker) => broker.trim())
      .filter((broker) => broker.length > 0);

    this.kafka = new Kafka({
      clientId: 'contribution-service-transfer',
      brokers,
    });

    this.removedConsumer = this.kafka.consumer({
      groupId: 'contribution-service-transfer-removed',
    });
    this.addedConsumer = this.kafka.consumer({
      groupId: 'contribution-service-transfer-added',
    });
  }

  /**
   * Connects both consumers and starts consuming. Failures are logged and
   * swallowed so that a Kafka outage does not prevent the module from starting.
   */
  async onModuleInit(): Promise<void> {
    try {
      // Seller-side reduction events.
      await this.startConsumer(
        this.removedConsumer,
        'planting.ownership.removed',
        (payload) => this.handleOwnershipRemoved(payload),
      );

      // Buyer-side increase events.
      await this.startConsumer(
        this.addedConsumer,
        'planting.ownership.added',
        (payload) => this.handleOwnershipAdded(payload),
      );

      this.logger.log(
        'Subscribed to planting.ownership.removed + planting.ownership.added',
      );
    } catch (error) {
      this.logger.error('Failed to initialize transfer ownership consumers', error);
    }
  }

  /**
   * Disconnects both consumers. Each disconnect is attempted independently so
   * that a failure on one consumer does not leave the other one connected.
   */
  async onModuleDestroy(): Promise<void> {
    for (const consumer of [this.removedConsumer, this.addedConsumer]) {
      try {
        await consumer.disconnect();
      } catch (error) {
        this.logger.error('Failed to disconnect transfer consumers', error);
      }
    }
  }

  /**
   * Connects `consumer`, subscribes it to `topic` (new messages only) and
   * starts its run loop with `handler` as the per-message callback.
   */
  private async startConsumer(
    consumer: Consumer,
    topic: string,
    handler: (payload: EachMessagePayload) => Promise<void>,
  ): Promise<void> {
    await consumer.connect();
    await consumer.subscribe({ topic, fromBeginning: false });
    await consumer.run({ eachMessage: handler });
  }

  /**
   * Decodes a Kafka message value into an event object.
   *
   * Two message shapes are supported: the event itself, or an envelope with
   * the event nested under a truthy `value` field.
   *
   * @param rawValue Raw message bytes.
   * @param tag      Log tag identifying the topic being handled.
   * @returns The event, or `undefined` when the message is not valid JSON or
   *          carries no usable `transferOrderNo` (such messages are logged and
   *          skipped; they are not retried).
   */
  private parseEvent<TEvent>(rawValue: Buffer, tag: string): TEvent | undefined {
    let envelope: unknown;
    try {
      envelope = JSON.parse(rawValue.toString());
    } catch (error) {
      this.logger.error(`[${tag}] Failed to parse message`, error);
      return undefined;
    }

    const candidate: unknown =
      isRecord(envelope) && envelope.value ? envelope.value : envelope;

    // Without an order number the idempotency key would be "<prefix>:undefined",
    // so such payloads must never reach processing.
    if (!hasTransferOrderNo(candidate)) {
      this.logger.error(`[${tag}] Skipping message without transferOrderNo`);
      return undefined;
    }

    // Only transferOrderNo is validated here; the remaining fields are trusted
    // to match the producer's contract.
    return candidate as TEvent;
  }

  /**
   * Returns true when a ProcessedEvent row already exists for `eventId`.
   */
  private async isAlreadyProcessed(eventId: string): Promise<boolean> {
    const existing = await this.prisma.processedEvent.findFirst({
      where: {
        sourceService: SOURCE_SERVICE,
        eventId,
      },
    });
    return Boolean(existing);
  }

  /**
   * Inserts the ProcessedEvent row marking `eventId` as handled.
   */
  private async markProcessed(eventId: string, eventType: string): Promise<void> {
    await this.prisma.processedEvent.create({
      data: {
        sourceService: SOURCE_SERVICE,
        eventId,
        eventType,
      },
    });
  }

  /**
   * Handles a seller-side ownership-removed event.
   *
   * Empty, unparseable or invalid messages are skipped. Any failure during the
   * idempotency check, the adjustment or the marker insert is logged and
   * rethrown so that kafkajs retries the message.
   *
   * NOTE(review): the idempotency lookup, the adjustment and the marker insert
   * are separate steps here; if the insert fails after the adjustment
   * succeeded, the retry will run the adjustment again unless
   * TransferAdjustmentService is itself idempotent — confirm.
   */
  private async handleOwnershipRemoved(payload: EachMessagePayload): Promise<void> {
    const { message } = payload;
    if (!message.value) return;

    const event = this.parseEvent<OwnershipRemovedEvent>(message.value, REMOVED_TAG);
    if (!event) return;

    const processedEventId = `removed:${event.transferOrderNo}`;

    this.logger.log(
      `[TRANSFER-REMOVED] Processing: ${event.transferOrderNo}, ` +
        `seller=${event.sellerAccountSequence}, trees=${event.treeCount}`,
    );

    try {
      // Idempotency check.
      if (await this.isAlreadyProcessed(processedEventId)) {
        this.logger.log(`[TRANSFER-REMOVED] Already processed: ${processedEventId}`);
        return;
      }

      // Apply the seller-side deduction.
      await this.transferAdjustmentService.processOwnershipRemoved(event);

      // Record the event as processed.
      await this.markProcessed(processedEventId, 'PlantingOwnershipRemoved');

      this.logger.log(
        `[TRANSFER-REMOVED] ✓ Processed: ${event.transferOrderNo}, seller=${event.sellerAccountSequence}`,
      );
    } catch (error) {
      this.logger.error(
        `[TRANSFER-REMOVED] ✗ Failed for ${event.transferOrderNo}:`,
        error,
      );
      throw error; // let kafkajs retry
    }
  }

  /**
   * Handles a buyer-side ownership-added event.
   *
   * Empty, unparseable or invalid messages are skipped. Any failure during the
   * idempotency check, the adjustment or the marker insert is logged and
   * rethrown so that kafkajs retries the message.
   *
   * NOTE(review): the idempotency lookup, the adjustment and the marker insert
   * are separate steps here; if the insert fails after the adjustment
   * succeeded, the retry will run the adjustment again unless
   * TransferAdjustmentService is itself idempotent — confirm.
   */
  private async handleOwnershipAdded(payload: EachMessagePayload): Promise<void> {
    const { message } = payload;
    if (!message.value) return;

    const event = this.parseEvent<OwnershipAddedEvent>(message.value, ADDED_TAG);
    if (!event) return;

    const processedEventId = `added:${event.transferOrderNo}`;

    this.logger.log(
      `[TRANSFER-ADDED] Processing: ${event.transferOrderNo}, ` +
        `buyer=${event.buyerAccountSequence}, trees=${event.treeCount}`,
    );

    try {
      // Idempotency check.
      if (await this.isAlreadyProcessed(processedEventId)) {
        this.logger.log(`[TRANSFER-ADDED] Already processed: ${processedEventId}`);
        return;
      }

      // Apply the buyer-side addition.
      await this.transferAdjustmentService.processOwnershipAdded(event);

      // Record the event as processed.
      await this.markProcessed(processedEventId, 'PlantingOwnershipAdded');

      this.logger.log(
        `[TRANSFER-ADDED] ✓ Processed: ${event.transferOrderNo}, buyer=${event.buyerAccountSequence}`,
      );
    } catch (error) {
      this.logger.error(
        `[TRANSFER-ADDED] ✗ Failed for ${event.transferOrderNo}:`,
        error,
      );
      throw error; // let kafkajs retry
    }
  }
}