import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { Kafka, Producer, Consumer, logLevel } from 'kafkajs'; import { DomainEvent } from '@/domain/events'; // 定义 Kafka 消息接口 export interface DomainEventMessage { eventId: string; eventType: string; occurredAt: string; aggregateId: string; aggregateType: string; payload: any; } // 定义主题常量 export const IDENTITY_TOPICS = { USER_ACCOUNT_CREATED: 'identity.UserAccountCreated', USER_ACCOUNT_AUTO_CREATED: 'identity.UserAccountAutoCreated', DEVICE_ADDED: 'identity.DeviceAdded', DEVICE_REMOVED: 'identity.DeviceRemoved', PHONE_BOUND: 'identity.PhoneBound', WALLET_BOUND: 'identity.WalletBound', MULTIPLE_WALLETS_BOUND: 'identity.MultipleWalletsBound', KYC_SUBMITTED: 'identity.KYCSubmitted', KYC_VERIFIED: 'identity.KYCVerified', KYC_REJECTED: 'identity.KYCRejected', KYC_APPROVED: 'identity.KYCApproved', USER_LOCATION_UPDATED: 'identity.UserLocationUpdated', USER_ACCOUNT_FROZEN: 'identity.UserAccountFrozen', ACCOUNT_FROZEN: 'identity.AccountFrozen', USER_ACCOUNT_DEACTIVATED: 'identity.UserAccountDeactivated', } as const; @Injectable() export class EventPublisherService implements OnModuleInit, OnModuleDestroy { private kafka: Kafka; private producer: Producer; constructor(private readonly configService: ConfigService) { this.kafka = new Kafka({ clientId: this.configService.get('KAFKA_CLIENT_ID', 'identity-service'), brokers: (this.configService.get('KAFKA_BROKERS', 'localhost:9092')).split(','), logLevel: logLevel.WARN, }); this.producer = this.kafka.producer(); } async onModuleInit() { await this.producer.connect(); } async onModuleDestroy() { await this.producer.disconnect(); } async publish(event: DomainEvent): Promise; async publish(topic: string, message: DomainEventMessage): Promise; async publish(eventOrTopic: DomainEvent | string, message?: DomainEventMessage): Promise { if (typeof eventOrTopic === 'string') { // 直接发布到指定 topic (用于重试场景) const topic = eventOrTopic; const msg = message!; await this.producer.send({ topic, messages: [ { key: msg.eventId, value: JSON.stringify(msg), }, ], }); } else { // 从领域事件发布 const event = eventOrTopic; await this.producer.send({ topic: `identity.${event.eventType}`, messages: [ { key: event.eventId, value: JSON.stringify({ eventId: event.eventId, eventType: event.eventType, occurredAt: event.occurredAt.toISOString(), aggregateId: (event as any).aggregateId || '', aggregateType: (event as any).aggregateType || 'UserAccount', payload: (event as any).payload, }), }, ], }); } } async publishAll(events: DomainEvent[]): Promise { for (const event of events) { await this.publish(event); } } }