refactor(mining-blockchain): 移除 mpc-service 依赖,改为 HTTP 直调 mpc-system

将 mining-blockchain-service 的 MPC 签名通信从 Kafka 事件驱动(经由 mpc-service 中转)
改为 HTTP 直接调用 mpc-system 的 account-service (port 4000)。

## 核心变更

### mpc-signing.client.ts (重写)
- 移除 EventPublisherService、MpcEventConsumerService 依赖和 pendingRequests Map
- 移除 OnModuleInit 中的 Kafka 事件注册
- 新增 HttpService (@nestjs/axios) + JwtService (@nestjs/jwt) 依赖注入
- 签名流程改为:
  1. POST /api/v1/mpc/sign → 创建签名会话 (snake_case: username, message_hash)
  2. GET /api/v1/mpc/sessions/{session_id} → 轮询结果 (每 2s, 最多 5 分钟)
- JWT 认证: 使用 MPC_JWT_SECRET (HS256) 生成 Bearer token,匹配 mpc-system 格式
- 所有公共接口不变 (signMessage, signMessageAsXxxMarketMaker, isConfigured, getXxxAddress 等)

### 删除的文件
- mpc-event-consumer.service.ts: Kafka MPC 事件消费者 (SigningCompleted/SessionFailed/KeygenCompleted)
- mpc-keygen-completed.handler.ts: Keygen 地址派生处理器 (不再由此服务处理)

### 模块更新
- infrastructure.module.ts: 移除 MpcEventConsumerService,新增 JwtModule.register({})
- kafka/index.ts: 移除 mpc-event-consumer.service 导出
- event-handlers/index.ts: 移除 mpc-keygen-completed.handler 导出

### 部署配置
- docker-compose.2.0.yml: 新增 MPC_ACCOUNT_SERVICE_URL 和 MPC_JWT_SECRET 环境变量
- deploy-mining.sh: standalone 模式新增 export MPC_ACCOUNT_SERVICE_URL (默认 http://192.168.1.111:4000)

## 不受影响的部分
- Erc20TransferService / MpcTransferInitializerService 调用方式不变
- EventPublisherService (用于其他事件) 不变
- WithdrawalEventConsumerService 不变
- mpc-system 本身零修改

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-01 20:53:35 -08:00
parent c657bf6e2b
commit ec73541fe1
8 changed files with 131 additions and 429 deletions

View File

@ -271,6 +271,8 @@ load_env() {
export KAFKA_BROKERS="${KAFKA_BROKERS:-192.168.1.111:9093}"
export REDIS_HOST="${REDIS_HOST:-redis-2}"
export RWA_NETWORK_NAME="rwa-2-network"
# MPC system 在 1.0 服务器上
export MPC_ACCOUNT_SERVICE_URL="${MPC_ACCOUNT_SERVICE_URL:-http://192.168.1.111:4000}"
POSTGRES_CONTAINER="${POSTGRES_CONTAINER:-rwa-postgres-2}"
KAFKA_CONTAINER="" # Kafka is remote, no local container

View File

@ -418,10 +418,13 @@ services:
REDIS_PORT: ${REDIS_PORT:-6379}
REDIS_PASSWORD: ${REDIS_PASSWORD:-}
REDIS_DB: 8
# Kafka - 用于 MPC 签名通信和事件发布
# Kafka - 用于事件发布
KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:29092}
# JWT 配置
JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production}
# MPC system 直调配置 (account-service port 4000)
MPC_ACCOUNT_SERVICE_URL: ${MPC_ACCOUNT_SERVICE_URL:-http://localhost:4000}
MPC_JWT_SECRET: ${MPC_JWT_SECRET:-}
# 区块链配置
NETWORK_MODE: ${NETWORK_MODE:-mainnet}
# KAVA 配置

View File

@ -1,2 +1 @@
export * from './mpc-keygen-completed.handler';
export * from './withdrawal-requested.handler';

View File

@ -1,74 +0,0 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { AddressDerivationService } from '../services/address-derivation.service';
import { MpcEventConsumerService, KeygenCompletedPayload } from '@/infrastructure/kafka/mpc-event-consumer.service';
/**
* MPC
*
* mpc.KeygenCompleted
* blockchain.WalletAddressCreated identity-service
*/
@Injectable()
export class MpcKeygenCompletedHandler implements OnModuleInit {
private readonly logger = new Logger(MpcKeygenCompletedHandler.name);
constructor(
private readonly addressDerivationService: AddressDerivationService,
private readonly mpcEventConsumer: MpcEventConsumerService,
) {}
onModuleInit() {
// Register handler for keygen completed events
this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this));
this.logger.log(`[INIT] MpcKeygenCompletedHandler registered with MpcEventConsumer`);
}
/**
* MPC
* mpc-service KeygenCompleted publicKeyuserId accountSequence
*/
private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise<void> {
this.logger.log(`[HANDLE] Received KeygenCompleted event`);
this.logger.log(`[HANDLE] sessionId: ${payload.sessionId}`);
this.logger.log(`[HANDLE] publicKey: ${payload.publicKey?.substring(0, 30)}...`);
this.logger.log(`[HANDLE] extraPayload: ${JSON.stringify(payload.extraPayload)}`);
// Extract userId and accountSequence from extraPayload
const userId = payload.extraPayload?.userId;
const accountSequence = payload.extraPayload?.accountSequence;
if (!userId) {
this.logger.error(`[ERROR] Missing userId in extraPayload, cannot derive addresses`);
return;
}
if (!accountSequence) {
this.logger.error(`[ERROR] Missing accountSequence in extraPayload, cannot derive addresses`);
return;
}
const publicKey = payload.publicKey;
if (!publicKey) {
this.logger.error(`[ERROR] Missing publicKey in payload, cannot derive addresses`);
return;
}
try {
this.logger.log(`[DERIVE] Starting address derivation for user: ${userId}, account: ${accountSequence}`);
const result = await this.addressDerivationService.deriveAndRegister({
userId: BigInt(userId),
accountSequence: accountSequence,
publicKey,
});
this.logger.log(`[DERIVE] Successfully derived ${result.addresses.length} addresses for account ${accountSequence}`);
result.addresses.forEach((addr) => {
this.logger.log(`[DERIVE] - ${addr.chainType}: ${addr.address}`);
});
} catch (error) {
this.logger.error(`[ERROR] Failed to derive addresses for account ${accountSequence}:`, error);
throw error;
}
}
}

View File

@ -1,8 +1,9 @@
import { Global, Module } from '@nestjs/common';
import { HttpModule } from '@nestjs/axios';
import { JwtModule } from '@nestjs/jwt';
import { PrismaService } from './persistence/prisma/prisma.service';
import { RedisService, AddressCacheService } from './redis';
import { EventPublisherService, MpcEventConsumerService, WithdrawalEventConsumerService } from './kafka';
import { EventPublisherService, WithdrawalEventConsumerService } from './kafka';
import { EvmProviderAdapter, AddressDerivationAdapter, MnemonicDerivationAdapter, RecoveryMnemonicAdapter, BlockScannerService } from './blockchain';
import { MpcSigningClient } from './mpc';
import { DomainModule } from '@/domain/domain.module';
@ -27,13 +28,12 @@ import {
@Global()
@Module({
imports: [DomainModule, HttpModule],
imports: [DomainModule, HttpModule, JwtModule.register({})],
providers: [
// 核心服务
PrismaService,
RedisService,
EventPublisherService,
MpcEventConsumerService,
WithdrawalEventConsumerService,
MpcSigningClient,
@ -81,7 +81,6 @@ import {
PrismaService,
RedisService,
EventPublisherService,
MpcEventConsumerService,
WithdrawalEventConsumerService,
MpcSigningClient,
EvmProviderAdapter,

View File

@ -1,5 +1,4 @@
export * from './event-publisher.service';
export * from './event-consumer.controller';
export * from './mpc-event-consumer.service';
export * from './withdrawal-event-consumer.service';
export * from './deposit-ack-consumer.service';

View File

@ -1,247 +0,0 @@
/**
* MPC Event Consumer Service for Blockchain Service
*
* Consumes MPC events from mpc-service via Kafka:
* - KeygenCompleted: derives wallet addresses from public keys
* - SigningCompleted: returns signature for hot wallet transfers
* - SessionFailed: handles keygen/signing failures
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
// MPC Event Topics (events from mpc-service)
export const MPC_TOPICS = {
KEYGEN_COMPLETED: 'mining_mpc.KeygenCompleted',
SIGNING_COMPLETED: 'mining_mpc.SigningCompleted',
SESSION_FAILED: 'mining_mpc.SessionFailed',
} as const;
export interface KeygenCompletedPayload {
sessionId: string;
partyId: string;
publicKey: string;
shareId: string;
threshold: string;
extraPayload?: {
userId: string;
accountSequence: string; // 账户序列号 (格式: D + YYMMDD + 5位序号)
username: string;
delegateShare?: {
partyId: string;
partyIndex: number;
encryptedShare: string;
};
serverParties?: string[];
};
}
export interface SigningCompletedPayload {
sessionId: string;
partyId: string;
messageHash: string;
signature: string;
publicKey: string;
extraPayload?: {
userId: string;
username: string;
mpcSessionId: string;
source?: string; // 'blockchain-service' | 'identity-service'
};
}
export interface SessionFailedPayload {
sessionId: string;
partyId: string;
sessionType: string; // 'keygen' | 'sign'
errorMessage: string;
errorCode?: string;
extraPayload?: {
userId: string;
username: string;
source?: string;
};
}
export type MpcEventHandler<T> = (payload: T) => Promise<void>;
@Injectable()
export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(MpcEventConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private isConnected = false;
private keygenCompletedHandler?: MpcEventHandler<KeygenCompletedPayload>;
private signingCompletedHandler?: MpcEventHandler<SigningCompletedPayload>;
private sessionFailedHandler?: MpcEventHandler<SessionFailedPayload>;
private signingFailedHandler?: MpcEventHandler<SessionFailedPayload>;
constructor(private readonly configService: ConfigService) {}
async onModuleInit() {
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092'];
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'mining-blockchain-service';
const groupId = 'mining-blockchain-service-mpc-events';
this.logger.log(`[INIT] MPC Event Consumer for blockchain-service initializing...`);
this.logger.log(`[INIT] ClientId: ${clientId}`);
this.logger.log(`[INIT] GroupId: ${groupId}`);
this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
this.logger.log(`[INIT] Topics to subscribe: ${Object.values(MPC_TOPICS).join(', ')}`);
// 企业级重试配置:指数退避,最多重试约 2.5 小时
this.kafka = new Kafka({
clientId,
brokers,
logLevel: logLevel.WARN,
retry: {
initialRetryTime: 1000, // 1 秒
maxRetryTime: 300000, // 最大 5 分钟
retries: 15, // 最多 15 次
multiplier: 2, // 指数退避因子
restartOnFailure: async () => true,
},
});
this.consumer = this.kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
try {
this.logger.log(`[CONNECT] Connecting MPC Event consumer...`);
await this.consumer.connect();
this.isConnected = true;
this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`);
// Subscribe to MPC topics
await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false });
this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`);
// Start consuming
await this.startConsuming();
} catch (error) {
this.logger.error(`[ERROR] Failed to connect MPC Event Kafka consumer`, error);
}
}
async onModuleDestroy() {
if (this.isConnected) {
await this.consumer.disconnect();
this.logger.log('MPC Event Kafka consumer disconnected');
}
}
/**
* Register handler for keygen completed events
*/
onKeygenCompleted(handler: MpcEventHandler<KeygenCompletedPayload>): void {
this.keygenCompletedHandler = handler;
this.logger.log(`[REGISTER] KeygenCompleted handler registered`);
}
/**
* Register handler for signing completed events
*/
onSigningCompleted(handler: MpcEventHandler<SigningCompletedPayload>): void {
this.signingCompletedHandler = handler;
this.logger.log(`[REGISTER] SigningCompleted handler registered`);
}
/**
* Register handler for session failed events (keygen)
*/
onSessionFailed(handler: MpcEventHandler<SessionFailedPayload>): void {
this.sessionFailedHandler = handler;
this.logger.log(`[REGISTER] SessionFailed handler registered`);
}
/**
* Register handler for signing failed events
*/
onSigningFailed(handler: MpcEventHandler<SessionFailedPayload>): void {
this.signingFailedHandler = handler;
this.logger.log(`[REGISTER] SigningFailed handler registered`);
}
private async startConsuming(): Promise<void> {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
const offset = message.offset;
this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`);
try {
const value = message.value?.toString();
if (!value) {
this.logger.warn(`[RECEIVE] Empty message received on ${topic}`);
return;
}
this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`);
const parsed = JSON.parse(value);
const payload = parsed.payload || parsed;
this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`);
this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`);
switch (topic) {
case MPC_TOPICS.KEYGEN_COMPLETED:
this.logger.log(`[HANDLE] Processing KeygenCompleted event for blockchain-service`);
this.logger.log(`[HANDLE] publicKey: ${(payload as KeygenCompletedPayload).publicKey?.substring(0, 20)}...`);
this.logger.log(`[HANDLE] extraPayload.userId: ${(payload as KeygenCompletedPayload).extraPayload?.userId}`);
if (this.keygenCompletedHandler) {
await this.keygenCompletedHandler(payload as KeygenCompletedPayload);
this.logger.log(`[HANDLE] KeygenCompleted handler completed successfully`);
} else {
this.logger.warn(`[HANDLE] No handler registered for KeygenCompleted`);
}
break;
case MPC_TOPICS.SIGNING_COMPLETED:
this.logger.log(`[HANDLE] Processing SigningCompleted event`);
this.logger.log(`[HANDLE] sessionId: ${(payload as SigningCompletedPayload).sessionId}`);
this.logger.log(`[HANDLE] signature: ${(payload as SigningCompletedPayload).signature?.substring(0, 20)}...`);
if (this.signingCompletedHandler) {
await this.signingCompletedHandler(payload as SigningCompletedPayload);
this.logger.log(`[HANDLE] SigningCompleted handler completed successfully`);
} else {
this.logger.warn(`[HANDLE] No handler registered for SigningCompleted`);
}
break;
case MPC_TOPICS.SESSION_FAILED:
this.logger.log(`[HANDLE] Processing SessionFailed event`);
this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`);
this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`);
const failedPayload = payload as SessionFailedPayload;
// Route to appropriate handler based on session type
if (failedPayload.sessionType === 'sign') {
if (this.signingFailedHandler) {
await this.signingFailedHandler(failedPayload);
this.logger.log(`[HANDLE] SigningFailed handler completed`);
}
} else {
if (this.sessionFailedHandler) {
await this.sessionFailedHandler(failedPayload);
this.logger.log(`[HANDLE] SessionFailed handler completed`);
}
}
break;
default:
this.logger.warn(`[RECEIVE] Unknown MPC topic: ${topic}`);
}
} catch (error) {
this.logger.error(`[ERROR] Error processing MPC event from ${topic}`, error);
}
},
});
this.logger.log(`[START] Started consuming MPC events for address derivation`);
}
}

View File

@ -1,23 +1,20 @@
/**
* MPC Signing Client
*
* Kafka mpc-service MPC
* ERC20
* mpc-system account-service (port 4000) MPC
* ERC20
*
* :
* blockchain-service Kafka(mpc.SigningRequested) mpc-service
* mpc-service Kafka(mpc.SigningCompleted) blockchain-service
* :
* 1. POST /api/v1/mpc/sign
* 2. GET /api/v1/mpc/sessions/{session_id}
*/
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { HttpService } from '@nestjs/axios';
import { JwtService } from '@nestjs/jwt';
import { randomUUID } from 'crypto';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import {
MpcEventConsumerService,
SigningCompletedPayload,
SessionFailedPayload,
} from '@/infrastructure/kafka/mpc-event-consumer.service';
import { firstValueFrom } from 'rxjs';
export interface CreateSigningInput {
username: string;
@ -30,14 +27,11 @@ export interface SigningResult {
signature?: string;
}
// 签名结果回调
type SigningCallback = (signature: string | null, error?: string) => void;
// MPC 签名请求 Topic
// MPC 签名请求 Topic (保留导出以避免外部引用报错)
export const MPC_SIGNING_TOPIC = 'mining_mpc.SigningRequested';
@Injectable()
export class MpcSigningClient implements OnModuleInit {
export class MpcSigningClient {
private readonly logger = new Logger(MpcSigningClient.name);
// C2C Bot 热钱包
private readonly hotWalletUsername: string;
@ -48,19 +42,16 @@ export class MpcSigningClient implements OnModuleInit {
// fUSDT (积分值) 做市商钱包
private readonly fusdtMarketMakerUsername: string;
private readonly fusdtMarketMakerAddress: string;
// MPC system 配置
private readonly mpcAccountServiceUrl: string;
private readonly mpcJwtSecret: string;
private readonly signingTimeoutMs: number = 300000; // 5 minutes
// 待处理的签名请求回调 Map<sessionId, { resolve, reject, timeout }>
private pendingRequests: Map<string, {
resolve: (signature: string) => void;
reject: (error: Error) => void;
timeout: NodeJS.Timeout;
}> = new Map();
private readonly pollingIntervalMs: number = 2000; // 2 seconds
constructor(
private readonly configService: ConfigService,
private readonly eventPublisher: EventPublisherService,
private readonly mpcEventConsumer: MpcEventConsumerService,
private readonly httpService: HttpService,
private readonly jwtService: JwtService,
) {
// C2C Bot 热钱包配置
this.hotWalletUsername = this.configService.get<string>('C2C_BOT_WALLET_USERNAME', '');
@ -71,6 +62,9 @@ export class MpcSigningClient implements OnModuleInit {
// fUSDT (积分值) 做市商钱包配置
this.fusdtMarketMakerUsername = this.configService.get<string>('FUSDT_MARKET_MAKER_USERNAME', '');
this.fusdtMarketMakerAddress = this.configService.get<string>('FUSDT_MARKET_MAKER_ADDRESS', '');
// MPC system 配置
this.mpcAccountServiceUrl = this.configService.get<string>('MPC_ACCOUNT_SERVICE_URL', 'http://localhost:4000');
this.mpcJwtSecret = this.configService.get<string>('MPC_JWT_SECRET', '');
if (!this.hotWalletUsername) {
this.logger.warn('[INIT] C2C_BOT_WALLET_USERNAME not configured (C2C Bot disabled)');
@ -84,18 +78,15 @@ export class MpcSigningClient implements OnModuleInit {
if (!this.fusdtMarketMakerUsername || !this.fusdtMarketMakerAddress) {
this.logger.warn('[INIT] fUSDT Market Maker not configured');
}
if (!this.mpcJwtSecret) {
this.logger.warn('[INIT] MPC_JWT_SECRET not configured - signing will fail');
}
this.logger.log(`[INIT] C2C Bot Wallet: ${this.hotWalletAddress || '(not configured)'}`);
this.logger.log(`[INIT] eUSDT Market Maker: ${this.eusdtMarketMakerAddress || '(not configured)'}`);
this.logger.log(`[INIT] fUSDT Market Maker: ${this.fusdtMarketMakerAddress || '(not configured)'}`);
this.logger.log(`[INIT] Using Kafka event-driven signing`);
}
async onModuleInit() {
// 注册签名完成和失败事件处理器
this.mpcEventConsumer.onSigningCompleted(this.handleSigningCompleted.bind(this));
this.mpcEventConsumer.onSigningFailed(this.handleSigningFailed.bind(this));
this.logger.log('[INIT] MPC signing event handlers registered');
this.logger.log(`[INIT] MPC Account Service: ${this.mpcAccountServiceUrl}`);
this.logger.log(`[INIT] Using HTTP direct call to mpc-system`);
}
/**
@ -162,7 +153,7 @@ export class MpcSigningClient implements OnModuleInit {
}
/**
* 使 C2C Bot Kafka
* 使 C2C Bot
*
* @param messageHash (hex string with 0x prefix)
* @returns (hex string)
@ -201,7 +192,7 @@ export class MpcSigningClient implements OnModuleInit {
}
/**
* 使 Kafka
* 使HTTP mpc-system
*
* @param username MPC
* @param messageHash (hex string with 0x prefix)
@ -214,87 +205,117 @@ export class MpcSigningClient implements OnModuleInit {
throw new Error('MPC username not provided');
}
const sessionId = randomUUID();
this.logger.log(`[SIGN] Session ID: ${sessionId}`);
// 创建 Promise 等待签名结果
const signaturePromise = new Promise<string>((resolve, reject) => {
// 设置超时
const timeout = setTimeout(() => {
this.pendingRequests.delete(sessionId);
reject(new Error(`MPC signing timeout after ${this.signingTimeoutMs}ms`));
}, this.signingTimeoutMs);
// 保存到待处理队列
this.pendingRequests.set(sessionId, { resolve, reject, timeout });
});
// 发布签名请求事件到 Kafka
try {
await this.eventPublisher.publish({
eventType: 'mining_blockchain.mpc.signing.requested',
toPayload: () => ({
sessionId,
userId: 'system',
username,
messageHash,
source: 'mining-blockchain-service',
}),
eventId: sessionId,
occurredAt: new Date(),
});
this.logger.log(`[SIGN] Signing request published to Kafka: sessionId=${sessionId}, username=${username}`);
} catch (error) {
// 发布失败,清理待处理队列
const pending = this.pendingRequests.get(sessionId);
if (pending) {
clearTimeout(pending.timeout);
this.pendingRequests.delete(sessionId);
}
throw error;
if (!this.mpcJwtSecret) {
throw new Error('MPC_JWT_SECRET not configured');
}
// 等待签名结果
const signature = await signaturePromise;
// Step 1: 创建签名会话
const createUrl = `${this.mpcAccountServiceUrl}/api/v1/mpc/sign`;
const headers = this.getMpcAuthHeaders();
this.logger.log(`[SIGN] POST ${createUrl}`);
const createResponse = await firstValueFrom(
this.httpService.post<{
session_id: string;
status: string;
session_type?: string;
username?: string;
message_hash?: string;
}>(
createUrl,
{ username, message_hash: messageHash },
{ headers, timeout: 30000 },
),
);
const sessionId = createResponse.data.session_id;
this.logger.log(`[SIGN] Session created: ${sessionId}, status: ${createResponse.data.status}`);
// Step 2: 轮询签名结果
const signature = await this.pollSigningStatus(sessionId);
this.logger.log(`[SIGN] Signature obtained: ${signature.slice(0, 20)}...`);
return signature;
}
/**
*
*
*/
private async handleSigningCompleted(payload: SigningCompletedPayload): Promise<void> {
const sessionId = payload.sessionId;
this.logger.log(`[EVENT] Signing completed: sessionId=${sessionId}`);
private async pollSigningStatus(sessionId: string): Promise<string> {
const statusUrl = `${this.mpcAccountServiceUrl}/api/v1/mpc/sessions/${sessionId}`;
const maxAttempts = Math.ceil(this.signingTimeoutMs / this.pollingIntervalMs);
const pending = this.pendingRequests.get(sessionId);
if (pending) {
clearTimeout(pending.timeout);
this.pendingRequests.delete(sessionId);
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
await this.delay(this.pollingIntervalMs);
if (payload.signature) {
pending.resolve(payload.signature);
} else {
pending.reject(new Error('Signing completed but no signature returned'));
const response = await firstValueFrom(
this.httpService.get<{
session_id: string;
status: string;
session_type?: string;
completed_parties?: number;
total_parties?: number;
signature?: string;
}>(statusUrl, {
headers: this.getMpcAuthHeaders(),
timeout: 10000,
}),
);
const { status, signature } = response.data;
if (attempt % 5 === 0 || status !== 'pending') {
this.logger.log(`[POLL] Attempt ${attempt}/${maxAttempts}: status=${status}`);
}
if (status === 'completed') {
if (!signature) {
throw new Error('Signing completed but no signature returned');
}
return signature;
}
if (status === 'failed' || status === 'expired') {
throw new Error(`MPC signing ${status}: sessionId=${sessionId}`);
}
} else {
this.logger.warn(`[EVENT] No pending request for sessionId=${sessionId}`);
}
throw new Error(`MPC signing timeout after ${this.signingTimeoutMs}ms`);
}
/**
*
* mpc-system JWT token
*/
private async handleSigningFailed(payload: SessionFailedPayload): Promise<void> {
const sessionId = payload.sessionId;
this.logger.warn(`[EVENT] Signing failed: sessionId=${sessionId}, error=${payload.errorMessage}`);
private generateMpcAccessToken(): string {
const now = Math.floor(Date.now() / 1000);
const payload = {
jti: randomUUID(),
iss: 'mining-blockchain-service',
sub: 'system',
username: 'mining-blockchain-service',
token_type: 'access',
iat: now,
nbf: now,
exp: now + 24 * 60 * 60,
};
return this.jwtService.sign(payload, {
secret: this.mpcJwtSecret,
algorithm: 'HS256' as const,
});
}
const pending = this.pendingRequests.get(sessionId);
if (pending) {
clearTimeout(pending.timeout);
this.pendingRequests.delete(sessionId);
pending.reject(new Error(`MPC signing failed: ${payload.errorMessage}`));
}
/**
* mpc-system
*/
private getMpcAuthHeaders(): Record<string, string> {
const token = this.generateMpcAccessToken();
return {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`,
};
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}