refactor(mining-blockchain): replace Kafka MPC signing with HTTP direct calls
Remove Kafka event-driven MPC signing (publish/consume pattern) and replace with HTTP POST + polling to mpc-service, matching identity-service approach. - Rewrite mpc-signing.client.ts: POST /api/v1/mpc/sign + poll status - Delete mpc-event-consumer.service.ts (no longer needed) - Delete mpc-keygen-completed.handler.ts (keygen handled elsewhere) - Add MPC_SERVICE_URL env var to docker-compose.2.0.yml Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
0d47fadf59
commit
1106a40ff1
|
|
@ -418,8 +418,10 @@ services:
|
||||||
REDIS_PORT: ${REDIS_PORT:-6379}
|
REDIS_PORT: ${REDIS_PORT:-6379}
|
||||||
REDIS_PASSWORD: ${REDIS_PASSWORD:-}
|
REDIS_PASSWORD: ${REDIS_PASSWORD:-}
|
||||||
REDIS_DB: 8
|
REDIS_DB: 8
|
||||||
# Kafka - 用于 MPC 签名通信和事件发布
|
# Kafka - 用于事件发布
|
||||||
KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:29092}
|
KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:29092}
|
||||||
|
# MPC Service - 直接 HTTP 调用签名
|
||||||
|
MPC_SERVICE_URL: ${MPC_SERVICE_URL:-http://mpc-service:3006}
|
||||||
# JWT 配置
|
# JWT 配置
|
||||||
JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production}
|
JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production}
|
||||||
# 区块链配置
|
# 区块链配置
|
||||||
|
|
|
||||||
|
|
@ -1,2 +1 @@
|
||||||
export * from './mpc-keygen-completed.handler';
|
|
||||||
export * from './withdrawal-requested.handler';
|
export * from './withdrawal-requested.handler';
|
||||||
|
|
|
||||||
|
|
@ -1,74 +0,0 @@
|
||||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
|
||||||
import { AddressDerivationService } from '../services/address-derivation.service';
|
|
||||||
import { MpcEventConsumerService, KeygenCompletedPayload } from '@/infrastructure/kafka/mpc-event-consumer.service';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MPC 密钥生成完成事件处理器
|
|
||||||
*
|
|
||||||
* 监听 mpc.KeygenCompleted 事件,从公钥派生多链钱包地址,
|
|
||||||
* 并发布 blockchain.WalletAddressCreated 事件通知 identity-service
|
|
||||||
*/
|
|
||||||
@Injectable()
|
|
||||||
export class MpcKeygenCompletedHandler implements OnModuleInit {
|
|
||||||
private readonly logger = new Logger(MpcKeygenCompletedHandler.name);
|
|
||||||
|
|
||||||
constructor(
|
|
||||||
private readonly addressDerivationService: AddressDerivationService,
|
|
||||||
private readonly mpcEventConsumer: MpcEventConsumerService,
|
|
||||||
) {}
|
|
||||||
|
|
||||||
onModuleInit() {
|
|
||||||
// Register handler for keygen completed events
|
|
||||||
this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this));
|
|
||||||
this.logger.log(`[INIT] MpcKeygenCompletedHandler registered with MpcEventConsumer`);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理 MPC 密钥生成完成事件
|
|
||||||
* 从 mpc-service 的 KeygenCompleted 事件中提取 publicKey、userId 和 accountSequence
|
|
||||||
*/
|
|
||||||
private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise<void> {
|
|
||||||
this.logger.log(`[HANDLE] Received KeygenCompleted event`);
|
|
||||||
this.logger.log(`[HANDLE] sessionId: ${payload.sessionId}`);
|
|
||||||
this.logger.log(`[HANDLE] publicKey: ${payload.publicKey?.substring(0, 30)}...`);
|
|
||||||
this.logger.log(`[HANDLE] extraPayload: ${JSON.stringify(payload.extraPayload)}`);
|
|
||||||
|
|
||||||
// Extract userId and accountSequence from extraPayload
|
|
||||||
const userId = payload.extraPayload?.userId;
|
|
||||||
const accountSequence = payload.extraPayload?.accountSequence;
|
|
||||||
|
|
||||||
if (!userId) {
|
|
||||||
this.logger.error(`[ERROR] Missing userId in extraPayload, cannot derive addresses`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!accountSequence) {
|
|
||||||
this.logger.error(`[ERROR] Missing accountSequence in extraPayload, cannot derive addresses`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const publicKey = payload.publicKey;
|
|
||||||
if (!publicKey) {
|
|
||||||
this.logger.error(`[ERROR] Missing publicKey in payload, cannot derive addresses`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
this.logger.log(`[DERIVE] Starting address derivation for user: ${userId}, account: ${accountSequence}`);
|
|
||||||
|
|
||||||
const result = await this.addressDerivationService.deriveAndRegister({
|
|
||||||
userId: BigInt(userId),
|
|
||||||
accountSequence: accountSequence,
|
|
||||||
publicKey,
|
|
||||||
});
|
|
||||||
|
|
||||||
this.logger.log(`[DERIVE] Successfully derived ${result.addresses.length} addresses for account ${accountSequence}`);
|
|
||||||
result.addresses.forEach((addr) => {
|
|
||||||
this.logger.log(`[DERIVE] - ${addr.chainType}: ${addr.address}`);
|
|
||||||
});
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error(`[ERROR] Failed to derive addresses for account ${accountSequence}:`, error);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -2,7 +2,7 @@ import { Global, Module } from '@nestjs/common';
|
||||||
import { HttpModule } from '@nestjs/axios';
|
import { HttpModule } from '@nestjs/axios';
|
||||||
import { PrismaService } from './persistence/prisma/prisma.service';
|
import { PrismaService } from './persistence/prisma/prisma.service';
|
||||||
import { RedisService, AddressCacheService } from './redis';
|
import { RedisService, AddressCacheService } from './redis';
|
||||||
import { EventPublisherService, MpcEventConsumerService, WithdrawalEventConsumerService } from './kafka';
|
import { EventPublisherService, WithdrawalEventConsumerService } from './kafka';
|
||||||
import { EvmProviderAdapter, AddressDerivationAdapter, MnemonicDerivationAdapter, RecoveryMnemonicAdapter, BlockScannerService } from './blockchain';
|
import { EvmProviderAdapter, AddressDerivationAdapter, MnemonicDerivationAdapter, RecoveryMnemonicAdapter, BlockScannerService } from './blockchain';
|
||||||
import { MpcSigningClient } from './mpc';
|
import { MpcSigningClient } from './mpc';
|
||||||
import { DomainModule } from '@/domain/domain.module';
|
import { DomainModule } from '@/domain/domain.module';
|
||||||
|
|
@ -33,7 +33,6 @@ import {
|
||||||
PrismaService,
|
PrismaService,
|
||||||
RedisService,
|
RedisService,
|
||||||
EventPublisherService,
|
EventPublisherService,
|
||||||
MpcEventConsumerService,
|
|
||||||
WithdrawalEventConsumerService,
|
WithdrawalEventConsumerService,
|
||||||
MpcSigningClient,
|
MpcSigningClient,
|
||||||
|
|
||||||
|
|
@ -81,7 +80,6 @@ import {
|
||||||
PrismaService,
|
PrismaService,
|
||||||
RedisService,
|
RedisService,
|
||||||
EventPublisherService,
|
EventPublisherService,
|
||||||
MpcEventConsumerService,
|
|
||||||
WithdrawalEventConsumerService,
|
WithdrawalEventConsumerService,
|
||||||
MpcSigningClient,
|
MpcSigningClient,
|
||||||
EvmProviderAdapter,
|
EvmProviderAdapter,
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,4 @@
|
||||||
export * from './event-publisher.service';
|
export * from './event-publisher.service';
|
||||||
export * from './event-consumer.controller';
|
export * from './event-consumer.controller';
|
||||||
export * from './mpc-event-consumer.service';
|
|
||||||
export * from './withdrawal-event-consumer.service';
|
export * from './withdrawal-event-consumer.service';
|
||||||
export * from './deposit-ack-consumer.service';
|
export * from './deposit-ack-consumer.service';
|
||||||
|
|
|
||||||
|
|
@ -1,247 +0,0 @@
|
||||||
/**
|
|
||||||
* MPC Event Consumer Service for Blockchain Service
|
|
||||||
*
|
|
||||||
* Consumes MPC events from mpc-service via Kafka:
|
|
||||||
* - KeygenCompleted: derives wallet addresses from public keys
|
|
||||||
* - SigningCompleted: returns signature for hot wallet transfers
|
|
||||||
* - SessionFailed: handles keygen/signing failures
|
|
||||||
*/
|
|
||||||
|
|
||||||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
|
||||||
import { ConfigService } from '@nestjs/config';
|
|
||||||
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
|
|
||||||
|
|
||||||
// MPC Event Topics (events from mpc-service)
|
|
||||||
export const MPC_TOPICS = {
|
|
||||||
KEYGEN_COMPLETED: 'mining_mpc.KeygenCompleted',
|
|
||||||
SIGNING_COMPLETED: 'mining_mpc.SigningCompleted',
|
|
||||||
SESSION_FAILED: 'mining_mpc.SessionFailed',
|
|
||||||
} as const;
|
|
||||||
|
|
||||||
export interface KeygenCompletedPayload {
|
|
||||||
sessionId: string;
|
|
||||||
partyId: string;
|
|
||||||
publicKey: string;
|
|
||||||
shareId: string;
|
|
||||||
threshold: string;
|
|
||||||
extraPayload?: {
|
|
||||||
userId: string;
|
|
||||||
accountSequence: string; // 账户序列号 (格式: D + YYMMDD + 5位序号)
|
|
||||||
username: string;
|
|
||||||
delegateShare?: {
|
|
||||||
partyId: string;
|
|
||||||
partyIndex: number;
|
|
||||||
encryptedShare: string;
|
|
||||||
};
|
|
||||||
serverParties?: string[];
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface SigningCompletedPayload {
|
|
||||||
sessionId: string;
|
|
||||||
partyId: string;
|
|
||||||
messageHash: string;
|
|
||||||
signature: string;
|
|
||||||
publicKey: string;
|
|
||||||
extraPayload?: {
|
|
||||||
userId: string;
|
|
||||||
username: string;
|
|
||||||
mpcSessionId: string;
|
|
||||||
source?: string; // 'blockchain-service' | 'identity-service'
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface SessionFailedPayload {
|
|
||||||
sessionId: string;
|
|
||||||
partyId: string;
|
|
||||||
sessionType: string; // 'keygen' | 'sign'
|
|
||||||
errorMessage: string;
|
|
||||||
errorCode?: string;
|
|
||||||
extraPayload?: {
|
|
||||||
userId: string;
|
|
||||||
username: string;
|
|
||||||
source?: string;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
export type MpcEventHandler<T> = (payload: T) => Promise<void>;
|
|
||||||
|
|
||||||
@Injectable()
|
|
||||||
export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
|
|
||||||
private readonly logger = new Logger(MpcEventConsumerService.name);
|
|
||||||
private kafka: Kafka;
|
|
||||||
private consumer: Consumer;
|
|
||||||
private isConnected = false;
|
|
||||||
|
|
||||||
private keygenCompletedHandler?: MpcEventHandler<KeygenCompletedPayload>;
|
|
||||||
private signingCompletedHandler?: MpcEventHandler<SigningCompletedPayload>;
|
|
||||||
private sessionFailedHandler?: MpcEventHandler<SessionFailedPayload>;
|
|
||||||
private signingFailedHandler?: MpcEventHandler<SessionFailedPayload>;
|
|
||||||
|
|
||||||
constructor(private readonly configService: ConfigService) {}
|
|
||||||
|
|
||||||
async onModuleInit() {
|
|
||||||
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092'];
|
|
||||||
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'mining-blockchain-service';
|
|
||||||
const groupId = 'mining-blockchain-service-mpc-events';
|
|
||||||
|
|
||||||
this.logger.log(`[INIT] MPC Event Consumer for blockchain-service initializing...`);
|
|
||||||
this.logger.log(`[INIT] ClientId: ${clientId}`);
|
|
||||||
this.logger.log(`[INIT] GroupId: ${groupId}`);
|
|
||||||
this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
|
|
||||||
this.logger.log(`[INIT] Topics to subscribe: ${Object.values(MPC_TOPICS).join(', ')}`);
|
|
||||||
|
|
||||||
// 企业级重试配置:指数退避,最多重试约 2.5 小时
|
|
||||||
this.kafka = new Kafka({
|
|
||||||
clientId,
|
|
||||||
brokers,
|
|
||||||
logLevel: logLevel.WARN,
|
|
||||||
retry: {
|
|
||||||
initialRetryTime: 1000, // 1 秒
|
|
||||||
maxRetryTime: 300000, // 最大 5 分钟
|
|
||||||
retries: 15, // 最多 15 次
|
|
||||||
multiplier: 2, // 指数退避因子
|
|
||||||
restartOnFailure: async () => true,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
this.consumer = this.kafka.consumer({
|
|
||||||
groupId,
|
|
||||||
sessionTimeout: 30000,
|
|
||||||
heartbeatInterval: 3000,
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
|
||||||
this.logger.log(`[CONNECT] Connecting MPC Event consumer...`);
|
|
||||||
await this.consumer.connect();
|
|
||||||
this.isConnected = true;
|
|
||||||
this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`);
|
|
||||||
|
|
||||||
// Subscribe to MPC topics
|
|
||||||
await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false });
|
|
||||||
this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`);
|
|
||||||
|
|
||||||
// Start consuming
|
|
||||||
await this.startConsuming();
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error(`[ERROR] Failed to connect MPC Event Kafka consumer`, error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async onModuleDestroy() {
|
|
||||||
if (this.isConnected) {
|
|
||||||
await this.consumer.disconnect();
|
|
||||||
this.logger.log('MPC Event Kafka consumer disconnected');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Register handler for keygen completed events
|
|
||||||
*/
|
|
||||||
onKeygenCompleted(handler: MpcEventHandler<KeygenCompletedPayload>): void {
|
|
||||||
this.keygenCompletedHandler = handler;
|
|
||||||
this.logger.log(`[REGISTER] KeygenCompleted handler registered`);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Register handler for signing completed events
|
|
||||||
*/
|
|
||||||
onSigningCompleted(handler: MpcEventHandler<SigningCompletedPayload>): void {
|
|
||||||
this.signingCompletedHandler = handler;
|
|
||||||
this.logger.log(`[REGISTER] SigningCompleted handler registered`);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Register handler for session failed events (keygen)
|
|
||||||
*/
|
|
||||||
onSessionFailed(handler: MpcEventHandler<SessionFailedPayload>): void {
|
|
||||||
this.sessionFailedHandler = handler;
|
|
||||||
this.logger.log(`[REGISTER] SessionFailed handler registered`);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Register handler for signing failed events
|
|
||||||
*/
|
|
||||||
onSigningFailed(handler: MpcEventHandler<SessionFailedPayload>): void {
|
|
||||||
this.signingFailedHandler = handler;
|
|
||||||
this.logger.log(`[REGISTER] SigningFailed handler registered`);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async startConsuming(): Promise<void> {
|
|
||||||
await this.consumer.run({
|
|
||||||
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
|
|
||||||
const offset = message.offset;
|
|
||||||
this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`);
|
|
||||||
|
|
||||||
try {
|
|
||||||
const value = message.value?.toString();
|
|
||||||
if (!value) {
|
|
||||||
this.logger.warn(`[RECEIVE] Empty message received on ${topic}`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`);
|
|
||||||
|
|
||||||
const parsed = JSON.parse(value);
|
|
||||||
const payload = parsed.payload || parsed;
|
|
||||||
|
|
||||||
this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`);
|
|
||||||
this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`);
|
|
||||||
|
|
||||||
switch (topic) {
|
|
||||||
case MPC_TOPICS.KEYGEN_COMPLETED:
|
|
||||||
this.logger.log(`[HANDLE] Processing KeygenCompleted event for blockchain-service`);
|
|
||||||
this.logger.log(`[HANDLE] publicKey: ${(payload as KeygenCompletedPayload).publicKey?.substring(0, 20)}...`);
|
|
||||||
this.logger.log(`[HANDLE] extraPayload.userId: ${(payload as KeygenCompletedPayload).extraPayload?.userId}`);
|
|
||||||
if (this.keygenCompletedHandler) {
|
|
||||||
await this.keygenCompletedHandler(payload as KeygenCompletedPayload);
|
|
||||||
this.logger.log(`[HANDLE] KeygenCompleted handler completed successfully`);
|
|
||||||
} else {
|
|
||||||
this.logger.warn(`[HANDLE] No handler registered for KeygenCompleted`);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MPC_TOPICS.SIGNING_COMPLETED:
|
|
||||||
this.logger.log(`[HANDLE] Processing SigningCompleted event`);
|
|
||||||
this.logger.log(`[HANDLE] sessionId: ${(payload as SigningCompletedPayload).sessionId}`);
|
|
||||||
this.logger.log(`[HANDLE] signature: ${(payload as SigningCompletedPayload).signature?.substring(0, 20)}...`);
|
|
||||||
if (this.signingCompletedHandler) {
|
|
||||||
await this.signingCompletedHandler(payload as SigningCompletedPayload);
|
|
||||||
this.logger.log(`[HANDLE] SigningCompleted handler completed successfully`);
|
|
||||||
} else {
|
|
||||||
this.logger.warn(`[HANDLE] No handler registered for SigningCompleted`);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MPC_TOPICS.SESSION_FAILED:
|
|
||||||
this.logger.log(`[HANDLE] Processing SessionFailed event`);
|
|
||||||
this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`);
|
|
||||||
this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`);
|
|
||||||
|
|
||||||
const failedPayload = payload as SessionFailedPayload;
|
|
||||||
// Route to appropriate handler based on session type
|
|
||||||
if (failedPayload.sessionType === 'sign') {
|
|
||||||
if (this.signingFailedHandler) {
|
|
||||||
await this.signingFailedHandler(failedPayload);
|
|
||||||
this.logger.log(`[HANDLE] SigningFailed handler completed`);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (this.sessionFailedHandler) {
|
|
||||||
await this.sessionFailedHandler(failedPayload);
|
|
||||||
this.logger.log(`[HANDLE] SessionFailed handler completed`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
this.logger.warn(`[RECEIVE] Unknown MPC topic: ${topic}`);
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error(`[ERROR] Error processing MPC event from ${topic}`, error);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
this.logger.log(`[START] Started consuming MPC events for address derivation`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,23 +1,18 @@
|
||||||
/**
|
/**
|
||||||
* MPC Signing Client
|
* MPC Signing Client
|
||||||
*
|
*
|
||||||
* 通过 Kafka 事件与 mpc-service 通信进行 MPC 签名
|
* 通过 HTTP 直接调用 mpc-service 进行 MPC 签名
|
||||||
* 用于热钱包的 ERC20 转账签名
|
* 用于热钱包的 ERC20 转账签名
|
||||||
*
|
*
|
||||||
* 事件流:
|
* 流程:
|
||||||
* blockchain-service → Kafka(mpc.SigningRequested) → mpc-service
|
* blockchain-service → POST /api/v1/mpc/sign → mpc-service
|
||||||
* mpc-service → Kafka(mpc.SigningCompleted) → blockchain-service
|
* blockchain-service ← GET /api/v1/mpc/sign/{sessionId}/status (轮询) ← mpc-service
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
import { Injectable, Logger } from '@nestjs/common';
|
||||||
import { ConfigService } from '@nestjs/config';
|
import { ConfigService } from '@nestjs/config';
|
||||||
import { randomUUID } from 'crypto';
|
import { HttpService } from '@nestjs/axios';
|
||||||
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
|
import { firstValueFrom } from 'rxjs';
|
||||||
import {
|
|
||||||
MpcEventConsumerService,
|
|
||||||
SigningCompletedPayload,
|
|
||||||
SessionFailedPayload,
|
|
||||||
} from '@/infrastructure/kafka/mpc-event-consumer.service';
|
|
||||||
|
|
||||||
export interface CreateSigningInput {
|
export interface CreateSigningInput {
|
||||||
username: string;
|
username: string;
|
||||||
|
|
@ -30,15 +25,11 @@ export interface SigningResult {
|
||||||
signature?: string;
|
signature?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 签名结果回调
|
|
||||||
type SigningCallback = (signature: string | null, error?: string) => void;
|
|
||||||
|
|
||||||
// MPC 签名请求 Topic
|
|
||||||
export const MPC_SIGNING_TOPIC = 'mining_mpc.SigningRequested';
|
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class MpcSigningClient implements OnModuleInit {
|
export class MpcSigningClient {
|
||||||
private readonly logger = new Logger(MpcSigningClient.name);
|
private readonly logger = new Logger(MpcSigningClient.name);
|
||||||
|
// MPC Service URL
|
||||||
|
private readonly mpcServiceUrl: string;
|
||||||
// C2C Bot 热钱包
|
// C2C Bot 热钱包
|
||||||
private readonly hotWalletUsername: string;
|
private readonly hotWalletUsername: string;
|
||||||
private readonly hotWalletAddress: string;
|
private readonly hotWalletAddress: string;
|
||||||
|
|
@ -48,20 +39,16 @@ export class MpcSigningClient implements OnModuleInit {
|
||||||
// fUSDT (积分值) 做市商钱包
|
// fUSDT (积分值) 做市商钱包
|
||||||
private readonly fusdtMarketMakerUsername: string;
|
private readonly fusdtMarketMakerUsername: string;
|
||||||
private readonly fusdtMarketMakerAddress: string;
|
private readonly fusdtMarketMakerAddress: string;
|
||||||
private readonly signingTimeoutMs: number = 300000; // 5 minutes
|
// 轮询配置
|
||||||
|
private readonly pollIntervalMs: number = 2000;
|
||||||
// 待处理的签名请求回调 Map<sessionId, { resolve, reject, timeout }>
|
private readonly maxPollAttempts: number = 150; // 5 minutes
|
||||||
private pendingRequests: Map<string, {
|
|
||||||
resolve: (signature: string) => void;
|
|
||||||
reject: (error: Error) => void;
|
|
||||||
timeout: NodeJS.Timeout;
|
|
||||||
}> = new Map();
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly configService: ConfigService,
|
private readonly configService: ConfigService,
|
||||||
private readonly eventPublisher: EventPublisherService,
|
private readonly httpService: HttpService,
|
||||||
private readonly mpcEventConsumer: MpcEventConsumerService,
|
|
||||||
) {
|
) {
|
||||||
|
// MPC Service URL
|
||||||
|
this.mpcServiceUrl = this.configService.get<string>('MPC_SERVICE_URL', 'http://mpc-service:3006');
|
||||||
// C2C Bot 热钱包配置
|
// C2C Bot 热钱包配置
|
||||||
this.hotWalletUsername = this.configService.get<string>('C2C_BOT_WALLET_USERNAME', '');
|
this.hotWalletUsername = this.configService.get<string>('C2C_BOT_WALLET_USERNAME', '');
|
||||||
this.hotWalletAddress = this.configService.get<string>('C2C_BOT_WALLET_ADDRESS', '');
|
this.hotWalletAddress = this.configService.get<string>('C2C_BOT_WALLET_ADDRESS', '');
|
||||||
|
|
@ -88,14 +75,7 @@ export class MpcSigningClient implements OnModuleInit {
|
||||||
this.logger.log(`[INIT] C2C Bot Wallet: ${this.hotWalletAddress || '(not configured)'}`);
|
this.logger.log(`[INIT] C2C Bot Wallet: ${this.hotWalletAddress || '(not configured)'}`);
|
||||||
this.logger.log(`[INIT] eUSDT Market Maker: ${this.eusdtMarketMakerAddress || '(not configured)'}`);
|
this.logger.log(`[INIT] eUSDT Market Maker: ${this.eusdtMarketMakerAddress || '(not configured)'}`);
|
||||||
this.logger.log(`[INIT] fUSDT Market Maker: ${this.fusdtMarketMakerAddress || '(not configured)'}`);
|
this.logger.log(`[INIT] fUSDT Market Maker: ${this.fusdtMarketMakerAddress || '(not configured)'}`);
|
||||||
this.logger.log(`[INIT] Using Kafka event-driven signing`);
|
this.logger.log(`[INIT] MPC Service URL: ${this.mpcServiceUrl}`);
|
||||||
}
|
|
||||||
|
|
||||||
async onModuleInit() {
|
|
||||||
// 注册签名完成和失败事件处理器
|
|
||||||
this.mpcEventConsumer.onSigningCompleted(this.handleSigningCompleted.bind(this));
|
|
||||||
this.mpcEventConsumer.onSigningFailed(this.handleSigningFailed.bind(this));
|
|
||||||
this.logger.log('[INIT] MPC signing event handlers registered');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -162,10 +142,7 @@ export class MpcSigningClient implements OnModuleInit {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 签名消息(使用 C2C Bot 热钱包,通过 Kafka 事件驱动)
|
* 签名消息(使用 C2C Bot 热钱包)
|
||||||
*
|
|
||||||
* @param messageHash 要签名的消息哈希 (hex string with 0x prefix)
|
|
||||||
* @returns 签名结果 (hex string)
|
|
||||||
*/
|
*/
|
||||||
async signMessage(messageHash: string): Promise<string> {
|
async signMessage(messageHash: string): Promise<string> {
|
||||||
if (!this.hotWalletUsername) {
|
if (!this.hotWalletUsername) {
|
||||||
|
|
@ -176,9 +153,6 @@ export class MpcSigningClient implements OnModuleInit {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 使用 eUSDT 做市商钱包签名消息
|
* 使用 eUSDT 做市商钱包签名消息
|
||||||
*
|
|
||||||
* @param messageHash 要签名的消息哈希 (hex string with 0x prefix)
|
|
||||||
* @returns 签名结果 (hex string)
|
|
||||||
*/
|
*/
|
||||||
async signMessageAsEusdtMarketMaker(messageHash: string): Promise<string> {
|
async signMessageAsEusdtMarketMaker(messageHash: string): Promise<string> {
|
||||||
if (!this.eusdtMarketMakerUsername) {
|
if (!this.eusdtMarketMakerUsername) {
|
||||||
|
|
@ -189,9 +163,6 @@ export class MpcSigningClient implements OnModuleInit {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 使用 fUSDT 做市商钱包签名消息
|
* 使用 fUSDT 做市商钱包签名消息
|
||||||
*
|
|
||||||
* @param messageHash 要签名的消息哈希 (hex string with 0x prefix)
|
|
||||||
* @returns 签名结果 (hex string)
|
|
||||||
*/
|
*/
|
||||||
async signMessageAsFusdtMarketMaker(messageHash: string): Promise<string> {
|
async signMessageAsFusdtMarketMaker(messageHash: string): Promise<string> {
|
||||||
if (!this.fusdtMarketMakerUsername) {
|
if (!this.fusdtMarketMakerUsername) {
|
||||||
|
|
@ -201,11 +172,11 @@ export class MpcSigningClient implements OnModuleInit {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 使用指定用户名签名消息(通过 Kafka 事件驱动)
|
* 使用指定用户名签名消息(通过 HTTP 调用 mpc-service)
|
||||||
*
|
*
|
||||||
* @param username MPC 用户名
|
* 流程:
|
||||||
* @param messageHash 要签名的消息哈希 (hex string with 0x prefix)
|
* 1. POST /api/v1/mpc/sign → 创建签名会话,获取 sessionId
|
||||||
* @returns 签名结果 (hex string)
|
* 2. GET /api/v1/mpc/sign/{sessionId}/status → 轮询结果 (每 2 秒, 最多 5 分钟)
|
||||||
*/
|
*/
|
||||||
async signMessageWithUsername(username: string, messageHash: string): Promise<string> {
|
async signMessageWithUsername(username: string, messageHash: string): Promise<string> {
|
||||||
this.logger.log(`[SIGN] Starting MPC signing for: ${messageHash.slice(0, 16)}... (username: ${username})`);
|
this.logger.log(`[SIGN] Starting MPC signing for: ${messageHash.slice(0, 16)}... (username: ${username})`);
|
||||||
|
|
@ -214,87 +185,74 @@ export class MpcSigningClient implements OnModuleInit {
|
||||||
throw new Error('MPC username not provided');
|
throw new Error('MPC username not provided');
|
||||||
}
|
}
|
||||||
|
|
||||||
const sessionId = randomUUID();
|
|
||||||
this.logger.log(`[SIGN] Session ID: ${sessionId}`);
|
|
||||||
|
|
||||||
// 创建 Promise 等待签名结果
|
|
||||||
const signaturePromise = new Promise<string>((resolve, reject) => {
|
|
||||||
// 设置超时
|
|
||||||
const timeout = setTimeout(() => {
|
|
||||||
this.pendingRequests.delete(sessionId);
|
|
||||||
reject(new Error(`MPC signing timeout after ${this.signingTimeoutMs}ms`));
|
|
||||||
}, this.signingTimeoutMs);
|
|
||||||
|
|
||||||
// 保存到待处理队列
|
|
||||||
this.pendingRequests.set(sessionId, { resolve, reject, timeout });
|
|
||||||
});
|
|
||||||
|
|
||||||
// 发布签名请求事件到 Kafka
|
|
||||||
try {
|
try {
|
||||||
await this.eventPublisher.publish({
|
// 1. 创建签名会话
|
||||||
eventType: 'mining_blockchain.mpc.signing.requested',
|
const createResponse = await firstValueFrom(
|
||||||
toPayload: () => ({
|
this.httpService.post<{ sessionId: string; status: string }>(
|
||||||
sessionId,
|
`${this.mpcServiceUrl}/api/v1/mpc/sign`,
|
||||||
userId: 'system',
|
{
|
||||||
username,
|
username,
|
||||||
messageHash,
|
messageHash,
|
||||||
source: 'mining-blockchain-service',
|
},
|
||||||
}),
|
{
|
||||||
eventId: sessionId,
|
headers: { 'Content-Type': 'application/json' },
|
||||||
occurredAt: new Date(),
|
timeout: 30000,
|
||||||
});
|
},
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
this.logger.log(`[SIGN] Signing request published to Kafka: sessionId=${sessionId}, username=${username}`);
|
const sessionId = createResponse.data.sessionId;
|
||||||
|
this.logger.log(`[SIGN] Signing session created: ${sessionId}`);
|
||||||
|
|
||||||
|
// 2. 轮询签名状态
|
||||||
|
const result = await this.pollSigningStatus(sessionId);
|
||||||
|
|
||||||
|
if (result.status !== 'completed') {
|
||||||
|
throw new Error(`Signing session failed with status: ${result.status}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.log(`[SIGN] Signature obtained: ${result.signature.slice(0, 20)}...`);
|
||||||
|
return result.signature;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// 发布失败,清理待处理队列
|
this.logger.error(`[SIGN] MPC signing failed: username=${username}`, error);
|
||||||
const pending = this.pendingRequests.get(sessionId);
|
throw new Error(`MPC signing failed: ${error.message}`);
|
||||||
if (pending) {
|
|
||||||
clearTimeout(pending.timeout);
|
|
||||||
this.pendingRequests.delete(sessionId);
|
|
||||||
}
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 等待签名结果
|
|
||||||
const signature = await signaturePromise;
|
|
||||||
this.logger.log(`[SIGN] Signature obtained: ${signature.slice(0, 20)}...`);
|
|
||||||
return signature;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理签名完成事件
|
|
||||||
*/
|
|
||||||
private async handleSigningCompleted(payload: SigningCompletedPayload): Promise<void> {
|
|
||||||
const sessionId = payload.sessionId;
|
|
||||||
this.logger.log(`[EVENT] Signing completed: sessionId=${sessionId}`);
|
|
||||||
|
|
||||||
const pending = this.pendingRequests.get(sessionId);
|
|
||||||
if (pending) {
|
|
||||||
clearTimeout(pending.timeout);
|
|
||||||
this.pendingRequests.delete(sessionId);
|
|
||||||
|
|
||||||
if (payload.signature) {
|
|
||||||
pending.resolve(payload.signature);
|
|
||||||
} else {
|
|
||||||
pending.reject(new Error('Signing completed but no signature returned'));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
this.logger.warn(`[EVENT] No pending request for sessionId=${sessionId}`);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理签名失败事件
|
* 轮询签名会话状态
|
||||||
*/
|
*/
|
||||||
private async handleSigningFailed(payload: SessionFailedPayload): Promise<void> {
|
private async pollSigningStatus(sessionId: string): Promise<{ status: string; signature: string }> {
|
||||||
const sessionId = payload.sessionId;
|
for (let i = 0; i < this.maxPollAttempts; i++) {
|
||||||
this.logger.warn(`[EVENT] Signing failed: sessionId=${sessionId}, error=${payload.errorMessage}`);
|
try {
|
||||||
|
const response = await firstValueFrom(
|
||||||
|
this.httpService.get<{ sessionId: string; status: string; signature?: string }>(
|
||||||
|
`${this.mpcServiceUrl}/api/v1/mpc/sign/${sessionId}/status`,
|
||||||
|
{ timeout: 10000 },
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
const pending = this.pendingRequests.get(sessionId);
|
const data = response.data;
|
||||||
if (pending) {
|
|
||||||
clearTimeout(pending.timeout);
|
if (data.status === 'completed') {
|
||||||
this.pendingRequests.delete(sessionId);
|
return { status: 'completed', signature: data.signature || '' };
|
||||||
pending.reject(new Error(`MPC signing failed: ${payload.errorMessage}`));
|
}
|
||||||
|
|
||||||
|
if (data.status === 'failed' || data.status === 'expired') {
|
||||||
|
return { status: data.status, signature: '' };
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.sleep(this.pollIntervalMs);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(`[POLL] Error polling signing status (attempt ${i + 1}/${this.maxPollAttempts}): ${error.message}`);
|
||||||
|
await this.sleep(this.pollIntervalMs);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
throw new Error(`Signing session ${sessionId} timed out after ${this.maxPollAttempts * this.pollIntervalMs / 1000}s`);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sleep(ms: number): Promise<void> {
|
||||||
|
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue