feat(mpc-system): add connection retry logic with exponential backoff
- Add retry mechanism for PostgreSQL connections (10 retries, 2s base delay) - Add retry mechanism for RabbitMQ connections (10 retries, 2s base delay) - Add retry mechanism for Redis connections (10 retries, 2s base delay) - Use exponential backoff: delay increases with each retry attempt - Log detailed retry information (attempt number, max retries, errors) - Redis continues without cache if all retries fail (non-critical) - Database and RabbitMQ return error after all retries (critical) This resolves startup failures when dependent services are slow to initialize, particularly RabbitMQ which may pass health checks but not be fully ready.
This commit is contained in:
parent
62091e5ede
commit
b4d6b0f264
|
|
@ -127,31 +127,67 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func initDatabase(cfg config.DatabaseConfig) (*sql.DB, error) {
|
func initDatabase(cfg config.DatabaseConfig) (*sql.DB, error) {
|
||||||
db, err := sql.Open("postgres", cfg.DSN())
|
const maxRetries = 10
|
||||||
|
const retryDelay = 2 * time.Second
|
||||||
|
|
||||||
|
var db *sql.DB
|
||||||
|
var err error
|
||||||
|
|
||||||
|
for i := 0; i < maxRetries; i++ {
|
||||||
|
db, err = sql.Open("postgres", cfg.DSN())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
logger.Warn("Failed to open database connection, retrying...",
|
||||||
|
zap.Int("attempt", i+1),
|
||||||
|
zap.Int("max_retries", maxRetries),
|
||||||
|
zap.Error(err))
|
||||||
|
time.Sleep(retryDelay * time.Duration(i+1))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
db.SetMaxOpenConns(cfg.MaxOpenConns)
|
db.SetMaxOpenConns(cfg.MaxOpenConns)
|
||||||
db.SetMaxIdleConns(cfg.MaxIdleConns)
|
db.SetMaxIdleConns(cfg.MaxIdleConns)
|
||||||
db.SetConnMaxLifetime(cfg.ConnMaxLife)
|
db.SetConnMaxLifetime(cfg.ConnMaxLife)
|
||||||
|
|
||||||
if err := db.Ping(); err != nil {
|
if err = db.Ping(); err != nil {
|
||||||
return nil, err
|
logger.Warn("Failed to ping database, retrying...",
|
||||||
|
zap.Int("attempt", i+1),
|
||||||
|
zap.Int("max_retries", maxRetries),
|
||||||
|
zap.Error(err))
|
||||||
|
db.Close()
|
||||||
|
time.Sleep(retryDelay * time.Duration(i+1))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("Connected to PostgreSQL")
|
logger.Info("Connected to PostgreSQL")
|
||||||
return db, nil
|
return db, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("failed to connect to database after %d retries: %w", maxRetries, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func initRabbitMQ(cfg config.RabbitMQConfig) (*amqp.Connection, error) {
|
func initRabbitMQ(cfg config.RabbitMQConfig) (*amqp.Connection, error) {
|
||||||
conn, err := amqp.Dial(cfg.URL())
|
const maxRetries = 10
|
||||||
|
const retryDelay = 2 * time.Second
|
||||||
|
|
||||||
|
var conn *amqp.Connection
|
||||||
|
var err error
|
||||||
|
|
||||||
|
for i := 0; i < maxRetries; i++ {
|
||||||
|
conn, err = amqp.Dial(cfg.URL())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
logger.Warn("Failed to connect to RabbitMQ, retrying...",
|
||||||
|
zap.Int("attempt", i+1),
|
||||||
|
zap.Int("max_retries", maxRetries),
|
||||||
|
zap.Error(err))
|
||||||
|
time.Sleep(retryDelay * time.Duration(i+1))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("Connected to RabbitMQ")
|
logger.Info("Connected to RabbitMQ")
|
||||||
return conn, nil
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("failed to connect to RabbitMQ after %d retries: %w", maxRetries, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func startGRPCServer(
|
func startGRPCServer(
|
||||||
|
|
|
||||||
|
|
@ -168,9 +168,21 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func initDatabase(cfg config.DatabaseConfig) (*sql.DB, error) {
|
func initDatabase(cfg config.DatabaseConfig) (*sql.DB, error) {
|
||||||
db, err := sql.Open("postgres", cfg.DSN())
|
const maxRetries = 10
|
||||||
|
const retryDelay = 2 * time.Second
|
||||||
|
|
||||||
|
var db *sql.DB
|
||||||
|
var err error
|
||||||
|
|
||||||
|
for i := 0; i < maxRetries; i++ {
|
||||||
|
db, err = sql.Open("postgres", cfg.DSN())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
logger.Warn("Failed to open database connection, retrying...",
|
||||||
|
zap.Int("attempt", i+1),
|
||||||
|
zap.Int("max_retries", maxRetries),
|
||||||
|
zap.Error(err))
|
||||||
|
time.Sleep(retryDelay * time.Duration(i+1))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
db.SetMaxOpenConns(cfg.MaxOpenConns)
|
db.SetMaxOpenConns(cfg.MaxOpenConns)
|
||||||
|
|
@ -178,40 +190,75 @@ func initDatabase(cfg config.DatabaseConfig) (*sql.DB, error) {
|
||||||
db.SetConnMaxLifetime(cfg.ConnMaxLife)
|
db.SetConnMaxLifetime(cfg.ConnMaxLife)
|
||||||
|
|
||||||
// Test connection
|
// Test connection
|
||||||
if err := db.Ping(); err != nil {
|
if err = db.Ping(); err != nil {
|
||||||
return nil, err
|
logger.Warn("Failed to ping database, retrying...",
|
||||||
|
zap.Int("attempt", i+1),
|
||||||
|
zap.Int("max_retries", maxRetries),
|
||||||
|
zap.Error(err))
|
||||||
|
db.Close()
|
||||||
|
time.Sleep(retryDelay * time.Duration(i+1))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("Connected to PostgreSQL")
|
logger.Info("Connected to PostgreSQL")
|
||||||
return db, nil
|
return db, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("failed to connect to database after %d retries: %w", maxRetries, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func initRedis(cfg config.RedisConfig) *redis.Client {
|
func initRedis(cfg config.RedisConfig) *redis.Client {
|
||||||
|
const maxRetries = 10
|
||||||
|
const retryDelay = 2 * time.Second
|
||||||
|
|
||||||
client := redis.NewClient(&redis.Options{
|
client := redis.NewClient(&redis.Options{
|
||||||
Addr: cfg.Addr(),
|
Addr: cfg.Addr(),
|
||||||
Password: cfg.Password,
|
Password: cfg.Password,
|
||||||
DB: cfg.DB,
|
DB: cfg.DB,
|
||||||
})
|
})
|
||||||
|
|
||||||
// Test connection
|
// Test connection with retry
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
for i := 0; i < maxRetries; i++ {
|
||||||
if err := client.Ping(ctx).Err(); err != nil {
|
if err := client.Ping(ctx).Err(); err != nil {
|
||||||
logger.Warn("Redis connection failed, continuing without cache", zap.Error(err))
|
logger.Warn("Redis connection failed, retrying...",
|
||||||
} else {
|
zap.Int("attempt", i+1),
|
||||||
|
zap.Int("max_retries", maxRetries),
|
||||||
|
zap.Error(err))
|
||||||
|
time.Sleep(retryDelay * time.Duration(i+1))
|
||||||
|
continue
|
||||||
|
}
|
||||||
logger.Info("Connected to Redis")
|
logger.Info("Connected to Redis")
|
||||||
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Warn("Redis connection failed after retries, continuing without cache")
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
func initRabbitMQ(cfg config.RabbitMQConfig) (*amqp.Connection, error) {
|
func initRabbitMQ(cfg config.RabbitMQConfig) (*amqp.Connection, error) {
|
||||||
conn, err := amqp.Dial(cfg.URL())
|
const maxRetries = 10
|
||||||
|
const retryDelay = 2 * time.Second
|
||||||
|
|
||||||
|
var conn *amqp.Connection
|
||||||
|
var err error
|
||||||
|
|
||||||
|
for i := 0; i < maxRetries; i++ {
|
||||||
|
conn, err = amqp.Dial(cfg.URL())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
logger.Warn("Failed to connect to RabbitMQ, retrying...",
|
||||||
|
zap.Int("attempt", i+1),
|
||||||
|
zap.Int("max_retries", maxRetries),
|
||||||
|
zap.Error(err))
|
||||||
|
time.Sleep(retryDelay * time.Duration(i+1))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("Connected to RabbitMQ")
|
logger.Info("Connected to RabbitMQ")
|
||||||
return conn, nil
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("failed to connect to RabbitMQ after %d retries: %w", maxRetries, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func startGRPCServer(
|
func startGRPCServer(
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue