diff --git a/backend/services/auth-service/src/application/application.module.ts b/backend/services/auth-service/src/application/application.module.ts
index 348328a3..9a7a1c48 100644
--- a/backend/services/auth-service/src/application/application.module.ts
+++ b/backend/services/auth-service/src/application/application.module.ts
@@ -1,6 +1,7 @@
 import { Module } from '@nestjs/common';
 import { JwtModule } from '@nestjs/jwt';
 import { ConfigModule, ConfigService } from '@nestjs/config';
+import { ScheduleModule } from '@nestjs/schedule';
 import {
   AuthService,
   PasswordService,
@@ -9,11 +10,13 @@ import {
   UserService,
   OutboxService,
 } from './services';
+import { OutboxScheduler } from './schedulers';
 import { InfrastructureModule } from '@/infrastructure/infrastructure.module';
 
 @Module({
   imports: [
     InfrastructureModule,
+    ScheduleModule.forRoot(),
     JwtModule.registerAsync({
       imports: [ConfigModule],
       useFactory: (configService: ConfigService) => ({
@@ -32,6 +35,7 @@ import { InfrastructureModule } from '@/infrastructure/infrastructure.module';
     KycService,
     UserService,
     OutboxService,
+    OutboxScheduler,
   ],
   exports: [
     AuthService,
diff --git a/backend/services/auth-service/src/application/schedulers/index.ts b/backend/services/auth-service/src/application/schedulers/index.ts
new file mode 100644
index 00000000..140724aa
--- /dev/null
+++ b/backend/services/auth-service/src/application/schedulers/index.ts
@@ -0,0 +1 @@
+export * from './outbox.scheduler';
diff --git a/backend/services/auth-service/src/application/schedulers/outbox.scheduler.ts b/backend/services/auth-service/src/application/schedulers/outbox.scheduler.ts
new file mode 100644
index 00000000..88d84702
--- /dev/null
+++ b/backend/services/auth-service/src/application/schedulers/outbox.scheduler.ts
@@ -0,0 +1,176 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { Cron } from '@nestjs/schedule';
+import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
+import { KafkaProducerService } from '@/infrastructure/kafka/kafka-producer.service';
+import { RedisService } from '@/infrastructure/redis/redis.service';
+import { OutboxStatus } from '@prisma/client';
+
+/**
+ * Outbox event publishing scheduler.
+ * Periodically publishes pending outbox events to Kafka.
+ */
+@Injectable()
+export class OutboxScheduler implements OnModuleInit {
+  private readonly logger = new Logger(OutboxScheduler.name);
+  private readonly LOCK_KEY = 'auth:outbox:scheduler:lock';
+
+  constructor(
+    private readonly prisma: PrismaService,
+    private readonly kafkaProducer: KafkaProducerService,
+    private readonly redis: RedisService,
+  ) {}
+
+  onModuleInit() {
+    this.logger.log('Outbox scheduler initialized');
+  }
+
+  /**
+   * Publish pending outbox events every 30 seconds.
+   */
+  @Cron('*/30 * * * * *')
+  async publishOutboxEvents(): Promise<void> {
+    const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:publish`, 25);
+    if (!lockValue) {
+      return; // another instance is already processing
+    }
+
+    try {
+      // Fetch pending events that are due for delivery
+      const events = await this.prisma.outboxEvent.findMany({
+        where: {
+          status: OutboxStatus.PENDING,
+          OR: [{ nextRetryAt: null }, { nextRetryAt: { lte: new Date() } }],
+        },
+        orderBy: { createdAt: 'asc' },
+        take: 100,
+      });
+
+      if (events.length === 0) {
+        return;
+      }
+
+      let successCount = 0;
+      let failCount = 0;
+
+      for (const event of events) {
+        try {
+          await this.kafkaProducer.emit(event.topic, {
+            key: event.key,
+            value: {
+              eventId: event.id.toString(),
+              aggregateType: event.aggregateType,
+              aggregateId: event.aggregateId,
+              eventType: event.eventType,
+              payload: event.payload,
+              createdAt: event.createdAt.toISOString(),
+            },
+          });
+
+          // Mark as published
+          await this.prisma.outboxEvent.update({
+            where: { id: event.id },
+            data: {
+              status: OutboxStatus.PUBLISHED,
+              publishedAt: new Date(),
+            },
+          });
+
+          successCount++;
+        } catch (error) {
+          failCount++;
+          const errorMessage =
+            error instanceof Error ? error.message : 'Unknown error';
+          this.logger.error(`Failed to publish event ${event.id}`, error);
+
+          // Update retry bookkeeping
+          const newRetryCount = event.retryCount + 1;
+          const shouldFail = newRetryCount >= event.maxRetries;
+
+          await this.prisma.outboxEvent.update({
+            where: { id: event.id },
+            data: {
+              retryCount: newRetryCount,
+              lastError: errorMessage,
+              status: shouldFail ? OutboxStatus.FAILED : OutboxStatus.PENDING,
+              // Exponential backoff: 30s, 60s, 120s, ...
+              nextRetryAt: shouldFail
+                ? null
+                : new Date(Date.now() + 30000 * Math.pow(2, newRetryCount - 1)),
+            },
+          });
+        }
+      }
+
+      if (successCount > 0 || failCount > 0) {
+        this.logger.log(
+          `Published ${successCount} outbox events, ${failCount} failed`,
+        );
+      }
+    } catch (error) {
+      this.logger.error('Failed to process outbox events', error);
+    } finally {
+      await this.redis.releaseLock(`${this.LOCK_KEY}:publish`, lockValue);
+    }
+  }
+
+  /**
+   * Clean up published events daily at 3 AM (retain 7 days).
+   */
+  @Cron('0 3 * * *')
+  async cleanupPublishedEvents(): Promise<void> {
+    const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:cleanup`, 300);
+    if (!lockValue) {
+      return;
+    }
+
+    try {
+      const sevenDaysAgo = new Date();
+      sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
+
+      const result = await this.prisma.outboxEvent.deleteMany({
+        where: {
+          status: OutboxStatus.PUBLISHED,
+          publishedAt: { lt: sevenDaysAgo },
+        },
+      });
+
+      if (result.count > 0) {
+        this.logger.log(`Cleaned up ${result.count} published outbox events`);
+      }
+    } catch (error) {
+      this.logger.error('Failed to cleanup published events', error);
+    } finally {
+      await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
+    }
+  }
+
+  /**
+   * Hourly: reset events that have been stuck in FAILED for too long.
+   */
+  @Cron('0 * * * *')
+  async resetStaleFailedEvents(): Promise<void> {
+    try {
+      const oneDayAgo = new Date();
+      oneDayAgo.setDate(oneDayAgo.getDate() - 1);
+
+      // Reset failed events older than one day back to PENDING for another retry round
+      const result = await this.prisma.outboxEvent.updateMany({
+        where: {
+          status: OutboxStatus.FAILED,
+          createdAt: { lt: oneDayAgo },
+        },
+        data: {
+          status: OutboxStatus.PENDING,
+          retryCount: 0,
+          nextRetryAt: new Date(),
+        },
+      });
+
+      if (result.count > 0) {
+        this.logger.log(`Reset ${result.count} stale failed events for retry`);
+      }
+    } catch (error) {
+      this.logger.error('Failed to reset stale failed events', error);
+    }
+  }
+}
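Note on the publish loop above: the scheduler only drains the outbox table. The pattern holds only if events are written in the same database transaction as the domain change they describe. A minimal write-side sketch, assuming a hypothetical registration flow (the `user` model, its fields, and the topic name are illustrative, not taken from this diff):

```typescript
import { Injectable } from '@nestjs/common';
import { OutboxStatus } from '@prisma/client';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';

@Injectable()
export class UserRegistrationService {
  constructor(private readonly prisma: PrismaService) {}

  async register(email: string, passwordHash: string): Promise<void> {
    // The domain write and the outbox row commit or roll back together;
    // OutboxScheduler then delivers the event asynchronously.
    await this.prisma.$transaction(async (tx) => {
      const user = await tx.user.create({ data: { email, passwordHash } });
      await tx.outboxEvent.create({
        data: {
          aggregateType: 'User',
          aggregateId: user.id,
          eventType: 'user.registered',
          topic: 'auth.user.registered',
          key: user.id,
          payload: { userId: user.id, email },
          status: OutboxStatus.PENDING,
        },
      });
    });
  }
}
```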
diff --git a/backend/services/auth-service/src/infrastructure/infrastructure.module.ts b/backend/services/auth-service/src/infrastructure/infrastructure.module.ts
index 82743d6e..5749985c 100644
--- a/backend/services/auth-service/src/infrastructure/infrastructure.module.ts
+++ b/backend/services/auth-service/src/infrastructure/infrastructure.module.ts
@@ -1,5 +1,5 @@
-import { Module } from '@nestjs/common';
-import { ConfigModule } from '@nestjs/config';
+import { Module, Global } from '@nestjs/common';
+import { ConfigModule, ConfigService } from '@nestjs/config';
 import { PrismaModule } from './persistence/prisma/prisma.module';
 import { PrismaService } from './persistence/prisma/prisma.service';
 import {
@@ -9,6 +9,8 @@ import {
   PrismaSmsVerificationRepository,
 } from './persistence/repositories';
 import { LegacyUserCdcConsumer } from './messaging/cdc';
+import { KafkaModule, KafkaProducerService } from './kafka';
+import { RedisService } from './redis';
 import {
   USER_REPOSITORY,
   SYNCED_LEGACY_USER_REPOSITORY,
@@ -16,12 +18,29 @@ import {
   SMS_VERIFICATION_REPOSITORY,
 } from '@/domain';
 
+@Global()
 @Module({
-  imports: [ConfigModule, PrismaModule],
+  imports: [ConfigModule, PrismaModule, KafkaModule],
   providers: [
     // CDC
     LegacyUserCdcConsumer,
 
+    // Kafka Producer
+    KafkaProducerService,
+
+    // Redis
+    {
+      provide: 'REDIS_OPTIONS',
+      useFactory: (configService: ConfigService) => ({
+        host: configService.get('REDIS_HOST', 'localhost'),
+        port: configService.get('REDIS_PORT', 6379),
+        password: configService.get('REDIS_PASSWORD'),
+        db: configService.get('REDIS_DB', 14),
+      }),
+      inject: [ConfigService],
+    },
+    RedisService,
+
     // Repositories
     {
       provide: USER_REPOSITORY,
@@ -42,6 +61,8 @@ import {
   ],
   exports: [
     PrismaService,
+    KafkaProducerService,
+    RedisService,
     USER_REPOSITORY,
     SYNCED_LEGACY_USER_REPOSITORY,
     REFRESH_TOKEN_REPOSITORY,
diff --git a/backend/services/auth-service/src/infrastructure/kafka/index.ts b/backend/services/auth-service/src/infrastructure/kafka/index.ts
new file mode 100644
index 00000000..38722f03
--- /dev/null
+++ b/backend/services/auth-service/src/infrastructure/kafka/index.ts
@@ -0,0 +1,2 @@
+export * from './kafka.module';
+export * from './kafka-producer.service';
diff --git a/backend/services/auth-service/src/infrastructure/kafka/kafka-producer.service.ts b/backend/services/auth-service/src/infrastructure/kafka/kafka-producer.service.ts
new file mode 100644
index 00000000..e64a8421
--- /dev/null
+++ b/backend/services/auth-service/src/infrastructure/kafka/kafka-producer.service.ts
@@ -0,0 +1,53 @@
+import { Injectable, Inject, OnModuleInit, Logger } from '@nestjs/common';
+import { ClientKafka } from '@nestjs/microservices';
+import { lastValueFrom } from 'rxjs';
+
+export interface KafkaMessage {
+  key?: string;
+  value: any;
+  headers?: Record<string, string>;
+}
+
+@Injectable()
+export class KafkaProducerService implements OnModuleInit {
+  private readonly logger = new Logger(KafkaProducerService.name);
+
+  constructor(
+    @Inject('KAFKA_CLIENT') private readonly kafkaClient: ClientKafka,
+  ) {}
+
+  async onModuleInit() {
+    await this.kafkaClient.connect();
+    this.logger.log('Kafka producer connected');
+  }
+
+  async emit(topic: string, message: KafkaMessage): Promise<void> {
+    try {
+      await lastValueFrom(
+        this.kafkaClient.emit(topic, {
+          key: message.key,
+          value: JSON.stringify(message.value),
+          headers: message.headers,
+        }),
+      );
+      this.logger.debug(`Message emitted to topic ${topic}`);
+    } catch (error) {
+      this.logger.error(`Failed to emit message to topic ${topic}`, error);
+      throw error;
+    }
+  }
+
+  async emitBatch(topic: string, messages: KafkaMessage[]): Promise<void> {
+    try {
+      for (const message of messages) {
+        await this.emit(topic, message);
+      }
+    } catch (error) {
+      this.logger.error(
+        `Failed to emit batch messages to topic ${topic}`,
+        error,
+      );
+      throw error;
+    }
+  }
+}
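`emit()` awaits the Observable returned by `ClientKafka.emit` via `lastValueFrom`, so broker failures surface as rejected promises that the outbox scheduler's retry bookkeeping can catch. Note that `emitBatch()` is a sequential loop of single emits that stops at the first failure, not a Kafka producer batch. A caller sketch (topic, key, and payload are illustrative):

```typescript
import { KafkaProducerService } from './kafka-producer.service';

// Sketch: publish one event through the producer wrapper above.
export async function publishUserRegistered(
  producer: KafkaProducerService,
  userId: string,
): Promise<void> {
  await producer.emit('auth.user.registered', {
    key: userId, // partition key: events for one aggregate stay ordered
    value: { userId, occurredAt: new Date().toISOString() },
    headers: { 'x-source': 'auth-service' },
  });
}
```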
diff --git a/backend/services/auth-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/auth-service/src/infrastructure/kafka/kafka.module.ts
new file mode 100644
index 00000000..00abc00e
--- /dev/null
+++ b/backend/services/auth-service/src/infrastructure/kafka/kafka.module.ts
@@ -0,0 +1,31 @@
+import { Module } from '@nestjs/common';
+import { ClientsModule, Transport } from '@nestjs/microservices';
+import { ConfigModule, ConfigService } from '@nestjs/config';
+
+@Module({
+  imports: [
+    ClientsModule.registerAsync([
+      {
+        name: 'KAFKA_CLIENT',
+        imports: [ConfigModule],
+        useFactory: (configService: ConfigService) => ({
+          transport: Transport.KAFKA,
+          options: {
+            client: {
+              clientId: 'auth-service',
+              brokers: configService
+                .get('KAFKA_BROKERS', 'localhost:9092')
+                .split(','),
+            },
+            producer: {
+              allowAutoTopicCreation: true,
+            },
+          },
+        }),
+        inject: [ConfigService],
+      },
+    ]),
+  ],
+  exports: [ClientsModule],
+})
+export class KafkaModule {}
diff --git a/backend/services/auth-service/src/infrastructure/redis/index.ts b/backend/services/auth-service/src/infrastructure/redis/index.ts
new file mode 100644
index 00000000..1ee25d38
--- /dev/null
+++ b/backend/services/auth-service/src/infrastructure/redis/index.ts
@@ -0,0 +1 @@
+export * from './redis.service';
diff --git a/backend/services/auth-service/src/infrastructure/redis/redis.service.ts b/backend/services/auth-service/src/infrastructure/redis/redis.service.ts
new file mode 100644
index 00000000..cfd54c54
--- /dev/null
+++ b/backend/services/auth-service/src/infrastructure/redis/redis.service.ts
@@ -0,0 +1,107 @@
+import {
+  Injectable,
+  Inject,
+  OnModuleInit,
+  OnModuleDestroy,
+  Logger,
+} from '@nestjs/common';
+import Redis from 'ioredis';
+
+interface RedisOptions {
+  host: string;
+  port: number;
+  password?: string;
+  db?: number;
+}
+
+@Injectable()
+export class RedisService implements OnModuleInit, OnModuleDestroy {
+  private readonly logger = new Logger(RedisService.name);
+  private client: Redis;
+
+  constructor(
+    @Inject('REDIS_OPTIONS') private readonly options: RedisOptions,
+  ) {}
+
+  async onModuleInit() {
+    this.client = new Redis({
+      host: this.options.host,
+      port: this.options.port,
+      password: this.options.password,
+      db: this.options.db ?? 14, // auth-service uses DB 14
+      retryStrategy: (times) => Math.min(times * 50, 2000),
+    });
+
+    this.client.on('error', (err) => this.logger.error('Redis error', err));
+    this.client.on('connect', () => this.logger.log('Connected to Redis'));
+  }
+
+  async onModuleDestroy() {
+    await this.client.quit();
+  }
+
+  getClient(): Redis {
+    return this.client;
+  }
+
+  async get(key: string): Promise<string | null> {
+    return this.client.get(key);
+  }
+
+  async set(key: string, value: string, ttlSeconds?: number): Promise<void> {
+    if (ttlSeconds) {
+      await this.client.setex(key, ttlSeconds, value);
+    } else {
+      await this.client.set(key, value);
+    }
+  }
+
+  async getJson<T>(key: string): Promise<T | null> {
+    const value = await this.get(key);
+    if (!value) return null;
+    try {
+      return JSON.parse(value) as T;
+    } catch {
+      return null;
+    }
+  }
+
+  async setJson<T>(key: string, value: T, ttlSeconds?: number): Promise<void> {
+    await this.set(key, JSON.stringify(value), ttlSeconds);
+  }
+
+  async acquireLock(
+    lockKey: string,
+    ttlSeconds: number = 30,
+  ): Promise<string | null> {
+    const lockValue = `${Date.now()}-${Math.random().toString(36).substring(7)}`;
+    const result = await this.client.set(
+      lockKey,
+      lockValue,
+      'EX',
+      ttlSeconds,
+      'NX',
+    );
+    return result === 'OK' ? lockValue : null;
+  }
+
+  async releaseLock(lockKey: string, lockValue: string): Promise<boolean> {
+    const script = `
+      if redis.call("get", KEYS[1]) == ARGV[1] then
+        return redis.call("del", KEYS[1])
+      else
+        return 0
+      end
+    `;
+    const result = await this.client.eval(script, 1, lockKey, lockValue);
+    return result === 1;
+  }
+
+  async incr(key: string): Promise<number> {
+    return this.client.incr(key);
+  }
+
+  async del(key: string): Promise<number> {
+    return this.client.del(key);
+  }
+}
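The lock pair above is the standard single-node Redis lock: `SET key value EX ttl NX` acquires, and the Lua script in `releaseLock` deletes only if the stored value still matches, so an instance cannot delete a lock that expired and was re-acquired by a peer (a bare `DEL` could). A usage sketch mirroring the schedulers, using only the methods defined above:

```typescript
import { RedisService } from './redis.service';

// Sketch: run `work` only on the instance that wins the lock. The TTL should
// exceed the expected work duration; the schedulers use 25s under a 30s cron.
export async function withLock(
  redis: RedisService,
  key: string,
  ttlSeconds: number,
  work: () => Promise<void>,
): Promise<void> {
  const lockValue = await redis.acquireLock(key, ttlSeconds);
  if (!lockValue) return; // another instance holds the lock
  try {
    await work();
  } finally {
    await redis.releaseLock(key, lockValue);
  }
}
```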
diff --git a/backend/services/mining-service/src/application/application.module.ts b/backend/services/mining-service/src/application/application.module.ts
index 7a231852..f185fb78 100644
--- a/backend/services/mining-service/src/application/application.module.ts
+++ b/backend/services/mining-service/src/application/application.module.ts
@@ -16,6 +16,7 @@ import { ContributionEventHandler } from './event-handlers/contribution-event.ha
 
 // Schedulers
 import { MiningScheduler } from './schedulers/mining.scheduler';
+import { OutboxScheduler } from './schedulers/outbox.scheduler';
 
 @Module({
   imports: [ScheduleModule.forRoot(), InfrastructureModule],
@@ -34,6 +35,7 @@ import { MiningScheduler } from './schedulers/mining.scheduler';
 
     // Schedulers
     MiningScheduler,
+    OutboxScheduler,
   ],
   exports: [
     MiningDistributionService,
diff --git a/backend/services/mining-service/src/application/schedulers/outbox.scheduler.ts b/backend/services/mining-service/src/application/schedulers/outbox.scheduler.ts
new file mode 100644
index 00000000..68e67501
--- /dev/null
+++ b/backend/services/mining-service/src/application/schedulers/outbox.scheduler.ts
@@ -0,0 +1,106 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { Cron } from '@nestjs/schedule';
+import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
+import { KafkaProducerService } from '../../infrastructure/kafka/kafka-producer.service';
+import { RedisService } from '../../infrastructure/redis/redis.service';
+
+/**
+ * Outbox event publishing scheduler.
+ * Periodically publishes pending outbox events to Kafka.
+ */
+@Injectable()
+export class OutboxScheduler implements OnModuleInit {
+  private readonly logger = new Logger(OutboxScheduler.name);
+  private readonly LOCK_KEY = 'mining:outbox:scheduler:lock';
+
+  constructor(
+    private readonly outboxRepository: OutboxRepository,
+    private readonly kafkaProducer: KafkaProducerService,
+    private readonly redis: RedisService,
+  ) {}
+
+  onModuleInit() {
+    this.logger.log('Outbox scheduler initialized');
+  }
+
+  /**
+   * Publish pending outbox events every 30 seconds.
+   */
+  @Cron('*/30 * * * * *')
+  async publishOutboxEvents(): Promise<void> {
+    const lockValue = await this.redis.acquireLock(
+      `${this.LOCK_KEY}:publish`,
+      25,
+    );
+    if (!lockValue) {
+      return; // another instance is already processing
+    }
+
+    try {
+      const events = await this.outboxRepository.findUnprocessed(100);
+
+      if (events.length === 0) {
+        return;
+      }
+
+      const processedIds: string[] = [];
+
+      for (const event of events) {
+        try {
+          await this.kafkaProducer.emit(`mining.${event.eventType}`, {
+            key: event.aggregateId,
+            value: {
+              eventId: event.id,
+              aggregateType: event.aggregateType,
+              aggregateId: event.aggregateId,
+              eventType: event.eventType,
+              payload: event.payload,
+              createdAt: event.createdAt.toISOString(),
+            },
+          });
+          processedIds.push(event.id);
+        } catch (error) {
+          this.logger.error(`Failed to publish event ${event.id}`, error);
+          // continue with the next event
+        }
+      }
+
+      if (processedIds.length > 0) {
+        await this.outboxRepository.markAsProcessed(processedIds);
+        this.logger.debug(`Published ${processedIds.length} outbox events`);
+      }
+    } catch (error) {
+      this.logger.error('Failed to process outbox events', error);
+    } finally {
+      await this.redis.releaseLock(`${this.LOCK_KEY}:publish`, lockValue);
+    }
+  }
+
+  /**
+   * Clean up processed events daily at 3 AM (retain 7 days).
+   */
+  @Cron('0 3 * * *')
+  async cleanupProcessedEvents(): Promise<void> {
+    const lockValue = await this.redis.acquireLock(
+      `${this.LOCK_KEY}:cleanup`,
+      300,
+    );
+    if (!lockValue) {
+      return;
+    }
+
+    try {
+      const sevenDaysAgo = new Date();
+      sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
+
+      const deleted = await this.outboxRepository.deleteProcessed(sevenDaysAgo);
+      if (deleted > 0) {
+        this.logger.log(`Cleaned up ${deleted} processed outbox events`);
+      }
+    } catch (error) {
+      this.logger.error('Failed to cleanup processed events', error);
+    } finally {
+      await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
+    }
+  }
+}
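Because `markAsProcessed` runs only after a successful `emit`, a crash between the two re-publishes the same events on the next tick: delivery is at-least-once. Consumers should therefore deduplicate on the `eventId` carried in the envelope. A hedged consumer sketch (the controller, topic name, and dedup TTL are assumptions, not part of this diff):

```typescript
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import { RedisService } from '../../infrastructure/redis/redis.service';

interface OutboxEnvelope {
  eventId: string;
  eventType: string;
  payload: unknown;
}

@Controller()
export class MiningEventsConsumer {
  constructor(private readonly redis: RedisService) {}

  @EventPattern('mining.reward.distributed') // illustrative topic
  async handle(@Payload() event: OutboxEnvelope): Promise<void> {
    // Claim the eventId; NX fails if a previous delivery already claimed it.
    const claimed = await this.redis
      .getClient()
      .set(`consumed:${event.eventId}`, '1', 'EX', 86400, 'NX');
    if (claimed !== 'OK') return; // duplicate delivery, already handled
    // ...apply side effects exactly once per eventId...
  }
}
```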
diff --git a/backend/services/mining-service/src/infrastructure/infrastructure.module.ts b/backend/services/mining-service/src/infrastructure/infrastructure.module.ts
index 7ca682a3..27ea546e 100644
--- a/backend/services/mining-service/src/infrastructure/infrastructure.module.ts
+++ b/backend/services/mining-service/src/infrastructure/infrastructure.module.ts
@@ -6,7 +6,9 @@ import { MiningAccountRepository } from './persistence/repositories/mining-accou
 import { MiningConfigRepository } from './persistence/repositories/mining-config.repository';
 import { BlackHoleRepository } from './persistence/repositories/black-hole.repository';
 import { PriceSnapshotRepository } from './persistence/repositories/price-snapshot.repository';
+import { OutboxRepository } from './persistence/repositories/outbox.repository';
 import { RedisService } from './redis/redis.service';
+import { KafkaProducerService } from './kafka/kafka-producer.service';
 
 @Global()
 @Module({
@@ -21,7 +23,9 @@ import { RedisService } from './redis/redis.service';
           options: {
             client: {
               clientId: 'mining-service',
-              brokers: configService.get('KAFKA_BROKERS', 'localhost:9092').split(','),
+              brokers: configService
+                .get('KAFKA_BROKERS', 'localhost:9092')
+                .split(','),
             },
             producer: {
               allowAutoTopicCreation: true,
@@ -37,13 +41,15 @@ import { RedisService } from './redis/redis.service';
     MiningConfigRepository,
     BlackHoleRepository,
     PriceSnapshotRepository,
+    OutboxRepository,
+    KafkaProducerService,
     {
       provide: 'REDIS_OPTIONS',
       useFactory: (configService: ConfigService) => ({
         host: configService.get('REDIS_HOST', 'localhost'),
         port: configService.get('REDIS_PORT', 6379),
         password: configService.get('REDIS_PASSWORD'),
-        db: configService.get('REDIS_DB', 1),
+        db: configService.get('REDIS_DB', 11),
       }),
       inject: [ConfigService],
     },
@@ -54,6 +60,8 @@ import { RedisService } from './redis/redis.service';
     MiningConfigRepository,
     BlackHoleRepository,
     PriceSnapshotRepository,
+    OutboxRepository,
+    KafkaProducerService,
     RedisService,
     ClientsModule,
   ],
diff --git a/backend/services/mining-service/src/infrastructure/kafka/index.ts b/backend/services/mining-service/src/infrastructure/kafka/index.ts
new file mode 100644
index 00000000..13e788e8
--- /dev/null
+++ b/backend/services/mining-service/src/infrastructure/kafka/index.ts
@@ -0,0 +1 @@
+export * from './kafka-producer.service';
diff --git a/backend/services/mining-service/src/infrastructure/kafka/kafka-producer.service.ts b/backend/services/mining-service/src/infrastructure/kafka/kafka-producer.service.ts
new file mode 100644
index 00000000..e64a8421
--- /dev/null
+++ b/backend/services/mining-service/src/infrastructure/kafka/kafka-producer.service.ts
@@ -0,0 +1,53 @@
+import { Injectable, Inject, OnModuleInit, Logger } from '@nestjs/common';
+import { ClientKafka } from '@nestjs/microservices';
+import { lastValueFrom } from 'rxjs';
+
+export interface KafkaMessage {
+  key?: string;
+  value: any;
+  headers?: Record<string, string>;
+}
+
+@Injectable()
+export class KafkaProducerService implements OnModuleInit {
+  private readonly logger = new Logger(KafkaProducerService.name);
+
+  constructor(
+    @Inject('KAFKA_CLIENT') private readonly kafkaClient: ClientKafka,
+  ) {}
+
+  async onModuleInit() {
+    await this.kafkaClient.connect();
+    this.logger.log('Kafka producer connected');
+  }
+
+  async emit(topic: string, message: KafkaMessage): Promise<void> {
+    try {
+      await lastValueFrom(
+        this.kafkaClient.emit(topic, {
+          key: message.key,
+          value: JSON.stringify(message.value),
+          headers: message.headers,
+        }),
+      );
+      this.logger.debug(`Message emitted to topic ${topic}`);
+    } catch (error) {
+      this.logger.error(`Failed to emit message to topic ${topic}`, error);
+      throw error;
+    }
+  }
+
+  async emitBatch(topic: string, messages: KafkaMessage[]): Promise<void> {
+    try {
+      for (const message of messages) {
+        await this.emit(topic, message);
+      }
+    } catch (error) {
+      this.logger.error(
+        `Failed to emit batch messages to topic ${topic}`,
+        error,
+      );
+      throw error;
+    }
+  }
+}
diff --git a/backend/services/mining-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/mining-service/src/infrastructure/persistence/repositories/outbox.repository.ts
new file mode 100644
index 00000000..e5cd88c3
--- /dev/null
+++ b/backend/services/mining-service/src/infrastructure/persistence/repositories/outbox.repository.ts
@@ -0,0 +1,60 @@
+import { Injectable } from '@nestjs/common';
+import { PrismaService } from '../prisma/prisma.service';
+import { OutboxEvent } from '@prisma/client';
+
+@Injectable()
+export class OutboxRepository {
+  constructor(private readonly prisma: PrismaService) {}
+
+  /**
+   * Create an outbox event.
+   */
+  async create(data: {
+    aggregateType: string;
+    aggregateId: string;
+    eventType: string;
+    payload: any;
+  }): Promise<OutboxEvent> {
+    return this.prisma.outboxEvent.create({
+      data: {
+        aggregateType: data.aggregateType,
+        aggregateId: data.aggregateId,
+        eventType: data.eventType,
+        payload: data.payload,
+      },
+    });
+  }
+
+  /**
+   * Fetch unprocessed events.
+   */
+  async findUnprocessed(limit: number = 100): Promise<OutboxEvent[]> {
+    return this.prisma.outboxEvent.findMany({
+      where: { processedAt: null },
+      orderBy: { createdAt: 'asc' },
+      take: limit,
+    });
+  }
+
+  /**
+   * Mark events as processed.
+   */
+  async markAsProcessed(ids: string[]): Promise<void> {
+    await this.prisma.outboxEvent.updateMany({
+      where: { id: { in: ids } },
+      data: { processedAt: new Date() },
+    });
+  }
+
+  /**
+   * Delete old processed events.
+   */
+  async deleteProcessed(before: Date): Promise<number> {
+    const result = await this.prisma.outboxEvent.deleteMany({
+      where: {
+        processedAt: { not: null, lt: before },
+      },
+    });
+    return result.count;
+  }
+}
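As written, `create` uses the service-wide Prisma client, so a caller cannot atomically pair the outbox insert with its domain write. One common refinement (an assumption, not in this diff) is to accept an optional `Prisma.TransactionClient`:

```typescript
import { Prisma, OutboxEvent } from '@prisma/client';

// Inside OutboxRepository: `tx ?? this.prisma` lets callers pass the client
// from prisma.$transaction() so the outbox row joins their transaction.
async create(
  data: {
    aggregateType: string;
    aggregateId: string;
    eventType: string;
    payload: any;
  },
  tx?: Prisma.TransactionClient,
): Promise<OutboxEvent> {
  return (tx ?? this.prisma).outboxEvent.create({ data });
}
```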
diff --git a/backend/services/trading-service/src/application/application.module.ts b/backend/services/trading-service/src/application/application.module.ts
index 0042c3db..8ab82d93 100644
--- a/backend/services/trading-service/src/application/application.module.ts
+++ b/backend/services/trading-service/src/application/application.module.ts
@@ -3,10 +3,11 @@ import { ScheduleModule } from '@nestjs/schedule';
 import { InfrastructureModule } from '../infrastructure/infrastructure.module';
 import { OrderService } from './services/order.service';
 import { TransferService } from './services/transfer.service';
+import { OutboxScheduler } from './schedulers/outbox.scheduler';
 
 @Module({
   imports: [ScheduleModule.forRoot(), InfrastructureModule],
-  providers: [OrderService, TransferService],
+  providers: [OrderService, TransferService, OutboxScheduler],
   exports: [OrderService, TransferService],
 })
 export class ApplicationModule {}
diff --git a/backend/services/trading-service/src/application/schedulers/index.ts b/backend/services/trading-service/src/application/schedulers/index.ts
new file mode 100644
index 00000000..140724aa
--- /dev/null
+++ b/backend/services/trading-service/src/application/schedulers/index.ts
@@ -0,0 +1 @@
+export * from './outbox.scheduler';
diff --git a/backend/services/trading-service/src/application/schedulers/outbox.scheduler.ts b/backend/services/trading-service/src/application/schedulers/outbox.scheduler.ts
new file mode 100644
index 00000000..7cb0448f
--- /dev/null
+++ b/backend/services/trading-service/src/application/schedulers/outbox.scheduler.ts
@@ -0,0 +1,106 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { Cron } from '@nestjs/schedule';
+import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
+import { KafkaProducerService } from '../../infrastructure/kafka/kafka-producer.service';
+import { RedisService } from '../../infrastructure/redis/redis.service';
+
+/**
+ * Outbox event publishing scheduler.
+ * Periodically publishes pending outbox events to Kafka.
+ */
+@Injectable()
+export class OutboxScheduler implements OnModuleInit {
+  private readonly logger = new Logger(OutboxScheduler.name);
+  private readonly LOCK_KEY = 'trading:outbox:scheduler:lock';
+
+  constructor(
+    private readonly outboxRepository: OutboxRepository,
+    private readonly kafkaProducer: KafkaProducerService,
+    private readonly redis: RedisService,
+  ) {}
+
+  onModuleInit() {
+    this.logger.log('Outbox scheduler initialized');
+  }
+
+  /**
+   * Publish pending outbox events every 30 seconds.
+   */
+  @Cron('*/30 * * * * *')
+  async publishOutboxEvents(): Promise<void> {
+    const lockValue = await this.redis.acquireLock(
+      `${this.LOCK_KEY}:publish`,
+      25,
+    );
+    if (!lockValue) {
+      return; // another instance is already processing
+    }
+
+    try {
+      const events = await this.outboxRepository.findUnprocessed(100);
+
+      if (events.length === 0) {
+        return;
+      }
+
+      const processedIds: string[] = [];
+
+      for (const event of events) {
+        try {
+          await this.kafkaProducer.emit(`trading.${event.eventType}`, {
+            key: event.aggregateId,
+            value: {
+              eventId: event.id,
+              aggregateType: event.aggregateType,
+              aggregateId: event.aggregateId,
+              eventType: event.eventType,
+              payload: event.payload,
+              createdAt: event.createdAt.toISOString(),
+            },
+          });
+          processedIds.push(event.id);
+        } catch (error) {
+          this.logger.error(`Failed to publish event ${event.id}`, error);
+          // continue with the next event
+        }
+      }
+
+      if (processedIds.length > 0) {
+        await this.outboxRepository.markAsProcessed(processedIds);
+        this.logger.debug(`Published ${processedIds.length} outbox events`);
+      }
+    } catch (error) {
+      this.logger.error('Failed to process outbox events', error);
+    } finally {
+      await this.redis.releaseLock(`${this.LOCK_KEY}:publish`, lockValue);
+    }
+  }
+
+  /**
+   * Clean up processed events daily at 3 AM (retain 7 days).
+   */
+  @Cron('0 3 * * *')
+  async cleanupProcessedEvents(): Promise<void> {
+    const lockValue = await this.redis.acquireLock(
+      `${this.LOCK_KEY}:cleanup`,
+      300,
+    );
+    if (!lockValue) {
+      return;
+    }
+
+    try {
+      const sevenDaysAgo = new Date();
+      sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
+
+      const deleted = await this.outboxRepository.deleteProcessed(sevenDaysAgo);
+      if (deleted > 0) {
+        this.logger.log(`Cleaned up ${deleted} processed outbox events`);
+      }
+    } catch (error) {
+      this.logger.error('Failed to cleanup processed events', error);
+    } finally {
+      await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
+    }
+  }
+}
diff --git a/backend/services/trading-service/src/infrastructure/infrastructure.module.ts b/backend/services/trading-service/src/infrastructure/infrastructure.module.ts
index e5226596..74f4fbe8 100644
--- a/backend/services/trading-service/src/infrastructure/infrastructure.module.ts
+++ b/backend/services/trading-service/src/infrastructure/infrastructure.module.ts
@@ -4,7 +4,9 @@ import { ClientsModule, Transport } from '@nestjs/microservices';
 import { PrismaModule } from './persistence/prisma/prisma.module';
 import { TradingAccountRepository } from './persistence/repositories/trading-account.repository';
 import { OrderRepository } from './persistence/repositories/order.repository';
+import { OutboxRepository } from './persistence/repositories/outbox.repository';
 import { RedisService } from './redis/redis.service';
+import { KafkaProducerService } from './kafka/kafka-producer.service';
 
 @Global()
 @Module({
@@ -19,7 +21,9 @@ import { RedisService } from './redis/redis.service';
           options: {
             client: {
               clientId: 'trading-service',
-              brokers: configService.get('KAFKA_BROKERS', 'localhost:9092').split(','),
+              brokers: configService
+                .get('KAFKA_BROKERS', 'localhost:9092')
+                .split(','),
             },
             producer: { allowAutoTopicCreation: true },
           },
@@ -31,18 +35,27 @@ import { RedisService } from './redis/redis.service';
   providers: [
     TradingAccountRepository,
     OrderRepository,
+    OutboxRepository,
+    KafkaProducerService,
     {
       provide: 'REDIS_OPTIONS',
       useFactory: (configService: ConfigService) => ({
         host: configService.get('REDIS_HOST', 'localhost'),
         port: configService.get('REDIS_PORT', 6379),
         password: configService.get('REDIS_PASSWORD'),
-        db: configService.get('REDIS_DB', 2),
+        db: configService.get('REDIS_DB', 12),
       }),
       inject: [ConfigService],
     },
     RedisService,
   ],
-  exports: [TradingAccountRepository, OrderRepository, RedisService, ClientsModule],
+  exports: [
+    TradingAccountRepository,
+    OrderRepository,
+    OutboxRepository,
+    KafkaProducerService,
+    RedisService,
+    ClientsModule,
+  ],
 })
 export class InfrastructureModule {}
diff --git a/backend/services/trading-service/src/infrastructure/kafka/index.ts b/backend/services/trading-service/src/infrastructure/kafka/index.ts
new file mode 100644
index 00000000..13e788e8
--- /dev/null
+++ b/backend/services/trading-service/src/infrastructure/kafka/index.ts
@@ -0,0 +1 @@
+export * from './kafka-producer.service';
diff --git a/backend/services/trading-service/src/infrastructure/kafka/kafka-producer.service.ts b/backend/services/trading-service/src/infrastructure/kafka/kafka-producer.service.ts
new file mode 100644
index 00000000..e64a8421
--- /dev/null
+++ b/backend/services/trading-service/src/infrastructure/kafka/kafka-producer.service.ts
@@ -0,0 +1,53 @@
+import { Injectable, Inject, OnModuleInit, Logger } from '@nestjs/common';
+import { ClientKafka } from '@nestjs/microservices';
+import { lastValueFrom } from 'rxjs';
+
+export interface KafkaMessage {
+  key?: string;
+  value: any;
+  headers?: Record<string, string>;
+}
+
+@Injectable()
+export class KafkaProducerService implements OnModuleInit {
+  private readonly logger = new Logger(KafkaProducerService.name);
+
+  constructor(
+    @Inject('KAFKA_CLIENT') private readonly kafkaClient: ClientKafka,
+  ) {}
+
+  async onModuleInit() {
+    await this.kafkaClient.connect();
+    this.logger.log('Kafka producer connected');
+  }
+
+  async emit(topic: string, message: KafkaMessage): Promise<void> {
+    try {
+      await lastValueFrom(
+        this.kafkaClient.emit(topic, {
+          key: message.key,
+          value: JSON.stringify(message.value),
+          headers: message.headers,
+        }),
+      );
+      this.logger.debug(`Message emitted to topic ${topic}`);
+    } catch (error) {
+      this.logger.error(`Failed to emit message to topic ${topic}`, error);
+      throw error;
+    }
+  }
+
+  async emitBatch(topic: string, messages: KafkaMessage[]): Promise<void> {
+    try {
+      for (const message of messages) {
+        await this.emit(topic, message);
+      }
+    } catch (error) {
+      this.logger.error(
+        `Failed to emit batch messages to topic ${topic}`,
+        error,
+      );
+      throw error;
+    }
+  }
+}
diff --git a/backend/services/trading-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/trading-service/src/infrastructure/persistence/repositories/outbox.repository.ts
new file mode 100644
index 00000000..e5cd88c3
--- /dev/null
+++ b/backend/services/trading-service/src/infrastructure/persistence/repositories/outbox.repository.ts
@@ -0,0 +1,60 @@
+import { Injectable } from '@nestjs/common';
+import { PrismaService } from '../prisma/prisma.service';
+import { OutboxEvent } from '@prisma/client';
+
+@Injectable()
+export class OutboxRepository {
+  constructor(private readonly prisma: PrismaService) {}
+
+  /**
+   * Create an outbox event.
+   */
+  async create(data: {
+    aggregateType: string;
+    aggregateId: string;
+    eventType: string;
+    payload: any;
+  }): Promise<OutboxEvent> {
+    return this.prisma.outboxEvent.create({
+      data: {
+        aggregateType: data.aggregateType,
+        aggregateId: data.aggregateId,
+        eventType: data.eventType,
+        payload: data.payload,
+      },
+    });
+  }
+
+  /**
+   * Fetch unprocessed events.
+   */
+  async findUnprocessed(limit: number = 100): Promise<OutboxEvent[]> {
+    return this.prisma.outboxEvent.findMany({
+      where: { processedAt: null },
+      orderBy: { createdAt: 'asc' },
+      take: limit,
+    });
+  }
+
+  /**
+   * Mark events as processed.
+   */
+  async markAsProcessed(ids: string[]): Promise<void> {
+    await this.prisma.outboxEvent.updateMany({
+      where: { id: { in: ids } },
+      data: { processedAt: new Date() },
+    });
+  }
+
+  /**
+   * Delete old processed events.
+   */
+  async deleteProcessed(before: Date): Promise<number> {
+    const result = await this.prisma.outboxEvent.deleteMany({
+      where: {
+        processedAt: { not: null, lt: before },
+      },
+    });
+    return result.count;
+  }
+}
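All three schedulers publish the same envelope shape; making it an explicit type (descriptive only, not present in the diff) documents the contract consumers can rely on:

```typescript
// Envelope assembled by every OutboxScheduler emit() call above.
export interface OutboxEventEnvelope {
  eventId: string; // outbox row id; consumers dedupe on this
  aggregateType: string; // e.g. 'User', 'Order'
  aggregateId: string; // also used as the Kafka partition key
  eventType: string; // topic suffix in the mining/trading services
  payload: unknown; // domain-specific body
  createdAt: string; // ISO-8601 timestamp of the original write
}
```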