debug(coordinator): add detailed logging to track concurrent update issue
Add comprehensive debug logs to: 1. report_completion.go - log all participant statuses at key points 2. session_postgres_repo.go - log before/after each participant update This will help identify why server-party-1 status remains 'invited' despite successfully reporting completion. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
380bf46fb6
commit
fb9c85f883
|
|
@ -19,7 +19,12 @@
|
||||||
"Bash(git commit:*)",
|
"Bash(git commit:*)",
|
||||||
"Bash(git push:*)",
|
"Bash(git push:*)",
|
||||||
"Bash(git pull:*)",
|
"Bash(git pull:*)",
|
||||||
"Bash(del \"api\\proto\\session_coordinator.pb.go\" \"api\\proto\\session_coordinator_grpc.pb.go\" \"api\\proto\\message_router.pb.go\" \"api\\proto\\message_router_grpc.pb.go\")"
|
"Bash(del \"api\\proto\\session_coordinator.pb.go\" \"api\\proto\\session_coordinator_grpc.pb.go\" \"api\\proto\\message_router.pb.go\" \"api\\proto\\message_router_grpc.pb.go\")",
|
||||||
|
"Bash(grep:*)",
|
||||||
|
"Bash(cat:*)",
|
||||||
|
"Bash(dir:*)",
|
||||||
|
"Bash(copy /Y \"api\\proto\\session_coordinator.pb.go\" \"api\\grpc\\coordinator\\v1\"\" && copy /Y \"apiprotosession_coordinator_grpc.pb.go\" \"apigrpccoordinatorv1\"\")",
|
||||||
|
"Bash(timeout /t 10 /nobreak)"
|
||||||
],
|
],
|
||||||
"deny": [],
|
"deny": [],
|
||||||
"ask": []
|
"ask": []
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,11 @@ import (
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
|
"github.com/rwadurian/mpc-system/pkg/logger"
|
||||||
"github.com/rwadurian/mpc-system/services/session-coordinator/domain/entities"
|
"github.com/rwadurian/mpc-system/services/session-coordinator/domain/entities"
|
||||||
"github.com/rwadurian/mpc-system/services/session-coordinator/domain/repositories"
|
"github.com/rwadurian/mpc-system/services/session-coordinator/domain/repositories"
|
||||||
"github.com/rwadurian/mpc-system/services/session-coordinator/domain/value_objects"
|
"github.com/rwadurian/mpc-system/services/session-coordinator/domain/value_objects"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SessionPostgresRepo implements SessionRepository for PostgreSQL
|
// SessionPostgresRepo implements SessionRepository for PostgreSQL
|
||||||
|
|
@ -283,8 +285,23 @@ func (r *SessionPostgresRepo) Update(ctx context.Context, session *entities.MPCS
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DEBUG: Log all participant statuses before update
|
||||||
|
var participantStatuses []string
|
||||||
|
for _, p := range session.Participants {
|
||||||
|
participantStatuses = append(participantStatuses, p.PartyID.String()+":"+p.Status.String())
|
||||||
|
}
|
||||||
|
logger.Debug("repo.Update: updating participants",
|
||||||
|
zap.String("session_id", session.ID.UUID().String()),
|
||||||
|
zap.Strings("participant_statuses", participantStatuses))
|
||||||
|
|
||||||
// Update each participant individually
|
// Update each participant individually
|
||||||
for _, p := range session.Participants {
|
for _, p := range session.Participants {
|
||||||
|
// DEBUG: Log before updating each participant
|
||||||
|
logger.Debug("repo.Update: updating individual participant",
|
||||||
|
zap.String("session_id", session.ID.UUID().String()),
|
||||||
|
zap.String("party_id", p.PartyID.String()),
|
||||||
|
zap.String("new_status", p.Status.String()))
|
||||||
|
|
||||||
// Try UPDATE first
|
// Try UPDATE first
|
||||||
result, err := tx.ExecContext(ctx, `
|
result, err := tx.ExecContext(ctx, `
|
||||||
UPDATE participants SET
|
UPDATE participants SET
|
||||||
|
|
@ -317,8 +334,18 @@ func (r *SessionPostgresRepo) Update(ctx context.Context, session *entities.MPCS
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DEBUG: Log rows affected
|
||||||
|
logger.Debug("repo.Update: participant update result",
|
||||||
|
zap.String("session_id", session.ID.UUID().String()),
|
||||||
|
zap.String("party_id", p.PartyID.String()),
|
||||||
|
zap.Int64("rows_affected", rowsAffected))
|
||||||
|
|
||||||
if rowsAffected == 0 {
|
if rowsAffected == 0 {
|
||||||
// Participant doesn't exist, INSERT it
|
// Participant doesn't exist, INSERT it
|
||||||
|
logger.Debug("repo.Update: inserting new participant",
|
||||||
|
zap.String("session_id", session.ID.UUID().String()),
|
||||||
|
zap.String("party_id", p.PartyID.String()))
|
||||||
|
|
||||||
_, err = tx.ExecContext(ctx, `
|
_, err = tx.ExecContext(ctx, `
|
||||||
INSERT INTO participants (
|
INSERT INTO participants (
|
||||||
id, session_id, party_id, party_index, status,
|
id, session_id, party_id, party_index, status,
|
||||||
|
|
@ -344,6 +371,9 @@ func (r *SessionPostgresRepo) Update(ctx context.Context, session *entities.MPCS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Debug("repo.Update: committing transaction",
|
||||||
|
zap.String("session_id", session.ID.UUID().String()))
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -42,12 +42,26 @@ func (uc *ReportCompletionUseCase) Execute(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
inputData input.ReportCompletionInput,
|
inputData input.ReportCompletionInput,
|
||||||
) (*input.ReportCompletionOutput, error) {
|
) (*input.ReportCompletionOutput, error) {
|
||||||
|
logger.Debug("ReportCompletion.Execute: START",
|
||||||
|
zap.String("session_id", inputData.SessionID.String()),
|
||||||
|
zap.String("party_id", inputData.PartyID))
|
||||||
|
|
||||||
// 1. Load session
|
// 1. Load session
|
||||||
session, err := uc.sessionRepo.FindByUUID(ctx, inputData.SessionID)
|
session, err := uc.sessionRepo.FindByUUID(ctx, inputData.SessionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DEBUG: Log all participant statuses after loading session
|
||||||
|
var allParticipantStatuses []string
|
||||||
|
for _, p := range session.Participants {
|
||||||
|
allParticipantStatuses = append(allParticipantStatuses, p.PartyID.String()+":"+p.Status.String())
|
||||||
|
}
|
||||||
|
logger.Debug("ReportCompletion.Execute: loaded session",
|
||||||
|
zap.String("session_id", session.ID.String()),
|
||||||
|
zap.String("party_id", inputData.PartyID),
|
||||||
|
zap.Strings("all_participant_statuses", allParticipantStatuses))
|
||||||
|
|
||||||
// 2. Create party ID value object
|
// 2. Create party ID value object
|
||||||
partyID, err := value_objects.NewPartyID(inputData.PartyID)
|
partyID, err := value_objects.NewPartyID(inputData.PartyID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -60,6 +74,11 @@ func (uc *ReportCompletionUseCase) Execute(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Debug("ReportCompletion.Execute: current participant status",
|
||||||
|
zap.String("session_id", session.ID.String()),
|
||||||
|
zap.String("party_id", inputData.PartyID),
|
||||||
|
zap.String("current_status", participant.Status.String()))
|
||||||
|
|
||||||
// 3.1 Ensure participant is in Ready state before marking as Completed
|
// 3.1 Ensure participant is in Ready state before marking as Completed
|
||||||
// The status flow is: Invited -> Joined -> Ready -> Completed
|
// The status flow is: Invited -> Joined -> Ready -> Completed
|
||||||
// Handle all possible states to reach Ready
|
// Handle all possible states to reach Ready
|
||||||
|
|
@ -139,10 +158,24 @@ func (uc *ReportCompletionUseCase) Execute(
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. Save updated session
|
// 6. Save updated session
|
||||||
|
// DEBUG: Log all participant statuses before calling Update
|
||||||
|
var beforeUpdateStatuses []string
|
||||||
|
for _, p := range session.Participants {
|
||||||
|
beforeUpdateStatuses = append(beforeUpdateStatuses, p.PartyID.String()+":"+p.Status.String())
|
||||||
|
}
|
||||||
|
logger.Debug("ReportCompletion.Execute: BEFORE sessionRepo.Update",
|
||||||
|
zap.String("session_id", session.ID.String()),
|
||||||
|
zap.String("party_id", inputData.PartyID),
|
||||||
|
zap.Strings("all_participant_statuses", beforeUpdateStatuses))
|
||||||
|
|
||||||
if err := uc.sessionRepo.Update(ctx, session); err != nil {
|
if err := uc.sessionRepo.Update(ctx, session); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Debug("ReportCompletion.Execute: AFTER sessionRepo.Update",
|
||||||
|
zap.String("session_id", session.ID.String()),
|
||||||
|
zap.String("party_id", inputData.PartyID))
|
||||||
|
|
||||||
// 7. Publish participant completed event
|
// 7. Publish participant completed event
|
||||||
event := output.ParticipantCompletedEvent{
|
event := output.ParticipantCompletedEvent{
|
||||||
SessionID: session.ID.String(),
|
SessionID: session.ID.String(),
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,111 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang-jwt/jwt/v5"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Claims struct {
|
||||||
|
SessionID string `json:"session_id"`
|
||||||
|
PartyID string `json:"party_id"`
|
||||||
|
TokenType string `json:"token_type"`
|
||||||
|
jwt.RegisteredClaims
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateAccessToken(secretKey, userID, username string) (string, error) {
|
||||||
|
now := time.Now()
|
||||||
|
claims := Claims{
|
||||||
|
PartyID: username,
|
||||||
|
TokenType: "access",
|
||||||
|
RegisteredClaims: jwt.RegisteredClaims{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
Issuer: "mpc-system",
|
||||||
|
Subject: userID,
|
||||||
|
IssuedAt: jwt.NewNumericDate(now),
|
||||||
|
NotBefore: jwt.NewNumericDate(now),
|
||||||
|
ExpiresAt: jwt.NewNumericDate(now.Add(24 * time.Hour)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
|
||||||
|
return token.SignedString([]byte(secretKey))
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Get JWT secret from environment or use test value
|
||||||
|
jwtSecret := os.Getenv("JWT_SECRET_KEY")
|
||||||
|
if jwtSecret == "" {
|
||||||
|
jwtSecret = "test-jwt-secret-key-please-change-in-production"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate access token
|
||||||
|
token, err := generateAccessToken(jwtSecret, "admin", "admin")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to generate token: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Generated JWT token: %s\n\n", token)
|
||||||
|
|
||||||
|
// Create keygen session via account-service
|
||||||
|
sessionData := map[string]interface{}{
|
||||||
|
"threshold_n": 3,
|
||||||
|
"threshold_t": 2,
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonData, err := json.Marshal(sessionData)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to marshal JSON: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get API key from environment or use test value
|
||||||
|
apiKey := os.Getenv("MPC_API_KEY")
|
||||||
|
if apiKey == "" {
|
||||||
|
apiKey = "test-api-key"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call account-service API
|
||||||
|
req, err := http.NewRequest("POST", "http://localhost:4000/api/v1/mpc/keygen", bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to create request: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
req.Header.Set("X-API-Key", apiKey)
|
||||||
|
|
||||||
|
client := &http.Client{Timeout: 10 * time.Second}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to send request: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to read response: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Response status: %s\n", resp.Status)
|
||||||
|
fmt.Printf("Response body: %s\n", string(body))
|
||||||
|
|
||||||
|
if resp.StatusCode == 200 || resp.StatusCode == 201 {
|
||||||
|
var result map[string]interface{}
|
||||||
|
if err := json.Unmarshal(body, &result); err == nil {
|
||||||
|
if sessionID, ok := result["session_id"].(string); ok {
|
||||||
|
fmt.Printf("\n✓ Session created successfully: %s\n", sessionID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue