// rwadurian/backend/services/referral-service/src/infrastructure/messaging/kafka.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Producer, Consumer, logLevel } from 'kafkajs';
/**
 * Envelope for a single message to publish through {@link KafkaService}.
 */
export interface KafkaMessage {
/** Destination Kafka topic. */
topic: string;
/** Optional partition key; messages with the same key land on the same partition. */
key?: string;
/** Payload object; serialized with JSON.stringify before sending. */
value: Record<string, unknown>;
/** Optional Kafka record headers (string values only). */
headers?: Record<string, string>;
}
@Injectable()
/**
 * Thin wrapper around kafkajs providing a shared producer plus managed
 * consumers, wired into the Nest lifecycle (connect on init, disconnect
 * on destroy).
 */
export class KafkaService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(KafkaService.name);
private readonly kafka: Kafka;
private readonly producer: Producer;
/** Active consumers keyed by consumer-group id. */
private readonly consumers: Map<string, Consumer> = new Map();
constructor(private readonly configService: ConfigService) {
this.kafka = new Kafka({
clientId: 'referral-service',
// KAFKA_BROKERS is a comma-separated list, e.g. "host1:9092,host2:9092".
brokers: this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(','),
logLevel: logLevel.WARN,
});
this.producer = this.kafka.producer();
}
/** Connects the shared producer when the module starts. */
async onModuleInit() {
this.logger.log('Connecting to Kafka...');
await this.producer.connect();
this.logger.log('Kafka producer connected');
}
/**
 * Disconnects the producer and every registered consumer on shutdown.
 * Consumers are disconnected in parallel so one slow/failing consumer
 * does not leave the others connected (the old sequential loop aborted
 * on the first rejection).
 */
async onModuleDestroy() {
this.logger.log('Disconnecting from Kafka...');
await this.producer.disconnect();
await Promise.all(
Array.from(this.consumers, async ([groupId, consumer]) => {
await consumer.disconnect();
this.logger.log(`Consumer ${groupId} disconnected`);
}),
);
this.logger.log('Kafka disconnected');
}
/**
 * Publishes a single JSON-serialized message.
 *
 * @param message - Topic, optional key/headers, and the payload object.
 */
async publish(message: KafkaMessage): Promise<void> {
await this.producer.send({
topic: message.topic,
messages: [
{
key: message.key,
value: JSON.stringify(message.value),
headers: message.headers,
},
],
});
this.logger.debug(`Published message to ${message.topic}`);
}
/**
 * Publishes many messages in one broker round trip, grouping them by
 * topic as required by the kafkajs sendBatch API.
 *
 * @param messages - Messages to send; may span multiple topics.
 */
async publishBatch(messages: KafkaMessage[]): Promise<void> {
const topicMessages = new Map<string, Array<{ key?: string; value: string; headers?: Record<string, string> }>>();
for (const msg of messages) {
if (!topicMessages.has(msg.topic)) {
topicMessages.set(msg.topic, []);
}
topicMessages.get(msg.topic)!.push({
key: msg.key,
value: JSON.stringify(msg.value),
headers: msg.headers,
});
}
await this.producer.sendBatch({
topicMessages: Array.from(topicMessages.entries()).map(([topic, messages]) => ({
topic,
messages,
})),
});
this.logger.debug(`Published ${messages.length} messages in batch`);
}
/**
 * Creates, connects, and runs a consumer for the given group, invoking
 * `handler` with the parsed JSON payload of each message.
 *
 * If a consumer for `groupId` already exists it is disconnected and
 * replaced (previously the old connected consumer was silently leaked
 * by overwriting the Map entry).
 *
 * @param groupId - Kafka consumer-group id (one consumer per group here).
 * @param topics - Topics to subscribe to (from latest offset).
 * @param handler - Async callback; a rejection is rethrown so KafkaJS
 *   treats the message as failed and retries it.
 */
async subscribe(
groupId: string,
topics: string[],
handler: (topic: string, message: Record<string, unknown>) => Promise<void>,
): Promise<void> {
const existing = this.consumers.get(groupId);
if (existing) {
this.logger.warn(`Consumer ${groupId} already registered; replacing it`);
await existing.disconnect();
this.consumers.delete(groupId);
}
const consumer = this.kafka.consumer({ groupId });
await consumer.connect();
for (const topic of topics) {
await consumer.subscribe({ topic, fromBeginning: false });
}
await consumer.run({
eachMessage: async ({ topic, message }) => {
try {
// Tombstones (null value) are skipped; everything else is JSON.
const value = message.value ? JSON.parse(message.value.toString()) : null;
if (value) {
await handler(topic, value);
}
} catch (error) {
this.logger.error(`Error processing message from ${topic}:`, error);
throw error; // Rethrow so KafkaJS marks the message failed and retries.
}
},
});
this.consumers.set(groupId, consumer);
this.logger.log(`Consumer ${groupId} subscribed to topics: ${topics.join(', ')}`);
}
}