// Source: rwadurian/backend/services/auth-service/src/infrastructure/kafka/kafka-producer.service.ts
// (54 lines, 1.4 KiB, TypeScript)

import { Injectable, Inject, OnModuleInit, Logger } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
/**
 * Shape of a single message handed to `KafkaProducerService`.
 *
 * Only `value` is required; `key` and `headers` are forwarded as-is when present.
 */
export interface KafkaMessage {
  /** Optional message key, passed through to the Kafka client unchanged. */
  key?: string;

  /** Message payload; `KafkaProducerService.emit` runs it through `JSON.stringify` before sending. */
  value: any;

  /** Optional string-to-string headers, passed through to the Kafka client unchanged. */
  headers?: Record<string, string>;
}
/**
 * Injectable wrapper around the `ClientKafka` instance registered under the
 * `'KAFKA_CLIENT'` token. Connects the client when the module initializes and
 * exposes helpers for emitting one message or a sequence of messages to a topic.
 */
@Injectable()
export class KafkaProducerService implements OnModuleInit {
  private readonly logger = new Logger(KafkaProducerService.name);

  constructor(
    @Inject('KAFKA_CLIENT') private readonly kafkaClient: ClientKafka,
  ) {}

  /**
   * Lifecycle hook: connects the injected Kafka client and logs once connected.
   * A connection failure propagates to the caller of the hook.
   */
  async onModuleInit(): Promise<void> {
    await this.kafkaClient.connect();
    this.logger.log('Kafka producer connected');
  }

  /**
   * Emits a single message to `topic` and resolves once the observable
   * returned by the client has completed.
   *
   * @param topic - Destination topic name.
   * @param message - Message to send; `value` is serialized with `JSON.stringify`.
   * @throws Rethrows whatever the client (or serialization) throws, after logging it.
   */
  async emit(topic: string, message: KafkaMessage): Promise<void> {
    try {
      await lastValueFrom(
        this.kafkaClient.emit(topic, {
          key: message.key,
          // The payload is stringified here, so the client receives a string value.
          value: JSON.stringify(message.value),
          headers: message.headers,
        }),
      );
      this.logger.debug(`Message emitted to topic ${topic}`);
    } catch (error) {
      this.logger.error(
        `Failed to emit message to topic ${topic}`,
        KafkaProducerService.describeError(error),
      );
      throw error;
    }
  }

  /**
   * Emits `messages` to `topic` one at a time, in array order, awaiting each
   * send before starting the next. Stops at the first failure; messages
   * emitted before the failure are not rolled back.
   *
   * @param topic - Destination topic name.
   * @param messages - Messages to send; an empty array is a no-op.
   * @throws Rethrows the first error raised by `emit`, after logging it.
   */
  async emitBatch(topic: string, messages: KafkaMessage[]): Promise<void> {
    try {
      for (const message of messages) {
        await this.emit(topic, message);
      }
    } catch (error) {
      this.logger.error(
        `Failed to emit batch messages to topic ${topic}`,
        KafkaProducerService.describeError(error),
      );
      throw error;
    }
  }

  /**
   * Converts a caught value into the string passed as the second argument of
   * `Logger.error`, so the stack trace is logged rather than a coerced object.
   * Falls back to the message, or to `String(error)` for non-`Error` values.
   */
  private static describeError(error: unknown): string {
    if (error instanceof Error) {
      return error.stack ?? error.message;
    }
    return String(error);
  }
}