From 553ffd365eb8878b128d1aecffaaaa7a34622678 Mon Sep 17 00:00:00 2001 From: hailin Date: Fri, 5 Dec 2025 04:00:09 -0800 Subject: [PATCH] feat(mpc-system): optimize party index handling and add gRPC debug logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Simplified participant list handling in JoinSession client - Added debug logging for party_index conversion in gRPC messages - Removed redundant party filtering logic - Added detailed logging to trace protobuf field values 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../output/grpc/session_coordinator_client.go | 393 +++++++++--------- .../input/grpc/session_grpc_handler.go | 5 + 2 files changed, 200 insertions(+), 198 deletions(-) diff --git a/backend/mpc-system/services/server-party/adapters/output/grpc/session_coordinator_client.go b/backend/mpc-system/services/server-party/adapters/output/grpc/session_coordinator_client.go index c1bb91e6..89287827 100644 --- a/backend/mpc-system/services/server-party/adapters/output/grpc/session_coordinator_client.go +++ b/backend/mpc-system/services/server-party/adapters/output/grpc/session_coordinator_client.go @@ -1,198 +1,195 @@ -package grpc - -import ( - "context" - "time" - - "github.com/google/uuid" - coordinator "github.com/rwadurian/mpc-system/api/grpc/coordinator/v1" - "github.com/rwadurian/mpc-system/pkg/logger" - "github.com/rwadurian/mpc-system/services/server-party/application/use_cases" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -// SessionCoordinatorClient implements use_cases.SessionCoordinatorClient -type SessionCoordinatorClient struct { - conn *grpc.ClientConn - address string -} - -// NewSessionCoordinatorClient creates a new session coordinator gRPC client -func NewSessionCoordinatorClient(address string) (*SessionCoordinatorClient, error) { - conn, err := grpc.Dial( - address, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - grpc.WithTimeout(10*time.Second), - ) - if err != nil { - return nil, err - } - - logger.Info("Connected to Session Coordinator", zap.String("address", address)) - - return &SessionCoordinatorClient{ - conn: conn, - address: address, - }, nil -} - -// Close closes the gRPC connection -func (c *SessionCoordinatorClient) Close() error { - if c.conn != nil { - return c.conn.Close() - } - return nil -} - -// JoinSession joins an MPC session -func (c *SessionCoordinatorClient) JoinSession( - ctx context.Context, - sessionID uuid.UUID, - partyID, joinToken string, -) (*use_cases.SessionInfo, error) { - // Create the request - req := &coordinator.JoinSessionRequest{ - SessionId: sessionID.String(), - PartyId: partyID, - JoinToken: joinToken, - DeviceInfo: &coordinator.DeviceInfo{ - DeviceType: "server", - DeviceId: partyID, - Platform: "linux", - AppVersion: "1.0.0", - }, - } - - // Make the gRPC call using the raw connection - resp := &coordinator.JoinSessionResponse{} - err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/JoinSession", req, resp) - if err != nil { - logger.Error("Failed to join session", zap.Error(err)) - return nil, err - } - - if !resp.Success { - logger.Error("Join session failed", zap.String("session_id", sessionID.String())) - return nil, use_cases.ErrInvalidSession - } - - // Convert response to SessionInfo - participants := make([]use_cases.ParticipantInfo, 0, len(resp.OtherParties)+1) - - // Add self - participants = append(participants, use_cases.ParticipantInfo{ - PartyID: partyID, - PartyIndex: findPartyIndex(resp.OtherParties, partyID), - }) - - // Add other parties - for _, p := range resp.OtherParties { - if p.PartyId != partyID { - participants = append(participants, use_cases.ParticipantInfo{ - PartyID: p.PartyId, - PartyIndex: int(p.PartyIndex), - }) - } - } - - sessionInfo := &use_cases.SessionInfo{ - SessionID: sessionID, - SessionType: resp.SessionInfo.SessionType, - ThresholdN: int(resp.SessionInfo.ThresholdN), - ThresholdT: int(resp.SessionInfo.ThresholdT), - MessageHash: resp.SessionInfo.MessageHash, - Participants: participants, - } - - logger.Info("Joined session successfully", - zap.String("session_id", sessionID.String()), - zap.String("party_id", partyID), - zap.String("session_type", sessionInfo.SessionType)) - - return sessionInfo, nil -} - -// ReportCompletion reports that a party has completed the MPC protocol -func (c *SessionCoordinatorClient) ReportCompletion( - ctx context.Context, - sessionID uuid.UUID, - partyID string, - resultData []byte, -) error { - req := &coordinator.ReportCompletionRequest{ - SessionId: sessionID.String(), - PartyId: partyID, - PublicKey: resultData, // For keygen, this is public key; for signing, this is signature - } - - resp := &coordinator.ReportCompletionResponse{} - err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/ReportCompletion", req, resp) - if err != nil { - logger.Error("Failed to report completion", zap.Error(err)) - return err - } - - logger.Info("Reported completion", - zap.String("session_id", sessionID.String()), - zap.String("party_id", partyID), - zap.Bool("all_completed", resp.AllCompleted)) - - return nil -} - -// MarkPartyReady marks the party as ready to start the protocol -func (c *SessionCoordinatorClient) MarkPartyReady( - ctx context.Context, - sessionID uuid.UUID, - partyID string, -) (bool, error) { - req := &coordinator.MarkPartyReadyRequest{ - SessionId: sessionID.String(), - PartyId: partyID, - } - - resp := &coordinator.MarkPartyReadyResponse{} - err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/MarkPartyReady", req, resp) - if err != nil { - logger.Error("Failed to mark party ready", zap.Error(err)) - return false, err - } - - logger.Info("Marked party ready", - zap.String("session_id", sessionID.String()), - zap.String("party_id", partyID), - zap.Bool("all_ready", resp.AllReady)) - - return resp.AllReady, nil -} - -// GetSessionStatus gets the current session status -func (c *SessionCoordinatorClient) GetSessionStatus( - ctx context.Context, - sessionID uuid.UUID, -) (string, error) { - req := &coordinator.GetSessionStatusRequest{ - SessionId: sessionID.String(), - } - - resp := &coordinator.GetSessionStatusResponse{} - err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/GetSessionStatus", req, resp) - if err != nil { - return "", err - } - - return resp.Status, nil -} - -// findPartyIndex finds the party index from the list of parties -func findPartyIndex(parties []*coordinator.PartyInfo, partyID string) int { - for _, p := range parties { - if p.PartyId == partyID { - return int(p.PartyIndex) - } - } - return 0 -} +package grpc + +import ( + "context" + "time" + + "github.com/google/uuid" + coordinator "github.com/rwadurian/mpc-system/api/grpc/coordinator/v1" + "github.com/rwadurian/mpc-system/pkg/logger" + "github.com/rwadurian/mpc-system/services/server-party/application/use_cases" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// SessionCoordinatorClient implements use_cases.SessionCoordinatorClient +type SessionCoordinatorClient struct { + conn *grpc.ClientConn + address string +} + +// NewSessionCoordinatorClient creates a new session coordinator gRPC client +func NewSessionCoordinatorClient(address string) (*SessionCoordinatorClient, error) { + conn, err := grpc.Dial( + address, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + grpc.WithTimeout(10*time.Second), + ) + if err != nil { + return nil, err + } + + logger.Info("Connected to Session Coordinator", zap.String("address", address)) + + return &SessionCoordinatorClient{ + conn: conn, + address: address, + }, nil +} + +// Close closes the gRPC connection +func (c *SessionCoordinatorClient) Close() error { + if c.conn != nil { + return c.conn.Close() + } + return nil +} + +// JoinSession joins an MPC session +func (c *SessionCoordinatorClient) JoinSession( + ctx context.Context, + sessionID uuid.UUID, + partyID, joinToken string, +) (*use_cases.SessionInfo, error) { + // Create the request + req := &coordinator.JoinSessionRequest{ + SessionId: sessionID.String(), + PartyId: partyID, + JoinToken: joinToken, + DeviceInfo: &coordinator.DeviceInfo{ + DeviceType: "server", + DeviceId: partyID, + Platform: "linux", + AppVersion: "1.0.0", + }, + } + + // Make the gRPC call using the raw connection + resp := &coordinator.JoinSessionResponse{} + err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/JoinSession", req, resp) + if err != nil { + logger.Error("Failed to join session", zap.Error(err)) + return nil, err + } + + if !resp.Success { + logger.Error("Join session failed", zap.String("session_id", sessionID.String())) + return nil, use_cases.ErrInvalidSession + } + + // Convert response to SessionInfo + // Note: OtherParties should include ALL participants (including self) from coordinator + participants := make([]use_cases.ParticipantInfo, len(resp.OtherParties)) + for i, p := range resp.OtherParties { + // Debug: Log what we received from gRPC + logger.Info("gRPC client - received party_index from protobuf response", + zap.String("party_id", p.PartyId), + zap.Int32("proto_party_index", p.PartyIndex), + zap.Int("converted_party_index", int(p.PartyIndex))) + + participants[i] = use_cases.ParticipantInfo{ + PartyID: p.PartyId, + PartyIndex: int(p.PartyIndex), + } + } + + sessionInfo := &use_cases.SessionInfo{ + SessionID: sessionID, + SessionType: resp.SessionInfo.SessionType, + ThresholdN: int(resp.SessionInfo.ThresholdN), + ThresholdT: int(resp.SessionInfo.ThresholdT), + MessageHash: resp.SessionInfo.MessageHash, + Participants: participants, + } + + logger.Info("Joined session successfully", + zap.String("session_id", sessionID.String()), + zap.String("party_id", partyID), + zap.String("session_type", sessionInfo.SessionType)) + + return sessionInfo, nil +} + +// ReportCompletion reports that a party has completed the MPC protocol +func (c *SessionCoordinatorClient) ReportCompletion( + ctx context.Context, + sessionID uuid.UUID, + partyID string, + resultData []byte, +) error { + req := &coordinator.ReportCompletionRequest{ + SessionId: sessionID.String(), + PartyId: partyID, + PublicKey: resultData, // For keygen, this is public key; for signing, this is signature + } + + resp := &coordinator.ReportCompletionResponse{} + err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/ReportCompletion", req, resp) + if err != nil { + logger.Error("Failed to report completion", zap.Error(err)) + return err + } + + logger.Info("Reported completion", + zap.String("session_id", sessionID.String()), + zap.String("party_id", partyID), + zap.Bool("all_completed", resp.AllCompleted)) + + return nil +} + +// MarkPartyReady marks the party as ready to start the protocol +func (c *SessionCoordinatorClient) MarkPartyReady( + ctx context.Context, + sessionID uuid.UUID, + partyID string, +) (bool, error) { + req := &coordinator.MarkPartyReadyRequest{ + SessionId: sessionID.String(), + PartyId: partyID, + } + + resp := &coordinator.MarkPartyReadyResponse{} + err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/MarkPartyReady", req, resp) + if err != nil { + logger.Error("Failed to mark party ready", zap.Error(err)) + return false, err + } + + logger.Info("Marked party ready", + zap.String("session_id", sessionID.String()), + zap.String("party_id", partyID), + zap.Bool("all_ready", resp.AllReady)) + + return resp.AllReady, nil +} + +// GetSessionStatus gets the current session status +func (c *SessionCoordinatorClient) GetSessionStatus( + ctx context.Context, + sessionID uuid.UUID, +) (string, error) { + req := &coordinator.GetSessionStatusRequest{ + SessionId: sessionID.String(), + } + + resp := &coordinator.GetSessionStatusResponse{} + err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/GetSessionStatus", req, resp) + if err != nil { + return "", err + } + + return resp.Status, nil +} + +// findPartyIndex finds the party index from the list of parties +func findPartyIndex(parties []*coordinator.PartyInfo, partyID string) int { + for _, p := range parties { + if p.PartyId == partyID { + return int(p.PartyIndex) + } + } + return 0 +} diff --git a/backend/mpc-system/services/session-coordinator/adapters/input/grpc/session_grpc_handler.go b/backend/mpc-system/services/session-coordinator/adapters/input/grpc/session_grpc_handler.go index 9f07d7bf..f0b38954 100644 --- a/backend/mpc-system/services/session-coordinator/adapters/input/grpc/session_grpc_handler.go +++ b/backend/mpc-system/services/session-coordinator/adapters/input/grpc/session_grpc_handler.go @@ -150,6 +150,11 @@ func (s *SessionCoordinatorServer) JoinSession( AppVersion: p.DeviceInfo.AppVersion, }, } + // Debug: Log what we're about to send in gRPC response + logger.Info("gRPC JoinSession - setting party_index in protobuf response", + zap.String("party_id", p.PartyID), + zap.Int("source_party_index", p.PartyIndex), + zap.Int32("proto_party_index", otherParties[i].PartyIndex)) } return &pb.JoinSessionResponse{