fix(coordinator): add row-level locking to prevent concurrent update conflicts

Problem:
Multiple parties reporting completion simultaneously caused lost updates
because each transaction would read the full session, modify their
participant status, then update ALL participants - causing last-write-wins
behavior.

Solution:
Add SELECT ... FOR UPDATE locks on both mpc_sessions and participants
tables at the start of the Update transaction. This serializes concurrent
updates and prevents lost updates.

Lock order:
1. Lock session row (FOR UPDATE)
2. Lock all participant rows for this session (FOR UPDATE)
3. Perform updates
4. Commit (releases locks)

This ensures that concurrent ReportCompletion calls are fully serialized
and each participant status update is preserved.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-06 01:58:05 -08:00
parent aab88834f9
commit 380bf46fb6
1 changed files with 19 additions and 2 deletions

View File

@ -249,6 +249,15 @@ func (r *SessionPostgresRepo) Update(ctx context.Context, session *entities.MPCS
}
defer tx.Rollback()
// Lock the session row first to prevent concurrent modifications
// This ensures serializable isolation for the entire session update
_, err = tx.ExecContext(ctx, `
SELECT id FROM mpc_sessions WHERE id = $1 FOR UPDATE
`, session.ID.UUID())
if err != nil {
return err
}
// Update session
_, err = tx.ExecContext(ctx, `
UPDATE mpc_sessions SET
@ -265,8 +274,16 @@ func (r *SessionPostgresRepo) Update(ctx context.Context, session *entities.MPCS
return err
}
// Update each participant individually using UPDATE to avoid lost updates
// Using individual UPDATE statements ensures concurrent updates to different participants don't conflict
// Lock all participant rows for this session to prevent concurrent modifications
// This prevents lost updates when multiple parties report completion simultaneously
_, err = tx.ExecContext(ctx, `
SELECT id FROM participants WHERE session_id = $1 FOR UPDATE
`, session.ID.UUID())
if err != nil {
return err
}
// Update each participant individually
for _, p := range session.Participants {
// Try UPDATE first
result, err := tx.ExecContext(ctx, `