import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; import { PrismaService } from '../persistence/prisma/prisma.service'; import { Decimal } from '@prisma/client/runtime/library'; /** * Debezium CDC 事件结构 (经过 ExtractNewRecordState 转换后) */ // wallet_accounts 表 interface CdcWalletAccountPayload { wallet_id: string; account_sequence: string; user_id: string; usdt_available: string; usdt_frozen: string; dst_available: string; dst_frozen: string; bnb_available: string; bnb_frozen: string; og_available: string; og_frozen: string; rwad_available: string; rwad_frozen: string; hashpower: string; pending_usdt: string; pending_hashpower: string; settleable_usdt: string; settleable_hashpower: string; settled_total_usdt: string; settled_total_hashpower: string; expired_total_usdt: string; expired_total_hashpower: string; status: string; has_planted: boolean; created_at: string; __op: 'c' | 'u' | 'd' | 'r'; __table: string; __source_ts_ms: number; __deleted?: string; } // withdrawal_orders 表 interface CdcWithdrawalOrderPayload { order_id: string; order_no: string; account_sequence: string; user_id: string; amount: string; fee: string; chain_type: string; to_address: string; tx_hash?: string | null; is_internal_transfer: boolean; to_account_sequence?: string | null; to_user_id?: string | null; status: string; error_message?: string | null; frozen_at?: string | null; broadcasted_at?: string | null; confirmed_at?: string | null; created_at: string; __op: 'c' | 'u' | 'd' | 'r'; __table: string; __source_ts_ms: number; __deleted?: string; } // fiat_withdrawal_orders 表 interface CdcFiatWithdrawalOrderPayload { order_id: string; order_no: string; account_sequence: string; user_id: string; amount: string; fee: string; payment_method: string; bank_name?: string | null; bank_card_no?: string | null; card_holder_name?: string | null; alipay_account?: string | null; alipay_real_name?: string | null; wechat_account?: string | null; wechat_real_name?: string | null; status: string; error_message?: string | null; reviewed_by?: string | null; reviewed_at?: string | null; review_remark?: string | null; paid_by?: string | null; paid_at?: string | null; frozen_at?: string | null; completed_at?: string | null; created_at: string; __op: 'c' | 'u' | 'd' | 'r'; __table: string; __source_ts_ms: number; __deleted?: string; } // wallet_ledger_entries 表 (分类账流水) interface CdcWalletLedgerEntryPayload { entry_id: string; account_sequence: string; user_id: string; entry_type: string; amount: string; asset_type: string; balance_after?: string | null; ref_order_id?: string | null; ref_tx_hash?: string | null; memo?: string | null; created_at: string; __op: 'c' | 'u' | 'd' | 'r'; __table: string; __source_ts_ms: number; __deleted?: string; } type CdcWalletPayload = CdcWalletAccountPayload | CdcWithdrawalOrderPayload | CdcFiatWithdrawalOrderPayload | CdcWalletLedgerEntryPayload; /** * Wallet CDC 消费者服务 * * 消费 Debezium 从 wallet-service PostgreSQL 捕获的数据变更 * * Topics: * - cdc.wallet.public.wallet_accounts * - cdc.wallet.public.withdrawal_orders * - cdc.wallet.public.fiat_withdrawal_orders */ @Injectable() export class WalletCdcConsumerService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(WalletCdcConsumerService.name); private kafka: Kafka; private consumer: Consumer; private isRunning = false; // CDC Topics private readonly cdcTopics = [ 'cdc.wallet.public.wallet_accounts', 'cdc.wallet.public.withdrawal_orders', 'cdc.wallet.public.fiat_withdrawal_orders', 'cdc.wallet.public.wallet_ledger_entries', ]; private readonly consumerGroup: string; constructor( private readonly configService: ConfigService, private readonly prisma: PrismaService, ) { const brokers = (this.configService.get('KAFKA_BROKERS', 'localhost:9092')).split(','); const clientId = this.configService.get('KAFKA_CLIENT_ID', 'admin-service'); this.consumerGroup = this.configService.get('KAFKA_WALLET_CDC_GROUP', 'admin-service-wallet-cdc'); this.kafka = new Kafka({ clientId: `${clientId}-wallet-cdc`, brokers, logLevel: logLevel.WARN, }); this.consumer = this.kafka.consumer({ groupId: this.consumerGroup }); this.logger.log(`[Wallet-CDC] Configured to consume topics: ${this.cdcTopics.join(', ')}`); } async onModuleInit() { await this.start(); } async onModuleDestroy() { await this.stop(); } async start(): Promise { if (this.isRunning) { this.logger.warn('[Wallet-CDC] Consumer already running'); return; } try { this.logger.log('[Wallet-CDC] Connecting to Kafka...'); await this.consumer.connect(); for (const topic of this.cdcTopics) { await this.consumer.subscribe({ topic, fromBeginning: false, }); } this.logger.log(`[Wallet-CDC] Subscribed to topics: ${this.cdcTopics.join(', ')}`); await this.consumer.run({ eachMessage: async (payload: EachMessagePayload) => { await this.handleMessage(payload); }, }); this.isRunning = true; this.logger.log('[Wallet-CDC] Consumer started successfully'); } catch (error) { this.logger.error('[Wallet-CDC] Failed to start consumer:', error); } } async stop(): Promise { if (!this.isRunning) return; try { await this.consumer.disconnect(); this.isRunning = false; this.logger.log('[Wallet-CDC] Consumer stopped'); } catch (error) { this.logger.error('[Wallet-CDC] Failed to stop consumer:', error); } } private async handleMessage(payload: EachMessagePayload): Promise { const { topic, partition, message } = payload; if (!message.value) { this.logger.warn(`[Wallet-CDC] Empty message from ${topic}:${partition}`); return; } try { const data = JSON.parse(message.value.toString()) as CdcWalletPayload; const operation = data.__op; const table = data.__table; this.logger.debug( `[Wallet-CDC] Received ${operation} event for table ${table}` ); // 幂等性检查 const eventId = `wallet-cdc:${topic}:${partition}:${message.offset}`; if (await this.isEventProcessed(eventId)) { this.logger.debug(`[Wallet-CDC] Event ${eventId} already processed, skipping`); return; } // 根据表名处理不同的事件 switch (table) { case 'wallet_accounts': await this.processWalletAccountEvent(data as CdcWalletAccountPayload); break; case 'withdrawal_orders': await this.processWithdrawalOrderEvent(data as CdcWithdrawalOrderPayload); break; case 'fiat_withdrawal_orders': await this.processFiatWithdrawalOrderEvent(data as CdcFiatWithdrawalOrderPayload); break; case 'wallet_ledger_entries': await this.processWalletLedgerEntryEvent(data as CdcWalletLedgerEntryPayload); break; default: this.logger.warn(`[Wallet-CDC] Unknown table: ${table}`); } // 记录已处理 await this.markEventProcessed(eventId, `wallet-cdc:${table}:${operation}`); this.logger.log( `[Wallet-CDC] ✓ Processed ${operation} for table: ${table}` ); } catch (error) { this.logger.error(`[Wallet-CDC] Failed to process message:`, error); throw error; // 让 KafkaJS 重试 } } // ==================== wallet_accounts 处理 ==================== private async processWalletAccountEvent(data: CdcWalletAccountPayload): Promise { const operation = data.__op; const isDeleted = data.__deleted === 'true'; if (operation === 'd' || isDeleted) { await this.handleWalletAccountDelete(data); } else if (operation === 'c' || operation === 'r') { await this.handleWalletAccountCreateOrSnapshot(data); } else if (operation === 'u') { await this.handleWalletAccountUpdate(data); } } private async handleWalletAccountCreateOrSnapshot(data: CdcWalletAccountPayload): Promise { await this.prisma.walletAccountQueryView.upsert({ where: { id: BigInt(data.wallet_id) }, create: { id: BigInt(data.wallet_id), accountSequence: data.account_sequence, userId: BigInt(data.user_id), usdtAvailable: new Decimal(data.usdt_available || '0'), usdtFrozen: new Decimal(data.usdt_frozen || '0'), dstAvailable: new Decimal(data.dst_available || '0'), dstFrozen: new Decimal(data.dst_frozen || '0'), bnbAvailable: new Decimal(data.bnb_available || '0'), bnbFrozen: new Decimal(data.bnb_frozen || '0'), ogAvailable: new Decimal(data.og_available || '0'), ogFrozen: new Decimal(data.og_frozen || '0'), rwadAvailable: new Decimal(data.rwad_available || '0'), rwadFrozen: new Decimal(data.rwad_frozen || '0'), hashpower: new Decimal(data.hashpower || '0'), pendingUsdt: new Decimal(data.pending_usdt || '0'), pendingHashpower: new Decimal(data.pending_hashpower || '0'), settleableUsdt: new Decimal(data.settleable_usdt || '0'), settleableHashpower: new Decimal(data.settleable_hashpower || '0'), settledTotalUsdt: new Decimal(data.settled_total_usdt || '0'), settledTotalHashpower: new Decimal(data.settled_total_hashpower || '0'), expiredTotalUsdt: new Decimal(data.expired_total_usdt || '0'), expiredTotalHashpower: new Decimal(data.expired_total_hashpower || '0'), status: data.status, hasPlanted: data.has_planted, createdAt: new Date(data.created_at), syncedAt: new Date(), }, update: { accountSequence: data.account_sequence, userId: BigInt(data.user_id), usdtAvailable: new Decimal(data.usdt_available || '0'), usdtFrozen: new Decimal(data.usdt_frozen || '0'), dstAvailable: new Decimal(data.dst_available || '0'), dstFrozen: new Decimal(data.dst_frozen || '0'), bnbAvailable: new Decimal(data.bnb_available || '0'), bnbFrozen: new Decimal(data.bnb_frozen || '0'), ogAvailable: new Decimal(data.og_available || '0'), ogFrozen: new Decimal(data.og_frozen || '0'), rwadAvailable: new Decimal(data.rwad_available || '0'), rwadFrozen: new Decimal(data.rwad_frozen || '0'), hashpower: new Decimal(data.hashpower || '0'), pendingUsdt: new Decimal(data.pending_usdt || '0'), pendingHashpower: new Decimal(data.pending_hashpower || '0'), settleableUsdt: new Decimal(data.settleable_usdt || '0'), settleableHashpower: new Decimal(data.settleable_hashpower || '0'), settledTotalUsdt: new Decimal(data.settled_total_usdt || '0'), settledTotalHashpower: new Decimal(data.settled_total_hashpower || '0'), expiredTotalUsdt: new Decimal(data.expired_total_usdt || '0'), expiredTotalHashpower: new Decimal(data.expired_total_hashpower || '0'), status: data.status, hasPlanted: data.has_planted, syncedAt: new Date(), }, }); this.logger.log(`[Wallet-CDC] Created/Snapshot wallet account: ${data.account_sequence}`); } private async handleWalletAccountUpdate(data: CdcWalletAccountPayload): Promise { const id = BigInt(data.wallet_id); const exists = await this.prisma.walletAccountQueryView.findUnique({ where: { id }, select: { id: true }, }); if (!exists) { await this.handleWalletAccountCreateOrSnapshot(data); return; } await this.prisma.walletAccountQueryView.update({ where: { id }, data: { usdtAvailable: new Decimal(data.usdt_available || '0'), usdtFrozen: new Decimal(data.usdt_frozen || '0'), dstAvailable: new Decimal(data.dst_available || '0'), dstFrozen: new Decimal(data.dst_frozen || '0'), bnbAvailable: new Decimal(data.bnb_available || '0'), bnbFrozen: new Decimal(data.bnb_frozen || '0'), ogAvailable: new Decimal(data.og_available || '0'), ogFrozen: new Decimal(data.og_frozen || '0'), rwadAvailable: new Decimal(data.rwad_available || '0'), rwadFrozen: new Decimal(data.rwad_frozen || '0'), hashpower: new Decimal(data.hashpower || '0'), pendingUsdt: new Decimal(data.pending_usdt || '0'), pendingHashpower: new Decimal(data.pending_hashpower || '0'), settleableUsdt: new Decimal(data.settleable_usdt || '0'), settleableHashpower: new Decimal(data.settleable_hashpower || '0'), settledTotalUsdt: new Decimal(data.settled_total_usdt || '0'), settledTotalHashpower: new Decimal(data.settled_total_hashpower || '0'), expiredTotalUsdt: new Decimal(data.expired_total_usdt || '0'), expiredTotalHashpower: new Decimal(data.expired_total_hashpower || '0'), status: data.status, hasPlanted: data.has_planted, syncedAt: new Date(), }, }); this.logger.log(`[Wallet-CDC] Updated wallet account: ${data.account_sequence}`); } private async handleWalletAccountDelete(data: CdcWalletAccountPayload): Promise { const id = BigInt(data.wallet_id); try { await this.prisma.walletAccountQueryView.delete({ where: { id }, }); this.logger.log(`[Wallet-CDC] Deleted wallet account: ${data.account_sequence}`); } catch { this.logger.warn(`[Wallet-CDC] Wallet account not found for delete: ${data.account_sequence}`); } } // ==================== withdrawal_orders 处理 ==================== private async processWithdrawalOrderEvent(data: CdcWithdrawalOrderPayload): Promise { const operation = data.__op; const isDeleted = data.__deleted === 'true'; if (operation === 'd' || isDeleted) { await this.handleWithdrawalOrderDelete(data); } else if (operation === 'c' || operation === 'r') { await this.handleWithdrawalOrderCreateOrSnapshot(data); } else if (operation === 'u') { await this.handleWithdrawalOrderUpdate(data); } } private async handleWithdrawalOrderCreateOrSnapshot(data: CdcWithdrawalOrderPayload): Promise { await this.prisma.withdrawalOrderQueryView.upsert({ where: { id: BigInt(data.order_id) }, create: { id: BigInt(data.order_id), orderNo: data.order_no, accountSequence: data.account_sequence, userId: BigInt(data.user_id), amount: new Decimal(data.amount), fee: new Decimal(data.fee), chainType: data.chain_type, toAddress: data.to_address, txHash: data.tx_hash || null, isInternalTransfer: data.is_internal_transfer, toAccountSequence: data.to_account_sequence || null, toUserId: data.to_user_id ? BigInt(data.to_user_id) : null, status: data.status, errorMessage: data.error_message || null, frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, broadcastedAt: data.broadcasted_at ? new Date(data.broadcasted_at) : null, confirmedAt: data.confirmed_at ? new Date(data.confirmed_at) : null, createdAt: new Date(data.created_at), syncedAt: new Date(), }, update: { orderNo: data.order_no, accountSequence: data.account_sequence, userId: BigInt(data.user_id), amount: new Decimal(data.amount), fee: new Decimal(data.fee), chainType: data.chain_type, toAddress: data.to_address, txHash: data.tx_hash || null, isInternalTransfer: data.is_internal_transfer, toAccountSequence: data.to_account_sequence || null, toUserId: data.to_user_id ? BigInt(data.to_user_id) : null, status: data.status, errorMessage: data.error_message || null, frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, broadcastedAt: data.broadcasted_at ? new Date(data.broadcasted_at) : null, confirmedAt: data.confirmed_at ? new Date(data.confirmed_at) : null, syncedAt: new Date(), }, }); this.logger.log(`[Wallet-CDC] Created/Snapshot withdrawal order: ${data.order_no}`); } private async handleWithdrawalOrderUpdate(data: CdcWithdrawalOrderPayload): Promise { const id = BigInt(data.order_id); const exists = await this.prisma.withdrawalOrderQueryView.findUnique({ where: { id }, select: { id: true }, }); if (!exists) { await this.handleWithdrawalOrderCreateOrSnapshot(data); return; } await this.prisma.withdrawalOrderQueryView.update({ where: { id }, data: { txHash: data.tx_hash || null, status: data.status, errorMessage: data.error_message || null, frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, broadcastedAt: data.broadcasted_at ? new Date(data.broadcasted_at) : null, confirmedAt: data.confirmed_at ? new Date(data.confirmed_at) : null, syncedAt: new Date(), }, }); this.logger.log(`[Wallet-CDC] Updated withdrawal order: ${data.order_no}`); } private async handleWithdrawalOrderDelete(data: CdcWithdrawalOrderPayload): Promise { const id = BigInt(data.order_id); try { await this.prisma.withdrawalOrderQueryView.delete({ where: { id }, }); this.logger.log(`[Wallet-CDC] Deleted withdrawal order: ${data.order_no}`); } catch { this.logger.warn(`[Wallet-CDC] Withdrawal order not found for delete: ${data.order_no}`); } } // ==================== fiat_withdrawal_orders 处理 ==================== private async processFiatWithdrawalOrderEvent(data: CdcFiatWithdrawalOrderPayload): Promise { const operation = data.__op; const isDeleted = data.__deleted === 'true'; if (operation === 'd' || isDeleted) { await this.handleFiatWithdrawalOrderDelete(data); } else if (operation === 'c' || operation === 'r') { await this.handleFiatWithdrawalOrderCreateOrSnapshot(data); } else if (operation === 'u') { await this.handleFiatWithdrawalOrderUpdate(data); } } private maskBankCardNo(cardNo: string | null | undefined): string | null { if (!cardNo) return null; if (cardNo.length <= 8) return cardNo; return cardNo.slice(0, 4) + '****' + cardNo.slice(-4); } private maskAccount(account: string | null | undefined): string | null { if (!account) return null; if (account.length <= 4) return account; const visible = Math.min(3, Math.floor(account.length / 3)); return account.slice(0, visible) + '****' + account.slice(-visible); } private async handleFiatWithdrawalOrderCreateOrSnapshot(data: CdcFiatWithdrawalOrderPayload): Promise { await this.prisma.fiatWithdrawalOrderQueryView.upsert({ where: { id: BigInt(data.order_id) }, create: { id: BigInt(data.order_id), orderNo: data.order_no, accountSequence: data.account_sequence, userId: BigInt(data.user_id), amount: new Decimal(data.amount), fee: new Decimal(data.fee), paymentMethod: data.payment_method, bankName: data.bank_name || null, bankCardNoMasked: this.maskBankCardNo(data.bank_card_no), cardHolderName: data.card_holder_name || null, alipayAccountMasked: this.maskAccount(data.alipay_account), wechatAccountMasked: this.maskAccount(data.wechat_account), status: data.status, errorMessage: data.error_message || null, reviewedBy: data.reviewed_by || null, reviewedAt: data.reviewed_at ? new Date(data.reviewed_at) : null, reviewRemark: data.review_remark || null, paidBy: data.paid_by || null, paidAt: data.paid_at ? new Date(data.paid_at) : null, frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, completedAt: data.completed_at ? new Date(data.completed_at) : null, createdAt: new Date(data.created_at), syncedAt: new Date(), }, update: { orderNo: data.order_no, accountSequence: data.account_sequence, userId: BigInt(data.user_id), amount: new Decimal(data.amount), fee: new Decimal(data.fee), paymentMethod: data.payment_method, bankName: data.bank_name || null, bankCardNoMasked: this.maskBankCardNo(data.bank_card_no), cardHolderName: data.card_holder_name || null, alipayAccountMasked: this.maskAccount(data.alipay_account), wechatAccountMasked: this.maskAccount(data.wechat_account), status: data.status, errorMessage: data.error_message || null, reviewedBy: data.reviewed_by || null, reviewedAt: data.reviewed_at ? new Date(data.reviewed_at) : null, reviewRemark: data.review_remark || null, paidBy: data.paid_by || null, paidAt: data.paid_at ? new Date(data.paid_at) : null, frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, completedAt: data.completed_at ? new Date(data.completed_at) : null, syncedAt: new Date(), }, }); this.logger.log(`[Wallet-CDC] Created/Snapshot fiat withdrawal order: ${data.order_no}`); } private async handleFiatWithdrawalOrderUpdate(data: CdcFiatWithdrawalOrderPayload): Promise { const id = BigInt(data.order_id); const exists = await this.prisma.fiatWithdrawalOrderQueryView.findUnique({ where: { id }, select: { id: true }, }); if (!exists) { await this.handleFiatWithdrawalOrderCreateOrSnapshot(data); return; } await this.prisma.fiatWithdrawalOrderQueryView.update({ where: { id }, data: { status: data.status, errorMessage: data.error_message || null, reviewedBy: data.reviewed_by || null, reviewedAt: data.reviewed_at ? new Date(data.reviewed_at) : null, reviewRemark: data.review_remark || null, paidBy: data.paid_by || null, paidAt: data.paid_at ? new Date(data.paid_at) : null, frozenAt: data.frozen_at ? new Date(data.frozen_at) : null, completedAt: data.completed_at ? new Date(data.completed_at) : null, syncedAt: new Date(), }, }); this.logger.log(`[Wallet-CDC] Updated fiat withdrawal order: ${data.order_no}`); } private async handleFiatWithdrawalOrderDelete(data: CdcFiatWithdrawalOrderPayload): Promise { const id = BigInt(data.order_id); try { await this.prisma.fiatWithdrawalOrderQueryView.delete({ where: { id }, }); this.logger.log(`[Wallet-CDC] Deleted fiat withdrawal order: ${data.order_no}`); } catch { this.logger.warn(`[Wallet-CDC] Fiat withdrawal order not found for delete: ${data.order_no}`); } } // ==================== wallet_ledger_entries 处理 ==================== private async processWalletLedgerEntryEvent(data: CdcWalletLedgerEntryPayload): Promise { const operation = data.__op; const isDeleted = data.__deleted === 'true'; // 分类账流水是 append-only,通常只有 create 和 snapshot // 但也处理更新和删除情况以保持完整性 if (operation === 'd' || isDeleted) { await this.handleWalletLedgerEntryDelete(data); } else if (operation === 'c' || operation === 'r') { await this.handleWalletLedgerEntryCreate(data); } else if (operation === 'u') { await this.handleWalletLedgerEntryUpdate(data); } } private async handleWalletLedgerEntryCreate(data: CdcWalletLedgerEntryPayload): Promise { await this.prisma.walletLedgerEntryView.upsert({ where: { id: BigInt(data.entry_id) }, create: { id: BigInt(data.entry_id), accountSequence: data.account_sequence, userId: BigInt(data.user_id), entryType: data.entry_type, amount: new Decimal(data.amount), assetType: data.asset_type, balanceAfter: data.balance_after ? new Decimal(data.balance_after) : null, refOrderId: data.ref_order_id || null, refTxHash: data.ref_tx_hash || null, memo: data.memo || null, createdAt: new Date(data.created_at), syncedAt: new Date(), }, update: { accountSequence: data.account_sequence, userId: BigInt(data.user_id), entryType: data.entry_type, amount: new Decimal(data.amount), assetType: data.asset_type, balanceAfter: data.balance_after ? new Decimal(data.balance_after) : null, refOrderId: data.ref_order_id || null, refTxHash: data.ref_tx_hash || null, memo: data.memo || null, syncedAt: new Date(), }, }); this.logger.log(`[Wallet-CDC] Created ledger entry: ${data.entry_id} (${data.entry_type})`); } private async handleWalletLedgerEntryUpdate(data: CdcWalletLedgerEntryPayload): Promise { // 理论上分类账不应该被更新,但为了完整性还是处理 await this.handleWalletLedgerEntryCreate(data); this.logger.log(`[Wallet-CDC] Updated ledger entry: ${data.entry_id}`); } private async handleWalletLedgerEntryDelete(data: CdcWalletLedgerEntryPayload): Promise { const id = BigInt(data.entry_id); try { await this.prisma.walletLedgerEntryView.delete({ where: { id }, }); this.logger.log(`[Wallet-CDC] Deleted ledger entry: ${data.entry_id}`); } catch { this.logger.warn(`[Wallet-CDC] Ledger entry not found for delete: ${data.entry_id}`); } } // ==================== Helper Methods ==================== private async isEventProcessed(eventId: string): Promise { const count = await this.prisma.processedEvent.count({ where: { eventId }, }); return count > 0; } private async markEventProcessed(eventId: string, eventType: string): Promise { await this.prisma.processedEvent.create({ data: { eventId, eventType, processedAt: new Date(), }, }); } /** * 获取消费者状态 */ getStatus(): { isRunning: boolean; topics: string[]; consumerGroup: string } { return { isRunning: this.isRunning, topics: this.cdcTopics, consumerGroup: this.consumerGroup, }; } }