rwadurian/backend/services/identity-service/src/infrastructure/kafka/event-publisher.service.ts

101 lines
3.1 KiB
TypeScript

import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Producer, Consumer, logLevel } from 'kafkajs';
import { DomainEvent } from '@/domain/events';
// 定义 Kafka 消息接口
export interface DomainEventMessage {
eventId: string;
eventType: string;
occurredAt: string;
aggregateId: string;
aggregateType: string;
payload: any;
}
// 定义主题常量
export const IDENTITY_TOPICS = {
USER_ACCOUNT_CREATED: 'identity.UserAccountCreated',
USER_ACCOUNT_AUTO_CREATED: 'identity.UserAccountAutoCreated',
DEVICE_ADDED: 'identity.DeviceAdded',
DEVICE_REMOVED: 'identity.DeviceRemoved',
PHONE_BOUND: 'identity.PhoneBound',
WALLET_BOUND: 'identity.WalletBound',
MULTIPLE_WALLETS_BOUND: 'identity.MultipleWalletsBound',
KYC_SUBMITTED: 'identity.KYCSubmitted',
KYC_VERIFIED: 'identity.KYCVerified',
KYC_REJECTED: 'identity.KYCRejected',
KYC_APPROVED: 'identity.KYCApproved',
USER_LOCATION_UPDATED: 'identity.UserLocationUpdated',
USER_ACCOUNT_FROZEN: 'identity.UserAccountFrozen',
ACCOUNT_FROZEN: 'identity.AccountFrozen',
USER_ACCOUNT_DEACTIVATED: 'identity.UserAccountDeactivated',
} as const;
@Injectable()
export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
private kafka: Kafka;
private producer: Producer;
constructor(private readonly configService: ConfigService) {
this.kafka = new Kafka({
clientId: this.configService.get<string>('KAFKA_CLIENT_ID', 'identity-service'),
brokers: (this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092')).split(','),
logLevel: logLevel.WARN,
});
this.producer = this.kafka.producer();
}
async onModuleInit() {
await this.producer.connect();
}
async onModuleDestroy() {
await this.producer.disconnect();
}
async publish(event: DomainEvent): Promise<void>;
async publish(topic: string, message: DomainEventMessage): Promise<void>;
async publish(eventOrTopic: DomainEvent | string, message?: DomainEventMessage): Promise<void> {
if (typeof eventOrTopic === 'string') {
// 直接发布到指定 topic (用于重试场景)
const topic = eventOrTopic;
const msg = message!;
await this.producer.send({
topic,
messages: [
{
key: msg.eventId,
value: JSON.stringify(msg),
},
],
});
} else {
// 从领域事件发布
const event = eventOrTopic;
await this.producer.send({
topic: `identity.${event.eventType}`,
messages: [
{
key: event.eventId,
value: JSON.stringify({
eventId: event.eventId,
eventType: event.eventType,
occurredAt: event.occurredAt.toISOString(),
aggregateId: (event as any).aggregateId || '',
aggregateType: (event as any).aggregateType || 'UserAccount',
payload: (event as any).payload,
}),
},
],
});
}
}
async publishAll(events: DomainEvent[]): Promise<void> {
for (const event of events) {
await this.publish(event);
}
}
}