feat(mpc): 将 blockchain-service MPC 签名从 HTTP 改为 Kafka 事件驱动

重构 blockchain-service 和 mpc-service 之间的 MPC 签名通信方式:
- blockchain-service: MpcSigningClient 改用 Kafka 发布签名请求事件
- blockchain-service: MpcEventConsumerService 新增 SigningCompleted 事件监听
- mpc-service: SigningRequestedHandler 支持识别请求来源 (source 字段)

事件流:
blockchain-service → Kafka(mpc.SigningRequested) → mpc-service
mpc-service → HTTP → mpc-system
mpc-service → Kafka(mpc.SigningCompleted) → blockchain-service

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-15 09:45:39 -08:00
parent 0682f6aac3
commit 54ac2ee225
5 changed files with 194 additions and 118 deletions

View File

@ -103,6 +103,8 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
'blockchain.deposit.confirmed': 'blockchain.deposits', 'blockchain.deposit.confirmed': 'blockchain.deposits',
'blockchain.wallet.address.created': 'blockchain.wallets', 'blockchain.wallet.address.created': 'blockchain.wallets',
'blockchain.transaction.broadcasted': 'blockchain.transactions', 'blockchain.transaction.broadcasted': 'blockchain.transactions',
// MPC 签名请求 - 发送到 mpc-service 消费的 topic
'blockchain.mpc.signing.requested': 'mpc.SigningRequested',
}; };
return topicMap[eventType] || 'blockchain.events'; return topicMap[eventType] || 'blockchain.events';
} }

View File

@ -1,8 +1,10 @@
/** /**
* MPC Event Consumer Service for Blockchain Service * MPC Event Consumer Service for Blockchain Service
* *
* Consumes MPC keygen completion events from mpc-service via Kafka. * Consumes MPC events from mpc-service via Kafka:
* Derives wallet addresses from public keys and publishes WalletAddressCreated events. * - KeygenCompleted: derives wallet addresses from public keys
* - SigningCompleted: returns signature for hot wallet transfers
* - SessionFailed: handles keygen/signing failures
*/ */
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
@ -12,6 +14,7 @@ import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
// MPC Event Topics (events from mpc-service) // MPC Event Topics (events from mpc-service)
export const MPC_TOPICS = { export const MPC_TOPICS = {
KEYGEN_COMPLETED: 'mpc.KeygenCompleted', KEYGEN_COMPLETED: 'mpc.KeygenCompleted',
SIGNING_COMPLETED: 'mpc.SigningCompleted',
SESSION_FAILED: 'mpc.SessionFailed', SESSION_FAILED: 'mpc.SessionFailed',
} as const; } as const;
@ -34,15 +37,30 @@ export interface KeygenCompletedPayload {
}; };
} }
export interface SigningCompletedPayload {
sessionId: string;
partyId: string;
messageHash: string;
signature: string;
publicKey: string;
extraPayload?: {
userId: string;
username: string;
mpcSessionId: string;
source?: string; // 'blockchain-service' | 'identity-service'
};
}
export interface SessionFailedPayload { export interface SessionFailedPayload {
sessionId: string; sessionId: string;
partyId: string; partyId: string;
sessionType: string; sessionType: string; // 'keygen' | 'sign'
errorMessage: string; errorMessage: string;
errorCode?: string; errorCode?: string;
extraPayload?: { extraPayload?: {
userId: string; userId: string;
username: string; username: string;
source?: string;
}; };
} }
@ -56,7 +74,9 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
private isConnected = false; private isConnected = false;
private keygenCompletedHandler?: MpcEventHandler<KeygenCompletedPayload>; private keygenCompletedHandler?: MpcEventHandler<KeygenCompletedPayload>;
private signingCompletedHandler?: MpcEventHandler<SigningCompletedPayload>;
private sessionFailedHandler?: MpcEventHandler<SessionFailedPayload>; private sessionFailedHandler?: MpcEventHandler<SessionFailedPayload>;
private signingFailedHandler?: MpcEventHandler<SessionFailedPayload>;
constructor(private readonly configService: ConfigService) {} constructor(private readonly configService: ConfigService) {}
@ -120,13 +140,29 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
} }
/** /**
* Register handler for session failed events * Register handler for signing completed events
*/
onSigningCompleted(handler: MpcEventHandler<SigningCompletedPayload>): void {
this.signingCompletedHandler = handler;
this.logger.log(`[REGISTER] SigningCompleted handler registered`);
}
/**
* Register handler for session failed events (keygen)
*/ */
onSessionFailed(handler: MpcEventHandler<SessionFailedPayload>): void { onSessionFailed(handler: MpcEventHandler<SessionFailedPayload>): void {
this.sessionFailedHandler = handler; this.sessionFailedHandler = handler;
this.logger.log(`[REGISTER] SessionFailed handler registered`); this.logger.log(`[REGISTER] SessionFailed handler registered`);
} }
/**
* Register handler for signing failed events
*/
onSigningFailed(handler: MpcEventHandler<SessionFailedPayload>): void {
this.signingFailedHandler = handler;
this.logger.log(`[REGISTER] SigningFailed handler registered`);
}
private async startConsuming(): Promise<void> { private async startConsuming(): Promise<void> {
await this.consumer.run({ await this.consumer.run({
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
@ -161,15 +197,35 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
} }
break; break;
case MPC_TOPICS.SIGNING_COMPLETED:
this.logger.log(`[HANDLE] Processing SigningCompleted event`);
this.logger.log(`[HANDLE] sessionId: ${(payload as SigningCompletedPayload).sessionId}`);
this.logger.log(`[HANDLE] signature: ${(payload as SigningCompletedPayload).signature?.substring(0, 20)}...`);
if (this.signingCompletedHandler) {
await this.signingCompletedHandler(payload as SigningCompletedPayload);
this.logger.log(`[HANDLE] SigningCompleted handler completed successfully`);
} else {
this.logger.warn(`[HANDLE] No handler registered for SigningCompleted`);
}
break;
case MPC_TOPICS.SESSION_FAILED: case MPC_TOPICS.SESSION_FAILED:
this.logger.log(`[HANDLE] Processing SessionFailed event`); this.logger.log(`[HANDLE] Processing SessionFailed event`);
this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`); this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`);
this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`); this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`);
if (this.sessionFailedHandler) {
await this.sessionFailedHandler(payload as SessionFailedPayload); const failedPayload = payload as SessionFailedPayload;
this.logger.log(`[HANDLE] SessionFailed handler completed`); // Route to appropriate handler based on session type
if (failedPayload.sessionType === 'sign') {
if (this.signingFailedHandler) {
await this.signingFailedHandler(failedPayload);
this.logger.log(`[HANDLE] SigningFailed handler completed`);
}
} else { } else {
this.logger.warn(`[HANDLE] No handler registered for SessionFailed`); if (this.sessionFailedHandler) {
await this.sessionFailedHandler(failedPayload);
this.logger.log(`[HANDLE] SessionFailed handler completed`);
}
} }
break; break;

View File

@ -1,14 +1,23 @@
/** /**
* MPC Signing Client * MPC Signing Client
* *
* mpc-service MPC * Kafka mpc-service MPC
* ERC20 * ERC20
*
* :
* blockchain-service Kafka(mpc.SigningRequested) mpc-service
* mpc-service Kafka(mpc.SigningCompleted) blockchain-service
*/ */
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import { HttpService } from '@nestjs/axios'; import { randomUUID } from 'crypto';
import { firstValueFrom } from 'rxjs'; import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import {
MpcEventConsumerService,
SigningCompletedPayload,
SessionFailedPayload,
} from '@/infrastructure/kafka/mpc-event-consumer.service';
export interface CreateSigningInput { export interface CreateSigningInput {
username: string; username: string;
@ -21,20 +30,31 @@ export interface SigningResult {
signature?: string; signature?: string;
} }
// 签名结果回调
type SigningCallback = (signature: string | null, error?: string) => void;
// MPC 签名请求 Topic
export const MPC_SIGNING_TOPIC = 'mpc.SigningRequested';
@Injectable() @Injectable()
export class MpcSigningClient { export class MpcSigningClient implements OnModuleInit {
private readonly logger = new Logger(MpcSigningClient.name); private readonly logger = new Logger(MpcSigningClient.name);
private readonly mpcServiceUrl: string;
private readonly hotWalletUsername: string; private readonly hotWalletUsername: string;
private readonly hotWalletAddress: string; private readonly hotWalletAddress: string;
private readonly pollingIntervalMs: number = 2000; private readonly signingTimeoutMs: number = 300000; // 5 minutes
private readonly maxPollingAttempts: number = 150; // 5 minutes max
// 待处理的签名请求回调 Map<sessionId, { resolve, reject, timeout }>
private pendingRequests: Map<string, {
resolve: (signature: string) => void;
reject: (error: Error) => void;
timeout: NodeJS.Timeout;
}> = new Map();
constructor( constructor(
private readonly configService: ConfigService, private readonly configService: ConfigService,
private readonly httpService: HttpService, private readonly eventPublisher: EventPublisherService,
private readonly mpcEventConsumer: MpcEventConsumerService,
) { ) {
this.mpcServiceUrl = this.configService.get<string>('MPC_SERVICE_URL', 'http://localhost:3013');
this.hotWalletUsername = this.configService.get<string>('HOT_WALLET_USERNAME', ''); this.hotWalletUsername = this.configService.get<string>('HOT_WALLET_USERNAME', '');
this.hotWalletAddress = this.configService.get<string>('HOT_WALLET_ADDRESS', ''); this.hotWalletAddress = this.configService.get<string>('HOT_WALLET_ADDRESS', '');
@ -45,9 +65,16 @@ export class MpcSigningClient {
this.logger.warn('[INIT] HOT_WALLET_ADDRESS not configured'); this.logger.warn('[INIT] HOT_WALLET_ADDRESS not configured');
} }
this.logger.log(`[INIT] MPC Service URL: ${this.mpcServiceUrl}`);
this.logger.log(`[INIT] Hot Wallet Username: ${this.hotWalletUsername || '(not configured)'}`); this.logger.log(`[INIT] Hot Wallet Username: ${this.hotWalletUsername || '(not configured)'}`);
this.logger.log(`[INIT] Hot Wallet Address: ${this.hotWalletAddress || '(not configured)'}`); this.logger.log(`[INIT] Hot Wallet Address: ${this.hotWalletAddress || '(not configured)'}`);
this.logger.log(`[INIT] Using Kafka event-driven signing`);
}
async onModuleInit() {
// 注册签名完成和失败事件处理器
this.mpcEventConsumer.onSigningCompleted(this.handleSigningCompleted.bind(this));
this.mpcEventConsumer.onSigningFailed(this.handleSigningFailed.bind(this));
this.logger.log('[INIT] MPC signing event handlers registered');
} }
/** /**
@ -72,66 +99,7 @@ export class MpcSigningClient {
} }
/** /**
* MPC * Kafka
*/
async createSigningSession(messageHash: string): Promise<{ sessionId: string; status: string }> {
this.logger.log(`[SIGN] Creating signing session for messageHash: ${messageHash.slice(0, 16)}...`);
if (!this.hotWalletUsername) {
throw new Error('Hot wallet username not configured');
}
const response = await firstValueFrom(
this.httpService.post<{
sessionId: string;
status: string;
}>(
`${this.mpcServiceUrl}/api/v1/mpc/sign`,
{
username: this.hotWalletUsername,
messageHash,
},
{
headers: { 'Content-Type': 'application/json' },
timeout: 30000,
},
),
);
this.logger.log(`[SIGN] Session created: ${response.data.sessionId}`);
return {
sessionId: response.data.sessionId,
status: response.data.status,
};
}
/**
*
*/
async getSigningStatus(sessionId: string): Promise<SigningResult> {
const response = await firstValueFrom(
this.httpService.get<{
sessionId: string;
status: string;
signature?: string;
}>(
`${this.mpcServiceUrl}/api/v1/mpc/sign/${sessionId}/status`,
{
headers: { 'Content-Type': 'application/json' },
timeout: 10000,
},
),
);
return {
sessionId: response.data.sessionId,
status: response.data.status,
signature: response.data.signature,
};
}
/**
*
* *
* @param messageHash (hex string with 0x prefix) * @param messageHash (hex string with 0x prefix)
* @returns (hex string) * @returns (hex string)
@ -139,49 +107,91 @@ export class MpcSigningClient {
async signMessage(messageHash: string): Promise<string> { async signMessage(messageHash: string): Promise<string> {
this.logger.log(`[SIGN] Starting MPC signing for: ${messageHash.slice(0, 16)}...`); this.logger.log(`[SIGN] Starting MPC signing for: ${messageHash.slice(0, 16)}...`);
// Step 1: 创建签名会话 if (!this.hotWalletUsername) {
const session = await this.createSigningSession(messageHash); throw new Error('Hot wallet username not configured');
this.logger.log(`[SIGN] Session ID: ${session.sessionId}`);
// Step 2: 轮询等待签名完成
const result = await this.pollForCompletion(session.sessionId);
if (result.status === 'completed' && result.signature) {
this.logger.log(`[SIGN] Signature obtained: ${result.signature.slice(0, 20)}...`);
return result.signature;
} }
throw new Error(`MPC signing failed with status: ${result.status}`); const sessionId = randomUUID();
this.logger.log(`[SIGN] Session ID: ${sessionId}`);
// 创建 Promise 等待签名结果
const signaturePromise = new Promise<string>((resolve, reject) => {
// 设置超时
const timeout = setTimeout(() => {
this.pendingRequests.delete(sessionId);
reject(new Error(`MPC signing timeout after ${this.signingTimeoutMs}ms`));
}, this.signingTimeoutMs);
// 保存到待处理队列
this.pendingRequests.set(sessionId, { resolve, reject, timeout });
});
// 发布签名请求事件到 Kafka
try {
await this.eventPublisher.publish({
eventType: 'blockchain.mpc.signing.requested',
toPayload: () => ({
sessionId,
userId: 'system', // 系统热钱包
username: this.hotWalletUsername,
messageHash,
source: 'blockchain-service',
}),
eventId: sessionId,
occurredAt: new Date(),
});
this.logger.log(`[SIGN] Signing request published to Kafka: sessionId=${sessionId}`);
} catch (error) {
// 发布失败,清理待处理队列
const pending = this.pendingRequests.get(sessionId);
if (pending) {
clearTimeout(pending.timeout);
this.pendingRequests.delete(sessionId);
}
throw error;
}
// 等待签名结果
const signature = await signaturePromise;
this.logger.log(`[SIGN] Signature obtained: ${signature.slice(0, 20)}...`);
return signature;
} }
/** /**
* *
*/ */
private async pollForCompletion(sessionId: string): Promise<SigningResult> { private async handleSigningCompleted(payload: SigningCompletedPayload): Promise<void> {
for (let attempt = 0; attempt < this.maxPollingAttempts; attempt++) { const sessionId = payload.sessionId;
const result = await this.getSigningStatus(sessionId); this.logger.log(`[EVENT] Signing completed: sessionId=${sessionId}`);
this.logger.debug(`[POLL] Session ${sessionId}: status=${result.status}, attempt=${attempt + 1}`); const pending = this.pendingRequests.get(sessionId);
if (pending) {
clearTimeout(pending.timeout);
this.pendingRequests.delete(sessionId);
if (result.status === 'completed') { if (payload.signature) {
return result; pending.resolve(payload.signature);
} else {
pending.reject(new Error('Signing completed but no signature returned'));
} }
} else {
if (result.status === 'failed' || result.status === 'expired') { this.logger.warn(`[EVENT] No pending request for sessionId=${sessionId}`);
return result;
}
// 等待下一次轮询
await this.sleep(this.pollingIntervalMs);
} }
return {
sessionId,
status: 'timeout',
};
} }
private sleep(ms: number): Promise<void> { /**
return new Promise(resolve => setTimeout(resolve, ms)); *
*/
private async handleSigningFailed(payload: SessionFailedPayload): Promise<void> {
const sessionId = payload.sessionId;
this.logger.warn(`[EVENT] Signing failed: sessionId=${sessionId}, error=${payload.errorMessage}`);
const pending = this.pendingRequests.get(sessionId);
if (pending) {
clearTimeout(pending.timeout);
this.pendingRequests.delete(sessionId);
pending.reject(new Error(`MPC signing failed: ${payload.errorMessage}`));
}
} }
} }

View File

@ -1,8 +1,12 @@
/** /**
* SigningRequested Event Handler * SigningRequested Event Handler
* *
* Handles signing requests from identity-service via Kafka. * Handles signing requests from identity-service and blockchain-service via Kafka.
* Processes the signing and publishes completion/failure events. * Processes the signing and publishes completion/failure events.
*
* :
* - identity-service mpc.SigningRequested mpc-service mpc.SigningCompleted identity-service
* - blockchain-service mpc.SigningRequested mpc-service mpc.SigningCompleted blockchain-service
*/ */
import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
@ -38,8 +42,10 @@ export class SigningRequestedHandler implements OnModuleInit {
private async handleMessage(topic: string, payload: Record<string, unknown>): Promise<void> { private async handleMessage(topic: string, payload: Record<string, unknown>): Promise<void> {
const data = payload as unknown as SigningRequestedPayload; const data = payload as unknown as SigningRequestedPayload;
const { sessionId, userId, username, messageHash, userShare } = data; const { sessionId, userId, username, messageHash, userShare } = data;
// source 标识请求来源: 'blockchain-service' | 'identity-service'
const source = (data as any).source || 'identity-service';
this.logger.log(`Processing signing request: userId=${userId}, username=${username}, sessionId=${sessionId}`); this.logger.log(`Processing signing request: userId=${userId}, username=${username}, sessionId=${sessionId}, source=${source}`);
try { try {
// Step 1: Create signing session via mpc-system // Step 1: Create signing session via mpc-system
@ -65,11 +71,12 @@ export class SigningRequestedHandler implements OnModuleInit {
'', // publicKey - not needed for signing result '', // publicKey - not needed for signing result
); );
// Add extra payload for identity-service // Add extra payload for the requesting service (identity-service or blockchain-service)
(completedEvent as any).extraPayload = { (completedEvent as any).extraPayload = {
userId, userId,
username, username,
mpcSessionId, mpcSessionId,
source,
}; };
await this.eventPublisher.publishWithRetry(completedEvent); await this.eventPublisher.publishWithRetry(completedEvent);
@ -82,10 +89,10 @@ export class SigningRequestedHandler implements OnModuleInit {
SessionType.SIGN, SessionType.SIGN,
`Signing failed with status: ${result.status}`, `Signing failed with status: ${result.status}`,
); );
(failedEvent as any).extraPayload = { userId, username }; (failedEvent as any).extraPayload = { userId, username, source };
await this.eventPublisher.publishWithRetry(failedEvent); await this.eventPublisher.publishWithRetry(failedEvent);
this.logger.warn(`Signing failed: userId=${userId}, status=${result.status}`); this.logger.warn(`Signing failed: userId=${userId}, status=${result.status}, source=${source}`);
} }
} catch (error) { } catch (error) {
this.logger.error(`Signing processing error: userId=${userId}`, error); this.logger.error(`Signing processing error: userId=${userId}`, error);
@ -97,7 +104,7 @@ export class SigningRequestedHandler implements OnModuleInit {
SessionType.SIGN, SessionType.SIGN,
error instanceof Error ? error.message : 'Unknown error', error instanceof Error ? error.message : 'Unknown error',
); );
(failedEvent as any).extraPayload = { userId, username }; (failedEvent as any).extraPayload = { userId, username, source };
try { try {
await this.eventPublisher.publishWithRetry(failedEvent); await this.eventPublisher.publishWithRetry(failedEvent);

View File

@ -2,7 +2,7 @@
* Event Consumer Service * Event Consumer Service
* *
* Consumes domain events from Kafka for async processing. * Consumes domain events from Kafka for async processing.
* Handles keygen and signing requests from identity-service. * Handles keygen and signing requests from identity-service and blockchain-service.
*/ */
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
@ -31,6 +31,7 @@ export interface SigningRequestedPayload {
username: string; username: string;
messageHash: string; messageHash: string;
userShare?: string; userShare?: string;
source?: string; // 'identity-service' | 'blockchain-service'
} }
export type MessageHandler = (topic: string, payload: Record<string, unknown>) => Promise<void>; export type MessageHandler = (topic: string, payload: Record<string, unknown>) => Promise<void>;