250 lines
5.6 KiB
Go
250 lines
5.6 KiB
Go
package grpcutil
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rwadurian/mpc-system/pkg/logger"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/keepalive"
|
|
)
|
|
|
|
// ClientConfig holds configuration for gRPC client connections.
type ClientConfig struct {
	// Connection settings.

	// Address is the dial target handed to gRPC.
	Address string
	// ConnectTimeout bounds each individual dial attempt.
	ConnectTimeout time.Duration
	// BlockingConnect makes every dial use grpc.WithBlock.
	BlockingConnect bool

	// Keepalive settings.

	// KeepaliveTime is how often to send pings.
	KeepaliveTime time.Duration
	// KeepaliveTimeout is how long to wait for a ping ack.
	KeepaliveTimeout time.Duration
	// PermitWithoutStream allows pings even without active streams.
	PermitWithoutStream bool

	// Reconnection settings.

	// EnableReconnect starts the background connection monitor when a
	// ResilientConn is created.
	EnableReconnect bool
	// ReconnectBackoff is the delay before the first redial attempt; the
	// delay doubles after every failed attempt.
	ReconnectBackoff time.Duration
	// MaxReconnectBackoff caps the doubled redial delay.
	MaxReconnectBackoff time.Duration
}
|
|
|
|
// DefaultClientConfig returns a sensible default configuration
|
|
func DefaultClientConfig(address string) ClientConfig {
|
|
return ClientConfig{
|
|
Address: address,
|
|
ConnectTimeout: 10 * time.Second,
|
|
BlockingConnect: true,
|
|
KeepaliveTime: 30 * time.Second,
|
|
KeepaliveTimeout: 10 * time.Second,
|
|
PermitWithoutStream: true,
|
|
EnableReconnect: true,
|
|
ReconnectBackoff: 1 * time.Second,
|
|
MaxReconnectBackoff: 30 * time.Second,
|
|
}
|
|
}
|
|
|
|
// ResilientConn wraps a gRPC connection with automatic reconnection
|
|
type ResilientConn struct {
|
|
config ClientConfig
|
|
conn *grpc.ClientConn
|
|
mu sync.RWMutex
|
|
closed bool
|
|
closeChan chan struct{}
|
|
}
|
|
|
|
// NewResilientConn creates a new resilient gRPC connection
|
|
func NewResilientConn(config ClientConfig) (*ResilientConn, error) {
|
|
rc := &ResilientConn{
|
|
config: config,
|
|
closeChan: make(chan struct{}),
|
|
}
|
|
|
|
// Initial connection
|
|
conn, err := rc.dial()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rc.conn = conn
|
|
|
|
// Start connection monitor if reconnection is enabled
|
|
if config.EnableReconnect {
|
|
go rc.monitorConnection()
|
|
}
|
|
|
|
return rc, nil
|
|
}
|
|
|
|
// dial creates a new gRPC connection with keepalive settings
|
|
func (rc *ResilientConn) dial() (*grpc.ClientConn, error) {
|
|
// Build dial options
|
|
opts := []grpc.DialOption{
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
|
Time: rc.config.KeepaliveTime,
|
|
Timeout: rc.config.KeepaliveTimeout,
|
|
PermitWithoutStream: rc.config.PermitWithoutStream,
|
|
}),
|
|
}
|
|
|
|
if rc.config.BlockingConnect {
|
|
opts = append(opts, grpc.WithBlock())
|
|
}
|
|
|
|
// Create context with timeout for connection
|
|
ctx, cancel := context.WithTimeout(context.Background(), rc.config.ConnectTimeout)
|
|
defer cancel()
|
|
|
|
conn, err := grpc.DialContext(ctx, rc.config.Address, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
logger.Info("gRPC connection established",
|
|
zap.String("address", rc.config.Address),
|
|
zap.Duration("keepalive_time", rc.config.KeepaliveTime))
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
// monitorConnection monitors the connection state and reconnects if needed
|
|
func (rc *ResilientConn) monitorConnection() {
|
|
backoff := rc.config.ReconnectBackoff
|
|
|
|
for {
|
|
select {
|
|
case <-rc.closeChan:
|
|
return
|
|
default:
|
|
}
|
|
|
|
rc.mu.RLock()
|
|
conn := rc.conn
|
|
closed := rc.closed
|
|
rc.mu.RUnlock()
|
|
|
|
if closed {
|
|
return
|
|
}
|
|
|
|
// Wait for state change
|
|
state := conn.GetState()
|
|
if state == connectivity.TransientFailure || state == connectivity.Shutdown {
|
|
logger.Warn("gRPC connection lost, attempting reconnection",
|
|
zap.String("address", rc.config.Address),
|
|
zap.String("state", state.String()))
|
|
|
|
// Attempt reconnection with backoff
|
|
for {
|
|
select {
|
|
case <-rc.closeChan:
|
|
return
|
|
case <-time.After(backoff):
|
|
}
|
|
|
|
newConn, err := rc.dial()
|
|
if err != nil {
|
|
logger.Error("Reconnection failed, retrying",
|
|
zap.String("address", rc.config.Address),
|
|
zap.Duration("backoff", backoff),
|
|
zap.Error(err))
|
|
|
|
// Increase backoff
|
|
backoff = backoff * 2
|
|
if backoff > rc.config.MaxReconnectBackoff {
|
|
backoff = rc.config.MaxReconnectBackoff
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Successfully reconnected
|
|
rc.mu.Lock()
|
|
oldConn := rc.conn
|
|
rc.conn = newConn
|
|
rc.mu.Unlock()
|
|
|
|
// Close old connection
|
|
if oldConn != nil {
|
|
oldConn.Close()
|
|
}
|
|
|
|
logger.Info("gRPC connection restored",
|
|
zap.String("address", rc.config.Address))
|
|
|
|
// Reset backoff
|
|
backoff = rc.config.ReconnectBackoff
|
|
break
|
|
}
|
|
}
|
|
|
|
// Wait for next state change or check periodically
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
conn.WaitForStateChange(ctx, state)
|
|
cancel()
|
|
}
|
|
}
|
|
|
|
// GetConn returns the current connection
|
|
func (rc *ResilientConn) GetConn() *grpc.ClientConn {
|
|
rc.mu.RLock()
|
|
defer rc.mu.RUnlock()
|
|
return rc.conn
|
|
}
|
|
|
|
// IsConnected returns true if the connection is ready
|
|
func (rc *ResilientConn) IsConnected() bool {
|
|
rc.mu.RLock()
|
|
defer rc.mu.RUnlock()
|
|
if rc.conn == nil {
|
|
return false
|
|
}
|
|
state := rc.conn.GetState()
|
|
return state == connectivity.Ready || state == connectivity.Idle
|
|
}
|
|
|
|
// WaitForReady waits for the connection to be ready
|
|
func (rc *ResilientConn) WaitForReady(ctx context.Context) bool {
|
|
for {
|
|
rc.mu.RLock()
|
|
conn := rc.conn
|
|
rc.mu.RUnlock()
|
|
|
|
if conn == nil {
|
|
return false
|
|
}
|
|
|
|
state := conn.GetState()
|
|
if state == connectivity.Ready {
|
|
return true
|
|
}
|
|
if state == connectivity.Shutdown {
|
|
return false
|
|
}
|
|
|
|
// Wait for state change
|
|
if !conn.WaitForStateChange(ctx, state) {
|
|
return false // Context cancelled
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close closes the connection and stops the monitor
|
|
func (rc *ResilientConn) Close() error {
|
|
rc.mu.Lock()
|
|
defer rc.mu.Unlock()
|
|
|
|
if rc.closed {
|
|
return nil
|
|
}
|
|
|
|
rc.closed = true
|
|
close(rc.closeChan)
|
|
|
|
if rc.conn != nil {
|
|
return rc.conn.Close()
|
|
}
|
|
return nil
|
|
}
|