feat: add event-driven communication between identity-service and mpc-service

Replace synchronous HTTP polling with Kafka event-driven model for MPC operations:

- Add MPC event consumer service in mpc-service for keygen/signing requests
- Add keygen-requested and signing-requested event handlers
- Add MPC event consumer in identity-service for completion events
- Extend mpc-client.service with async event-driven methods
- Support backward compatibility via MPC_USE_EVENT_DRIVEN env var

Topics: mpc.KeygenRequested, mpc.SigningRequested, mpc.KeygenCompleted,
        mpc.SigningCompleted, mpc.SessionFailed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-06 18:17:44 -08:00
parent 17fd663fe3
commit c459387c42
14 changed files with 985 additions and 10 deletions

View File

@ -3,20 +3,38 @@
*
* mpc-service (NestJS)
*
* :
* 1. (legacy): HTTP +
* 2. (): Kafka
*
* (DDD ):
* identity-service () mpc-service (MPC域/NestJS) mpc-system (Go/TSS实现)
*
*
* 1. identity-service mpc-service keygen API
* 2. mpc-service mpc-system TSS keygen
* 3. delegate share () identity-service
*
* 1. identity-service mpc.KeygenRequested
* 2. mpc-service mpc-system TSS keygen
* 3. mpc-service mpc.KeygenCompleted
* 4. identity-service
*/
import { Injectable, Logger } from '@nestjs/common';
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { ConfigService } from '@nestjs/config';
import { firstValueFrom } from 'rxjs';
import { createHash, randomUUID } from 'crypto';
import { EventPublisherService, IDENTITY_TOPICS } from '../../kafka/event-publisher.service';
import {
MpcEventConsumerService,
KeygenCompletedPayload,
SigningCompletedPayload,
SessionFailedPayload,
} from '../../kafka/mpc-event-consumer.service';
// MPC Request Topics (发布到 mpc-service)
export const MPC_REQUEST_TOPICS = {
KEYGEN_REQUESTED: 'mpc.KeygenRequested',
SIGNING_REQUESTED: 'mpc.SigningRequested',
} as const;
export interface KeygenRequest {
sessionId: string;
@ -51,23 +69,285 @@ export interface SigningResult {
messageHash: string;
}
// 异步请求接口 (事件驱动模式)
export interface AsyncKeygenRequest {
userId: string;
username: string;
threshold: number;
totalParties: number;
requireDelegate: boolean;
}
export interface AsyncKeygenResponse {
sessionId: string;
status: 'pending' | 'processing';
}
export interface AsyncSigningRequest {
userId: string;
username: string;
messageHash: string;
userShare?: string;
}
export interface AsyncSigningResponse {
sessionId: string;
status: 'pending' | 'processing';
}
// 结果回调类型
export type KeygenResultCallback = (result: KeygenResult | null, error?: string) => Promise<void>;
export type SigningResultCallback = (result: SigningResult | null, error?: string) => Promise<void>;
@Injectable()
export class MpcClientService {
export class MpcClientService implements OnModuleInit {
private readonly logger = new Logger(MpcClientService.name);
private readonly mpcServiceUrl: string; // mpc-service (NestJS) URL
private readonly mpcMode: string;
private readonly useEventDriven: boolean;
private readonly pollIntervalMs = 2000;
private readonly maxPollAttempts = 150; // 5 minutes max
// 待处理的 keygen/signing 请求回调
private pendingKeygenCallbacks: Map<string, KeygenResultCallback> = new Map();
private pendingSigningCallbacks: Map<string, SigningResultCallback> = new Map();
constructor(
private readonly httpService: HttpService,
private readonly configService: ConfigService,
private readonly eventPublisher: EventPublisherService,
private readonly mpcEventConsumer: MpcEventConsumerService,
) {
// 连接 mpc-service (NestJS)
this.mpcServiceUrl = this.configService.get<string>('MPC_SERVICE_URL', 'http://localhost:3001');
this.mpcMode = this.configService.get<string>('MPC_MODE', 'local');
this.useEventDriven = this.configService.get<string>('MPC_USE_EVENT_DRIVEN', 'true') === 'true';
}
async onModuleInit() {
// 注册 MPC 事件处理器
this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this));
this.mpcEventConsumer.onSigningCompleted(this.handleSigningCompleted.bind(this));
this.mpcEventConsumer.onSessionFailed(this.handleSessionFailed.bind(this));
this.logger.log('MPC event handlers registered');
}
// ==========================================================================
// 事件驱动模式 - 异步 API
// ==========================================================================
/**
* keygen ()
* sessionId
*/
async requestKeygenAsync(
request: AsyncKeygenRequest,
callback?: KeygenResultCallback,
): Promise<AsyncKeygenResponse> {
const sessionId = this.generateSessionId();
this.logger.log(`Requesting async keygen: userId=${request.userId}, sessionId=${sessionId}`);
// 如果是本地模式,直接执行并回调
if (this.mpcMode === 'local') {
this.executeLocalKeygenWithCallback(sessionId, request, callback);
return { sessionId, status: 'processing' };
}
// 注册回调
if (callback) {
this.pendingKeygenCallbacks.set(sessionId, callback);
}
// 发布 keygen 请求事件
await this.eventPublisher.publish(MPC_REQUEST_TOPICS.KEYGEN_REQUESTED, {
eventId: sessionId,
eventType: 'KeygenRequested',
occurredAt: new Date().toISOString(),
aggregateId: request.userId,
aggregateType: 'UserAccount',
payload: {
sessionId,
userId: request.userId,
username: request.username,
threshold: request.threshold,
totalParties: request.totalParties,
requireDelegate: request.requireDelegate,
},
});
this.logger.log(`Keygen request published: sessionId=${sessionId}`);
return { sessionId, status: 'pending' };
}
/**
* signing ()
*/
async requestSigningAsync(
request: AsyncSigningRequest,
callback?: SigningResultCallback,
): Promise<AsyncSigningResponse> {
const sessionId = this.generateSessionId();
this.logger.log(`Requesting async signing: userId=${request.userId}, sessionId=${sessionId}`);
// 如果是本地模式,直接执行并回调
if (this.mpcMode === 'local') {
this.executeLocalSigningWithCallback(sessionId, request, callback);
return { sessionId, status: 'processing' };
}
// 注册回调
if (callback) {
this.pendingSigningCallbacks.set(sessionId, callback);
}
// 发布 signing 请求事件
await this.eventPublisher.publish(MPC_REQUEST_TOPICS.SIGNING_REQUESTED, {
eventId: sessionId,
eventType: 'SigningRequested',
occurredAt: new Date().toISOString(),
aggregateId: request.userId,
aggregateType: 'UserAccount',
payload: {
sessionId,
userId: request.userId,
username: request.username,
messageHash: request.messageHash,
userShare: request.userShare,
},
});
this.logger.log(`Signing request published: sessionId=${sessionId}`);
return { sessionId, status: 'pending' };
}
// ==========================================================================
// 事件处理器 - 处理 MPC 完成事件
// ==========================================================================
private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise<void> {
const sessionId = payload.sessionId;
const callback = this.pendingKeygenCallbacks.get(sessionId);
this.logger.log(`Keygen completed event received: sessionId=${sessionId}`);
if (callback) {
try {
const result: KeygenResult = {
sessionId,
publicKey: payload.publicKey,
delegateShare: payload.extraPayload?.delegateShare || {
partyId: payload.partyId,
partyIndex: 0,
encryptedShare: '',
},
serverParties: payload.extraPayload?.serverParties || [],
};
await callback(result);
} catch (error) {
this.logger.error(`Keygen callback error: sessionId=${sessionId}`, error);
} finally {
this.pendingKeygenCallbacks.delete(sessionId);
}
}
}
private async handleSigningCompleted(payload: SigningCompletedPayload): Promise<void> {
const sessionId = payload.sessionId;
const callback = this.pendingSigningCallbacks.get(sessionId);
this.logger.log(`Signing completed event received: sessionId=${sessionId}`);
if (callback) {
try {
const result: SigningResult = {
sessionId,
signature: payload.signature,
messageHash: payload.messageHash,
};
await callback(result);
} catch (error) {
this.logger.error(`Signing callback error: sessionId=${sessionId}`, error);
} finally {
this.pendingSigningCallbacks.delete(sessionId);
}
}
}
private async handleSessionFailed(payload: SessionFailedPayload): Promise<void> {
const sessionId = payload.sessionId;
const sessionType = payload.sessionType;
this.logger.warn(`Session failed event received: sessionId=${sessionId}, type=${sessionType}`);
if (sessionType === 'keygen') {
const callback = this.pendingKeygenCallbacks.get(sessionId);
if (callback) {
await callback(null, payload.errorMessage);
this.pendingKeygenCallbacks.delete(sessionId);
}
} else if (sessionType === 'sign') {
const callback = this.pendingSigningCallbacks.get(sessionId);
if (callback) {
await callback(null, payload.errorMessage);
this.pendingSigningCallbacks.delete(sessionId);
}
}
}
// ==========================================================================
// 本地模式辅助方法
// ==========================================================================
private async executeLocalKeygenWithCallback(
sessionId: string,
request: AsyncKeygenRequest,
callback?: KeygenResultCallback,
): Promise<void> {
try {
const result = await this.executeLocalKeygen({
sessionId,
username: request.username,
threshold: request.threshold,
totalParties: request.totalParties,
requireDelegate: request.requireDelegate,
});
if (callback) {
await callback(result);
}
} catch (error) {
if (callback) {
await callback(null, error instanceof Error ? error.message : 'Unknown error');
}
}
}
private async executeLocalSigningWithCallback(
sessionId: string,
request: AsyncSigningRequest,
callback?: SigningResultCallback,
): Promise<void> {
try {
const result = await this.executeLocalSigning({
username: request.username,
messageHash: request.messageHash,
userShare: request.userShare,
});
if (callback) {
await callback(result);
}
} catch (error) {
if (callback) {
await callback(null, error instanceof Error ? error.message : 'Unknown error');
}
}
}
// ==========================================================================
// 同步模式 (Legacy) - 保留兼容性
// ==========================================================================
/**
* ID ( UUID )
*/

View File

@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
import { HttpModule } from '@nestjs/axios';
import { MpcWalletService } from './mpc-wallet.service';
import { MpcClientService } from './mpc-client.service';
import { KafkaModule } from '../../kafka/kafka.module';
@Module({
imports: [
@ -9,6 +10,7 @@ import { MpcClientService } from './mpc-client.service';
timeout: 300000, // MPC 操作可能需要较长时间
maxRedirects: 5,
}),
KafkaModule, // 用于事件驱动模式
],
providers: [MpcWalletService, MpcClientService],
exports: [MpcWalletService, MpcClientService],

View File

@ -3,3 +3,4 @@ export * from './event-publisher.service';
export * from './event-consumer.controller';
export * from './dead-letter.service';
export * from './event-retry.service';
export * from './mpc-event-consumer.service';

View File

@ -1,8 +1,15 @@
import { Module } from '@nestjs/common';
import { EventPublisherService } from './event-publisher.service';
import { MpcEventConsumerService } from './mpc-event-consumer.service';
@Module({
providers: [EventPublisherService],
exports: [EventPublisherService],
providers: [
EventPublisherService,
MpcEventConsumerService,
],
exports: [
EventPublisherService,
MpcEventConsumerService,
],
})
export class KafkaModule {}

View File

@ -0,0 +1,187 @@
/**
* MPC Event Consumer Service
*
* Consumes MPC keygen/signing completion events from mpc-service via Kafka.
* Updates user wallet addresses when keygen completes.
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
// MPC Event Topics
export const MPC_TOPICS = {
KEYGEN_COMPLETED: 'mpc.KeygenCompleted',
SIGNING_COMPLETED: 'mpc.SigningCompleted',
SESSION_FAILED: 'mpc.SessionFailed',
} as const;
export interface KeygenCompletedPayload {
sessionId: string;
partyId: string;
publicKey: string;
shareId: string;
threshold: string;
extraPayload?: {
userId: string;
username: string;
delegateShare?: {
partyId: string;
partyIndex: number;
encryptedShare: string;
};
serverParties?: string[];
};
}
export interface SigningCompletedPayload {
sessionId: string;
partyId: string;
messageHash: string;
signature: string;
publicKey: string;
extraPayload?: {
userId: string;
username: string;
mpcSessionId: string;
};
}
export interface SessionFailedPayload {
sessionId: string;
partyId: string;
sessionType: string;
errorMessage: string;
errorCode?: string;
extraPayload?: {
userId: string;
username: string;
};
}
export type MpcEventHandler<T> = (payload: T) => Promise<void>;
@Injectable()
export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(MpcEventConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private isConnected = false;
private keygenCompletedHandler?: MpcEventHandler<KeygenCompletedPayload>;
private signingCompletedHandler?: MpcEventHandler<SigningCompletedPayload>;
private sessionFailedHandler?: MpcEventHandler<SessionFailedPayload>;
constructor(private readonly configService: ConfigService) {}
async onModuleInit() {
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092'];
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'identity-service';
const groupId = 'identity-service-mpc-events';
this.kafka = new Kafka({
clientId,
brokers,
logLevel: logLevel.WARN,
retry: {
initialRetryTime: 100,
retries: 8,
},
});
this.consumer = this.kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
try {
await this.consumer.connect();
this.isConnected = true;
this.logger.log('MPC Event Kafka consumer connected');
// Subscribe to MPC topics
await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false });
this.logger.log(`Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`);
// Start consuming
await this.startConsuming();
} catch (error) {
this.logger.error('Failed to connect MPC Event Kafka consumer', error);
}
}
async onModuleDestroy() {
if (this.isConnected) {
await this.consumer.disconnect();
this.logger.log('MPC Event Kafka consumer disconnected');
}
}
/**
* Register handler for keygen completed events
*/
onKeygenCompleted(handler: MpcEventHandler<KeygenCompletedPayload>): void {
this.keygenCompletedHandler = handler;
}
/**
* Register handler for signing completed events
*/
onSigningCompleted(handler: MpcEventHandler<SigningCompletedPayload>): void {
this.signingCompletedHandler = handler;
}
/**
* Register handler for session failed events
*/
onSessionFailed(handler: MpcEventHandler<SessionFailedPayload>): void {
this.sessionFailedHandler = handler;
}
private async startConsuming(): Promise<void> {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
try {
const value = message.value?.toString();
if (!value) {
this.logger.warn('Empty message received');
return;
}
const parsed = JSON.parse(value);
const payload = parsed.payload || parsed;
this.logger.debug(`Received MPC event from ${topic}: ${JSON.stringify(payload)}`);
switch (topic) {
case MPC_TOPICS.KEYGEN_COMPLETED:
if (this.keygenCompletedHandler) {
await this.keygenCompletedHandler(payload as KeygenCompletedPayload);
}
break;
case MPC_TOPICS.SIGNING_COMPLETED:
if (this.signingCompletedHandler) {
await this.signingCompletedHandler(payload as SigningCompletedPayload);
}
break;
case MPC_TOPICS.SESSION_FAILED:
if (this.sessionFailedHandler) {
await this.sessionFailedHandler(payload as SessionFailedPayload);
}
break;
default:
this.logger.warn(`Unknown MPC topic: ${topic}`);
}
} catch (error) {
this.logger.error(`Error processing MPC event from ${topic}`, error);
}
},
});
this.logger.log('Started consuming MPC events');
}
}

View File

@ -1,7 +1,7 @@
/**
* Application Module
*
* mpc-service MPCCoordinatorService mpc-system
* mpc-service
*/
import { Module } from '@nestjs/common';
@ -10,6 +10,11 @@ import { InfrastructureModule } from '../infrastructure/infrastructure.module';
// Services
import { MPCCoordinatorService } from './services/mpc-coordinator.service';
import { EventConsumerStarterService } from './services/event-consumer-starter.service';
// Event Handlers
import { KeygenRequestedHandler } from './event-handlers/keygen-requested.handler';
import { SigningRequestedHandler } from './event-handlers/signing-requested.handler';
@Module({
imports: [
@ -19,6 +24,11 @@ import { MPCCoordinatorService } from './services/mpc-coordinator.service';
providers: [
// Application Services
MPCCoordinatorService,
EventConsumerStarterService, // 启动 Kafka 消费者
// Event Handlers (Kafka consumers)
KeygenRequestedHandler,
SigningRequestedHandler,
],
exports: [
MPCCoordinatorService,

View File

@ -0,0 +1,6 @@
/**
* Event Handlers Module Exports
*/
export * from './keygen-requested.handler';
export * from './signing-requested.handler';

View File

@ -0,0 +1,162 @@
/**
* KeygenRequested Event Handler
*
* Handles keygen requests from identity-service via Kafka.
* Processes the keygen and publishes completion/failure events.
*/
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { MPCCoordinatorService } from '../services/mpc-coordinator.service';
import { EventPublisherService } from '../../infrastructure/messaging/kafka/event-publisher.service';
import {
EventConsumerService,
MPC_CONSUME_TOPICS,
KeygenRequestedPayload,
} from '../../infrastructure/messaging/kafka/event-consumer.service';
import { KeygenCompletedEvent } from '../../domain/events/keygen-completed.event';
import { SessionFailedEvent } from '../../domain/events/session-failed.event';
import { SessionType } from '../../domain/enums';
@Injectable()
export class KeygenRequestedHandler implements OnModuleInit {
private readonly logger = new Logger(KeygenRequestedHandler.name);
constructor(
private readonly eventConsumer: EventConsumerService,
private readonly eventPublisher: EventPublisherService,
private readonly mpcCoordinator: MPCCoordinatorService,
) {}
async onModuleInit() {
await this.eventConsumer.subscribe(
MPC_CONSUME_TOPICS.KEYGEN_REQUESTED,
this.handleMessage.bind(this),
);
this.logger.log(`Subscribed to ${MPC_CONSUME_TOPICS.KEYGEN_REQUESTED}`);
}
private async handleMessage(topic: string, payload: Record<string, unknown>): Promise<void> {
const data = payload as unknown as KeygenRequestedPayload;
const { sessionId, userId, username, threshold, totalParties, requireDelegate } = data;
this.logger.log(`Processing keygen request: userId=${userId}, username=${username}, sessionId=${sessionId}`);
try {
// Step 1: Create keygen session via mpc-system
const createResult = await this.mpcCoordinator.createKeygenSession({
username,
thresholdN: totalParties,
thresholdT: threshold,
requireDelegate,
});
const mpcSessionId = createResult.sessionId;
this.logger.log(`Keygen session created in mpc-system: ${mpcSessionId}`);
// Step 2: Poll for completion (with max retries)
const result = await this.pollKeygenCompletion(mpcSessionId, 150, 2000);
if (result.status === 'completed' && result.publicKey) {
// Cache public key
await this.mpcCoordinator.savePublicKeyCache(username, result.publicKey);
// Save delegate share if exists
if (result.delegateShare) {
await this.mpcCoordinator.saveDelegateShare({
username,
partyId: result.delegateShare.partyId,
partyIndex: result.delegateShare.partyIndex,
encryptedShare: result.delegateShare.encryptedShare,
});
}
// Publish success event
const completedEvent = new KeygenCompletedEvent(
sessionId, // Original session ID from identity-service
result.delegateShare?.partyId || '',
result.publicKey,
mpcSessionId,
`${threshold}-of-${totalParties}`,
);
// Add extra payload for identity-service
(completedEvent as any).extraPayload = {
userId,
username,
delegateShare: result.delegateShare,
serverParties: [], // mpc-system manages this
};
await this.eventPublisher.publishWithRetry(completedEvent);
this.logger.log(`Keygen completed: userId=${userId}, publicKey=${result.publicKey}`);
} else {
// Publish failure event
const failedEvent = new SessionFailedEvent(
sessionId,
'', // partyId
SessionType.KEYGEN,
`Keygen failed with status: ${result.status}`,
);
(failedEvent as any).extraPayload = { userId, username };
await this.eventPublisher.publishWithRetry(failedEvent);
this.logger.warn(`Keygen failed: userId=${userId}, status=${result.status}`);
}
} catch (error) {
this.logger.error(`Keygen processing error: userId=${userId}`, error);
// Publish failure event
const failedEvent = new SessionFailedEvent(
sessionId,
'', // partyId
SessionType.KEYGEN,
error instanceof Error ? error.message : 'Unknown error',
);
(failedEvent as any).extraPayload = { userId, username };
try {
await this.eventPublisher.publishWithRetry(failedEvent);
} catch (publishError) {
this.logger.error('Failed to publish failure event', publishError);
}
}
}
private async pollKeygenCompletion(
sessionId: string,
maxAttempts: number,
intervalMs: number,
): Promise<{
status: string;
publicKey?: string;
delegateShare?: {
partyId: string;
partyIndex: number;
encryptedShare: string;
};
}> {
for (let i = 0; i < maxAttempts; i++) {
const status = await this.mpcCoordinator.getKeygenStatus(sessionId);
if (status.status === 'completed') {
return {
status: 'completed',
publicKey: status.publicKey,
delegateShare: status.delegateShare,
};
}
if (status.status === 'failed' || status.status === 'expired') {
return { status: status.status };
}
await this.sleep(intervalMs);
}
return { status: 'timeout' };
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}

View File

@ -0,0 +1,141 @@
/**
* SigningRequested Event Handler
*
* Handles signing requests from identity-service via Kafka.
* Processes the signing and publishes completion/failure events.
*/
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { MPCCoordinatorService } from '../services/mpc-coordinator.service';
import { EventPublisherService } from '../../infrastructure/messaging/kafka/event-publisher.service';
import {
EventConsumerService,
MPC_CONSUME_TOPICS,
SigningRequestedPayload,
} from '../../infrastructure/messaging/kafka/event-consumer.service';
import { SigningCompletedEvent } from '../../domain/events/signing-completed.event';
import { SessionFailedEvent } from '../../domain/events/session-failed.event';
import { SessionType } from '../../domain/enums';
@Injectable()
export class SigningRequestedHandler implements OnModuleInit {
private readonly logger = new Logger(SigningRequestedHandler.name);
constructor(
private readonly eventConsumer: EventConsumerService,
private readonly eventPublisher: EventPublisherService,
private readonly mpcCoordinator: MPCCoordinatorService,
) {}
async onModuleInit() {
await this.eventConsumer.subscribe(
MPC_CONSUME_TOPICS.SIGNING_REQUESTED,
this.handleMessage.bind(this),
);
this.logger.log(`Subscribed to ${MPC_CONSUME_TOPICS.SIGNING_REQUESTED}`);
}
private async handleMessage(topic: string, payload: Record<string, unknown>): Promise<void> {
const data = payload as unknown as SigningRequestedPayload;
const { sessionId, userId, username, messageHash, userShare } = data;
this.logger.log(`Processing signing request: userId=${userId}, username=${username}, sessionId=${sessionId}`);
try {
// Step 1: Create signing session via mpc-system
const createResult = await this.mpcCoordinator.createSigningSession({
username,
messageHash,
userShare,
});
const mpcSessionId = createResult.sessionId;
this.logger.log(`Signing session created in mpc-system: ${mpcSessionId}`);
// Step 2: Poll for completion (with max retries)
const result = await this.pollSigningCompletion(mpcSessionId, 150, 2000);
if (result.status === 'completed' && result.signature) {
// Publish success event
const completedEvent = new SigningCompletedEvent(
sessionId, // Original session ID from identity-service
'', // partyId
messageHash,
result.signature,
'', // publicKey - not needed for signing result
);
// Add extra payload for identity-service
(completedEvent as any).extraPayload = {
userId,
username,
mpcSessionId,
};
await this.eventPublisher.publishWithRetry(completedEvent);
this.logger.log(`Signing completed: userId=${userId}, signature=${result.signature.substring(0, 16)}...`);
} else {
// Publish failure event
const failedEvent = new SessionFailedEvent(
sessionId,
'', // partyId
SessionType.SIGN,
`Signing failed with status: ${result.status}`,
);
(failedEvent as any).extraPayload = { userId, username };
await this.eventPublisher.publishWithRetry(failedEvent);
this.logger.warn(`Signing failed: userId=${userId}, status=${result.status}`);
}
} catch (error) {
this.logger.error(`Signing processing error: userId=${userId}`, error);
// Publish failure event
const failedEvent = new SessionFailedEvent(
sessionId,
'', // partyId
SessionType.SIGN,
error instanceof Error ? error.message : 'Unknown error',
);
(failedEvent as any).extraPayload = { userId, username };
try {
await this.eventPublisher.publishWithRetry(failedEvent);
} catch (publishError) {
this.logger.error('Failed to publish failure event', publishError);
}
}
}
private async pollSigningCompletion(
sessionId: string,
maxAttempts: number,
intervalMs: number,
): Promise<{
status: string;
signature?: string;
}> {
for (let i = 0; i < maxAttempts; i++) {
const status = await this.mpcCoordinator.getSigningStatus(sessionId);
if (status.status === 'completed') {
return {
status: 'completed',
signature: status.signature,
};
}
if (status.status === 'failed' || status.status === 'expired') {
return { status: status.status };
}
await this.sleep(intervalMs);
}
return { status: 'timeout' };
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}

View File

@ -0,0 +1,25 @@
/**
* Event Consumer Starter Service
*
* Starts Kafka consumers after all handlers are registered.
* This ensures handlers are registered before consuming starts.
*/
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
import { EventConsumerService } from '../../infrastructure/messaging/kafka/event-consumer.service';
@Injectable()
export class EventConsumerStarterService implements OnApplicationBootstrap {
private readonly logger = new Logger(EventConsumerStarterService.name);
constructor(private readonly eventConsumer: EventConsumerService) {}
async onApplicationBootstrap() {
try {
await this.eventConsumer.startConsuming();
this.logger.log('MPC event consumers started successfully');
} catch (error) {
this.logger.error('Failed to start MPC event consumers', error);
}
}
}

View File

@ -1 +1,2 @@
export * from './mpc-coordinator.service';
export * from './event-consumer-starter.service';

View File

@ -1,7 +1,9 @@
/**
* Infrastructure Module
*
* mpc-service PrismaService delegate share
* mpc-service :
* - PrismaService delegate share
* - Kafka
*/
import { Global, Module } from '@nestjs/common';
@ -10,15 +12,25 @@ import { ConfigModule } from '@nestjs/config';
// Persistence
import { PrismaService } from './persistence/prisma/prisma.service';
// Kafka Messaging
import { EventPublisherService } from './messaging/kafka/event-publisher.service';
import { EventConsumerService } from './messaging/kafka/event-consumer.service';
@Global()
@Module({
imports: [ConfigModule],
providers: [
// Prisma (用于缓存公钥和 delegate share)
PrismaService,
// Kafka (事件发布和消费)
EventPublisherService,
EventConsumerService,
],
exports: [
PrismaService,
EventPublisherService,
EventConsumerService,
],
})
export class InfrastructureModule {}

View File

@ -0,0 +1,140 @@
/**
* Event Consumer Service
*
* Consumes domain events from Kafka for async processing.
* Handles keygen and signing requests from identity-service.
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
// Kafka Topics for consuming
export const MPC_CONSUME_TOPICS = {
KEYGEN_REQUESTED: 'mpc.KeygenRequested',
SIGNING_REQUESTED: 'mpc.SigningRequested',
} as const;
export interface KeygenRequestedPayload {
sessionId: string;
userId: string;
username: string;
threshold: number;
totalParties: number;
requireDelegate: boolean;
}
export interface SigningRequestedPayload {
sessionId: string;
userId: string;
username: string;
messageHash: string;
userShare?: string;
}
export type MessageHandler = (topic: string, payload: Record<string, unknown>) => Promise<void>;
@Injectable()
export class EventConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(EventConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private isConnected = false;
private handlers: Map<string, MessageHandler> = new Map();
constructor(private readonly configService: ConfigService) {}
async onModuleInit() {
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092'];
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'mpc-service';
const groupId = this.configService.get<string>('KAFKA_GROUP_ID') || 'mpc-service-group';
this.kafka = new Kafka({
clientId,
brokers,
logLevel: logLevel.WARN,
retry: {
initialRetryTime: 100,
retries: 8,
},
});
this.consumer = this.kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
try {
await this.consumer.connect();
this.isConnected = true;
this.logger.log('Kafka consumer connected');
} catch (error) {
this.logger.error('Failed to connect Kafka consumer', error);
}
}
async onModuleDestroy() {
if (this.isConnected) {
await this.consumer.disconnect();
this.logger.log('Kafka consumer disconnected');
}
}
/**
* Subscribe to a topic with a handler
*/
async subscribe(topic: string, handler: MessageHandler): Promise<void> {
if (!this.isConnected) {
this.logger.warn('Kafka not connected, cannot subscribe');
return;
}
this.handlers.set(topic, handler);
try {
await this.consumer.subscribe({ topic, fromBeginning: false });
this.logger.log(`Subscribed to topic: ${topic}`);
} catch (error) {
this.logger.error(`Failed to subscribe to topic: ${topic}`, error);
throw error;
}
}
/**
* Start consuming messages
*/
async startConsuming(): Promise<void> {
if (!this.isConnected) {
this.logger.warn('Kafka not connected, cannot start consuming');
return;
}
await this.consumer.run({
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
const handler = this.handlers.get(topic);
if (!handler) {
this.logger.warn(`No handler for topic: ${topic}`);
return;
}
try {
const value = message.value?.toString();
if (!value) {
this.logger.warn('Empty message received');
return;
}
const parsed = JSON.parse(value);
this.logger.debug(`Received message from ${topic}: ${JSON.stringify(parsed)}`);
await handler(topic, parsed.payload || parsed);
} catch (error) {
this.logger.error(`Error processing message from ${topic}`, error);
}
},
});
this.logger.log('Started consuming messages');
}
}

View File

@ -1 +1,2 @@
export * from './event-publisher.service';
export * from './event-consumer.service';