/**
 * Withdrawal Event Consumer Service for Blockchain Service
 *
 * Consumes withdrawal request events from wallet-service via Kafka.
 * Creates transaction requests for MPC signing and blockchain broadcasting.
 */

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';

/** Kafka topics this consumer subscribes to. */
export const WITHDRAWAL_TOPICS = {
  WITHDRAWAL_REQUESTED: 'wallet.withdrawals',
  SYSTEM_WITHDRAWAL_REQUESTED: 'wallet.system-withdrawals',
} as const;

/** `eventType` value that routes a message to the withdrawal handler. */
const WITHDRAWAL_REQUESTED_EVENT = 'wallet.withdrawal.requested';

/** `eventType` value that routes a message to the system-withdrawal handler. */
const SYSTEM_WITHDRAWAL_REQUESTED_EVENT = 'wallet.system-withdrawal.requested';

/** Broker list used when `KAFKA_BROKERS` is unset or contains no usable entry. */
const DEFAULT_BROKERS: readonly string[] = ['localhost:9092'];

/** Payload passed to the handler registered via `onWithdrawalRequested`. */
export interface WithdrawalRequestedPayload {
  orderNo: string;
  accountSequence: string;
  userId: string;
  walletId: string;
  amount: string;
  fee: string;
  netAmount: string;
  assetType: string;
  chainType: string;
  toAddress: string;
}

/** Payload passed to the handler registered via `onSystemWithdrawalRequested`. */
export interface SystemWithdrawalRequestedPayload {
  orderNo: string;
  fromAccountSequence: string;
  fromAccountName: string;
  toAccountSequence: string;
  toAddress: string;
  amount: string;
  chainType: string;
}

/** Callback invoked for each `wallet.withdrawal.requested` event. */
export type WithdrawalEventHandler = (payload: WithdrawalRequestedPayload) => Promise<void>;

/** Callback invoked for each `wallet.system-withdrawal.requested` event. */
export type SystemWithdrawalEventHandler = (
  payload: SystemWithdrawalRequestedPayload,
) => Promise<void>;

/**
 * Turns a comma-separated broker string into a clean broker list.
 *
 * Entries are trimmed and empty entries are dropped, so values such as
 * `"a:9092, b:9092"` or a trailing comma do not produce unusable addresses.
 *
 * @param raw - Comma-separated broker list, or `undefined` when not configured.
 * @returns The parsed brokers, or a copy of `DEFAULT_BROKERS` when none remain.
 */
function parseBrokerList(raw: string | undefined): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);

  return brokers.length > 0 ? brokers : [...DEFAULT_BROKERS];
}

/**
 * Kafka consumer that subscribes to the withdrawal topics and forwards each
 * recognised event to the handler registered for its `eventType`.
 *
 * The consumer is created, connected and started in `onModuleInit`, and
 * disconnected in `onModuleDestroy`.
 */
@Injectable()
export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(WithdrawalEventConsumerService.name);

  // Both are created in onModuleInit, hence optional until then.
  private kafka?: Kafka;
  private consumer?: Consumer;

  // True only between a successful connect() and a completed disconnect().
  private isConnected = false;

  private withdrawalRequestedHandler?: WithdrawalEventHandler;
  private systemWithdrawalRequestedHandler?: SystemWithdrawalEventHandler;

  constructor(private readonly configService: ConfigService) {}

  /**
   * Builds the Kafka client and consumer from configuration, then connects,
   * subscribes to every topic in `WITHDRAWAL_TOPICS` and starts consuming.
   *
   * Failures while connecting, subscribing or starting are logged and not
   * rethrown, so this method itself does not reject because of them.
   */
  async onModuleInit(): Promise<void> {
    const brokers = parseBrokerList(this.configService.get<string>('KAFKA_BROKERS'));
    const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'blockchain-service';
    const groupId = 'blockchain-service-withdrawal-events';

    this.logger.log(`[INIT] Withdrawal Event Consumer initializing...`);
    this.logger.log(`[INIT] ClientId: ${clientId}`);
    this.logger.log(`[INIT] GroupId: ${groupId}`);
    this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
    this.logger.log(`[INIT] Topics: ${Object.values(WITHDRAWAL_TOPICS).join(', ')}`);

    // Retry configuration with exponential backoff.
    // NOTE(review): the original comment stated this retries for at most
    // roughly 2.5 hours; that figure is not evident from these values alone —
    // confirm against the kafkajs retry semantics.
    const kafka = new Kafka({
      clientId,
      brokers,
      logLevel: logLevel.WARN,
      retry: {
        initialRetryTime: 1000, // 1 second
        maxRetryTime: 300000, // capped at 5 minutes
        retries: 15, // at most 15 retries
        multiplier: 2, // exponential backoff factor
        restartOnFailure: async () => true,
      },
    });
    this.kafka = kafka;

    const consumer = kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
    });
    this.consumer = consumer;

    try {
      this.logger.log(`[CONNECT] Connecting Withdrawal Event consumer...`);
      await consumer.connect();
      this.isConnected = true;
      this.logger.log(`[CONNECT] Withdrawal Event consumer connected successfully`);

      await consumer.subscribe({
        topics: Object.values(WITHDRAWAL_TOPICS),
        fromBeginning: false,
      });
      this.logger.log(`[SUBSCRIBE] Subscribed to withdrawal topics`);

      await this.startConsuming();
    } catch (error) {
      this.logger.error(`[ERROR] Failed to connect Withdrawal Event consumer`, error);
    }
  }

  /** Disconnects the consumer if a connection was previously established. */
  async onModuleDestroy(): Promise<void> {
    if (!this.isConnected || !this.consumer) {
      return;
    }

    await this.consumer.disconnect();
    this.isConnected = false;
    this.logger.log('Withdrawal Event consumer disconnected');
  }

  /**
   * Register handler for withdrawal requested events.
   *
   * A later registration replaces the earlier one.
   */
  onWithdrawalRequested(handler: WithdrawalEventHandler): void {
    this.withdrawalRequestedHandler = handler;
    this.logger.log(`[REGISTER] WithdrawalRequested handler registered`);
  }

  /**
   * Register handler for system withdrawal requested events.
   *
   * A later registration replaces the earlier one.
   */
  onSystemWithdrawalRequested(handler: SystemWithdrawalEventHandler): void {
    this.systemWithdrawalRequestedHandler = handler;
    this.logger.log(`[REGISTER] SystemWithdrawalRequested handler registered`);
  }

  /**
   * Starts the consumer run loop, routing every message through `handleMessage`.
   *
   * @throws Error when called before the consumer has been created.
   */
  private async startConsuming(): Promise<void> {
    const consumer = this.consumer;
    if (!consumer) {
      throw new Error('Withdrawal Event consumer has not been created');
    }

    await consumer.run({
      eachMessage: (messagePayload: EachMessagePayload) => this.handleMessage(messagePayload),
    });

    this.logger.log(`[START] Started consuming withdrawal events`);
  }

  /**
   * Parses one Kafka message and dispatches it according to its `eventType`.
   *
   * Empty messages and unknown event types are logged and skipped.
   *
   * NOTE(review): any error raised while parsing or inside a handler is logged
   * and NOT rethrown, so this callback resolves normally for failed events —
   * confirm that not retrying a failed withdrawal event is intended.
   */
  private async handleMessage({ topic, partition, message }: EachMessagePayload): Promise<void> {
    const offset = message.offset;
    this.logger.log(
      `[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`,
    );

    try {
      const value = message.value?.toString();
      if (!value) {
        this.logger.warn(`[RECEIVE] Empty message received on ${topic}`);
        return;
      }

      // Only the first 500 characters of the raw message are logged.
      this.logger.log(`[RECEIVE] Raw message: ${value.substring(0, 500)}...`);

      const parsed = JSON.parse(value);
      const eventType = parsed.eventType;
      // Use the nested `payload` when present, otherwise the parsed message itself.
      // NOTE(review): the payload shape is not validated before being handed to
      // a handler — confirm that producers always send the declared fields.
      const payload = parsed.payload || parsed;

      this.logger.log(`[RECEIVE] Event type: ${eventType}`);

      switch (eventType) {
        case WITHDRAWAL_REQUESTED_EVENT:
          await this.dispatchWithdrawalRequested(payload as WithdrawalRequestedPayload);
          break;
        case SYSTEM_WITHDRAWAL_REQUESTED_EVENT:
          await this.dispatchSystemWithdrawalRequested(
            payload as SystemWithdrawalRequestedPayload,
          );
          break;
        default:
          this.logger.warn(`[RECEIVE] Unknown event type: ${eventType}`);
      }
    } catch (error) {
      this.logger.error(`[ERROR] Error processing withdrawal event from ${topic}`, error);
    }
  }

  /**
   * Logs the key fields of a withdrawal request and invokes the registered
   * handler; when none is registered, only a warning is logged.
   */
  private async dispatchWithdrawalRequested(payload: WithdrawalRequestedPayload): Promise<void> {
    this.logger.log(`[HANDLE] Processing WithdrawalRequested event`);
    this.logger.log(`[HANDLE] orderNo: ${payload.orderNo}`);
    this.logger.log(`[HANDLE] chainType: ${payload.chainType}`);
    this.logger.log(`[HANDLE] toAddress: ${payload.toAddress}`);
    this.logger.log(`[HANDLE] amount: ${payload.amount}`);

    if (!this.withdrawalRequestedHandler) {
      this.logger.warn(`[HANDLE] No handler registered for WithdrawalRequested`);
      return;
    }

    await this.withdrawalRequestedHandler(payload);
    this.logger.log(`[HANDLE] WithdrawalRequested handler completed`);
  }

  /**
   * Logs the key fields of a system withdrawal request and invokes the
   * registered handler; when none is registered, only a warning is logged.
   */
  private async dispatchSystemWithdrawalRequested(
    payload: SystemWithdrawalRequestedPayload,
  ): Promise<void> {
    this.logger.log(`[HANDLE] Processing SystemWithdrawalRequested event`);
    this.logger.log(`[HANDLE] orderNo: ${payload.orderNo}`);
    this.logger.log(`[HANDLE] fromAccountSequence: ${payload.fromAccountSequence}`);
    this.logger.log(`[HANDLE] toAccountSequence: ${payload.toAccountSequence}`);
    this.logger.log(`[HANDLE] toAddress: ${payload.toAddress}`);
    this.logger.log(`[HANDLE] amount: ${payload.amount}`);

    if (!this.systemWithdrawalRequestedHandler) {
      this.logger.warn(`[HANDLE] No handler registered for SystemWithdrawalRequested`);
      return;
    }

    await this.systemWithdrawalRequestedHandler(payload);
    this.logger.log(`[HANDLE] SystemWithdrawalRequested handler completed`);
  }
}