rwadurian/backend/services/admin-service/src/infrastructure/kafka/user-event-consumer.service.ts

417 lines
12 KiB
TypeScript

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
import { PrismaService } from '../persistence/prisma/prisma.service';
import { IUserQueryRepository, USER_QUERY_REPOSITORY } from '../../domain/repositories/user-query.repository';
import { Inject } from '@nestjs/common';
/**
* 用户事件 Payload 类型定义
*/
interface UserAccountCreatedPayload {
userId: string;
accountSequence: string;
referralCode: string;
phoneNumber?: string;
initialDeviceId: string;
inviterSequence: string | null;
registeredAt: string;
_outbox?: {
id: string;
aggregateId: string;
eventType: string;
};
}
interface UserAccountAutoCreatedPayload {
userId: string;
accountSequence: string;
referralCode: string;
initialDeviceId: string;
inviterSequence: string | null;
registeredAt: string;
_outbox?: {
id: string;
aggregateId: string;
eventType: string;
};
}
interface UserProfileUpdatedPayload {
userId: string;
accountSequence: string;
nickname: string | null;
avatarUrl: string | null;
updatedAt: string;
_outbox?: {
id: string;
aggregateId: string;
eventType: string;
};
}
interface KYCVerifiedPayload {
userId: string;
verifiedAt: string;
_outbox?: {
id: string;
aggregateId: string;
eventType: string;
};
}
interface KYCRejectedPayload {
userId: string;
reason: string;
_outbox?: {
id: string;
aggregateId: string;
eventType: string;
};
}
interface UserAccountFrozenPayload {
userId: string;
reason: string;
_outbox?: {
id: string;
aggregateId: string;
eventType: string;
};
}
interface UserAccountDeactivatedPayload {
userId: string;
deactivatedAt: string;
_outbox?: {
id: string;
aggregateId: string;
eventType: string;
};
}
/**
* 用户事件消费者服务
*
* 消费 identity-service 发布的用户相关事件,同步更新本地 UserQueryView
*/
@Injectable()
export class UserEventConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(UserEventConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private isRunning = false;
// 配置
private readonly topics: string[];
private readonly consumerGroup: string;
private readonly ackTopic: string;
constructor(
private readonly configService: ConfigService,
private readonly prisma: PrismaService,
@Inject(USER_QUERY_REPOSITORY)
private readonly userQueryRepository: IUserQueryRepository,
) {
const brokers = (this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092')).split(',');
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID', 'admin-service');
this.consumerGroup = this.configService.get<string>('KAFKA_CONSUMER_GROUP', 'admin-service-user-sync');
this.ackTopic = 'identity.events.ack';
// 订阅的主题 (与 identity-service 的 IDENTITY_TOPICS 保持一致)
this.topics = [
'identity.UserAccountCreated',
'identity.UserAccountAutoCreated',
'identity.PhoneBound',
'identity.KYCSubmitted',
'identity.KYCVerified',
'identity.KYCRejected',
'identity.UserAccountFrozen',
'identity.UserAccountDeactivated',
];
this.kafka = new Kafka({
clientId,
brokers,
logLevel: logLevel.WARN,
});
this.consumer = this.kafka.consumer({ groupId: this.consumerGroup });
this.logger.log(`[UserEventConsumer] Configured with topics: ${this.topics.join(', ')}`);
}
async onModuleInit() {
await this.start();
}
async onModuleDestroy() {
await this.stop();
}
async start(): Promise<void> {
if (this.isRunning) {
this.logger.warn('[UserEventConsumer] Already running');
return;
}
try {
this.logger.log('[UserEventConsumer] Connecting to Kafka...');
await this.consumer.connect();
for (const topic of this.topics) {
await this.consumer.subscribe({ topic, fromBeginning: false });
this.logger.log(`[UserEventConsumer] Subscribed to topic: ${topic}`);
}
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
},
});
this.isRunning = true;
this.logger.log('[UserEventConsumer] Started successfully');
} catch (error) {
this.logger.error('[UserEventConsumer] Failed to start:', error);
}
}
async stop(): Promise<void> {
if (!this.isRunning) return;
try {
await this.consumer.disconnect();
this.isRunning = false;
this.logger.log('[UserEventConsumer] Stopped');
} catch (error) {
this.logger.error('[UserEventConsumer] Failed to stop:', error);
}
}
private async handleMessage(payload: EachMessagePayload): Promise<void> {
const { topic, partition, message } = payload;
if (!message.value) {
this.logger.warn(`[UserEventConsumer] Empty message from ${topic}:${partition}`);
return;
}
try {
const eventData = JSON.parse(message.value.toString());
const eventType = eventData._outbox?.eventType || eventData.eventType;
const eventId = eventData._outbox?.id || message.key?.toString();
this.logger.debug(`[UserEventConsumer] Received event: ${eventType} (${eventId})`);
// 幂等性检查
if (eventId && await this.isEventProcessed(eventId)) {
this.logger.debug(`[UserEventConsumer] Event ${eventId} already processed, skipping`);
return;
}
// 处理事件 (eventData.payload 包含实际的业务数据)
await this.processEvent(eventType, eventData.payload);
// 记录已处理事件
if (eventId) {
await this.markEventProcessed(eventId, eventType);
}
// 发送确认消息 (B方案)
if (eventData._outbox?.id) {
await this.sendAck(eventData._outbox.id, eventType);
}
this.logger.log(`[UserEventConsumer] ✓ Processed event: ${eventType}`);
} catch (error) {
this.logger.error(`[UserEventConsumer] Failed to process message:`, error);
// 不抛出错误,避免阻塞消费
}
}
private async processEvent(eventType: string, payload: unknown): Promise<void> {
switch (eventType) {
case 'UserAccountCreated':
await this.handleUserAccountCreated(payload as UserAccountCreatedPayload);
break;
case 'UserAccountAutoCreated':
await this.handleUserAccountAutoCreated(payload as UserAccountAutoCreatedPayload);
break;
case 'UserProfileUpdated':
await this.handleUserProfileUpdated(payload as UserProfileUpdatedPayload);
break;
case 'KYCVerified':
await this.handleKYCVerified(payload as KYCVerifiedPayload);
break;
case 'KYCRejected':
await this.handleKYCRejected(payload as KYCRejectedPayload);
break;
case 'UserAccountFrozen':
await this.handleUserAccountFrozen(payload as UserAccountFrozenPayload);
break;
case 'UserAccountDeactivated':
await this.handleUserAccountDeactivated(payload as UserAccountDeactivatedPayload);
break;
default:
this.logger.debug(`[UserEventConsumer] Unknown event type: ${eventType}, skipping`);
}
}
// ==================== Event Handlers ====================
private async handleUserAccountCreated(payload: UserAccountCreatedPayload): Promise<void> {
const phoneNumberMasked = payload.phoneNumber
? this.maskPhoneNumber(payload.phoneNumber)
: null;
await this.userQueryRepository.upsert({
userId: BigInt(payload.userId),
accountSequence: payload.accountSequence,
phoneNumberMasked,
inviterSequence: payload.inviterSequence,
registeredAt: new Date(payload.registeredAt),
});
this.logger.log(`[UserEventConsumer] Created user: ${payload.accountSequence}`);
}
private async handleUserAccountAutoCreated(payload: UserAccountAutoCreatedPayload): Promise<void> {
await this.userQueryRepository.upsert({
userId: BigInt(payload.userId),
accountSequence: payload.accountSequence,
inviterSequence: payload.inviterSequence,
registeredAt: new Date(payload.registeredAt),
});
this.logger.log(`[UserEventConsumer] Auto-created user: ${payload.accountSequence}`);
}
private async handleUserProfileUpdated(payload: UserProfileUpdatedPayload): Promise<void> {
const userId = BigInt(payload.userId);
// 检查用户是否存在
const exists = await this.userQueryRepository.exists(userId);
if (!exists) {
this.logger.warn(`[UserEventConsumer] User ${userId} not found, skipping profile update`);
return;
}
await this.userQueryRepository.updateProfile(userId, {
nickname: payload.nickname,
avatarUrl: payload.avatarUrl,
});
this.logger.log(`[UserEventConsumer] Updated profile for user: ${payload.accountSequence}`);
}
private async handleKYCVerified(payload: KYCVerifiedPayload): Promise<void> {
const userId = BigInt(payload.userId);
const exists = await this.userQueryRepository.exists(userId);
if (!exists) {
this.logger.warn(`[UserEventConsumer] User ${userId} not found, skipping KYC update`);
return;
}
await this.userQueryRepository.updateKycStatus(userId, 'VERIFIED');
this.logger.log(`[UserEventConsumer] KYC verified for user: ${userId}`);
}
private async handleKYCRejected(payload: KYCRejectedPayload): Promise<void> {
const userId = BigInt(payload.userId);
const exists = await this.userQueryRepository.exists(userId);
if (!exists) {
this.logger.warn(`[UserEventConsumer] User ${userId} not found, skipping KYC update`);
return;
}
await this.userQueryRepository.updateKycStatus(userId, 'REJECTED');
this.logger.log(`[UserEventConsumer] KYC rejected for user: ${userId}`);
}
private async handleUserAccountFrozen(payload: UserAccountFrozenPayload): Promise<void> {
const userId = BigInt(payload.userId);
const exists = await this.userQueryRepository.exists(userId);
if (!exists) {
this.logger.warn(`[UserEventConsumer] User ${userId} not found, skipping status update`);
return;
}
await this.userQueryRepository.updateStatus(userId, 'FROZEN');
this.logger.log(`[UserEventConsumer] User frozen: ${userId}`);
}
private async handleUserAccountDeactivated(payload: UserAccountDeactivatedPayload): Promise<void> {
const userId = BigInt(payload.userId);
const exists = await this.userQueryRepository.exists(userId);
if (!exists) {
this.logger.warn(`[UserEventConsumer] User ${userId} not found, skipping status update`);
return;
}
await this.userQueryRepository.updateStatus(userId, 'DEACTIVATED');
this.logger.log(`[UserEventConsumer] User deactivated: ${userId}`);
}
// ==================== Helper Methods ====================
private maskPhoneNumber(phone: string): string {
if (phone.length < 7) return phone;
return phone.slice(0, 3) + '****' + phone.slice(-4);
}
private async isEventProcessed(eventId: string): Promise<boolean> {
const count = await this.prisma.processedEvent.count({
where: { eventId },
});
return count > 0;
}
private async markEventProcessed(eventId: string, eventType: string): Promise<void> {
await this.prisma.processedEvent.create({
data: {
eventId,
eventType,
processedAt: new Date(),
},
});
}
private async sendAck(outboxId: string, eventType: string): Promise<void> {
try {
const producer = this.kafka.producer();
await producer.connect();
await producer.send({
topic: this.ackTopic,
messages: [
{
key: outboxId,
value: JSON.stringify({
outboxId,
eventType,
consumerId: this.consumerGroup,
confirmedAt: new Date().toISOString(),
}),
},
],
});
await producer.disconnect();
this.logger.debug(`[UserEventConsumer] Sent ACK for outbox event ${outboxId}`);
} catch (error) {
this.logger.error(`[UserEventConsumer] Failed to send ACK:`, error);
}
}
}