import { Controller, Logger, OnModuleInit } from '@nestjs/common'; import { EventPattern, Payload } from '@nestjs/microservices'; import { RedisService } from '../../redis/redis.service'; import { TradingAccountRepository } from '../../persistence/repositories/trading-account.repository'; import { OutboxRepository } from '../../persistence/repositories/outbox.repository'; import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository'; import { TradingAccountAggregate } from '../../../domain/aggregates/trading-account.aggregate'; import { TradingEventTypes, TradingTopics, TradingAccountCreatedPayload, } from '../../../domain/events/trading.events'; // 用户注册事件结构(来自 auth-service) interface UserRegisteredEvent { eventId: string; eventType: string; payload: { accountSequence: string; phone: string; source: 'V1' | 'V2'; registeredAt: string; }; } // 4小时 TTL(秒) const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; @Controller() export class UserRegisteredConsumer implements OnModuleInit { private readonly logger = new Logger(UserRegisteredConsumer.name); constructor( private readonly redis: RedisService, private readonly tradingAccountRepository: TradingAccountRepository, private readonly outboxRepository: OutboxRepository, private readonly processedEventRepository: ProcessedEventRepository, ) {} async onModuleInit() { this.logger.log('UserRegisteredConsumer initialized - listening for user.registered events'); } @EventPattern('auth.user.registered') async handleUserRegistered(@Payload() message: any): Promise { // 解析消息格式 const event: UserRegisteredEvent = message.value || message; const eventId = event.eventId || message.eventId; if (!eventId) { this.logger.warn('Received event without eventId, skipping'); return; } const accountSequence = event.payload?.accountSequence; if (!accountSequence) { this.logger.warn(`Event ${eventId} missing accountSequence, skipping`); return; } this.logger.debug( `Processing user registered event: ${eventId}, accountSequence: ${accountSequence}`, ); // 幂等性检查 if (await this.isEventProcessed(eventId)) { this.logger.debug(`Event ${eventId} already processed, skipping`); return; } try { // 检查账户是否已存在 const existingAccount = await this.tradingAccountRepository.findByAccountSequence( accountSequence, ); if (existingAccount) { this.logger.debug(`Trading account ${accountSequence} already exists`); await this.markEventProcessed(eventId); return; } // 创建交易账户 const account = TradingAccountAggregate.create(accountSequence); const accountId = await this.tradingAccountRepository.save(account); // 发布交易账户创建事件 await this.publishAccountCreatedEvent(accountId, accountSequence); // 标记为已处理 await this.markEventProcessed(eventId); this.logger.log( `Trading account created for user ${accountSequence}, source: ${event.payload.source}`, ); } catch (error) { // 如果是重复创建的唯一约束错误,忽略 if (error instanceof Error && error.message.includes('Unique constraint')) { this.logger.debug( `Trading account already exists for ${accountSequence}, marking as processed`, ); await this.markEventProcessed(eventId); return; } this.logger.error( `Failed to create trading account for ${accountSequence}`, error instanceof Error ? error.stack : error, ); throw error; // 让 Kafka 重试 } } /** * 幂等性检查 - Redis + DB 双重检查 * 1. 先检查 Redis 缓存(快速路径) * 2. Redis 未命中则检查数据库(持久化保障) */ private async isEventProcessed(eventId: string): Promise { const redisKey = `trading:processed-event:${eventId}`; // 1. 先检查 Redis 缓存(快速路径) const cached = await this.redis.get(redisKey); if (cached) return true; // 2. 检查数据库(Redis 可能过期或重启后丢失) const dbRecord = await this.processedEventRepository.findByEventId(eventId); if (dbRecord) { // 回填 Redis 缓存 await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); return true; } return false; } /** * 标记事件为已处理 - Redis + DB 双写 */ private async markEventProcessed(eventId: string, eventType: string = 'user.registered'): Promise { const redisKey = `trading:processed-event:${eventId}`; // 1. 写入数据库(持久化) try { await this.processedEventRepository.create({ eventId, eventType, sourceService: 'auth-service', }); } catch (error) { // 可能已存在(并发情况),忽略唯一约束错误 if (!(error instanceof Error && error.message.includes('Unique constraint'))) { throw error; } } // 2. 写入 Redis 缓存(4小时 TTL) await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); } /** * 发布交易账户创建事件 */ private async publishAccountCreatedEvent( accountId: string, accountSequence: string, ): Promise { try { const payload: TradingAccountCreatedPayload = { accountId, accountSequence, createdAt: new Date().toISOString(), }; await this.outboxRepository.create({ aggregateType: 'TradingAccount', aggregateId: accountId, eventType: TradingEventTypes.TRADING_ACCOUNT_CREATED, payload, topic: TradingTopics.ACCOUNTS, key: accountSequence, }); this.logger.debug(`Published TradingAccountCreated event for ${accountSequence}`); } catch (error) { this.logger.error(`Failed to publish TradingAccountCreated event: ${error}`); } } }