feat(2.0-services): 为auth/mining/trading服务添加Outbox事件发布机制

- auth-service:
  - 添加Kafka生产者模块和服务
  - 添加Redis服务用于分布式锁
  - 添加OutboxScheduler定时发布Outbox事件到Kafka
  - 更新InfrastructureModule为全局模块

- mining-service:
  - 添加Kafka生产者服务
  - 添加OutboxRepository用于管理Outbox事件
  - 添加OutboxScheduler定时发布事件

- trading-service:
  - 添加Kafka生产者服务
  - 添加OutboxRepository用于管理Outbox事件
  - 添加OutboxScheduler定时发布事件

所有服务的Outbox调度器:
- 每30秒发布待处理的事件到Kafka
- 每天凌晨3点清理7天前已处理的事件
- 使用Redis分布式锁确保多实例部署时只有一个实例处理

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-10 20:54:37 -08:00
parent 15a5fb6c14
commit 28ad8c2e2f
22 changed files with 870 additions and 9 deletions

View File

@ -1,6 +1,7 @@
import { Module } from '@nestjs/common';
import { JwtModule } from '@nestjs/jwt';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ScheduleModule } from '@nestjs/schedule';
import {
AuthService,
PasswordService,
@ -9,11 +10,13 @@ import {
UserService,
OutboxService,
} from './services';
import { OutboxScheduler } from './schedulers';
import { InfrastructureModule } from '@/infrastructure/infrastructure.module';
@Module({
imports: [
InfrastructureModule,
ScheduleModule.forRoot(),
JwtModule.registerAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
@ -32,6 +35,7 @@ import { InfrastructureModule } from '@/infrastructure/infrastructure.module';
KycService,
UserService,
OutboxService,
OutboxScheduler,
],
exports: [
AuthService,

View File

@ -0,0 +1 @@
export * from './outbox.scheduler';

View File

@ -0,0 +1,176 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { KafkaProducerService } from '@/infrastructure/kafka/kafka-producer.service';
import { RedisService } from '@/infrastructure/redis/redis.service';
import { OutboxStatus } from '@prisma/client';
/**
* Outbox
* Outbox Kafka
*/
@Injectable()
export class OutboxScheduler implements OnModuleInit {
private readonly logger = new Logger(OutboxScheduler.name);
private readonly LOCK_KEY = 'auth:outbox:scheduler:lock';
constructor(
private readonly prisma: PrismaService,
private readonly kafkaProducer: KafkaProducerService,
private readonly redis: RedisService,
) {}
onModuleInit() {
this.logger.log('Outbox scheduler initialized');
}
/**
* 30 Outbox
*/
@Cron('*/30 * * * * *')
async publishOutboxEvents(): Promise<void> {
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:publish`, 25);
if (!lockValue) {
return; // 其他实例正在处理
}
try {
// 获取待处理的事件
const events = await this.prisma.outboxEvent.findMany({
where: {
status: OutboxStatus.PENDING,
OR: [{ nextRetryAt: null }, { nextRetryAt: { lte: new Date() } }],
},
orderBy: { createdAt: 'asc' },
take: 100,
});
if (events.length === 0) {
return;
}
let successCount = 0;
let failCount = 0;
for (const event of events) {
try {
await this.kafkaProducer.emit(event.topic, {
key: event.key,
value: {
eventId: event.id.toString(),
aggregateType: event.aggregateType,
aggregateId: event.aggregateId,
eventType: event.eventType,
payload: event.payload,
createdAt: event.createdAt.toISOString(),
},
});
// 标记为已发布
await this.prisma.outboxEvent.update({
where: { id: event.id },
data: {
status: OutboxStatus.PUBLISHED,
publishedAt: new Date(),
},
});
successCount++;
} catch (error) {
failCount++;
const errorMessage =
error instanceof Error ? error.message : 'Unknown error';
this.logger.error(`Failed to publish event ${event.id}`, error);
// 更新重试信息
const newRetryCount = event.retryCount + 1;
const shouldFail = newRetryCount >= event.maxRetries;
await this.prisma.outboxEvent.update({
where: { id: event.id },
data: {
retryCount: newRetryCount,
lastError: errorMessage,
status: shouldFail ? OutboxStatus.FAILED : OutboxStatus.PENDING,
// 指数退避: 30s, 60s, 120s
nextRetryAt: shouldFail
? null
: new Date(Date.now() + 30000 * Math.pow(2, newRetryCount - 1)),
},
});
}
}
if (successCount > 0 || failCount > 0) {
this.logger.log(
`Published ${successCount} outbox events, ${failCount} failed`,
);
}
} catch (error) {
this.logger.error('Failed to process outbox events', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:publish`, lockValue);
}
}
/**
* 37
*/
@Cron('0 3 * * *')
async cleanupPublishedEvents(): Promise<void> {
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:cleanup`, 300);
if (!lockValue) {
return;
}
try {
const sevenDaysAgo = new Date();
sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
const result = await this.prisma.outboxEvent.deleteMany({
where: {
status: OutboxStatus.PUBLISHED,
publishedAt: { lt: sevenDaysAgo },
},
});
if (result.count > 0) {
this.logger.log(`Cleaned up ${result.count} published outbox events`);
}
} catch (error) {
this.logger.error('Failed to cleanup published events', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
}
}
/**
*
*/
@Cron('0 * * * *')
async resetStaleFailedEvents(): Promise<void> {
try {
const oneDayAgo = new Date();
oneDayAgo.setDate(oneDayAgo.getDate() - 1);
// 将超过1天的失败事件重置为待处理给予重试机会
const result = await this.prisma.outboxEvent.updateMany({
where: {
status: OutboxStatus.FAILED,
createdAt: { gt: oneDayAgo },
},
data: {
status: OutboxStatus.PENDING,
retryCount: 0,
nextRetryAt: new Date(),
},
});
if (result.count > 0) {
this.logger.log(`Reset ${result.count} stale failed events for retry`);
}
} catch (error) {
this.logger.error('Failed to reset stale failed events', error);
}
}
}

View File

@ -1,5 +1,5 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { Module, Global } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { PrismaModule } from './persistence/prisma/prisma.module';
import { PrismaService } from './persistence/prisma/prisma.service';
import {
@ -9,6 +9,8 @@ import {
PrismaSmsVerificationRepository,
} from './persistence/repositories';
import { LegacyUserCdcConsumer } from './messaging/cdc';
import { KafkaModule, KafkaProducerService } from './kafka';
import { RedisService } from './redis';
import {
USER_REPOSITORY,
SYNCED_LEGACY_USER_REPOSITORY,
@ -16,12 +18,29 @@ import {
SMS_VERIFICATION_REPOSITORY,
} from '@/domain';
@Global()
@Module({
imports: [ConfigModule, PrismaModule],
imports: [ConfigModule, PrismaModule, KafkaModule],
providers: [
// CDC
LegacyUserCdcConsumer,
// Kafka Producer
KafkaProducerService,
// Redis
{
provide: 'REDIS_OPTIONS',
useFactory: (configService: ConfigService) => ({
host: configService.get<string>('REDIS_HOST', 'localhost'),
port: configService.get<number>('REDIS_PORT', 6379),
password: configService.get<string>('REDIS_PASSWORD'),
db: configService.get<number>('REDIS_DB', 14),
}),
inject: [ConfigService],
},
RedisService,
// Repositories
{
provide: USER_REPOSITORY,
@ -42,6 +61,8 @@ import {
],
exports: [
PrismaService,
KafkaProducerService,
RedisService,
USER_REPOSITORY,
SYNCED_LEGACY_USER_REPOSITORY,
REFRESH_TOKEN_REPOSITORY,

View File

@ -0,0 +1,2 @@
export * from './kafka.module';
export * from './kafka-producer.service';

View File

@ -0,0 +1,53 @@
import { Injectable, Inject, OnModuleInit, Logger } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
export interface KafkaMessage {
key?: string;
value: any;
headers?: Record<string, string>;
}
@Injectable()
export class KafkaProducerService implements OnModuleInit {
private readonly logger = new Logger(KafkaProducerService.name);
constructor(
@Inject('KAFKA_CLIENT') private readonly kafkaClient: ClientKafka,
) {}
async onModuleInit() {
await this.kafkaClient.connect();
this.logger.log('Kafka producer connected');
}
async emit(topic: string, message: KafkaMessage): Promise<void> {
try {
await lastValueFrom(
this.kafkaClient.emit(topic, {
key: message.key,
value: JSON.stringify(message.value),
headers: message.headers,
}),
);
this.logger.debug(`Message emitted to topic ${topic}`);
} catch (error) {
this.logger.error(`Failed to emit message to topic ${topic}`, error);
throw error;
}
}
async emitBatch(topic: string, messages: KafkaMessage[]): Promise<void> {
try {
for (const message of messages) {
await this.emit(topic, message);
}
} catch (error) {
this.logger.error(
`Failed to emit batch messages to topic ${topic}`,
error,
);
throw error;
}
}
}

View File

@ -0,0 +1,31 @@
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [
ClientsModule.registerAsync([
{
name: 'KAFKA_CLIENT',
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'auth-service',
brokers: configService
.get<string>('KAFKA_BROKERS', 'localhost:9092')
.split(','),
},
producer: {
allowAutoTopicCreation: true,
},
},
}),
inject: [ConfigService],
},
]),
],
exports: [ClientsModule],
})
export class KafkaModule {}

View File

@ -0,0 +1 @@
export * from './redis.service';

View File

@ -0,0 +1,107 @@
import {
Injectable,
Inject,
OnModuleInit,
OnModuleDestroy,
Logger,
} from '@nestjs/common';
import Redis from 'ioredis';
interface RedisOptions {
host: string;
port: number;
password?: string;
db?: number;
}
@Injectable()
export class RedisService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(RedisService.name);
private client: Redis;
constructor(
@Inject('REDIS_OPTIONS') private readonly options: RedisOptions,
) {}
async onModuleInit() {
this.client = new Redis({
host: this.options.host,
port: this.options.port,
password: this.options.password,
db: this.options.db ?? 14, // auth-service uses DB 14
retryStrategy: (times) => Math.min(times * 50, 2000),
});
this.client.on('error', (err) => this.logger.error('Redis error', err));
this.client.on('connect', () => this.logger.log('Connected to Redis'));
}
async onModuleDestroy() {
await this.client.quit();
}
getClient(): Redis {
return this.client;
}
async get(key: string): Promise<string | null> {
return this.client.get(key);
}
async set(key: string, value: string, ttlSeconds?: number): Promise<void> {
if (ttlSeconds) {
await this.client.setex(key, ttlSeconds, value);
} else {
await this.client.set(key, value);
}
}
async getJson<T>(key: string): Promise<T | null> {
const value = await this.get(key);
if (!value) return null;
try {
return JSON.parse(value) as T;
} catch {
return null;
}
}
async setJson<T>(key: string, value: T, ttlSeconds?: number): Promise<void> {
await this.set(key, JSON.stringify(value), ttlSeconds);
}
async acquireLock(
lockKey: string,
ttlSeconds: number = 30,
): Promise<string | null> {
const lockValue = `${Date.now()}-${Math.random().toString(36).substring(7)}`;
const result = await this.client.set(
lockKey,
lockValue,
'EX',
ttlSeconds,
'NX',
);
return result === 'OK' ? lockValue : null;
}
async releaseLock(lockKey: string, lockValue: string): Promise<boolean> {
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
const result = await this.client.eval(script, 1, lockKey, lockValue);
return result === 1;
}
async incr(key: string): Promise<number> {
return this.client.incr(key);
}
async del(key: string): Promise<number> {
return this.client.del(key);
}
}

View File

@ -16,6 +16,7 @@ import { ContributionEventHandler } from './event-handlers/contribution-event.ha
// Schedulers
import { MiningScheduler } from './schedulers/mining.scheduler';
import { OutboxScheduler } from './schedulers/outbox.scheduler';
@Module({
imports: [ScheduleModule.forRoot(), InfrastructureModule],
@ -34,6 +35,7 @@ import { MiningScheduler } from './schedulers/mining.scheduler';
// Schedulers
MiningScheduler,
OutboxScheduler,
],
exports: [
MiningDistributionService,

View File

@ -0,0 +1,106 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { KafkaProducerService } from '../../infrastructure/kafka/kafka-producer.service';
import { RedisService } from '../../infrastructure/redis/redis.service';
/**
* Outbox
* Outbox Kafka
*/
@Injectable()
export class OutboxScheduler implements OnModuleInit {
private readonly logger = new Logger(OutboxScheduler.name);
private readonly LOCK_KEY = 'mining:outbox:scheduler:lock';
constructor(
private readonly outboxRepository: OutboxRepository,
private readonly kafkaProducer: KafkaProducerService,
private readonly redis: RedisService,
) {}
onModuleInit() {
this.logger.log('Outbox scheduler initialized');
}
/**
* 30 Outbox
*/
@Cron('*/30 * * * * *')
async publishOutboxEvents(): Promise<void> {
const lockValue = await this.redis.acquireLock(
`${this.LOCK_KEY}:publish`,
25,
);
if (!lockValue) {
return; // 其他实例正在处理
}
try {
const events = await this.outboxRepository.findUnprocessed(100);
if (events.length === 0) {
return;
}
const processedIds: string[] = [];
for (const event of events) {
try {
await this.kafkaProducer.emit(`mining.${event.eventType}`, {
key: event.aggregateId,
value: {
eventId: event.id,
aggregateType: event.aggregateType,
aggregateId: event.aggregateId,
eventType: event.eventType,
payload: event.payload,
createdAt: event.createdAt.toISOString(),
},
});
processedIds.push(event.id);
} catch (error) {
this.logger.error(`Failed to publish event ${event.id}`, error);
// 继续处理下一个事件
}
}
if (processedIds.length > 0) {
await this.outboxRepository.markAsProcessed(processedIds);
this.logger.debug(`Published ${processedIds.length} outbox events`);
}
} catch (error) {
this.logger.error('Failed to process outbox events', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:publish`, lockValue);
}
}
/**
* 37
*/
@Cron('0 3 * * *')
async cleanupProcessedEvents(): Promise<void> {
const lockValue = await this.redis.acquireLock(
`${this.LOCK_KEY}:cleanup`,
300,
);
if (!lockValue) {
return;
}
try {
const sevenDaysAgo = new Date();
sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
const deleted = await this.outboxRepository.deleteProcessed(sevenDaysAgo);
if (deleted > 0) {
this.logger.log(`Cleaned up ${deleted} processed outbox events`);
}
} catch (error) {
this.logger.error('Failed to cleanup processed events', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
}
}
}

View File

@ -6,7 +6,9 @@ import { MiningAccountRepository } from './persistence/repositories/mining-accou
import { MiningConfigRepository } from './persistence/repositories/mining-config.repository';
import { BlackHoleRepository } from './persistence/repositories/black-hole.repository';
import { PriceSnapshotRepository } from './persistence/repositories/price-snapshot.repository';
import { OutboxRepository } from './persistence/repositories/outbox.repository';
import { RedisService } from './redis/redis.service';
import { KafkaProducerService } from './kafka/kafka-producer.service';
@Global()
@Module({
@ -21,7 +23,9 @@ import { RedisService } from './redis/redis.service';
options: {
client: {
clientId: 'mining-service',
brokers: configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(','),
brokers: configService
.get<string>('KAFKA_BROKERS', 'localhost:9092')
.split(','),
},
producer: {
allowAutoTopicCreation: true,
@ -37,13 +41,15 @@ import { RedisService } from './redis/redis.service';
MiningConfigRepository,
BlackHoleRepository,
PriceSnapshotRepository,
OutboxRepository,
KafkaProducerService,
{
provide: 'REDIS_OPTIONS',
useFactory: (configService: ConfigService) => ({
host: configService.get<string>('REDIS_HOST', 'localhost'),
port: configService.get<number>('REDIS_PORT', 6379),
password: configService.get<string>('REDIS_PASSWORD'),
db: configService.get<number>('REDIS_DB', 1),
db: configService.get<number>('REDIS_DB', 11),
}),
inject: [ConfigService],
},
@ -54,6 +60,8 @@ import { RedisService } from './redis/redis.service';
MiningConfigRepository,
BlackHoleRepository,
PriceSnapshotRepository,
OutboxRepository,
KafkaProducerService,
RedisService,
ClientsModule,
],

View File

@ -0,0 +1 @@
export * from './kafka-producer.service';

View File

@ -0,0 +1,53 @@
import { Injectable, Inject, OnModuleInit, Logger } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
export interface KafkaMessage {
key?: string;
value: any;
headers?: Record<string, string>;
}
@Injectable()
export class KafkaProducerService implements OnModuleInit {
private readonly logger = new Logger(KafkaProducerService.name);
constructor(
@Inject('KAFKA_CLIENT') private readonly kafkaClient: ClientKafka,
) {}
async onModuleInit() {
await this.kafkaClient.connect();
this.logger.log('Kafka producer connected');
}
async emit(topic: string, message: KafkaMessage): Promise<void> {
try {
await lastValueFrom(
this.kafkaClient.emit(topic, {
key: message.key,
value: JSON.stringify(message.value),
headers: message.headers,
}),
);
this.logger.debug(`Message emitted to topic ${topic}`);
} catch (error) {
this.logger.error(`Failed to emit message to topic ${topic}`, error);
throw error;
}
}
async emitBatch(topic: string, messages: KafkaMessage[]): Promise<void> {
try {
for (const message of messages) {
await this.emit(topic, message);
}
} catch (error) {
this.logger.error(
`Failed to emit batch messages to topic ${topic}`,
error,
);
throw error;
}
}
}

View File

@ -0,0 +1,60 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { OutboxEvent } from '@prisma/client';
@Injectable()
export class OutboxRepository {
constructor(private readonly prisma: PrismaService) {}
/**
* Outbox
*/
async create(data: {
aggregateType: string;
aggregateId: string;
eventType: string;
payload: any;
}): Promise<OutboxEvent> {
return this.prisma.outboxEvent.create({
data: {
aggregateType: data.aggregateType,
aggregateId: data.aggregateId,
eventType: data.eventType,
payload: data.payload,
},
});
}
/**
*
*/
async findUnprocessed(limit: number = 100): Promise<OutboxEvent[]> {
return this.prisma.outboxEvent.findMany({
where: { processedAt: null },
orderBy: { createdAt: 'asc' },
take: limit,
});
}
/**
*
*/
async markAsProcessed(ids: string[]): Promise<void> {
await this.prisma.outboxEvent.updateMany({
where: { id: { in: ids } },
data: { processedAt: new Date() },
});
}
/**
*
*/
async deleteProcessed(before: Date): Promise<number> {
const result = await this.prisma.outboxEvent.deleteMany({
where: {
processedAt: { not: null, lt: before },
},
});
return result.count;
}
}

View File

@ -3,10 +3,11 @@ import { ScheduleModule } from '@nestjs/schedule';
import { InfrastructureModule } from '../infrastructure/infrastructure.module';
import { OrderService } from './services/order.service';
import { TransferService } from './services/transfer.service';
import { OutboxScheduler } from './schedulers/outbox.scheduler';
@Module({
imports: [ScheduleModule.forRoot(), InfrastructureModule],
providers: [OrderService, TransferService],
providers: [OrderService, TransferService, OutboxScheduler],
exports: [OrderService, TransferService],
})
export class ApplicationModule {}

View File

@ -0,0 +1 @@
export * from './outbox.scheduler';

View File

@ -0,0 +1,106 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { KafkaProducerService } from '../../infrastructure/kafka/kafka-producer.service';
import { RedisService } from '../../infrastructure/redis/redis.service';
/**
* Outbox
* Outbox Kafka
*/
@Injectable()
export class OutboxScheduler implements OnModuleInit {
private readonly logger = new Logger(OutboxScheduler.name);
private readonly LOCK_KEY = 'trading:outbox:scheduler:lock';
constructor(
private readonly outboxRepository: OutboxRepository,
private readonly kafkaProducer: KafkaProducerService,
private readonly redis: RedisService,
) {}
onModuleInit() {
this.logger.log('Outbox scheduler initialized');
}
/**
* 30 Outbox
*/
@Cron('*/30 * * * * *')
async publishOutboxEvents(): Promise<void> {
const lockValue = await this.redis.acquireLock(
`${this.LOCK_KEY}:publish`,
25,
);
if (!lockValue) {
return; // 其他实例正在处理
}
try {
const events = await this.outboxRepository.findUnprocessed(100);
if (events.length === 0) {
return;
}
const processedIds: string[] = [];
for (const event of events) {
try {
await this.kafkaProducer.emit(`trading.${event.eventType}`, {
key: event.aggregateId,
value: {
eventId: event.id,
aggregateType: event.aggregateType,
aggregateId: event.aggregateId,
eventType: event.eventType,
payload: event.payload,
createdAt: event.createdAt.toISOString(),
},
});
processedIds.push(event.id);
} catch (error) {
this.logger.error(`Failed to publish event ${event.id}`, error);
// 继续处理下一个事件
}
}
if (processedIds.length > 0) {
await this.outboxRepository.markAsProcessed(processedIds);
this.logger.debug(`Published ${processedIds.length} outbox events`);
}
} catch (error) {
this.logger.error('Failed to process outbox events', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:publish`, lockValue);
}
}
/**
* 37
*/
@Cron('0 3 * * *')
async cleanupProcessedEvents(): Promise<void> {
const lockValue = await this.redis.acquireLock(
`${this.LOCK_KEY}:cleanup`,
300,
);
if (!lockValue) {
return;
}
try {
const sevenDaysAgo = new Date();
sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
const deleted = await this.outboxRepository.deleteProcessed(sevenDaysAgo);
if (deleted > 0) {
this.logger.log(`Cleaned up ${deleted} processed outbox events`);
}
} catch (error) {
this.logger.error('Failed to cleanup processed events', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
}
}
}

View File

@ -4,7 +4,9 @@ import { ClientsModule, Transport } from '@nestjs/microservices';
import { PrismaModule } from './persistence/prisma/prisma.module';
import { TradingAccountRepository } from './persistence/repositories/trading-account.repository';
import { OrderRepository } from './persistence/repositories/order.repository';
import { OutboxRepository } from './persistence/repositories/outbox.repository';
import { RedisService } from './redis/redis.service';
import { KafkaProducerService } from './kafka/kafka-producer.service';
@Global()
@Module({
@ -19,7 +21,9 @@ import { RedisService } from './redis/redis.service';
options: {
client: {
clientId: 'trading-service',
brokers: configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(','),
brokers: configService
.get<string>('KAFKA_BROKERS', 'localhost:9092')
.split(','),
},
producer: { allowAutoTopicCreation: true },
},
@ -31,18 +35,27 @@ import { RedisService } from './redis/redis.service';
providers: [
TradingAccountRepository,
OrderRepository,
OutboxRepository,
KafkaProducerService,
{
provide: 'REDIS_OPTIONS',
useFactory: (configService: ConfigService) => ({
host: configService.get<string>('REDIS_HOST', 'localhost'),
port: configService.get<number>('REDIS_PORT', 6379),
password: configService.get<string>('REDIS_PASSWORD'),
db: configService.get<number>('REDIS_DB', 2),
db: configService.get<number>('REDIS_DB', 12),
}),
inject: [ConfigService],
},
RedisService,
],
exports: [TradingAccountRepository, OrderRepository, RedisService, ClientsModule],
exports: [
TradingAccountRepository,
OrderRepository,
OutboxRepository,
KafkaProducerService,
RedisService,
ClientsModule,
],
})
export class InfrastructureModule {}

View File

@ -0,0 +1 @@
export * from './kafka-producer.service';

View File

@ -0,0 +1,53 @@
import { Injectable, Inject, OnModuleInit, Logger } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
export interface KafkaMessage {
key?: string;
value: any;
headers?: Record<string, string>;
}
@Injectable()
export class KafkaProducerService implements OnModuleInit {
private readonly logger = new Logger(KafkaProducerService.name);
constructor(
@Inject('KAFKA_CLIENT') private readonly kafkaClient: ClientKafka,
) {}
async onModuleInit() {
await this.kafkaClient.connect();
this.logger.log('Kafka producer connected');
}
async emit(topic: string, message: KafkaMessage): Promise<void> {
try {
await lastValueFrom(
this.kafkaClient.emit(topic, {
key: message.key,
value: JSON.stringify(message.value),
headers: message.headers,
}),
);
this.logger.debug(`Message emitted to topic ${topic}`);
} catch (error) {
this.logger.error(`Failed to emit message to topic ${topic}`, error);
throw error;
}
}
async emitBatch(topic: string, messages: KafkaMessage[]): Promise<void> {
try {
for (const message of messages) {
await this.emit(topic, message);
}
} catch (error) {
this.logger.error(
`Failed to emit batch messages to topic ${topic}`,
error,
);
throw error;
}
}
}

View File

@ -0,0 +1,60 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { OutboxEvent } from '@prisma/client';
@Injectable()
export class OutboxRepository {
constructor(private readonly prisma: PrismaService) {}
/**
* Outbox
*/
async create(data: {
aggregateType: string;
aggregateId: string;
eventType: string;
payload: any;
}): Promise<OutboxEvent> {
return this.prisma.outboxEvent.create({
data: {
aggregateType: data.aggregateType,
aggregateId: data.aggregateId,
eventType: data.eventType,
payload: data.payload,
},
});
}
/**
*
*/
async findUnprocessed(limit: number = 100): Promise<OutboxEvent[]> {
return this.prisma.outboxEvent.findMany({
where: { processedAt: null },
orderBy: { createdAt: 'asc' },
take: limit,
});
}
/**
*
*/
async markAsProcessed(ids: string[]): Promise<void> {
await this.prisma.outboxEvent.updateMany({
where: { id: { in: ids } },
data: { processedAt: new Date() },
});
}
/**
*
*/
async deleteProcessed(before: Date): Promise<number> {
const result = await this.prisma.outboxEvent.deleteMany({
where: {
processedAt: { not: null, lt: before },
},
});
return result.count;
}
}