336 lines
10 KiB
Go
336 lines
10 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"flag"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
_ "github.com/lib/pq"
|
|
|
|
"github.com/rwadurian/mpc-system/pkg/config"
|
|
"github.com/rwadurian/mpc-system/pkg/jwt"
|
|
"github.com/rwadurian/mpc-system/pkg/logger"
|
|
"github.com/rwadurian/mpc-system/pkg/middleware"
|
|
httphandler "github.com/rwadurian/mpc-system/services/account/adapters/input/http"
|
|
grpcadapter "github.com/rwadurian/mpc-system/services/account/adapters/output/grpc"
|
|
jwtadapter "github.com/rwadurian/mpc-system/services/account/adapters/output/jwt"
|
|
"github.com/rwadurian/mpc-system/services/account/adapters/output/memory"
|
|
"github.com/rwadurian/mpc-system/services/account/adapters/output/postgres"
|
|
"github.com/rwadurian/mpc-system/services/account/application/use_cases"
|
|
"github.com/rwadurian/mpc-system/services/account/domain/repositories"
|
|
"github.com/rwadurian/mpc-system/services/account/domain/services"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func main() {
|
|
// Parse flags
|
|
configPath := flag.String("config", "", "Path to config file")
|
|
flag.Parse()
|
|
|
|
// Load configuration
|
|
cfg, err := config.Load(*configPath)
|
|
if err != nil {
|
|
fmt.Printf("Failed to load config: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Initialize logger
|
|
if err := logger.Init(&logger.Config{
|
|
Level: cfg.Logger.Level,
|
|
Encoding: cfg.Logger.Encoding,
|
|
}); err != nil {
|
|
fmt.Printf("Failed to initialize logger: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
defer logger.Sync()
|
|
|
|
logger.Info("Starting Account Service",
|
|
zap.String("environment", cfg.Server.Environment),
|
|
zap.Int("http_port", cfg.Server.HTTPPort))
|
|
|
|
// Initialize database connection
|
|
db, err := initDatabase(cfg.Database)
|
|
if err != nil {
|
|
logger.Fatal("Failed to connect to database", zap.Error(err))
|
|
}
|
|
defer db.Close()
|
|
|
|
// Initialize gRPC client for session coordinator
|
|
sessionCoordinatorAddr := os.Getenv("MPC_COORDINATOR_URL")
|
|
if sessionCoordinatorAddr == "" {
|
|
sessionCoordinatorAddr = "mpc-session-coordinator:50051"
|
|
}
|
|
sessionCoordinatorClient, err := grpcadapter.NewSessionCoordinatorClient(sessionCoordinatorAddr)
|
|
if err != nil {
|
|
logger.Fatal("Failed to connect to session coordinator", zap.Error(err))
|
|
}
|
|
defer sessionCoordinatorClient.Close()
|
|
|
|
// Initialize repositories
|
|
accountRepo := postgres.NewAccountPostgresRepo(db)
|
|
shareRepo := postgres.NewAccountSharePostgresRepo(db)
|
|
recoveryRepo := postgres.NewRecoverySessionPostgresRepo(db)
|
|
sessionEventRepo := postgres.NewSessionEventPostgresRepo(db)
|
|
|
|
// Initialize adapters (using in-memory implementations)
|
|
eventPublisher := memory.NewEventPublisherAdapter()
|
|
cacheAdapter := memory.NewCacheAdapter()
|
|
|
|
// Initialize JWT service
|
|
jwtService := jwt.NewJWTService(
|
|
cfg.JWT.SecretKey,
|
|
cfg.JWT.Issuer,
|
|
cfg.JWT.TokenExpiry,
|
|
cfg.JWT.RefreshExpiry,
|
|
)
|
|
tokenService := jwtadapter.NewTokenServiceAdapter(jwtService)
|
|
|
|
// Initialize domain service
|
|
domainService := services.NewAccountDomainService(accountRepo, shareRepo, recoveryRepo)
|
|
|
|
// Initialize use cases
|
|
createAccountUC := use_cases.NewCreateAccountUseCase(accountRepo, shareRepo, domainService, eventPublisher)
|
|
getAccountUC := use_cases.NewGetAccountUseCase(accountRepo, shareRepo)
|
|
updateAccountUC := use_cases.NewUpdateAccountUseCase(accountRepo, eventPublisher)
|
|
listAccountsUC := use_cases.NewListAccountsUseCase(accountRepo)
|
|
getAccountSharesUC := use_cases.NewGetAccountSharesUseCase(accountRepo, shareRepo)
|
|
deactivateShareUC := use_cases.NewDeactivateShareUseCase(accountRepo, shareRepo, eventPublisher)
|
|
loginUC := use_cases.NewLoginUseCase(accountRepo, shareRepo, tokenService, eventPublisher)
|
|
refreshTokenUC := use_cases.NewRefreshTokenUseCase(accountRepo, tokenService)
|
|
generateChallengeUC := use_cases.NewGenerateChallengeUseCase(cacheAdapter)
|
|
initiateRecoveryUC := use_cases.NewInitiateRecoveryUseCase(accountRepo, recoveryRepo, domainService, eventPublisher)
|
|
completeRecoveryUC := use_cases.NewCompleteRecoveryUseCase(accountRepo, shareRepo, recoveryRepo, domainService, eventPublisher)
|
|
getRecoveryStatusUC := use_cases.NewGetRecoveryStatusUseCase(recoveryRepo)
|
|
cancelRecoveryUC := use_cases.NewCancelRecoveryUseCase(accountRepo, recoveryRepo)
|
|
|
|
// Create shutdown context
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Start HTTP server
|
|
errChan := make(chan error, 1)
|
|
go func() {
|
|
if err := startHTTPServer(
|
|
cfg,
|
|
jwtService,
|
|
accountRepo,
|
|
sessionEventRepo,
|
|
createAccountUC,
|
|
getAccountUC,
|
|
updateAccountUC,
|
|
listAccountsUC,
|
|
getAccountSharesUC,
|
|
deactivateShareUC,
|
|
loginUC,
|
|
refreshTokenUC,
|
|
generateChallengeUC,
|
|
initiateRecoveryUC,
|
|
completeRecoveryUC,
|
|
getRecoveryStatusUC,
|
|
cancelRecoveryUC,
|
|
sessionCoordinatorClient,
|
|
db,
|
|
); err != nil {
|
|
errChan <- fmt.Errorf("HTTP server error: %w", err)
|
|
}
|
|
}()
|
|
|
|
// Wait for shutdown signal
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
select {
|
|
case sig := <-sigChan:
|
|
logger.Info("Received shutdown signal", zap.String("signal", sig.String()))
|
|
case err := <-errChan:
|
|
logger.Error("Server error", zap.Error(err))
|
|
}
|
|
|
|
// Graceful shutdown
|
|
logger.Info("Shutting down...")
|
|
cancel()
|
|
|
|
// Give services time to shutdown gracefully
|
|
time.Sleep(5 * time.Second)
|
|
logger.Info("Shutdown complete")
|
|
|
|
_ = ctx
|
|
}
|
|
|
|
func initDatabase(cfg config.DatabaseConfig) (*sql.DB, error) {
|
|
const maxRetries = 10
|
|
const retryDelay = 2 * time.Second
|
|
|
|
var db *sql.DB
|
|
var err error
|
|
|
|
for i := 0; i < maxRetries; i++ {
|
|
db, err = sql.Open("postgres", cfg.DSN())
|
|
if err != nil {
|
|
logger.Warn("Failed to open database connection, retrying...",
|
|
zap.Int("attempt", i+1),
|
|
zap.Int("max_retries", maxRetries),
|
|
zap.Error(err))
|
|
time.Sleep(retryDelay * time.Duration(i+1))
|
|
continue
|
|
}
|
|
|
|
db.SetMaxOpenConns(cfg.MaxOpenConns)
|
|
db.SetMaxIdleConns(cfg.MaxIdleConns)
|
|
db.SetConnMaxLifetime(cfg.ConnMaxLife)
|
|
|
|
// Test connection with Ping
|
|
if err = db.Ping(); err != nil {
|
|
logger.Warn("Failed to ping database, retrying...",
|
|
zap.Int("attempt", i+1),
|
|
zap.Int("max_retries", maxRetries),
|
|
zap.Error(err))
|
|
db.Close()
|
|
time.Sleep(retryDelay * time.Duration(i+1))
|
|
continue
|
|
}
|
|
|
|
// Verify database is actually usable with a simple query
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
var result int
|
|
err = db.QueryRowContext(ctx, "SELECT 1").Scan(&result)
|
|
cancel()
|
|
if err != nil {
|
|
logger.Warn("Database ping succeeded but query failed, retrying...",
|
|
zap.Int("attempt", i+1),
|
|
zap.Int("max_retries", maxRetries),
|
|
zap.Error(err))
|
|
db.Close()
|
|
time.Sleep(retryDelay * time.Duration(i+1))
|
|
continue
|
|
}
|
|
|
|
logger.Info("Connected to PostgreSQL and verified connectivity",
|
|
zap.Int("attempt", i+1))
|
|
return db, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("failed to connect to database after %d retries: %w", maxRetries, err)
|
|
}
|
|
|
|
func startHTTPServer(
|
|
cfg *config.Config,
|
|
jwtService *jwt.JWTService,
|
|
accountRepo repositories.AccountRepository,
|
|
sessionEventRepo repositories.SessionEventRepository,
|
|
createAccountUC *use_cases.CreateAccountUseCase,
|
|
getAccountUC *use_cases.GetAccountUseCase,
|
|
updateAccountUC *use_cases.UpdateAccountUseCase,
|
|
listAccountsUC *use_cases.ListAccountsUseCase,
|
|
getAccountSharesUC *use_cases.GetAccountSharesUseCase,
|
|
deactivateShareUC *use_cases.DeactivateShareUseCase,
|
|
loginUC *use_cases.LoginUseCase,
|
|
refreshTokenUC *use_cases.RefreshTokenUseCase,
|
|
generateChallengeUC *use_cases.GenerateChallengeUseCase,
|
|
initiateRecoveryUC *use_cases.InitiateRecoveryUseCase,
|
|
completeRecoveryUC *use_cases.CompleteRecoveryUseCase,
|
|
getRecoveryStatusUC *use_cases.GetRecoveryStatusUseCase,
|
|
cancelRecoveryUC *use_cases.CancelRecoveryUseCase,
|
|
sessionCoordinatorClient *grpcadapter.SessionCoordinatorClient,
|
|
db *sql.DB,
|
|
) error {
|
|
// Set Gin mode
|
|
if cfg.Server.Environment == "production" {
|
|
gin.SetMode(gin.ReleaseMode)
|
|
}
|
|
|
|
router := gin.New()
|
|
router.Use(gin.Recovery())
|
|
router.Use(gin.Logger())
|
|
|
|
// Apply security headers middleware
|
|
router.Use(middleware.SecureHeaders())
|
|
|
|
// Apply CORS middleware
|
|
// Parse allowed origins from environment or use defaults
|
|
allowedOrigins := []string{}
|
|
if origins := os.Getenv("CORS_ALLOWED_ORIGINS"); origins != "" {
|
|
allowedOrigins = strings.Split(origins, ",")
|
|
}
|
|
if cfg.Server.Environment != "production" {
|
|
// Allow all origins in development
|
|
router.Use(middleware.AllowAllCORS())
|
|
} else if len(allowedOrigins) > 0 {
|
|
router.Use(middleware.CORS(middleware.CORSConfig{
|
|
AllowOrigins: allowedOrigins,
|
|
AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"},
|
|
AllowHeaders: []string{"Origin", "Content-Type", "Accept", "Authorization", "X-Requested-With"},
|
|
ExposeHeaders: []string{"Content-Length", "X-Request-ID"},
|
|
AllowCredentials: true,
|
|
MaxAge: 86400,
|
|
}))
|
|
}
|
|
|
|
// Create HTTP handler with session coordinator client
|
|
httpHandler := httphandler.NewAccountHTTPHandler(
|
|
accountRepo,
|
|
sessionEventRepo,
|
|
createAccountUC,
|
|
getAccountUC,
|
|
updateAccountUC,
|
|
listAccountsUC,
|
|
getAccountSharesUC,
|
|
deactivateShareUC,
|
|
loginUC,
|
|
refreshTokenUC,
|
|
generateChallengeUC,
|
|
initiateRecoveryUC,
|
|
completeRecoveryUC,
|
|
getRecoveryStatusUC,
|
|
cancelRecoveryUC,
|
|
sessionCoordinatorClient,
|
|
)
|
|
|
|
// Health check (public)
|
|
router.GET("/health", func(c *gin.Context) {
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"status": "healthy",
|
|
"service": "account",
|
|
})
|
|
})
|
|
|
|
// Create co-managed wallet handler (independent from existing functionality)
|
|
// Uses database connection for invite_code lookups
|
|
coManagedHandler := httphandler.NewCoManagedHTTPHandlerWithDB(sessionCoordinatorClient, db)
|
|
|
|
// Configure authentication middleware
|
|
// Skip paths that don't require authentication
|
|
authConfig := middleware.AuthConfig{
|
|
JWTService: jwtService,
|
|
SkipPaths: []string{
|
|
"/health",
|
|
"/api/v1/auth/*", // Auth endpoints (login, refresh, challenge)
|
|
"/api/v1/accounts/from-keygen", // Internal API from coordinator
|
|
"/api/v1/co-managed/*", // Co-managed wallet API (public for Service Party App)
|
|
},
|
|
AllowAnonymous: false,
|
|
}
|
|
|
|
// API routes with authentication
|
|
api := router.Group("/api/v1")
|
|
api.Use(middleware.BearerAuth(authConfig))
|
|
httpHandler.RegisterRoutes(api)
|
|
|
|
// Register co-managed wallet routes (public API)
|
|
coManagedHandler.RegisterRoutes(api)
|
|
|
|
logger.Info("Starting HTTP server",
|
|
zap.Int("port", cfg.Server.HTTPPort),
|
|
zap.String("environment", cfg.Server.Environment),
|
|
zap.Bool("cors_enabled", len(allowedOrigins) > 0 || cfg.Server.Environment != "production"))
|
|
return router.Run(fmt.Sprintf(":%d", cfg.Server.HTTPPort))
|
|
}
|