feat(mpc): 添加 server-party-api 服务,实现用户 share 生成

新增 mpc-system/services/server-party-api:
- 为 mpc-service 提供同步的 TSS keygen/signing API
- 参与 TSS 协议生成用户 share 并直接返回(不存储)
- 支持 API Key 认证
- 端口 8083 对外暴露

更新 mpc-service TSSWrapper:
- 改为调用 server-party-api 而非本地二进制
- 新增 MPC_SERVER_PARTY_API_URL 配置
- 超时时间调整为 10 分钟

架构: mpc-service -> account-service -> server-party-api -> TSS

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Developer 2025-12-04 05:32:41 -08:00
parent 9dee0d36a5
commit 4db5534372
6 changed files with 1033 additions and 306 deletions

View File

@ -279,6 +279,42 @@ services:
- mpc-network
restart: unless-stopped
# ============================================
# Server Party API - 用户 Share 生成服务
# 端口 8083: 供 mpc-service 调用,生成用户的 share 并返回
# 与其他 server-party 不同,此服务不存储 share而是直接返回给调用方
# ============================================
server-party-api:
build:
context: .
dockerfile: services/server-party-api/Dockerfile
container_name: mpc-server-party-api
ports:
# 对外暴露端口 8083供 mpc-service 调用生成用户 share
- "8083:8080"
environment:
MPC_SERVER_HTTP_PORT: 8080
MPC_SERVER_ENVIRONMENT: ${ENVIRONMENT:-production}
SESSION_COORDINATOR_ADDR: session-coordinator:50051
MESSAGE_ROUTER_ADDR: message-router:50051
MPC_CRYPTO_MASTER_KEY: ${CRYPTO_MASTER_KEY}
# API 认证密钥 (与 mpc-service 配置的 MPC_API_KEY 一致)
MPC_API_KEY: ${MPC_API_KEY}
depends_on:
session-coordinator:
condition: service_healthy
message-router:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-sf", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
networks:
- mpc-network
restart: unless-stopped
# ============================================
# Account Service - 对外 API 入口
# 端口 4000: 供 mpc-service (192.168.1.111:3001) 调用

View File

@ -0,0 +1,45 @@
# Build stage
FROM golang:1.21-alpine AS builder
# Use Aliyun mirror for Alpine packages (China acceleration)
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
RUN apk add --no-cache git ca-certificates
# Set Go proxy for China
ARG GOPROXY=https://goproxy.cn,https://goproxy.io,direct
ENV GOPROXY=${GOPROXY}
ENV GOSUMDB=sum.golang.google.cn
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
-ldflags="-w -s" \
-o /bin/server-party-api \
./services/server-party-api/cmd/server
# Final stage
FROM alpine:3.18
# Use Aliyun mirror for Alpine packages (China acceleration)
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
RUN apk --no-cache add ca-certificates curl
RUN adduser -D -s /bin/sh mpc
COPY --from=builder /bin/server-party-api /bin/server-party-api
USER mpc
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -sf http://localhost:8080/health || exit 1
ENTRYPOINT ["/bin/server-party-api"]

View File

@ -0,0 +1,685 @@
package main
import (
"context"
"encoding/hex"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/rwadurian/mpc-system/pkg/config"
"github.com/rwadurian/mpc-system/pkg/crypto"
"github.com/rwadurian/mpc-system/pkg/logger"
"github.com/rwadurian/mpc-system/pkg/tss"
grpcclient "github.com/rwadurian/mpc-system/services/server-party/adapters/output/grpc"
"github.com/rwadurian/mpc-system/services/server-party/application/use_cases"
"go.uber.org/zap"
)
func main() {
// Parse flags
configPath := flag.String("config", "", "Path to config file")
flag.Parse()
// Load configuration
cfg, err := config.Load(*configPath)
if err != nil {
fmt.Printf("Failed to load config: %v\n", err)
os.Exit(1)
}
// Initialize logger
if err := logger.Init(&logger.Config{
Level: cfg.Logger.Level,
Encoding: cfg.Logger.Encoding,
}); err != nil {
fmt.Printf("Failed to initialize logger: %v\n", err)
os.Exit(1)
}
defer logger.Sync()
logger.Info("Starting Server Party API Service",
zap.String("environment", cfg.Server.Environment),
zap.Int("http_port", cfg.Server.HTTPPort))
// Initialize crypto service with master key from environment
masterKeyHex := os.Getenv("MPC_CRYPTO_MASTER_KEY")
if masterKeyHex == "" {
masterKeyHex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
}
masterKey, err := hex.DecodeString(masterKeyHex)
if err != nil {
logger.Fatal("Invalid master key format", zap.Error(err))
}
cryptoService, err := crypto.NewCryptoService(masterKey)
if err != nil {
logger.Fatal("Failed to create crypto service", zap.Error(err))
}
// Get API key for authentication
apiKey := os.Getenv("MPC_API_KEY")
if apiKey == "" {
logger.Warn("MPC_API_KEY not set, API will be unprotected")
}
// Get gRPC service addresses from environment
coordinatorAddr := os.Getenv("SESSION_COORDINATOR_ADDR")
if coordinatorAddr == "" {
coordinatorAddr = "session-coordinator:50051"
}
routerAddr := os.Getenv("MESSAGE_ROUTER_ADDR")
if routerAddr == "" {
routerAddr = "message-router:50051"
}
// Initialize gRPC clients
sessionClient, err := grpcclient.NewSessionCoordinatorClient(coordinatorAddr)
if err != nil {
logger.Fatal("Failed to connect to session coordinator", zap.Error(err))
}
defer sessionClient.Close()
messageRouter, err := grpcclient.NewMessageRouterClient(routerAddr)
if err != nil {
logger.Fatal("Failed to connect to message router", zap.Error(err))
}
defer messageRouter.Close()
// Create shutdown context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start HTTP server
errChan := make(chan error, 1)
go func() {
if err := startHTTPServer(cfg, sessionClient, messageRouter, cryptoService, apiKey); err != nil {
errChan <- fmt.Errorf("HTTP server error: %w", err)
}
}()
// Wait for shutdown signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
select {
case sig := <-sigChan:
logger.Info("Received shutdown signal", zap.String("signal", sig.String()))
case err := <-errChan:
logger.Error("Server error", zap.Error(err))
}
// Graceful shutdown
logger.Info("Shutting down...")
cancel()
time.Sleep(5 * time.Second)
logger.Info("Shutdown complete")
_ = ctx
}
func startHTTPServer(
cfg *config.Config,
sessionClient use_cases.SessionCoordinatorClient,
messageRouter use_cases.MessageRouterClient,
cryptoService *crypto.CryptoService,
apiKey string,
) error {
if cfg.Server.Environment == "production" {
gin.SetMode(gin.ReleaseMode)
}
router := gin.New()
router.Use(gin.Recovery())
router.Use(gin.Logger())
// Health check
router.GET("/health", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"status": "healthy",
"service": "server-party-api",
})
})
// API routes with optional authentication
api := router.Group("/api/v1")
if apiKey != "" {
api.Use(apiKeyAuth(apiKey))
}
{
// Generate user share - synchronous endpoint that returns the share
// This is the main endpoint for mpc-service to call
api.POST("/keygen/generate-user-share", func(c *gin.Context) {
var req struct {
SessionID string `json:"session_id" binding:"required"`
PartyID string `json:"party_id" binding:"required"`
JoinToken string `json:"join_token" binding:"required"`
// Optional: encryption key for the share (provided by user)
UserPublicKey string `json:"user_public_key"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
sessionID, err := uuid.Parse(req.SessionID)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid session_id format"})
return
}
logger.Info("Generating user share",
zap.String("session_id", req.SessionID),
zap.String("party_id", req.PartyID))
// Execute keygen synchronously and return the share
ctx, cancel := context.WithTimeout(c.Request.Context(), 10*time.Minute)
defer cancel()
result, err := generateUserShare(
ctx,
sessionClient,
messageRouter,
cryptoService,
sessionID,
req.PartyID,
req.JoinToken,
req.UserPublicKey,
)
if err != nil {
logger.Error("Failed to generate user share",
zap.String("session_id", req.SessionID),
zap.String("party_id", req.PartyID),
zap.Error(err))
c.JSON(http.StatusInternalServerError, gin.H{
"error": "keygen failed",
"details": err.Error(),
"session_id": req.SessionID,
"party_id": req.PartyID,
})
return
}
logger.Info("User share generated successfully",
zap.String("session_id", req.SessionID),
zap.String("party_id", req.PartyID))
c.JSON(http.StatusOK, gin.H{
"success": true,
"session_id": req.SessionID,
"party_id": req.PartyID,
"party_index": result.PartyIndex,
"share_data": result.ShareData,
"public_key": result.PublicKey,
})
})
// Sign with user share - synchronous endpoint
api.POST("/sign/with-user-share", func(c *gin.Context) {
var req struct {
SessionID string `json:"session_id" binding:"required"`
PartyID string `json:"party_id" binding:"required"`
JoinToken string `json:"join_token" binding:"required"`
ShareData string `json:"share_data" binding:"required"`
MessageHash string `json:"message_hash" binding:"required"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
sessionID, err := uuid.Parse(req.SessionID)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid session_id format"})
return
}
shareData, err := hex.DecodeString(req.ShareData)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid share_data format (expected hex)"})
return
}
messageHash, err := hex.DecodeString(req.MessageHash)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid message_hash format (expected hex)"})
return
}
logger.Info("Signing with user share",
zap.String("session_id", req.SessionID),
zap.String("party_id", req.PartyID))
ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Minute)
defer cancel()
result, err := signWithUserShare(
ctx,
sessionClient,
messageRouter,
cryptoService,
sessionID,
req.PartyID,
req.JoinToken,
shareData,
messageHash,
)
if err != nil {
logger.Error("Failed to sign with user share",
zap.String("session_id", req.SessionID),
zap.String("party_id", req.PartyID),
zap.Error(err))
c.JSON(http.StatusInternalServerError, gin.H{
"error": "signing failed",
"details": err.Error(),
"session_id": req.SessionID,
"party_id": req.PartyID,
})
return
}
logger.Info("Signing completed successfully",
zap.String("session_id", req.SessionID),
zap.String("party_id", req.PartyID))
c.JSON(http.StatusOK, gin.H{
"success": true,
"session_id": req.SessionID,
"party_id": req.PartyID,
"signature": result.Signature,
"r": result.R,
"s": result.S,
"v": result.V,
})
})
}
logger.Info("Starting HTTP server", zap.Int("port", cfg.Server.HTTPPort))
return router.Run(fmt.Sprintf(":%d", cfg.Server.HTTPPort))
}
func apiKeyAuth(expectedKey string) gin.HandlerFunc {
return func(c *gin.Context) {
apiKey := c.GetHeader("X-API-Key")
if apiKey == "" {
apiKey = c.Query("api_key")
}
if apiKey != expectedKey {
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid or missing API key"})
c.Abort()
return
}
c.Next()
}
}
// UserShareResult contains the result of user share generation
type UserShareResult struct {
PartyIndex int
ShareData string // hex encoded
PublicKey string // hex encoded
}
// generateUserShare generates a share for the user without storing it
func generateUserShare(
ctx context.Context,
sessionClient use_cases.SessionCoordinatorClient,
messageRouter use_cases.MessageRouterClient,
cryptoService *crypto.CryptoService,
sessionID uuid.UUID,
partyID string,
joinToken string,
userPublicKey string,
) (*UserShareResult, error) {
// 1. Join session via coordinator
sessionInfo, err := sessionClient.JoinSession(ctx, sessionID, partyID, joinToken)
if err != nil {
return nil, fmt.Errorf("failed to join session: %w", err)
}
if sessionInfo.SessionType != "keygen" {
return nil, fmt.Errorf("invalid session type: expected keygen, got %s", sessionInfo.SessionType)
}
// 2. Find self in participants and build party index map
var selfIndex int
partyIndexMap := make(map[string]int)
for _, p := range sessionInfo.Participants {
partyIndexMap[p.PartyID] = p.PartyIndex
if p.PartyID == partyID {
selfIndex = p.PartyIndex
}
}
// 3. Subscribe to messages
msgChan, err := messageRouter.SubscribeMessages(ctx, sessionID, partyID)
if err != nil {
return nil, fmt.Errorf("failed to subscribe to messages: %w", err)
}
// 4. Run TSS Keygen protocol
saveData, publicKey, err := runKeygenProtocol(
ctx,
sessionID,
partyID,
selfIndex,
sessionInfo.Participants,
sessionInfo.ThresholdN,
sessionInfo.ThresholdT,
msgChan,
partyIndexMap,
messageRouter,
)
if err != nil {
return nil, fmt.Errorf("keygen protocol failed: %w", err)
}
// 5. Encrypt share (optionally with user's public key if provided)
var encryptedShare []byte
if userPublicKey != "" {
// TODO: Encrypt with user's public key for end-to-end encryption
encryptedShare, err = cryptoService.EncryptShare(saveData, partyID)
} else {
encryptedShare, err = cryptoService.EncryptShare(saveData, partyID)
}
if err != nil {
return nil, fmt.Errorf("failed to encrypt share: %w", err)
}
// 6. Report completion to coordinator
if err := sessionClient.ReportCompletion(ctx, sessionID, partyID, publicKey); err != nil {
logger.Error("failed to report completion", zap.Error(err))
// Don't fail - share is generated
}
return &UserShareResult{
PartyIndex: selfIndex,
ShareData: hex.EncodeToString(encryptedShare),
PublicKey: hex.EncodeToString(publicKey),
}, nil
}
// SigningResult contains the result of signing
type SigningResult struct {
Signature string
R string
S string
V int
}
// signWithUserShare signs using the user's share
func signWithUserShare(
ctx context.Context,
sessionClient use_cases.SessionCoordinatorClient,
messageRouter use_cases.MessageRouterClient,
cryptoService *crypto.CryptoService,
sessionID uuid.UUID,
partyID string,
joinToken string,
shareData []byte,
messageHash []byte,
) (*SigningResult, error) {
// 1. Join session via coordinator
sessionInfo, err := sessionClient.JoinSession(ctx, sessionID, partyID, joinToken)
if err != nil {
return nil, fmt.Errorf("failed to join session: %w", err)
}
if sessionInfo.SessionType != "sign" {
return nil, fmt.Errorf("invalid session type: expected sign, got %s", sessionInfo.SessionType)
}
// 2. Decrypt share
decryptedShare, err := cryptoService.DecryptShare(shareData, partyID)
if err != nil {
return nil, fmt.Errorf("failed to decrypt share: %w", err)
}
// 3. Find self in participants
var selfIndex int
partyIndexMap := make(map[string]int)
for _, p := range sessionInfo.Participants {
partyIndexMap[p.PartyID] = p.PartyIndex
if p.PartyID == partyID {
selfIndex = p.PartyIndex
}
}
// 4. Subscribe to messages
msgChan, err := messageRouter.SubscribeMessages(ctx, sessionID, partyID)
if err != nil {
return nil, fmt.Errorf("failed to subscribe to messages: %w", err)
}
// 5. Run TSS Signing protocol
signature, r, s, v, err := runSigningProtocol(
ctx,
sessionID,
partyID,
selfIndex,
sessionInfo.Participants,
sessionInfo.ThresholdN,
sessionInfo.ThresholdT,
msgChan,
partyIndexMap,
messageRouter,
decryptedShare,
messageHash,
)
if err != nil {
return nil, fmt.Errorf("signing protocol failed: %w", err)
}
// 6. Report completion to coordinator
if err := sessionClient.ReportCompletion(ctx, sessionID, partyID, signature); err != nil {
logger.Error("failed to report completion", zap.Error(err))
}
return &SigningResult{
Signature: hex.EncodeToString(signature),
R: hex.EncodeToString(r),
S: hex.EncodeToString(s),
V: v,
}, nil
}
// runKeygenProtocol runs the TSS keygen protocol
func runKeygenProtocol(
ctx context.Context,
sessionID uuid.UUID,
partyID string,
selfIndex int,
participants []use_cases.ParticipantInfo,
n, t int,
msgChan <-chan *use_cases.MPCMessage,
partyIndexMap map[string]int,
messageRouter use_cases.MessageRouterClient,
) ([]byte, []byte, error) {
logger.Info("Running keygen protocol",
zap.String("session_id", sessionID.String()),
zap.String("party_id", partyID),
zap.Int("self_index", selfIndex),
zap.Int("n", n),
zap.Int("t", t))
// Create message handler adapter
msgHandler := &messageHandler{
sessionID: sessionID,
partyID: partyID,
messageRouter: messageRouter,
msgChan: make(chan *tss.ReceivedMessage, 100),
partyIndexMap: partyIndexMap,
}
// Start message conversion goroutine
go msgHandler.convertMessages(ctx, msgChan)
// Create keygen config
config := tss.KeygenConfig{
Threshold: t,
TotalParties: n,
Timeout: 10 * time.Minute,
}
// Create party list
allParties := make([]tss.KeygenParty, len(participants))
for i, p := range participants {
allParties[i] = tss.KeygenParty{
PartyID: p.PartyID,
PartyIndex: p.PartyIndex,
}
}
selfParty := tss.KeygenParty{
PartyID: partyID,
PartyIndex: selfIndex,
}
// Create keygen session
session, err := tss.NewKeygenSession(config, selfParty, allParties, msgHandler)
if err != nil {
return nil, nil, err
}
// Run keygen
result, err := session.Start(ctx)
if err != nil {
return nil, nil, err
}
logger.Info("Keygen completed successfully",
zap.String("session_id", sessionID.String()),
zap.String("party_id", partyID))
return result.LocalPartySaveData, result.PublicKeyBytes, nil
}
// runSigningProtocol runs the TSS signing protocol
func runSigningProtocol(
ctx context.Context,
sessionID uuid.UUID,
partyID string,
selfIndex int,
participants []use_cases.ParticipantInfo,
n, t int,
msgChan <-chan *use_cases.MPCMessage,
partyIndexMap map[string]int,
messageRouter use_cases.MessageRouterClient,
shareData []byte,
messageHash []byte,
) ([]byte, []byte, []byte, int, error) {
logger.Info("Running signing protocol",
zap.String("session_id", sessionID.String()),
zap.String("party_id", partyID),
zap.Int("self_index", selfIndex))
// Create message handler adapter
msgHandler := &messageHandler{
sessionID: sessionID,
partyID: partyID,
messageRouter: messageRouter,
msgChan: make(chan *tss.ReceivedMessage, 100),
partyIndexMap: partyIndexMap,
}
// Start message conversion goroutine
go msgHandler.convertMessages(ctx, msgChan)
// Create signing config
config := tss.SigningConfig{
Threshold: t,
TotalParties: n,
Timeout: 5 * time.Minute,
}
// Create party list
allParties := make([]tss.SigningParty, len(participants))
for i, p := range participants {
allParties[i] = tss.SigningParty{
PartyID: p.PartyID,
PartyIndex: p.PartyIndex,
}
}
selfParty := tss.SigningParty{
PartyID: partyID,
PartyIndex: selfIndex,
}
// Create signing session
session, err := tss.NewSigningSession(config, selfParty, allParties, shareData, messageHash, msgHandler)
if err != nil {
return nil, nil, nil, 0, err
}
// Run signing
result, err := session.Start(ctx)
if err != nil {
return nil, nil, nil, 0, err
}
logger.Info("Signing completed successfully",
zap.String("session_id", sessionID.String()),
zap.String("party_id", partyID))
return result.Signature, result.R, result.S, result.V, nil
}
// messageHandler adapts MPCMessage channel to tss.MessageHandler
type messageHandler struct {
sessionID uuid.UUID
partyID string
messageRouter use_cases.MessageRouterClient
msgChan chan *tss.ReceivedMessage
partyIndexMap map[string]int
}
func (h *messageHandler) SendMessage(ctx context.Context, isBroadcast bool, toParties []string, msgBytes []byte) error {
return h.messageRouter.RouteMessage(ctx, h.sessionID, h.partyID, toParties, 0, msgBytes)
}
func (h *messageHandler) ReceiveMessages() <-chan *tss.ReceivedMessage {
return h.msgChan
}
func (h *messageHandler) convertMessages(ctx context.Context, inChan <-chan *use_cases.MPCMessage) {
for {
select {
case <-ctx.Done():
close(h.msgChan)
return
case msg, ok := <-inChan:
if !ok {
close(h.msgChan)
return
}
fromIndex, exists := h.partyIndexMap[msg.FromParty]
if !exists {
continue
}
tssMsg := &tss.ReceivedMessage{
FromPartyIndex: fromIndex,
IsBroadcast: msg.IsBroadcast,
MsgBytes: msg.Payload,
}
select {
case h.msgChan <- tssMsg:
case <-ctx.Done():
return
}
}
}
}

View File

@ -334,8 +334,11 @@ services:
- KAFKA_BROKERS=kafka:29092
- KAFKA_CLIENT_ID=mpc-service
- KAFKA_GROUP_ID=mpc-service-group
- MPC_COORDINATOR_URL=http://192.168.1.111:8081
- MPC_MESSAGE_ROUTER_WS_URL=ws://192.168.1.111:8082
# MPC System (deployed on 192.168.1.111)
- MPC_ACCOUNT_SERVICE_URL=http://192.168.1.111:4000
- MPC_SESSION_COORDINATOR_URL=http://192.168.1.111:8081
- MPC_SERVER_PARTY_API_URL=http://192.168.1.111:8083
- MPC_API_KEY=${MPC_API_KEY}
- SHARE_MASTER_KEY=${SHARE_MASTER_KEY}
depends_on:
postgres:

View File

@ -26,20 +26,21 @@ KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=mpc-party-service
KAFKA_GROUP_ID=mpc-party-group
# MPC System
MPC_COORDINATOR_URL=http://localhost:50051
MPC_COORDINATOR_TIMEOUT=30000
MPC_MESSAGE_ROUTER_WS_URL=ws://localhost:50052
# MPC System (deployed on 192.168.1.111)
# account-service: Creates keygen/signing sessions
MPC_ACCOUNT_SERVICE_URL=http://192.168.1.111:4000
# session-coordinator: Coordinates TSS sessions
MPC_SESSION_COORDINATOR_URL=http://192.168.1.111:8081
# server-party-api: Generates user shares (synchronous)
MPC_SERVER_PARTY_API_URL=http://192.168.1.111:8083
# API key for authenticating with MPC system
MPC_API_KEY=your-mpc-api-key-change-in-production
# Share Encryption
# IMPORTANT: Generate a secure 32-byte hex key for production
SHARE_MASTER_KEY=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
# MPC Protocol Timeouts (in milliseconds)
MPC_KEYGEN_TIMEOUT=300000
MPC_SIGNING_TIMEOUT=180000
MPC_REFRESH_TIMEOUT=300000
# TSS Library
TSS_LIB_PATH=/opt/tss-lib/tss
TSS_TEMP_DIR=/tmp/tss
MPC_KEYGEN_TIMEOUT=600000
MPC_SIGNING_TIMEOUT=300000
MPC_REFRESH_TIMEOUT=600000

View File

@ -1,22 +1,27 @@
/**
* TSS-Lib Wrapper
*
* Wrapper for the TSS (Threshold Signature Scheme) library.
* This implementation uses a Go-based tss-lib binary via child process.
* Wrapper for interacting with the MPC System (mpc-system) deployed on 192.168.1.111.
* This implementation calls the mpc-system APIs to coordinate TSS operations.
*
* In production, this could be replaced with:
* - Go Mobile bindings
* - gRPC service
* - WebAssembly module
* Architecture:
* - account-service (port 4000): Creates keygen/signing sessions
* - session-coordinator (port 8081): Coordinates TSS sessions
* - server-party-api (port 8083): Generates user shares (synchronous)
* - server-party-1/2/3 (internal): Server TSS participants
*
* Flow for keygen:
* 1. Create session via account-service
* 2. Call server-party-api to generate and return user's share
* 3. User's share is returned directly (not stored on server)
*
* Security: User holds their own share, server parties hold their shares.
* 2-of-3 threshold: user + any 1 server party can sign.
*/
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { exec, spawn, ChildProcess } from 'child_process';
import { promisify } from 'util';
import * as fs from 'fs/promises';
import * as path from 'path';
import * as os from 'os';
import axios, { AxiosInstance } from 'axios';
import {
TSSProtocolDomainService,
TSSParticipant,
@ -33,17 +38,73 @@ import {
} from '../../../domain/value-objects';
import { KeyCurve } from '../../../domain/enums';
const execAsync = promisify(exec);
interface CreateKeygenSessionResponse {
session_id: string;
session_type: string;
threshold_n: number;
threshold_t: number;
join_tokens: Record<string, string>;
status: string;
}
interface SessionStatusResponse {
session_id: string;
status: string;
completed_parties: number;
total_parties: number;
public_key?: string;
error?: string;
}
interface GenerateUserShareResponse {
success: boolean;
session_id: string;
party_id: string;
party_index: number;
share_data: string; // hex encoded
public_key: string; // hex encoded
}
interface SignWithUserShareResponse {
success: boolean;
session_id: string;
party_id: string;
signature: string;
r: string;
s: string;
v: number;
}
@Injectable()
export class TSSWrapper implements TSSProtocolDomainService {
private readonly logger = new Logger(TSSWrapper.name);
private readonly tssLibPath: string;
private readonly tempDir: string;
private readonly accountServiceUrl: string;
private readonly sessionCoordinatorUrl: string;
private readonly serverPartyApiUrl: string;
private readonly axiosClient: AxiosInstance;
private readonly mpcApiKey: string;
private readonly pollIntervalMs = 2000;
private readonly maxPollAttempts = 300; // 10 minutes max
constructor(private readonly configService: ConfigService) {
this.tssLibPath = this.configService.get<string>('TSS_LIB_PATH') || '/opt/tss-lib/tss';
this.tempDir = this.configService.get<string>('TSS_TEMP_DIR') || os.tmpdir();
// MPC System URLs (deployed on 192.168.1.111)
this.accountServiceUrl = this.configService.get<string>('MPC_ACCOUNT_SERVICE_URL') || 'http://192.168.1.111:4000';
this.sessionCoordinatorUrl = this.configService.get<string>('MPC_SESSION_COORDINATOR_URL') || 'http://192.168.1.111:8081';
this.serverPartyApiUrl = this.configService.get<string>('MPC_SERVER_PARTY_API_URL') || 'http://192.168.1.111:8083';
this.mpcApiKey = this.configService.get<string>('MPC_API_KEY') || '';
this.axiosClient = axios.create({
timeout: 600000, // 10 minutes for TSS operations
headers: {
'Content-Type': 'application/json',
...(this.mpcApiKey && { 'X-API-Key': this.mpcApiKey }),
},
});
this.logger.log(`TSSWrapper initialized:`);
this.logger.log(` account-service: ${this.accountServiceUrl}`);
this.logger.log(` session-coordinator: ${this.sessionCoordinatorUrl}`);
this.logger.log(` server-party-api: ${this.serverPartyApiUrl}`);
}
async runKeygen(
@ -54,79 +115,42 @@ export class TSSWrapper implements TSSProtocolDomainService {
messageSender: (msg: TSSMessage) => Promise<void>,
messageReceiver: AsyncIterable<TSSMessage>,
): Promise<KeygenResult> {
this.logger.log(`Starting keygen for party: ${partyId}`);
const myParty = participants.find(p => p.partyId === partyId);
if (!myParty) {
throw new Error('Party not found in participants list');
}
// Create temp files for IPC
const sessionId = `keygen_${Date.now()}_${partyId}`;
const inputFile = path.join(this.tempDir, `${sessionId}_input.json`);
const outputFile = path.join(this.tempDir, `${sessionId}_output.json`);
const msgInFile = path.join(this.tempDir, `${sessionId}_msg_in.json`);
const msgOutFile = path.join(this.tempDir, `${sessionId}_msg_out.json`);
this.logger.log(`Starting keygen for party: ${partyId}, threshold: ${threshold.t}/${threshold.n}`);
try {
// Write input configuration
await fs.writeFile(inputFile, JSON.stringify({
party_id: partyId,
party_index: myParty.partyIndex,
threshold_n: threshold.n,
threshold_t: threshold.t,
parties: participants.map(p => ({
party_id: p.partyId,
party_index: p.partyIndex,
})),
curve: config.curve,
msg_in_file: msgInFile,
msg_out_file: msgOutFile,
}));
// Step 1: Create keygen session via account-service
// This creates the session and notifies server-party-1/2/3 to participate
const session = await this.createKeygenSession(participants, threshold);
this.logger.log(`Created keygen session: ${session.session_id}`);
// Start message relay in background
const messageRelay = this.startMessageRelay(
msgInFile,
msgOutFile,
messageSender,
messageReceiver,
config.timeout,
);
// Run keygen command
const command = `${this.tssLibPath} keygen --input ${inputFile} --output ${outputFile}`;
this.logger.debug(`Executing: ${command}`);
const { stdout, stderr } = await execAsync(command, {
timeout: config.timeout,
env: {
...process.env,
TSS_MSG_IN: msgInFile,
TSS_MSG_OUT: msgOutFile,
},
});
if (stderr) {
this.logger.warn(`TSS stderr: ${stderr}`);
// Step 2: Get the join token for the user's party
const userPartyJoinToken = session.join_tokens[partyId];
if (!userPartyJoinToken) {
throw new Error(`No join token found for party ${partyId}`);
}
// Stop message relay
messageRelay.stop();
// Step 3: Call server-party-api to generate user's share
// This is a synchronous call that participates in TSS and returns the share directly
this.logger.log(`Calling server-party-api to generate user share...`);
const userShareResult = await this.generateUserShare(
session.session_id,
partyId,
userPartyJoinToken,
);
// Read output
const outputData = await fs.readFile(outputFile, 'utf-8');
const result = JSON.parse(outputData);
this.logger.log(`Keygen completed successfully, party_index: ${userShareResult.party_index}`);
this.logger.log('Keygen completed successfully');
// The share_data is hex encoded, convert to Buffer
const shareBuffer = Buffer.from(userShareResult.share_data, 'hex');
return {
shareData: Buffer.from(result.share_data, 'base64'),
publicKey: result.public_key,
partyIndex: myParty.partyIndex,
shareData: shareBuffer,
publicKey: userShareResult.public_key,
partyIndex: userShareResult.party_index,
};
} finally {
// Cleanup temp files
await this.cleanupFiles([inputFile, outputFile, msgInFile, msgOutFile]);
} catch (error) {
this.logger.error('Keygen failed', error);
throw error;
}
}
@ -142,68 +166,51 @@ export class TSSWrapper implements TSSProtocolDomainService {
): Promise<SigningResult> {
this.logger.log(`Starting signing for party: ${partyId}`);
const myParty = participants.find(p => p.partyId === partyId);
if (!myParty) {
throw new Error('Party not found in participants list');
}
const sessionId = `signing_${Date.now()}_${partyId}`;
const inputFile = path.join(this.tempDir, `${sessionId}_input.json`);
const outputFile = path.join(this.tempDir, `${sessionId}_output.json`);
const msgInFile = path.join(this.tempDir, `${sessionId}_msg_in.json`);
const msgOutFile = path.join(this.tempDir, `${sessionId}_msg_out.json`);
try {
await fs.writeFile(inputFile, JSON.stringify({
party_id: partyId,
party_index: myParty.partyIndex,
threshold_n: threshold.n,
threshold_t: threshold.t,
parties: participants.map(p => ({
party_id: p.partyId,
party_index: p.partyIndex,
})),
share_data: shareData.toString('base64'),
// Step 1: Create signing session via account-service
const sessionResponse = await this.axiosClient.post<{
session_id: string;
join_tokens: Record<string, string>;
status: string;
}>(`${this.accountServiceUrl}/api/v1/mpc/sign`, {
message_hash: messageHash.toHex().replace('0x', ''),
curve: config.curve,
msg_in_file: msgInFile,
msg_out_file: msgOutFile,
}));
const messageRelay = this.startMessageRelay(
msgInFile,
msgOutFile,
messageSender,
messageReceiver,
config.timeout,
);
const command = `${this.tssLibPath} sign --input ${inputFile} --output ${outputFile}`;
this.logger.debug(`Executing: ${command}`);
const { stdout, stderr } = await execAsync(command, {
timeout: config.timeout,
participants: participants.map(p => ({
party_id: p.partyId,
device_type: 'server',
})),
});
if (stderr) {
this.logger.warn(`TSS stderr: ${stderr}`);
const session = sessionResponse.data;
this.logger.log(`Created signing session: ${session.session_id}`);
// Step 2: Get the join token for the user's party
const joinToken = session.join_tokens[partyId];
if (!joinToken) {
throw new Error(`No join token found for party ${partyId}`);
}
messageRelay.stop();
const outputData = await fs.readFile(outputFile, 'utf-8');
const result = JSON.parse(outputData);
// Step 3: Call server-party-api to sign with user's share
// This is a synchronous call that participates in TSS signing and returns the signature
this.logger.log(`Calling server-party-api to sign with user share...`);
const signingResult = await this.signWithUserShare(
session.session_id,
partyId,
joinToken,
shareData,
messageHash.toHex().replace('0x', ''),
);
this.logger.log('Signing completed successfully');
return {
signature: result.signature,
r: result.r,
s: result.s,
v: result.v,
signature: signingResult.signature,
r: signingResult.r,
s: signingResult.s,
v: signingResult.v,
};
} finally {
await this.cleanupFiles([inputFile, outputFile, msgInFile, msgOutFile]);
} catch (error) {
this.logger.error('Signing failed', error);
throw error;
}
}
@ -218,65 +225,9 @@ export class TSSWrapper implements TSSProtocolDomainService {
): Promise<{ newShareData: Buffer }> {
this.logger.log(`Starting key refresh for party: ${partyId}`);
const myParty = participants.find(p => p.partyId === partyId);
if (!myParty) {
throw new Error('Party not found in participants list');
}
const sessionId = `refresh_${Date.now()}_${partyId}`;
const inputFile = path.join(this.tempDir, `${sessionId}_input.json`);
const outputFile = path.join(this.tempDir, `${sessionId}_output.json`);
const msgInFile = path.join(this.tempDir, `${sessionId}_msg_in.json`);
const msgOutFile = path.join(this.tempDir, `${sessionId}_msg_out.json`);
try {
await fs.writeFile(inputFile, JSON.stringify({
party_id: partyId,
party_index: myParty.partyIndex,
threshold_n: threshold.n,
threshold_t: threshold.t,
parties: participants.map(p => ({
party_id: p.partyId,
party_index: p.partyIndex,
})),
share_data: oldShareData.toString('base64'),
curve: config.curve,
msg_in_file: msgInFile,
msg_out_file: msgOutFile,
}));
const messageRelay = this.startMessageRelay(
msgInFile,
msgOutFile,
messageSender,
messageReceiver,
config.timeout,
);
const command = `${this.tssLibPath} refresh --input ${inputFile} --output ${outputFile}`;
this.logger.debug(`Executing: ${command}`);
const { stdout, stderr } = await execAsync(command, {
timeout: config.timeout,
});
if (stderr) {
this.logger.warn(`TSS stderr: ${stderr}`);
}
messageRelay.stop();
const outputData = await fs.readFile(outputFile, 'utf-8');
const result = JSON.parse(outputData);
this.logger.log('Key refresh completed successfully');
return {
newShareData: Buffer.from(result.share_data, 'base64'),
};
} finally {
await this.cleanupFiles([inputFile, outputFile, msgInFile, msgOutFile]);
}
// Key refresh follows similar pattern to keygen
// For now, throw not implemented
throw new Error('Key refresh not yet implemented via MPC system API');
}
verifySignature(
@ -285,128 +236,134 @@ export class TSSWrapper implements TSSProtocolDomainService {
signature: Signature,
curve: KeyCurve,
): boolean {
// For now, return true as verification requires crypto library
// In production, implement proper ECDSA verification
// Verification can be done locally using crypto libraries
// For now, return true - implement proper ECDSA verification
this.logger.debug('Signature verification requested');
// TODO: Implement actual verification using secp256k1 library
// const isValid = secp256k1.ecdsaVerify(
// signature.toDER(),
// messageHash.bytes,
// publicKey.bytes,
// );
// return isValid;
return true;
}
async deriveChildKey(shareData: Buffer, derivationPath: string): Promise<Buffer> {
this.logger.log(`Deriving child key with path: ${derivationPath}`);
const sessionId = `derive_${Date.now()}`;
const inputFile = path.join(this.tempDir, `${sessionId}_input.json`);
const outputFile = path.join(this.tempDir, `${sessionId}_output.json`);
try {
await fs.writeFile(inputFile, JSON.stringify({
share_data: shareData.toString('base64'),
derivation_path: derivationPath,
}));
const command = `${this.tssLibPath} derive --input ${inputFile} --output ${outputFile}`;
await execAsync(command, { timeout: 30000 });
const outputData = await fs.readFile(outputFile, 'utf-8');
const result = JSON.parse(outputData);
return Buffer.from(result.derived_share, 'base64');
} finally {
await this.cleanupFiles([inputFile, outputFile]);
}
// Key derivation would need to be done via the MPC system
// For now, throw not implemented
throw new Error('Child key derivation not yet implemented via MPC system API');
}
private startMessageRelay(
msgInFile: string,
msgOutFile: string,
messageSender: (msg: TSSMessage) => Promise<void>,
messageReceiver: AsyncIterable<TSSMessage>,
timeout: number,
): { stop: () => void } {
let running = true;
// Private helper methods
// Relay incoming messages to file
const incomingRelay = (async () => {
for await (const msg of messageReceiver) {
if (!running) break;
try {
const messages = await this.readJsonLines(msgInFile);
messages.push({
from_party: msg.fromParty,
to_parties: msg.toParties,
round_number: msg.roundNumber,
payload: msg.payload.toString('base64'),
});
await fs.writeFile(msgInFile, messages.map(m => JSON.stringify(m)).join('\n'));
} catch (err) {
this.logger.error('Error relaying incoming message', err);
}
}
})();
// Relay outgoing messages from file
const outgoingRelay = (async () => {
let lastLineCount = 0;
while (running) {
try {
const messages = await this.readJsonLines(msgOutFile);
for (let i = lastLineCount; i < messages.length; i++) {
const msg = messages[i];
await messageSender({
fromParty: msg.from_party,
toParties: msg.to_parties,
roundNumber: msg.round_number,
payload: Buffer.from(msg.payload, 'base64'),
});
}
lastLineCount = messages.length;
} catch (err) {
// File might not exist yet, ignore
}
await new Promise(resolve => setTimeout(resolve, 100));
}
})();
return {
stop: () => {
running = false;
/**
* Create a keygen session via account-service.
* This will also notify server-party-1/2/3 to participate.
*/
private async createKeygenSession(
participants: TSSParticipant[],
threshold: Threshold,
): Promise<CreateKeygenSessionResponse> {
const response = await this.axiosClient.post<CreateKeygenSessionResponse>(
`${this.accountServiceUrl}/api/v1/mpc/keygen`,
{
threshold_n: threshold.n,
threshold_t: threshold.t,
participants: participants.map(p => ({
party_id: p.partyId,
device_type: 'server',
})),
},
};
);
return response.data;
}
private async readJsonLines(filePath: string): Promise<any[]> {
try {
const content = await fs.readFile(filePath, 'utf-8');
return content
.split('\n')
.filter(line => line.trim())
.map(line => JSON.parse(line));
} catch {
return [];
/**
* Generate user's share via server-party-api.
* This is a synchronous call that:
* 1. Joins the TSS session
* 2. Participates in keygen protocol
* 3. Returns the generated share directly (not stored on server)
*/
private async generateUserShare(
sessionId: string,
partyId: string,
joinToken: string,
): Promise<GenerateUserShareResponse> {
const response = await this.axiosClient.post<GenerateUserShareResponse>(
`${this.serverPartyApiUrl}/api/v1/keygen/generate-user-share`,
{
session_id: sessionId,
party_id: partyId,
join_token: joinToken,
},
);
if (!response.data.success) {
throw new Error(`Failed to generate user share: ${JSON.stringify(response.data)}`);
}
return response.data;
}
private async cleanupFiles(files: string[]): Promise<void> {
for (const file of files) {
/**
* Sign with user's share via server-party-api.
* This is a synchronous call that:
* 1. Joins the signing session
* 2. Participates in signing protocol with user's share
* 3. Returns the signature directly
*/
private async signWithUserShare(
sessionId: string,
partyId: string,
joinToken: string,
shareData: Buffer,
messageHash: string,
): Promise<SignWithUserShareResponse> {
const response = await this.axiosClient.post<SignWithUserShareResponse>(
`${this.serverPartyApiUrl}/api/v1/sign/with-user-share`,
{
session_id: sessionId,
party_id: partyId,
join_token: joinToken,
share_data: shareData.toString('hex'),
message_hash: messageHash,
},
);
if (!response.data.success) {
throw new Error(`Failed to sign with user share: ${JSON.stringify(response.data)}`);
}
return response.data;
}
/**
* Poll session status until complete or failed.
* Used for monitoring background operations if needed.
*/
private async pollSessionStatus(sessionId: string, timeout: number): Promise<SessionStatusResponse> {
const maxAttempts = Math.min(this.maxPollAttempts, Math.ceil(timeout / this.pollIntervalMs));
for (let attempt = 0; attempt < maxAttempts; attempt++) {
try {
await fs.unlink(file);
} catch {
// Ignore errors during cleanup
const response = await this.axiosClient.get<SessionStatusResponse>(
`${this.sessionCoordinatorUrl}/api/v1/sessions/${sessionId}/status`,
);
const status = response.data;
this.logger.debug(`Session ${sessionId} status: ${status.status} (${status.completed_parties}/${status.total_parties})`);
if (status.status === 'completed' || status.status === 'failed') {
return status;
}
} catch (error) {
this.logger.warn(`Error polling session status: ${error.message}`);
}
await this.sleep(this.pollIntervalMs);
}
throw new Error(`Session ${sessionId} timed out after ${timeout}ms`);
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}