feat(mpc-system): optimize party index handling and add gRPC debug logs

- Simplified participant list handling in JoinSession client
- Added debug logging for party_index conversion in gRPC messages
- Removed redundant party filtering logic
- Added detailed logging to trace protobuf field values

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-05 04:00:09 -08:00
parent c9cb5676d0
commit 553ffd365e
2 changed files with 200 additions and 198 deletions

View File

@ -1,198 +1,195 @@
package grpc package grpc
import ( import (
"context" "context"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
coordinator "github.com/rwadurian/mpc-system/api/grpc/coordinator/v1" coordinator "github.com/rwadurian/mpc-system/api/grpc/coordinator/v1"
"github.com/rwadurian/mpc-system/pkg/logger" "github.com/rwadurian/mpc-system/pkg/logger"
"github.com/rwadurian/mpc-system/services/server-party/application/use_cases" "github.com/rwadurian/mpc-system/services/server-party/application/use_cases"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
) )
// SessionCoordinatorClient implements use_cases.SessionCoordinatorClient // SessionCoordinatorClient implements use_cases.SessionCoordinatorClient
type SessionCoordinatorClient struct { type SessionCoordinatorClient struct {
conn *grpc.ClientConn conn *grpc.ClientConn
address string address string
} }
// NewSessionCoordinatorClient creates a new session coordinator gRPC client // NewSessionCoordinatorClient creates a new session coordinator gRPC client
func NewSessionCoordinatorClient(address string) (*SessionCoordinatorClient, error) { func NewSessionCoordinatorClient(address string) (*SessionCoordinatorClient, error) {
conn, err := grpc.Dial( conn, err := grpc.Dial(
address, address,
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithTimeout(10*time.Second), grpc.WithTimeout(10*time.Second),
) )
if err != nil { if err != nil {
return nil, err return nil, err
} }
logger.Info("Connected to Session Coordinator", zap.String("address", address)) logger.Info("Connected to Session Coordinator", zap.String("address", address))
return &SessionCoordinatorClient{ return &SessionCoordinatorClient{
conn: conn, conn: conn,
address: address, address: address,
}, nil }, nil
} }
// Close closes the gRPC connection // Close closes the gRPC connection
func (c *SessionCoordinatorClient) Close() error { func (c *SessionCoordinatorClient) Close() error {
if c.conn != nil { if c.conn != nil {
return c.conn.Close() return c.conn.Close()
} }
return nil return nil
} }
// JoinSession joins an MPC session // JoinSession joins an MPC session
func (c *SessionCoordinatorClient) JoinSession( func (c *SessionCoordinatorClient) JoinSession(
ctx context.Context, ctx context.Context,
sessionID uuid.UUID, sessionID uuid.UUID,
partyID, joinToken string, partyID, joinToken string,
) (*use_cases.SessionInfo, error) { ) (*use_cases.SessionInfo, error) {
// Create the request // Create the request
req := &coordinator.JoinSessionRequest{ req := &coordinator.JoinSessionRequest{
SessionId: sessionID.String(), SessionId: sessionID.String(),
PartyId: partyID, PartyId: partyID,
JoinToken: joinToken, JoinToken: joinToken,
DeviceInfo: &coordinator.DeviceInfo{ DeviceInfo: &coordinator.DeviceInfo{
DeviceType: "server", DeviceType: "server",
DeviceId: partyID, DeviceId: partyID,
Platform: "linux", Platform: "linux",
AppVersion: "1.0.0", AppVersion: "1.0.0",
}, },
} }
// Make the gRPC call using the raw connection // Make the gRPC call using the raw connection
resp := &coordinator.JoinSessionResponse{} resp := &coordinator.JoinSessionResponse{}
err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/JoinSession", req, resp) err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/JoinSession", req, resp)
if err != nil { if err != nil {
logger.Error("Failed to join session", zap.Error(err)) logger.Error("Failed to join session", zap.Error(err))
return nil, err return nil, err
} }
if !resp.Success { if !resp.Success {
logger.Error("Join session failed", zap.String("session_id", sessionID.String())) logger.Error("Join session failed", zap.String("session_id", sessionID.String()))
return nil, use_cases.ErrInvalidSession return nil, use_cases.ErrInvalidSession
} }
// Convert response to SessionInfo // Convert response to SessionInfo
participants := make([]use_cases.ParticipantInfo, 0, len(resp.OtherParties)+1) // Note: OtherParties should include ALL participants (including self) from coordinator
participants := make([]use_cases.ParticipantInfo, len(resp.OtherParties))
// Add self for i, p := range resp.OtherParties {
participants = append(participants, use_cases.ParticipantInfo{ // Debug: Log what we received from gRPC
PartyID: partyID, logger.Info("gRPC client - received party_index from protobuf response",
PartyIndex: findPartyIndex(resp.OtherParties, partyID), zap.String("party_id", p.PartyId),
}) zap.Int32("proto_party_index", p.PartyIndex),
zap.Int("converted_party_index", int(p.PartyIndex)))
// Add other parties
for _, p := range resp.OtherParties { participants[i] = use_cases.ParticipantInfo{
if p.PartyId != partyID { PartyID: p.PartyId,
participants = append(participants, use_cases.ParticipantInfo{ PartyIndex: int(p.PartyIndex),
PartyID: p.PartyId, }
PartyIndex: int(p.PartyIndex), }
})
} sessionInfo := &use_cases.SessionInfo{
} SessionID: sessionID,
SessionType: resp.SessionInfo.SessionType,
sessionInfo := &use_cases.SessionInfo{ ThresholdN: int(resp.SessionInfo.ThresholdN),
SessionID: sessionID, ThresholdT: int(resp.SessionInfo.ThresholdT),
SessionType: resp.SessionInfo.SessionType, MessageHash: resp.SessionInfo.MessageHash,
ThresholdN: int(resp.SessionInfo.ThresholdN), Participants: participants,
ThresholdT: int(resp.SessionInfo.ThresholdT), }
MessageHash: resp.SessionInfo.MessageHash,
Participants: participants, logger.Info("Joined session successfully",
} zap.String("session_id", sessionID.String()),
zap.String("party_id", partyID),
logger.Info("Joined session successfully", zap.String("session_type", sessionInfo.SessionType))
zap.String("session_id", sessionID.String()),
zap.String("party_id", partyID), return sessionInfo, nil
zap.String("session_type", sessionInfo.SessionType)) }
return sessionInfo, nil // ReportCompletion reports that a party has completed the MPC protocol
} func (c *SessionCoordinatorClient) ReportCompletion(
ctx context.Context,
// ReportCompletion reports that a party has completed the MPC protocol sessionID uuid.UUID,
func (c *SessionCoordinatorClient) ReportCompletion( partyID string,
ctx context.Context, resultData []byte,
sessionID uuid.UUID, ) error {
partyID string, req := &coordinator.ReportCompletionRequest{
resultData []byte, SessionId: sessionID.String(),
) error { PartyId: partyID,
req := &coordinator.ReportCompletionRequest{ PublicKey: resultData, // For keygen, this is public key; for signing, this is signature
SessionId: sessionID.String(), }
PartyId: partyID,
PublicKey: resultData, // For keygen, this is public key; for signing, this is signature resp := &coordinator.ReportCompletionResponse{}
} err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/ReportCompletion", req, resp)
if err != nil {
resp := &coordinator.ReportCompletionResponse{} logger.Error("Failed to report completion", zap.Error(err))
err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/ReportCompletion", req, resp) return err
if err != nil { }
logger.Error("Failed to report completion", zap.Error(err))
return err logger.Info("Reported completion",
} zap.String("session_id", sessionID.String()),
zap.String("party_id", partyID),
logger.Info("Reported completion", zap.Bool("all_completed", resp.AllCompleted))
zap.String("session_id", sessionID.String()),
zap.String("party_id", partyID), return nil
zap.Bool("all_completed", resp.AllCompleted)) }
return nil // MarkPartyReady marks the party as ready to start the protocol
} func (c *SessionCoordinatorClient) MarkPartyReady(
ctx context.Context,
// MarkPartyReady marks the party as ready to start the protocol sessionID uuid.UUID,
func (c *SessionCoordinatorClient) MarkPartyReady( partyID string,
ctx context.Context, ) (bool, error) {
sessionID uuid.UUID, req := &coordinator.MarkPartyReadyRequest{
partyID string, SessionId: sessionID.String(),
) (bool, error) { PartyId: partyID,
req := &coordinator.MarkPartyReadyRequest{ }
SessionId: sessionID.String(),
PartyId: partyID, resp := &coordinator.MarkPartyReadyResponse{}
} err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/MarkPartyReady", req, resp)
if err != nil {
resp := &coordinator.MarkPartyReadyResponse{} logger.Error("Failed to mark party ready", zap.Error(err))
err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/MarkPartyReady", req, resp) return false, err
if err != nil { }
logger.Error("Failed to mark party ready", zap.Error(err))
return false, err logger.Info("Marked party ready",
} zap.String("session_id", sessionID.String()),
zap.String("party_id", partyID),
logger.Info("Marked party ready", zap.Bool("all_ready", resp.AllReady))
zap.String("session_id", sessionID.String()),
zap.String("party_id", partyID), return resp.AllReady, nil
zap.Bool("all_ready", resp.AllReady)) }
return resp.AllReady, nil // GetSessionStatus gets the current session status
} func (c *SessionCoordinatorClient) GetSessionStatus(
ctx context.Context,
// GetSessionStatus gets the current session status sessionID uuid.UUID,
func (c *SessionCoordinatorClient) GetSessionStatus( ) (string, error) {
ctx context.Context, req := &coordinator.GetSessionStatusRequest{
sessionID uuid.UUID, SessionId: sessionID.String(),
) (string, error) { }
req := &coordinator.GetSessionStatusRequest{
SessionId: sessionID.String(), resp := &coordinator.GetSessionStatusResponse{}
} err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/GetSessionStatus", req, resp)
if err != nil {
resp := &coordinator.GetSessionStatusResponse{} return "", err
err := c.conn.Invoke(ctx, "/mpc.coordinator.v1.SessionCoordinator/GetSessionStatus", req, resp) }
if err != nil {
return "", err return resp.Status, nil
} }
return resp.Status, nil // findPartyIndex finds the party index from the list of parties
} func findPartyIndex(parties []*coordinator.PartyInfo, partyID string) int {
for _, p := range parties {
// findPartyIndex finds the party index from the list of parties if p.PartyId == partyID {
func findPartyIndex(parties []*coordinator.PartyInfo, partyID string) int { return int(p.PartyIndex)
for _, p := range parties { }
if p.PartyId == partyID { }
return int(p.PartyIndex) return 0
} }
}
return 0
}

View File

@ -150,6 +150,11 @@ func (s *SessionCoordinatorServer) JoinSession(
AppVersion: p.DeviceInfo.AppVersion, AppVersion: p.DeviceInfo.AppVersion,
}, },
} }
// Debug: Log what we're about to send in gRPC response
logger.Info("gRPC JoinSession - setting party_index in protobuf response",
zap.String("party_id", p.PartyID),
zap.Int("source_party_index", p.PartyIndex),
zap.Int32("proto_party_index", otherParties[i].PartyIndex))
} }
return &pb.JoinSessionResponse{ return &pb.JoinSessionResponse{