// rwadurian/backend/mpc-system/services/service-party-app/electron/modules/grpc-client.ts
//
// 394 lines
// 10 KiB
// TypeScript

import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import * as path from 'path';
import { EventEmitter } from 'events';
import { app } from 'electron';
/**
 * Resolves the on-disk location of `message_router.proto`.
 *
 * - Development (app not packaged): resolved relative to this module's directory.
 * - Packaged build: resolved under Electron's `process.resourcesPath`, since the
 *   proto file has to be shipped unpacked (outside the asar archive) to be readable.
 *
 * @returns Path to the proto file for the current runtime mode.
 */
function getProtoPath(): string {
  return app.isPackaged
    ? path.join(process.resourcesPath, 'proto/message_router.proto')
    : path.join(__dirname, '../../proto/message_router.proto');
}
// Proto file location, resolved once when this module is first imported.
const PROTO_PATH = getProtoPath();
// Shape of the loaded proto package as this module consumes it: the
// MessageRouter client constructor is looked up at mpc.router.v1.MessageRouter.
// Every level is optional so that a missing package or service can be detected
// at runtime instead of being assumed present.
interface ProtoPackage {
mpc?: {
router?: {
v1?: {
MessageRouter?: grpc.ServiceClientConstructor;
};
};
};
}
// Load the proto definition. This runs synchronously at module import time, so
// a missing or invalid proto file surfaces as an import-time exception.
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
// Keep field names exactly as written in the .proto (no camelCase conversion).
keepCase: true,
// Represent 64-bit integer fields as strings.
longs: String,
// Represent enum values by their names.
enums: String,
// Populate default values for fields that are not set.
defaults: true,
// Expose virtual oneof properties naming whichever member field is set.
oneofs: true,
});
// Session metadata carried in JoinSessionResponse.sessionInfo.
// NOTE(review): the proto loader in this file runs with keepCase: true and the
// request literals use snake_case keys, so decoded objects presumably expose
// snake_case names rather than these camelCase ones — confirm against
// message_router.proto.
interface SessionInfo {
sessionId: string;
sessionType: string;
// presumably the n / t of a t-of-n threshold scheme — TODO confirm
thresholdN: number;
thresholdT: number;
messageHash?: Buffer;
keygenSessionId?: string;
}
// One entry of JoinSessionResponse.otherParties.
// NOTE(review): with keepCase: true on the proto loader, runtime objects
// presumably use snake_case field names — confirm against message_router.proto.
interface PartyInfo {
partyId: string;
partyIndex: number;
}
// Value that GrpcClient.joinSession resolves with; the gRPC response object is
// passed through as-is, without any field mapping.
// NOTE(review): the proto loader runs with keepCase: true and the other
// response shapes in this file are snake_case (message_id, all_completed), so
// the decoded object presumably carries session_info / party_index /
// other_parties rather than the camelCase names declared here — confirm
// against message_router.proto.
interface JoinSessionResponse {
success: boolean;
sessionInfo?: SessionInfo;
partyIndex: number;
otherParties: PartyInfo[];
}
// Element type of the message stream; GrpcClient re-emits each received item
// unchanged as an 'mpcMessage' event.
// NOTE(review): with keepCase: true on the proto loader, decoded messages
// presumably use snake_case field names instead of these camelCase ones —
// confirm against message_router.proto.
interface MPCMessage {
messageId: string;
sessionId: string;
fromParty: string;
isBroadcast: boolean;
roundNumber: number;
payload: Buffer;
// presumably a 64-bit timestamp delivered as a string (loader uses longs: String) — TODO confirm
createdAt: string;
}
// Element type of the session-event stream; GrpcClient re-emits each received
// item unchanged as a 'sessionEvent' event.
// NOTE(review): with keepCase: true on the proto loader, decoded events
// presumably use snake_case field names instead of these camelCase ones —
// confirm against message_router.proto.
interface SessionEvent {
eventId: string;
eventType: string;
sessionId: string;
thresholdN: number;
thresholdT: number;
selectedParties: string[];
// presumably keyed by party id — TODO confirm
joinTokens: Record<string, string>;
messageHash?: Buffer;
}
/** Callback shape of the unary RPCs exposed by the generated MessageRouter client. */
type UnaryCallback<Res> = (err: Error | null, res: Res) => void;

/**
 * Call surface of the dynamically generated MessageRouter client, limited to
 * the RPCs this module invokes. Declared once so individual call sites do not
 * each need their own type assertion.
 */
type MessageRouterClient = grpc.Client & {
  registerParty(req: unknown, callback: UnaryCallback<{ success: boolean }>): void;
  heartbeat(req: unknown, callback: (err: Error | null) => void): void;
  joinSession(req: unknown, callback: UnaryCallback<JoinSessionResponse>): void;
  subscribeSessionEvents(req: unknown): grpc.ClientReadableStream<SessionEvent>;
  subscribeMessages(req: unknown): grpc.ClientReadableStream<MPCMessage>;
  routeMessage(req: unknown, callback: UnaryCallback<{ message_id: string }>): void;
  reportCompletion(req: unknown, callback: UnaryCallback<{ all_completed: boolean }>): void;
};

/**
 * gRPC client for the Message Router.
 *
 * Address format:
 * - development: `localhost:50051` (plaintext)
 * - production:  `mpc-grpc.szaiai.com:443` (TLS)
 *
 * Events emitted:
 * - `connectionError` (Error)     — a heartbeat RPC failed
 * - `sessionEvent` (SessionEvent) — item received on the session-event stream
 * - `streamError` (Error) / `streamEnd` — session-event stream error / end
 * - `mpcMessage` (MPCMessage)     — item received on the message stream
 * - `messageStreamError` (Error) / `messageStreamEnd` — message stream error / end
 */
export class GrpcClient extends EventEmitter {
  /** How long `connect()` waits for the channel to become ready. */
  private static readonly CONNECT_TIMEOUT_MS = 10000;
  /** Period between heartbeat RPCs once a party is registered. */
  private static readonly HEARTBEAT_INTERVAL_MS = 30000;
  /** Port assumed when the address carries none. */
  private static readonly DEFAULT_PORT = 443;

  private client: MessageRouterClient | null = null;
  private connected = false;
  private partyId: string | null = null;
  private heartbeatInterval: NodeJS.Timeout | null = null;
  private messageStream: grpc.ClientReadableStream<MPCMessage> | null = null;
  private eventStream: grpc.ClientReadableStream<SessionEvent> | null = null;

  /**
   * Normalizes a user-supplied address into a dial target plus its numeric port.
   *
   * A bare host (no colon) or a bracketed IPv6 literal without a port gets the
   * default port appended. The port is read from the text after the LAST colon
   * so that `[::1]:443` and `dns:///host:443` are parsed correctly.
   *
   * @returns The dial target and its port (`NaN` when no numeric port is present).
   */
  private static resolveTarget(address: string): { target: string; port: number } {
    const hasPort = address.includes(':') && !address.endsWith(']');
    const target = hasPort ? address : `${address}:${GrpcClient.DEFAULT_PORT}`;
    const port = Number.parseInt(target.slice(target.lastIndexOf(':') + 1), 10);
    return { target, port };
  }

  /**
   * Returns the active client or throws when `connect()` has not produced one.
   *
   * @throws Error('Not connected')
   */
  private requireClient(): MessageRouterClient {
    if (!this.client) {
      throw new Error('Not connected');
    }
    return this.client;
  }

  /**
   * Connects to the Message Router.
   *
   * @param address Full address in `host:port` form (e.g. `mpc-grpc.szaiai.com:443`
   *   or `localhost:50051`). Port 443 is assumed when none is given.
   * @param useTLS Whether to use TLS. When omitted it is auto-detected: TLS is
   *   used if and only if the port is 443.
   * @returns Resolves once the channel is ready. Rejects if the service
   *   definition cannot be loaded or the channel is not ready within 10 seconds;
   *   in the latter case the half-open client is closed and discarded.
   */
  async connect(address: string, useTLS?: boolean): Promise<void> {
    return new Promise((resolve, reject) => {
      const proto = grpc.loadPackageDefinition(packageDefinition) as ProtoPackage;
      const MessageRouter = proto.mpc?.router?.v1?.MessageRouter;
      if (!MessageRouter) {
        reject(new Error('Failed to load MessageRouter service definition'));
        return;
      }

      const { target, port } = GrpcClient.resolveTarget(address);
      const shouldUseTLS = useTLS ?? (port === GrpcClient.DEFAULT_PORT);
      const credentials = shouldUseTLS
        ? grpc.credentials.createSsl() // TLS (production)
        : grpc.credentials.createInsecure(); // plaintext (development)
      console.log(`Connecting to Message Router: ${target} (TLS: ${shouldUseTLS})`);

      // A repeated connect() must not orphan the previous channel: once the
      // reference is overwritten nothing could ever close it.
      if (this.client) {
        this.client.close();
        this.client = null;
        this.connected = false;
      }

      const baseClient: grpc.Client = new MessageRouter(target, credentials);
      const client = baseClient as MessageRouterClient;
      this.client = client;

      const deadline = new Date(Date.now() + GrpcClient.CONNECT_TIMEOUT_MS);
      client.waitForReady(deadline, (err?: Error) => {
        // `this.client !== client` means disconnect() or a newer connect() ran
        // in the meantime and already disposed of this client; do not touch
        // the state that now belongs to someone else.
        const stillCurrent = this.client === client;
        if (err) {
          if (stillCurrent) {
            client.close();
            this.client = null;
            this.connected = false;
          }
          reject(err);
          return;
        }
        if (stillCurrent) {
          this.connected = true;
        }
        resolve();
      });
    });
  }

  /**
   * Disconnects: stops the heartbeat, cancels both streams, closes the client
   * and clears the registered party id. Safe to call when not connected.
   *
   * NOTE(review): cancelling a stream presumably surfaces a CANCELLED status
   * through its 'error' handler, so 'streamError' / 'messageStreamError' may
   * fire during a deliberate disconnect — confirm listeners tolerate that.
   */
  disconnect(): void {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
    if (this.messageStream) {
      this.messageStream.cancel();
      this.messageStream = null;
    }
    if (this.eventStream) {
      this.eventStream.cancel();
      this.eventStream = null;
    }
    if (this.client) {
      this.client.close();
      this.client = null;
    }
    this.connected = false;
    this.partyId = null;
  }

  /**
   * Reports whether the last `connect()` succeeded and `disconnect()` has not
   * been called since. This is a local flag, not a live channel-state probe.
   */
  isConnected(): boolean {
    return this.connected;
  }

  /** Returns the party id set by a successful `registerParty()`, or `null`. */
  getPartyId(): string | null {
    return this.partyId;
  }

  /**
   * Registers this process as a party and, on success, starts the heartbeat.
   *
   * @param partyId Party identifier sent as `party_id`.
   * @param role Party role sent as `party_role`.
   * @throws Error('Not connected') (as a rejection) when there is no client.
   * @returns Rejects with the RPC error, or with Error('Registration failed')
   *   when the server answers `success: false`.
   */
  async registerParty(partyId: string, role: string): Promise<void> {
    const client = this.requireClient();
    return new Promise((resolve, reject) => {
      client.registerParty(
        {
          party_id: partyId,
          party_role: role,
          version: '1.0.0',
        },
        (err, response) => {
          if (err) {
            reject(err);
            return;
          }
          if (!response.success) {
            reject(new Error('Registration failed'));
            return;
          }
          this.partyId = partyId;
          this.startHeartbeat();
          resolve();
        }
      );
    });
  }

  /**
   * Starts (or restarts) the periodic heartbeat. Each tick is skipped while
   * there is no client or no registered party id. A failed heartbeat is logged
   * and re-emitted as `connectionError`; the timer keeps running.
   */
  private startHeartbeat(): void {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
    }
    this.heartbeatInterval = setInterval(() => {
      const client = this.client;
      const partyId = this.partyId;
      if (!client || !partyId) {
        return;
      }
      client.heartbeat({ party_id: partyId }, (err) => {
        if (err) {
          console.error('Heartbeat failed:', err.message);
          this.emit('connectionError', err);
        }
      });
    }, GrpcClient.HEARTBEAT_INTERVAL_MS);
  }

  /**
   * Joins a session.
   *
   * NOTE(review): the response object is returned as decoded. Because the
   * proto loader runs with keepCase: true, its field names presumably follow
   * the .proto (snake_case) rather than the camelCase names declared on
   * JoinSessionResponse — confirm.
   *
   * @throws Error('Not connected') (as a rejection) when there is no client.
   * @returns The server's join response; rejects with the RPC error.
   */
  async joinSession(sessionId: string, partyId: string, joinToken: string): Promise<JoinSessionResponse> {
    const client = this.requireClient();
    return new Promise((resolve, reject) => {
      client.joinSession(
        {
          session_id: sessionId,
          party_id: partyId,
          join_token: joinToken,
        },
        (err, response) => {
          if (err) {
            reject(err);
          } else {
            resolve(response);
          }
        }
      );
    });
  }

  /**
   * Opens the session-event stream for `partyId` and forwards its traffic as
   * `sessionEvent`, `streamError` and `streamEnd` events.
   *
   * NOTE(review): calling this again replaces the stored stream without
   * cancelling the previous one, which then can no longer be cancelled by
   * `disconnect()` — confirm whether concurrent subscriptions are intended.
   *
   * @throws Error('Not connected') synchronously when there is no client.
   */
  subscribeSessionEvents(partyId: string): void {
    const stream = this.requireClient().subscribeSessionEvents({ party_id: partyId });
    this.eventStream = stream;

    stream.on('data', (event: SessionEvent) => {
      this.emit('sessionEvent', event);
    });
    stream.on('error', (err: Error) => {
      console.error('Session event stream error:', err.message);
      this.emit('streamError', err);
    });
    stream.on('end', () => {
      console.log('Session event stream ended');
      this.emit('streamEnd');
    });
  }

  /**
   * Opens the MPC message stream for a session and forwards its traffic as
   * `mpcMessage`, `messageStreamError` and `messageStreamEnd` events.
   *
   * NOTE(review): calling this again replaces the stored stream without
   * cancelling the previous one, which then can no longer be cancelled by
   * `disconnect()` — confirm whether concurrent subscriptions are intended.
   *
   * @throws Error('Not connected') synchronously when there is no client.
   */
  subscribeMessages(sessionId: string, partyId: string): void {
    const stream = this.requireClient().subscribeMessages({
      session_id: sessionId,
      party_id: partyId,
    });
    this.messageStream = stream;

    stream.on('data', (message: MPCMessage) => {
      this.emit('mpcMessage', message);
    });
    stream.on('error', (err: Error) => {
      console.error('Message stream error:', err.message);
      this.emit('messageStreamError', err);
    });
    stream.on('end', () => {
      console.log('Message stream ended');
      this.emit('messageStreamEnd');
    });
  }

  /**
   * Sends an MPC message through the router.
   *
   * @param toParties Recipient party ids, sent as `to_parties`.
   * @throws Error('Not connected') (as a rejection) when there is no client.
   * @returns The `message_id` from the server response; rejects with the RPC error.
   */
  async routeMessage(
    sessionId: string,
    fromParty: string,
    toParties: string[],
    roundNumber: number,
    payload: Buffer
  ): Promise<string> {
    const client = this.requireClient();
    return new Promise((resolve, reject) => {
      client.routeMessage(
        {
          session_id: sessionId,
          from_party: fromParty,
          to_parties: toParties,
          round_number: roundNumber,
          payload: payload,
        },
        (err, response) => {
          if (err) {
            reject(err);
          } else {
            resolve(response.message_id);
          }
        }
      );
    });
  }

  /**
   * Reports that this party has completed the session.
   *
   * @param publicKey Sent as `public_key`.
   * @throws Error('Not connected') (as a rejection) when there is no client.
   * @returns The `all_completed` flag from the server response; rejects with
   *   the RPC error.
   */
  async reportCompletion(sessionId: string, partyId: string, publicKey: Buffer): Promise<boolean> {
    const client = this.requireClient();
    return new Promise((resolve, reject) => {
      client.reportCompletion(
        {
          session_id: sessionId,
          party_id: partyId,
          public_key: publicKey,
        },
        (err, response) => {
          if (err) {
            reject(err);
          } else {
            resolve(response.all_completed);
          }
        }
      );
    });
  }
}