/**
 * Adoption fUSDT Injection Handler
 *
 * When an adoption reaches MINING_ENABLED status, this handler:
 * 1. Receives the AdoptionFusdtInjectionRequested event via Kafka
 * 2. Creates a PROCESSING ledger record in adoption_injection_records
 *    (or re-opens the existing record when a previous attempt did not confirm)
 * 3. Transfers fUSDT from injection wallet to fUSDT market maker wallet
 * 4. Updates ledger record with tx result (CONFIRMED or FAILED)
 * 5. Publishes confirmation/failure Kafka event
 *
 * Amount = treeCount x 5760 (the amount itself is read from the event payload)
 * Ledger memo includes: user ID, adoption quantity, adoption time, transfer amount
 */
import { Injectable, Logger, OnModuleInit, Inject } from '@nestjs/common';
import {
  AdoptionInjectionConsumerService,
  AdoptionInjectionPayload,
} from '@/infrastructure/kafka/adoption-injection-consumer.service';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import { Erc20TransferService } from '@/domain/services/erc20-transfer.service';
import { MpcSigningClient } from '@/infrastructure/mpc/mpc-signing.client';
import { ChainTypeEnum } from '@/domain/enums';
import {
  ADOPTION_INJECTION_RECORD_REPOSITORY,
  IAdoptionInjectionRecordRepository,
} from '@/domain/repositories';

/**
 * Subscribes to adoption-injection requests and performs the fUSDT transfer
 * from the injection wallet to the market maker wallet, keeping a ledger
 * record of every attempt.
 */
@Injectable()
export class AdoptionInjectionHandler implements OnModuleInit {
  private readonly logger = new Logger(AdoptionInjectionHandler.name);

  constructor(
    private readonly injectionConsumer: AdoptionInjectionConsumerService,
    private readonly eventPublisher: EventPublisherService,
    private readonly transferService: Erc20TransferService,
    private readonly mpcSigningClient: MpcSigningClient,
    @Inject(ADOPTION_INJECTION_RECORD_REPOSITORY)
    private readonly injectionRepo: IAdoptionInjectionRecordRepository,
  ) {}

  /**
   * Registers {@link handleInjection} with the consumer, unless the consumer
   * reports itself as disabled, in which case nothing is registered.
   */
  onModuleInit(): void {
    if (!this.injectionConsumer.isEnabled()) {
      this.logger.warn(`[INIT] Adoption Injection Consumer 未启用,Handler 不注册`);
      return;
    }

    this.injectionConsumer.onAdoptionInjectionRequested(
      this.handleInjection.bind(this),
    );
    this.logger.log(`[INIT] AdoptionInjectionHandler registered`);
  }

  /**
   * Processes one injection request end to end.
   *
   * @param payload - The injection request received from the consumer.
   * @throws Rethrows whatever error interrupted processing, after failure
   *   bookkeeping has been attempted. When the error happens after the
   *   transfer already succeeded, the record is NOT marked FAILED and no
   *   failure event is published, because a later retry of a FAILED record
   *   would move the funds a second time.
   */
  private async handleInjection(payload: AdoptionInjectionPayload): Promise<void> {
    this.logger.log(`[HANDLE] ========== Adoption fUSDT Injection ==========`);
    this.logger.log(`[HANDLE] adoptionId: ${payload.originalAdoptionId}`);
    this.logger.log(`[HANDLE] accountSequence: ${payload.accountSequence}`);
    this.logger.log(`[HANDLE] treeCount: ${payload.treeCount}`);
    this.logger.log(`[HANDLE] adoptionDate: ${payload.adoptionDate}`);
    this.logger.log(`[HANDLE] amount: ${payload.amount} fUSDT`);

    const chainType = ChainTypeEnum.KAVA;

    // Step 1: Idempotency check - has this adoption already been injected?
    const existing = await this.injectionRepo.findByAdoptionId(payload.originalAdoptionId);
    if (existing && existing.status === 'CONFIRMED') {
      this.logger.warn(`[HANDLE] Adoption ${payload.originalAdoptionId} already injected (txHash=${existing.txHash}), skipping`);
      return;
    }

    // Set once the transfer has succeeded. From that point on a failure is a
    // bookkeeping problem, not a transfer failure, and must be handled differently.
    let confirmedTxHash: string | null = null;

    try {
      // Step 2: Check injection wallet is configured
      if (!this.transferService.isFusdtInjectionConfigured(chainType)) {
        throw new Error('fUSDT Injection wallet not configured');
      }

      // Step 3: Get addresses
      const injectionAddress = this.transferService.getFusdtInjectionAddress();
      const marketMakerAddress = this.mpcSigningClient.getFusdtMarketMakerAddress();
      if (!injectionAddress || !marketMakerAddress) {
        throw new Error('fUSDT Injection or Market Maker address not configured');
      }

      // Step 4: Build ledger memo
      const memo = this.buildMemo(payload);
      this.logger.log(`[HANDLE] Memo: ${memo}`);

      // Step 5: Write the ledger record in PROCESSING state (or re-open the existing record)
      let ledgerRecord = existing;
      if (!ledgerRecord) {
        ledgerRecord = await this.injectionRepo.save({
          adoptionId: payload.originalAdoptionId,
          accountSequence: payload.accountSequence,
          treeCount: payload.treeCount,
          adoptionDate: new Date(payload.adoptionDate),
          fromAddress: injectionAddress,
          toAddress: marketMakerAddress,
          amount: payload.amount,
          chainType: 'KAVA',
          memo,
          status: 'PROCESSING',
        });
        this.logger.log(`[LEDGER] Created injection record id=${ledgerRecord.id}, status=PROCESSING`);
      } else {
        // Retry of a record that did not reach CONFIRMED.
        // NOTE(review): this branch is also taken when the existing record is
        // still PROCESSING (e.g. a previous attempt crashed mid-flight) -
        // confirm that re-transferring is the intended behavior in that case.
        ledgerRecord = await this.injectionRepo.save({
          ...ledgerRecord,
          status: 'PROCESSING',
          errorMessage: null,
        });
        this.logger.log(`[LEDGER] Retrying failed injection record id=${ledgerRecord.id}`);
      }

      // The id is needed to confirm the record after the transfer; verify it
      // now so that a missing id aborts before any funds move.
      const ledgerId = ledgerRecord.id;
      if (ledgerId == null) {
        throw new Error('Injection ledger record has no id');
      }

      // Step 6: Execute fUSDT transfer
      this.logger.log(`[PROCESS] Transferring ${payload.amount} fUSDT to market maker ${marketMakerAddress}`);
      const result = await this.transferService.transferFusdtAsInjectionWallet(
        chainType,
        marketMakerAddress,
        payload.amount,
        memo,
      );

      if (!result.success || !result.txHash) {
        throw new Error(result.error || 'Injection transfer failed');
      }

      confirmedTxHash = result.txHash;
      this.logger.log(`[SUCCESS] Adoption injection transfer confirmed!`);
      this.logger.log(`[SUCCESS] TxHash: ${result.txHash}`);
      this.logger.log(`[SUCCESS] Block: ${result.blockNumber}`);

      // Step 7a: Update ledger - CONFIRMED
      await this.injectionRepo.markConfirmed(
        ledgerId,
        result.txHash,
        BigInt(result.blockNumber || 0),
        result.gasUsed || '0',
      );
      this.logger.log(`[LEDGER] Injection record id=${ledgerId} marked CONFIRMED`);

      // Publish success event
      await this.eventPublisher.publish({
        eventType: 'mining_blockchain.adoption_injection.confirmed',
        toPayload: () => ({
          originalAdoptionId: payload.originalAdoptionId,
          accountSequence: payload.accountSequence,
          treeCount: payload.treeCount,
          adoptionDate: payload.adoptionDate,
          amount: payload.amount,
          txHash: result.txHash,
          blockNumber: result.blockNumber,
          memo,
        }),
        eventId: `adoption-injection-confirmed-${payload.originalAdoptionId}-${Date.now()}`,
        occurredAt: new Date(),
      });
    } catch (error) {
      if (confirmedTxHash !== null) {
        // The funds have already moved. Marking the record FAILED or emitting
        // a failure event here would misreport the outcome and would let a
        // retry transfer the same amount again, so only log and rethrow.
        // NOTE(review): if markConfirmed was the call that failed, the record
        // is still PROCESSING and needs reconciliation against this txHash.
        this.logger.error(
          `[ERROR] Adoption injection transfer succeeded (txHash=${confirmedTxHash}) but post-transfer bookkeeping failed for adoptionId=${payload.originalAdoptionId}`,
          error,
        );
        throw error;
      }

      this.logger.error(
        `[ERROR] Adoption injection transfer failed for adoptionId=${payload.originalAdoptionId}`,
        error,
      );

      const errorMsg = error instanceof Error ? error.message : 'Unknown error';
      await this.reportFailure(payload, errorMsg);

      throw error;
    }
  }

  /** Builds the ledger memo describing the adoption and the transfer. */
  private buildMemo(payload: AdoptionInjectionPayload): string {
    return [
      `认种注入(Adoption Injection)`,
      `用户: ${payload.accountSequence}`,
      `认种ID: ${payload.originalAdoptionId}`,
      `认种数量: ${payload.treeCount}`,
      `认种时间: ${payload.adoptionDate}`,
      `转账金额: ${payload.amount} fUSDT`,
      `单价: ${payload.fusdtPerTree} fUSDT/棵`,
    ].join(' | ');
  }

  /**
   * Step 7b: marks the ledger record FAILED (when one exists) and publishes
   * the failure event. Both steps are guarded independently so that an error
   * in one neither skips the other nor hides the original transfer error
   * that the caller is about to rethrow.
   */
  private async reportFailure(
    payload: AdoptionInjectionPayload,
    errorMsg: string,
  ): Promise<void> {
    try {
      // Single fresh lookup: the record may have been created during this attempt.
      const record = await this.injectionRepo.findByAdoptionId(payload.originalAdoptionId);
      if (record?.id) {
        await this.injectionRepo.markFailed(record.id, errorMsg);
        this.logger.log(`[LEDGER] Injection record id=${record.id} marked FAILED: ${errorMsg}`);
      }
    } catch (ledgerError) {
      this.logger.error(
        `[ERROR] Could not mark injection record FAILED for adoptionId=${payload.originalAdoptionId}`,
        ledgerError,
      );
    }

    try {
      // Publish failure event
      await this.eventPublisher.publish({
        eventType: 'mining_blockchain.adoption_injection.failed',
        toPayload: () => ({
          originalAdoptionId: payload.originalAdoptionId,
          accountSequence: payload.accountSequence,
          treeCount: payload.treeCount,
          amount: payload.amount,
          error: errorMsg,
        }),
        eventId: `adoption-injection-failed-${payload.originalAdoptionId}-${Date.now()}`,
        occurredAt: new Date(),
      });
    } catch (publishError) {
      this.logger.error(
        `[ERROR] Could not publish injection failure event for adoptionId=${payload.originalAdoptionId}`,
        publishError,
      );
    }
  }
}