fix(mpc): session_started事件携带完整participants列表

问题:server-party-co-managed 使用 JoinSession 缓存的 participants,
但如果它是第一个加入的,缓存的列表只有自己,导致 keygen 失败。

修复:
- proto: SessionEvent 添加 repeated PartyInfo participants 字段
- session-coordinator: PublishSessionStarted 时包含完整 participants
- server-party-co-managed: 优先使用事件中的 participants

这确保所有 party 在收到 session_started 时都能获得完整的参与者列表。

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-27 07:26:25 -08:00
parent aeb70a6579
commit d04f0a08e0
5 changed files with 130 additions and 56 deletions

View File

@ -680,8 +680,11 @@ type SessionEvent struct {
ExpiresAt int64 `protobuf:"varint,10,opt,name=expires_at,json=expiresAt,proto3" json:"expires_at,omitempty"` // Unix timestamp milliseconds
// For sign sessions with delegate party: user's share for delegate to use
DelegateUserShare *DelegateUserShare `protobuf:"bytes,11,opt,name=delegate_user_share,json=delegateUserShare,proto3" json:"delegate_user_share,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
// For session_started event: complete list of participants with their indices
// CRITICAL: Use this for TSS protocol instead of JoinSession response
Participants []*PartyInfo `protobuf:"bytes,12,rep,name=participants,proto3" json:"participants,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SessionEvent) Reset() {
@ -791,6 +794,13 @@ func (x *SessionEvent) GetDelegateUserShare() *DelegateUserShare {
return nil
}
func (x *SessionEvent) GetParticipants() []*PartyInfo {
if x != nil {
return x.Participants
}
return nil
}
// DelegateUserShare contains user's share for delegate party to use in signing
type DelegateUserShare struct {
state protoimpl.MessageState `protogen:"open.v1"`
@ -2479,7 +2489,7 @@ const file_api_proto_message_router_proto_rawDesc = "" +
"\x1dSubscribeSessionEventsRequest\x12\x19\n" +
"\bparty_id\x18\x01 \x01(\tR\apartyId\x12\x1f\n" +
"\vevent_types\x18\x02 \x03(\tR\n" +
"eventTypes\"\x94\x04\n" +
"eventTypes\"\xd2\x04\n" +
"\fSessionEvent\x12\x19\n" +
"\bevent_id\x18\x01 \x01(\tR\aeventId\x12\x1d\n" +
"\n" +
@ -2499,7 +2509,8 @@ const file_api_proto_message_router_proto_rawDesc = "" +
"\n" +
"expires_at\x18\n" +
" \x01(\x03R\texpiresAt\x12P\n" +
"\x13delegate_user_share\x18\v \x01(\v2 .mpc.router.v1.DelegateUserShareR\x11delegateUserShare\x1a=\n" +
"\x13delegate_user_share\x18\v \x01(\v2 .mpc.router.v1.DelegateUserShareR\x11delegateUserShare\x12<\n" +
"\fparticipants\x18\f \x03(\v2\x18.mpc.router.v1.PartyInfoR\fparticipants\x1a=\n" +
"\x0fJoinTokensEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x89\x01\n" +
@ -2723,50 +2734,51 @@ var file_api_proto_message_router_proto_depIdxs = []int32{
6, // 1: mpc.router.v1.RegisterPartyRequest.notification:type_name -> mpc.router.v1.NotificationChannel
37, // 2: mpc.router.v1.SessionEvent.join_tokens:type_name -> mpc.router.v1.SessionEvent.JoinTokensEntry
11, // 3: mpc.router.v1.SessionEvent.delegate_user_share:type_name -> mpc.router.v1.DelegateUserShare
10, // 4: mpc.router.v1.PublishSessionEventRequest.event:type_name -> mpc.router.v1.SessionEvent
6, // 5: mpc.router.v1.RegisteredParty.notification:type_name -> mpc.router.v1.NotificationChannel
15, // 6: mpc.router.v1.GetRegisteredPartiesResponse.parties:type_name -> mpc.router.v1.RegisteredParty
20, // 7: mpc.router.v1.GetMessageStatusResponse.deliveries:type_name -> mpc.router.v1.MessageDeliveryStatus
24, // 8: mpc.router.v1.PartyInfo.device_info:type_name -> mpc.router.v1.DeviceInfo
24, // 9: mpc.router.v1.JoinSessionRequest.device_info:type_name -> mpc.router.v1.DeviceInfo
26, // 10: mpc.router.v1.JoinSessionResponse.session_info:type_name -> mpc.router.v1.SessionInfo
25, // 11: mpc.router.v1.JoinSessionResponse.other_parties:type_name -> mpc.router.v1.PartyInfo
25, // 12: mpc.router.v1.GetSessionStatusResponse.participants:type_name -> mpc.router.v1.PartyInfo
0, // 13: mpc.router.v1.MessageRouter.RouteMessage:input_type -> mpc.router.v1.RouteMessageRequest
2, // 14: mpc.router.v1.MessageRouter.SubscribeMessages:input_type -> mpc.router.v1.SubscribeMessagesRequest
4, // 15: mpc.router.v1.MessageRouter.GetPendingMessages:input_type -> mpc.router.v1.GetPendingMessagesRequest
17, // 16: mpc.router.v1.MessageRouter.AcknowledgeMessage:input_type -> mpc.router.v1.AcknowledgeMessageRequest
19, // 17: mpc.router.v1.MessageRouter.GetMessageStatus:input_type -> mpc.router.v1.GetMessageStatusRequest
7, // 18: mpc.router.v1.MessageRouter.RegisterParty:input_type -> mpc.router.v1.RegisterPartyRequest
22, // 19: mpc.router.v1.MessageRouter.Heartbeat:input_type -> mpc.router.v1.HeartbeatRequest
9, // 20: mpc.router.v1.MessageRouter.SubscribeSessionEvents:input_type -> mpc.router.v1.SubscribeSessionEventsRequest
12, // 21: mpc.router.v1.MessageRouter.PublishSessionEvent:input_type -> mpc.router.v1.PublishSessionEventRequest
14, // 22: mpc.router.v1.MessageRouter.GetRegisteredParties:input_type -> mpc.router.v1.GetRegisteredPartiesRequest
27, // 23: mpc.router.v1.MessageRouter.JoinSession:input_type -> mpc.router.v1.JoinSessionRequest
29, // 24: mpc.router.v1.MessageRouter.MarkPartyReady:input_type -> mpc.router.v1.MarkPartyReadyRequest
31, // 25: mpc.router.v1.MessageRouter.ReportCompletion:input_type -> mpc.router.v1.ReportCompletionRequest
33, // 26: mpc.router.v1.MessageRouter.GetSessionStatus:input_type -> mpc.router.v1.GetSessionStatusRequest
35, // 27: mpc.router.v1.MessageRouter.SubmitDelegateShare:input_type -> mpc.router.v1.SubmitDelegateShareRequest
1, // 28: mpc.router.v1.MessageRouter.RouteMessage:output_type -> mpc.router.v1.RouteMessageResponse
3, // 29: mpc.router.v1.MessageRouter.SubscribeMessages:output_type -> mpc.router.v1.MPCMessage
5, // 30: mpc.router.v1.MessageRouter.GetPendingMessages:output_type -> mpc.router.v1.GetPendingMessagesResponse
18, // 31: mpc.router.v1.MessageRouter.AcknowledgeMessage:output_type -> mpc.router.v1.AcknowledgeMessageResponse
21, // 32: mpc.router.v1.MessageRouter.GetMessageStatus:output_type -> mpc.router.v1.GetMessageStatusResponse
8, // 33: mpc.router.v1.MessageRouter.RegisterParty:output_type -> mpc.router.v1.RegisterPartyResponse
23, // 34: mpc.router.v1.MessageRouter.Heartbeat:output_type -> mpc.router.v1.HeartbeatResponse
10, // 35: mpc.router.v1.MessageRouter.SubscribeSessionEvents:output_type -> mpc.router.v1.SessionEvent
13, // 36: mpc.router.v1.MessageRouter.PublishSessionEvent:output_type -> mpc.router.v1.PublishSessionEventResponse
16, // 37: mpc.router.v1.MessageRouter.GetRegisteredParties:output_type -> mpc.router.v1.GetRegisteredPartiesResponse
28, // 38: mpc.router.v1.MessageRouter.JoinSession:output_type -> mpc.router.v1.JoinSessionResponse
30, // 39: mpc.router.v1.MessageRouter.MarkPartyReady:output_type -> mpc.router.v1.MarkPartyReadyResponse
32, // 40: mpc.router.v1.MessageRouter.ReportCompletion:output_type -> mpc.router.v1.ReportCompletionResponse
34, // 41: mpc.router.v1.MessageRouter.GetSessionStatus:output_type -> mpc.router.v1.GetSessionStatusResponse
36, // 42: mpc.router.v1.MessageRouter.SubmitDelegateShare:output_type -> mpc.router.v1.SubmitDelegateShareResponse
28, // [28:43] is the sub-list for method output_type
13, // [13:28] is the sub-list for method input_type
13, // [13:13] is the sub-list for extension type_name
13, // [13:13] is the sub-list for extension extendee
0, // [0:13] is the sub-list for field type_name
25, // 4: mpc.router.v1.SessionEvent.participants:type_name -> mpc.router.v1.PartyInfo
10, // 5: mpc.router.v1.PublishSessionEventRequest.event:type_name -> mpc.router.v1.SessionEvent
6, // 6: mpc.router.v1.RegisteredParty.notification:type_name -> mpc.router.v1.NotificationChannel
15, // 7: mpc.router.v1.GetRegisteredPartiesResponse.parties:type_name -> mpc.router.v1.RegisteredParty
20, // 8: mpc.router.v1.GetMessageStatusResponse.deliveries:type_name -> mpc.router.v1.MessageDeliveryStatus
24, // 9: mpc.router.v1.PartyInfo.device_info:type_name -> mpc.router.v1.DeviceInfo
24, // 10: mpc.router.v1.JoinSessionRequest.device_info:type_name -> mpc.router.v1.DeviceInfo
26, // 11: mpc.router.v1.JoinSessionResponse.session_info:type_name -> mpc.router.v1.SessionInfo
25, // 12: mpc.router.v1.JoinSessionResponse.other_parties:type_name -> mpc.router.v1.PartyInfo
25, // 13: mpc.router.v1.GetSessionStatusResponse.participants:type_name -> mpc.router.v1.PartyInfo
0, // 14: mpc.router.v1.MessageRouter.RouteMessage:input_type -> mpc.router.v1.RouteMessageRequest
2, // 15: mpc.router.v1.MessageRouter.SubscribeMessages:input_type -> mpc.router.v1.SubscribeMessagesRequest
4, // 16: mpc.router.v1.MessageRouter.GetPendingMessages:input_type -> mpc.router.v1.GetPendingMessagesRequest
17, // 17: mpc.router.v1.MessageRouter.AcknowledgeMessage:input_type -> mpc.router.v1.AcknowledgeMessageRequest
19, // 18: mpc.router.v1.MessageRouter.GetMessageStatus:input_type -> mpc.router.v1.GetMessageStatusRequest
7, // 19: mpc.router.v1.MessageRouter.RegisterParty:input_type -> mpc.router.v1.RegisterPartyRequest
22, // 20: mpc.router.v1.MessageRouter.Heartbeat:input_type -> mpc.router.v1.HeartbeatRequest
9, // 21: mpc.router.v1.MessageRouter.SubscribeSessionEvents:input_type -> mpc.router.v1.SubscribeSessionEventsRequest
12, // 22: mpc.router.v1.MessageRouter.PublishSessionEvent:input_type -> mpc.router.v1.PublishSessionEventRequest
14, // 23: mpc.router.v1.MessageRouter.GetRegisteredParties:input_type -> mpc.router.v1.GetRegisteredPartiesRequest
27, // 24: mpc.router.v1.MessageRouter.JoinSession:input_type -> mpc.router.v1.JoinSessionRequest
29, // 25: mpc.router.v1.MessageRouter.MarkPartyReady:input_type -> mpc.router.v1.MarkPartyReadyRequest
31, // 26: mpc.router.v1.MessageRouter.ReportCompletion:input_type -> mpc.router.v1.ReportCompletionRequest
33, // 27: mpc.router.v1.MessageRouter.GetSessionStatus:input_type -> mpc.router.v1.GetSessionStatusRequest
35, // 28: mpc.router.v1.MessageRouter.SubmitDelegateShare:input_type -> mpc.router.v1.SubmitDelegateShareRequest
1, // 29: mpc.router.v1.MessageRouter.RouteMessage:output_type -> mpc.router.v1.RouteMessageResponse
3, // 30: mpc.router.v1.MessageRouter.SubscribeMessages:output_type -> mpc.router.v1.MPCMessage
5, // 31: mpc.router.v1.MessageRouter.GetPendingMessages:output_type -> mpc.router.v1.GetPendingMessagesResponse
18, // 32: mpc.router.v1.MessageRouter.AcknowledgeMessage:output_type -> mpc.router.v1.AcknowledgeMessageResponse
21, // 33: mpc.router.v1.MessageRouter.GetMessageStatus:output_type -> mpc.router.v1.GetMessageStatusResponse
8, // 34: mpc.router.v1.MessageRouter.RegisterParty:output_type -> mpc.router.v1.RegisterPartyResponse
23, // 35: mpc.router.v1.MessageRouter.Heartbeat:output_type -> mpc.router.v1.HeartbeatResponse
10, // 36: mpc.router.v1.MessageRouter.SubscribeSessionEvents:output_type -> mpc.router.v1.SessionEvent
13, // 37: mpc.router.v1.MessageRouter.PublishSessionEvent:output_type -> mpc.router.v1.PublishSessionEventResponse
16, // 38: mpc.router.v1.MessageRouter.GetRegisteredParties:output_type -> mpc.router.v1.GetRegisteredPartiesResponse
28, // 39: mpc.router.v1.MessageRouter.JoinSession:output_type -> mpc.router.v1.JoinSessionResponse
30, // 40: mpc.router.v1.MessageRouter.MarkPartyReady:output_type -> mpc.router.v1.MarkPartyReadyResponse
32, // 41: mpc.router.v1.MessageRouter.ReportCompletion:output_type -> mpc.router.v1.ReportCompletionResponse
34, // 42: mpc.router.v1.MessageRouter.GetSessionStatus:output_type -> mpc.router.v1.GetSessionStatusResponse
36, // 43: mpc.router.v1.MessageRouter.SubmitDelegateShare:output_type -> mpc.router.v1.SubmitDelegateShareResponse
29, // [29:44] is the sub-list for method output_type
14, // [14:29] is the sub-list for method input_type
14, // [14:14] is the sub-list for extension type_name
14, // [14:14] is the sub-list for extension extendee
0, // [0:14] is the sub-list for field type_name
}
func init() { file_api_proto_message_router_proto_init() }

View File

@ -166,6 +166,9 @@ message SessionEvent {
int64 expires_at = 10; // Unix timestamp milliseconds
// For sign sessions with delegate party: user's share for delegate to use
DelegateUserShare delegate_user_share = 11;
// For session_started event: complete list of participants with their indices
// CRITICAL: Use this for TSS protocol instead of JoinSession response
repeated PartyInfo participants = 12;
}
// DelegateUserShare contains user's share for delegate party to use in signing

View File

@ -432,17 +432,43 @@ func createCoManagedSessionEventHandler(
return
}
// CRITICAL FIX: Use participants from session_started event, NOT from JoinSession cache
// The JoinSession response only contains parties that had joined at that moment,
// but session_started event contains the COMPLETE list of all participants
var participants []use_cases.ParticipantInfo
if len(event.Participants) > 0 {
// Use participants from event (preferred - complete list)
participants = make([]use_cases.ParticipantInfo, len(event.Participants))
for i, p := range event.Participants {
participants[i] = use_cases.ParticipantInfo{
PartyID: p.PartyId,
PartyIndex: int(p.PartyIndex),
}
}
logger.Info("Using participants from session_started event",
zap.String("session_id", event.SessionId),
zap.Int("participant_count", len(participants)))
} else {
// Fallback to cached participants (for backward compatibility)
participants = pendingSession.Participants
logger.Warn("No participants in session_started event, using cached participants",
zap.String("session_id", event.SessionId),
zap.Int("participant_count", len(participants)))
}
// Determine session type based on message_hash
isSignSession := len(pendingSession.MessageHash) > 0
if isSignSession {
logger.Info("Session started event received, beginning TSS signing protocol",
zap.String("session_id", event.SessionId),
zap.String("party_id", partyID))
zap.String("party_id", partyID),
zap.Int("participant_count", len(participants)))
} else {
logger.Info("Session started event received, beginning TSS keygen protocol",
zap.String("session_id", event.SessionId),
zap.String("party_id", partyID))
zap.String("party_id", partyID),
zap.Int("participant_count", len(participants)))
}
// Execute TSS protocol in goroutine
@ -452,11 +478,6 @@ func createCoManagedSessionEventHandler(
participateCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
// CRITICAL: Use participants from pendingSession (which came from JoinSession response)
// These contain the correct PartyIndex values from the database, NOT loop indices
// The JoinSession response already includes all participants with their assigned indices
participants := pendingSession.Participants
if isSignSession {
// Execute signing protocol
logger.Info("Auto-participating in co_managed_sign session",

View File

@ -149,6 +149,8 @@ func (c *MessageRouterClient) PublishSessionCreated(
}
// PublishSessionStarted publishes a session_started event when all parties have joined
// CRITICAL: participants contains the complete list of all parties with their indices
// Receivers should use this list for TSS protocol instead of JoinSession response
func (c *MessageRouterClient) PublishSessionStarted(
ctx context.Context,
sessionID string,
@ -157,7 +159,17 @@ func (c *MessageRouterClient) PublishSessionStarted(
selectedParties []string,
joinTokens map[string]string,
startedAt int64,
participants []use_cases.SessionParticipantInfo,
) error {
// Convert participants to proto format
protoParticipants := make([]*router.PartyInfo, len(participants))
for i, p := range participants {
protoParticipants[i] = &router.PartyInfo{
PartyId: p.PartyID,
PartyIndex: p.PartyIndex,
}
}
event := &router.SessionEvent{
EventId: uuid.New().String(),
EventType: "session_started",
@ -167,8 +179,13 @@ func (c *MessageRouterClient) PublishSessionStarted(
SelectedParties: selectedParties,
JoinTokens: joinTokens,
CreatedAt: startedAt,
Participants: protoParticipants,
}
logger.Info("Publishing session_started event with participants",
zap.String("session_id", sessionID),
zap.Int("participant_count", len(participants)))
return c.PublishSessionEvent(ctx, event)
}

View File

@ -21,8 +21,16 @@ import (
// Maximum retries for optimistic lock conflicts during join session
const joinSessionMaxRetries = 3
// SessionParticipantInfo contains party ID and index for session_started event
type SessionParticipantInfo struct {
PartyID string
PartyIndex int32
}
// JoinSessionMessageRouterClient defines the interface for publishing session events via gRPC
type JoinSessionMessageRouterClient interface {
// PublishSessionStarted publishes session_started event with complete participants list
// CRITICAL: participants contains all parties with their indices for TSS protocol
PublishSessionStarted(
ctx context.Context,
sessionID string,
@ -31,6 +39,7 @@ type JoinSessionMessageRouterClient interface {
selectedParties []string,
joinTokens map[string]string,
startedAt int64,
participants []SessionParticipantInfo,
) error
// PublishParticipantJoined broadcasts a participant_joined event to all parties in the session
@ -248,6 +257,16 @@ func (uc *JoinSessionUseCase) executeWithRetry(
// Build join tokens map (empty for session_started, parties already have tokens)
joinTokens := make(map[string]string)
// CRITICAL: Build complete participants list with party indices
// This ensures all parties have the same participant list for TSS protocol
participants := make([]SessionParticipantInfo, len(session.Participants))
for i, p := range session.Participants {
participants[i] = SessionParticipantInfo{
PartyID: p.PartyID.String(),
PartyIndex: int32(p.PartyIndex),
}
}
if err := uc.messageRouterClient.PublishSessionStarted(
ctx,
session.ID.String(),
@ -256,6 +275,7 @@ func (uc *JoinSessionUseCase) executeWithRetry(
selectedParties,
joinTokens,
startedAt,
participants,
); err != nil {
logger.Error("failed to publish session started event to message router",
zap.String("session_id", session.ID.String()),
@ -263,7 +283,8 @@ func (uc *JoinSessionUseCase) executeWithRetry(
} else {
logger.Info("published session started event to message router",
zap.String("session_id", session.ID.String()),
zap.Int("party_count", len(selectedParties)))
zap.Int("party_count", len(selectedParties)),
zap.Int("participant_count", len(participants)))
}
}
}