fix(coordinator): prevent lost updates in concurrent participant status changes

Fix critical concurrency bug where simultaneous ReportCompletion calls from
multiple parties could cause lost database updates. Changed from UPSERT-all
to UPDATE-individual pattern to ensure each participant status update is
atomic and won't be overwritten by concurrent transactions.

Before: All participants were UPSERTed in single transaction, causing
last-commit-wins behavior that lost earlier status updates.

After: Each participant is UPDATEd individually using UPDATE...WHERE, then
INSERT only if row doesn't exist. This prevents concurrent updates to
different participants from conflicting.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-06 01:35:38 -08:00
parent 00b48bab50
commit aab88834f9
1 changed files with 49 additions and 17 deletions

View File

@ -265,34 +265,66 @@ func (r *SessionPostgresRepo) Update(ctx context.Context, session *entities.MPCS
return err
}
// Upsert participants (insert or update)
// Update each participant individually using UPDATE to avoid lost updates
// Using individual UPDATE statements ensures concurrent updates to different participants don't conflict
for _, p := range session.Participants {
_, err = tx.ExecContext(ctx, `
INSERT INTO participants (
id, session_id, party_id, party_index, status,
device_type, device_id, platform, app_version, public_key, joined_at, completed_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
ON CONFLICT (session_id, party_id) DO UPDATE SET
status = EXCLUDED.status,
public_key = EXCLUDED.public_key,
completed_at = EXCLUDED.completed_at
// Try UPDATE first
result, err := tx.ExecContext(ctx, `
UPDATE participants SET
status = $1,
public_key = $2,
completed_at = $3,
device_type = $4,
device_id = $5,
platform = $6,
app_version = $7
WHERE session_id = $8 AND party_id = $9
`,
uuid.New(),
session.ID.UUID(),
p.PartyID.String(),
p.PartyIndex,
p.Status.String(),
p.PublicKey,
p.CompletedAt,
p.DeviceInfo.DeviceType,
p.DeviceInfo.DeviceID,
p.DeviceInfo.Platform,
p.DeviceInfo.AppVersion,
p.PublicKey,
p.JoinedAt,
p.CompletedAt,
session.ID.UUID(),
p.PartyID.String(),
)
if err != nil {
return err
}
// If no rows affected, participant doesn't exist yet, INSERT it
rowsAffected, err := result.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
// Participant doesn't exist, INSERT it
_, err = tx.ExecContext(ctx, `
INSERT INTO participants (
id, session_id, party_id, party_index, status,
device_type, device_id, platform, app_version, public_key, joined_at, completed_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
`,
uuid.New(),
session.ID.UUID(),
p.PartyID.String(),
p.PartyIndex,
p.Status.String(),
p.DeviceInfo.DeviceType,
p.DeviceInfo.DeviceID,
p.DeviceInfo.Platform,
p.DeviceInfo.AppVersion,
p.PublicKey,
p.JoinedAt,
p.CompletedAt,
)
if err != nil {
return err
}
}
}
return tx.Commit()