import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; import { PrismaService } from '../persistence/prisma/prisma.service'; import { ProcessedEventRepository } from '../persistence/repositories/processed-event.repository'; import { RedisService } from '../redis/redis.service'; import { Prisma, MarketMakerAssetType, MarketMakerDepositStatus } from '@prisma/client'; // 4小时 TTL(秒) const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; interface MarketMakerDepositConfirmedPayload { depositId: string; chainType: string; txHash: string; fromAddress: string; toAddress: string; assetType: 'EUSDT' | 'FUSDT'; tokenContract: string; amount: string; amountFormatted: string; confirmations: number; blockNumber: string; blockTimestamp: string; } /** * Market Maker Deposit Consumer Service * 监听 mining-blockchain-service 的做市商充值确认事件 * 当充值确认后,为做市商账户入账 */ @Injectable() export class MarketMakerDepositConsumerService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(MarketMakerDepositConsumerService.name); private kafka: Kafka; private consumer: Consumer; private isRunning = false; constructor( private readonly configService: ConfigService, private readonly prisma: PrismaService, private readonly processedEventRepository: ProcessedEventRepository, private readonly redis: RedisService, ) { const brokers = this.configService .get('KAFKA_BROKERS', 'localhost:9092') .split(','); this.kafka = new Kafka({ clientId: 'trading-service-mm-deposit', brokers, }); this.consumer = this.kafka.consumer({ groupId: this.configService.get( 'MM_DEPOSIT_CONSUMER_GROUP', 'trading-service-mm-deposit-group', ), }); } async onModuleInit() { await this.start(); } async onModuleDestroy() { await this.stop(); } async start(): Promise { if (this.isRunning) { this.logger.warn('Market maker deposit consumer is already running'); return; } const topic = 'blockchain.market_maker.deposits'; try { await this.consumer.connect(); this.logger.log('Market maker deposit consumer connected'); await this.consumer.subscribe({ topics: [topic], fromBeginning: false, }); this.logger.log(`Subscribed to topic: ${topic}`); await this.consumer.run({ eachMessage: async (payload: EachMessagePayload) => { await this.handleMessage(payload); }, }); this.isRunning = true; this.logger.log('Market maker deposit consumer started - listening for deposit confirmed events'); } catch (error) { this.logger.error('Failed to start market maker deposit consumer', error); } } async stop(): Promise { if (!this.isRunning) { return; } try { await this.consumer.disconnect(); this.isRunning = false; this.logger.log('Market maker deposit consumer stopped'); } catch (error) { this.logger.error('Failed to stop market maker deposit consumer', error); } } private async handleMessage(payload: EachMessagePayload): Promise { const { topic, message } = payload; try { if (!message.value) { return; } const eventData = JSON.parse(message.value.toString()); // 检查事件类型 if (eventData.eventType !== 'blockchain.market_maker.deposit.confirmed') { return; } const depositPayload = eventData.payload as MarketMakerDepositConfirmedPayload; await this.handleDepositConfirmed(eventData.eventId, depositPayload); } catch (error) { this.logger.error(`Error processing message from topic ${topic}`, error); } } /** * 处理做市商充值确认事件 */ private async handleDepositConfirmed( eventId: string, payload: MarketMakerDepositConfirmedPayload, ): Promise { this.logger.log( `Processing MarketMakerDepositConfirmed: ${payload.txHash}, amount: ${payload.amountFormatted} ${payload.assetType}`, ); // 幂等性检查 if (await this.isEventProcessed(eventId)) { this.logger.debug(`Event ${eventId} already processed, skipping`); return; } try { // 查找做市商配置(通过钱包地址) const marketMaker = await this.prisma.marketMakerConfig.findFirst({ where: { kavaWalletAddress: { equals: payload.toAddress, mode: 'insensitive', }, }, }); if (!marketMaker) { this.logger.error( `Market maker not found for wallet address: ${payload.toAddress}`, ); await this.markEventProcessed(eventId); return; } // 使用事务处理入账 await this.prisma.$transaction(async (tx) => { // 1. 创建或更新充值记录 const depositRecord = await tx.marketMakerDeposit.upsert({ where: { txHash: payload.txHash }, create: { marketMakerId: marketMaker.id, chainType: payload.chainType, txHash: payload.txHash, blockNumber: BigInt(payload.blockNumber), blockTimestamp: new Date(payload.blockTimestamp), assetType: payload.assetType as MarketMakerAssetType, tokenContract: payload.tokenContract, fromAddress: payload.fromAddress, toAddress: payload.toAddress, amount: new Prisma.Decimal(payload.amountFormatted), amountRaw: new Prisma.Decimal(payload.amount), confirmations: payload.confirmations, requiredConfirms: 12, status: MarketMakerDepositStatus.CONFIRMED, }, update: { confirmations: payload.confirmations, status: MarketMakerDepositStatus.CONFIRMED, }, }); // 检查是否已入账 if (depositRecord.status === MarketMakerDepositStatus.CREDITED) { this.logger.debug(`Deposit ${payload.txHash} already credited, skipping`); return; } // 2. 根据资产类型更新余额 const assetField = payload.assetType === 'EUSDT' ? 'shareBalance' : 'cashBalance'; const currentBalance = payload.assetType === 'EUSDT' ? marketMaker.shareBalance : marketMaker.cashBalance; const newBalance = currentBalance.add(new Prisma.Decimal(payload.amountFormatted)); // 更新做市商余额 await tx.marketMakerConfig.update({ where: { id: marketMaker.id }, data: { [assetField]: newBalance, }, }); // 3. 创建流水记录 const ledger = await tx.marketMakerLedger.create({ data: { marketMakerId: marketMaker.id, type: 'DEPOSIT', assetType: payload.assetType === 'EUSDT' ? 'SHARE' : 'CASH', amount: new Prisma.Decimal(payload.amountFormatted), balanceBefore: currentBalance, balanceAfter: newBalance, memo: `区块链充值: ${payload.txHash.slice(0, 10)}... (${payload.fromAddress.slice(0, 10)}...)`, }, }); // 4. 更新充值记录状态为已入账 await tx.marketMakerDeposit.update({ where: { id: depositRecord.id }, data: { status: MarketMakerDepositStatus.CREDITED, creditedAt: new Date(), creditedAmount: new Prisma.Decimal(payload.amountFormatted), ledgerId: ledger.id, }, }); this.logger.log( `[CREDITED] Market maker deposit: ${payload.txHash.slice(0, 10)}... | ` + `${payload.assetType}: ${payload.amountFormatted} | ` + `Balance: ${currentBalance} -> ${newBalance}`, ); }); // 标记为已处理 await this.markEventProcessed(eventId); } catch (error) { this.logger.error( `Failed to process deposit for ${payload.txHash}`, error instanceof Error ? error.stack : error, ); throw error; } } private async isEventProcessed(eventId: string): Promise { const redisKey = `trading:processed-event:mm-deposit:${eventId}`; const cached = await this.redis.get(redisKey); if (cached) return true; const dbRecord = await this.processedEventRepository.findByEventId(eventId); if (dbRecord) { await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); return true; } return false; } private async markEventProcessed(eventId: string): Promise { const redisKey = `trading:processed-event:mm-deposit:${eventId}`; try { await this.processedEventRepository.create({ eventId, eventType: 'MarketMakerDepositConfirmed', sourceService: 'mining-blockchain-service', }); } catch (error) { if (!(error instanceof Error && error.message.includes('Unique constraint'))) { throw error; } } await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); } }