244 lines
7.3 KiB
TypeScript
244 lines
7.3 KiB
TypeScript
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
|
||
import { ConfigService } from '@nestjs/config';
|
||
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
|
||
import { Prisma, PrismaClient } from '@prisma/client';
|
||
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
|
||
|
||
/**
 * Prisma transaction client type.
 *
 * A `PrismaClient` without the members listed below, i.e. the shape used for
 * the client handed to the callback of `$transaction` in this file.
 */
type TransactionClient = Omit<
  PrismaClient,
  '$connect' | '$disconnect' | '$on' | '$transaction' | '$use' | '$extends'
>;
|
||
|
||
/**
 * Message format after the Debezium ExtractNewRecordState transform.
 *
 * The fields come from identity-service's wallet_addresses table plus the
 * metadata fields added by Debezium.
 */
interface UnwrappedCdcWalletAddress {
  // Columns of the 1.0 identity-service wallet_addresses table
  address_id: number;
  user_id: number;
  chain_type: string;
  address: string;
  public_key: string;
  address_digest: string;
  mpc_signature_r: string;
  mpc_signature_s: string;
  mpc_signature_v: number;
  status: string;
  bound_at: number; // timestamp in milliseconds

  // Metadata fields added by Debezium ExtractNewRecordState
  /** Operation code; the consumer upserts on 'c' / 'u' / 'r' and soft-deletes on 'd'. */
  __op: 'c' | 'u' | 'd' | 'r';
  /** Source table name; the consumer falls back to 'wallet_addresses' when it is empty. */
  __table: string;
  __source_ts_ms: number;
  /** The string 'true' marks the row as deleted (compared as a string by the consumer). */
  __deleted?: string;
}
|
||
|
||
/**
 * CDC consumer for wallet-address change events of the 1.0 system.
 *
 * Listens to the CDC events published by Debezium and mirrors them into the
 * synced_wallet_addresses table.
 *
 * Implements a transactional idempotent consumer:
 * - the idempotency record (processed_cdc_events) and the business write are
 *   executed in the same database transaction;
 * - any failure rolls the whole transaction back;
 * - non-deterministic processing failures are rethrown to kafkajs so the
 *   message is redelivered instead of being lost; a redelivered message whose
 *   transaction had already committed is detected through the idempotency
 *   record and skipped.
 */
@Injectable()
export class WalletAddressCdcConsumer implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(WalletAddressCdcConsumer.name);
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private readonly topic: string;
  /** True once `consumer.connect()` has succeeded; gates the disconnect on shutdown. */
  private isConnected = false;

  /**
   * Builds the Kafka client and consumer from configuration.
   *
   * Reads KAFKA_BROKERS (comma-separated), CDC_CONSUMER_GROUP (suffixed with
   * "-wallet") and CDC_TOPIC_WALLET_ADDRESSES, each with a built-in default.
   * No network connection is made here.
   */
  constructor(
    private readonly configService: ConfigService,
    private readonly prisma: PrismaService,
  ) {
    // Tolerate whitespace and stray commas in the broker list ("a:9092, b:9092,").
    const brokers = this.configService
      .get<string>('KAFKA_BROKERS', 'localhost:9092')
      .split(',')
      .map((broker) => broker.trim())
      .filter((broker) => broker.length > 0);

    this.kafka = new Kafka({
      clientId: 'auth-service-cdc-wallet',
      brokers,
    });

    const groupPrefix = this.configService.get<string>(
      'CDC_CONSUMER_GROUP',
      'auth-service-cdc-group',
    );
    this.consumer = this.kafka.consumer({ groupId: `${groupPrefix}-wallet` });

    this.topic = this.configService.get<string>(
      'CDC_TOPIC_WALLET_ADDRESSES',
      'cdc.identity.public.wallet_addresses',
    );
  }

  /**
   * Connects, subscribes (from the beginning of the topic) and starts the
   * consumer loop, unless CDC_ENABLED is set to anything other than 'true'.
   *
   * Start-up failures are logged and not rethrown, so the hosting module
   * still boots when Kafka is unreachable.
   */
  async onModuleInit(): Promise<void> {
    if (this.configService.get('CDC_ENABLED', 'true') !== 'true') {
      this.logger.log('Wallet Address CDC Consumer is disabled');
      return;
    }

    try {
      await this.consumer.connect();
      this.isConnected = true;

      await this.consumer.subscribe({ topic: this.topic, fromBeginning: true });

      await this.consumer.run({
        eachMessage: async (payload) => {
          await this.handleMessage(payload);
        },
      });

      this.logger.log(
        `Wallet Address CDC Consumer started, listening to topic: ${this.topic}`,
      );
    } catch (error) {
      this.logger.error('Failed to start Wallet Address CDC Consumer', error);
    }
  }

  /** Disconnects the consumer if a connection was established. */
  async onModuleDestroy(): Promise<void> {
    if (this.isConnected) {
      await this.consumer.disconnect();
      this.logger.log('Wallet Address CDC Consumer disconnected');
    }
  }

  /**
   * Handles a single Kafka message.
   *
   * Outcomes:
   * - message without a value: ignored;
   * - unique-constraint violation (P2002): treated as an already processed
   *   event and skipped;
   * - deterministic payload error (invalid JSON, missing or non-numeric ids):
   *   logged and skipped, because redelivering the same bytes cannot succeed;
   * - any other failure: logged and rethrown, so kafkajs does not commit the
   *   offset and redelivers the message.
   *
   * @throws whatever the transactional processing threw, unless it falls into
   *         one of the skipped categories above.
   */
  private async handleMessage(payload: EachMessagePayload): Promise<void> {
    const { topic, partition, message } = payload;

    if (!message.value) return;

    // NOTE(review): Kafka offsets are only unique within a partition, while
    // the idempotency record below is keyed by topic + offset. This presumably
    // relies on the CDC topic having a single partition - confirm.
    const offset = BigInt(message.offset);
    const idempotencyKey = `${topic}:${offset}`;

    try {
      const cdcEvent: UnwrappedCdcWalletAddress = JSON.parse(message.value.toString());
      const op = cdcEvent.__op;
      const tableName = cdcEvent.__table || 'wallet_addresses';

      this.logger.log(`[CDC] Processing wallet address event: topic=${topic}, offset=${offset}, op=${op}`);

      await this.processWithIdempotency(topic, offset, tableName, op, cdcEvent);

      this.logger.log(`[CDC] Successfully processed wallet address event: ${idempotencyKey}`);
    } catch (error: unknown) {
      if (WalletAddressCdcConsumer.hasErrorCode(error, 'P2002')) {
        this.logger.debug(`[CDC] Skipping duplicate wallet address event: ${idempotencyKey}`);
        return;
      }

      this.logger.error(
        `[CDC] Failed to process wallet address message from ${topic}[${partition}], offset=${offset}`,
        error,
      );

      if (WalletAddressCdcConsumer.isPayloadError(error)) {
        // Poison message: retrying would fail identically and stall the partition.
        return;
      }

      // The transaction was rolled back (including the idempotency record),
      // so let kafkajs redeliver the message instead of committing the offset.
      throw error;
    }
  }

  /**
   * Transactional idempotent processing.
   *
   * Inserts the idempotency record and runs the business logic inside one
   * serializable transaction (30 s timeout). When the idempotency insert hits
   * a unique-constraint violation the event is considered already processed
   * and the business logic is not executed.
   *
   * @throws any error other than the idempotency unique violation.
   */
  private async processWithIdempotency(
    topic: string,
    offset: bigint,
    tableName: string,
    operation: string,
    event: UnwrappedCdcWalletAddress,
  ): Promise<void> {
    await this.prisma.$transaction(
      async (tx) => {
        // 1. Try to insert the idempotency record.
        try {
          await tx.processedCdcEvent.create({
            data: {
              sourceTopic: topic,
              offset: offset,
              tableName: tableName,
              operation: operation,
            },
          });
        } catch (error: unknown) {
          if (WalletAddressCdcConsumer.hasErrorCode(error, 'P2002')) {
            this.logger.debug(`[CDC] Wallet address event already processed: ${topic}:${offset}`);
            return;
          }
          throw error;
        }

        // 2. Run the business logic.
        await this.processCdcEvent(event, offset, tx);
      },
      {
        isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
        timeout: 30000,
      },
    );
  }

  /**
   * Dispatches an event to the matching write.
   *
   * Events flagged with `__deleted === 'true'` or op 'd' soft-delete the row;
   * ops 'c', 'r' and 'u' upsert it. Any other op is logged and ignored.
   */
  private async processCdcEvent(
    event: UnwrappedCdcWalletAddress,
    sequenceNum: bigint,
    tx: TransactionClient,
  ): Promise<void> {
    const op = event.__op;
    const isDeleted = event.__deleted === 'true';

    if (isDeleted || op === 'd') {
      await this.deleteWalletAddress(event.address_id, tx);
      return;
    }

    switch (op) {
      case 'c':
      case 'r':
      case 'u':
        await this.upsertWalletAddress(event, sequenceNum, tx);
        break;
      default:
        // Not reachable for well-typed events; guards against unexpected payloads.
        this.logger.warn(
          `[CDC] Ignoring wallet address event with unsupported op: ${String(op)}`,
        );
    }
  }

  /**
   * Creates or updates the synced row identified by the legacy address id.
   *
   * `legacyBoundAt` is only written on create; `syncedAt` is only set
   * explicitly on update.
   */
  private async upsertWalletAddress(
    walletAddress: UnwrappedCdcWalletAddress,
    sequenceNum: bigint,
    tx: TransactionClient,
  ): Promise<void> {
    const legacyAddressId = BigInt(walletAddress.address_id);
    const legacyUserId = BigInt(walletAddress.user_id);

    await tx.syncedWalletAddress.upsert({
      where: { legacyAddressId },
      update: {
        legacyUserId,
        chainType: walletAddress.chain_type,
        address: walletAddress.address,
        publicKey: walletAddress.public_key,
        status: walletAddress.status,
        sourceSequenceNum: sequenceNum,
        syncedAt: new Date(),
      },
      create: {
        legacyAddressId,
        legacyUserId,
        chainType: walletAddress.chain_type,
        address: walletAddress.address,
        publicKey: walletAddress.public_key,
        status: walletAddress.status,
        legacyBoundAt: new Date(walletAddress.bound_at),
        sourceSequenceNum: sequenceNum,
      },
    });

    this.logger.debug(
      `[CDC] Synced wallet address: addressId=${walletAddress.address_id}, chain=${walletAddress.chain_type}`,
    );
  }

  /**
   * Soft-deletes a synced row by setting its status to 'DELETED'.
   *
   * A missing row (Prisma P2025) is tolerated: there is nothing to mark.
   * Every other error is rethrown so the surrounding transaction rolls back
   * together with the idempotency record.
   *
   * @throws any error other than "record not found".
   */
  private async deleteWalletAddress(addressId: number, tx: TransactionClient): Promise<void> {
    try {
      await tx.syncedWalletAddress.update({
        where: { legacyAddressId: BigInt(addressId) },
        data: { status: 'DELETED' },
      });
    } catch (error: unknown) {
      if (WalletAddressCdcConsumer.hasErrorCode(error, 'P2025')) {
        this.logger.warn(`[CDC] Wallet address to mark as deleted was not found: ${addressId}`);
        return;
      }
      this.logger.error(`[CDC] Failed to mark wallet address as deleted: ${addressId}`, error);
      throw error;
    }

    this.logger.debug(`[CDC] Marked wallet address as deleted: ${addressId}`);
  }

  /** Returns true when `error` is an object whose `code` property equals `code`. */
  private static hasErrorCode(error: unknown, code: string): boolean {
    return (
      typeof error === 'object' &&
      error !== null &&
      (error as { code?: unknown }).code === code
    );
  }

  /**
   * Returns true for errors raised by the payload itself: `JSON.parse`
   * failures (SyntaxError) and `BigInt()` conversion failures
   * (TypeError / RangeError / SyntaxError).
   */
  private static isPayloadError(error: unknown): boolean {
    return (
      error instanceof SyntaxError ||
      error instanceof TypeError ||
      error instanceof RangeError
    );
  }
}
|