// Command main starts the trading-service HTTP API. It wires PostgreSQL
// repositories, a Kafka event publisher and the application services into a
// gin router, then serves it until SIGINT or SIGTERM is received.
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/gin-gonic/gin"
	"go.uber.org/zap"
	pgdriver "gorm.io/driver/postgres"
	"gorm.io/gorm"
	gormlogger "gorm.io/gorm/logger"

	appservice "github.com/genex/trading-service/internal/application/service"
	"github.com/genex/trading-service/internal/infrastructure/kafka"
	"github.com/genex/trading-service/internal/infrastructure/postgres"
	"github.com/genex/trading-service/internal/interface/http/handler"
	"github.com/genex/trading-service/internal/interface/http/middleware"
)

// main builds the production logger, delegates to run, and exits with the
// status code run returns.
func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		// No logger is available yet, so report directly on stderr.
		fmt.Fprintf(os.Stderr, "initializing logger: %v\n", err)
		os.Exit(1)
	}

	code := run(logger)

	// Sync errors are ignored: there is nowhere left to report them at exit.
	_ = logger.Sync()

	if code != 0 {
		os.Exit(code)
	}
}

// run wires the service together, serves HTTP until SIGINT/SIGTERM arrives or
// the listener fails, and returns the process exit code (0 on a clean stop,
// 1 on a server or shutdown failure).
//
// Failures after start-up are returned instead of being reported through
// logger.Fatal so that the deferred cleanup (publisher close) still runs.
func run(logger *zap.Logger) int {
	port := getEnv("PORT", "3003")

	// ── Infrastructure Layer ────────────────────────────────────────────
	db := mustInitDB(logger)
	orderRepo := postgres.NewPostgresOrderRepository(db)
	tradeRepo := postgres.NewPostgresTradeRepository(db)

	eventPublisher := mustInitKafka(logger)
	defer eventPublisher.Close()

	// ── Application Layer ───────────────────────────────────────────────
	matchingService := appservice.NewMatchingService(eventPublisher)
	tradeService := appservice.NewTradeService(orderRepo, tradeRepo, matchingService, eventPublisher)

	// ── Interface Layer (HTTP) ──────────────────────────────────────────
	r := gin.New()
	r.Use(gin.Recovery())

	// Health endpoints
	r.GET("/health", func(c *gin.Context) {
		c.JSON(200, gin.H{"status": "ok", "service": "trading-service", "timestamp": time.Now().UTC().Format(time.RFC3339)})
	})
	r.GET("/health/ready", func(c *gin.Context) {
		c.JSON(200, gin.H{"status": "ready"})
	})
	r.GET("/health/live", func(c *gin.Context) {
		c.JSON(200, gin.H{"status": "alive"})
	})

	// API routes
	api := r.Group("/api/v1")

	// User-facing trade handler
	tradeHandler := handler.NewTradeHandler(tradeService)
	trades := api.Group("/trades")
	trades.Use(middleware.JWTAuth())
	{
		trades.POST("/orders", tradeHandler.PlaceOrder)
		trades.DELETE("/orders/:id", tradeHandler.CancelOrder)
		trades.GET("/my/orders", tradeHandler.MyOrders)
		trades.POST("/coupons/:id/transfer", tradeHandler.TransferCoupon)
	}

	// Public orderbook
	api.GET("/trades/orderbook/:couponId", tradeHandler.GetOrderBook)

	// Admin routes (require JWT + admin role)
	adminTradeHandler := handler.NewAdminTradeHandler(tradeService)
	adminMMHandler := handler.NewAdminMMHandler(tradeService)
	admin := api.Group("/admin")
	admin.Use(middleware.JWTAuth(), middleware.RequireAdmin())
	{
		// Trade administration
		adminTrades := admin.Group("/trades")
		adminTrades.GET("/stats", adminTradeHandler.GetTradingStats)
		adminTrades.GET("/orders", adminTradeHandler.ListOrders)
		adminTrades.GET("/volume-trend", adminTradeHandler.GetVolumeTrend)
		adminTrades.POST("/orders/:id/investigate", adminTradeHandler.InvestigateOrder)

		// Market maker administration
		mm := admin.Group("/mm")
		mm.GET("/list", adminMMHandler.ListMarketMakers)
		mm.GET("/:id/details", adminMMHandler.GetMarketMakerDetails)
		mm.POST("/:id/suspend", adminMMHandler.SuspendMarketMaker)
		mm.POST("/:id/resume", adminMMHandler.ResumeMarketMaker)
		mm.GET("/liquidity-pools", adminMMHandler.GetLiquidityPools)
		mm.GET("/order-book-depth", adminMMHandler.GetOrderBookDepth)
		mm.GET("/health-indicators", adminMMHandler.GetHealthIndicators)
	}

	server := &http.Server{
		Addr:         ":" + port,
		Handler:      r,
		ReadTimeout:  15 * time.Second,
		WriteTimeout: 15 * time.Second,
		IdleTimeout:  60 * time.Second,
	}

	// Buffered so the serving goroutine never blocks if run has already
	// moved on to shutdown.
	serverErr := make(chan error, 1)
	go func() {
		logger.Info("Trading Service starting", zap.String("port", port))
		if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			serverErr <- err
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(quit)

	select {
	case err := <-serverErr:
		logger.Error("Server failed", zap.Error(err))
		return 1
	case <-quit:
	}

	logger.Info("Shutting down gracefully...")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := server.Shutdown(ctx); err != nil {
		logger.Error("Server forced to shutdown", zap.Error(err))
		return 1
	}

	logger.Info("Trading Service stopped")
	return 0
}

// mustInitDB opens the PostgreSQL connection through gorm using the DB_HOST,
// DB_PORT, DB_USERNAME, DB_PASSWORD, DB_NAME and DB_SSLMODE environment
// variables (each with a development default) and configures the connection
// pool. It terminates the process via logger.Fatal if the connection or the
// underlying pool cannot be obtained.
func mustInitDB(logger *zap.Logger) *gorm.DB {
	host := getEnv("DB_HOST", "localhost")
	dbPort := getEnv("DB_PORT", "5432")
	user := getEnv("DB_USERNAME", "genex")
	pass := getEnv("DB_PASSWORD", "genex_dev_password")
	name := getEnv("DB_NAME", "genex")
	// Defaults to "disable" to preserve the previous hard-coded behavior.
	sslMode := getEnv("DB_SSLMODE", "disable")

	dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s",
		host, dbPort, user, pass, name, sslMode)

	db, err := gorm.Open(pgdriver.Open(dsn), &gorm.Config{
		Logger: gormlogger.Default.LogMode(gormlogger.Warn),
	})
	if err != nil {
		logger.Fatal("Failed to connect to PostgreSQL", zap.Error(err))
	}

	sqlDB, err := db.DB()
	if err != nil {
		// Without the pool handle the settings below would dereference nil.
		logger.Fatal("Failed to access PostgreSQL connection pool", zap.Error(err))
	}
	sqlDB.SetMaxOpenConns(20)
	sqlDB.SetMaxIdleConns(5)
	sqlDB.SetConnMaxLifetime(30 * time.Minute)

	logger.Info("PostgreSQL connected", zap.String("host", host), zap.String("db", name))
	return db
}

// mustInitKafka creates the Kafka event publisher for the brokers listed in
// the comma-separated KAFKA_BROKERS environment variable (default
// "localhost:9092"). It terminates the process via logger.Fatal if the list
// contains no usable address or the publisher cannot be created.
func mustInitKafka(logger *zap.Logger) *kafka.KafkaEventPublisher {
	brokers := parseBrokers(getEnv("KAFKA_BROKERS", "localhost:9092"))
	if len(brokers) == 0 {
		logger.Fatal("KAFKA_BROKERS contains no broker addresses")
	}

	publisher, err := kafka.NewKafkaEventPublisher(brokers)
	if err != nil {
		logger.Fatal("Failed to connect to Kafka", zap.Error(err))
	}

	logger.Info("Kafka producer connected", zap.Strings("brokers", brokers))
	return publisher
}

// parseBrokers splits a comma-separated broker list, trimming surrounding
// whitespace from each entry and dropping entries that are empty.
func parseBrokers(raw string) []string {
	parts := strings.Split(raw, ",")
	brokers := make([]string, 0, len(parts))
	for _, p := range parts {
		if b := strings.TrimSpace(p); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

// getEnv returns the value of the environment variable key, or fallback when
// the variable is unset or empty.
func getEnv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}