From ec73541fe163dbcf70c6203eccfb1d482f3aae31 Mon Sep 17 00:00:00 2001 From: hailin Date: Sun, 1 Feb 2026 20:53:35 -0800 Subject: [PATCH] =?UTF-8?q?refactor(mining-blockchain):=20=E7=A7=BB?= =?UTF-8?q?=E9=99=A4=20mpc-service=20=E4=BE=9D=E8=B5=96=EF=BC=8C=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=20HTTP=20=E7=9B=B4=E8=B0=83=20mpc-system?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将 mining-blockchain-service 的 MPC 签名通信从 Kafka 事件驱动(经由 mpc-service 中转) 改为 HTTP 直接调用 mpc-system 的 account-service (port 4000)。 ## 核心变更 ### mpc-signing.client.ts (重写) - 移除 EventPublisherService、MpcEventConsumerService 依赖和 pendingRequests Map - 移除 OnModuleInit 中的 Kafka 事件注册 - 新增 HttpService (@nestjs/axios) + JwtService (@nestjs/jwt) 依赖注入 - 签名流程改为: 1. POST /api/v1/mpc/sign → 创建签名会话 (snake_case: username, message_hash) 2. GET /api/v1/mpc/sessions/{session_id} → 轮询结果 (每 2s, 最多 5 分钟) - JWT 认证: 使用 MPC_JWT_SECRET (HS256) 生成 Bearer token,匹配 mpc-system 格式 - 所有公共接口不变 (signMessage, signMessageAsXxxMarketMaker, isConfigured, getXxxAddress 等) ### 删除的文件 - mpc-event-consumer.service.ts: Kafka MPC 事件消费者 (SigningCompleted/SessionFailed/KeygenCompleted) - mpc-keygen-completed.handler.ts: Keygen 地址派生处理器 (不再由此服务处理) ### 模块更新 - infrastructure.module.ts: 移除 MpcEventConsumerService,新增 JwtModule.register({}) - kafka/index.ts: 移除 mpc-event-consumer.service 导出 - event-handlers/index.ts: 移除 mpc-keygen-completed.handler 导出 ### 部署配置 - docker-compose.2.0.yml: 新增 MPC_ACCOUNT_SERVICE_URL 和 MPC_JWT_SECRET 环境变量 - deploy-mining.sh: standalone 模式新增 export MPC_ACCOUNT_SERVICE_URL (默认 http://192.168.1.111:4000) ## 不受影响的部分 - Erc20TransferService / MpcTransferInitializerService 调用方式不变 - EventPublisherService (用于其他事件) 不变 - WithdrawalEventConsumerService 不变 - mpc-system 本身零修改 Co-Authored-By: Claude Opus 4.5 --- backend/services/deploy-mining.sh | 2 + backend/services/docker-compose.2.0.yml | 5 +- .../src/application/event-handlers/index.ts | 1 - .../mpc-keygen-completed.handler.ts | 74 ------ .../infrastructure/infrastructure.module.ts | 7 +- .../src/infrastructure/kafka/index.ts | 1 - .../kafka/mpc-event-consumer.service.ts | 247 ------------------ .../infrastructure/mpc/mpc-signing.client.ts | 223 +++++++++------- 8 files changed, 131 insertions(+), 429 deletions(-) delete mode 100644 backend/services/mining-blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts delete mode 100644 backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 1a591863..15103a3d 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -271,6 +271,8 @@ load_env() { export KAFKA_BROKERS="${KAFKA_BROKERS:-192.168.1.111:9093}" export REDIS_HOST="${REDIS_HOST:-redis-2}" export RWA_NETWORK_NAME="rwa-2-network" + # MPC system 在 1.0 服务器上 + export MPC_ACCOUNT_SERVICE_URL="${MPC_ACCOUNT_SERVICE_URL:-http://192.168.1.111:4000}" POSTGRES_CONTAINER="${POSTGRES_CONTAINER:-rwa-postgres-2}" KAFKA_CONTAINER="" # Kafka is remote, no local container diff --git a/backend/services/docker-compose.2.0.yml b/backend/services/docker-compose.2.0.yml index 92a89873..68492b7a 100644 --- a/backend/services/docker-compose.2.0.yml +++ b/backend/services/docker-compose.2.0.yml @@ -418,10 +418,13 @@ services: REDIS_PORT: ${REDIS_PORT:-6379} REDIS_PASSWORD: ${REDIS_PASSWORD:-} REDIS_DB: 8 - # Kafka - 用于 MPC 签名通信和事件发布 + # Kafka - 用于事件发布 KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:29092} # JWT 配置 JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production} + # MPC system 直调配置 (account-service port 4000) + MPC_ACCOUNT_SERVICE_URL: ${MPC_ACCOUNT_SERVICE_URL:-http://localhost:4000} + MPC_JWT_SECRET: ${MPC_JWT_SECRET:-} # 区块链配置 NETWORK_MODE: ${NETWORK_MODE:-mainnet} # KAVA 配置 diff --git a/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts b/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts index 7231998d..957ef247 100644 --- a/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts +++ b/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts @@ -1,2 +1 @@ -export * from './mpc-keygen-completed.handler'; export * from './withdrawal-requested.handler'; diff --git a/backend/services/mining-blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts b/backend/services/mining-blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts deleted file mode 100644 index 201f444c..00000000 --- a/backend/services/mining-blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts +++ /dev/null @@ -1,74 +0,0 @@ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; -import { AddressDerivationService } from '../services/address-derivation.service'; -import { MpcEventConsumerService, KeygenCompletedPayload } from '@/infrastructure/kafka/mpc-event-consumer.service'; - -/** - * MPC 密钥生成完成事件处理器 - * - * 监听 mpc.KeygenCompleted 事件,从公钥派生多链钱包地址, - * 并发布 blockchain.WalletAddressCreated 事件通知 identity-service - */ -@Injectable() -export class MpcKeygenCompletedHandler implements OnModuleInit { - private readonly logger = new Logger(MpcKeygenCompletedHandler.name); - - constructor( - private readonly addressDerivationService: AddressDerivationService, - private readonly mpcEventConsumer: MpcEventConsumerService, - ) {} - - onModuleInit() { - // Register handler for keygen completed events - this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this)); - this.logger.log(`[INIT] MpcKeygenCompletedHandler registered with MpcEventConsumer`); - } - - /** - * 处理 MPC 密钥生成完成事件 - * 从 mpc-service 的 KeygenCompleted 事件中提取 publicKey、userId 和 accountSequence - */ - private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise { - this.logger.log(`[HANDLE] Received KeygenCompleted event`); - this.logger.log(`[HANDLE] sessionId: ${payload.sessionId}`); - this.logger.log(`[HANDLE] publicKey: ${payload.publicKey?.substring(0, 30)}...`); - this.logger.log(`[HANDLE] extraPayload: ${JSON.stringify(payload.extraPayload)}`); - - // Extract userId and accountSequence from extraPayload - const userId = payload.extraPayload?.userId; - const accountSequence = payload.extraPayload?.accountSequence; - - if (!userId) { - this.logger.error(`[ERROR] Missing userId in extraPayload, cannot derive addresses`); - return; - } - - if (!accountSequence) { - this.logger.error(`[ERROR] Missing accountSequence in extraPayload, cannot derive addresses`); - return; - } - - const publicKey = payload.publicKey; - if (!publicKey) { - this.logger.error(`[ERROR] Missing publicKey in payload, cannot derive addresses`); - return; - } - - try { - this.logger.log(`[DERIVE] Starting address derivation for user: ${userId}, account: ${accountSequence}`); - - const result = await this.addressDerivationService.deriveAndRegister({ - userId: BigInt(userId), - accountSequence: accountSequence, - publicKey, - }); - - this.logger.log(`[DERIVE] Successfully derived ${result.addresses.length} addresses for account ${accountSequence}`); - result.addresses.forEach((addr) => { - this.logger.log(`[DERIVE] - ${addr.chainType}: ${addr.address}`); - }); - } catch (error) { - this.logger.error(`[ERROR] Failed to derive addresses for account ${accountSequence}:`, error); - throw error; - } - } -} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts b/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts index f8d4403d..6112e503 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts @@ -1,8 +1,9 @@ import { Global, Module } from '@nestjs/common'; import { HttpModule } from '@nestjs/axios'; +import { JwtModule } from '@nestjs/jwt'; import { PrismaService } from './persistence/prisma/prisma.service'; import { RedisService, AddressCacheService } from './redis'; -import { EventPublisherService, MpcEventConsumerService, WithdrawalEventConsumerService } from './kafka'; +import { EventPublisherService, WithdrawalEventConsumerService } from './kafka'; import { EvmProviderAdapter, AddressDerivationAdapter, MnemonicDerivationAdapter, RecoveryMnemonicAdapter, BlockScannerService } from './blockchain'; import { MpcSigningClient } from './mpc'; import { DomainModule } from '@/domain/domain.module'; @@ -27,13 +28,12 @@ import { @Global() @Module({ - imports: [DomainModule, HttpModule], + imports: [DomainModule, HttpModule, JwtModule.register({})], providers: [ // 核心服务 PrismaService, RedisService, EventPublisherService, - MpcEventConsumerService, WithdrawalEventConsumerService, MpcSigningClient, @@ -81,7 +81,6 @@ import { PrismaService, RedisService, EventPublisherService, - MpcEventConsumerService, WithdrawalEventConsumerService, MpcSigningClient, EvmProviderAdapter, diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts index e78978c3..df1e0281 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts @@ -1,5 +1,4 @@ export * from './event-publisher.service'; export * from './event-consumer.controller'; -export * from './mpc-event-consumer.service'; export * from './withdrawal-event-consumer.service'; export * from './deposit-ack-consumer.service'; diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts deleted file mode 100644 index c873f4ca..00000000 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ /dev/null @@ -1,247 +0,0 @@ -/** - * MPC Event Consumer Service for Blockchain Service - * - * Consumes MPC events from mpc-service via Kafka: - * - KeygenCompleted: derives wallet addresses from public keys - * - SigningCompleted: returns signature for hot wallet transfers - * - SessionFailed: handles keygen/signing failures - */ - -import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; -import { ConfigService } from '@nestjs/config'; -import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; - -// MPC Event Topics (events from mpc-service) -export const MPC_TOPICS = { - KEYGEN_COMPLETED: 'mining_mpc.KeygenCompleted', - SIGNING_COMPLETED: 'mining_mpc.SigningCompleted', - SESSION_FAILED: 'mining_mpc.SessionFailed', -} as const; - -export interface KeygenCompletedPayload { - sessionId: string; - partyId: string; - publicKey: string; - shareId: string; - threshold: string; - extraPayload?: { - userId: string; - accountSequence: string; // 账户序列号 (格式: D + YYMMDD + 5位序号) - username: string; - delegateShare?: { - partyId: string; - partyIndex: number; - encryptedShare: string; - }; - serverParties?: string[]; - }; -} - -export interface SigningCompletedPayload { - sessionId: string; - partyId: string; - messageHash: string; - signature: string; - publicKey: string; - extraPayload?: { - userId: string; - username: string; - mpcSessionId: string; - source?: string; // 'blockchain-service' | 'identity-service' - }; -} - -export interface SessionFailedPayload { - sessionId: string; - partyId: string; - sessionType: string; // 'keygen' | 'sign' - errorMessage: string; - errorCode?: string; - extraPayload?: { - userId: string; - username: string; - source?: string; - }; -} - -export type MpcEventHandler = (payload: T) => Promise; - -@Injectable() -export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { - private readonly logger = new Logger(MpcEventConsumerService.name); - private kafka: Kafka; - private consumer: Consumer; - private isConnected = false; - - private keygenCompletedHandler?: MpcEventHandler; - private signingCompletedHandler?: MpcEventHandler; - private sessionFailedHandler?: MpcEventHandler; - private signingFailedHandler?: MpcEventHandler; - - constructor(private readonly configService: ConfigService) {} - - async onModuleInit() { - const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; - const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'mining-blockchain-service'; - const groupId = 'mining-blockchain-service-mpc-events'; - - this.logger.log(`[INIT] MPC Event Consumer for blockchain-service initializing...`); - this.logger.log(`[INIT] ClientId: ${clientId}`); - this.logger.log(`[INIT] GroupId: ${groupId}`); - this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); - this.logger.log(`[INIT] Topics to subscribe: ${Object.values(MPC_TOPICS).join(', ')}`); - - // 企业级重试配置:指数退避,最多重试约 2.5 小时 - this.kafka = new Kafka({ - clientId, - brokers, - logLevel: logLevel.WARN, - retry: { - initialRetryTime: 1000, // 1 秒 - maxRetryTime: 300000, // 最大 5 分钟 - retries: 15, // 最多 15 次 - multiplier: 2, // 指数退避因子 - restartOnFailure: async () => true, - }, - }); - - this.consumer = this.kafka.consumer({ - groupId, - sessionTimeout: 30000, - heartbeatInterval: 3000, - }); - - try { - this.logger.log(`[CONNECT] Connecting MPC Event consumer...`); - await this.consumer.connect(); - this.isConnected = true; - this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); - - // Subscribe to MPC topics - await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); - this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); - - // Start consuming - await this.startConsuming(); - } catch (error) { - this.logger.error(`[ERROR] Failed to connect MPC Event Kafka consumer`, error); - } - } - - async onModuleDestroy() { - if (this.isConnected) { - await this.consumer.disconnect(); - this.logger.log('MPC Event Kafka consumer disconnected'); - } - } - - /** - * Register handler for keygen completed events - */ - onKeygenCompleted(handler: MpcEventHandler): void { - this.keygenCompletedHandler = handler; - this.logger.log(`[REGISTER] KeygenCompleted handler registered`); - } - - /** - * Register handler for signing completed events - */ - onSigningCompleted(handler: MpcEventHandler): void { - this.signingCompletedHandler = handler; - this.logger.log(`[REGISTER] SigningCompleted handler registered`); - } - - /** - * Register handler for session failed events (keygen) - */ - onSessionFailed(handler: MpcEventHandler): void { - this.sessionFailedHandler = handler; - this.logger.log(`[REGISTER] SessionFailed handler registered`); - } - - /** - * Register handler for signing failed events - */ - onSigningFailed(handler: MpcEventHandler): void { - this.signingFailedHandler = handler; - this.logger.log(`[REGISTER] SigningFailed handler registered`); - } - - private async startConsuming(): Promise { - await this.consumer.run({ - eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { - const offset = message.offset; - this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`); - - try { - const value = message.value?.toString(); - if (!value) { - this.logger.warn(`[RECEIVE] Empty message received on ${topic}`); - return; - } - - this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`); - - const parsed = JSON.parse(value); - const payload = parsed.payload || parsed; - - this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`); - this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`); - - switch (topic) { - case MPC_TOPICS.KEYGEN_COMPLETED: - this.logger.log(`[HANDLE] Processing KeygenCompleted event for blockchain-service`); - this.logger.log(`[HANDLE] publicKey: ${(payload as KeygenCompletedPayload).publicKey?.substring(0, 20)}...`); - this.logger.log(`[HANDLE] extraPayload.userId: ${(payload as KeygenCompletedPayload).extraPayload?.userId}`); - if (this.keygenCompletedHandler) { - await this.keygenCompletedHandler(payload as KeygenCompletedPayload); - this.logger.log(`[HANDLE] KeygenCompleted handler completed successfully`); - } else { - this.logger.warn(`[HANDLE] No handler registered for KeygenCompleted`); - } - break; - - case MPC_TOPICS.SIGNING_COMPLETED: - this.logger.log(`[HANDLE] Processing SigningCompleted event`); - this.logger.log(`[HANDLE] sessionId: ${(payload as SigningCompletedPayload).sessionId}`); - this.logger.log(`[HANDLE] signature: ${(payload as SigningCompletedPayload).signature?.substring(0, 20)}...`); - if (this.signingCompletedHandler) { - await this.signingCompletedHandler(payload as SigningCompletedPayload); - this.logger.log(`[HANDLE] SigningCompleted handler completed successfully`); - } else { - this.logger.warn(`[HANDLE] No handler registered for SigningCompleted`); - } - break; - - case MPC_TOPICS.SESSION_FAILED: - this.logger.log(`[HANDLE] Processing SessionFailed event`); - this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`); - this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`); - - const failedPayload = payload as SessionFailedPayload; - // Route to appropriate handler based on session type - if (failedPayload.sessionType === 'sign') { - if (this.signingFailedHandler) { - await this.signingFailedHandler(failedPayload); - this.logger.log(`[HANDLE] SigningFailed handler completed`); - } - } else { - if (this.sessionFailedHandler) { - await this.sessionFailedHandler(failedPayload); - this.logger.log(`[HANDLE] SessionFailed handler completed`); - } - } - break; - - default: - this.logger.warn(`[RECEIVE] Unknown MPC topic: ${topic}`); - } - } catch (error) { - this.logger.error(`[ERROR] Error processing MPC event from ${topic}`, error); - } - }, - }); - - this.logger.log(`[START] Started consuming MPC events for address derivation`); - } -} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts b/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts index 796f6907..9022a4d1 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts @@ -1,23 +1,20 @@ /** * MPC Signing Client * - * 通过 Kafka 事件与 mpc-service 通信进行 MPC 签名 - * 用于热钱包的 ERC20 转账签名 + * 直接调用 mpc-system 的 account-service (port 4000) 进行 MPC 签名 + * 用于热钱包和做市商钱包的 ERC20 转账签名 * - * 事件流: - * blockchain-service → Kafka(mpc.SigningRequested) → mpc-service - * mpc-service → Kafka(mpc.SigningCompleted) → blockchain-service + * 签名流程: + * 1. POST /api/v1/mpc/sign → 创建签名会话 + * 2. GET /api/v1/mpc/sessions/{session_id} → 轮询签名结果 */ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; +import { HttpService } from '@nestjs/axios'; +import { JwtService } from '@nestjs/jwt'; import { randomUUID } from 'crypto'; -import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service'; -import { - MpcEventConsumerService, - SigningCompletedPayload, - SessionFailedPayload, -} from '@/infrastructure/kafka/mpc-event-consumer.service'; +import { firstValueFrom } from 'rxjs'; export interface CreateSigningInput { username: string; @@ -30,14 +27,11 @@ export interface SigningResult { signature?: string; } -// 签名结果回调 -type SigningCallback = (signature: string | null, error?: string) => void; - -// MPC 签名请求 Topic +// MPC 签名请求 Topic (保留导出以避免外部引用报错) export const MPC_SIGNING_TOPIC = 'mining_mpc.SigningRequested'; @Injectable() -export class MpcSigningClient implements OnModuleInit { +export class MpcSigningClient { private readonly logger = new Logger(MpcSigningClient.name); // C2C Bot 热钱包 private readonly hotWalletUsername: string; @@ -48,19 +42,16 @@ export class MpcSigningClient implements OnModuleInit { // fUSDT (积分值) 做市商钱包 private readonly fusdtMarketMakerUsername: string; private readonly fusdtMarketMakerAddress: string; + // MPC system 配置 + private readonly mpcAccountServiceUrl: string; + private readonly mpcJwtSecret: string; private readonly signingTimeoutMs: number = 300000; // 5 minutes - - // 待处理的签名请求回调 Map - private pendingRequests: Map void; - reject: (error: Error) => void; - timeout: NodeJS.Timeout; - }> = new Map(); + private readonly pollingIntervalMs: number = 2000; // 2 seconds constructor( private readonly configService: ConfigService, - private readonly eventPublisher: EventPublisherService, - private readonly mpcEventConsumer: MpcEventConsumerService, + private readonly httpService: HttpService, + private readonly jwtService: JwtService, ) { // C2C Bot 热钱包配置 this.hotWalletUsername = this.configService.get('C2C_BOT_WALLET_USERNAME', ''); @@ -71,6 +62,9 @@ export class MpcSigningClient implements OnModuleInit { // fUSDT (积分值) 做市商钱包配置 this.fusdtMarketMakerUsername = this.configService.get('FUSDT_MARKET_MAKER_USERNAME', ''); this.fusdtMarketMakerAddress = this.configService.get('FUSDT_MARKET_MAKER_ADDRESS', ''); + // MPC system 配置 + this.mpcAccountServiceUrl = this.configService.get('MPC_ACCOUNT_SERVICE_URL', 'http://localhost:4000'); + this.mpcJwtSecret = this.configService.get('MPC_JWT_SECRET', ''); if (!this.hotWalletUsername) { this.logger.warn('[INIT] C2C_BOT_WALLET_USERNAME not configured (C2C Bot disabled)'); @@ -84,18 +78,15 @@ export class MpcSigningClient implements OnModuleInit { if (!this.fusdtMarketMakerUsername || !this.fusdtMarketMakerAddress) { this.logger.warn('[INIT] fUSDT Market Maker not configured'); } + if (!this.mpcJwtSecret) { + this.logger.warn('[INIT] MPC_JWT_SECRET not configured - signing will fail'); + } this.logger.log(`[INIT] C2C Bot Wallet: ${this.hotWalletAddress || '(not configured)'}`); this.logger.log(`[INIT] eUSDT Market Maker: ${this.eusdtMarketMakerAddress || '(not configured)'}`); this.logger.log(`[INIT] fUSDT Market Maker: ${this.fusdtMarketMakerAddress || '(not configured)'}`); - this.logger.log(`[INIT] Using Kafka event-driven signing`); - } - - async onModuleInit() { - // 注册签名完成和失败事件处理器 - this.mpcEventConsumer.onSigningCompleted(this.handleSigningCompleted.bind(this)); - this.mpcEventConsumer.onSigningFailed(this.handleSigningFailed.bind(this)); - this.logger.log('[INIT] MPC signing event handlers registered'); + this.logger.log(`[INIT] MPC Account Service: ${this.mpcAccountServiceUrl}`); + this.logger.log(`[INIT] Using HTTP direct call to mpc-system`); } /** @@ -162,7 +153,7 @@ export class MpcSigningClient implements OnModuleInit { } /** - * 签名消息(使用 C2C Bot 热钱包,通过 Kafka 事件驱动) + * 签名消息(使用 C2C Bot 热钱包) * * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) * @returns 签名结果 (hex string) @@ -201,7 +192,7 @@ export class MpcSigningClient implements OnModuleInit { } /** - * 使用指定用户名签名消息(通过 Kafka 事件驱动) + * 使用指定用户名签名消息(HTTP 直调 mpc-system) * * @param username MPC 用户名 * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) @@ -214,87 +205,117 @@ export class MpcSigningClient implements OnModuleInit { throw new Error('MPC username not provided'); } - const sessionId = randomUUID(); - this.logger.log(`[SIGN] Session ID: ${sessionId}`); - - // 创建 Promise 等待签名结果 - const signaturePromise = new Promise((resolve, reject) => { - // 设置超时 - const timeout = setTimeout(() => { - this.pendingRequests.delete(sessionId); - reject(new Error(`MPC signing timeout after ${this.signingTimeoutMs}ms`)); - }, this.signingTimeoutMs); - - // 保存到待处理队列 - this.pendingRequests.set(sessionId, { resolve, reject, timeout }); - }); - - // 发布签名请求事件到 Kafka - try { - await this.eventPublisher.publish({ - eventType: 'mining_blockchain.mpc.signing.requested', - toPayload: () => ({ - sessionId, - userId: 'system', - username, - messageHash, - source: 'mining-blockchain-service', - }), - eventId: sessionId, - occurredAt: new Date(), - }); - - this.logger.log(`[SIGN] Signing request published to Kafka: sessionId=${sessionId}, username=${username}`); - } catch (error) { - // 发布失败,清理待处理队列 - const pending = this.pendingRequests.get(sessionId); - if (pending) { - clearTimeout(pending.timeout); - this.pendingRequests.delete(sessionId); - } - throw error; + if (!this.mpcJwtSecret) { + throw new Error('MPC_JWT_SECRET not configured'); } - // 等待签名结果 - const signature = await signaturePromise; + // Step 1: 创建签名会话 + const createUrl = `${this.mpcAccountServiceUrl}/api/v1/mpc/sign`; + const headers = this.getMpcAuthHeaders(); + + this.logger.log(`[SIGN] POST ${createUrl}`); + + const createResponse = await firstValueFrom( + this.httpService.post<{ + session_id: string; + status: string; + session_type?: string; + username?: string; + message_hash?: string; + }>( + createUrl, + { username, message_hash: messageHash }, + { headers, timeout: 30000 }, + ), + ); + + const sessionId = createResponse.data.session_id; + this.logger.log(`[SIGN] Session created: ${sessionId}, status: ${createResponse.data.status}`); + + // Step 2: 轮询签名结果 + const signature = await this.pollSigningStatus(sessionId); this.logger.log(`[SIGN] Signature obtained: ${signature.slice(0, 20)}...`); return signature; } /** - * 处理签名完成事件 + * 轮询签名会话状态直到完成或超时 */ - private async handleSigningCompleted(payload: SigningCompletedPayload): Promise { - const sessionId = payload.sessionId; - this.logger.log(`[EVENT] Signing completed: sessionId=${sessionId}`); + private async pollSigningStatus(sessionId: string): Promise { + const statusUrl = `${this.mpcAccountServiceUrl}/api/v1/mpc/sessions/${sessionId}`; + const maxAttempts = Math.ceil(this.signingTimeoutMs / this.pollingIntervalMs); - const pending = this.pendingRequests.get(sessionId); - if (pending) { - clearTimeout(pending.timeout); - this.pendingRequests.delete(sessionId); + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + await this.delay(this.pollingIntervalMs); - if (payload.signature) { - pending.resolve(payload.signature); - } else { - pending.reject(new Error('Signing completed but no signature returned')); + const response = await firstValueFrom( + this.httpService.get<{ + session_id: string; + status: string; + session_type?: string; + completed_parties?: number; + total_parties?: number; + signature?: string; + }>(statusUrl, { + headers: this.getMpcAuthHeaders(), + timeout: 10000, + }), + ); + + const { status, signature } = response.data; + + if (attempt % 5 === 0 || status !== 'pending') { + this.logger.log(`[POLL] Attempt ${attempt}/${maxAttempts}: status=${status}`); + } + + if (status === 'completed') { + if (!signature) { + throw new Error('Signing completed but no signature returned'); + } + return signature; + } + + if (status === 'failed' || status === 'expired') { + throw new Error(`MPC signing ${status}: sessionId=${sessionId}`); } - } else { - this.logger.warn(`[EVENT] No pending request for sessionId=${sessionId}`); } + + throw new Error(`MPC signing timeout after ${this.signingTimeoutMs}ms`); } /** - * 处理签名失败事件 + * 生成 mpc-system 认证 JWT token */ - private async handleSigningFailed(payload: SessionFailedPayload): Promise { - const sessionId = payload.sessionId; - this.logger.warn(`[EVENT] Signing failed: sessionId=${sessionId}, error=${payload.errorMessage}`); + private generateMpcAccessToken(): string { + const now = Math.floor(Date.now() / 1000); + const payload = { + jti: randomUUID(), + iss: 'mining-blockchain-service', + sub: 'system', + username: 'mining-blockchain-service', + token_type: 'access', + iat: now, + nbf: now, + exp: now + 24 * 60 * 60, + }; + return this.jwtService.sign(payload, { + secret: this.mpcJwtSecret, + algorithm: 'HS256' as const, + }); + } - const pending = this.pendingRequests.get(sessionId); - if (pending) { - clearTimeout(pending.timeout); - this.pendingRequests.delete(sessionId); - pending.reject(new Error(`MPC signing failed: ${payload.errorMessage}`)); - } + /** + * 获取 mpc-system 认证请求头 + */ + private getMpcAuthHeaders(): Record { + const token = this.generateMpcAccessToken(); + return { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${token}`, + }; + } + + private delay(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); } }