45 lines
1.3 KiB
TypeScript
45 lines
1.3 KiB
TypeScript
import { Injectable, Logger } from '@nestjs/common';
|
|
import { KafkaService } from './kafka.service';
|
|
import { DomainEvent } from '../../domain';
|
|
|
|
export const KAFKA_TOPICS = {
|
|
REFERRAL_EVENTS: 'referral.events',
|
|
TEAM_STATISTICS_EVENTS: 'referral.team-statistics.events',
|
|
} as const;
|
|
|
|
@Injectable()
|
|
export class EventPublisherService {
|
|
private readonly logger = new Logger(EventPublisherService.name);
|
|
|
|
constructor(private readonly kafkaService: KafkaService) {}
|
|
|
|
async publishDomainEvents(events: DomainEvent[]): Promise<void> {
|
|
if (events.length === 0) return;
|
|
|
|
const messages = events.map((event) => ({
|
|
topic: this.getTopicForEvent(event),
|
|
key: event.eventId,
|
|
value: event.toPayload(),
|
|
}));
|
|
|
|
await this.kafkaService.publishBatch(messages);
|
|
this.logger.log(`Published ${events.length} domain events`);
|
|
}
|
|
|
|
async publishEvent(event: DomainEvent): Promise<void> {
|
|
await this.kafkaService.publish({
|
|
topic: this.getTopicForEvent(event),
|
|
key: event.eventId,
|
|
value: event.toPayload(),
|
|
});
|
|
this.logger.debug(`Published event: ${event.eventName}`);
|
|
}
|
|
|
|
private getTopicForEvent(event: DomainEvent): string {
|
|
if (event.eventName.includes('team_statistics')) {
|
|
return KAFKA_TOPICS.TEAM_STATISTICS_EVENTS;
|
|
}
|
|
return KAFKA_TOPICS.REFERRAL_EVENTS;
|
|
}
|
|
}
|