import { Injectable, Logger, Inject } from '@nestjs/common'; import { ClientKafka } from '@nestjs/microservices'; /** * 事件确认消息结构 */ interface EventAckMessage { /** 原始事件的 aggregateId */ eventId: string; /** 原始事件类型 */ eventType: string; /** 消费服务名称 */ consumerService: string; /** 处理结果 */ success: boolean; /** 错误信息(如果失败) */ error?: string; /** 确认时间 */ confirmedAt: string; } /** * 事件确认发布器 * * B方案核心组件:消费方处理事件后发送确认 * 发送确认消息到 reward.events.ack topic */ @Injectable() export class EventAckPublisher { private readonly logger = new Logger(EventAckPublisher.name); private readonly serviceName = 'wallet-service'; constructor( @Inject('KAFKA_SERVICE') private readonly kafkaClient: ClientKafka, ) {} /** * 发送处理成功确认 */ async sendSuccess(eventId: string, eventType: string): Promise { const ackMessage: EventAckMessage = { eventId, eventType, consumerService: this.serviceName, success: true, confirmedAt: new Date().toISOString(), }; try { this.kafkaClient.emit('reward.events.ack', { key: eventId, value: JSON.stringify(ackMessage), }); this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`); } catch (error) { this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error); } } /** * 发送处理失败确认 */ async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise { const ackMessage: EventAckMessage = { eventId, eventType, consumerService: this.serviceName, success: false, error: errorMessage, confirmedAt: new Date().toISOString(), }; try { this.kafkaClient.emit('reward.events.ack', { key: eventId, value: JSON.stringify(ackMessage), }); this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`); } catch (error) { this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error); } } }