rwadurian/backend/services/trading-service/src/infrastructure/kafka/cdc-consumer.service.ts

372 lines
11 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { TradingAccountRepository } from '../persistence/repositories/trading-account.repository';
import { OutboxRepository } from '../persistence/repositories/outbox.repository';
import { ProcessedEventRepository } from '../persistence/repositories/processed-event.repository';
import { RedisService } from '../redis/redis.service';
import { TradingAccountAggregate } from '../../domain/aggregates/trading-account.aggregate';
import {
TradingEventTypes,
TradingTopics,
TradingAccountCreatedPayload,
} from '../../domain/events/trading.events';
// 4小时 TTL
const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60;
/**
* CDC Consumer Service
* 监听 mining-wallet-service 的 outbox 事件,在用户钱包创建时同步创建交易账户
*/
@Injectable()
export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(CdcConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private isRunning = false;
constructor(
private readonly configService: ConfigService,
private readonly tradingAccountRepository: TradingAccountRepository,
private readonly outboxRepository: OutboxRepository,
private readonly processedEventRepository: ProcessedEventRepository,
private readonly redis: RedisService,
) {
const brokers = this.configService
.get<string>('KAFKA_BROKERS', 'localhost:9092')
.split(',');
this.kafka = new Kafka({
clientId: 'trading-service-cdc',
brokers,
});
this.consumer = this.kafka.consumer({
groupId: this.configService.get<string>(
'CDC_CONSUMER_GROUP',
'trading-service-cdc-group',
),
});
}
async onModuleInit() {
await this.start();
}
async onModuleDestroy() {
await this.stop();
}
async start(): Promise<void> {
if (this.isRunning) {
this.logger.warn('CDC consumer is already running');
return;
}
const walletTopic = this.configService.get<string>(
'CDC_TOPIC_WALLET_OUTBOX',
'cdc.mining-wallet.outbox',
);
try {
await this.consumer.connect();
this.logger.log('CDC consumer connected');
await this.consumer.subscribe({
topics: [walletTopic],
fromBeginning: true, // full-reset 时从头开始同步
});
this.logger.log(`Subscribed to topic: ${walletTopic}`);
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
},
});
this.isRunning = true;
this.logger.log('CDC consumer started - listening for UserWalletCreated and WalletSystemAccountCreated events');
} catch (error) {
this.logger.error('Failed to start CDC consumer', error);
}
}
async stop(): Promise<void> {
if (!this.isRunning) {
return;
}
try {
await this.consumer.disconnect();
this.isRunning = false;
this.logger.log('CDC consumer stopped');
} catch (error) {
this.logger.error('Failed to stop CDC consumer', error);
}
}
private async handleMessage(payload: EachMessagePayload): Promise<void> {
const { topic, message } = payload;
try {
if (!message.value) {
return;
}
const eventData = JSON.parse(message.value.toString());
// 忽略心跳消息
if (this.isHeartbeatMessage(eventData)) {
return;
}
// 处理 Debezium outbox 事件
if (this.isDebeziumOutboxEvent(eventData)) {
const op = eventData.payload?.op;
// 只处理 create 操作
if (op !== 'c' && op !== 'r') {
return;
}
const after = eventData.payload.after;
await this.handleOutboxEvent(after);
}
} catch (error) {
this.logger.error(`Error processing message from topic ${topic}`, error);
}
}
private isHeartbeatMessage(data: any): boolean {
const keys = Object.keys(data);
return keys.length === 1 && keys[0] === 'ts_ms';
}
private isDebeziumOutboxEvent(data: any): boolean {
const after = data.payload?.after;
if (!after) return false;
return after.event_type && after.aggregate_type;
}
private async handleOutboxEvent(data: any): Promise<void> {
const eventType = data.event_type;
const eventId = data.id || data.outbox_id;
// 处理不同类型的事件
if (eventType === 'UserWalletCreated') {
await this.handleUserWalletCreated(eventId, data);
} else if (eventType === 'WalletSystemAccountCreated') {
await this.handleSystemAccountCreated(eventId, data);
}
}
/**
* 处理用户钱包创建事件 - 创建用户交易账户
*/
private async handleUserWalletCreated(eventId: string, data: any): Promise<void> {
const payload = typeof data.payload === 'string'
? JSON.parse(data.payload)
: data.payload;
const accountSequence = payload?.accountSequence;
if (!accountSequence) {
this.logger.warn(`UserWalletCreated event ${eventId} missing accountSequence`);
return;
}
// 只处理 CONTRIBUTION 类型的钱包
if (payload?.walletType !== 'CONTRIBUTION') {
return;
}
this.logger.debug(
`Processing UserWalletCreated event: ${eventId}, accountSequence: ${accountSequence}`,
);
// 幂等性检查
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`Event ${eventId} already processed, skipping`);
return;
}
try {
// 检查交易账户是否已存在
const existingAccount =
await this.tradingAccountRepository.findByAccountSequence(accountSequence);
if (existingAccount) {
this.logger.debug(`Trading account ${accountSequence} already exists`);
await this.markEventProcessed(eventId);
return;
}
// 创建交易账户
const account = TradingAccountAggregate.create(accountSequence);
const accountId = await this.tradingAccountRepository.save(account);
// 发布交易账户创建事件
await this.publishAccountCreatedEvent(accountId, accountSequence);
// 标记为已处理
await this.markEventProcessed(eventId);
this.logger.log(
`Trading account created for user ${accountSequence} (triggered by UserWalletCreated)`,
);
} catch (error) {
if (error instanceof Error && error.message.includes('Unique constraint')) {
this.logger.debug(
`Trading account already exists for ${accountSequence}, marking as processed`,
);
await this.markEventProcessed(eventId, 'UserWalletCreated');
return;
}
this.logger.error(
`Failed to create trading account for ${accountSequence}`,
error instanceof Error ? error.stack : error,
);
throw error;
}
}
/**
* 处理系统账户创建事件 - 为省/市系统账户创建交易账户
* 系统账户的 accountSequence 格式为 code如 PROV-440000, CITY-440100
*/
private async handleSystemAccountCreated(eventId: string, data: any): Promise<void> {
const payload = typeof data.payload === 'string'
? JSON.parse(data.payload)
: data.payload;
// 系统账户使用 code 作为 accountSequence
const accountCode = payload?.code;
if (!accountCode) {
this.logger.warn(`WalletSystemAccountCreated event ${eventId} missing code`);
return;
}
// 只处理省/市级系统账户
const accountType = payload?.accountType;
if (accountType !== 'PROVINCE' && accountType !== 'CITY') {
this.logger.debug(
`Skipping non-regional system account: ${accountCode}, type: ${accountType}`,
);
return;
}
this.logger.debug(
`Processing WalletSystemAccountCreated event: ${eventId}, code: ${accountCode}, type: ${accountType}`,
);
// 幂等性检查
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`Event ${eventId} already processed, skipping`);
return;
}
try {
// 检查交易账户是否已存在
const existingAccount =
await this.tradingAccountRepository.findByAccountSequence(accountCode);
if (existingAccount) {
this.logger.debug(`Trading account ${accountCode} already exists`);
await this.markEventProcessed(eventId, 'WalletSystemAccountCreated');
return;
}
// 创建交易账户
const account = TradingAccountAggregate.create(accountCode);
const accountId = await this.tradingAccountRepository.save(account);
// 发布交易账户创建事件
await this.publishAccountCreatedEvent(accountId, accountCode);
// 标记为已处理
await this.markEventProcessed(eventId, 'WalletSystemAccountCreated');
this.logger.log(
`Trading account created for system ${accountType}: ${accountCode} (triggered by WalletSystemAccountCreated)`,
);
} catch (error) {
if (error instanceof Error && error.message.includes('Unique constraint')) {
this.logger.debug(
`Trading account already exists for ${accountCode}, marking as processed`,
);
await this.markEventProcessed(eventId, 'WalletSystemAccountCreated');
return;
}
this.logger.error(
`Failed to create trading account for system ${accountCode}`,
error instanceof Error ? error.stack : error,
);
throw error;
}
}
private async isEventProcessed(eventId: string): Promise<boolean> {
const redisKey = `trading:processed-event:wallet:${eventId}`;
const cached = await this.redis.get(redisKey);
if (cached) return true;
const dbRecord = await this.processedEventRepository.findByEventId(String(eventId));
if (dbRecord) {
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
return true;
}
return false;
}
private async markEventProcessed(
eventId: string,
eventType: string = 'UserWalletCreated',
): Promise<void> {
const redisKey = `trading:processed-event:wallet:${eventId}`;
try {
await this.processedEventRepository.create({
eventId: String(eventId),
eventType,
sourceService: 'mining-wallet-service',
});
} catch (error) {
if (!(error instanceof Error && error.message.includes('Unique constraint'))) {
throw error;
}
}
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
}
private async publishAccountCreatedEvent(
accountId: string,
accountSequence: string,
): Promise<void> {
try {
const payload: TradingAccountCreatedPayload = {
accountId,
accountSequence,
createdAt: new Date().toISOString(),
};
await this.outboxRepository.create({
aggregateType: 'TradingAccount',
aggregateId: accountId,
eventType: TradingEventTypes.TRADING_ACCOUNT_CREATED,
payload,
topic: TradingTopics.ACCOUNTS,
key: accountSequence,
});
this.logger.debug(`Published TradingAccountCreated event for ${accountSequence}`);
} catch (error) {
this.logger.error(`Failed to publish TradingAccountCreated event: ${error}`);
}
}
}