rwadurian/backend/services/planting-service/src/infrastructure/kafka/event-ack.consumer.ts

100 lines
3.1 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Injectable, Logger, OnModuleInit, Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { OutboxRepository } from '../persistence/repositories/outbox.repository';
/**
* 事件确认消息结构
*/
interface EventAckMessage {
/** 原始事件的 aggregateId如 orderNo */
eventId: string;
/** 原始事件类型 */
eventType: string;
/** 消费服务名称 */
consumerService: string;
/** 处理结果 */
success: boolean;
/** 错误信息(如果失败) */
error?: string;
/** 确认时间 */
confirmedAt: string;
}
/**
* 事件确认消费者
*
* B方案核心组件监听消费方的确认事件
* 当消费方reward-service, referral-service, authorization-service
* 成功处理事件后,会发送确认消息到 planting.events.ack topic
*
* 工作流程:
* 1. planting-service 发送事件到 Kafka标记为 SENT
* 2. 消费方处理事件成功后,发送确认到 planting.events.ack
* 3. 本消费者收到确认,将事件标记为 CONFIRMED
* 4. 超时未收到确认的事件会被重发
*/
@Injectable()
export class EventAckConsumer implements OnModuleInit {
private readonly logger = new Logger(EventAckConsumer.name);
constructor(
@Inject('KAFKA_SERVICE')
private readonly kafkaClient: ClientKafka,
private readonly outboxRepository: OutboxRepository,
) {}
async onModuleInit() {
// 订阅确认 topic
this.kafkaClient.subscribeToResponseOf('planting.events.ack');
this.logger.log('[ACK-CONSUMER] Subscribed to planting.events.ack topic');
// 连接后开始消费
try {
await this.kafkaClient.connect();
this.startConsuming();
} catch (error) {
this.logger.error('[ACK-CONSUMER] Failed to connect to Kafka:', error);
}
}
private startConsuming() {
// 使用 ClientKafka 的 emit 来订阅消息
// 注意NestJS 的 ClientKafka 主要用于生产者,消费者通常使用 @MessagePattern
// 这里我们使用轮询方式或者通过 Controller 来处理
this.logger.log('[ACK-CONSUMER] Event acknowledgment consumer started');
}
/**
* 处理确认消息
* 此方法由 Kafka Controller 调用
*/
async handleAckMessage(message: EventAckMessage): Promise<void> {
this.logger.debug(`[ACK-CONSUMER] Received ack: ${JSON.stringify(message)}`);
try {
if (message.success) {
// 标记事件为已确认
const confirmed = await this.outboxRepository.markAsConfirmed(message.eventId);
if (confirmed) {
this.logger.log(
`[ACK-CONSUMER] ✓ Event ${message.eventId} confirmed by ${message.consumerService}`,
);
}
} else {
// 消费方处理失败,记录错误但不改变状态
// 超时后会自动重发
this.logger.warn(
`[ACK-CONSUMER] ✗ Event ${message.eventId} failed in ${message.consumerService}: ${message.error}`,
);
}
} catch (error) {
this.logger.error(
`[ACK-CONSUMER] Error processing ack for event ${message.eventId}:`,
error,
);
}
}
}