From b4d6b0f2641146c00b39876f736b6481fa526844 Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 4 Dec 2025 23:12:15 -0800 Subject: [PATCH] feat(mpc-system): add connection retry logic with linear backoff - Add retry mechanism for PostgreSQL connections (10 retries, 2s base delay) - Add retry mechanism for RabbitMQ connections (10 retries, 2s base delay) - Add retry mechanism for Redis connections (10 retries, 2s base delay) - Use linear backoff: delay grows by the base delay with each retry attempt (2s, 4s, 6s, ...) - Log detailed retry information (attempt number, max retries, errors) - Redis continues without cache if all retries fail (non-critical) - Database and RabbitMQ return error after all retries (critical) This resolves startup failures when dependent services are slow to initialize, particularly RabbitMQ which may pass health checks but not be fully ready. --- .../message-router/cmd/server/main.go | 72 ++++++++++---- .../session-coordinator/cmd/server/main.go | 93 ++++++++++++++----- 2 files changed, 124 insertions(+), 41 deletions(-) diff --git a/backend/mpc-system/services/message-router/cmd/server/main.go b/backend/mpc-system/services/message-router/cmd/server/main.go index 1ffd2059..25ae90e6 100644 --- a/backend/mpc-system/services/message-router/cmd/server/main.go +++ b/backend/mpc-system/services/message-router/cmd/server/main.go @@ -127,31 +127,67 @@ func main() { } func initDatabase(cfg config.DatabaseConfig) (*sql.DB, error) { - db, err := sql.Open("postgres", cfg.DSN()) - if err != nil { - return nil, err + const maxRetries = 10 + const retryDelay = 2 * time.Second + + var db *sql.DB + var err error + + for i := 0; i < maxRetries; i++ { + db, err = sql.Open("postgres", cfg.DSN()) + if err != nil { + logger.Warn("Failed to open database connection, retrying...", + zap.Int("attempt", i+1), + zap.Int("max_retries", maxRetries), + zap.Error(err)) + time.Sleep(retryDelay * time.Duration(i+1)) + continue + } + + db.SetMaxOpenConns(cfg.MaxOpenConns) + 
db.SetMaxIdleConns(cfg.MaxIdleConns) + db.SetConnMaxLifetime(cfg.ConnMaxLife) + + if err = db.Ping(); err != nil { + logger.Warn("Failed to ping database, retrying...", + zap.Int("attempt", i+1), + zap.Int("max_retries", maxRetries), + zap.Error(err)) + db.Close() + time.Sleep(retryDelay * time.Duration(i+1)) + continue + } + + logger.Info("Connected to PostgreSQL") + return db, nil } - db.SetMaxOpenConns(cfg.MaxOpenConns) - db.SetMaxIdleConns(cfg.MaxIdleConns) - db.SetConnMaxLifetime(cfg.ConnMaxLife) - - if err := db.Ping(); err != nil { - return nil, err - } - - logger.Info("Connected to PostgreSQL") - return db, nil + return nil, fmt.Errorf("failed to connect to database after %d retries: %w", maxRetries, err) } func initRabbitMQ(cfg config.RabbitMQConfig) (*amqp.Connection, error) { - conn, err := amqp.Dial(cfg.URL()) - if err != nil { - return nil, err + const maxRetries = 10 + const retryDelay = 2 * time.Second + + var conn *amqp.Connection + var err error + + for i := 0; i < maxRetries; i++ { + conn, err = amqp.Dial(cfg.URL()) + if err != nil { + logger.Warn("Failed to connect to RabbitMQ, retrying...", + zap.Int("attempt", i+1), + zap.Int("max_retries", maxRetries), + zap.Error(err)) + time.Sleep(retryDelay * time.Duration(i+1)) + continue + } + + logger.Info("Connected to RabbitMQ") + return conn, nil } - logger.Info("Connected to RabbitMQ") - return conn, nil + return nil, fmt.Errorf("failed to connect to RabbitMQ after %d retries: %w", maxRetries, err) } func startGRPCServer( diff --git a/backend/mpc-system/services/session-coordinator/cmd/server/main.go b/backend/mpc-system/services/session-coordinator/cmd/server/main.go index a5c5e8cd..58444850 100644 --- a/backend/mpc-system/services/session-coordinator/cmd/server/main.go +++ b/backend/mpc-system/services/session-coordinator/cmd/server/main.go @@ -168,50 +168,97 @@ func main() { } func initDatabase(cfg config.DatabaseConfig) (*sql.DB, error) { - db, err := sql.Open("postgres", cfg.DSN()) - if err != 
nil { - return nil, err + const maxRetries = 10 + const retryDelay = 2 * time.Second + + var db *sql.DB + var err error + + for i := 0; i < maxRetries; i++ { + db, err = sql.Open("postgres", cfg.DSN()) + if err != nil { + logger.Warn("Failed to open database connection, retrying...", + zap.Int("attempt", i+1), + zap.Int("max_retries", maxRetries), + zap.Error(err)) + time.Sleep(retryDelay * time.Duration(i+1)) + continue + } + + db.SetMaxOpenConns(cfg.MaxOpenConns) + db.SetMaxIdleConns(cfg.MaxIdleConns) + db.SetConnMaxLifetime(cfg.ConnMaxLife) + + // Test connection + if err = db.Ping(); err != nil { + logger.Warn("Failed to ping database, retrying...", + zap.Int("attempt", i+1), + zap.Int("max_retries", maxRetries), + zap.Error(err)) + db.Close() + time.Sleep(retryDelay * time.Duration(i+1)) + continue + } + + logger.Info("Connected to PostgreSQL") + return db, nil } - db.SetMaxOpenConns(cfg.MaxOpenConns) - db.SetMaxIdleConns(cfg.MaxIdleConns) - db.SetConnMaxLifetime(cfg.ConnMaxLife) - - // Test connection - if err := db.Ping(); err != nil { - return nil, err - } - - logger.Info("Connected to PostgreSQL") - return db, nil + return nil, fmt.Errorf("failed to connect to database after %d retries: %w", maxRetries, err) } func initRedis(cfg config.RedisConfig) *redis.Client { + const maxRetries = 10 + const retryDelay = 2 * time.Second + client := redis.NewClient(&redis.Options{ Addr: cfg.Addr(), Password: cfg.Password, DB: cfg.DB, }) - // Test connection + // Test connection with retry ctx := context.Background() - if err := client.Ping(ctx).Err(); err != nil { - logger.Warn("Redis connection failed, continuing without cache", zap.Error(err)) - } else { + for i := 0; i < maxRetries; i++ { + if err := client.Ping(ctx).Err(); err != nil { + logger.Warn("Redis connection failed, retrying...", + zap.Int("attempt", i+1), + zap.Int("max_retries", maxRetries), + zap.Error(err)) + time.Sleep(retryDelay * time.Duration(i+1)) + continue + } logger.Info("Connected to Redis") + 
return client } + logger.Warn("Redis connection failed after retries, continuing without cache") return client } func initRabbitMQ(cfg config.RabbitMQConfig) (*amqp.Connection, error) { - conn, err := amqp.Dial(cfg.URL()) - if err != nil { - return nil, err + const maxRetries = 10 + const retryDelay = 2 * time.Second + + var conn *amqp.Connection + var err error + + for i := 0; i < maxRetries; i++ { + conn, err = amqp.Dial(cfg.URL()) + if err != nil { + logger.Warn("Failed to connect to RabbitMQ, retrying...", + zap.Int("attempt", i+1), + zap.Int("max_retries", maxRetries), + zap.Error(err)) + time.Sleep(retryDelay * time.Duration(i+1)) + continue + } + + logger.Info("Connected to RabbitMQ") + return conn, nil } - logger.Info("Connected to RabbitMQ") - return conn, nil + return nil, fmt.Errorf("failed to connect to RabbitMQ after %d retries: %w", maxRetries, err) } func startGRPCServer(