diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 7c0dc6d1..1a591863 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -265,12 +265,11 @@ load_env() { # Mode-specific configuration if [ "$DEPLOY_MODE" = "standalone" ]; then - # Standalone: local PostgreSQL/Redis, remote Kafka/MPC + # Standalone: local PostgreSQL/Redis, remote Kafka export POSTGRES_HOST="${POSTGRES_HOST:-postgres-2}" export POSTGRES_PORT="${POSTGRES_PORT:-5432}" export KAFKA_BROKERS="${KAFKA_BROKERS:-192.168.1.111:9093}" export REDIS_HOST="${REDIS_HOST:-redis-2}" - export MPC_SERVICE_URL="${MPC_SERVICE_URL:-http://192.168.1.111:3006}" export RWA_NETWORK_NAME="rwa-2-network" POSTGRES_CONTAINER="${POSTGRES_CONTAINER:-rwa-postgres-2}" @@ -1930,7 +1929,6 @@ show_help() { echo "" echo -e "${BOLD}Environment Variables (standalone mode):${NC}" echo " KAFKA_BROKERS Remote Kafka address (default: 192.168.1.111:9093)" - echo " MPC_SERVICE_URL Remote MPC service (default: http://192.168.1.111:3006)" echo " POSTGRES_HOST PostgreSQL host (default: postgres-2)" echo " REDIS_HOST Redis host (default: redis-2)" echo " DEBEZIUM_CONNECT_URL Debezium REST API (default: http://localhost:8084)" diff --git a/backend/services/docker-compose.2.0.yml b/backend/services/docker-compose.2.0.yml index 07bb6c78..92a89873 100644 --- a/backend/services/docker-compose.2.0.yml +++ b/backend/services/docker-compose.2.0.yml @@ -418,10 +418,8 @@ services: REDIS_PORT: ${REDIS_PORT:-6379} REDIS_PASSWORD: ${REDIS_PASSWORD:-} REDIS_DB: 8 - # Kafka - 用于事件发布 + # Kafka - 用于 MPC 签名通信和事件发布 KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:29092} - # MPC Service - 直接 HTTP 调用签名 - MPC_SERVICE_URL: ${MPC_SERVICE_URL:-http://mpc-service:3006} # JWT 配置 JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production} # 区块链配置 diff --git a/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts b/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts index 957ef247..7231998d 100644 --- a/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts +++ b/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts @@ -1 +1,2 @@ +export * from './mpc-keygen-completed.handler'; export * from './withdrawal-requested.handler'; diff --git a/backend/services/mining-blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts b/backend/services/mining-blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts new file mode 100644 index 00000000..201f444c --- /dev/null +++ b/backend/services/mining-blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts @@ -0,0 +1,74 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { AddressDerivationService } from '../services/address-derivation.service'; +import { MpcEventConsumerService, KeygenCompletedPayload } from '@/infrastructure/kafka/mpc-event-consumer.service'; + +/** + * MPC 密钥生成完成事件处理器 + * + * 监听 mpc.KeygenCompleted 事件,从公钥派生多链钱包地址, + * 并发布 blockchain.WalletAddressCreated 事件通知 identity-service + */ +@Injectable() +export class MpcKeygenCompletedHandler implements OnModuleInit { + private readonly logger = new Logger(MpcKeygenCompletedHandler.name); + + constructor( + private readonly addressDerivationService: AddressDerivationService, + private readonly mpcEventConsumer: MpcEventConsumerService, + ) {} + + onModuleInit() { + // Register handler for keygen completed events + this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this)); + this.logger.log(`[INIT] MpcKeygenCompletedHandler registered with MpcEventConsumer`); + } + + /** + * 处理 MPC 密钥生成完成事件 + * 从 mpc-service 的 KeygenCompleted 事件中提取 publicKey、userId 和 accountSequence + */ + private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise { + this.logger.log(`[HANDLE] Received KeygenCompleted event`); + this.logger.log(`[HANDLE] sessionId: ${payload.sessionId}`); + this.logger.log(`[HANDLE] publicKey: ${payload.publicKey?.substring(0, 30)}...`); + this.logger.log(`[HANDLE] extraPayload: ${JSON.stringify(payload.extraPayload)}`); + + // Extract userId and accountSequence from extraPayload + const userId = payload.extraPayload?.userId; + const accountSequence = payload.extraPayload?.accountSequence; + + if (!userId) { + this.logger.error(`[ERROR] Missing userId in extraPayload, cannot derive addresses`); + return; + } + + if (!accountSequence) { + this.logger.error(`[ERROR] Missing accountSequence in extraPayload, cannot derive addresses`); + return; + } + + const publicKey = payload.publicKey; + if (!publicKey) { + this.logger.error(`[ERROR] Missing publicKey in payload, cannot derive addresses`); + return; + } + + try { + this.logger.log(`[DERIVE] Starting address derivation for user: ${userId}, account: ${accountSequence}`); + + const result = await this.addressDerivationService.deriveAndRegister({ + userId: BigInt(userId), + accountSequence: accountSequence, + publicKey, + }); + + this.logger.log(`[DERIVE] Successfully derived ${result.addresses.length} addresses for account ${accountSequence}`); + result.addresses.forEach((addr) => { + this.logger.log(`[DERIVE] - ${addr.chainType}: ${addr.address}`); + }); + } catch (error) { + this.logger.error(`[ERROR] Failed to derive addresses for account ${accountSequence}:`, error); + throw error; + } + } +} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts b/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts index aef33fb0..f8d4403d 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts @@ -2,7 +2,7 @@ import { Global, Module } from '@nestjs/common'; import { HttpModule } from '@nestjs/axios'; import { PrismaService } from './persistence/prisma/prisma.service'; import { RedisService, AddressCacheService } from './redis'; -import { EventPublisherService, WithdrawalEventConsumerService } from './kafka'; +import { EventPublisherService, MpcEventConsumerService, WithdrawalEventConsumerService } from './kafka'; import { EvmProviderAdapter, AddressDerivationAdapter, MnemonicDerivationAdapter, RecoveryMnemonicAdapter, BlockScannerService } from './blockchain'; import { MpcSigningClient } from './mpc'; import { DomainModule } from '@/domain/domain.module'; @@ -33,6 +33,7 @@ import { PrismaService, RedisService, EventPublisherService, + MpcEventConsumerService, WithdrawalEventConsumerService, MpcSigningClient, @@ -80,6 +81,7 @@ import { PrismaService, RedisService, EventPublisherService, + MpcEventConsumerService, WithdrawalEventConsumerService, MpcSigningClient, EvmProviderAdapter, diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts index df1e0281..e78978c3 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts @@ -1,4 +1,5 @@ export * from './event-publisher.service'; export * from './event-consumer.controller'; +export * from './mpc-event-consumer.service'; export * from './withdrawal-event-consumer.service'; export * from './deposit-ack-consumer.service'; diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts new file mode 100644 index 00000000..c873f4ca --- /dev/null +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -0,0 +1,247 @@ +/** + * MPC Event Consumer Service for Blockchain Service + * + * Consumes MPC events from mpc-service via Kafka: + * - KeygenCompleted: derives wallet addresses from public keys + * - SigningCompleted: returns signature for hot wallet transfers + * - SessionFailed: handles keygen/signing failures + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; + +// MPC Event Topics (events from mpc-service) +export const MPC_TOPICS = { + KEYGEN_COMPLETED: 'mining_mpc.KeygenCompleted', + SIGNING_COMPLETED: 'mining_mpc.SigningCompleted', + SESSION_FAILED: 'mining_mpc.SessionFailed', +} as const; + +export interface KeygenCompletedPayload { + sessionId: string; + partyId: string; + publicKey: string; + shareId: string; + threshold: string; + extraPayload?: { + userId: string; + accountSequence: string; // 账户序列号 (格式: D + YYMMDD + 5位序号) + username: string; + delegateShare?: { + partyId: string; + partyIndex: number; + encryptedShare: string; + }; + serverParties?: string[]; + }; +} + +export interface SigningCompletedPayload { + sessionId: string; + partyId: string; + messageHash: string; + signature: string; + publicKey: string; + extraPayload?: { + userId: string; + username: string; + mpcSessionId: string; + source?: string; // 'blockchain-service' | 'identity-service' + }; +} + +export interface SessionFailedPayload { + sessionId: string; + partyId: string; + sessionType: string; // 'keygen' | 'sign' + errorMessage: string; + errorCode?: string; + extraPayload?: { + userId: string; + username: string; + source?: string; + }; +} + +export type MpcEventHandler = (payload: T) => Promise; + +@Injectable() +export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(MpcEventConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isConnected = false; + + private keygenCompletedHandler?: MpcEventHandler; + private signingCompletedHandler?: MpcEventHandler; + private sessionFailedHandler?: MpcEventHandler; + private signingFailedHandler?: MpcEventHandler; + + constructor(private readonly configService: ConfigService) {} + + async onModuleInit() { + const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; + const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'mining-blockchain-service'; + const groupId = 'mining-blockchain-service-mpc-events'; + + this.logger.log(`[INIT] MPC Event Consumer for blockchain-service initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] GroupId: ${groupId}`); + this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); + this.logger.log(`[INIT] Topics to subscribe: ${Object.values(MPC_TOPICS).join(', ')}`); + + // 企业级重试配置:指数退避,最多重试约 2.5 小时 + this.kafka = new Kafka({ + clientId, + brokers, + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 1000, // 1 秒 + maxRetryTime: 300000, // 最大 5 分钟 + retries: 15, // 最多 15 次 + multiplier: 2, // 指数退避因子 + restartOnFailure: async () => true, + }, + }); + + this.consumer = this.kafka.consumer({ + groupId, + sessionTimeout: 30000, + heartbeatInterval: 3000, + }); + + try { + this.logger.log(`[CONNECT] Connecting MPC Event consumer...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); + + // Subscribe to MPC topics + await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); + this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); + + // Start consuming + await this.startConsuming(); + } catch (error) { + this.logger.error(`[ERROR] Failed to connect MPC Event Kafka consumer`, error); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + await this.consumer.disconnect(); + this.logger.log('MPC Event Kafka consumer disconnected'); + } + } + + /** + * Register handler for keygen completed events + */ + onKeygenCompleted(handler: MpcEventHandler): void { + this.keygenCompletedHandler = handler; + this.logger.log(`[REGISTER] KeygenCompleted handler registered`); + } + + /** + * Register handler for signing completed events + */ + onSigningCompleted(handler: MpcEventHandler): void { + this.signingCompletedHandler = handler; + this.logger.log(`[REGISTER] SigningCompleted handler registered`); + } + + /** + * Register handler for session failed events (keygen) + */ + onSessionFailed(handler: MpcEventHandler): void { + this.sessionFailedHandler = handler; + this.logger.log(`[REGISTER] SessionFailed handler registered`); + } + + /** + * Register handler for signing failed events + */ + onSigningFailed(handler: MpcEventHandler): void { + this.signingFailedHandler = handler; + this.logger.log(`[REGISTER] SigningFailed handler registered`); + } + + private async startConsuming(): Promise { + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + const offset = message.offset; + this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`); + + try { + const value = message.value?.toString(); + if (!value) { + this.logger.warn(`[RECEIVE] Empty message received on ${topic}`); + return; + } + + this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`); + + const parsed = JSON.parse(value); + const payload = parsed.payload || parsed; + + this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`); + this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`); + + switch (topic) { + case MPC_TOPICS.KEYGEN_COMPLETED: + this.logger.log(`[HANDLE] Processing KeygenCompleted event for blockchain-service`); + this.logger.log(`[HANDLE] publicKey: ${(payload as KeygenCompletedPayload).publicKey?.substring(0, 20)}...`); + this.logger.log(`[HANDLE] extraPayload.userId: ${(payload as KeygenCompletedPayload).extraPayload?.userId}`); + if (this.keygenCompletedHandler) { + await this.keygenCompletedHandler(payload as KeygenCompletedPayload); + this.logger.log(`[HANDLE] KeygenCompleted handler completed successfully`); + } else { + this.logger.warn(`[HANDLE] No handler registered for KeygenCompleted`); + } + break; + + case MPC_TOPICS.SIGNING_COMPLETED: + this.logger.log(`[HANDLE] Processing SigningCompleted event`); + this.logger.log(`[HANDLE] sessionId: ${(payload as SigningCompletedPayload).sessionId}`); + this.logger.log(`[HANDLE] signature: ${(payload as SigningCompletedPayload).signature?.substring(0, 20)}...`); + if (this.signingCompletedHandler) { + await this.signingCompletedHandler(payload as SigningCompletedPayload); + this.logger.log(`[HANDLE] SigningCompleted handler completed successfully`); + } else { + this.logger.warn(`[HANDLE] No handler registered for SigningCompleted`); + } + break; + + case MPC_TOPICS.SESSION_FAILED: + this.logger.log(`[HANDLE] Processing SessionFailed event`); + this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`); + this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`); + + const failedPayload = payload as SessionFailedPayload; + // Route to appropriate handler based on session type + if (failedPayload.sessionType === 'sign') { + if (this.signingFailedHandler) { + await this.signingFailedHandler(failedPayload); + this.logger.log(`[HANDLE] SigningFailed handler completed`); + } + } else { + if (this.sessionFailedHandler) { + await this.sessionFailedHandler(failedPayload); + this.logger.log(`[HANDLE] SessionFailed handler completed`); + } + } + break; + + default: + this.logger.warn(`[RECEIVE] Unknown MPC topic: ${topic}`); + } + } catch (error) { + this.logger.error(`[ERROR] Error processing MPC event from ${topic}`, error); + } + }, + }); + + this.logger.log(`[START] Started consuming MPC events for address derivation`); + } +} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts b/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts index bd522eaa..796f6907 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts @@ -1,18 +1,23 @@ /** * MPC Signing Client * - * 通过 HTTP 直接调用 mpc-service 进行 MPC 签名 + * 通过 Kafka 事件与 mpc-service 通信进行 MPC 签名 * 用于热钱包的 ERC20 转账签名 * - * 流程: - * blockchain-service → POST /api/v1/mpc/sign → mpc-service - * blockchain-service ← GET /api/v1/mpc/sign/{sessionId}/status (轮询) ← mpc-service + * 事件流: + * blockchain-service → Kafka(mpc.SigningRequested) → mpc-service + * mpc-service → Kafka(mpc.SigningCompleted) → blockchain-service */ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; -import { HttpService } from '@nestjs/axios'; -import { firstValueFrom } from 'rxjs'; +import { randomUUID } from 'crypto'; +import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service'; +import { + MpcEventConsumerService, + SigningCompletedPayload, + SessionFailedPayload, +} from '@/infrastructure/kafka/mpc-event-consumer.service'; export interface CreateSigningInput { username: string; @@ -25,11 +30,15 @@ export interface SigningResult { signature?: string; } +// 签名结果回调 +type SigningCallback = (signature: string | null, error?: string) => void; + +// MPC 签名请求 Topic +export const MPC_SIGNING_TOPIC = 'mining_mpc.SigningRequested'; + @Injectable() -export class MpcSigningClient { +export class MpcSigningClient implements OnModuleInit { private readonly logger = new Logger(MpcSigningClient.name); - // MPC Service URL - private readonly mpcServiceUrl: string; // C2C Bot 热钱包 private readonly hotWalletUsername: string; private readonly hotWalletAddress: string; @@ -39,16 +48,20 @@ export class MpcSigningClient { // fUSDT (积分值) 做市商钱包 private readonly fusdtMarketMakerUsername: string; private readonly fusdtMarketMakerAddress: string; - // 轮询配置 - private readonly pollIntervalMs: number = 2000; - private readonly maxPollAttempts: number = 150; // 5 minutes + private readonly signingTimeoutMs: number = 300000; // 5 minutes + + // 待处理的签名请求回调 Map + private pendingRequests: Map void; + reject: (error: Error) => void; + timeout: NodeJS.Timeout; + }> = new Map(); constructor( private readonly configService: ConfigService, - private readonly httpService: HttpService, + private readonly eventPublisher: EventPublisherService, + private readonly mpcEventConsumer: MpcEventConsumerService, ) { - // MPC Service URL - this.mpcServiceUrl = this.configService.get('MPC_SERVICE_URL', 'http://mpc-service:3006'); // C2C Bot 热钱包配置 this.hotWalletUsername = this.configService.get('C2C_BOT_WALLET_USERNAME', ''); this.hotWalletAddress = this.configService.get('C2C_BOT_WALLET_ADDRESS', ''); @@ -75,7 +88,14 @@ export class MpcSigningClient { this.logger.log(`[INIT] C2C Bot Wallet: ${this.hotWalletAddress || '(not configured)'}`); this.logger.log(`[INIT] eUSDT Market Maker: ${this.eusdtMarketMakerAddress || '(not configured)'}`); this.logger.log(`[INIT] fUSDT Market Maker: ${this.fusdtMarketMakerAddress || '(not configured)'}`); - this.logger.log(`[INIT] MPC Service URL: ${this.mpcServiceUrl}`); + this.logger.log(`[INIT] Using Kafka event-driven signing`); + } + + async onModuleInit() { + // 注册签名完成和失败事件处理器 + this.mpcEventConsumer.onSigningCompleted(this.handleSigningCompleted.bind(this)); + this.mpcEventConsumer.onSigningFailed(this.handleSigningFailed.bind(this)); + this.logger.log('[INIT] MPC signing event handlers registered'); } /** @@ -142,7 +162,10 @@ export class MpcSigningClient { } /** - * 签名消息(使用 C2C Bot 热钱包) + * 签名消息(使用 C2C Bot 热钱包,通过 Kafka 事件驱动) + * + * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) + * @returns 签名结果 (hex string) */ async signMessage(messageHash: string): Promise { if (!this.hotWalletUsername) { @@ -153,6 +176,9 @@ export class MpcSigningClient { /** * 使用 eUSDT 做市商钱包签名消息 + * + * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) + * @returns 签名结果 (hex string) */ async signMessageAsEusdtMarketMaker(messageHash: string): Promise { if (!this.eusdtMarketMakerUsername) { @@ -163,6 +189,9 @@ export class MpcSigningClient { /** * 使用 fUSDT 做市商钱包签名消息 + * + * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) + * @returns 签名结果 (hex string) */ async signMessageAsFusdtMarketMaker(messageHash: string): Promise { if (!this.fusdtMarketMakerUsername) { @@ -172,11 +201,11 @@ export class MpcSigningClient { } /** - * 使用指定用户名签名消息(通过 HTTP 调用 mpc-service) + * 使用指定用户名签名消息(通过 Kafka 事件驱动) * - * 流程: - * 1. POST /api/v1/mpc/sign → 创建签名会话,获取 sessionId - * 2. GET /api/v1/mpc/sign/{sessionId}/status → 轮询结果 (每 2 秒, 最多 5 分钟) + * @param username MPC 用户名 + * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) + * @returns 签名结果 (hex string) */ async signMessageWithUsername(username: string, messageHash: string): Promise { this.logger.log(`[SIGN] Starting MPC signing for: ${messageHash.slice(0, 16)}... (username: ${username})`); @@ -185,74 +214,87 @@ export class MpcSigningClient { throw new Error('MPC username not provided'); } + const sessionId = randomUUID(); + this.logger.log(`[SIGN] Session ID: ${sessionId}`); + + // 创建 Promise 等待签名结果 + const signaturePromise = new Promise((resolve, reject) => { + // 设置超时 + const timeout = setTimeout(() => { + this.pendingRequests.delete(sessionId); + reject(new Error(`MPC signing timeout after ${this.signingTimeoutMs}ms`)); + }, this.signingTimeoutMs); + + // 保存到待处理队列 + this.pendingRequests.set(sessionId, { resolve, reject, timeout }); + }); + + // 发布签名请求事件到 Kafka try { - // 1. 创建签名会话 - const createResponse = await firstValueFrom( - this.httpService.post<{ sessionId: string; status: string }>( - `${this.mpcServiceUrl}/api/v1/mpc/sign`, - { - username, - messageHash, - }, - { - headers: { 'Content-Type': 'application/json' }, - timeout: 30000, - }, - ), - ); + await this.eventPublisher.publish({ + eventType: 'mining_blockchain.mpc.signing.requested', + toPayload: () => ({ + sessionId, + userId: 'system', + username, + messageHash, + source: 'mining-blockchain-service', + }), + eventId: sessionId, + occurredAt: new Date(), + }); - const sessionId = createResponse.data.sessionId; - this.logger.log(`[SIGN] Signing session created: ${sessionId}`); - - // 2. 轮询签名状态 - const result = await this.pollSigningStatus(sessionId); - - if (result.status !== 'completed') { - throw new Error(`Signing session failed with status: ${result.status}`); - } - - this.logger.log(`[SIGN] Signature obtained: ${result.signature.slice(0, 20)}...`); - return result.signature; + this.logger.log(`[SIGN] Signing request published to Kafka: sessionId=${sessionId}, username=${username}`); } catch (error) { - this.logger.error(`[SIGN] MPC signing failed: username=${username}`, error); - throw new Error(`MPC signing failed: ${error.message}`); + // 发布失败,清理待处理队列 + const pending = this.pendingRequests.get(sessionId); + if (pending) { + clearTimeout(pending.timeout); + this.pendingRequests.delete(sessionId); + } + throw error; + } + + // 等待签名结果 + const signature = await signaturePromise; + this.logger.log(`[SIGN] Signature obtained: ${signature.slice(0, 20)}...`); + return signature; + } + + /** + * 处理签名完成事件 + */ + private async handleSigningCompleted(payload: SigningCompletedPayload): Promise { + const sessionId = payload.sessionId; + this.logger.log(`[EVENT] Signing completed: sessionId=${sessionId}`); + + const pending = this.pendingRequests.get(sessionId); + if (pending) { + clearTimeout(pending.timeout); + this.pendingRequests.delete(sessionId); + + if (payload.signature) { + pending.resolve(payload.signature); + } else { + pending.reject(new Error('Signing completed but no signature returned')); + } + } else { + this.logger.warn(`[EVENT] No pending request for sessionId=${sessionId}`); } } /** - * 轮询签名会话状态 + * 处理签名失败事件 */ - private async pollSigningStatus(sessionId: string): Promise<{ status: string; signature: string }> { - for (let i = 0; i < this.maxPollAttempts; i++) { - try { - const response = await firstValueFrom( - this.httpService.get<{ sessionId: string; status: string; signature?: string }>( - `${this.mpcServiceUrl}/api/v1/mpc/sign/${sessionId}/status`, - { timeout: 10000 }, - ), - ); + private async handleSigningFailed(payload: SessionFailedPayload): Promise { + const sessionId = payload.sessionId; + this.logger.warn(`[EVENT] Signing failed: sessionId=${sessionId}, error=${payload.errorMessage}`); - const data = response.data; - - if (data.status === 'completed') { - return { status: 'completed', signature: data.signature || '' }; - } - - if (data.status === 'failed' || data.status === 'expired') { - return { status: data.status, signature: '' }; - } - - await this.sleep(this.pollIntervalMs); - } catch (error) { - this.logger.warn(`[POLL] Error polling signing status (attempt ${i + 1}/${this.maxPollAttempts}): ${error.message}`); - await this.sleep(this.pollIntervalMs); - } + const pending = this.pendingRequests.get(sessionId); + if (pending) { + clearTimeout(pending.timeout); + this.pendingRequests.delete(sessionId); + pending.reject(new Error(`MPC signing failed: ${payload.errorMessage}`)); } - - throw new Error(`Signing session ${sessionId} timed out after ${this.maxPollAttempts * this.pollIntervalMs / 1000}s`); - } - - private sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); } }