/**
 * MPC Signing Client
 *
 * Communicates with mpc-service through Kafka events to perform MPC signing.
 * Used to sign ERC20 transfers for hot wallets.
 *
 * Event flow:
 *   blockchain-service → Kafka(mpc.SigningRequested) → mpc-service
 *   mpc-service → Kafka(mpc.SigningCompleted) → blockchain-service
 */
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { randomUUID } from 'crypto';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import {
  MpcEventConsumerService,
  SigningCompletedPayload,
  SessionFailedPayload,
} from '@/infrastructure/kafka/mpc-event-consumer.service';

export interface CreateSigningInput {
  username: string;
  messageHash: string;
}

export interface SigningResult {
  sessionId: string;
  status: string;
  signature?: string;
}

// Signing result callback
type SigningCallback = (signature: string | null, error?: string) => void;

// Topic for MPC signing requests
export const MPC_SIGNING_TOPIC = 'mining_mpc.SigningRequested';

/** Bookkeeping for one in-flight signing request, keyed by session ID. */
interface PendingSigningRequest {
  /** Settles the caller's promise with the signature. */
  resolve: (signature: string) => void;
  /** Settles the caller's promise with a failure. */
  reject: (error: Error) => void;
  /** Timer that rejects the request when no result event arrives in time. */
  timeout: NodeJS.Timeout;
}

/**
 * Publishes signing requests through the event publisher and resolves them
 * when the matching completion/failure event is delivered by the MPC event
 * consumer. Requests and results are correlated by a generated session ID.
 */
@Injectable()
export class MpcSigningClient implements OnModuleInit {
  private readonly logger = new Logger(MpcSigningClient.name);

  // C2C Bot hot wallet
  private readonly hotWalletUsername: string;
  private readonly hotWalletAddress: string;

  // eUSDT market maker wallet
  private readonly eusdtMarketMakerUsername: string;
  private readonly eusdtMarketMakerAddress: string;

  // fUSDT market maker wallet
  private readonly fusdtMarketMakerUsername: string;
  private readonly fusdtMarketMakerAddress: string;

  private readonly signingTimeoutMs: number = 300000; // 5 minutes

  // In-flight signing requests, keyed by session ID
  private readonly pendingRequests: Map<string, PendingSigningRequest> = new Map();

  constructor(
    private readonly configService: ConfigService,
    private readonly eventPublisher: EventPublisherService,
    private readonly mpcEventConsumer: MpcEventConsumerService,
  ) {
    // C2C Bot hot wallet configuration
    this.hotWalletUsername = this.configService.get<string>('C2C_BOT_WALLET_USERNAME', '');
    this.hotWalletAddress = this.configService.get<string>('C2C_BOT_WALLET_ADDRESS', '');

    // eUSDT market maker wallet configuration
    this.eusdtMarketMakerUsername = this.configService.get<string>('EUSDT_MARKET_MAKER_USERNAME', '');
    this.eusdtMarketMakerAddress = this.configService.get<string>('EUSDT_MARKET_MAKER_ADDRESS', '');

    // fUSDT market maker wallet configuration
    this.fusdtMarketMakerUsername = this.configService.get<string>('FUSDT_MARKET_MAKER_USERNAME', '');
    this.fusdtMarketMakerAddress = this.configService.get<string>('FUSDT_MARKET_MAKER_ADDRESS', '');

    // Missing configuration is only warned about here; the sign* methods
    // throw later if the corresponding username is still empty.
    if (!this.hotWalletUsername) {
      this.logger.warn('[INIT] C2C_BOT_WALLET_USERNAME not configured (C2C Bot disabled)');
    }
    if (!this.hotWalletAddress) {
      this.logger.warn('[INIT] C2C_BOT_WALLET_ADDRESS not configured (C2C Bot disabled)');
    }
    if (!this.eusdtMarketMakerUsername || !this.eusdtMarketMakerAddress) {
      this.logger.warn('[INIT] eUSDT Market Maker not configured');
    }
    if (!this.fusdtMarketMakerUsername || !this.fusdtMarketMakerAddress) {
      this.logger.warn('[INIT] fUSDT Market Maker not configured');
    }

    this.logger.log(`[INIT] C2C Bot Wallet: ${this.hotWalletAddress || '(not configured)'}`);
    this.logger.log(`[INIT] eUSDT Market Maker: ${this.eusdtMarketMakerAddress || '(not configured)'}`);
    this.logger.log(`[INIT] fUSDT Market Maker: ${this.fusdtMarketMakerAddress || '(not configured)'}`);
    this.logger.log(`[INIT] Using Kafka event-driven signing`);
  }

  /** Registers the signing-completed and signing-failed event handlers. */
  async onModuleInit(): Promise<void> {
    this.mpcEventConsumer.onSigningCompleted(this.handleSigningCompleted.bind(this));
    this.mpcEventConsumer.onSigningFailed(this.handleSigningFailed.bind(this));
    this.logger.log('[INIT] MPC signing event handlers registered');
  }

  /** Returns true when both the C2C Bot hot wallet username and address are set. */
  isConfigured(): boolean {
    return !!this.hotWalletUsername && !!this.hotWalletAddress;
  }

  /** Returns true when both the eUSDT market maker username and address are set. */
  isEusdtMarketMakerConfigured(): boolean {
    return !!this.eusdtMarketMakerUsername && !!this.eusdtMarketMakerAddress;
  }

  /** Returns true when both the fUSDT market maker username and address are set. */
  isFusdtMarketMakerConfigured(): boolean {
    return !!this.fusdtMarketMakerUsername && !!this.fusdtMarketMakerAddress;
  }

  /** Returns the C2C Bot hot wallet address (empty string when not configured). */
  getHotWalletAddress(): string {
    return this.hotWalletAddress;
  }

  /** Returns the C2C Bot hot wallet username (empty string when not configured). */
  getHotWalletUsername(): string {
    return this.hotWalletUsername;
  }

  /** Returns the eUSDT market maker wallet address (empty string when not configured). */
  getEusdtMarketMakerAddress(): string {
    return this.eusdtMarketMakerAddress;
  }

  /** Returns the eUSDT market maker MPC username (empty string when not configured). */
  getEusdtMarketMakerUsername(): string {
    return this.eusdtMarketMakerUsername;
  }

  /** Returns the fUSDT market maker wallet address (empty string when not configured). */
  getFusdtMarketMakerAddress(): string {
    return this.fusdtMarketMakerAddress;
  }

  /** Returns the fUSDT market maker MPC username (empty string when not configured). */
  getFusdtMarketMakerUsername(): string {
    return this.fusdtMarketMakerUsername;
  }

  /**
   * Signs a message with the C2C Bot hot wallet (Kafka event-driven).
   *
   * @param messageHash Hash of the message to sign (hex string with 0x prefix)
   * @returns The signature (hex string)
   * @throws Error when the hot wallet username is not configured, or for any
   *   reason documented on {@link signMessageWithUsername}
   */
  async signMessage(messageHash: string): Promise<string> {
    if (!this.hotWalletUsername) {
      throw new Error('Hot wallet username not configured');
    }
    return this.signMessageWithUsername(this.hotWalletUsername, messageHash);
  }

  /**
   * Signs a message with the eUSDT market maker wallet.
   *
   * @param messageHash Hash of the message to sign (hex string with 0x prefix)
   * @returns The signature (hex string)
   * @throws Error when the eUSDT market maker username is not configured, or
   *   for any reason documented on {@link signMessageWithUsername}
   */
  async signMessageAsEusdtMarketMaker(messageHash: string): Promise<string> {
    if (!this.eusdtMarketMakerUsername) {
      throw new Error('eUSDT Market Maker MPC username not configured');
    }
    return this.signMessageWithUsername(this.eusdtMarketMakerUsername, messageHash);
  }

  /**
   * Signs a message with the fUSDT market maker wallet.
   *
   * @param messageHash Hash of the message to sign (hex string with 0x prefix)
   * @returns The signature (hex string)
   * @throws Error when the fUSDT market maker username is not configured, or
   *   for any reason documented on {@link signMessageWithUsername}
   */
  async signMessageAsFusdtMarketMaker(messageHash: string): Promise<string> {
    if (!this.fusdtMarketMakerUsername) {
      throw new Error('fUSDT Market Maker MPC username not configured');
    }
    return this.signMessageWithUsername(this.fusdtMarketMakerUsername, messageHash);
  }

  /**
   * Signs a message on behalf of the given MPC username (Kafka event-driven).
   *
   * A fresh session ID is generated, the request is registered as pending,
   * a signing-requested event is published, and the returned promise settles
   * when the matching completed/failed event arrives or the timeout elapses.
   *
   * @param username MPC username
   * @param messageHash Hash of the message to sign (hex string with 0x prefix)
   * @returns The signature (hex string)
   * @throws Error when `username` or `messageHash` is empty, when publishing
   *   the request fails (the publisher's error is rethrown), when the signing
   *   fails or completes without a signature, or when no result arrives
   *   within `signingTimeoutMs`
   */
  async signMessageWithUsername(username: string, messageHash: string): Promise<string> {
    // Validate before logging: the log line below dereferences messageHash.
    if (!username) {
      throw new Error('MPC username not provided');
    }
    if (!messageHash) {
      throw new Error('Message hash not provided');
    }

    this.logger.log(`[SIGN] Starting MPC signing for: ${messageHash.slice(0, 16)}... (username: ${username})`);

    const sessionId = randomUUID();
    this.logger.log(`[SIGN] Session ID: ${sessionId}`);

    // Register the pending request BEFORE publishing so a result event can
    // never arrive for a session that is not yet tracked.
    const signaturePromise = new Promise<string>((resolve, reject) => {
      const timeout = setTimeout(() => {
        this.pendingRequests.delete(sessionId);
        reject(new Error(`MPC signing timeout after ${this.signingTimeoutMs}ms`));
      }, this.signingTimeoutMs);

      this.pendingRequests.set(sessionId, { resolve, reject, timeout });
    });

    // The promise is only awaited after publish() returns. If a failure event
    // (or the timeout) rejects it while publish() is still in flight, the
    // rejection would otherwise have no handler and surface as an
    // unhandledRejection. This no-op handler marks it as handled; the `await`
    // below still observes the same rejection.
    signaturePromise.catch(() => undefined);

    // Publish the signing request event
    try {
      await this.eventPublisher.publish({
        eventType: 'mining_blockchain.mpc.signing.requested',
        toPayload: () => ({
          sessionId,
          userId: 'system',
          username,
          messageHash,
          source: 'mining-blockchain-service',
        }),
        eventId: sessionId,
        occurredAt: new Date(),
      });

      this.logger.log(`[SIGN] Signing request published to Kafka: sessionId=${sessionId}, username=${username}`);
    } catch (error) {
      // Publishing failed: drop the pending entry and its timer, then rethrow.
      this.takePending(sessionId);
      throw error;
    }

    // Wait for the signing result
    const signature = await signaturePromise;
    this.logger.log(`[SIGN] Signature obtained: ${signature.slice(0, 20)}...`);

    return signature;
  }

  /**
   * Removes and returns the pending request for a session, cancelling its
   * timeout timer. Returns `undefined` when the session is not pending.
   */
  private takePending(sessionId: string): PendingSigningRequest | undefined {
    const pending = this.pendingRequests.get(sessionId);
    if (pending) {
      clearTimeout(pending.timeout);
      this.pendingRequests.delete(sessionId);
    }
    return pending;
  }

  /**
   * Handles a signing-completed event: resolves the matching pending request
   * with the signature, or rejects it when the payload carries no signature.
   */
  private async handleSigningCompleted(payload: SigningCompletedPayload): Promise<void> {
    const sessionId = payload.sessionId;
    this.logger.log(`[EVENT] Signing completed: sessionId=${sessionId}`);

    const pending = this.takePending(sessionId);
    if (!pending) {
      this.logger.warn(`[EVENT] No pending request for sessionId=${sessionId}`);
      return;
    }

    if (payload.signature) {
      pending.resolve(payload.signature);
    } else {
      pending.reject(new Error('Signing completed but no signature returned'));
    }
  }

  /**
   * Handles a signing-failed event: rejects the matching pending request with
   * the error message carried by the payload.
   */
  private async handleSigningFailed(payload: SessionFailedPayload): Promise<void> {
    const sessionId = payload.sessionId;
    this.logger.warn(`[EVENT] Signing failed: sessionId=${sessionId}, error=${payload.errorMessage}`);

    const pending = this.takePending(sessionId);
    if (!pending) {
      // Same diagnostic as the completed handler, so unknown sessions are
      // visible on both paths.
      this.logger.warn(`[EVENT] No pending request for sessionId=${sessionId}`);
      return;
    }

    pending.reject(new Error(`MPC signing failed: ${payload.errorMessage}`));
  }
}