rwadurian/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts

296 lines
11 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* MPC Event Consumer Service for Blockchain Service
*
* Consumes MPC events from mpc-service via Kafka:
* - KeygenCompleted: derives wallet addresses from public keys
* - SigningCompleted: returns signature for hot wallet transfers
* - SessionFailed: handles keygen/signing failures
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
/**
 * Kafka topic names for the MPC events emitted by mpc-service.
 *
 * Declared `as const` so every value keeps its string-literal type.
 */
export const MPC_TOPICS = {
  /** Topic carrying key-generation completion events. */
  KEYGEN_COMPLETED: 'mpc.KeygenCompleted',
  /** Topic carrying signing completion events. */
  SIGNING_COMPLETED: 'mpc.SigningCompleted',
  /** Topic carrying session failure events (keygen and signing). */
  SESSION_FAILED: 'mpc.SessionFailed',
} as const;
/**
 * Payload of an event received on the `mpc.KeygenCompleted` topic.
 *
 * The shape is declared here only; incoming messages are not validated
 * against it at runtime.
 */
export interface KeygenCompletedPayload {
  sessionId: string;
  partyId: string;
  /** Public key produced by the keygen session. */
  publicKey: string;
  shareId: string;
  threshold: string;
  /** Optional caller-supplied context attached to the event. */
  extraPayload?: {
    userId: string;
    /** Account sequence number (format: D + YYMMDD + 5-digit serial). */
    accountSequence: string;
    username: string;
    delegateShare?: {
      partyId: string;
      partyIndex: number;
      encryptedShare: string;
    };
    serverParties?: string[];
  };
}
/**
 * Payload of an event received on the `mpc.SigningCompleted` topic.
 *
 * The shape is declared here only; incoming messages are not validated
 * against it at runtime.
 */
export interface SigningCompletedPayload {
  sessionId: string;
  partyId: string;
  messageHash: string;
  signature: string;
  publicKey: string;
  /** Optional caller-supplied context attached to the event. */
  extraPayload?: {
    userId: string;
    username: string;
    mpcSessionId: string;
    /** Originating service, e.g. 'blockchain-service' | 'identity-service'. */
    source?: string;
  };
}
/**
 * Payload of an event received on the `mpc.SessionFailed` topic.
 *
 * The shape is declared here only; incoming messages are not validated
 * against it at runtime.
 */
export interface SessionFailedPayload {
  sessionId: string;
  partyId: string;
  /** Kind of session that failed: 'keygen' | 'sign'. */
  sessionType: string;
  errorMessage: string;
  errorCode?: string;
  /** Optional caller-supplied context attached to the event. */
  extraPayload?: {
    userId: string;
    username: string;
    source?: string;
  };
}
/**
 * Asynchronous callback invoked with the payload of one MPC event.
 *
 * @typeParam T - Payload type the handler accepts.
 */
export type MpcEventHandler<T> = (payload: T) => Promise<void>;
/**
 * Kafka consumer that receives MPC events and forwards them to registered handlers.
 *
 * Lifecycle:
 * - `onModuleInit` builds the KafkaJS client/consumer, installs a CRASH listener
 *   and connects with bounded exponential back-off.
 * - `onModuleDestroy` flags shutdown (which stops retry/reconnect loops) and
 *   disconnects the consumer if it is connected.
 *
 * Handlers are registered through the `on*` methods; at most one handler is kept
 * per event kind (a later registration replaces the earlier one). Errors thrown
 * while processing a message are caught and logged, never rethrown to KafkaJS.
 */
@Injectable()
export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
  /** Broker list used when KAFKA_BROKERS is unset or contains no usable entry. */
  private static readonly DEFAULT_BROKERS: readonly string[] = ['localhost:9092'];

  private readonly logger = new Logger(MpcEventConsumerService.name);
  // Both are assigned in onModuleInit, before any other method uses them.
  private kafka!: Kafka;
  private consumer!: Consumer;
  private isConnected = false;
  private isShuttingDown = false;
  private keygenCompletedHandler?: MpcEventHandler<KeygenCompletedPayload>;
  private signingCompletedHandler?: MpcEventHandler<SigningCompletedPayload>;
  private sessionFailedHandler?: MpcEventHandler<SessionFailedPayload>;
  private signingFailedHandler?: MpcEventHandler<SessionFailedPayload>;

  constructor(private readonly configService: ConfigService) {}

  /**
   * Creates the Kafka client and consumer, wires crash recovery and connects.
   *
   * Reads `KAFKA_BROKERS` (comma-separated) and `KAFKA_CLIENT_ID` from configuration.
   * Resolves once the consumer is running or all connection attempts are exhausted.
   */
  async onModuleInit(): Promise<void> {
    const brokers = MpcEventConsumerService.parseBrokers(this.configService.get<string>('KAFKA_BROKERS'));
    const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'blockchain-service';
    const groupId = 'blockchain-service-mpc-events';

    this.logger.log(`[INIT] MPC Event Consumer for blockchain-service initializing...`);
    this.logger.log(`[INIT] ClientId: ${clientId}`);
    this.logger.log(`[INIT] GroupId: ${groupId}`);
    this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
    this.logger.log(`[INIT] Topics to subscribe: ${Object.values(MPC_TOPICS).join(', ')}`);

    // Client-level retry policy: exponential back-off starting at 1s, doubling,
    // capped at 5 minutes per wait, up to 15 retries.
    this.kafka = new Kafka({
      clientId,
      brokers,
      logLevel: logLevel.WARN,
      retry: {
        initialRetryTime: 1000,
        maxRetryTime: 300000,
        retries: 15,
        multiplier: 2,
        restartOnFailure: async () => true,
      },
    });

    this.consumer = this.kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
    });

    // Listen for consumer crashes (e.g. an unavailable topic-partition or another
    // runtime error) so the consumer can be brought back.
    this.consumer.on(this.consumer.events.CRASH, async (event) => {
      if (this.isShuttingDown) return;
      this.logger.error(`[CRASH] Kafka consumer crashed: ${event.payload.error?.message || 'unknown'}, restart: ${event.payload.restart}`);
      // When KafkaJS reports it will not restart by itself (restart=false),
      // reconnect manually.
      if (!event.payload.restart) {
        this.logger.warn(`[CRASH] KafkaJS will not auto-restart, triggering manual reconnect...`);
        this.isConnected = false;
        await this.connectWithRetry();
      }
    });

    await this.connectWithRetry();
  }

  /**
   * Connects, subscribes and starts consuming, retrying with exponential back-off.
   *
   * Motivation: if the topic-partitions are not ready when the service starts,
   * `subscribe()` fails with "This server does not host this topic-partition".
   * Giving up after a single failure would leave the consumer permanently dead,
   * so no MPC signing result would ever be received.
   *
   * Back-off: 2s, 4s, 8s, 16s, 32s, then 60s per wait. With the default of 10
   * attempts the total wait is roughly 5 minutes. The loop stops early once
   * shutdown has been requested. Failures are logged; this method does not throw.
   *
   * @param maxRetries - Maximum number of connection attempts.
   */
  private async connectWithRetry(maxRetries = 10): Promise<void> {
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      if (this.isShuttingDown) return;
      try {
        if (!this.isConnected) {
          this.logger.log(`[CONNECT] Connecting MPC Event consumer (attempt ${attempt}/${maxRetries})...`);
          await this.consumer.connect();
          this.isConnected = true;
          this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`);
        }

        // Subscribe to MPC topics
        await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false });
        this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`);

        // Start consuming
        await this.startConsuming();
        return; // Success: leave the retry loop.
      } catch (error: unknown) {
        this.logger.error(`[ERROR] Failed to connect/subscribe Kafka consumer (attempt ${attempt}/${maxRetries}): ${MpcEventConsumerService.describeError(error)}`);
        if (attempt < maxRetries) {
          const delay = Math.min(2000 * Math.pow(2, attempt - 1), 60000);
          this.logger.log(`[RETRY] Will retry in ${delay / 1000}s...`);
          await new Promise(resolve => setTimeout(resolve, delay));

          // Best-effort disconnect to reset state; the next iteration reconnects.
          try {
            await this.consumer.disconnect();
          } catch (disconnectError: unknown) {
            this.logger.debug(`[RETRY] Ignoring disconnect error during cleanup: ${MpcEventConsumerService.describeError(disconnectError)}`);
          }
          this.isConnected = false;
        }
      }
    }
    this.logger.error(`[FATAL] Failed to connect Kafka consumer after ${maxRetries} attempts. MPC events will NOT be received!`);
  }

  /**
   * Marks the service as shutting down and disconnects the consumer if connected.
   */
  async onModuleDestroy(): Promise<void> {
    this.isShuttingDown = true;
    if (this.isConnected) {
      await this.consumer.disconnect();
      this.logger.log('MPC Event Kafka consumer disconnected');
    }
  }

  /**
   * Register handler for keygen completed events
   */
  onKeygenCompleted(handler: MpcEventHandler<KeygenCompletedPayload>): void {
    this.keygenCompletedHandler = handler;
    this.logger.log(`[REGISTER] KeygenCompleted handler registered`);
  }

  /**
   * Register handler for signing completed events
   */
  onSigningCompleted(handler: MpcEventHandler<SigningCompletedPayload>): void {
    this.signingCompletedHandler = handler;
    this.logger.log(`[REGISTER] SigningCompleted handler registered`);
  }

  /**
   * Register handler for session failed events whose sessionType is not 'sign'
   * (i.e. keygen failures).
   */
  onSessionFailed(handler: MpcEventHandler<SessionFailedPayload>): void {
    this.sessionFailedHandler = handler;
    this.logger.log(`[REGISTER] SessionFailed handler registered`);
  }

  /**
   * Register handler for session failed events whose sessionType is 'sign'
   */
  onSigningFailed(handler: MpcEventHandler<SessionFailedPayload>): void {
    this.signingFailedHandler = handler;
    this.logger.log(`[REGISTER] SigningFailed handler registered`);
  }

  /** Starts the KafkaJS run loop, handing every message to `handleMessage`. */
  private async startConsuming(): Promise<void> {
    await this.consumer.run({
      eachMessage: (messagePayload: EachMessagePayload) => this.handleMessage(messagePayload),
    });
    this.logger.log(`[START] Started consuming MPC events for address derivation`);
  }

  /**
   * Decodes one Kafka message and dispatches its payload by topic.
   *
   * The message body is expected to be a JSON object. If it has an object-valued
   * `payload` property that object is the event payload; otherwise the whole
   * object is. Empty and non-object messages are logged and skipped. Any error
   * (invalid JSON, handler failure) is logged and swallowed.
   */
  private async handleMessage({ topic, partition, message }: EachMessagePayload): Promise<void> {
    this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${message.offset}`);
    try {
      const value = message.value?.toString();
      if (!value) {
        this.logger.warn(`[RECEIVE] Empty message received on ${topic}`);
        return;
      }

      // NOTE(review): the raw value can contain fields such as `signature` or
      // `encryptedShare`; confirm that logging it at info level is acceptable.
      this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`);

      const parsed: unknown = JSON.parse(value);
      if (!MpcEventConsumerService.isRecord(parsed)) {
        // Valid JSON but not an object (number, string, array, null): there is
        // no payload to hand to a handler.
        this.logger.warn(`[RECEIVE] Ignoring non-object message on ${topic}`);
        return;
      }
      const payload = MpcEventConsumerService.isRecord(parsed.payload) ? parsed.payload : parsed;

      this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`);
      this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`);

      await this.dispatch(topic, payload);
    } catch (error: unknown) {
      const summary = `[ERROR] Error processing MPC event from ${topic}`;
      if (error instanceof Error) {
        this.logger.error(summary, error.stack);
      } else {
        this.logger.error(`${summary}: ${String(error)}`);
      }
    }
  }

  /**
   * Routes a decoded payload to the handler registered for its topic.
   *
   * The payload is cast to the topic's payload interface without field validation.
   * A missing handler is reported with a warning.
   */
  private async dispatch(topic: string, payload: unknown): Promise<void> {
    switch (topic) {
      case MPC_TOPICS.KEYGEN_COMPLETED: {
        const event = payload as KeygenCompletedPayload;
        this.logger.log(`[HANDLE] Processing KeygenCompleted event for blockchain-service`);
        this.logger.log(`[HANDLE] publicKey: ${event.publicKey?.substring(0, 20)}...`);
        this.logger.log(`[HANDLE] extraPayload.userId: ${event.extraPayload?.userId}`);
        if (this.keygenCompletedHandler) {
          await this.keygenCompletedHandler(event);
          this.logger.log(`[HANDLE] KeygenCompleted handler completed successfully`);
        } else {
          this.logger.warn(`[HANDLE] No handler registered for KeygenCompleted`);
        }
        break;
      }

      case MPC_TOPICS.SIGNING_COMPLETED: {
        const event = payload as SigningCompletedPayload;
        this.logger.log(`[HANDLE] Processing SigningCompleted event`);
        this.logger.log(`[HANDLE] sessionId: ${event.sessionId}`);
        this.logger.log(`[HANDLE] signature: ${event.signature?.substring(0, 20)}...`);
        if (this.signingCompletedHandler) {
          await this.signingCompletedHandler(event);
          this.logger.log(`[HANDLE] SigningCompleted handler completed successfully`);
        } else {
          this.logger.warn(`[HANDLE] No handler registered for SigningCompleted`);
        }
        break;
      }

      case MPC_TOPICS.SESSION_FAILED: {
        const event = payload as SessionFailedPayload;
        this.logger.log(`[HANDLE] Processing SessionFailed event`);
        this.logger.log(`[HANDLE] sessionType: ${event.sessionType}`);
        this.logger.log(`[HANDLE] errorMessage: ${event.errorMessage}`);

        // Route by session type: 'sign' goes to the signing-failed handler,
        // anything else to the generic session-failed handler.
        const isSigning = event.sessionType === 'sign';
        const handler = isSigning ? this.signingFailedHandler : this.sessionFailedHandler;
        const label = isSigning ? 'SigningFailed' : 'SessionFailed';
        if (handler) {
          await handler(event);
          this.logger.log(`[HANDLE] ${label} handler completed`);
        } else {
          this.logger.warn(`[HANDLE] No handler registered for ${label}`);
        }
        break;
      }

      default:
        this.logger.warn(`[RECEIVE] Unknown MPC topic: ${topic}`);
    }
  }

  /**
   * Splits a comma-separated broker string into trimmed, non-empty entries.
   * Falls back to the default broker list when nothing usable is configured.
   */
  private static parseBrokers(raw: string | undefined): string[] {
    const brokers = (raw ?? '')
      .split(',')
      .map((broker) => broker.trim())
      .filter((broker) => broker.length > 0);
    return brokers.length > 0 ? brokers : [...MpcEventConsumerService.DEFAULT_BROKERS];
  }

  /** Type guard: true for non-null, non-array objects. */
  private static isRecord(value: unknown): value is Record<string, unknown> {
    return typeof value === 'object' && value !== null && !Array.isArray(value);
  }

  /** Returns a printable message for any thrown value. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}