import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { KafkaService } from '../../infrastructure';
import { EventAckPublisher } from '../../infrastructure/kafka/event-ack.publisher';
import { TeamStatisticsService } from '../services';
import { UpdateTeamStatisticsCommand } from '../commands';

/**
 * Shape this handler expects for a `planting.created` message.
 *
 * The payload is cast to this type without runtime validation, so these
 * fields are an expectation, not a guarantee.
 */
interface PlantingCreatedEvent {
  eventName: string;
  data: {
    userId: string;
    treeCount: number;
    provinceCode: string;
    cityCode: string;
  };
  /** Outbox metadata; when present, a success/failure acknowledgement is published. */
  _outbox?: {
    id: string;
    aggregateId: string;
    eventType: string;
  };
}

/**
 * Handler for planting-created events.
 * Listens for the planting events emitted by planting-service and forwards
 * them to {@link TeamStatisticsService} as an {@link UpdateTeamStatisticsCommand}.
 */
@Injectable()
export class PlantingCreatedHandler implements OnModuleInit {
  private readonly logger = new Logger(PlantingCreatedHandler.name);

  constructor(
    private readonly kafkaService: KafkaService,
    private readonly teamStatisticsService: TeamStatisticsService,
    private readonly eventAckPublisher: EventAckPublisher,
  ) {}

  /**
   * Registers {@link handleMessage} as the consumer callback for the
   * `planting.planting.created` topic when the module starts.
   */
  async onModuleInit() {
    await this.kafkaService.subscribe(
      'referral-service-planting-created',
      ['planting.planting.created'],
      this.handleMessage.bind(this),
    );
    this.logger.log('Subscribed to planting.created events');
  }

  /**
   * Processes one message from the subscribed topic.
   *
   * Messages whose `eventName` is not `planting.created` are ignored. For
   * matching messages the team statistics are updated and, when the message
   * carries `_outbox` metadata, a success or failure acknowledgement is
   * published through {@link EventAckPublisher}.
   *
   * @param topic   Topic the message arrived on (unused; part of the callback shape).
   * @param message Raw message payload, treated as a {@link PlantingCreatedEvent}.
   */
  private async handleMessage(topic: string, message: Record<string, unknown>): Promise<void> {
    // The payload is not validated at runtime; this cast only tells the compiler
    // which fields we intend to read.
    const event = message as unknown as PlantingCreatedEvent;

    if (event.eventName !== 'planting.created') {
      return;
    }

    // Plan B: extract the outbox info used to send the acknowledgement.
    const outboxInfo = event._outbox;
    // NOTE(review): `||` also maps an empty-string aggregateId to 'unknown' — confirm this is intended.
    const eventId = outboxInfo?.aggregateId || 'unknown';

    try {
      const command = new UpdateTeamStatisticsCommand(
        BigInt(event.data.userId),
        event.data.treeCount,
        event.data.provinceCode,
        event.data.cityCode,
      );

      await this.teamStatisticsService.handlePlantingEvent(command);
      this.logger.log(
        `Updated team statistics for user ${event.data.userId}, count: ${event.data.treeCount}`,
      );

      // Plan B: send the "processed successfully" acknowledgement.
      // NOTE(review): if this publish throws, the catch below reports the event
      // as failed even though the statistics update already succeeded.
      if (outboxInfo) {
        await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType);
      }
    } catch (error) {
      // `data` may be absent on a malformed payload (that is one way to end up
      // here), so read the user id defensively: otherwise this handler would
      // throw again and the failure acknowledgement below would never be sent.
      const userId = event.data?.userId ?? 'unknown';
      this.logger.error(
        `Failed to update team statistics for user ${userId}:`,
        error,
      );

      // Plan B: send the "processing failed" acknowledgement.
      if (outboxInfo) {
        const errorMessage = error instanceof Error ? error.message : String(error);
        await this.eventAckPublisher.sendFailure(eventId, outboxInfo.eventType, errorMessage);
      }
    }
  }
}