// rwadurian/backend/mpc-system/pkg/grpcutil/client.go
//
// 250 lines
// 5.6 KiB
// Go

package grpcutil
import (
"context"
"sync"
"time"
"github.com/rwadurian/mpc-system/pkg/logger"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
// ClientConfig gathers the settings used to open and supervise a gRPC
// client connection: dialing, keepalive pings, and reconnection.
type ClientConfig struct {
	// Address is the target passed to the gRPC dialer.
	Address string

	// ConnectTimeout is the deadline applied to each dial attempt.
	ConnectTimeout time.Duration

	// BlockingConnect, when true, adds grpc.WithBlock to the dial options.
	BlockingConnect bool

	// KeepaliveTime is the interval between keepalive pings.
	KeepaliveTime time.Duration

	// KeepaliveTimeout is how long to wait for a ping acknowledgement.
	KeepaliveTimeout time.Duration

	// PermitWithoutStream allows pings even when no streams are active.
	PermitWithoutStream bool

	// EnableReconnect starts a background monitor that re-dials the
	// connection when it is found in a failed state.
	EnableReconnect bool

	// ReconnectBackoff is the initial delay before a reconnection attempt;
	// it is doubled after every failed attempt.
	ReconnectBackoff time.Duration

	// MaxReconnectBackoff is the upper bound for the doubled delay.
	MaxReconnectBackoff time.Duration
}
// DefaultClientConfig builds a ClientConfig for address with the package
// defaults: a blocking dial limited to 10s, keepalive pings every 30s with a
// 10s ack timeout (also sent without active streams), and reconnection
// enabled with a backoff that starts at 1s and is capped at 30s.
func DefaultClientConfig(address string) ClientConfig {
	var cfg ClientConfig

	// Dialing.
	cfg.Address = address
	cfg.ConnectTimeout = 10 * time.Second
	cfg.BlockingConnect = true

	// Keepalive.
	cfg.KeepaliveTime = 30 * time.Second
	cfg.KeepaliveTimeout = 10 * time.Second
	cfg.PermitWithoutStream = true

	// Reconnection.
	cfg.EnableReconnect = true
	cfg.ReconnectBackoff = 1 * time.Second
	cfg.MaxReconnectBackoff = 30 * time.Second

	return cfg
}
// ResilientConn owns a gRPC client connection and, when reconnection is
// enabled in its config, replaces that connection after it fails.
type ResilientConn struct {
	// config is the configuration supplied at construction time.
	config ClientConfig

	// conn is the connection currently in use; guarded by mu.
	conn *grpc.ClientConn

	// mu protects conn and closed.
	mu sync.RWMutex

	// closed is set once Close has been called; guarded by mu.
	closed bool

	// closeChan is closed by Close to tell the monitor goroutine to stop.
	closeChan chan struct{}
}
// NewResilientConn dials the address in config and returns a ResilientConn
// wrapping the resulting connection. A failed initial dial is returned to
// the caller as-is. When config.EnableReconnect is set, a background
// goroutine is started to watch the connection; it stops when Close is
// called.
func NewResilientConn(config ClientConfig) (*ResilientConn, error) {
	rc := new(ResilientConn)
	rc.config = config
	rc.closeChan = make(chan struct{})

	// The first dial happens synchronously so that the caller learns about
	// a failure immediately.
	var err error
	if rc.conn, err = rc.dial(); err != nil {
		return nil, err
	}

	if rc.config.EnableReconnect {
		go rc.monitorConnection()
	}

	return rc, nil
}
// dial opens a new gRPC client connection to the configured address using
// insecure transport credentials and the configured keepalive parameters.
// The attempt is bounded by config.ConnectTimeout, and grpc.WithBlock is
// added when config.BlockingConnect is set. On success the new connection is
// returned and an info message is logged; on failure the dial error is
// returned unchanged.
func (rc *ResilientConn) dial() (*grpc.ClientConn, error) {
	cfg := &rc.config

	keepaliveParams := keepalive.ClientParameters{
		Time:                cfg.KeepaliveTime,
		Timeout:             cfg.KeepaliveTimeout,
		PermitWithoutStream: cfg.PermitWithoutStream,
	}

	dialOpts := make([]grpc.DialOption, 0, 3)
	dialOpts = append(dialOpts,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithKeepaliveParams(keepaliveParams),
	)
	if cfg.BlockingConnect {
		dialOpts = append(dialOpts, grpc.WithBlock())
	}

	// The timeout context only covers this dial attempt.
	dialCtx, cancel := context.WithTimeout(context.Background(), cfg.ConnectTimeout)
	defer cancel()

	conn, err := grpc.DialContext(dialCtx, cfg.Address, dialOpts...)
	if err != nil {
		return nil, err
	}

	logger.Info("gRPC connection established",
		zap.String("address", cfg.Address),
		zap.Duration("keepalive_time", cfg.KeepaliveTime))

	return conn, nil
}
// monitorConnection runs in its own goroutine and watches the state of the
// current connection. When the connection is in TransientFailure or
// Shutdown, it re-dials with a delay that starts at config.ReconnectBackoff,
// doubles after every failed attempt, and is capped at
// config.MaxReconnectBackoff. A successful dial replaces rc.conn and closes
// the previous connection.
//
// The goroutine exits once Close has been called. If Close runs while a
// re-dial is in flight, the freshly dialed connection is closed and
// discarded instead of being installed, so it cannot leak.
func (rc *ResilientConn) monitorConnection() {
	backoff := rc.config.ReconnectBackoff

	for {
		// Stop promptly if Close has been called.
		select {
		case <-rc.closeChan:
			return
		default:
		}

		rc.mu.RLock()
		conn := rc.conn
		closed := rc.closed
		rc.mu.RUnlock()

		if closed {
			return
		}

		state := conn.GetState()
		if state == connectivity.TransientFailure || state == connectivity.Shutdown {
			logger.Warn("gRPC connection lost, attempting reconnection",
				zap.String("address", rc.config.Address),
				zap.String("state", state.String()))

			// Retry until a dial succeeds or the wrapper is closed.
			for {
				select {
				case <-rc.closeChan:
					return
				case <-time.After(backoff):
				}

				newConn, err := rc.dial()
				if err != nil {
					logger.Error("Reconnection failed, retrying",
						zap.String("address", rc.config.Address),
						zap.Duration("backoff", backoff),
						zap.Error(err))

					// Exponential backoff, capped at the configured maximum.
					backoff = backoff * 2
					if backoff > rc.config.MaxReconnectBackoff {
						backoff = rc.config.MaxReconnectBackoff
					}
					continue
				}

				rc.mu.Lock()
				if rc.closed {
					// Close ran while we were dialing. Nobody would ever
					// close newConn if we installed it, so drop it here.
					rc.mu.Unlock()
					// Best-effort cleanup; there is no caller to report to.
					_ = newConn.Close()
					return
				}
				oldConn := rc.conn
				rc.conn = newConn
				rc.mu.Unlock()

				if oldConn != nil {
					// The old connection may already be shut down, in which
					// case Close reports an error that is safe to ignore.
					_ = oldConn.Close()
				}

				logger.Info("gRPC connection restored",
					zap.String("address", rc.config.Address))

				backoff = rc.config.ReconnectBackoff
				break
			}

			// conn and state now describe the replaced connection; go back
			// to the top to pick up the new one instead of waiting on the
			// old one.
			continue
		}

		// Wait for the next state change, but re-check at least every 5s.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		conn.WaitForStateChange(ctx, state)
		cancel()
	}
}
// GetConn returns the connection that is current at the time of the call.
// The read is done under the read lock; the returned pointer may be replaced
// by the reconnect monitor afterwards.
func (rc *ResilientConn) GetConn() *grpc.ClientConn {
	rc.mu.RLock()
	current := rc.conn
	rc.mu.RUnlock()

	return current
}
// IsConnected reports whether a connection exists and its state is either
// Ready or Idle. It returns false when no connection is set.
func (rc *ResilientConn) IsConnected() bool {
	rc.mu.RLock()
	defer rc.mu.RUnlock()

	if rc.conn == nil {
		return false
	}

	switch rc.conn.GetState() {
	case connectivity.Ready, connectivity.Idle:
		return true
	default:
		return false
	}
}
// WaitForReady blocks until the current connection reaches the Ready state
// and then returns true. It returns false when there is no connection, when
// the current connection is shut down, or when ctx is cancelled or expires
// before the connection becomes Ready.
//
// The current connection is re-read on every iteration, so a connection
// installed by the reconnect monitor is picked up automatically.
//
// NOTE(review): this only observes state changes. It looks like a connection
// sitting in Idle would stay there until something starts an RPC or asks it
// to connect, so this may wait until ctx ends. Confirm against the grpc-go
// version in use.
func (rc *ResilientConn) WaitForReady(ctx context.Context) bool {
	for {
		rc.mu.RLock()
		conn := rc.conn
		rc.mu.RUnlock()

		if conn == nil {
			return false
		}

		state := conn.GetState()
		if state == connectivity.Ready {
			return true
		}
		if state == connectivity.Shutdown {
			// The monitor may have swapped in a new connection and closed
			// this one after we took the snapshot above. Only give up when
			// the shut-down connection is still the current one.
			rc.mu.RLock()
			replaced := rc.conn != conn
			rc.mu.RUnlock()
			if replaced {
				continue
			}
			return false
		}

		// Block until the state differs from the one just observed.
		if !conn.WaitForStateChange(ctx, state) {
			return false // ctx cancelled or deadline exceeded
		}
	}
}
// Close marks the wrapper as closed, signals the monitor goroutine to stop
// by closing closeChan, and closes the current connection, returning that
// connection's Close error. Calls after the first do nothing and return nil.
func (rc *ResilientConn) Close() error {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	if !rc.closed {
		rc.closed = true
		close(rc.closeChan)

		if current := rc.conn; current != nil {
			return current.Close()
		}
	}

	return nil
}