feat(mpc-system): 增强连接可靠性和消息去重机制

后端改进:
- SessionEventBroadcaster: 重连时自动关闭旧 channel 防止内存泄漏
- MessageBroker: 重连时关闭旧的 party/session channel
- SubscribeMessages: 订阅时自动发送数据库中的 pending 消息

客户端改进:
- GrpcClient: 添加自动重连机制(指数退避,最多10次)
- GrpcClient: 断开/重连/失败事件通知前端
- TSSHandler: 消息缓冲机制,进程启动前缓存收到的消息
- TSSHandler: 客户端本地消息去重,防止重连后重复处理
- Database: 添加 processed_messages 表和相关操作方法
- Main: Keygen 幂等性保护,防止重复触发
- Main: 会话事件缓存,解决前端订阅时序问题

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-29 07:37:03 -08:00
parent df8a14211e
commit 0ca37ee76a
7 changed files with 549 additions and 32 deletions

View File

@ -87,12 +87,49 @@ func (s *MessageRouterServer) RouteMessage(
}
// SubscribeMessages subscribes to messages for a party (streaming)
// On subscription, it first sends any pending messages from the database
// to ensure no messages are lost during reconnection
func (s *MessageRouterServer) SubscribeMessages(
req *pb.SubscribeMessagesRequest,
stream pb.MessageRouter_SubscribeMessagesServer,
) error {
ctx := stream.Context()
logger.Info("Party subscribing to messages",
zap.String("session_id", req.SessionId),
zap.String("party_id", req.PartyId))
// First, send any pending messages from the database (message recovery on reconnect)
if s.getPendingMessagesUC != nil && req.SessionId != "" {
input := use_cases.GetPendingMessagesInput{
SessionID: req.SessionId,
PartyID: req.PartyId,
AfterTimestamp: 0, // Get all pending messages
}
pendingMessages, err := s.getPendingMessagesUC.Execute(ctx, input)
if err != nil {
logger.Warn("Failed to get pending messages on subscribe",
zap.String("session_id", req.SessionId),
zap.String("party_id", req.PartyId),
zap.Error(err))
} else if len(pendingMessages) > 0 {
logger.Info("Sending pending messages on subscribe",
zap.String("session_id", req.SessionId),
zap.String("party_id", req.PartyId),
zap.Int("count", len(pendingMessages)))
for _, msg := range pendingMessages {
if err := sendMessage(stream, msg); err != nil {
logger.Error("Failed to send pending message",
zap.String("message_id", msg.ID),
zap.Error(err))
return err
}
}
}
}
// Subscribe to party messages
partyCh, err := s.messageBroker.SubscribeToPartyMessages(ctx, req.PartyId)
if err != nil {
@ -109,6 +146,9 @@ func (s *MessageRouterServer) SubscribeMessages(
for {
select {
case <-ctx.Done():
logger.Info("Party unsubscribed from messages",
zap.String("session_id", req.SessionId),
zap.String("party_id", req.PartyId))
return nil
case msg, ok := <-partyCh:
if !ok {

View File

@ -116,6 +116,7 @@ func (a *MessageBrokerAdapter) PublishToSession(
}
// SubscribeToPartyMessages subscribes to messages for a specific party
// If the party already has an active subscription, the old channel is closed first
func (a *MessageBrokerAdapter) SubscribeToPartyMessages(
ctx context.Context,
partyID string,
@ -123,11 +124,15 @@ func (a *MessageBrokerAdapter) SubscribeToPartyMessages(
a.mu.Lock()
defer a.mu.Unlock()
// Create channel if not exists
if _, exists := a.partyChannels[partyID]; !exists {
a.partyChannels[partyID] = make(chan *entities.MessageDTO, 100)
// Close existing channel if party is re-subscribing (e.g., after reconnect)
if oldCh, exists := a.partyChannels[partyID]; exists {
close(oldCh)
logger.Info("closed existing party channel for re-subscription",
zap.String("party_id", partyID))
}
// Create new channel
a.partyChannels[partyID] = make(chan *entities.MessageDTO, 100)
ch := a.partyChannels[partyID]
// Return a read-only channel
@ -155,6 +160,7 @@ func (a *MessageBrokerAdapter) SubscribeToPartyMessages(
}
// SubscribeToSessionMessages subscribes to all messages in a session
// If the party already has an active subscription for this session, the old channel is closed first
func (a *MessageBrokerAdapter) SubscribeToSessionMessages(
ctx context.Context,
sessionID string,
@ -171,14 +177,18 @@ func (a *MessageBrokerAdapter) SubscribeToSessionMessages(
zap.String("key", key),
zap.Int("current_channel_count", len(a.sessionChannels)))
// Create channel if not exists
if _, exists := a.sessionChannels[key]; !exists {
a.sessionChannels[key] = make(chan *entities.MessageDTO, 100)
logger.Info("Created new session channel",
// Close existing channel if party is re-subscribing (e.g., after reconnect)
if oldCh, exists := a.sessionChannels[key]; exists {
close(oldCh)
logger.Info("closed existing session channel for re-subscription",
zap.String("key", key))
}
// Create new channel
a.sessionChannels[key] = make(chan *entities.MessageDTO, 100)
ch := a.sessionChannels[key]
logger.Info("Created new session channel",
zap.String("key", key))
// Return a read-only channel
out := make(chan *entities.MessageDTO, 100)

View File

@ -20,10 +20,17 @@ func NewSessionEventBroadcaster() *SessionEventBroadcaster {
}
// Subscribe subscribes a party to session events
// If the party already has an active subscription, the old channel is closed first
// to prevent memory leaks and ensure clean reconnection
func (b *SessionEventBroadcaster) Subscribe(partyID string) <-chan *pb.SessionEvent {
b.mu.Lock()
defer b.mu.Unlock()
// Close existing channel if party is re-subscribing (e.g., after reconnect)
if oldCh, exists := b.subscribers[partyID]; exists {
close(oldCh)
}
// Create buffered channel for this subscriber
ch := make(chan *pb.SessionEvent, 100)
b.subscribers[partyID] = ch

View File

@ -72,6 +72,9 @@ interface ActiveKeygenSession {
}
let activeKeygenSession: ActiveKeygenSession | null = null;
// Keygen 幂等性保护:追踪正在进行的 keygen 会话 ID
let keygenInProgressSessionId: string | null = null;
// 会话事件缓存 - 解决前端订阅时可能错过事件的时序问题
// 当事件到达时,前端可能还在页面导航中,尚未订阅
interface SessionEventData {
@ -242,7 +245,9 @@ function getOrCreatePartyId(db: DatabaseManager): string {
// 生成一个新的 UUID 作为 partyId
partyId = crypto.randomUUID();
db.setSetting('party_id', partyId);
console.log('Generated new partyId:', partyId);
debugLog.info('main', `Generated new partyId: ${partyId}`);
} else {
debugLog.info('main', `Loaded existing partyId: ${partyId}`);
}
return partyId;
}
@ -251,17 +256,24 @@ function getOrCreatePartyId(db: DatabaseManager): string {
async function initServices() {
// 初始化数据库 (必须首先初始化)
database = new DatabaseManager();
// 等待数据库初始化完成(加载 WASM 和创建表)
await database.waitForReady();
debugLog.info('main', 'Database initialized');
// 初始化 gRPC 客户端
grpcClient = new GrpcClient();
// 清理过期的已处理消息记录(防止数据库膨胀)
database.cleanupOldProcessedMessages();
debugLog.debug('main', 'Cleaned up old processed messages');
// 初始化 TSS Handler
if (USE_MOCK_TSS) {
debugLog.info('tss', 'Using Mock TSS Handler (development mode)');
tssHandler = new MockTSSHandler(grpcClient);
} else {
debugLog.info('tss', 'Using real TSS Handler');
tssHandler = new TSSHandler(grpcClient);
tssHandler = new TSSHandler(grpcClient, database);
}
// 设置 TSS 进度事件监听
@ -324,6 +336,18 @@ async function handleSessionStart(event: {
return;
}
// 幂等性保护:检查是否已经在执行 keygen
if (keygenInProgressSessionId === event.sessionId) {
debugLog.debug('main', `Keygen already in progress for session ${event.sessionId}, skipping duplicate trigger`);
return;
}
// 再次检查 TSS 是否在运行(双重保护)
if (tssHandler?.getIsRunning()) {
debugLog.debug('main', 'TSS already running, skipping');
return;
}
if (!tssHandler) {
debugLog.error('tss', 'TSS handler not initialized');
mainWindow?.webContents.send(`session:events:${event.sessionId}`, {
@ -333,6 +357,9 @@ async function handleSessionStart(event: {
return;
}
// 标记 keygen 开始
keygenInProgressSessionId = event.sessionId;
// 从事件中更新参与者列表(如果事件包含完整列表)
if (event.selectedParties && event.selectedParties.length > 0) {
const myPartyId = grpcClient?.getPartyId();
@ -384,6 +411,8 @@ async function handleSessionStart(event: {
type: 'failed',
error: result.error || 'Keygen failed',
});
// 清除幂等性标志
keygenInProgressSessionId = null;
}
} catch (error) {
debugLog.error('tss', `Keygen error: ${(error as Error).message}`);
@ -391,6 +420,8 @@ async function handleSessionStart(event: {
type: 'failed',
error: (error as Error).message,
});
// 清除幂等性标志
keygenInProgressSessionId = null;
}
}
@ -443,8 +474,9 @@ async function handleKeygenComplete(result: KeygenResult) {
allCompleted: allCompleted,
});
// 4. 清理活跃会话
// 4. 清理活跃会话和幂等性标志
activeKeygenSession = null;
keygenInProgressSessionId = null;
debugLog.info('main', 'Keygen session completed and cleaned up');
} catch (error) {
@ -453,6 +485,8 @@ async function handleKeygenComplete(result: KeygenResult) {
type: 'failed',
error: (error as Error).message,
});
// 清除幂等性标志
keygenInProgressSessionId = null;
}
}
@ -481,6 +515,22 @@ async function connectAndRegisterToMessageRouter() {
grpcClient.subscribeSessionEvents(partyId);
debugLog.info('grpc', 'Subscribed to session events');
// 监听连接状态变化
grpcClient.on('disconnected', (reason: string) => {
debugLog.warn('grpc', `Disconnected from Message Router: ${reason}`);
mainWindow?.webContents.send('grpc:connectionStatus', { connected: false, reason });
});
grpcClient.on('reconnected', () => {
debugLog.info('grpc', 'Reconnected to Message Router');
mainWindow?.webContents.send('grpc:connectionStatus', { connected: true });
});
grpcClient.on('reconnectFailed', (reason: string) => {
debugLog.error('grpc', `Failed to reconnect: ${reason}`);
mainWindow?.webContents.send('grpc:connectionStatus', { connected: false, error: reason });
});
// 监听会话事件并处理
grpcClient.on('sessionEvent', async (event: {
eventId: string;
@ -1345,6 +1395,29 @@ function setupIpcHandlers() {
ipcMain.on('debug:log', (_event, { level, source, message }) => {
sendDebugLog(level as LogLevel, source as LogSource, message);
});
// ===========================================================================
// 会话事件订阅(带缓存事件发送)
// ===========================================================================
// 前端订阅会话事件时,立即发送缓存的事件
ipcMain.on('grpc:subscribeSessionEvents', (_event, { sessionId }) => {
debugLog.debug('main', `Frontend subscribing to session events: ${sessionId}`);
// 获取并发送缓存的事件
const cachedEvents = getAndClearCachedEvents(sessionId);
if (cachedEvents.length > 0) {
debugLog.info('main', `Sending ${cachedEvents.length} cached events to frontend for session ${sessionId}`);
for (const event of cachedEvents) {
mainWindow?.webContents.send(`session:events:${sessionId}`, event);
}
}
});
// 前端取消订阅
ipcMain.on('grpc:unsubscribeSessionEvents', (_event, { sessionId }) => {
debugLog.debug('main', `Frontend unsubscribing from session events: ${sessionId}`);
});
}
// 应用生命周期

View File

@ -156,6 +156,13 @@ export class DatabaseManager {
await this.initPromise;
}
/**
*
*/
async waitForReady(): Promise<void> {
await this.initPromise;
}
/**
*
*/
@ -213,12 +220,22 @@ export class DatabaseManager {
)
`);
// 已处理消息表 - 用于消息去重,防止重连后重复处理消息
this.db.run(`
CREATE TABLE IF NOT EXISTS processed_messages (
message_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
processed_at TEXT NOT NULL
)
`);
// 创建索引
this.db.run(`CREATE INDEX IF NOT EXISTS idx_shares_session ON shares(session_id)`);
this.db.run(`CREATE INDEX IF NOT EXISTS idx_addresses_share ON derived_addresses(share_id)`);
this.db.run(`CREATE INDEX IF NOT EXISTS idx_addresses_chain ON derived_addresses(chain)`);
this.db.run(`CREATE INDEX IF NOT EXISTS idx_history_share ON signing_history(share_id)`);
this.db.run(`CREATE INDEX IF NOT EXISTS idx_history_status ON signing_history(status)`);
this.db.run(`CREATE INDEX IF NOT EXISTS idx_processed_messages_session ON processed_messages(session_id)`);
// 插入默认设置
this.db.run(`INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)`, ['message_router_url', 'mpc-grpc.szaiai.com:443']);
@ -610,6 +627,55 @@ export class DatabaseManager {
return settings;
}
// ===========================================================================
// 消息去重操作
// ===========================================================================
/**
*
*/
isMessageProcessed(messageId: string): boolean {
const row = this.queryOne<{ message_id: string }>(
`SELECT message_id FROM processed_messages WHERE message_id = ?`,
[messageId]
);
return !!row;
}
/**
*
*/
markMessageProcessed(messageId: string, sessionId: string): void {
if (!this.db) return;
const now = new Date().toISOString();
this.db.run(
`INSERT OR IGNORE INTO processed_messages (message_id, session_id, processed_at) VALUES (?, ?, ?)`,
[messageId, sessionId, now]
);
this.saveToFile();
}
/**
*
*
*/
clearProcessedMessages(sessionId: string): void {
if (!this.db) return;
this.db.run(`DELETE FROM processed_messages WHERE session_id = ?`, [sessionId]);
this.saveToFile();
}
/**
* 24
*
*/
cleanupOldProcessedMessages(): void {
if (!this.db) return;
const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
this.db.run(`DELETE FROM processed_messages WHERE processed_at < ?`, [cutoff]);
this.saveToFile();
}
// ===========================================================================
// 导入导出
// ===========================================================================

View File

@ -98,23 +98,62 @@ interface GetRegisteredPartiesResponse {
parties: RegisteredParty[];
}
// 重连配置
interface ReconnectConfig {
maxRetries: number;
initialDelayMs: number;
maxDelayMs: number;
backoffMultiplier: number;
}
const DEFAULT_RECONNECT_CONFIG: ReconnectConfig = {
maxRetries: 10,
initialDelayMs: 1000,
maxDelayMs: 30000,
backoffMultiplier: 2,
};
/**
* gRPC - Message Router
*
* :
* - 开发环境: localhost:50051 ()
* - 生产环境: mpc-grpc.szaiai.com:443 (TLS )
*
* :
* - 退
* -
* -
*/
export class GrpcClient extends EventEmitter {
private client: grpc.Client | null = null;
private connected = false;
private partyId: string | null = null;
private partyRole: string | null = null;
private heartbeatInterval: NodeJS.Timeout | null = null;
private messageStream: grpc.ClientReadableStream<MPCMessage> | null = null;
private eventStream: grpc.ClientReadableStream<SessionEvent> | null = null;
constructor() {
// 重连相关
private reconnectConfig: ReconnectConfig;
private currentAddress: string | null = null;
private currentUseTLS: boolean | undefined;
private isReconnecting = false;
private reconnectAttempts = 0;
private reconnectTimeout: NodeJS.Timeout | null = null;
private shouldReconnect = true;
// 消息流状态(用于重连后恢复)
private activeMessageSubscription: { sessionId: string; partyId: string } | null = null;
private eventStreamSubscribed = false;
// 心跳失败计数
private heartbeatFailCount = 0;
private readonly MAX_HEARTBEAT_FAILS = 3;
constructor(reconnectConfig?: Partial<ReconnectConfig>) {
super();
this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...reconnectConfig };
}
/**
@ -123,6 +162,15 @@ export class GrpcClient extends EventEmitter {
* @param useTLS 使 TLS (默认: 自动检测 443 使 TLS)
*/
async connect(address: string, useTLS?: boolean): Promise<void> {
// 保存连接参数用于重连
this.currentAddress = address;
this.currentUseTLS = useTLS;
this.shouldReconnect = true;
return this.doConnect(address, useTLS);
}
private async doConnect(address: string, useTLS?: boolean): Promise<void> {
return new Promise((resolve, reject) => {
const definition = loadProtoDefinition();
const proto = grpc.loadPackageDefinition(definition) as ProtoPackage;
@ -148,7 +196,7 @@ export class GrpcClient extends EventEmitter {
? grpc.credentials.createSsl() // TLS 加密 (生产环境)
: grpc.credentials.createInsecure(); // 不加密 (开发环境)
console.log(`Connecting to Message Router: ${targetAddress} (TLS: ${shouldUseTLS})`);
console.log(`[gRPC] Connecting to Message Router: ${targetAddress} (TLS: ${shouldUseTLS})`);
this.client = new MessageRouter(
targetAddress,
@ -165,6 +213,10 @@ export class GrpcClient extends EventEmitter {
reject(err);
} else {
this.connected = true;
this.reconnectAttempts = 0; // 重置重连计数
this.heartbeatFailCount = 0;
console.log('[gRPC] Connected successfully');
this.emit('connected');
resolve();
}
});
@ -172,31 +224,121 @@ export class GrpcClient extends EventEmitter {
}
/**
*
*
*/
disconnect(): void {
this.shouldReconnect = false;
this.cleanupConnection();
}
/**
*
*/
private cleanupConnection(): void {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
if (this.messageStream) {
this.messageStream.cancel();
try {
this.messageStream.cancel();
} catch (e) {
// 忽略取消错误
}
this.messageStream = null;
}
if (this.eventStream) {
this.eventStream.cancel();
try {
this.eventStream.cancel();
} catch (e) {
// 忽略取消错误
}
this.eventStream = null;
}
if (this.client) {
(this.client as grpc.Client & { close: () => void }).close();
try {
(this.client as grpc.Client & { close: () => void }).close();
} catch (e) {
// 忽略关闭错误
}
this.client = null;
}
this.connected = false;
this.partyId = null;
}
/**
*
*/
private async triggerReconnect(reason: string): Promise<void> {
if (!this.shouldReconnect || this.isReconnecting || !this.currentAddress) {
return;
}
console.log(`[gRPC] Triggering reconnect: ${reason}`);
this.isReconnecting = true;
this.connected = false;
this.emit('disconnected', reason);
// 清理现有连接
this.cleanupConnection();
// 计算延迟时间(指数退避)
const delay = Math.min(
this.reconnectConfig.initialDelayMs * Math.pow(this.reconnectConfig.backoffMultiplier, this.reconnectAttempts),
this.reconnectConfig.maxDelayMs
);
console.log(`[gRPC] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1}/${this.reconnectConfig.maxRetries})`);
this.reconnectTimeout = setTimeout(async () => {
this.reconnectAttempts++;
if (this.reconnectAttempts > this.reconnectConfig.maxRetries) {
console.error('[gRPC] Max reconnect attempts reached');
this.isReconnecting = false;
this.emit('reconnectFailed', 'Max retries exceeded');
return;
}
try {
await this.doConnect(this.currentAddress!, this.currentUseTLS);
// 重新注册
if (this.partyId && this.partyRole) {
console.log(`[gRPC] Re-registering as party: ${this.partyId}`);
await this.registerParty(this.partyId, this.partyRole);
}
// 重新订阅事件流
if (this.eventStreamSubscribed && this.partyId) {
console.log('[gRPC] Re-subscribing to session events');
this.subscribeSessionEvents(this.partyId);
}
// 重新订阅消息流
if (this.activeMessageSubscription) {
console.log(`[gRPC] Re-subscribing to messages for session: ${this.activeMessageSubscription.sessionId}`);
this.subscribeMessages(this.activeMessageSubscription.sessionId, this.activeMessageSubscription.partyId);
}
this.isReconnecting = false;
this.emit('reconnected');
} catch (err) {
console.error(`[gRPC] Reconnect attempt ${this.reconnectAttempts} failed:`, (err as Error).message);
this.isReconnecting = false;
// 继续尝试重连
this.triggerReconnect('Previous reconnect attempt failed');
}
}, delay);
}
/**
@ -236,6 +378,7 @@ export class GrpcClient extends EventEmitter {
reject(new Error('Registration failed'));
} else {
this.partyId = partyId;
this.partyRole = role;
this.startHeartbeat();
resolve();
}
@ -245,13 +388,15 @@ export class GrpcClient extends EventEmitter {
}
/**
*
*
*/
private startHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
this.heartbeatFailCount = 0;
this.heartbeatInterval = setInterval(() => {
if (this.client && this.partyId) {
(this.client as grpc.Client & { heartbeat: (req: unknown, callback: (err: Error | null) => void) => void })
@ -259,8 +404,17 @@ export class GrpcClient extends EventEmitter {
{ party_id: this.partyId },
(err: Error | null) => {
if (err) {
console.error('Heartbeat failed:', err.message);
this.heartbeatFailCount++;
console.error(`[gRPC] Heartbeat failed (${this.heartbeatFailCount}/${this.MAX_HEARTBEAT_FAILS}):`, err.message);
this.emit('connectionError', err);
// 连续失败多次后触发重连
if (this.heartbeatFailCount >= this.MAX_HEARTBEAT_FAILS) {
this.triggerReconnect('Heartbeat failed');
}
} else {
// 心跳成功,重置失败计数
this.heartbeatFailCount = 0;
}
}
);
@ -296,13 +450,25 @@ export class GrpcClient extends EventEmitter {
}
/**
*
*
*/
subscribeSessionEvents(partyId: string): void {
if (!this.client) {
throw new Error('Not connected');
}
// 标记已订阅(用于重连后恢复)
this.eventStreamSubscribed = true;
// 取消现有流
if (this.eventStream) {
try {
this.eventStream.cancel();
} catch (e) {
// 忽略
}
}
this.eventStream = (this.client as grpc.Client & { subscribeSessionEvents: (req: unknown) => grpc.ClientReadableStream<SessionEvent> })
.subscribeSessionEvents({ party_id: partyId });
@ -311,24 +477,61 @@ export class GrpcClient extends EventEmitter {
});
this.eventStream.on('error', (err: Error) => {
console.error('Session event stream error:', err.message);
console.error('[gRPC] Session event stream error:', err.message);
this.emit('streamError', err);
// 非主动取消的错误触发重连
if (!err.message.includes('CANCELLED') && this.shouldReconnect) {
this.triggerReconnect('Event stream error');
}
});
this.eventStream.on('end', () => {
console.log('Session event stream ended');
console.log('[gRPC] Session event stream ended');
this.emit('streamEnd');
// 流结束也触发重连
if (this.shouldReconnect && this.eventStreamSubscribed) {
this.triggerReconnect('Event stream ended');
}
});
}
/**
* MPC
*
*/
unsubscribeSessionEvents(): void {
this.eventStreamSubscribed = false;
if (this.eventStream) {
try {
this.eventStream.cancel();
} catch (e) {
// 忽略
}
this.eventStream = null;
}
}
/**
* MPC
*/
subscribeMessages(sessionId: string, partyId: string): void {
if (!this.client) {
throw new Error('Not connected');
}
// 保存订阅状态(用于重连后恢复)
this.activeMessageSubscription = { sessionId, partyId };
// 取消现有流
if (this.messageStream) {
try {
this.messageStream.cancel();
} catch (e) {
// 忽略
}
}
this.messageStream = (this.client as grpc.Client & { subscribeMessages: (req: unknown) => grpc.ClientReadableStream<MPCMessage> })
.subscribeMessages({
session_id: sessionId,
@ -340,16 +543,41 @@ export class GrpcClient extends EventEmitter {
});
this.messageStream.on('error', (err: Error) => {
console.error('Message stream error:', err.message);
console.error('[gRPC] Message stream error:', err.message);
this.emit('messageStreamError', err);
// 非主动取消的错误触发重连
if (!err.message.includes('CANCELLED') && this.shouldReconnect && this.activeMessageSubscription) {
this.triggerReconnect('Message stream error');
}
});
this.messageStream.on('end', () => {
console.log('Message stream ended');
console.log('[gRPC] Message stream ended');
this.emit('messageStreamEnd');
// 流结束也触发重连
if (this.shouldReconnect && this.activeMessageSubscription) {
this.triggerReconnect('Message stream ended');
}
});
}
/**
* MPC
*/
unsubscribeMessages(): void {
this.activeMessageSubscription = null;
if (this.messageStream) {
try {
this.messageStream.cancel();
} catch (e) {
// 忽略
}
this.messageStream = null;
}
}
/**
* MPC
*/

View File

@ -3,6 +3,7 @@ import * as path from 'path';
import * as fs from 'fs';
import { EventEmitter } from 'events';
import { GrpcClient } from './grpc-client';
import { DatabaseManager } from './database';
/**
* TSS
@ -54,6 +55,7 @@ interface ParticipantInfo {
export class TSSHandler extends EventEmitter {
private tssProcess: ChildProcess | null = null;
private grpcClient: GrpcClient;
private database: DatabaseManager | null = null;
private sessionId: string | null = null;
private partyId: string | null = null;
private partyIndex: number = -1;
@ -61,9 +63,26 @@ export class TSSHandler extends EventEmitter {
private partyIndexMap: Map<string, number> = new Map();
private isRunning = false;
constructor(grpcClient: GrpcClient) {
// 消息缓冲:在 TSS 进程启动前缓冲收到的消息
private messageBuffer: Array<{
messageId: string;
fromParty: string;
isBroadcast: boolean;
payload: Buffer;
}> = [];
private isProcessReady = false;
constructor(grpcClient: GrpcClient, database?: DatabaseManager) {
super();
this.grpcClient = grpcClient;
this.database = database || null;
}
/**
*
*/
setDatabase(database: DatabaseManager): void {
this.database = database;
}
/**
@ -114,6 +133,8 @@ export class TSSHandler extends EventEmitter {
this.partyIndex = partyIndex;
this.participants = participants;
this.isRunning = true;
this.isProcessReady = false;
this.messageBuffer = []; // 清空消息缓冲
// 构建 party index map
this.partyIndexMap.clear();
@ -142,10 +163,21 @@ export class TSSHandler extends EventEmitter {
let resultData = '';
// 先订阅消息(可能在进程就绪前就收到消息,会被缓冲)
this.grpcClient.on('mpcMessage', this.handleIncomingMessage.bind(this));
this.grpcClient.subscribeMessages(sessionId, partyId);
// 处理标准输出 (JSON 消息)
this.tssProcess.stdout?.on('data', (data: Buffer) => {
const lines = data.toString().split('\n').filter(line => line.trim());
// 收到第一条输出时,标记进程就绪并发送缓冲的消息
if (!this.isProcessReady && this.tssProcess?.stdin) {
this.isProcessReady = true;
console.log(`[TSS] Process ready, flushing ${this.messageBuffer.length} buffered messages`);
this.flushMessageBuffer();
}
for (const line of lines) {
try {
const message: TSSMessage = JSON.parse(line);
@ -168,13 +200,22 @@ export class TSSHandler extends EventEmitter {
// 处理进程退出
this.tssProcess.on('close', (code) => {
const completedSessionId = this.sessionId;
this.isRunning = false;
this.isProcessReady = false;
this.messageBuffer = [];
this.tssProcess = null;
// 清理消息监听器,防止下次 keygen 时重复注册
this.grpcClient.removeAllListeners('mpcMessage');
if (code === 0 && resultData) {
try {
const result: TSSMessage = JSON.parse(resultData);
if (result.publicKey && result.encryptedShare) {
// 成功完成后清理该会话的已处理消息记录
if (this.database && completedSessionId) {
this.database.clearProcessedMessages(completedSessionId);
}
resolve({
success: true,
publicKey: Buffer.from(result.publicKey, 'base64'),
@ -195,14 +236,14 @@ export class TSSHandler extends EventEmitter {
// 处理进程错误
this.tssProcess.on('error', (err) => {
this.isRunning = false;
this.isProcessReady = false;
this.messageBuffer = [];
this.tssProcess = null;
// 清理消息监听器
this.grpcClient.removeAllListeners('mpcMessage');
reject(err);
});
// 订阅 MPC 消息并转发给 TSS 进程
this.grpcClient.on('mpcMessage', this.handleIncomingMessage.bind(this));
this.grpcClient.subscribeMessages(sessionId, partyId);
} catch (err) {
this.isRunning = false;
reject(err);
@ -254,14 +295,44 @@ export class TSSHandler extends EventEmitter {
}
/**
* gRPC MPC
* gRPC MPC
*/
private handleIncomingMessage(message: {
messageId: string;
fromParty: string;
isBroadcast: boolean;
payload: Buffer;
}): void {
if (!this.tssProcess || !this.tssProcess.stdin) {
// 消息去重检查
if (this.database && message.messageId) {
if (this.database.isMessageProcessed(message.messageId)) {
console.log(`[TSS] Skipping duplicate message: ${message.messageId.substring(0, 8)}...`);
return;
}
}
// 如果进程未就绪,缓冲消息
if (!this.isProcessReady || !this.tssProcess || !this.tssProcess.stdin) {
if (this.isRunning) {
console.log(`[TSS] Buffering message from ${message.fromParty.substring(0, 8)}... (process not ready)`);
this.messageBuffer.push(message);
}
return;
}
this.sendMessageToProcess(message);
}
/**
* TSS
*/
private sendMessageToProcess(message: {
messageId: string;
fromParty: string;
isBroadcast: boolean;
payload: Buffer;
}): void {
if (!this.tssProcess?.stdin) {
return;
}
@ -280,6 +351,26 @@ export class TSSHandler extends EventEmitter {
});
this.tssProcess.stdin.write(inputMessage + '\n');
// 标记消息为已处理(防止重连后重复处理)
if (this.database && message.messageId && this.sessionId) {
this.database.markMessageProcessed(message.messageId, this.sessionId);
}
}
/**
*
*/
private flushMessageBuffer(): void {
if (this.messageBuffer.length === 0) {
return;
}
console.log(`[TSS] Flushing ${this.messageBuffer.length} buffered messages`);
for (const msg of this.messageBuffer) {
this.sendMessageToProcess(msg);
}
this.messageBuffer = [];
}
/**
@ -291,6 +382,8 @@ export class TSSHandler extends EventEmitter {
this.tssProcess = null;
}
this.isRunning = false;
this.isProcessReady = false;
this.messageBuffer = [];
this.grpcClient.removeAllListeners('mpcMessage');
}