fix(server-party-co-managed): 修复死锁问题 - session_created 时立即 JoinSession

问题:
- 原来在 session_created 时只存储 token,等待 session_started
- 但 session_started 需要所有 N 方都 JoinSession 后才触发
- 这导致死锁:co-managed-party 永远收不到 session_started

修复:
- Phase 1 (session_created): 立即调用 JoinSession + 存储 session 信息
- Phase 2 (session_started): 执行 TSS 协议(超时从此时开始计算)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-30 00:23:26 -08:00
parent e114723ab0
commit fd6f84ce82
1 changed file with 21 additions and 6 deletions

View File

@@ -189,6 +189,7 @@ func main() {
eventHandler := createCoManagedSessionEventHandler(
ctx,
partyID,
messageRouter,
participateKeygenUC,
)
@@ -304,11 +305,12 @@ func startHTTPServer(cfg *config.Config) error {
// createCoManagedSessionEventHandler creates a handler specifically for co_managed_keygen sessions
// Two-phase event handling:
// Phase 1 (session_created): Store join token and wait
// Phase 1 (session_created): JoinSession immediately + store session info
// Phase 2 (session_started): Execute TSS protocol (same timing as user clients receiving all_joined)
func createCoManagedSessionEventHandler(
ctx context.Context,
partyID string,
messageRouter *grpcclient.MessageRouterClient,
participateKeygenUC *use_cases.ParticipateKeygenUseCase,
) func(*router.SessionEvent) {
return func(event *router.SessionEvent) {
@@ -350,7 +352,7 @@ func createCoManagedSessionEventHandler(
return
}
// Phase 1: Store session info and wait for session_started
// Phase 1: Get join token
joinToken, exists := event.JoinTokens[partyID]
if !exists {
logger.Error("No join token found for party in session_created",
@@ -359,6 +361,22 @@ func createCoManagedSessionEventHandler(
return
}
// Immediately call JoinSession (this is required to trigger session_started)
joinCtx, joinCancel := context.WithTimeout(ctx, 30*time.Second)
_, err := messageRouter.JoinSession(joinCtx, sessionID, partyID, joinToken)
joinCancel()
if err != nil {
logger.Error("Failed to join session",
zap.String("session_id", event.SessionId),
zap.String("party_id", partyID),
zap.Error(err))
return
}
logger.Info("Successfully joined session, waiting for session_started",
zap.String("session_id", event.SessionId),
zap.String("party_id", partyID))
// Store pending session for later use when session_started arrives
pendingSessionCache.Store(event.SessionId, &PendingSession{
SessionID: sessionID,
@@ -367,10 +385,6 @@ func createCoManagedSessionEventHandler(
CreatedAt: time.Now(),
})
logger.Info("Session created event received, waiting for session_started",
zap.String("session_id", event.SessionId),
zap.String("party_id", partyID))
case "session_started":
// Phase 2: All participants have joined, now execute TSS protocol
pendingSession, exists := pendingSessionCache.Get(event.SessionId)
@@ -386,6 +400,7 @@ func createCoManagedSessionEventHandler(
zap.String("party_id", partyID))
// Execute TSS keygen protocol in goroutine
// Timeout starts NOW (when session_started is received), not at session_created
go func() {
// 10 minute timeout for TSS protocol execution
participateCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)