gcx/backend/services/trading-service/cmd/server/main.go

176 lines
5.6 KiB
Go

package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
pgdriver "gorm.io/driver/postgres"
"gorm.io/gorm"
gormlogger "gorm.io/gorm/logger"
appservice "github.com/genex/trading-service/internal/application/service"
"github.com/genex/trading-service/internal/infrastructure/kafka"
"github.com/genex/trading-service/internal/infrastructure/postgres"
"github.com/genex/trading-service/internal/interface/http/handler"
"github.com/genex/trading-service/internal/interface/http/middleware"
)
func main() {
logger, _ := zap.NewProduction()
defer logger.Sync()
port := os.Getenv("PORT")
if port == "" {
port = "3003"
}
// ── Infrastructure Layer ────────────────────────────────────────────
db := mustInitDB(logger)
orderRepo := postgres.NewPostgresOrderRepository(db)
tradeRepo := postgres.NewPostgresTradeRepository(db)
eventPublisher := mustInitKafka(logger)
defer eventPublisher.Close()
// ── Application Layer ───────────────────────────────────────────────
matchingService := appservice.NewMatchingService(eventPublisher)
tradeService := appservice.NewTradeService(orderRepo, tradeRepo, matchingService, eventPublisher)
// ── Interface Layer (HTTP) ──────────────────────────────────────────
r := gin.New()
r.Use(gin.Recovery())
// Health endpoints
r.GET("/health", func(c *gin.Context) {
c.JSON(200, gin.H{"status": "ok", "service": "trading-service", "timestamp": time.Now().UTC().Format(time.RFC3339)})
})
r.GET("/health/ready", func(c *gin.Context) { c.JSON(200, gin.H{"status": "ready"}) })
r.GET("/health/live", func(c *gin.Context) { c.JSON(200, gin.H{"status": "alive"}) })
// API routes
api := r.Group("/api/v1")
// User-facing trade handler
tradeHandler := handler.NewTradeHandler(tradeService)
trades := api.Group("/trades")
trades.Use(middleware.JWTAuth())
{
trades.POST("/orders", tradeHandler.PlaceOrder)
trades.DELETE("/orders/:id", tradeHandler.CancelOrder)
trades.GET("/my/orders", tradeHandler.MyOrders)
trades.POST("/coupons/:id/transfer", tradeHandler.TransferCoupon)
}
// Public orderbook
api.GET("/trades/orderbook/:couponId", tradeHandler.GetOrderBook)
// Admin routes (require JWT + admin role)
adminTradeHandler := handler.NewAdminTradeHandler(tradeService)
adminMMHandler := handler.NewAdminMMHandler(tradeService)
admin := api.Group("/admin")
admin.Use(middleware.JWTAuth(), middleware.RequireAdmin())
{
// Trade administration
adminTrades := admin.Group("/trades")
adminTrades.GET("/stats", adminTradeHandler.GetTradingStats)
adminTrades.GET("/orders", adminTradeHandler.ListOrders)
adminTrades.GET("/volume-trend", adminTradeHandler.GetVolumeTrend)
adminTrades.POST("/orders/:id/investigate", adminTradeHandler.InvestigateOrder)
// Market maker administration
mm := admin.Group("/mm")
mm.GET("/list", adminMMHandler.ListMarketMakers)
mm.GET("/:id/details", adminMMHandler.GetMarketMakerDetails)
mm.POST("/:id/suspend", adminMMHandler.SuspendMarketMaker)
mm.POST("/:id/resume", adminMMHandler.ResumeMarketMaker)
mm.GET("/liquidity-pools", adminMMHandler.GetLiquidityPools)
mm.GET("/order-book-depth", adminMMHandler.GetOrderBookDepth)
mm.GET("/health-indicators", adminMMHandler.GetHealthIndicators)
}
server := &http.Server{
Addr: ":" + port,
Handler: r,
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
}
go func() {
logger.Info("Trading Service starting", zap.String("port", port))
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Fatal("Server failed", zap.Error(err))
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
logger.Info("Shutting down gracefully...")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
logger.Fatal("Server forced to shutdown", zap.Error(err))
}
logger.Info("Trading Service stopped")
}
func mustInitDB(logger *zap.Logger) *gorm.DB {
host := getEnv("DB_HOST", "localhost")
dbPort := getEnv("DB_PORT", "5432")
user := getEnv("DB_USERNAME", "genex")
pass := getEnv("DB_PASSWORD", "genex_dev_password")
name := getEnv("DB_NAME", "genex")
dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
host, dbPort, user, pass, name)
db, err := gorm.Open(pgdriver.Open(dsn), &gorm.Config{
Logger: gormlogger.Default.LogMode(gormlogger.Warn),
})
if err != nil {
logger.Fatal("Failed to connect to PostgreSQL", zap.Error(err))
}
sqlDB, _ := db.DB()
sqlDB.SetMaxOpenConns(20)
sqlDB.SetMaxIdleConns(5)
sqlDB.SetConnMaxLifetime(30 * time.Minute)
logger.Info("PostgreSQL connected", zap.String("host", host), zap.String("db", name))
return db
}
func mustInitKafka(logger *zap.Logger) *kafka.KafkaEventPublisher {
brokersEnv := getEnv("KAFKA_BROKERS", "localhost:9092")
brokers := strings.Split(brokersEnv, ",")
publisher, err := kafka.NewKafkaEventPublisher(brokers)
if err != nil {
logger.Fatal("Failed to connect to Kafka", zap.Error(err))
}
logger.Info("Kafka producer connected", zap.Strings("brokers", brokers))
return publisher
}
func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}