// Source: rwadurian/backend/mpc-system/services/server-party-api/cmd/server/main.go
// (337 lines, 10 KiB, Go)

package main
import (
"context"
"encoding/hex"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
router "github.com/rwadurian/mpc-system/api/grpc/router/v1"
"github.com/rwadurian/mpc-system/pkg/config"
"github.com/rwadurian/mpc-system/pkg/crypto"
"github.com/rwadurian/mpc-system/pkg/logger"
grpcclient "github.com/rwadurian/mpc-system/services/server-party/adapters/output/grpc"
"github.com/rwadurian/mpc-system/services/server-party/application/use_cases"
"go.uber.org/zap"
)
// main parses flags, loads configuration, initializes logging and then runs
// the delegate party service until it stops.
//
// Bootstrap failures (config or logger) are written to stderr because no
// logger exists yet. The process exits with status 1 when the service stops
// because of an HTTP server error, and with status 0 after a shutdown signal.
func main() {
	// Parse flags
	configPath := flag.String("config", "", "Path to config file")
	flag.Parse()

	// Load configuration
	cfg, err := config.Load(*configPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to load config: %v\n", err)
		os.Exit(1)
	}

	// Initialize logger
	if err := logger.Init(&logger.Config{
		Level:    cfg.Logger.Level,
		Encoding: cfg.Logger.Encoding,
	}); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to initialize logger: %v\n", err)
		os.Exit(1)
	}

	// The service runs in its own function so that its deferred cleanup has
	// completed before os.Exit is called (os.Exit skips deferred calls).
	runErr := runDelegateParty(cfg)
	logger.Sync()
	if runErr != nil {
		os.Exit(1)
	}
}

// runDelegateParty wires up the delegate party and blocks until a shutdown
// signal arrives or the HTTP server fails.
//
// It creates the crypto service, connects to the Message Router, registers the
// party, starts the heartbeat, subscribes to session events and starts the
// health-check HTTP server. It returns the HTTP server error that triggered
// the shutdown, or nil when the shutdown was requested by SIGINT/SIGTERM.
// Unrecoverable initialization failures are reported through logger.Fatal
// (presumably terminating the process — confirm against pkg/logger).
func runDelegateParty(cfg *config.Config) error {
	// Publicly known key used only when MPC_CRYPTO_MASTER_KEY is unset outside
	// production. It offers no confidentiality.
	const devMasterKeyHex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
	// Interval between heartbeats sent to the Message Router.
	const heartbeatInterval = 30 * time.Second
	// Fixed pause after cancelling the context; nothing waits on the
	// background goroutines explicitly, so this is the only time they get.
	const shutdownGracePeriod = 5 * time.Second

	logger.Info("Starting Delegate Party Service",
		zap.String("environment", cfg.Server.Environment),
		zap.Int("http_port", cfg.Server.HTTPPort))

	// Initialize crypto service with master key from environment
	masterKeyHex := os.Getenv("MPC_CRYPTO_MASTER_KEY")
	if masterKeyHex == "" {
		// Never run production with the hard-coded key: anything protected
		// with it could be read by anyone who has seen this source file.
		if cfg.Server.Environment == "production" {
			logger.Fatal("MPC_CRYPTO_MASTER_KEY must be set in production; refusing to use the built-in development key",
				zap.String("environment", cfg.Server.Environment))
		}
		logger.Error("MPC_CRYPTO_MASTER_KEY is not set; falling back to the built-in development key, which is publicly known and insecure",
			zap.String("environment", cfg.Server.Environment))
		masterKeyHex = devMasterKeyHex
	}
	masterKey, err := hex.DecodeString(masterKeyHex)
	if err != nil {
		logger.Fatal("Invalid master key format", zap.Error(err))
	}
	cryptoService, err := crypto.NewCryptoService(masterKey)
	if err != nil {
		logger.Fatal("Failed to create crypto service", zap.Error(err))
	}

	// Get Message Router address from environment
	// Delegate party (like all parties) ONLY connects to Message Router
	routerAddr := os.Getenv("MESSAGE_ROUTER_ADDR")
	if routerAddr == "" {
		routerAddr = "message-router:50051"
	}

	// Initialize Message Router client (the only gRPC connection needed)
	messageRouter, err := grpcclient.NewMessageRouterClient(routerAddr)
	if err != nil {
		logger.Fatal("Failed to connect to message router", zap.Error(err))
	}
	defer messageRouter.Close()

	// Create shutdown context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Get party ID from environment (or use default)
	partyID := os.Getenv("PARTY_ID")
	if partyID == "" {
		partyID = "delegate-party"
	}

	// Force PARTY_ROLE to delegate for this service
	partyRole := "delegate"

	// Register this party as a delegate party with Message Router
	logger.Info("Registering party with Message Router",
		zap.String("party_id", partyID),
		zap.String("role", partyRole))
	if err := messageRouter.RegisterParty(ctx, partyID, partyRole, "1.0.0"); err != nil {
		logger.Fatal("Failed to register party", zap.Error(err))
	}
	logger.Info("Party registered successfully",
		zap.String("party_id", partyID),
		zap.String("role", partyRole))

	// Start heartbeat to keep party registered
	heartbeatCancel := messageRouter.StartHeartbeat(ctx, partyID, heartbeatInterval, func(pendingCount int32) {
		if pendingCount > 0 {
			logger.Info("Pending messages detected via heartbeat",
				zap.String("party_id", partyID),
				zap.Int32("pending_count", pendingCount))
		}
	})
	defer heartbeatCancel()
	logger.Info("Heartbeat started", zap.String("party_id", partyID), zap.Duration("interval", heartbeatInterval))

	// Initialize use cases with nil keyShareRepo (delegate doesn't use DB)
	// MessageRouter handles both messaging AND session operations (proxied to coordinator)
	participateKeygenUC := use_cases.NewParticipateKeygenUseCase(
		nil, // No database storage for delegate
		messageRouter,
		messageRouter,
		cryptoService,
	)
	participateSigningUC := use_cases.NewParticipateSigningUseCase(
		nil, // No database storage for delegate
		messageRouter,
		messageRouter,
		cryptoService,
	)

	// Subscribe to session events and handle them automatically (SAME AS SERVER-PARTY)
	logger.Info("Subscribing to session events", zap.String("party_id", partyID))
	eventHandler := createSessionEventHandler(
		ctx,
		partyID,
		participateKeygenUC,
		participateSigningUC,
		messageRouter,
	)
	if err := messageRouter.SubscribeSessionEvents(ctx, partyID, eventHandler); err != nil {
		logger.Fatal("Failed to subscribe to session events", zap.Error(err))
	}
	logger.Info("Delegate party initialized successfully (party-driven architecture)",
		zap.String("party_id", partyID),
		zap.String("role", partyRole))

	// Start HTTP server (health check only). The channel is buffered so the
	// goroutine can report its error even if nobody is receiving any more.
	errChan := make(chan error, 1)
	go func() {
		if err := startHTTPServer(cfg); err != nil {
			errChan <- fmt.Errorf("HTTP server error: %w", err)
		}
	}()

	// Wait for shutdown signal
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	var serverErr error
	select {
	case sig := <-sigChan:
		logger.Info("Received shutdown signal", zap.String("signal", sig.String()))
	case serverErr = <-errChan:
		logger.Error("Server error", zap.Error(serverErr))
	}

	// Graceful shutdown
	logger.Info("Shutting down...")
	cancel()
	time.Sleep(shutdownGracePeriod)
	logger.Info("Shutdown complete")

	return serverErr
}
// startHTTPServer serves the health-check endpoint on cfg.Server.HTTPPort and
// blocks until the listener stops, returning the error that stopped it.
//
// The gin engine is served through an explicit http.Server rather than
// Engine.Run so that read, write and idle timeouts can be set; without them a
// slow or stalled client can hold a connection open indefinitely.
func startHTTPServer(cfg *config.Config) error {
	if cfg.Server.Environment == "production" {
		gin.SetMode(gin.ReleaseMode)
	}

	r := gin.New()
	r.Use(gin.Recovery())

	// Health check only
	r.GET("/health", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{
			"status":  "healthy",
			"service": "delegate-party",
			"role":    "delegate",
		})
	})

	srv := &http.Server{
		Addr:              fmt.Sprintf(":%d", cfg.Server.HTTPPort),
		Handler:           r,
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      10 * time.Second,
		IdleTimeout:       60 * time.Second,
	}

	logger.Info("Starting HTTP server (health check only)", zap.Int("port", cfg.Server.HTTPPort))
	return srv.ListenAndServe()
}
// createSessionEventHandler returns the callback that reacts to session events
// (party-driven architecture).
//
// For each event the callback:
//  1. ignores the event unless partyID appears in event.SelectedParties;
//  2. requires a join token for partyID in event.JoinTokens;
//  3. requires event.SessionId to parse as a UUID;
//  4. for "session_created" events only, participates in a background
//     goroutine bounded by a 5 minute timeout derived from ctx.
//
// An event with an empty MessageHash is treated as a keygen session, any other
// as a signing session. After keygen the user's share is submitted through
// messageRouter instead of being saved to a database.
func createSessionEventHandler(
	ctx context.Context,
	partyID string,
	participateKeygenUC *use_cases.ParticipateKeygenUseCase,
	participateSigningUC *use_cases.ParticipateSigningUseCase,
	messageRouter *grpcclient.MessageRouterClient,
) func(*router.SessionEvent) {
	return func(event *router.SessionEvent) {
		// Guard against a nil event so a bad delivery cannot crash the process.
		if event == nil {
			logger.Error("Received nil session event", zap.String("party_id", partyID))
			return
		}

		// Check if this party is selected for the session
		isSelected := false
		for _, selectedParty := range event.SelectedParties {
			if selectedParty == partyID {
				isSelected = true
				break
			}
		}
		if !isSelected {
			logger.Debug("Party not selected for this session",
				zap.String("session_id", event.SessionId),
				zap.String("party_id", partyID))
			return
		}

		// Get join token for this party
		joinToken, exists := event.JoinTokens[partyID]
		if !exists {
			logger.Error("No join token found for party",
				zap.String("session_id", event.SessionId),
				zap.String("party_id", partyID))
			return
		}

		logger.Info("Delegate party selected for session, auto-participating",
			zap.String("session_id", event.SessionId),
			zap.String("party_id", partyID),
			zap.String("event_type", event.EventType))

		// Parse session ID
		sessionID, err := uuid.Parse(event.SessionId)
		if err != nil {
			logger.Error("Invalid session ID",
				zap.Error(err),
				zap.String("session_id", event.SessionId))
			return
		}

		// Only session creation triggers participation; skip other event types
		// here so no goroutine or timer is allocated for them.
		if event.EventType != "session_created" {
			return
		}

		// Automatically participate based on session type
		go func() {
			// Derive from the parent context so shutdown cancels participation.
			participateCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
			defer cancel()

			// Keygen or sign is decided by the presence of a message hash.
			if len(event.MessageHash) == 0 {
				runDelegateKeygen(participateCtx, participateKeygenUC, messageRouter, partyID, event.SessionId,
					use_cases.ParticipateKeygenInput{
						SessionID: sessionID,
						PartyID:   partyID,
						JoinToken: joinToken,
					})
				return
			}

			runDelegateSigning(participateCtx, participateSigningUC, partyID, event.SessionId,
				use_cases.ParticipateSigningInput{
					SessionID:   sessionID,
					PartyID:     partyID,
					JoinToken:   joinToken,
					MessageHash: event.MessageHash,
					// Note: For signing, user must provide their share via Account Service
					// This will be passed through the session event or retrieved separately
				})
		}()
	}
}

// runDelegateKeygen executes the keygen use case and, when it yields a share
// for the user, submits that share through messageRouter. Failures are logged
// and not returned; sessionID is the raw session ID string used for logging
// and for the submission call.
func runDelegateKeygen(
	ctx context.Context,
	keygenUC *use_cases.ParticipateKeygenUseCase,
	messageRouter *grpcclient.MessageRouterClient,
	partyID string,
	sessionID string,
	input use_cases.ParticipateKeygenInput,
) {
	logger.Info("Auto-participating in keygen session (delegate)",
		zap.String("session_id", sessionID),
		zap.String("party_id", partyID))

	result, err := keygenUC.Execute(ctx, input)
	if err != nil {
		logger.Error("Keygen participation failed",
			zap.Error(err),
			zap.String("session_id", sessionID))
		return
	}
	logger.Info("Keygen participation completed (delegate)",
		zap.String("session_id", sessionID),
		zap.String("public_key", hex.EncodeToString(result.PublicKey)))

	// Delegate party: Submit share to Session Coordinator (via Message Router)
	// This is the key difference from persistent party which saves to DB
	if len(result.ShareForUser) == 0 {
		return
	}
	logger.Info("Submitting delegate share to Session Coordinator",
		zap.String("session_id", sessionID),
		zap.String("party_id", partyID),
		zap.Int("share_size", len(result.ShareForUser)))

	// NOTE(review): result.KeyShare is dereferenced unconditionally; confirm
	// the use case always populates it when ShareForUser is non-empty.
	// NOTE(review): the submission shares the keygen deadline, so a keygen
	// that used most of the timeout leaves little time to deliver the share.
	if err := messageRouter.SubmitDelegateShare(
		ctx,
		sessionID,
		partyID,
		result.ShareForUser,
		result.PublicKey,
		int32(result.KeyShare.PartyIndex),
	); err != nil {
		logger.Error("Failed to submit delegate share",
			zap.Error(err),
			zap.String("session_id", sessionID))
		return
	}
	logger.Info("Delegate share submitted successfully",
		zap.String("session_id", sessionID),
		zap.String("party_id", partyID))
}

// runDelegateSigning executes the signing use case and logs the outcome.
// Failures are logged and not returned; sessionID is the raw session ID string
// used for logging.
func runDelegateSigning(
	ctx context.Context,
	signingUC *use_cases.ParticipateSigningUseCase,
	partyID string,
	sessionID string,
	input use_cases.ParticipateSigningInput,
) {
	logger.Info("Auto-participating in sign session (delegate)",
		zap.String("session_id", sessionID),
		zap.String("party_id", partyID))

	result, err := signingUC.Execute(ctx, input)
	if err != nil {
		logger.Error("Signing participation failed",
			zap.Error(err),
			zap.String("session_id", sessionID))
		return
	}
	logger.Info("Signing participation completed (delegate)",
		zap.String("session_id", sessionID),
		zap.String("signature", hex.EncodeToString(result.Signature)))
}