diff --git a/backend/mpc-system/api/grpc/router/v1/message_router.pb.go b/backend/mpc-system/api/grpc/router/v1/message_router.pb.go index d9b3d7a1..8803ff2a 100644 --- a/backend/mpc-system/api/grpc/router/v1/message_router.pb.go +++ b/backend/mpc-system/api/grpc/router/v1/message_router.pb.go @@ -680,8 +680,11 @@ type SessionEvent struct { ExpiresAt int64 `protobuf:"varint,10,opt,name=expires_at,json=expiresAt,proto3" json:"expires_at,omitempty"` // Unix timestamp milliseconds // For sign sessions with delegate party: user's share for delegate to use DelegateUserShare *DelegateUserShare `protobuf:"bytes,11,opt,name=delegate_user_share,json=delegateUserShare,proto3" json:"delegate_user_share,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // For session_started event: complete list of participants with their indices + // CRITICAL: Use this for TSS protocol instead of JoinSession response + Participants []*PartyInfo `protobuf:"bytes,12,rep,name=participants,proto3" json:"participants,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *SessionEvent) Reset() { @@ -791,6 +794,13 @@ func (x *SessionEvent) GetDelegateUserShare() *DelegateUserShare { return nil } +func (x *SessionEvent) GetParticipants() []*PartyInfo { + if x != nil { + return x.Participants + } + return nil +} + // DelegateUserShare contains user's share for delegate party to use in signing type DelegateUserShare struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2479,7 +2489,7 @@ const file_api_proto_message_router_proto_rawDesc = "" + "\x1dSubscribeSessionEventsRequest\x12\x19\n" + "\bparty_id\x18\x01 \x01(\tR\apartyId\x12\x1f\n" + "\vevent_types\x18\x02 \x03(\tR\n" + - "eventTypes\"\x94\x04\n" + + "eventTypes\"\xd2\x04\n" + "\fSessionEvent\x12\x19\n" + "\bevent_id\x18\x01 \x01(\tR\aeventId\x12\x1d\n" + "\n" + @@ -2499,7 +2509,8 @@ const file_api_proto_message_router_proto_rawDesc = "" + "\n" + "expires_at\x18\n" + " \x01(\x03R\texpiresAt\x12P\n" + - "\x13delegate_user_share\x18\v \x01(\v2 .mpc.router.v1.DelegateUserShareR\x11delegateUserShare\x1a=\n" + + "\x13delegate_user_share\x18\v \x01(\v2 .mpc.router.v1.DelegateUserShareR\x11delegateUserShare\x12<\n" + + "\fparticipants\x18\f \x03(\v2\x18.mpc.router.v1.PartyInfoR\fparticipants\x1a=\n" + "\x0fJoinTokensEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x89\x01\n" + @@ -2723,50 +2734,51 @@ var file_api_proto_message_router_proto_depIdxs = []int32{ 6, // 1: mpc.router.v1.RegisterPartyRequest.notification:type_name -> mpc.router.v1.NotificationChannel 37, // 2: mpc.router.v1.SessionEvent.join_tokens:type_name -> mpc.router.v1.SessionEvent.JoinTokensEntry 11, // 3: mpc.router.v1.SessionEvent.delegate_user_share:type_name -> mpc.router.v1.DelegateUserShare - 10, // 4: mpc.router.v1.PublishSessionEventRequest.event:type_name -> mpc.router.v1.SessionEvent - 6, // 5: mpc.router.v1.RegisteredParty.notification:type_name -> mpc.router.v1.NotificationChannel - 15, // 6: mpc.router.v1.GetRegisteredPartiesResponse.parties:type_name -> mpc.router.v1.RegisteredParty - 20, // 7: mpc.router.v1.GetMessageStatusResponse.deliveries:type_name -> mpc.router.v1.MessageDeliveryStatus - 24, // 8: mpc.router.v1.PartyInfo.device_info:type_name -> mpc.router.v1.DeviceInfo - 24, // 9: mpc.router.v1.JoinSessionRequest.device_info:type_name -> mpc.router.v1.DeviceInfo - 26, // 10: mpc.router.v1.JoinSessionResponse.session_info:type_name -> mpc.router.v1.SessionInfo - 25, // 11: mpc.router.v1.JoinSessionResponse.other_parties:type_name -> mpc.router.v1.PartyInfo - 25, // 12: mpc.router.v1.GetSessionStatusResponse.participants:type_name -> mpc.router.v1.PartyInfo - 0, // 13: mpc.router.v1.MessageRouter.RouteMessage:input_type -> mpc.router.v1.RouteMessageRequest - 2, // 14: mpc.router.v1.MessageRouter.SubscribeMessages:input_type -> mpc.router.v1.SubscribeMessagesRequest - 4, // 15: mpc.router.v1.MessageRouter.GetPendingMessages:input_type -> mpc.router.v1.GetPendingMessagesRequest - 17, // 16: mpc.router.v1.MessageRouter.AcknowledgeMessage:input_type -> mpc.router.v1.AcknowledgeMessageRequest - 19, // 17: mpc.router.v1.MessageRouter.GetMessageStatus:input_type -> mpc.router.v1.GetMessageStatusRequest - 7, // 18: mpc.router.v1.MessageRouter.RegisterParty:input_type -> mpc.router.v1.RegisterPartyRequest - 22, // 19: mpc.router.v1.MessageRouter.Heartbeat:input_type -> mpc.router.v1.HeartbeatRequest - 9, // 20: mpc.router.v1.MessageRouter.SubscribeSessionEvents:input_type -> mpc.router.v1.SubscribeSessionEventsRequest - 12, // 21: mpc.router.v1.MessageRouter.PublishSessionEvent:input_type -> mpc.router.v1.PublishSessionEventRequest - 14, // 22: mpc.router.v1.MessageRouter.GetRegisteredParties:input_type -> mpc.router.v1.GetRegisteredPartiesRequest - 27, // 23: mpc.router.v1.MessageRouter.JoinSession:input_type -> mpc.router.v1.JoinSessionRequest - 29, // 24: mpc.router.v1.MessageRouter.MarkPartyReady:input_type -> mpc.router.v1.MarkPartyReadyRequest - 31, // 25: mpc.router.v1.MessageRouter.ReportCompletion:input_type -> mpc.router.v1.ReportCompletionRequest - 33, // 26: mpc.router.v1.MessageRouter.GetSessionStatus:input_type -> mpc.router.v1.GetSessionStatusRequest - 35, // 27: mpc.router.v1.MessageRouter.SubmitDelegateShare:input_type -> mpc.router.v1.SubmitDelegateShareRequest - 1, // 28: mpc.router.v1.MessageRouter.RouteMessage:output_type -> mpc.router.v1.RouteMessageResponse - 3, // 29: mpc.router.v1.MessageRouter.SubscribeMessages:output_type -> mpc.router.v1.MPCMessage - 5, // 30: mpc.router.v1.MessageRouter.GetPendingMessages:output_type -> mpc.router.v1.GetPendingMessagesResponse - 18, // 31: mpc.router.v1.MessageRouter.AcknowledgeMessage:output_type -> mpc.router.v1.AcknowledgeMessageResponse - 21, // 32: mpc.router.v1.MessageRouter.GetMessageStatus:output_type -> mpc.router.v1.GetMessageStatusResponse - 8, // 33: mpc.router.v1.MessageRouter.RegisterParty:output_type -> mpc.router.v1.RegisterPartyResponse - 23, // 34: mpc.router.v1.MessageRouter.Heartbeat:output_type -> mpc.router.v1.HeartbeatResponse - 10, // 35: mpc.router.v1.MessageRouter.SubscribeSessionEvents:output_type -> mpc.router.v1.SessionEvent - 13, // 36: mpc.router.v1.MessageRouter.PublishSessionEvent:output_type -> mpc.router.v1.PublishSessionEventResponse - 16, // 37: mpc.router.v1.MessageRouter.GetRegisteredParties:output_type -> mpc.router.v1.GetRegisteredPartiesResponse - 28, // 38: mpc.router.v1.MessageRouter.JoinSession:output_type -> mpc.router.v1.JoinSessionResponse - 30, // 39: mpc.router.v1.MessageRouter.MarkPartyReady:output_type -> mpc.router.v1.MarkPartyReadyResponse - 32, // 40: mpc.router.v1.MessageRouter.ReportCompletion:output_type -> mpc.router.v1.ReportCompletionResponse - 34, // 41: mpc.router.v1.MessageRouter.GetSessionStatus:output_type -> mpc.router.v1.GetSessionStatusResponse - 36, // 42: mpc.router.v1.MessageRouter.SubmitDelegateShare:output_type -> mpc.router.v1.SubmitDelegateShareResponse - 28, // [28:43] is the sub-list for method output_type - 13, // [13:28] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 25, // 4: mpc.router.v1.SessionEvent.participants:type_name -> mpc.router.v1.PartyInfo + 10, // 5: mpc.router.v1.PublishSessionEventRequest.event:type_name -> mpc.router.v1.SessionEvent + 6, // 6: mpc.router.v1.RegisteredParty.notification:type_name -> mpc.router.v1.NotificationChannel + 15, // 7: mpc.router.v1.GetRegisteredPartiesResponse.parties:type_name -> mpc.router.v1.RegisteredParty + 20, // 8: mpc.router.v1.GetMessageStatusResponse.deliveries:type_name -> mpc.router.v1.MessageDeliveryStatus + 24, // 9: mpc.router.v1.PartyInfo.device_info:type_name -> mpc.router.v1.DeviceInfo + 24, // 10: mpc.router.v1.JoinSessionRequest.device_info:type_name -> mpc.router.v1.DeviceInfo + 26, // 11: mpc.router.v1.JoinSessionResponse.session_info:type_name -> mpc.router.v1.SessionInfo + 25, // 12: mpc.router.v1.JoinSessionResponse.other_parties:type_name -> mpc.router.v1.PartyInfo + 25, // 13: mpc.router.v1.GetSessionStatusResponse.participants:type_name -> mpc.router.v1.PartyInfo + 0, // 14: mpc.router.v1.MessageRouter.RouteMessage:input_type -> mpc.router.v1.RouteMessageRequest + 2, // 15: mpc.router.v1.MessageRouter.SubscribeMessages:input_type -> mpc.router.v1.SubscribeMessagesRequest + 4, // 16: mpc.router.v1.MessageRouter.GetPendingMessages:input_type -> mpc.router.v1.GetPendingMessagesRequest + 17, // 17: mpc.router.v1.MessageRouter.AcknowledgeMessage:input_type -> mpc.router.v1.AcknowledgeMessageRequest + 19, // 18: mpc.router.v1.MessageRouter.GetMessageStatus:input_type -> mpc.router.v1.GetMessageStatusRequest + 7, // 19: mpc.router.v1.MessageRouter.RegisterParty:input_type -> mpc.router.v1.RegisterPartyRequest + 22, // 20: mpc.router.v1.MessageRouter.Heartbeat:input_type -> mpc.router.v1.HeartbeatRequest + 9, // 21: mpc.router.v1.MessageRouter.SubscribeSessionEvents:input_type -> mpc.router.v1.SubscribeSessionEventsRequest + 12, // 22: mpc.router.v1.MessageRouter.PublishSessionEvent:input_type -> mpc.router.v1.PublishSessionEventRequest + 14, // 23: mpc.router.v1.MessageRouter.GetRegisteredParties:input_type -> mpc.router.v1.GetRegisteredPartiesRequest + 27, // 24: mpc.router.v1.MessageRouter.JoinSession:input_type -> mpc.router.v1.JoinSessionRequest + 29, // 25: mpc.router.v1.MessageRouter.MarkPartyReady:input_type -> mpc.router.v1.MarkPartyReadyRequest + 31, // 26: mpc.router.v1.MessageRouter.ReportCompletion:input_type -> mpc.router.v1.ReportCompletionRequest + 33, // 27: mpc.router.v1.MessageRouter.GetSessionStatus:input_type -> mpc.router.v1.GetSessionStatusRequest + 35, // 28: mpc.router.v1.MessageRouter.SubmitDelegateShare:input_type -> mpc.router.v1.SubmitDelegateShareRequest + 1, // 29: mpc.router.v1.MessageRouter.RouteMessage:output_type -> mpc.router.v1.RouteMessageResponse + 3, // 30: mpc.router.v1.MessageRouter.SubscribeMessages:output_type -> mpc.router.v1.MPCMessage + 5, // 31: mpc.router.v1.MessageRouter.GetPendingMessages:output_type -> mpc.router.v1.GetPendingMessagesResponse + 18, // 32: mpc.router.v1.MessageRouter.AcknowledgeMessage:output_type -> mpc.router.v1.AcknowledgeMessageResponse + 21, // 33: mpc.router.v1.MessageRouter.GetMessageStatus:output_type -> mpc.router.v1.GetMessageStatusResponse + 8, // 34: mpc.router.v1.MessageRouter.RegisterParty:output_type -> mpc.router.v1.RegisterPartyResponse + 23, // 35: mpc.router.v1.MessageRouter.Heartbeat:output_type -> mpc.router.v1.HeartbeatResponse + 10, // 36: mpc.router.v1.MessageRouter.SubscribeSessionEvents:output_type -> mpc.router.v1.SessionEvent + 13, // 37: mpc.router.v1.MessageRouter.PublishSessionEvent:output_type -> mpc.router.v1.PublishSessionEventResponse + 16, // 38: mpc.router.v1.MessageRouter.GetRegisteredParties:output_type -> mpc.router.v1.GetRegisteredPartiesResponse + 28, // 39: mpc.router.v1.MessageRouter.JoinSession:output_type -> mpc.router.v1.JoinSessionResponse + 30, // 40: mpc.router.v1.MessageRouter.MarkPartyReady:output_type -> mpc.router.v1.MarkPartyReadyResponse + 32, // 41: mpc.router.v1.MessageRouter.ReportCompletion:output_type -> mpc.router.v1.ReportCompletionResponse + 34, // 42: mpc.router.v1.MessageRouter.GetSessionStatus:output_type -> mpc.router.v1.GetSessionStatusResponse + 36, // 43: mpc.router.v1.MessageRouter.SubmitDelegateShare:output_type -> mpc.router.v1.SubmitDelegateShareResponse + 29, // [29:44] is the sub-list for method output_type + 14, // [14:29] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_api_proto_message_router_proto_init() } diff --git a/backend/mpc-system/api/proto/message_router.proto b/backend/mpc-system/api/proto/message_router.proto index f3f29ed2..58339bd7 100644 --- a/backend/mpc-system/api/proto/message_router.proto +++ b/backend/mpc-system/api/proto/message_router.proto @@ -166,6 +166,9 @@ message SessionEvent { int64 expires_at = 10; // Unix timestamp milliseconds // For sign sessions with delegate party: user's share for delegate to use DelegateUserShare delegate_user_share = 11; + // For session_started event: complete list of participants with their indices + // CRITICAL: Use this for TSS protocol instead of JoinSession response + repeated PartyInfo participants = 12; } // DelegateUserShare contains user's share for delegate party to use in signing diff --git a/backend/mpc-system/services/server-party-co-managed/cmd/server/main.go b/backend/mpc-system/services/server-party-co-managed/cmd/server/main.go index 1a546f32..219729a6 100644 --- a/backend/mpc-system/services/server-party-co-managed/cmd/server/main.go +++ b/backend/mpc-system/services/server-party-co-managed/cmd/server/main.go @@ -432,17 +432,43 @@ func createCoManagedSessionEventHandler( return } + // CRITICAL FIX: Use participants from session_started event, NOT from JoinSession cache + // The JoinSession response only contains parties that had joined at that moment, + // but session_started event contains the COMPLETE list of all participants + var participants []use_cases.ParticipantInfo + if len(event.Participants) > 0 { + // Use participants from event (preferred - complete list) + participants = make([]use_cases.ParticipantInfo, len(event.Participants)) + for i, p := range event.Participants { + participants[i] = use_cases.ParticipantInfo{ + PartyID: p.PartyId, + PartyIndex: int(p.PartyIndex), + } + } + logger.Info("Using participants from session_started event", + zap.String("session_id", event.SessionId), + zap.Int("participant_count", len(participants))) + } else { + // Fallback to cached participants (for backward compatibility) + participants = pendingSession.Participants + logger.Warn("No participants in session_started event, using cached participants", + zap.String("session_id", event.SessionId), + zap.Int("participant_count", len(participants))) + } + // Determine session type based on message_hash isSignSession := len(pendingSession.MessageHash) > 0 if isSignSession { logger.Info("Session started event received, beginning TSS signing protocol", zap.String("session_id", event.SessionId), - zap.String("party_id", partyID)) + zap.String("party_id", partyID), + zap.Int("participant_count", len(participants))) } else { logger.Info("Session started event received, beginning TSS keygen protocol", zap.String("session_id", event.SessionId), - zap.String("party_id", partyID)) + zap.String("party_id", partyID), + zap.Int("participant_count", len(participants))) } // Execute TSS protocol in goroutine @@ -452,11 +478,6 @@ func createCoManagedSessionEventHandler( participateCtx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() - // CRITICAL: Use participants from pendingSession (which came from JoinSession response) - // These contain the correct PartyIndex values from the database, NOT loop indices - // The JoinSession response already includes all participants with their assigned indices - participants := pendingSession.Participants - if isSignSession { // Execute signing protocol logger.Info("Auto-participating in co_managed_sign session", diff --git a/backend/mpc-system/services/session-coordinator/adapters/output/grpc/message_router_client.go b/backend/mpc-system/services/session-coordinator/adapters/output/grpc/message_router_client.go index 28c614bd..85ed23ed 100644 --- a/backend/mpc-system/services/session-coordinator/adapters/output/grpc/message_router_client.go +++ b/backend/mpc-system/services/session-coordinator/adapters/output/grpc/message_router_client.go @@ -149,6 +149,8 @@ func (c *MessageRouterClient) PublishSessionCreated( } // PublishSessionStarted publishes a session_started event when all parties have joined +// CRITICAL: participants contains the complete list of all parties with their indices +// Receivers should use this list for TSS protocol instead of JoinSession response func (c *MessageRouterClient) PublishSessionStarted( ctx context.Context, sessionID string, @@ -157,7 +159,17 @@ func (c *MessageRouterClient) PublishSessionStarted( selectedParties []string, joinTokens map[string]string, startedAt int64, + participants []use_cases.SessionParticipantInfo, ) error { + // Convert participants to proto format + protoParticipants := make([]*router.PartyInfo, len(participants)) + for i, p := range participants { + protoParticipants[i] = &router.PartyInfo{ + PartyId: p.PartyID, + PartyIndex: p.PartyIndex, + } + } + event := &router.SessionEvent{ EventId: uuid.New().String(), EventType: "session_started", @@ -167,8 +179,13 @@ func (c *MessageRouterClient) PublishSessionStarted( SelectedParties: selectedParties, JoinTokens: joinTokens, CreatedAt: startedAt, + Participants: protoParticipants, } + logger.Info("Publishing session_started event with participants", + zap.String("session_id", sessionID), + zap.Int("participant_count", len(participants))) + return c.PublishSessionEvent(ctx, event) } diff --git a/backend/mpc-system/services/session-coordinator/application/use_cases/join_session.go b/backend/mpc-system/services/session-coordinator/application/use_cases/join_session.go index 96ee8420..d9a3f132 100644 --- a/backend/mpc-system/services/session-coordinator/application/use_cases/join_session.go +++ b/backend/mpc-system/services/session-coordinator/application/use_cases/join_session.go @@ -21,8 +21,16 @@ import ( // Maximum retries for optimistic lock conflicts during join session const joinSessionMaxRetries = 3 +// SessionParticipantInfo contains party ID and index for session_started event +type SessionParticipantInfo struct { + PartyID string + PartyIndex int32 +} + // JoinSessionMessageRouterClient defines the interface for publishing session events via gRPC type JoinSessionMessageRouterClient interface { + // PublishSessionStarted publishes session_started event with complete participants list + // CRITICAL: participants contains all parties with their indices for TSS protocol PublishSessionStarted( ctx context.Context, sessionID string, @@ -31,6 +39,7 @@ type JoinSessionMessageRouterClient interface { selectedParties []string, joinTokens map[string]string, startedAt int64, + participants []SessionParticipantInfo, ) error // PublishParticipantJoined broadcasts a participant_joined event to all parties in the session @@ -248,6 +257,16 @@ func (uc *JoinSessionUseCase) executeWithRetry( // Build join tokens map (empty for session_started, parties already have tokens) joinTokens := make(map[string]string) + // CRITICAL: Build complete participants list with party indices + // This ensures all parties have the same participant list for TSS protocol + participants := make([]SessionParticipantInfo, len(session.Participants)) + for i, p := range session.Participants { + participants[i] = SessionParticipantInfo{ + PartyID: p.PartyID.String(), + PartyIndex: int32(p.PartyIndex), + } + } + if err := uc.messageRouterClient.PublishSessionStarted( ctx, session.ID.String(), @@ -256,6 +275,7 @@ func (uc *JoinSessionUseCase) executeWithRetry( selectedParties, joinTokens, startedAt, + participants, ); err != nil { logger.Error("failed to publish session started event to message router", zap.String("session_id", session.ID.String()), @@ -263,7 +283,8 @@ func (uc *JoinSessionUseCase) executeWithRetry( } else { logger.Info("published session started event to message router", zap.String("session_id", session.ID.String()), - zap.Int("party_count", len(selectedParties))) + zap.Int("party_count", len(selectedParties)), + zap.Int("participant_count", len(participants))) } } }