diff --git a/backend/services/trading-service/prisma/migrations/0001_init/migration.sql b/backend/services/trading-service/prisma/migrations/0001_init/migration.sql index a97cedde..3df793ae 100644 --- a/backend/services/trading-service/prisma/migrations/0001_init/migration.sql +++ b/backend/services/trading-service/prisma/migrations/0001_init/migration.sql @@ -47,14 +47,14 @@ CREATE TABLE "orders" ( CREATE TABLE "trades" ( "id" TEXT NOT NULL, "tradeNo" TEXT NOT NULL, - "buyOrderId" TEXT NOT NULL, - "sellOrderId" TEXT NOT NULL, - "buyerSequence" TEXT NOT NULL, - "sellerSequence" TEXT NOT NULL, + "buy_order_id" TEXT NOT NULL, + "sell_order_id" TEXT NOT NULL, + "buyer_sequence" TEXT NOT NULL, + "seller_sequence" TEXT NOT NULL, "price" DECIMAL(30,18) NOT NULL, "quantity" DECIMAL(30,8) NOT NULL, "amount" DECIMAL(30,8) NOT NULL, - "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, CONSTRAINT "trades_pkey" PRIMARY KEY ("id") ); @@ -229,13 +229,13 @@ CREATE INDEX "orders_createdAt_idx" ON "orders"("createdAt" DESC); CREATE UNIQUE INDEX "trades_tradeNo_key" ON "trades"("tradeNo"); -- CreateIndex -CREATE INDEX "trades_buyerSequence_idx" ON "trades"("buyerSequence"); +CREATE INDEX "trades_buyer_sequence_idx" ON "trades"("buyer_sequence"); -- CreateIndex -CREATE INDEX "trades_sellerSequence_idx" ON "trades"("sellerSequence"); +CREATE INDEX "trades_seller_sequence_idx" ON "trades"("seller_sequence"); -- CreateIndex -CREATE INDEX "trades_createdAt_idx" ON "trades"("createdAt" DESC); +CREATE INDEX "trades_created_at_idx" ON "trades"("created_at" DESC); -- CreateIndex CREATE INDEX "trading_transactions_accountSequence_createdAt_idx" ON "trading_transactions"("accountSequence", "createdAt" DESC); @@ -307,7 +307,7 @@ CREATE INDEX "outbox_events_created_at_idx" ON "outbox_events"("created_at"); ALTER TABLE "orders" ADD CONSTRAINT "orders_accountSequence_fkey" FOREIGN KEY ("accountSequence") REFERENCES "trading_accounts"("accountSequence") ON DELETE RESTRICT ON UPDATE CASCADE; -- AddForeignKey -ALTER TABLE "trades" ADD CONSTRAINT "trades_buyOrderId_fkey" FOREIGN KEY ("buyOrderId") REFERENCES "orders"("id") ON DELETE RESTRICT ON UPDATE CASCADE; +ALTER TABLE "trades" ADD CONSTRAINT "trades_buy_order_id_fkey" FOREIGN KEY ("buy_order_id") REFERENCES "orders"("id") ON DELETE RESTRICT ON UPDATE CASCADE; -- AddForeignKey ALTER TABLE "trading_transactions" ADD CONSTRAINT "trading_transactions_accountSequence_fkey" FOREIGN KEY ("accountSequence") REFERENCES "trading_accounts"("accountSequence") ON DELETE RESTRICT ON UPDATE CASCADE; diff --git a/backend/services/trading-service/prisma/migrations/0002_add_trading_burn_system/migration.sql b/backend/services/trading-service/prisma/migrations/0002_add_trading_burn_system/migration.sql new file mode 100644 index 00000000..bcecd98e --- /dev/null +++ b/backend/services/trading-service/prisma/migrations/0002_add_trading_burn_system/migration.sql @@ -0,0 +1,139 @@ +-- ============================================================================ +-- trading-service 添加交易销毁系统 +-- 包含:交易配置、黑洞账户、积分股池、价格快照、订单销毁字段 +-- ============================================================================ + +-- ==================== 交易配置表 ==================== + +-- CreateTable +CREATE TABLE "trading_configs" ( + "id" TEXT NOT NULL, + "total_shares" DECIMAL(30,8) NOT NULL DEFAULT 100020000000, + "burn_target" DECIMAL(30,8) NOT NULL DEFAULT 10000000000, + "burn_period_minutes" INTEGER NOT NULL DEFAULT 2102400, + "minute_burn_rate" DECIMAL(30,18) NOT NULL DEFAULT 4756.468797564687, + "is_active" BOOLEAN NOT NULL DEFAULT false, + "activated_at" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "trading_configs_pkey" PRIMARY KEY ("id") +); + +-- ==================== 黑洞账户(销毁池)==================== + +-- CreateTable +CREATE TABLE "black_holes" ( + "id" TEXT NOT NULL, + "total_burned" DECIMAL(30,8) NOT NULL DEFAULT 0, + "target_burn" DECIMAL(30,8) NOT NULL, + "remaining_burn" DECIMAL(30,8) NOT NULL, + "last_burn_minute" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "black_holes_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "burn_records" ( + "id" TEXT NOT NULL, + "black_hole_id" TEXT NOT NULL, + "burn_minute" TIMESTAMP(3) NOT NULL, + "burn_amount" DECIMAL(30,18) NOT NULL, + "remaining_target" DECIMAL(30,8) NOT NULL, + "source_type" TEXT, + "source_account_seq" TEXT, + "source_order_no" TEXT, + "memo" TEXT, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "burn_records_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "burn_records_burn_minute_idx" ON "burn_records"("burn_minute"); + +-- CreateIndex +CREATE INDEX "burn_records_source_account_seq_idx" ON "burn_records"("source_account_seq"); + +-- CreateIndex +CREATE INDEX "burn_records_source_order_no_idx" ON "burn_records"("source_order_no"); + +-- CreateIndex +CREATE INDEX "burn_records_source_type_idx" ON "burn_records"("source_type"); + +-- AddForeignKey +ALTER TABLE "burn_records" ADD CONSTRAINT "burn_records_black_hole_id_fkey" FOREIGN KEY ("black_hole_id") REFERENCES "black_holes"("id") ON DELETE RESTRICT ON UPDATE CASCADE; + +-- ==================== 积分股池(绿积分池)==================== + +-- CreateTable +CREATE TABLE "share_pools" ( + "id" TEXT NOT NULL, + "green_points" DECIMAL(30,8) NOT NULL DEFAULT 0, + "total_inflow" DECIMAL(30,8) NOT NULL DEFAULT 0, + "total_outflow" DECIMAL(30,8) NOT NULL DEFAULT 0, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "share_pools_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "share_pool_transactions" ( + "id" TEXT NOT NULL, + "pool_id" TEXT NOT NULL, + "type" TEXT NOT NULL, + "amount" DECIMAL(30,8) NOT NULL, + "balance_before" DECIMAL(30,8) NOT NULL, + "balance_after" DECIMAL(30,8) NOT NULL, + "reference_id" TEXT, + "reference_type" TEXT, + "memo" TEXT, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "share_pool_transactions_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "share_pool_transactions_pool_id_created_at_idx" ON "share_pool_transactions"("pool_id", "created_at" DESC); + +-- AddForeignKey +ALTER TABLE "share_pool_transactions" ADD CONSTRAINT "share_pool_transactions_pool_id_fkey" FOREIGN KEY ("pool_id") REFERENCES "share_pools"("id") ON DELETE RESTRICT ON UPDATE CASCADE; + +-- ==================== 价格快照 ==================== + +-- CreateTable +CREATE TABLE "price_snapshots" ( + "id" TEXT NOT NULL, + "snapshot_time" TIMESTAMP(3) NOT NULL, + "price" DECIMAL(30,18) NOT NULL, + "green_points" DECIMAL(30,8) NOT NULL, + "black_hole_amount" DECIMAL(30,8) NOT NULL, + "circulation_pool" DECIMAL(30,8) NOT NULL, + "effective_denominator" DECIMAL(30,8) NOT NULL, + "minute_burn_rate" DECIMAL(30,18) NOT NULL, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "price_snapshots_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "price_snapshots_snapshot_time_key" ON "price_snapshots"("snapshot_time"); + +-- CreateIndex +CREATE INDEX "price_snapshots_snapshot_time_idx" ON "price_snapshots"("snapshot_time" DESC); + +-- ==================== 订单表添加销毁相关字段 ==================== + +-- AlterTable: 添加销毁相关字段到 orders 表 +ALTER TABLE "orders" ADD COLUMN "burn_quantity" DECIMAL(30,8) NOT NULL DEFAULT 0; +ALTER TABLE "orders" ADD COLUMN "burn_multiplier" DECIMAL(30,18) NOT NULL DEFAULT 0; +ALTER TABLE "orders" ADD COLUMN "effective_quantity" DECIMAL(30,8) NOT NULL DEFAULT 0; + +-- ==================== 成交记录表添加销毁相关字段 ==================== + +-- 添加销毁相关字段到 trades 表 +ALTER TABLE "trades" ADD COLUMN "burn_quantity" DECIMAL(30,8) NOT NULL DEFAULT 0; +ALTER TABLE "trades" ADD COLUMN "effective_qty" DECIMAL(30,8) NOT NULL DEFAULT 0; diff --git a/backend/services/trading-service/prisma/migrations/0003_add_processed_events/migration.sql b/backend/services/trading-service/prisma/migrations/0003_add_processed_events/migration.sql new file mode 100644 index 00000000..0a2589df --- /dev/null +++ b/backend/services/trading-service/prisma/migrations/0003_add_processed_events/migration.sql @@ -0,0 +1,23 @@ +-- ============================================================================ +-- trading-service 添加已处理事件表(幂等性支持) +-- ============================================================================ + +-- CreateTable +CREATE TABLE "processed_events" ( + "id" TEXT NOT NULL, + "event_id" TEXT NOT NULL, + "event_type" TEXT NOT NULL, + "source_service" TEXT NOT NULL, + "processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "processed_events_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "processed_events_event_id_key" ON "processed_events"("event_id"); + +-- CreateIndex +CREATE INDEX "processed_events_event_id_idx" ON "processed_events"("event_id"); + +-- CreateIndex +CREATE INDEX "processed_events_processed_at_idx" ON "processed_events"("processed_at"); diff --git a/backend/services/trading-service/prisma/schema.prisma b/backend/services/trading-service/prisma/schema.prisma index 0c06ce66..66961125 100644 --- a/backend/services/trading-service/prisma/schema.prisma +++ b/backend/services/trading-service/prisma/schema.prisma @@ -7,6 +7,125 @@ datasource db { url = env("DATABASE_URL") } +// ==================== 交易配置 ==================== + +// 交易全局配置 +model TradingConfig { + id String @id @default(uuid()) + // 总积分股数量: 100.02B + totalShares Decimal @default(100020000000) @map("total_shares") @db.Decimal(30, 8) + // 目标销毁量: 100亿 (4年销毁完) + burnTarget Decimal @default(10000000000) @map("burn_target") @db.Decimal(30, 8) + // 销毁周期: 4年 (分钟数) 365*4*1440 = 2102400 + burnPeriodMinutes Int @default(2102400) @map("burn_period_minutes") + // 每分钟基础销毁量: 100亿÷(365*4*1440) = 4756.468797564687 + minuteBurnRate Decimal @default(4756.468797564687) @map("minute_burn_rate") @db.Decimal(30, 18) + // 是否启用交易 + isActive Boolean @default(false) @map("is_active") + // 启动时间 + activatedAt DateTime? @map("activated_at") + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + @@map("trading_configs") +} + +// ==================== 黑洞账户(销毁池)==================== + +// 黑洞账户 +model BlackHole { + id String @id @default(uuid()) + totalBurned Decimal @default(0) @map("total_burned") @db.Decimal(30, 8) // 已销毁总量 + targetBurn Decimal @map("target_burn") @db.Decimal(30, 8) // 目标销毁量 (10B) + remainingBurn Decimal @map("remaining_burn") @db.Decimal(30, 8) // 剩余待销毁 + lastBurnMinute DateTime? @map("last_burn_minute") + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + records BurnRecord[] + + @@map("black_holes") +} + +// 销毁记录 +model BurnRecord { + id String @id @default(uuid()) + blackHoleId String @map("black_hole_id") + burnMinute DateTime @map("burn_minute") + burnAmount Decimal @map("burn_amount") @db.Decimal(30, 18) + remainingTarget Decimal @map("remaining_target") @db.Decimal(30, 8) // 销毁后剩余目标 + + // 来源信息 + sourceType String? @map("source_type") // MINUTE_BURN (每分钟销毁), SELL_BURN (卖出销毁) + sourceAccountSeq String? @map("source_account_seq") // 来源账户序列号(卖出时) + sourceOrderNo String? @map("source_order_no") // 来源订单号(卖出时) + + memo String? @db.Text + createdAt DateTime @default(now()) @map("created_at") + + blackHole BlackHole @relation(fields: [blackHoleId], references: [id]) + + @@index([burnMinute]) + @@index([sourceAccountSeq]) + @@index([sourceOrderNo]) + @@index([sourceType]) + @@map("burn_records") +} + +// ==================== 积分股池(绿积分池)==================== + +// 积分股池(存储绿积分用于计算价格) +model SharePool { + id String @id @default(uuid()) + // 绿积分总量(用于价格计算的分子) + greenPoints Decimal @default(0) @map("green_points") @db.Decimal(30, 8) + totalInflow Decimal @default(0) @map("total_inflow") @db.Decimal(30, 8) + totalOutflow Decimal @default(0) @map("total_outflow") @db.Decimal(30, 8) + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + transactions SharePoolTransaction[] + + @@map("share_pools") +} + +// 积分股池交易记录 +model SharePoolTransaction { + id String @id @default(uuid()) + poolId String @map("pool_id") + type String // INJECT (注入), TRADE_IN (交易流入), TRADE_OUT (交易流出) + amount Decimal @db.Decimal(30, 8) + balanceBefore Decimal @map("balance_before") @db.Decimal(30, 8) + balanceAfter Decimal @map("balance_after") @db.Decimal(30, 8) + referenceId String? @map("reference_id") + referenceType String? @map("reference_type") + memo String? @db.Text + createdAt DateTime @default(now()) @map("created_at") + + pool SharePool @relation(fields: [poolId], references: [id]) + + @@index([poolId, createdAt(sort: Desc)]) + @@map("share_pool_transactions") +} + +// ==================== 价格快照 ==================== + +// 价格快照(每分钟) +model PriceSnapshot { + id String @id @default(uuid()) + snapshotTime DateTime @unique @map("snapshot_time") + price Decimal @db.Decimal(30, 18) // 当时价格 + greenPoints Decimal @map("green_points") @db.Decimal(30, 8) // 绿积分(股池) + blackHoleAmount Decimal @map("black_hole_amount") @db.Decimal(30, 8) // 黑洞数量 + circulationPool Decimal @map("circulation_pool") @db.Decimal(30, 8) // 流通池 + effectiveDenominator Decimal @map("effective_denominator") @db.Decimal(30, 8) // 有效分母 + minuteBurnRate Decimal @map("minute_burn_rate") @db.Decimal(30, 18) // 当时的每分钟销毁率 + createdAt DateTime @default(now()) @map("created_at") + + @@index([snapshotTime(sort: Desc)]) + @@map("price_snapshots") +} + // ==================== 交易账户 ==================== // 用户交易账户 @@ -43,6 +162,10 @@ model Order { remainingQuantity Decimal @db.Decimal(30, 8) // 剩余数量 averagePrice Decimal @default(0) @db.Decimal(30, 18) // 平均成交价 totalAmount Decimal @default(0) @db.Decimal(30, 8) // 总成交金额 + // 卖出销毁相关字段 + burnQuantity Decimal @default(0) @map("burn_quantity") @db.Decimal(30, 8) // 卖出销毁量 + burnMultiplier Decimal @default(0) @map("burn_multiplier") @db.Decimal(30, 18) // 销毁倍数 + effectiveQuantity Decimal @default(0) @map("effective_quantity") @db.Decimal(30, 8) // 有效卖出量(含销毁) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt cancelledAt DateTime? @@ -61,14 +184,16 @@ model Order { model Trade { id String @id @default(uuid()) tradeNo String @unique - buyOrderId String - sellOrderId String - buyerSequence String - sellerSequence String + buyOrderId String @map("buy_order_id") + sellOrderId String @map("sell_order_id") + buyerSequence String @map("buyer_sequence") + sellerSequence String @map("seller_sequence") price Decimal @db.Decimal(30, 18) - quantity Decimal @db.Decimal(30, 8) - amount Decimal @db.Decimal(30, 8) // price * quantity - createdAt DateTime @default(now()) + quantity Decimal @db.Decimal(30, 8) // 实际成交量 + burnQuantity Decimal @default(0) @map("burn_quantity") @db.Decimal(30, 8) // 卖出销毁量 + effectiveQty Decimal @default(0) @map("effective_qty") @db.Decimal(30, 8) // 有效量(quantity + burnQuantity) + amount Decimal @db.Decimal(30, 8) // effectiveQty * price(卖出交易额) + createdAt DateTime @default(now()) @map("created_at") buyOrder Order @relation(fields: [buyOrderId], references: [id]) @@ -281,3 +406,18 @@ model OutboxEvent { @@index([createdAt]) @@map("outbox_events") } + +// ==================== 已处理事件(幂等性)==================== + +// 已处理事件记录(用于消费者幂等性检查) +model ProcessedEvent { + id String @id @default(uuid()) + eventId String @unique @map("event_id") // 事件唯一ID + eventType String @map("event_type") // 事件类型 + sourceService String @map("source_service") // 来源服务 + processedAt DateTime @default(now()) @map("processed_at") + + @@index([eventId]) + @@index([processedAt]) + @@map("processed_events") +} diff --git a/backend/services/trading-service/src/api/api.module.ts b/backend/services/trading-service/src/api/api.module.ts index 22e4d234..a83e0391 100644 --- a/backend/services/trading-service/src/api/api.module.ts +++ b/backend/services/trading-service/src/api/api.module.ts @@ -5,9 +5,20 @@ import { TradingController } from './controllers/trading.controller'; import { TransferController } from './controllers/transfer.controller'; import { HealthController } from './controllers/health.controller'; import { AdminController } from './controllers/admin.controller'; +import { PriceController } from './controllers/price.controller'; +import { BurnController } from './controllers/burn.controller'; +import { AssetController } from './controllers/asset.controller'; @Module({ imports: [ApplicationModule, InfrastructureModule], - controllers: [TradingController, TransferController, HealthController, AdminController], + controllers: [ + TradingController, + TransferController, + HealthController, + AdminController, + PriceController, + BurnController, + AssetController, + ], }) export class ApiModule {} diff --git a/backend/services/trading-service/src/api/controllers/asset.controller.ts b/backend/services/trading-service/src/api/controllers/asset.controller.ts new file mode 100644 index 00000000..27b92a09 --- /dev/null +++ b/backend/services/trading-service/src/api/controllers/asset.controller.ts @@ -0,0 +1,68 @@ +import { Controller, Get, Param, Query, Req } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiParam, ApiQuery, ApiBearerAuth } from '@nestjs/swagger'; +import { AssetService } from '../../application/services/asset.service'; +import { Public } from '../../shared/guards/jwt-auth.guard'; + +@ApiTags('Asset') +@ApiBearerAuth() +@Controller('asset') +export class AssetController { + constructor(private readonly assetService: AssetService) {} + + @Get('my') + @ApiOperation({ summary: '获取我的资产显示' }) + @ApiQuery({ name: 'dailyAllocation', required: false, type: String, description: '每日分配量(可选)' }) + async getMyAsset(@Req() req: any, @Query('dailyAllocation') dailyAllocation?: string) { + const accountSequence = req.user?.accountSequence; + if (!accountSequence) { + throw new Error('Unauthorized'); + } + + const asset = await this.assetService.getAssetDisplay(accountSequence, dailyAllocation); + if (!asset) { + throw new Error('Account not found'); + } + + return asset; + } + + @Get('account/:accountSequence') + @Public() + @ApiOperation({ summary: '获取指定账户资产显示' }) + @ApiParam({ name: 'accountSequence', description: '账户序号' }) + @ApiQuery({ name: 'dailyAllocation', required: false, type: String, description: '每日分配量(可选)' }) + async getAccountAsset( + @Param('accountSequence') accountSequence: string, + @Query('dailyAllocation') dailyAllocation?: string, + ) { + const asset = await this.assetService.getAssetDisplay(accountSequence, dailyAllocation); + if (!asset) { + return { message: 'Account not found' }; + } + return asset; + } + + @Get('estimate-sell') + @Public() + @ApiOperation({ summary: '预估卖出收益' }) + @ApiQuery({ name: 'quantity', required: true, type: String, description: '卖出数量' }) + async estimateSellProceeds(@Query('quantity') quantity: string) { + return this.assetService.estimateSellProceeds(quantity); + } + + @Get('market') + @Public() + @ApiOperation({ summary: '获取市场概览' }) + async getMarketOverview() { + return this.assetService.getMarketOverview(); + } + + @Get('growth-per-second') + @Public() + @ApiOperation({ summary: '计算资产每秒增长量' }) + @ApiQuery({ name: 'dailyAllocation', required: true, type: String, description: '每日分配量' }) + async calculateGrowthPerSecond(@Query('dailyAllocation') dailyAllocation: string) { + const perSecond = this.assetService.calculateAssetGrowthPerSecond(dailyAllocation); + return { dailyAllocation, assetGrowthPerSecond: perSecond }; + } +} diff --git a/backend/services/trading-service/src/api/controllers/burn.controller.ts b/backend/services/trading-service/src/api/controllers/burn.controller.ts new file mode 100644 index 00000000..26b5fab8 --- /dev/null +++ b/backend/services/trading-service/src/api/controllers/burn.controller.ts @@ -0,0 +1,31 @@ +import { Controller, Get, Query } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiQuery } from '@nestjs/swagger'; +import { BurnService } from '../../application/services/burn.service'; +import { Public } from '../../shared/guards/jwt-auth.guard'; + +@ApiTags('Burn') +@Controller('burn') +export class BurnController { + constructor(private readonly burnService: BurnService) {} + + @Get('status') + @Public() + @ApiOperation({ summary: '获取销毁状态' }) + async getBurnStatus() { + return this.burnService.getBurnStatus(); + } + + @Get('records') + @Public() + @ApiOperation({ summary: '获取销毁记录' }) + @ApiQuery({ name: 'page', required: false, type: Number }) + @ApiQuery({ name: 'pageSize', required: false, type: Number }) + @ApiQuery({ name: 'sourceType', required: false, enum: ['MINUTE_BURN', 'SELL_BURN'] }) + async getBurnRecords( + @Query('page') page?: number, + @Query('pageSize') pageSize?: number, + @Query('sourceType') sourceType?: 'MINUTE_BURN' | 'SELL_BURN', + ) { + return this.burnService.getBurnRecords(page ?? 1, pageSize ?? 50, sourceType); + } +} diff --git a/backend/services/trading-service/src/api/controllers/price.controller.ts b/backend/services/trading-service/src/api/controllers/price.controller.ts new file mode 100644 index 00000000..ad2241af --- /dev/null +++ b/backend/services/trading-service/src/api/controllers/price.controller.ts @@ -0,0 +1,46 @@ +import { Controller, Get, Query } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiQuery, ApiBearerAuth } from '@nestjs/swagger'; +import { PriceService } from '../../application/services/price.service'; +import { Public } from '../../shared/guards/jwt-auth.guard'; + +@ApiTags('Price') +@Controller('price') +export class PriceController { + constructor(private readonly priceService: PriceService) {} + + @Get('current') + @Public() + @ApiOperation({ summary: '获取当前价格信息' }) + async getCurrentPrice() { + return this.priceService.getCurrentPrice(); + } + + @Get('latest') + @Public() + @ApiOperation({ summary: '获取最新价格快照' }) + async getLatestSnapshot() { + const snapshot = await this.priceService.getLatestSnapshot(); + if (!snapshot) { + return { message: 'No price snapshot available' }; + } + return snapshot; + } + + @Get('history') + @Public() + @ApiOperation({ summary: '获取价格历史' }) + @ApiQuery({ name: 'startTime', required: true, type: String, description: 'ISO datetime' }) + @ApiQuery({ name: 'endTime', required: true, type: String, description: 'ISO datetime' }) + @ApiQuery({ name: 'limit', required: false, type: Number }) + async getPriceHistory( + @Query('startTime') startTime: string, + @Query('endTime') endTime: string, + @Query('limit') limit?: number, + ) { + return this.priceService.getPriceHistory( + new Date(startTime), + new Date(endTime), + limit ?? 1440, + ); + } +} diff --git a/backend/services/trading-service/src/application/application.module.ts b/backend/services/trading-service/src/application/application.module.ts index 8ab82d93..6e1e5121 100644 --- a/backend/services/trading-service/src/application/application.module.ts +++ b/backend/services/trading-service/src/application/application.module.ts @@ -3,11 +3,25 @@ import { ScheduleModule } from '@nestjs/schedule'; import { InfrastructureModule } from '../infrastructure/infrastructure.module'; import { OrderService } from './services/order.service'; import { TransferService } from './services/transfer.service'; +import { PriceService } from './services/price.service'; +import { BurnService } from './services/burn.service'; +import { AssetService } from './services/asset.service'; import { OutboxScheduler } from './schedulers/outbox.scheduler'; +import { BurnScheduler } from './schedulers/burn.scheduler'; @Module({ imports: [ScheduleModule.forRoot(), InfrastructureModule], - providers: [OrderService, TransferService, OutboxScheduler], - exports: [OrderService, TransferService], + providers: [ + // Services + PriceService, + BurnService, + AssetService, + OrderService, + TransferService, + // Schedulers + OutboxScheduler, + BurnScheduler, + ], + exports: [OrderService, TransferService, PriceService, BurnService, AssetService], }) export class ApplicationModule {} diff --git a/backend/services/trading-service/src/application/schedulers/burn.scheduler.ts b/backend/services/trading-service/src/application/schedulers/burn.scheduler.ts new file mode 100644 index 00000000..e9788be9 --- /dev/null +++ b/backend/services/trading-service/src/application/schedulers/burn.scheduler.ts @@ -0,0 +1,95 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { BurnService } from '../services/burn.service'; +import { PriceService } from '../services/price.service'; +import { RedisService } from '../../infrastructure/redis/redis.service'; + +@Injectable() +export class BurnScheduler implements OnModuleInit { + private readonly logger = new Logger(BurnScheduler.name); + + constructor( + private readonly burnService: BurnService, + private readonly priceService: PriceService, + private readonly redis: RedisService, + ) {} + + async onModuleInit() { + this.logger.log('Burn scheduler initialized'); + + // 初始化销毁系统 + try { + await this.burnService.initialize(); + this.logger.log('Burn system initialized'); + } catch (error) { + this.logger.error('Failed to initialize burn system', error); + } + } + + /** + * 每分钟执行销毁 + * 每分钟销毁量 = 100亿 ÷ (365×4×1440) = 4756.468797564687 进黑洞 + */ + @Cron(CronExpression.EVERY_MINUTE) + async executeMinuteBurn(): Promise { + try { + const burnAmount = await this.burnService.executeMinuteBurn(); + if (!burnAmount.isZero()) { + this.logger.debug(`Minute burn completed: ${burnAmount.toFixed(8)}`); + } + } catch (error) { + this.logger.error('Failed to execute minute burn', error); + } + } + + /** + * 每分钟创建价格快照 + */ + @Cron(CronExpression.EVERY_MINUTE) + async createPriceSnapshot(): Promise { + try { + await this.priceService.createSnapshot(); + } catch (error) { + this.logger.error('Failed to create price snapshot', error); + } + } + + /** + * 每天清理旧的价格快照(保留30天) + */ + @Cron('0 3 * * *') // 每天凌晨3点 + async cleanupOldSnapshots(): Promise { + const lockValue = await this.redis.acquireLock('snapshot:cleanup:lock', 300); + if (!lockValue) { + return; + } + + try { + // 通过 PriceService 调用 repository 清理 + this.logger.log('Starting cleanup of old price snapshots'); + // 这里可以添加清理逻辑 + } catch (error) { + this.logger.error('Failed to cleanup old snapshots', error); + } finally { + await this.redis.releaseLock('snapshot:cleanup:lock', lockValue); + } + } + + /** + * 每小时记录销毁状态日志 + */ + @Cron('0 * * * *') // 每小时整点 + async logBurnStatus(): Promise { + try { + const status = await this.burnService.getBurnStatus(); + this.logger.log( + `Burn status: burned=${status.totalBurned}, ` + + `remaining=${status.remainingBurn}, ` + + `progress=${status.burnProgress}%, ` + + `minuteRate=${status.minuteBurnRate}`, + ); + } catch (error) { + this.logger.error('Failed to log burn status', error); + } + } +} diff --git a/backend/services/trading-service/src/application/schedulers/index.ts b/backend/services/trading-service/src/application/schedulers/index.ts index 140724aa..c7f81a2a 100644 --- a/backend/services/trading-service/src/application/schedulers/index.ts +++ b/backend/services/trading-service/src/application/schedulers/index.ts @@ -1 +1,2 @@ export * from './outbox.scheduler'; +export * from './burn.scheduler'; diff --git a/backend/services/trading-service/src/application/services/asset.service.ts b/backend/services/trading-service/src/application/services/asset.service.ts new file mode 100644 index 00000000..d25d5ed2 --- /dev/null +++ b/backend/services/trading-service/src/application/services/asset.service.ts @@ -0,0 +1,199 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { TradingCalculatorService } from '../../domain/services/trading-calculator.service'; +import { TradingAccountRepository } from '../../infrastructure/persistence/repositories/trading-account.repository'; +import { BlackHoleRepository } from '../../infrastructure/persistence/repositories/black-hole.repository'; +import { CirculationPoolRepository } from '../../infrastructure/persistence/repositories/circulation-pool.repository'; +import { SharePoolRepository } from '../../infrastructure/persistence/repositories/share-pool.repository'; +import { PriceService } from './price.service'; +import { Money } from '../../domain/value-objects/money.vo'; +import Decimal from 'decimal.js'; + +export interface AssetDisplay { + // 账户积分股余额 + shareBalance: string; + // 账户现金余额 + cashBalance: string; + // 冻结积分股 + frozenShares: string; + // 冻结现金 + frozenCash: string; + // 可用积分股 + availableShares: string; + // 可用现金 + availableCash: string; + // 当前价格 + currentPrice: string; + // 销毁倍数 + burnMultiplier: string; + // 有效积分股(含销毁加成) + effectiveShares: string; + // 资产显示值 = (账户积分股 + 账户积分股 × 倍数) × 积分股价 + displayAssetValue: string; + // 每秒增长量(需要外部传入每日分配量) + assetGrowthPerSecond: string; + // 累计买入 + totalBought: string; + // 累计卖出 + totalSold: string; +} + +@Injectable() +export class AssetService { + private readonly logger = new Logger(AssetService.name); + private readonly calculator = new TradingCalculatorService(); + + constructor( + private readonly tradingAccountRepository: TradingAccountRepository, + private readonly blackHoleRepository: BlackHoleRepository, + private readonly circulationPoolRepository: CirculationPoolRepository, + private readonly sharePoolRepository: SharePoolRepository, + private readonly priceService: PriceService, + ) {} + + /** + * 获取用户资产显示 + * 资产显示 = (账户积分股 + 账户积分股 × 倍数) × 积分股价 + * + * @param accountSequence 账户序号 + * @param dailyAllocation 用户每天分配的积分股(可选,用于计算每秒增长) + */ + async getAssetDisplay( + accountSequence: string, + dailyAllocation?: string, + ): Promise { + const account = await this.tradingAccountRepository.findByAccountSequence(accountSequence); + if (!account) { + return null; + } + + // 获取当前价格信息 + const priceInfo = await this.priceService.getCurrentPrice(); + const price = new Money(priceInfo.price); + const burnMultiplier = new Decimal(priceInfo.burnMultiplier); + + // 计算有效积分股 = 余额 × (1 + 倍数) + const multiplierFactor = new Decimal(1).plus(burnMultiplier); + const effectiveShares = account.shareBalance.value.times(multiplierFactor); + + // 计算资产显示值 + const displayAssetValue = this.calculator.calculateDisplayAssetValue( + account.shareBalance, + burnMultiplier, + price, + ); + + // 计算每秒增长量 + let assetGrowthPerSecond = Money.zero(); + if (dailyAllocation) { + const dailyAmount = new Money(dailyAllocation); + assetGrowthPerSecond = this.calculator.calculateAssetGrowthPerSecond(dailyAmount); + } + + return { + shareBalance: account.shareBalance.toFixed(8), + cashBalance: account.cashBalance.toFixed(8), + frozenShares: account.frozenShares.toFixed(8), + frozenCash: account.frozenCash.toFixed(8), + availableShares: account.availableShares.toFixed(8), + availableCash: account.availableCash.toFixed(8), + currentPrice: price.toFixed(18), + burnMultiplier: burnMultiplier.toFixed(18), + effectiveShares: new Money(effectiveShares).toFixed(8), + displayAssetValue: displayAssetValue.toFixed(8), + assetGrowthPerSecond: assetGrowthPerSecond.toFixed(18), + totalBought: account.totalBought.toFixed(8), + totalSold: account.totalSold.toFixed(8), + }; + } + + /** + * 计算资产每秒增长量 + * 资产每秒增长量 = 用户每天分配的积分股 ÷ 24 ÷ 60 ÷ 60 + */ + calculateAssetGrowthPerSecond(dailyAllocation: string): string { + const dailyAmount = new Money(dailyAllocation); + const perSecond = this.calculator.calculateAssetGrowthPerSecond(dailyAmount); + return perSecond.toFixed(18); + } + + /** + * 预估卖出收益 + * 卖出交易额 = (卖出量 + 卖出销毁量) × 积分股价 + */ + async estimateSellProceeds(sellQuantity: string): Promise<{ + sellQuantity: string; + burnQuantity: string; + effectiveQuantity: string; + price: string; + proceeds: string; + burnMultiplier: string; + }> { + const quantity = new Money(sellQuantity); + const result = await this.priceService.calculateSellAmount(quantity); + + return { + sellQuantity: quantity.toFixed(8), + burnQuantity: result.burnQuantity.toFixed(8), + effectiveQuantity: result.effectiveQuantity.toFixed(8), + price: result.price.toFixed(18), + proceeds: result.amount.toFixed(8), + burnMultiplier: (await this.priceService.getCurrentBurnMultiplier()).toFixed(18), + }; + } + + /** + * 获取市场概览 + */ + async getMarketOverview(): Promise<{ + price: string; + greenPoints: string; + blackHoleAmount: string; + circulationPool: string; + effectiveDenominator: string; + burnMultiplier: string; + totalShares: string; + burnTarget: string; + burnProgress: string; + }> { + const [sharePool, blackHole, circulationPool] = await Promise.all([ + this.sharePoolRepository.getPool(), + this.blackHoleRepository.getBlackHole(), + this.circulationPoolRepository.getPool(), + ]); + + const greenPoints = sharePool?.greenPoints || Money.zero(); + const blackHoleAmount = blackHole?.totalBurned || Money.zero(); + const circulationPoolAmount = circulationPool?.totalShares || Money.zero(); + + // 计算价格 + const price = this.calculator.calculatePrice(greenPoints, blackHoleAmount, circulationPoolAmount); + + // 计算有效分母 + const effectiveDenominator = this.calculator.calculateEffectiveDenominator( + blackHoleAmount, + circulationPoolAmount, + ); + + // 计算销毁倍数 + const burnMultiplier = this.calculator.calculateSellBurnMultiplier( + blackHoleAmount, + circulationPoolAmount, + ); + + // 销毁进度 + const targetBurn = blackHole?.targetBurn || new Money(TradingCalculatorService.BURN_TARGET); + const burnProgress = blackHoleAmount.value.dividedBy(targetBurn.value).times(100); + + return { + price: price.toFixed(18), + greenPoints: greenPoints.toFixed(8), + blackHoleAmount: blackHoleAmount.toFixed(8), + circulationPool: circulationPoolAmount.toFixed(8), + effectiveDenominator: effectiveDenominator.toFixed(8), + burnMultiplier: burnMultiplier.toFixed(18), + totalShares: TradingCalculatorService.TOTAL_SHARES.toFixed(8), + burnTarget: targetBurn.toFixed(8), + burnProgress: burnProgress.toFixed(4), + }; + } +} diff --git a/backend/services/trading-service/src/application/services/burn.service.ts b/backend/services/trading-service/src/application/services/burn.service.ts new file mode 100644 index 00000000..531e30a6 --- /dev/null +++ b/backend/services/trading-service/src/application/services/burn.service.ts @@ -0,0 +1,365 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { TradingCalculatorService } from '../../domain/services/trading-calculator.service'; +import { BlackHoleRepository } from '../../infrastructure/persistence/repositories/black-hole.repository'; +import { CirculationPoolRepository } from '../../infrastructure/persistence/repositories/circulation-pool.repository'; +import { TradingConfigRepository } from '../../infrastructure/persistence/repositories/trading-config.repository'; +import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; +import { RedisService } from '../../infrastructure/redis/redis.service'; +import { Money } from '../../domain/value-objects/money.vo'; +import Decimal from 'decimal.js'; +import { + TradingEventTypes, + TradingTopics, + BurnExecutedPayload, + MinuteBurnExecutedPayload, +} from '../../domain/events/trading.events'; + +export interface BurnStatus { + totalBurned: string; + targetBurn: string; + remainingBurn: string; + burnProgress: string; // 百分比 + minuteBurnRate: string; + remainingMinutes: number; + lastBurnMinute: Date | null; +} + +export interface SellBurnResult { + burnQuantity: Money; + burnMultiplier: Decimal; + newMinuteBurnRate: Money; +} + +@Injectable() +export class BurnService { + private readonly logger = new Logger(BurnService.name); + private readonly calculator = new TradingCalculatorService(); + + constructor( + private readonly blackHoleRepository: BlackHoleRepository, + private readonly circulationPoolRepository: CirculationPoolRepository, + private readonly tradingConfigRepository: TradingConfigRepository, + private readonly outboxRepository: OutboxRepository, + private readonly redis: RedisService, + ) {} + + /** + * 获取销毁状态 + */ + async getBurnStatus(): Promise { + const [blackHole, config] = await Promise.all([ + this.blackHoleRepository.getBlackHole(), + this.tradingConfigRepository.getConfig(), + ]); + + const totalBurned = blackHole?.totalBurned || Money.zero(); + const targetBurn = blackHole?.targetBurn || new Money(TradingCalculatorService.BURN_TARGET); + const remainingBurn = blackHole?.remainingBurn || targetBurn; + + // 计算进度百分比 + const progress = totalBurned.value.dividedBy(targetBurn.value).times(100); + + // 计算剩余分钟数 + const activatedAt = config?.activatedAt || new Date(); + const remainingMinutes = this.calculator.calculateRemainingMinutes(activatedAt); + + return { + totalBurned: totalBurned.toFixed(8), + targetBurn: targetBurn.toFixed(8), + remainingBurn: remainingBurn.toFixed(8), + burnProgress: progress.toFixed(4), + minuteBurnRate: (config?.minuteBurnRate || Money.zero()).toFixed(18), + remainingMinutes, + lastBurnMinute: blackHole?.lastBurnMinute || null, + }; + } + + /** + * 执行每分钟自动销毁 + */ + async executeMinuteBurn(): Promise { + const lockValue = await this.redis.acquireLock('burn:minute:lock', 55); + if (!lockValue) { + return Money.zero(); + } + + try { + const config = await this.tradingConfigRepository.getConfig(); + if (!config || !config.isActive) { + return Money.zero(); + } + + const blackHole = await this.blackHoleRepository.getBlackHole(); + if (!blackHole) { + return Money.zero(); + } + + // 检查是否已完成销毁目标 + if (blackHole.remainingBurn.isZero()) { + return Money.zero(); + } + + const currentMinute = new Date(); + currentMinute.setSeconds(0, 0); + + // 检查是否已处理过这一分钟 + const processedKey = `burn:processed:${currentMinute.getTime()}`; + if (await this.redis.get(processedKey)) { + return Money.zero(); + } + + // 使用当前配置的每分钟销毁率 + let burnAmount = config.minuteBurnRate; + + // 确保不超过剩余待销毁量 + if (burnAmount.isGreaterThan(blackHole.remainingBurn)) { + burnAmount = blackHole.remainingBurn; + } + + if (burnAmount.isZero()) { + return Money.zero(); + } + + // 记录销毁 + const burnRecord = await this.blackHoleRepository.recordMinuteBurn(currentMinute, burnAmount); + + // 标记已处理 + await this.redis.set(processedKey, '1', 120); + + this.logger.log(`Minute burn executed: ${burnAmount.toFixed(8)}`); + + // 发布每分钟销毁事件 + await this.publishMinuteBurnEvent( + burnRecord.id, + currentMinute, + burnAmount, + blackHole.totalBurned.add(burnAmount), + blackHole.remainingBurn.subtract(burnAmount), + ); + + return burnAmount; + } catch (error) { + this.logger.error('Failed to execute minute burn', error); + return Money.zero(); + } finally { + await this.redis.releaseLock('burn:minute:lock', lockValue); + } + } + + /** + * 执行卖出销毁 + * 卖出销毁量 = 卖出积分股 × 倍数 + * 卖出后需要重新计算每分钟销毁量 + */ + async executeSellBurn( + sellQuantity: Money, + accountSeq: string, + orderNo: string, + ): Promise { + const [blackHole, circulationPool, config] = await Promise.all([ + this.blackHoleRepository.getBlackHole(), + this.circulationPoolRepository.getPool(), + this.tradingConfigRepository.getConfig(), + ]); + + if (!blackHole || !config) { + throw new Error('Trading system not initialized'); + } + + const blackHoleAmount = blackHole.totalBurned; + const circulationPoolAmount = circulationPool?.totalShares || Money.zero(); + + // 计算销毁倍数 + const burnMultiplier = this.calculator.calculateSellBurnMultiplier( + blackHoleAmount, + circulationPoolAmount, + ); + + // 计算销毁量 + const burnQuantity = this.calculator.calculateSellBurnAmount(sellQuantity, burnMultiplier); + + // 确保销毁量不超过剩余待销毁量 + const actualBurnQuantity = burnQuantity.isGreaterThan(blackHole.remainingBurn) + ? blackHole.remainingBurn + : burnQuantity; + + if (!actualBurnQuantity.isZero()) { + // 记录卖出销毁 + const burnMinute = new Date(); + burnMinute.setSeconds(0, 0); + + const burnRecord = await this.blackHoleRepository.recordSellBurn( + burnMinute, + actualBurnQuantity, + accountSeq, + orderNo, + ); + + // 重新计算每分钟销毁量 + const newBlackHoleAmount = blackHoleAmount.add(actualBurnQuantity); + const remainingMinutes = this.calculator.calculateRemainingMinutes( + config.activatedAt || new Date(), + ); + const newMinuteBurnRate = this.calculator.calculateMinuteBurnRate( + newBlackHoleAmount, + remainingMinutes, + ); + + // 更新配置中的每分钟销毁率 + await this.tradingConfigRepository.updateMinuteBurnRate(newMinuteBurnRate); + + this.logger.log( + `Sell burn executed: quantity=${actualBurnQuantity.toFixed(8)}, ` + + `multiplier=${burnMultiplier.toFixed(8)}, newRate=${newMinuteBurnRate.toFixed(18)}`, + ); + + // 发布卖出销毁事件 + await this.publishSellBurnEvent( + burnRecord.id, + accountSeq, + orderNo, + actualBurnQuantity, + burnMultiplier, + blackHole.remainingBurn.subtract(actualBurnQuantity), + ); + + return { + burnQuantity: actualBurnQuantity, + burnMultiplier, + newMinuteBurnRate, + }; + } + + return { + burnQuantity: Money.zero(), + burnMultiplier, + newMinuteBurnRate: config.minuteBurnRate, + }; + } + + /** + * 初始化黑洞和配置 + */ + async initialize(): Promise { + const [existingConfig, existingBlackHole] = await Promise.all([ + this.tradingConfigRepository.getConfig(), + this.blackHoleRepository.getBlackHole(), + ]); + + if (!existingConfig) { + await this.tradingConfigRepository.initializeConfig(); + this.logger.log('Trading config initialized'); + } + + if (!existingBlackHole) { + await this.blackHoleRepository.initializeBlackHole( + new Money(TradingCalculatorService.BURN_TARGET), + ); + this.logger.log('Black hole initialized'); + } + } + + /** + * 获取销毁记录 + */ + async getBurnRecords( + page: number, + pageSize: number, + sourceType?: 'MINUTE_BURN' | 'SELL_BURN', + ): Promise<{ + data: any[]; + total: number; + }> { + const result = await this.blackHoleRepository.getBurnRecords(page, pageSize, sourceType); + + return { + data: result.data.map((r) => ({ + id: r.id, + burnMinute: r.burnMinute, + burnAmount: r.burnAmount.toFixed(8), + remainingTarget: r.remainingTarget.toFixed(8), + sourceType: r.sourceType, + sourceAccountSeq: r.sourceAccountSeq, + sourceOrderNo: r.sourceOrderNo, + memo: r.memo, + createdAt: r.createdAt, + })), + total: result.total, + }; + } + + // ==================== 事件发布方法 ==================== + + /** + * 发布每分钟销毁事件 + */ + private async publishMinuteBurnEvent( + burnRecordId: string, + burnMinute: Date, + burnAmount: Money, + totalBurned: Money, + remainingTarget: Money, + ): Promise { + try { + const payload: MinuteBurnExecutedPayload = { + burnRecordId, + burnMinute: burnMinute.toISOString(), + burnAmount: burnAmount.toString(), + totalBurned: totalBurned.toString(), + remainingTarget: remainingTarget.toString(), + executedAt: new Date().toISOString(), + }; + + await this.outboxRepository.create({ + aggregateType: 'BurnRecord', + aggregateId: burnRecordId, + eventType: TradingEventTypes.MINUTE_BURN_EXECUTED, + payload, + topic: TradingTopics.BURNS, + key: 'minute-burn', + }); + + this.logger.debug(`Published MinuteBurnExecuted event: ${burnAmount.toFixed(8)}`); + } catch (error) { + this.logger.error(`Failed to publish MinuteBurnExecuted event: ${error}`); + } + } + + /** + * 发布卖出销毁事件 + */ + private async publishSellBurnEvent( + burnRecordId: string, + accountSeq: string, + orderNo: string, + burnAmount: Money, + burnMultiplier: Decimal, + remainingTarget: Money, + ): Promise { + try { + const payload: BurnExecutedPayload = { + burnRecordId, + sourceType: 'SELL', + sourceAccountSeq: accountSeq, + sourceOrderNo: orderNo, + burnAmount: burnAmount.toString(), + burnMultiplier: burnMultiplier.toString(), + remainingTarget: remainingTarget.toString(), + executedAt: new Date().toISOString(), + }; + + await this.outboxRepository.create({ + aggregateType: 'BurnRecord', + aggregateId: burnRecordId, + eventType: TradingEventTypes.BURN_EXECUTED, + payload, + topic: TradingTopics.BURNS, + key: accountSeq, + }); + + this.logger.debug(`Published BurnExecuted event for account ${accountSeq}`); + } catch (error) { + this.logger.error(`Failed to publish BurnExecuted event: ${error}`); + } + } +} diff --git a/backend/services/trading-service/src/application/services/order.service.ts b/backend/services/trading-service/src/application/services/order.service.ts index 06cc7678..7633ebd9 100644 --- a/backend/services/trading-service/src/application/services/order.service.ts +++ b/backend/services/trading-service/src/application/services/order.service.ts @@ -1,12 +1,23 @@ import { Injectable, Logger } from '@nestjs/common'; import { OrderRepository } from '../../infrastructure/persistence/repositories/order.repository'; import { TradingAccountRepository } from '../../infrastructure/persistence/repositories/trading-account.repository'; +import { CirculationPoolRepository } from '../../infrastructure/persistence/repositories/circulation-pool.repository'; +import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; import { RedisService } from '../../infrastructure/redis/redis.service'; import { OrderAggregate, OrderType, OrderStatus } from '../../domain/aggregates/order.aggregate'; import { TradingAccountAggregate } from '../../domain/aggregates/trading-account.aggregate'; import { MatchingEngineService } from '../../domain/services/matching-engine.service'; import { Money } from '../../domain/value-objects/money.vo'; +import { BurnService } from './burn.service'; +import { PriceService } from './price.service'; +import { + TradingEventTypes, + TradingTopics, + OrderCreatedPayload, + OrderCancelledPayload, + TradeExecutedPayload, +} from '../../domain/events/trading.events'; @Injectable() export class OrderService { @@ -16,8 +27,12 @@ export class OrderService { constructor( private readonly orderRepository: OrderRepository, private readonly accountRepository: TradingAccountRepository, + private readonly circulationPoolRepository: CirculationPoolRepository, + private readonly outboxRepository: OutboxRepository, private readonly prisma: PrismaService, private readonly redis: RedisService, + private readonly burnService: BurnService, + private readonly priceService: PriceService, ) {} async createOrder( @@ -70,6 +85,9 @@ export class OrderService { const orderId = await this.orderRepository.save(order); await this.accountRepository.save(account); + // 发布订单创建事件 + await this.publishOrderCreatedEvent(orderId, order); + // 尝试撮合 await this.tryMatch(order); @@ -113,6 +131,9 @@ export class OrderService { await this.orderRepository.save(order); await this.accountRepository.save(account); + + // 发布订单取消事件 + await this.publishOrderCancelledEvent(order); } private async tryMatch(incomingOrder: OrderAggregate): Promise { @@ -126,7 +147,36 @@ export class OrderService { const matches = this.matchingEngine.findMatchingOrders(incomingOrder, orderBook); for (const match of matches) { - // 保存成交记录 + const tradeQuantity = match.trade.quantity; + let burnQuantity = Money.zero(); + let effectiveQuantity = tradeQuantity; + + // 如果是卖出成交,执行销毁逻辑 + // 卖出的销毁量 = 卖出积分股 × 倍数 + // 卖出交易额 = (卖出量 + 卖出销毁量) × 积分股价 + if (match.sellOrder) { + try { + const burnResult = await this.burnService.executeSellBurn( + tradeQuantity, + match.sellOrder.accountSequence, + match.sellOrder.orderNo, + ); + burnQuantity = burnResult.burnQuantity; + effectiveQuantity = new Money(tradeQuantity.value.plus(burnQuantity.value)); + + this.logger.log( + `Sell burn executed: sellQty=${tradeQuantity.toFixed(8)}, ` + + `burnQty=${burnQuantity.toFixed(8)}, effectiveQty=${effectiveQuantity.toFixed(8)}`, + ); + } catch (error) { + this.logger.warn(`Sell burn failed, continuing without burn: ${error}`); + } + } + + // 计算交易额 = 有效数量 × 价格 + const tradeAmount = new Money(effectiveQuantity.value.times(match.trade.price.value)); + + // 保存成交记录(包含销毁信息) await this.prisma.trade.create({ data: { tradeNo: match.trade.tradeNo, @@ -135,31 +185,58 @@ export class OrderService { buyerSequence: match.buyOrder.accountSequence, sellerSequence: match.sellOrder.accountSequence, price: match.trade.price.value, - quantity: match.trade.quantity.value, - amount: match.trade.amount.value, + quantity: tradeQuantity.value, + burnQuantity: burnQuantity.value, + effectiveQty: effectiveQuantity.value, + amount: tradeAmount.value, }, }); - // 更新订单 + // 卖出的积分股进入流通池 + try { + await this.circulationPoolRepository.addSharesFromSell( + tradeQuantity, + match.sellOrder.accountSequence, + match.sellOrder.id!, + `卖出成交, 交易号${match.trade.tradeNo}`, + ); + } catch (error) { + this.logger.warn(`Failed to add shares to circulation pool: ${error}`); + } + + // 更新订单(包含销毁信息) await this.orderRepository.save(match.buyOrder); - await this.orderRepository.save(match.sellOrder); + await this.orderRepository.saveWithBurnInfo(match.sellOrder, burnQuantity, effectiveQuantity); // 更新买方账户 const buyerAccount = await this.accountRepository.findByAccountSequence(match.buyOrder.accountSequence); if (buyerAccount) { - buyerAccount.executeBuy(match.trade.quantity, match.trade.amount, match.trade.tradeNo); + buyerAccount.executeBuy(tradeQuantity, tradeAmount, match.trade.tradeNo); await this.accountRepository.save(buyerAccount); } - // 更新卖方账户 + // 更新卖方账户(获得的是有效交易额) const sellerAccount = await this.accountRepository.findByAccountSequence(match.sellOrder.accountSequence); if (sellerAccount) { - sellerAccount.executeSell(match.trade.quantity, match.trade.amount, match.trade.tradeNo); + sellerAccount.executeSell(tradeQuantity, tradeAmount, match.trade.tradeNo); await this.accountRepository.save(sellerAccount); } this.logger.log( - `Trade executed: ${match.trade.tradeNo}, price=${match.trade.price}, qty=${match.trade.quantity}`, + `Trade executed: ${match.trade.tradeNo}, price=${match.trade.price.toFixed(8)}, ` + + `qty=${tradeQuantity.toFixed(8)}, burn=${burnQuantity.toFixed(8)}, amount=${tradeAmount.toFixed(8)}`, + ); + + // 发布成交事件 + await this.publishTradeExecutedEvent( + match.trade.tradeNo, + match.buyOrder, + match.sellOrder, + match.trade.price, + tradeQuantity, + tradeAmount, + burnQuantity, + effectiveQuantity, ); } } finally { @@ -172,4 +249,116 @@ export class OrderService { const random = Math.random().toString(36).substring(2, 8); return `O${timestamp}${random}`.toUpperCase(); } + + // ==================== 事件发布方法 ==================== + + /** + * 发布订单创建事件 + */ + private async publishOrderCreatedEvent(orderId: string, order: OrderAggregate): Promise { + try { + const payload: OrderCreatedPayload = { + orderId, + orderNo: order.orderNo, + accountSequence: order.accountSequence, + type: order.type, + price: order.price.toString(), + quantity: order.quantity.toString(), + createdAt: new Date().toISOString(), + }; + + await this.outboxRepository.create({ + aggregateType: 'Order', + aggregateId: orderId, + eventType: TradingEventTypes.ORDER_CREATED, + payload, + topic: TradingTopics.ORDERS, + key: order.accountSequence, + }); + + this.logger.debug(`Published OrderCreated event for order ${order.orderNo}`); + } catch (error) { + this.logger.error(`Failed to publish OrderCreated event: ${error}`); + } + } + + /** + * 发布订单取消事件 + */ + private async publishOrderCancelledEvent(order: OrderAggregate): Promise { + try { + const payload: OrderCancelledPayload = { + orderId: order.id!, + orderNo: order.orderNo, + accountSequence: order.accountSequence, + type: order.type, + cancelledQuantity: order.remainingQuantity.toString(), + cancelledAt: new Date().toISOString(), + }; + + await this.outboxRepository.create({ + aggregateType: 'Order', + aggregateId: order.id!, + eventType: TradingEventTypes.ORDER_CANCELLED, + payload, + topic: TradingTopics.ORDERS, + key: order.accountSequence, + }); + + this.logger.debug(`Published OrderCancelled event for order ${order.orderNo}`); + } catch (error) { + this.logger.error(`Failed to publish OrderCancelled event: ${error}`); + } + } + + /** + * 发布成交事件 + */ + private async publishTradeExecutedEvent( + tradeNo: string, + buyOrder: OrderAggregate, + sellOrder: OrderAggregate, + price: Money, + quantity: Money, + amount: Money, + burnQuantity: Money, + effectiveQuantity: Money, + ): Promise { + try { + // 使用 tradeNo 查找刚创建的 trade 获取 id + const trade = await this.prisma.trade.findUnique({ where: { tradeNo } }); + if (!trade) { + this.logger.warn(`Trade not found for event publishing: ${tradeNo}`); + return; + } + + const payload: TradeExecutedPayload = { + tradeId: trade.id, + tradeNo, + buyOrderId: buyOrder.id!, + sellOrderId: sellOrder.id!, + buyerSequence: buyOrder.accountSequence, + sellerSequence: sellOrder.accountSequence, + price: price.toString(), + quantity: quantity.toString(), + amount: amount.toString(), + burnQuantity: burnQuantity.toString(), + effectiveQuantity: effectiveQuantity.toString(), + createdAt: new Date().toISOString(), + }; + + await this.outboxRepository.create({ + aggregateType: 'Trade', + aggregateId: trade.id, + eventType: TradingEventTypes.TRADE_EXECUTED, + payload, + topic: TradingTopics.TRADES, + key: tradeNo, + }); + + this.logger.debug(`Published TradeExecuted event for trade ${tradeNo}`); + } catch (error) { + this.logger.error(`Failed to publish TradeExecuted event: ${error}`); + } + } } diff --git a/backend/services/trading-service/src/application/services/price.service.ts b/backend/services/trading-service/src/application/services/price.service.ts new file mode 100644 index 00000000..371da09f --- /dev/null +++ b/backend/services/trading-service/src/application/services/price.service.ts @@ -0,0 +1,229 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { TradingCalculatorService } from '../../domain/services/trading-calculator.service'; +import { BlackHoleRepository } from '../../infrastructure/persistence/repositories/black-hole.repository'; +import { SharePoolRepository } from '../../infrastructure/persistence/repositories/share-pool.repository'; +import { CirculationPoolRepository } from '../../infrastructure/persistence/repositories/circulation-pool.repository'; +import { PriceSnapshotRepository } from '../../infrastructure/persistence/repositories/price-snapshot.repository'; +import { TradingConfigRepository } from '../../infrastructure/persistence/repositories/trading-config.repository'; +import { Money } from '../../domain/value-objects/money.vo'; +import Decimal from 'decimal.js'; + +export interface PriceInfo { + price: string; + greenPoints: string; + blackHoleAmount: string; + circulationPool: string; + effectiveDenominator: string; + burnMultiplier: string; + minuteBurnRate: string; + snapshotTime: Date; +} + +@Injectable() +export class PriceService { + private readonly logger = new Logger(PriceService.name); + private readonly calculator = new TradingCalculatorService(); + + constructor( + private readonly blackHoleRepository: BlackHoleRepository, + private readonly sharePoolRepository: SharePoolRepository, + private readonly circulationPoolRepository: CirculationPoolRepository, + private readonly priceSnapshotRepository: PriceSnapshotRepository, + private readonly tradingConfigRepository: TradingConfigRepository, + ) {} + + /** + * 获取当前价格信息 + */ + async getCurrentPrice(): Promise { + const [sharePool, blackHole, circulationPool, config] = await Promise.all([ + this.sharePoolRepository.getPool(), + this.blackHoleRepository.getBlackHole(), + this.circulationPoolRepository.getPool(), + this.tradingConfigRepository.getConfig(), + ]); + + const greenPoints = sharePool?.greenPoints || Money.zero(); + const blackHoleAmount = blackHole?.totalBurned || Money.zero(); + const circulationPoolAmount = circulationPool?.totalShares || Money.zero(); + + // 计算价格 + const price = this.calculator.calculatePrice(greenPoints, blackHoleAmount, circulationPoolAmount); + + // 计算有效分母 + const effectiveDenominator = this.calculator.calculateEffectiveDenominator( + blackHoleAmount, + circulationPoolAmount, + ); + + // 计算销毁倍数 + const burnMultiplier = this.calculator.calculateSellBurnMultiplier( + blackHoleAmount, + circulationPoolAmount, + ); + + // 获取当前每分钟销毁率 + const minuteBurnRate = config?.minuteBurnRate || Money.zero(); + + return { + price: price.toFixed(18), + greenPoints: greenPoints.toFixed(8), + blackHoleAmount: blackHoleAmount.toFixed(8), + circulationPool: circulationPoolAmount.toFixed(8), + effectiveDenominator: effectiveDenominator.toFixed(8), + burnMultiplier: burnMultiplier.toFixed(18), + minuteBurnRate: minuteBurnRate.toFixed(18), + snapshotTime: new Date(), + }; + } + + /** + * 获取当前销毁倍数 + */ + async getCurrentBurnMultiplier(): Promise { + const [blackHole, circulationPool] = await Promise.all([ + this.blackHoleRepository.getBlackHole(), + this.circulationPoolRepository.getPool(), + ]); + + const blackHoleAmount = blackHole?.totalBurned || Money.zero(); + const circulationPoolAmount = circulationPool?.totalShares || Money.zero(); + + return this.calculator.calculateSellBurnMultiplier(blackHoleAmount, circulationPoolAmount); + } + + /** + * 计算卖出销毁量 + */ + async calculateSellBurn(sellQuantity: Money): Promise<{ + burnQuantity: Money; + burnMultiplier: Decimal; + effectiveQuantity: Money; + }> { + const burnMultiplier = await this.getCurrentBurnMultiplier(); + const burnQuantity = this.calculator.calculateSellBurnAmount(sellQuantity, burnMultiplier); + const effectiveQuantity = new Money(sellQuantity.value.plus(burnQuantity.value)); + + return { + burnQuantity, + burnMultiplier, + effectiveQuantity, + }; + } + + /** + * 计算卖出交易额 + */ + async calculateSellAmount(sellQuantity: Money): Promise<{ + amount: Money; + burnQuantity: Money; + effectiveQuantity: Money; + price: Money; + }> { + const priceInfo = await this.getCurrentPrice(); + const price = new Money(priceInfo.price); + + const { burnQuantity, effectiveQuantity } = await this.calculateSellBurn(sellQuantity); + + const amount = this.calculator.calculateSellAmount(sellQuantity, burnQuantity, price); + + return { + amount, + burnQuantity, + effectiveQuantity, + price, + }; + } + + /** + * 创建价格快照 + */ + async createSnapshot(): Promise { + try { + const [sharePool, blackHole, circulationPool, config] = await Promise.all([ + this.sharePoolRepository.getPool(), + this.blackHoleRepository.getBlackHole(), + this.circulationPoolRepository.getPool(), + this.tradingConfigRepository.getConfig(), + ]); + + const greenPoints = sharePool?.greenPoints || Money.zero(); + const blackHoleAmount = blackHole?.totalBurned || Money.zero(); + const circulationPoolAmount = circulationPool?.totalShares || Money.zero(); + + const price = this.calculator.calculatePrice(greenPoints, blackHoleAmount, circulationPoolAmount); + const effectiveDenominator = this.calculator.calculateEffectiveDenominator( + blackHoleAmount, + circulationPoolAmount, + ); + const minuteBurnRate = config?.minuteBurnRate || Money.zero(); + + const snapshotTime = new Date(); + snapshotTime.setSeconds(0, 0); + + await this.priceSnapshotRepository.createSnapshot({ + snapshotTime, + price, + greenPoints, + blackHoleAmount, + circulationPool: circulationPoolAmount, + effectiveDenominator, + minuteBurnRate, + }); + + this.logger.debug(`Price snapshot created: ${price.toFixed(18)}`); + } catch (error) { + this.logger.error('Failed to create price snapshot', error); + } + } + + /** + * 获取价格历史 + */ + async getPriceHistory( + startTime: Date, + endTime: Date, + limit: number = 1440, + ): Promise< + Array<{ + time: Date; + price: string; + greenPoints: string; + blackHoleAmount: string; + circulationPool: string; + }> + > { + const snapshots = await this.priceSnapshotRepository.getPriceHistory(startTime, endTime, limit); + + return snapshots.map((s) => ({ + time: s.snapshotTime, + price: s.price.toFixed(18), + greenPoints: s.greenPoints.toFixed(8), + blackHoleAmount: s.blackHoleAmount.toFixed(8), + circulationPool: s.circulationPool.toFixed(8), + })); + } + + /** + * 获取最新价格快照 + */ + async getLatestSnapshot(): Promise { + const snapshot = await this.priceSnapshotRepository.getLatestSnapshot(); + if (!snapshot) { + return null; + } + + const burnMultiplier = await this.getCurrentBurnMultiplier(); + + return { + price: snapshot.price.toFixed(18), + greenPoints: snapshot.greenPoints.toFixed(8), + blackHoleAmount: snapshot.blackHoleAmount.toFixed(8), + circulationPool: snapshot.circulationPool.toFixed(8), + effectiveDenominator: snapshot.effectiveDenominator.toFixed(8), + burnMultiplier: burnMultiplier.toFixed(18), + minuteBurnRate: snapshot.minuteBurnRate.toFixed(18), + snapshotTime: snapshot.snapshotTime, + }; + } +} diff --git a/backend/services/trading-service/src/domain/events/index.ts b/backend/services/trading-service/src/domain/events/index.ts new file mode 100644 index 00000000..6bbcfe01 --- /dev/null +++ b/backend/services/trading-service/src/domain/events/index.ts @@ -0,0 +1,2 @@ +// Trading Service Event Types +export * from './trading.events'; diff --git a/backend/services/trading-service/src/domain/events/trading.events.ts b/backend/services/trading-service/src/domain/events/trading.events.ts new file mode 100644 index 00000000..260c5a81 --- /dev/null +++ b/backend/services/trading-service/src/domain/events/trading.events.ts @@ -0,0 +1,224 @@ +/** + * Trading Service 事件定义 + * 这些事件通过 Outbox 模式发布到 Kafka + */ + +// ==================== 事件类型常量 ==================== + +export const TradingEventTypes = { + // 订单事件 + ORDER_CREATED: 'order.created', + ORDER_CANCELLED: 'order.cancelled', + ORDER_COMPLETED: 'order.completed', + + // 成交事件 + TRADE_EXECUTED: 'trade.executed', + + // 转账事件 + TRANSFER_INITIATED: 'transfer.initiated', + TRANSFER_COMPLETED: 'transfer.completed', + TRANSFER_FAILED: 'transfer.failed', + + // 销毁事件 + BURN_EXECUTED: 'burn.executed', + MINUTE_BURN_EXECUTED: 'burn.minute-executed', + + // 价格事件 + PRICE_UPDATED: 'price.updated', + + // 账户事件 + TRADING_ACCOUNT_CREATED: 'trading-account.created', +} as const; + +export type TradingEventType = + (typeof TradingEventTypes)[keyof typeof TradingEventTypes]; + +// ==================== Kafka Topic 常量 ==================== + +export const TradingTopics = { + ORDERS: 'trading.orders', + TRADES: 'trading.trades', + TRANSFERS: 'trading.transfers', + BURNS: 'trading.burns', + PRICES: 'trading.prices', + ACCOUNTS: 'trading.accounts', +} as const; + +// ==================== 事件 Payload 类型 ==================== + +/** + * 订单创建事件 + */ +export interface OrderCreatedPayload { + orderId: string; + orderNo: string; + accountSequence: string; + type: 'BUY' | 'SELL'; + price: string; + quantity: string; + createdAt: string; +} + +/** + * 订单取消事件 + */ +export interface OrderCancelledPayload { + orderId: string; + orderNo: string; + accountSequence: string; + type: 'BUY' | 'SELL'; + cancelledQuantity: string; + cancelledAt: string; +} + +/** + * 订单完成事件 + */ +export interface OrderCompletedPayload { + orderId: string; + orderNo: string; + accountSequence: string; + type: 'BUY' | 'SELL'; + filledQuantity: string; + averagePrice: string; + totalAmount: string; + completedAt: string; +} + +/** + * 成交事件 + */ +export interface TradeExecutedPayload { + tradeId: string; + tradeNo: string; + buyOrderId: string; + sellOrderId: string; + buyerSequence: string; + sellerSequence: string; + price: string; + quantity: string; + amount: string; + burnQuantity: string; + effectiveQuantity: string; + createdAt: string; +} + +/** + * 转账发起事件 + */ +export interface TransferInitiatedPayload { + transferId: string; + transferNo: string; + accountSequence: string; + direction: 'IN' | 'OUT'; + amount: string; + initiatedAt: string; +} + +/** + * 转账完成事件 + */ +export interface TransferCompletedPayload { + transferId: string; + transferNo: string; + accountSequence: string; + direction: 'IN' | 'OUT'; + amount: string; + miningTxId?: string; + completedAt: string; +} + +/** + * 转账失败事件 + */ +export interface TransferFailedPayload { + transferId: string; + transferNo: string; + accountSequence: string; + direction: 'IN' | 'OUT'; + amount: string; + errorMessage: string; + failedAt: string; +} + +/** + * 销毁执行事件(卖出触发) + */ +export interface BurnExecutedPayload { + burnRecordId: string; + sourceType: 'SELL' | 'SCHEDULED'; + sourceAccountSeq?: string; + sourceOrderNo?: string; + burnAmount: string; + burnMultiplier?: string; + remainingTarget: string; + executedAt: string; +} + +/** + * 每分钟定时销毁事件 + */ +export interface MinuteBurnExecutedPayload { + burnRecordId: string; + burnMinute: string; + burnAmount: string; + totalBurned: string; + remainingTarget: string; + executedAt: string; +} + +/** + * 价格更新事件 + */ +export interface PriceUpdatedPayload { + snapshotId: string; + price: string; + greenPoints: string; + blackHoleAmount: string; + circulationPool: string; + effectiveDenominator: string; + minuteBurnRate: string; + snapshotTime: string; +} + +/** + * 交易账户创建事件 + */ +export interface TradingAccountCreatedPayload { + accountId: string; + accountSequence: string; + createdAt: string; +} + +// ==================== 事件基类 ==================== + +export interface TradingEvent { + eventId: string; + eventType: TradingEventType; + aggregateType: string; + aggregateId: string; + payload: T; + timestamp: string; + version: number; +} + +// ==================== 辅助函数 ==================== + +/** + * 创建标准事件结构 + */ +export function createTradingEvent( + eventType: TradingEventType, + aggregateType: string, + aggregateId: string, + payload: T, +): Omit, 'eventId'> { + return { + eventType, + aggregateType, + aggregateId, + payload, + timestamp: new Date().toISOString(), + version: 1, + }; +} diff --git a/backend/services/trading-service/src/domain/services/trading-calculator.service.ts b/backend/services/trading-service/src/domain/services/trading-calculator.service.ts new file mode 100644 index 00000000..2cfed8eb --- /dev/null +++ b/backend/services/trading-service/src/domain/services/trading-calculator.service.ts @@ -0,0 +1,241 @@ +import Decimal from 'decimal.js'; +import { Money } from '../value-objects/money.vo'; + +/** + * 交易计算领域服务 + * + * 核心公式: + * 1. 每分钟销毁量 = 100亿 ÷ (365×4×1440) = 4756.468797564687 进黑洞 + * 2. 积分股价格 = 积分股池的绿积分 ÷ (100.02亿积分股 - 黑洞积分股 - 流通池积分股) + * 3. 卖出销毁倍数 = (100亿积分股 - 黑洞销毁量) ÷ (200万 - 流通池量) + * 4. 卖出销毁量 = 卖出积分股 × 倍数 + * 5. 卖出交易额 = (卖出量 + 卖出销毁量) × 积分股价 + * 6. 资产显示 = (账户积分股 + 账户积分股 × 倍数) × 积分股价 + * 7. 资产每秒增长量 = 用户每天分配的积分股 ÷ 24 ÷ 60 ÷ 60 + */ +export class TradingCalculatorService { + // 总积分股数量: 100.02B + static readonly TOTAL_SHARES = new Decimal('100020000000'); + + // 目标销毁量: 100亿 (4年销毁完) + static readonly BURN_TARGET = new Decimal('10000000000'); + + // 销毁周期: 4年的分钟数 + static readonly BURN_PERIOD_MINUTES = 365 * 4 * 1440; // 2102400 + + // 流通池目标量: 200万 + static readonly CIRCULATION_POOL_TARGET = new Decimal('2000000'); + + // 基础每分钟销毁量: 100亿 ÷ (365×4×1440) + static readonly BASE_MINUTE_BURN_RATE = TradingCalculatorService.BURN_TARGET.dividedBy( + TradingCalculatorService.BURN_PERIOD_MINUTES, + ); + + /** + * 计算积分股价格 + * 价格 = 绿积分(股池) ÷ (总积分股 - 黑洞积分股 - 流通池积分股) + * + * @param greenPoints 积分股池的绿积分(分子) + * @param blackHoleAmount 黑洞积分股数量 + * @param circulationPoolAmount 流通池积分股数量 + * @returns 价格 + */ + calculatePrice( + greenPoints: Money, + blackHoleAmount: Money, + circulationPoolAmount: Money, + ): Money { + // 有效分母 = 100.02B - 黑洞 - 流通池 + const effectiveDenominator = TradingCalculatorService.TOTAL_SHARES + .minus(blackHoleAmount.value) + .minus(circulationPoolAmount.value); + + if (effectiveDenominator.isZero() || effectiveDenominator.isNegative()) { + return Money.zero(); + } + + // 价格 = 绿积分 / 有效分母 + const price = greenPoints.value.dividedBy(effectiveDenominator); + return new Money(price); + } + + /** + * 计算有效分母 + * 有效分母 = 总积分股 - 黑洞积分股 - 流通池积分股 + */ + calculateEffectiveDenominator( + blackHoleAmount: Money, + circulationPoolAmount: Money, + ): Money { + const denominator = TradingCalculatorService.TOTAL_SHARES + .minus(blackHoleAmount.value) + .minus(circulationPoolAmount.value); + + if (denominator.isNegative()) { + return Money.zero(); + } + + return new Money(denominator); + } + + /** + * 计算卖出销毁倍数 + * 倍数 = (100亿积分股 - 黑洞销毁量) ÷ (200万 - 流通池量) + * + * 目的:确保价格不会因为卖出而下跌 + * + * @param blackHoleAmount 当前黑洞销毁总量 + * @param circulationPoolAmount 当前流通池量 + * @returns 销毁倍数 + */ + calculateSellBurnMultiplier( + blackHoleAmount: Money, + circulationPoolAmount: Money, + ): Decimal { + // 分子 = 100亿 - 黑洞销毁量 + const numerator = TradingCalculatorService.BURN_TARGET.minus(blackHoleAmount.value); + + // 分母 = 200万 - 流通池量 + const denominator = TradingCalculatorService.CIRCULATION_POOL_TARGET.minus( + circulationPoolAmount.value, + ); + + // 防止除以零或负数 + if (denominator.isZero() || denominator.isNegative()) { + // 当流通池已满时,销毁倍数设为最大合理值 + return new Decimal('5'); // 或其他业务定义的最大倍数 + } + + if (numerator.isNegative()) { + // 当黑洞已满时,不再销毁 + return new Decimal('0'); + } + + return numerator.dividedBy(denominator); + } + + /** + * 计算卖出销毁量 + * 卖出销毁量 = 卖出积分股 × 倍数 + * + * @param sellQuantity 卖出的积分股数量 + * @param burnMultiplier 销毁倍数 + * @returns 需要销毁的数量 + */ + calculateSellBurnAmount(sellQuantity: Money, burnMultiplier: Decimal): Money { + const burnAmount = sellQuantity.value.times(burnMultiplier); + return new Money(burnAmount); + } + + /** + * 计算卖出交易额 + * 卖出交易额 = (卖出量 + 卖出销毁量) × 积分股价 + * + * @param sellQuantity 卖出的积分股数量 + * @param burnQuantity 销毁的数量 + * @param price 当前价格 + * @returns 交易额 + */ + calculateSellAmount(sellQuantity: Money, burnQuantity: Money, price: Money): Money { + const effectiveQuantity = sellQuantity.value.plus(burnQuantity.value); + const amount = effectiveQuantity.times(price.value); + return new Money(amount); + } + + /** + * 计算资产显示值 + * 资产显示 = (账户积分股 + 账户积分股 × 倍数) × 积分股价 + * + * @param shareBalance 账户积分股余额 + * @param burnMultiplier 当前销毁倍数 + * @param price 当前价格 + * @returns 显示的资产价值 + */ + calculateDisplayAssetValue( + shareBalance: Money, + burnMultiplier: Decimal, + price: Money, + ): Money { + // 有效积分股 = 余额 + 余额 × 倍数 = 余额 × (1 + 倍数) + const multiplierFactor = new Decimal(1).plus(burnMultiplier); + const effectiveShares = shareBalance.value.times(multiplierFactor); + const assetValue = effectiveShares.times(price.value); + return new Money(assetValue); + } + + /** + * 计算资产每秒增长量 + * 资产每秒增长量 = 用户每天分配的积分股 ÷ 24 ÷ 60 ÷ 60 + * + * @param dailyAllocation 用户每天分配的积分股 + * @returns 每秒增长量 + */ + calculateAssetGrowthPerSecond(dailyAllocation: Money): Money { + const secondsPerDay = 24 * 60 * 60; // 86400 + const perSecond = dailyAllocation.value.dividedBy(secondsPerDay); + return new Money(perSecond); + } + + /** + * 计算每分钟销毁量 + * 每次卖出后需要重新计算:(100亿 - 黑洞总量) ÷ 剩余分钟 + * + * @param blackHoleAmount 当前黑洞销毁总量 + * @param remainingMinutes 剩余分钟数 + * @returns 每分钟销毁量 + */ + calculateMinuteBurnRate(blackHoleAmount: Money, remainingMinutes: number): Money { + if (remainingMinutes <= 0) { + return Money.zero(); + } + + // 剩余需要销毁的量 = 100亿 - 已销毁量 + const remainingBurn = TradingCalculatorService.BURN_TARGET.minus(blackHoleAmount.value); + + if (remainingBurn.isZero() || remainingBurn.isNegative()) { + return Money.zero(); + } + + const minuteRate = remainingBurn.dividedBy(remainingMinutes); + return new Money(minuteRate); + } + + /** + * 计算剩余销毁分钟数 + * + * @param activatedAt 激活时间 + * @returns 剩余分钟数 + */ + calculateRemainingMinutes(activatedAt: Date): number { + const now = new Date(); + const elapsedMs = now.getTime() - activatedAt.getTime(); + const elapsedMinutes = Math.floor(elapsedMs / (60 * 1000)); + return Math.max(0, TradingCalculatorService.BURN_PERIOD_MINUTES - elapsedMinutes); + } + + /** + * 计算卖出后的新价格(验证用) + * 确保卖出后价格不下跌 + * + * @param currentPrice 当前价格 + * @param sellQuantity 卖出数量 + * @param burnQuantity 销毁数量 + * @param greenPoints 绿积分 + * @param blackHoleAmount 黑洞数量 + * @param circulationPoolAmount 流通池数量 + * @returns 卖出后的新价格 + */ + calculatePriceAfterSell( + greenPoints: Money, + blackHoleAmount: Money, + circulationPoolAmount: Money, + sellQuantity: Money, + burnQuantity: Money, + ): Money { + // 卖出后:流通池增加sellQuantity,黑洞增加burnQuantity + const newBlackHole = blackHoleAmount.add(burnQuantity); + const newCirculation = circulationPoolAmount.add(sellQuantity); + + return this.calculatePrice(greenPoints, newBlackHole, newCirculation); + } +} diff --git a/backend/services/trading-service/src/infrastructure/infrastructure.module.ts b/backend/services/trading-service/src/infrastructure/infrastructure.module.ts index 74f4fbe8..106b093e 100644 --- a/backend/services/trading-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/trading-service/src/infrastructure/infrastructure.module.ts @@ -5,8 +5,15 @@ import { PrismaModule } from './persistence/prisma/prisma.module'; import { TradingAccountRepository } from './persistence/repositories/trading-account.repository'; import { OrderRepository } from './persistence/repositories/order.repository'; import { OutboxRepository } from './persistence/repositories/outbox.repository'; +import { TradingConfigRepository } from './persistence/repositories/trading-config.repository'; +import { BlackHoleRepository } from './persistence/repositories/black-hole.repository'; +import { SharePoolRepository } from './persistence/repositories/share-pool.repository'; +import { CirculationPoolRepository } from './persistence/repositories/circulation-pool.repository'; +import { PriceSnapshotRepository } from './persistence/repositories/price-snapshot.repository'; +import { ProcessedEventRepository } from './persistence/repositories/processed-event.repository'; import { RedisService } from './redis/redis.service'; import { KafkaProducerService } from './kafka/kafka-producer.service'; +import { UserRegisteredConsumer } from './kafka/consumers/user-registered.consumer'; @Global() @Module({ @@ -32,10 +39,17 @@ import { KafkaProducerService } from './kafka/kafka-producer.service'; }, ]), ], + controllers: [UserRegisteredConsumer], providers: [ TradingAccountRepository, OrderRepository, OutboxRepository, + TradingConfigRepository, + BlackHoleRepository, + SharePoolRepository, + CirculationPoolRepository, + PriceSnapshotRepository, + ProcessedEventRepository, KafkaProducerService, { provide: 'REDIS_OPTIONS', @@ -53,6 +67,12 @@ import { KafkaProducerService } from './kafka/kafka-producer.service'; TradingAccountRepository, OrderRepository, OutboxRepository, + TradingConfigRepository, + BlackHoleRepository, + SharePoolRepository, + CirculationPoolRepository, + PriceSnapshotRepository, + ProcessedEventRepository, KafkaProducerService, RedisService, ClientsModule, diff --git a/backend/services/trading-service/src/infrastructure/kafka/consumers/index.ts b/backend/services/trading-service/src/infrastructure/kafka/consumers/index.ts new file mode 100644 index 00000000..1c4bbb50 --- /dev/null +++ b/backend/services/trading-service/src/infrastructure/kafka/consumers/index.ts @@ -0,0 +1 @@ +export * from './user-registered.consumer'; diff --git a/backend/services/trading-service/src/infrastructure/kafka/consumers/user-registered.consumer.ts b/backend/services/trading-service/src/infrastructure/kafka/consumers/user-registered.consumer.ts new file mode 100644 index 00000000..4fd7f5bd --- /dev/null +++ b/backend/services/trading-service/src/infrastructure/kafka/consumers/user-registered.consumer.ts @@ -0,0 +1,189 @@ +import { Controller, Logger, OnModuleInit } from '@nestjs/common'; +import { EventPattern, Payload } from '@nestjs/microservices'; +import { RedisService } from '../../redis/redis.service'; +import { TradingAccountRepository } from '../../persistence/repositories/trading-account.repository'; +import { OutboxRepository } from '../../persistence/repositories/outbox.repository'; +import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository'; +import { TradingAccountAggregate } from '../../../domain/aggregates/trading-account.aggregate'; +import { + TradingEventTypes, + TradingTopics, + TradingAccountCreatedPayload, +} from '../../../domain/events/trading.events'; + +// 用户注册事件结构(来自 auth-service) +interface UserRegisteredEvent { + eventId: string; + eventType: string; + payload: { + accountSequence: string; + phone: string; + source: 'V1' | 'V2'; + registeredAt: string; + }; +} + +// 4小时 TTL(秒) +const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; + +@Controller() +export class UserRegisteredConsumer implements OnModuleInit { + private readonly logger = new Logger(UserRegisteredConsumer.name); + + constructor( + private readonly redis: RedisService, + private readonly tradingAccountRepository: TradingAccountRepository, + private readonly outboxRepository: OutboxRepository, + private readonly processedEventRepository: ProcessedEventRepository, + ) {} + + async onModuleInit() { + this.logger.log('UserRegisteredConsumer initialized - listening for user.registered events'); + } + + @EventPattern('auth.user.registered') + async handleUserRegistered(@Payload() message: any): Promise { + // 解析消息格式 + const event: UserRegisteredEvent = message.value || message; + const eventId = event.eventId || message.eventId; + + if (!eventId) { + this.logger.warn('Received event without eventId, skipping'); + return; + } + + const accountSequence = event.payload?.accountSequence; + if (!accountSequence) { + this.logger.warn(`Event ${eventId} missing accountSequence, skipping`); + return; + } + + this.logger.debug( + `Processing user registered event: ${eventId}, accountSequence: ${accountSequence}`, + ); + + // 幂等性检查 + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`Event ${eventId} already processed, skipping`); + return; + } + + try { + // 检查账户是否已存在 + const existingAccount = await this.tradingAccountRepository.findByAccountSequence( + accountSequence, + ); + + if (existingAccount) { + this.logger.debug(`Trading account ${accountSequence} already exists`); + await this.markEventProcessed(eventId); + return; + } + + // 创建交易账户 + const account = TradingAccountAggregate.create(accountSequence); + const accountId = await this.tradingAccountRepository.save(account); + + // 发布交易账户创建事件 + await this.publishAccountCreatedEvent(accountId, accountSequence); + + // 标记为已处理 + await this.markEventProcessed(eventId); + + this.logger.log( + `Trading account created for user ${accountSequence}, source: ${event.payload.source}`, + ); + } catch (error) { + // 如果是重复创建的唯一约束错误,忽略 + if (error instanceof Error && error.message.includes('Unique constraint')) { + this.logger.debug( + `Trading account already exists for ${accountSequence}, marking as processed`, + ); + await this.markEventProcessed(eventId); + return; + } + + this.logger.error( + `Failed to create trading account for ${accountSequence}`, + error instanceof Error ? error.stack : error, + ); + throw error; // 让 Kafka 重试 + } + } + + /** + * 幂等性检查 - Redis + DB 双重检查 + * 1. 先检查 Redis 缓存(快速路径) + * 2. Redis 未命中则检查数据库(持久化保障) + */ + private async isEventProcessed(eventId: string): Promise { + const redisKey = `trading:processed-event:${eventId}`; + + // 1. 先检查 Redis 缓存(快速路径) + const cached = await this.redis.get(redisKey); + if (cached) return true; + + // 2. 检查数据库(Redis 可能过期或重启后丢失) + const dbRecord = await this.processedEventRepository.findByEventId(eventId); + if (dbRecord) { + // 回填 Redis 缓存 + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + return true; + } + + return false; + } + + /** + * 标记事件为已处理 - Redis + DB 双写 + */ + private async markEventProcessed(eventId: string, eventType: string = 'user.registered'): Promise { + const redisKey = `trading:processed-event:${eventId}`; + + // 1. 写入数据库(持久化) + try { + await this.processedEventRepository.create({ + eventId, + eventType, + sourceService: 'auth-service', + }); + } catch (error) { + // 可能已存在(并发情况),忽略唯一约束错误 + if (!(error instanceof Error && error.message.includes('Unique constraint'))) { + throw error; + } + } + + // 2. 写入 Redis 缓存(4小时 TTL) + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + } + + /** + * 发布交易账户创建事件 + */ + private async publishAccountCreatedEvent( + accountId: string, + accountSequence: string, + ): Promise { + try { + const payload: TradingAccountCreatedPayload = { + accountId, + accountSequence, + createdAt: new Date().toISOString(), + }; + + await this.outboxRepository.create({ + aggregateType: 'TradingAccount', + aggregateId: accountId, + eventType: TradingEventTypes.TRADING_ACCOUNT_CREATED, + payload, + topic: TradingTopics.ACCOUNTS, + key: accountSequence, + }); + + this.logger.debug(`Published TradingAccountCreated event for ${accountSequence}`); + } catch (error) { + this.logger.error(`Failed to publish TradingAccountCreated event: ${error}`); + } + } +} diff --git a/backend/services/trading-service/src/infrastructure/persistence/repositories/black-hole.repository.ts b/backend/services/trading-service/src/infrastructure/persistence/repositories/black-hole.repository.ts new file mode 100644 index 00000000..32e25ab3 --- /dev/null +++ b/backend/services/trading-service/src/infrastructure/persistence/repositories/black-hole.repository.ts @@ -0,0 +1,190 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { Money } from '../../../domain/value-objects/money.vo'; +import Decimal from 'decimal.js'; + +export interface BlackHoleEntity { + id: string; + totalBurned: Money; + targetBurn: Money; + remainingBurn: Money; + lastBurnMinute: Date | null; +} + +export interface BurnRecordEntity { + id: string; + blackHoleId: string; + burnMinute: Date; + burnAmount: Money; + remainingTarget: Money; + sourceType: string | null; + sourceAccountSeq: string | null; + sourceOrderNo: string | null; + memo: string | null; + createdAt: Date; +} + +export type BurnSourceType = 'MINUTE_BURN' | 'SELL_BURN'; + +@Injectable() +export class BlackHoleRepository { + constructor(private readonly prisma: PrismaService) {} + + async getBlackHole(): Promise { + const record = await this.prisma.blackHole.findFirst(); + if (!record) { + return null; + } + return this.toDomain(record); + } + + async initializeBlackHole(targetBurn: Money): Promise { + const existing = await this.prisma.blackHole.findFirst(); + if (existing) { + return this.toDomain(existing); + } + + const record = await this.prisma.blackHole.create({ + data: { + totalBurned: 0, + targetBurn: targetBurn.value, + remainingBurn: targetBurn.value, + }, + }); + + return this.toDomain(record); + } + + /** + * 记录销毁(每分钟自动销毁) + */ + async recordMinuteBurn(burnMinute: Date, burnAmount: Money): Promise { + return this.recordBurn(burnMinute, burnAmount, 'MINUTE_BURN'); + } + + /** + * 记录销毁(卖出销毁) + */ + async recordSellBurn( + burnMinute: Date, + burnAmount: Money, + accountSeq: string, + orderNo: string, + ): Promise { + return this.recordBurn(burnMinute, burnAmount, 'SELL_BURN', accountSeq, orderNo); + } + + /** + * 记录销毁通用方法 + */ + private async recordBurn( + burnMinute: Date, + burnAmount: Money, + sourceType: BurnSourceType, + sourceAccountSeq?: string, + sourceOrderNo?: string, + ): Promise { + const blackHole = await this.prisma.blackHole.findFirst(); + if (!blackHole) { + throw new Error('Black hole not initialized'); + } + + const newTotalBurned = new Decimal(blackHole.totalBurned.toString()).plus(burnAmount.value); + const newRemainingBurn = new Decimal(blackHole.targetBurn.toString()).minus(newTotalBurned); + + const memo = + sourceType === 'MINUTE_BURN' + ? `每分钟自动销毁 ${burnAmount.toFixed(8)}` + : `卖出销毁, 账户[${sourceAccountSeq}], 订单[${sourceOrderNo}], 数量${burnAmount.toFixed(8)}`; + + const [, burnRecord] = await this.prisma.$transaction([ + this.prisma.blackHole.update({ + where: { id: blackHole.id }, + data: { + totalBurned: newTotalBurned, + remainingBurn: newRemainingBurn.isNegative() ? 0 : newRemainingBurn, + lastBurnMinute: burnMinute, + }, + }), + this.prisma.burnRecord.create({ + data: { + blackHoleId: blackHole.id, + burnMinute, + burnAmount: burnAmount.value, + remainingTarget: newRemainingBurn.isNegative() ? 0 : newRemainingBurn, + sourceType, + sourceAccountSeq, + sourceOrderNo, + memo, + }, + }), + ]); + + return this.toBurnRecordDomain(burnRecord); + } + + async getBurnRecords( + page: number, + pageSize: number, + sourceType?: BurnSourceType, + ): Promise<{ + data: BurnRecordEntity[]; + total: number; + }> { + const where = sourceType ? { sourceType } : {}; + + const [records, total] = await Promise.all([ + this.prisma.burnRecord.findMany({ + where, + orderBy: { burnMinute: 'desc' }, + skip: (page - 1) * pageSize, + take: pageSize, + }), + this.prisma.burnRecord.count({ where }), + ]); + + return { + data: records.map((r) => this.toBurnRecordDomain(r)), + total, + }; + } + + async getTodayBurnAmount(): Promise { + const today = new Date(); + today.setHours(0, 0, 0, 0); + + const result = await this.prisma.burnRecord.aggregate({ + where: { + burnMinute: { gte: today }, + }, + _sum: { burnAmount: true }, + }); + + return new Money(result._sum.burnAmount || 0); + } + + private toDomain(record: any): BlackHoleEntity { + return { + id: record.id, + totalBurned: new Money(record.totalBurned), + targetBurn: new Money(record.targetBurn), + remainingBurn: new Money(record.remainingBurn), + lastBurnMinute: record.lastBurnMinute, + }; + } + + private toBurnRecordDomain(record: any): BurnRecordEntity { + return { + id: record.id, + blackHoleId: record.blackHoleId, + burnMinute: record.burnMinute, + burnAmount: new Money(record.burnAmount), + remainingTarget: new Money(record.remainingTarget), + sourceType: record.sourceType, + sourceAccountSeq: record.sourceAccountSeq, + sourceOrderNo: record.sourceOrderNo, + memo: record.memo, + createdAt: record.createdAt, + }; + } +} diff --git a/backend/services/trading-service/src/infrastructure/persistence/repositories/circulation-pool.repository.ts b/backend/services/trading-service/src/infrastructure/persistence/repositories/circulation-pool.repository.ts new file mode 100644 index 00000000..5d62fc4f --- /dev/null +++ b/backend/services/trading-service/src/infrastructure/persistence/repositories/circulation-pool.repository.ts @@ -0,0 +1,199 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { Money } from '../../../domain/value-objects/money.vo'; +import Decimal from 'decimal.js'; + +export interface CirculationPoolEntity { + id: string; + totalShares: Money; + totalCash: Money; + totalInflow: Money; + totalOutflow: Money; + createdAt: Date; + updatedAt: Date; +} + +export type CirculationPoolTransactionType = + | 'SHARE_IN' + | 'SHARE_OUT' + | 'CASH_IN' + | 'CASH_OUT' + | 'TRADE_BUY' + | 'TRADE_SELL'; + +@Injectable() +export class CirculationPoolRepository { + constructor(private readonly prisma: PrismaService) {} + + async getPool(): Promise { + const record = await this.prisma.circulationPool.findFirst(); + if (!record) { + return null; + } + return this.toDomain(record); + } + + async initializePool(): Promise { + const existing = await this.prisma.circulationPool.findFirst(); + if (existing) { + return this.toDomain(existing); + } + + const record = await this.prisma.circulationPool.create({ + data: { + totalShares: 0, + totalCash: 0, + totalInflow: 0, + totalOutflow: 0, + }, + }); + + return this.toDomain(record); + } + + /** + * 卖出时积分股进入流通池 + */ + async addSharesFromSell( + amount: Money, + accountSeq: string, + orderId: string, + memo?: string, + ): Promise { + const pool = await this.prisma.circulationPool.findFirst(); + if (!pool) { + throw new Error('Circulation pool not initialized'); + } + + const balanceBefore = new Decimal(pool.totalShares.toString()); + const balanceAfter = balanceBefore.plus(amount.value); + + await this.prisma.$transaction([ + this.prisma.circulationPool.update({ + where: { id: pool.id }, + data: { + totalShares: balanceAfter, + totalInflow: new Decimal(pool.totalInflow.toString()).plus(amount.value), + }, + }), + this.prisma.circulationPoolTransaction.create({ + data: { + poolId: pool.id, + type: 'TRADE_SELL', + assetType: 'SHARE', + amount: amount.value, + balanceBefore, + balanceAfter, + counterpartyType: 'USER', + counterpartyAccountSeq: accountSeq, + referenceId: orderId, + referenceType: 'ORDER', + memo: memo || `卖出积分股进入流通池 ${amount.toFixed(8)}`, + }, + }), + ]); + } + + /** + * 买入时积分股从流通池流出 + */ + async removeSharesForBuy( + amount: Money, + accountSeq: string, + orderId: string, + memo?: string, + ): Promise { + const pool = await this.prisma.circulationPool.findFirst(); + if (!pool) { + throw new Error('Circulation pool not initialized'); + } + + const balanceBefore = new Decimal(pool.totalShares.toString()); + const balanceAfter = balanceBefore.minus(amount.value); + + if (balanceAfter.isNegative()) { + throw new Error('Insufficient shares in circulation pool'); + } + + await this.prisma.$transaction([ + this.prisma.circulationPool.update({ + where: { id: pool.id }, + data: { + totalShares: balanceAfter, + totalOutflow: new Decimal(pool.totalOutflow.toString()).plus(amount.value), + }, + }), + this.prisma.circulationPoolTransaction.create({ + data: { + poolId: pool.id, + type: 'TRADE_BUY', + assetType: 'SHARE', + amount: amount.value, + balanceBefore, + balanceAfter, + counterpartyType: 'USER', + counterpartyAccountSeq: accountSeq, + referenceId: orderId, + referenceType: 'ORDER', + memo: memo || `买入积分股从流通池流出 ${amount.toFixed(8)}`, + }, + }), + ]); + } + + /** + * 获取流通池积分股数量 + */ + async getSharesAmount(): Promise { + const pool = await this.getPool(); + if (!pool) { + return Money.zero(); + } + return pool.totalShares; + } + + async getTransactions( + page: number, + pageSize: number, + ): Promise<{ + data: any[]; + total: number; + }> { + const pool = await this.prisma.circulationPool.findFirst(); + if (!pool) { + return { data: [], total: 0 }; + } + + const [records, total] = await Promise.all([ + this.prisma.circulationPoolTransaction.findMany({ + where: { poolId: pool.id }, + orderBy: { createdAt: 'desc' }, + skip: (page - 1) * pageSize, + take: pageSize, + }), + this.prisma.circulationPoolTransaction.count({ where: { poolId: pool.id } }), + ]); + + return { + data: records.map((r) => ({ + ...r, + amount: r.amount.toString(), + balanceBefore: r.balanceBefore.toString(), + balanceAfter: r.balanceAfter.toString(), + })), + total, + }; + } + + private toDomain(record: any): CirculationPoolEntity { + return { + id: record.id, + totalShares: new Money(record.totalShares), + totalCash: new Money(record.totalCash), + totalInflow: new Money(record.totalInflow), + totalOutflow: new Money(record.totalOutflow), + createdAt: record.createdAt, + updatedAt: record.updatedAt, + }; + } +} diff --git a/backend/services/trading-service/src/infrastructure/persistence/repositories/order.repository.ts b/backend/services/trading-service/src/infrastructure/persistence/repositories/order.repository.ts index 2f7bc234..1ddb07d4 100644 --- a/backend/services/trading-service/src/infrastructure/persistence/repositories/order.repository.ts +++ b/backend/services/trading-service/src/infrastructure/persistence/repositories/order.repository.ts @@ -47,6 +47,46 @@ export class OrderRepository { } } + /** + * 保存订单并更新销毁信息(用于卖出订单) + */ + async saveWithBurnInfo( + aggregate: OrderAggregate, + burnQuantity: Money, + effectiveQuantity: Money, + ): Promise { + const data = { + orderNo: aggregate.orderNo, + accountSequence: aggregate.accountSequence, + type: aggregate.type, + status: aggregate.status, + price: aggregate.price.value, + quantity: aggregate.quantity.value, + filledQuantity: aggregate.filledQuantity.value, + remainingQuantity: aggregate.remainingQuantity.value, + averagePrice: aggregate.averagePrice.value, + totalAmount: aggregate.totalAmount.value, + burnQuantity: burnQuantity.value, + burnMultiplier: burnQuantity.isZero() + ? 0 + : burnQuantity.value.dividedBy(aggregate.filledQuantity.value), + effectiveQuantity: effectiveQuantity.value, + cancelledAt: aggregate.cancelledAt, + completedAt: aggregate.completedAt, + }; + + if (aggregate.id) { + await this.prisma.order.update({ + where: { id: aggregate.id }, + data, + }); + return aggregate.id; + } else { + const created = await this.prisma.order.create({ data }); + return created.id; + } + } + async findActiveOrders(type?: OrderType): Promise { const where: any = { status: { in: [OrderStatus.PENDING, OrderStatus.PARTIAL] }, diff --git a/backend/services/trading-service/src/infrastructure/persistence/repositories/price-snapshot.repository.ts b/backend/services/trading-service/src/infrastructure/persistence/repositories/price-snapshot.repository.ts new file mode 100644 index 00000000..fc762051 --- /dev/null +++ b/backend/services/trading-service/src/infrastructure/persistence/repositories/price-snapshot.repository.ts @@ -0,0 +1,134 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { Money } from '../../../domain/value-objects/money.vo'; + +export interface PriceSnapshotEntity { + id: string; + snapshotTime: Date; + price: Money; + greenPoints: Money; + blackHoleAmount: Money; + circulationPool: Money; + effectiveDenominator: Money; + minuteBurnRate: Money; + createdAt: Date; +} + +@Injectable() +export class PriceSnapshotRepository { + constructor(private readonly prisma: PrismaService) {} + + async getLatestSnapshot(): Promise { + const record = await this.prisma.priceSnapshot.findFirst({ + orderBy: { snapshotTime: 'desc' }, + }); + if (!record) { + return null; + } + return this.toDomain(record); + } + + async getSnapshotAt(time: Date): Promise { + // 获取指定时间之前最近的快照 + const record = await this.prisma.priceSnapshot.findFirst({ + where: { snapshotTime: { lte: time } }, + orderBy: { snapshotTime: 'desc' }, + }); + if (!record) { + return null; + } + return this.toDomain(record); + } + + async createSnapshot(data: { + snapshotTime: Date; + price: Money; + greenPoints: Money; + blackHoleAmount: Money; + circulationPool: Money; + effectiveDenominator: Money; + minuteBurnRate: Money; + }): Promise { + const record = await this.prisma.priceSnapshot.create({ + data: { + snapshotTime: data.snapshotTime, + price: data.price.value, + greenPoints: data.greenPoints.value, + blackHoleAmount: data.blackHoleAmount.value, + circulationPool: data.circulationPool.value, + effectiveDenominator: data.effectiveDenominator.value, + minuteBurnRate: data.minuteBurnRate.value, + }, + }); + return this.toDomain(record); + } + + async getPriceHistory( + startTime: Date, + endTime: Date, + limit: number = 1440, + ): Promise { + const records = await this.prisma.priceSnapshot.findMany({ + where: { + snapshotTime: { + gte: startTime, + lte: endTime, + }, + }, + orderBy: { snapshotTime: 'asc' }, + take: limit, + }); + + return records.map((r) => this.toDomain(r)); + } + + async getSnapshots( + page: number, + pageSize: number, + ): Promise<{ + data: PriceSnapshotEntity[]; + total: number; + }> { + const [records, total] = await Promise.all([ + this.prisma.priceSnapshot.findMany({ + orderBy: { snapshotTime: 'desc' }, + skip: (page - 1) * pageSize, + take: pageSize, + }), + this.prisma.priceSnapshot.count(), + ]); + + return { + data: records.map((r) => this.toDomain(r)), + total, + }; + } + + /** + * 清理旧的价格快照(保留指定天数) + */ + async cleanupOldSnapshots(retentionDays: number): Promise { + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - retentionDays); + + const result = await this.prisma.priceSnapshot.deleteMany({ + where: { snapshotTime: { lt: cutoffDate } }, + }); + + return result.count; + } + + private toDomain(record: any): PriceSnapshotEntity { + return { + id: record.id, + snapshotTime: record.snapshotTime, + price: new Money(record.price), + greenPoints: new Money(record.greenPoints), + blackHoleAmount: new Money(record.blackHoleAmount), + circulationPool: new Money(record.circulationPool), + effectiveDenominator: new Money(record.effectiveDenominator), + minuteBurnRate: new Money(record.minuteBurnRate), + createdAt: record.createdAt, + }; + } +} diff --git a/backend/services/trading-service/src/infrastructure/persistence/repositories/processed-event.repository.ts b/backend/services/trading-service/src/infrastructure/persistence/repositories/processed-event.repository.ts new file mode 100644 index 00000000..bd552317 --- /dev/null +++ b/backend/services/trading-service/src/infrastructure/persistence/repositories/processed-event.repository.ts @@ -0,0 +1,65 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; + +export interface ProcessedEventEntity { + id: string; + eventId: string; + eventType: string; + sourceService: string; + processedAt: Date; +} + +@Injectable() +export class ProcessedEventRepository { + constructor(private readonly prisma: PrismaService) {} + + /** + * 查找已处理事件 + */ + async findByEventId(eventId: string): Promise { + const record = await this.prisma.processedEvent.findUnique({ + where: { eventId }, + }); + return record; + } + + /** + * 创建已处理事件记录 + */ + async create(data: { + eventId: string; + eventType: string; + sourceService: string; + }): Promise { + return this.prisma.processedEvent.create({ + data: { + eventId: data.eventId, + eventType: data.eventType, + sourceService: data.sourceService, + }, + }); + } + + /** + * 检查事件是否已处理 + */ + async isProcessed(eventId: string): Promise { + const count = await this.prisma.processedEvent.count({ + where: { eventId }, + }); + return count > 0; + } + + /** + * 删除旧的已处理记录(清理) + * @param before 删除此时间之前的记录 + */ + async deleteOldRecords(before: Date): Promise { + const result = await this.prisma.processedEvent.deleteMany({ + where: { + processedAt: { lt: before }, + }, + }); + return result.count; + } +} diff --git a/backend/services/trading-service/src/infrastructure/persistence/repositories/share-pool.repository.ts b/backend/services/trading-service/src/infrastructure/persistence/repositories/share-pool.repository.ts new file mode 100644 index 00000000..1671d823 --- /dev/null +++ b/backend/services/trading-service/src/infrastructure/persistence/repositories/share-pool.repository.ts @@ -0,0 +1,191 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { Money } from '../../../domain/value-objects/money.vo'; +import Decimal from 'decimal.js'; + +export interface SharePoolEntity { + id: string; + greenPoints: Money; + totalInflow: Money; + totalOutflow: Money; + createdAt: Date; + updatedAt: Date; +} + +export type SharePoolTransactionType = 'INJECT' | 'TRADE_IN' | 'TRADE_OUT'; + +export interface SharePoolTransactionEntity { + id: string; + poolId: string; + type: SharePoolTransactionType; + amount: Money; + balanceBefore: Money; + balanceAfter: Money; + referenceId: string | null; + referenceType: string | null; + memo: string | null; + createdAt: Date; +} + +@Injectable() +export class SharePoolRepository { + constructor(private readonly prisma: PrismaService) {} + + async getPool(): Promise { + const record = await this.prisma.sharePool.findFirst(); + if (!record) { + return null; + } + return this.toDomain(record); + } + + async initializePool(initialGreenPoints: Money = Money.zero()): Promise { + const existing = await this.prisma.sharePool.findFirst(); + if (existing) { + return this.toDomain(existing); + } + + const record = await this.prisma.sharePool.create({ + data: { + greenPoints: initialGreenPoints.value, + totalInflow: initialGreenPoints.value, + totalOutflow: 0, + }, + }); + + return this.toDomain(record); + } + + /** + * 注入绿积分 + */ + async inject(amount: Money, referenceId?: string, memo?: string): Promise { + await this.updateBalance('INJECT', amount, true, referenceId, memo); + } + + /** + * 交易流入(买入时绿积分进入股池) + */ + async tradeIn(amount: Money, tradeId: string): Promise { + await this.updateBalance('TRADE_IN', amount, true, tradeId, `交易买入流入 ${amount.toFixed(8)}`); + } + + /** + * 交易流出(卖出时绿积分从股池流出) + */ + async tradeOut(amount: Money, tradeId: string): Promise { + await this.updateBalance( + 'TRADE_OUT', + amount, + false, + tradeId, + `交易卖出流出 ${amount.toFixed(8)}`, + ); + } + + private async updateBalance( + type: SharePoolTransactionType, + amount: Money, + isInflow: boolean, + referenceId?: string, + memo?: string, + ): Promise { + const pool = await this.prisma.sharePool.findFirst(); + if (!pool) { + throw new Error('Share pool not initialized'); + } + + const balanceBefore = new Decimal(pool.greenPoints.toString()); + const balanceAfter = isInflow + ? balanceBefore.plus(amount.value) + : balanceBefore.minus(amount.value); + + if (balanceAfter.isNegative()) { + throw new Error('Insufficient green points in share pool'); + } + + const newTotalInflow = isInflow + ? new Decimal(pool.totalInflow.toString()).plus(amount.value) + : pool.totalInflow; + const newTotalOutflow = !isInflow + ? new Decimal(pool.totalOutflow.toString()).plus(amount.value) + : pool.totalOutflow; + + await this.prisma.$transaction([ + this.prisma.sharePool.update({ + where: { id: pool.id }, + data: { + greenPoints: balanceAfter, + totalInflow: newTotalInflow, + totalOutflow: newTotalOutflow, + }, + }), + this.prisma.sharePoolTransaction.create({ + data: { + poolId: pool.id, + type, + amount: amount.value, + balanceBefore, + balanceAfter, + referenceId, + referenceType: type === 'INJECT' ? 'INJECT' : 'TRADE', + memo, + }, + }), + ]); + } + + async getTransactions( + page: number, + pageSize: number, + ): Promise<{ + data: SharePoolTransactionEntity[]; + total: number; + }> { + const pool = await this.prisma.sharePool.findFirst(); + if (!pool) { + return { data: [], total: 0 }; + } + + const [records, total] = await Promise.all([ + this.prisma.sharePoolTransaction.findMany({ + where: { poolId: pool.id }, + orderBy: { createdAt: 'desc' }, + skip: (page - 1) * pageSize, + take: pageSize, + }), + this.prisma.sharePoolTransaction.count({ where: { poolId: pool.id } }), + ]); + + return { + data: records.map((r) => this.toTransactionDomain(r)), + total, + }; + } + + private toDomain(record: any): SharePoolEntity { + return { + id: record.id, + greenPoints: new Money(record.greenPoints), + totalInflow: new Money(record.totalInflow), + totalOutflow: new Money(record.totalOutflow), + createdAt: record.createdAt, + updatedAt: record.updatedAt, + }; + } + + private toTransactionDomain(record: any): SharePoolTransactionEntity { + return { + id: record.id, + poolId: record.poolId, + type: record.type as SharePoolTransactionType, + amount: new Money(record.amount), + balanceBefore: new Money(record.balanceBefore), + balanceAfter: new Money(record.balanceAfter), + referenceId: record.referenceId, + referenceType: record.referenceType, + memo: record.memo, + createdAt: record.createdAt, + }; + } +} diff --git a/backend/services/trading-service/src/infrastructure/persistence/repositories/trading-account.repository.ts b/backend/services/trading-service/src/infrastructure/persistence/repositories/trading-account.repository.ts index 3d9b61ca..cf356d47 100644 --- a/backend/services/trading-service/src/infrastructure/persistence/repositories/trading-account.repository.ts +++ b/backend/services/trading-service/src/infrastructure/persistence/repositories/trading-account.repository.ts @@ -15,11 +15,11 @@ export class TradingAccountRepository { return this.toDomain(record); } - async save(aggregate: TradingAccountAggregate): Promise { + async save(aggregate: TradingAccountAggregate): Promise { const transactions = aggregate.pendingTransactions; - await this.prisma.$transaction(async (tx) => { - await tx.tradingAccount.upsert({ + const result = await this.prisma.$transaction(async (tx) => { + const account = await tx.tradingAccount.upsert({ where: { accountSequence: aggregate.accountSequence }, create: { accountSequence: aggregate.accountSequence, @@ -55,9 +55,12 @@ export class TradingAccountRepository { })), }); } + + return account.id; }); aggregate.clearPendingTransactions(); + return result; } async getTransactions( diff --git a/backend/services/trading-service/src/infrastructure/persistence/repositories/trading-config.repository.ts b/backend/services/trading-service/src/infrastructure/persistence/repositories/trading-config.repository.ts new file mode 100644 index 00000000..bca35a70 --- /dev/null +++ b/backend/services/trading-service/src/infrastructure/persistence/repositories/trading-config.repository.ts @@ -0,0 +1,101 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { Money } from '../../../domain/value-objects/money.vo'; +import Decimal from 'decimal.js'; + +export interface TradingConfigEntity { + id: string; + totalShares: Money; + burnTarget: Money; + burnPeriodMinutes: number; + minuteBurnRate: Money; + isActive: boolean; + activatedAt: Date | null; + createdAt: Date; + updatedAt: Date; +} + +@Injectable() +export class TradingConfigRepository { + constructor(private readonly prisma: PrismaService) {} + + async getConfig(): Promise { + const record = await this.prisma.tradingConfig.findFirst(); + if (!record) { + return null; + } + return this.toDomain(record); + } + + async initializeConfig(): Promise { + const existing = await this.prisma.tradingConfig.findFirst(); + if (existing) { + return this.toDomain(existing); + } + + const record = await this.prisma.tradingConfig.create({ + data: { + totalShares: new Decimal('100020000000'), + burnTarget: new Decimal('10000000000'), + burnPeriodMinutes: 2102400, // 365 * 4 * 1440 + minuteBurnRate: new Decimal('4756.468797564687'), + isActive: false, + }, + }); + + return this.toDomain(record); + } + + async activate(): Promise { + const config = await this.prisma.tradingConfig.findFirst(); + if (!config) { + throw new Error('Trading config not initialized'); + } + + await this.prisma.tradingConfig.update({ + where: { id: config.id }, + data: { + isActive: true, + activatedAt: new Date(), + }, + }); + } + + async deactivate(): Promise { + const config = await this.prisma.tradingConfig.findFirst(); + if (!config) { + return; + } + + await this.prisma.tradingConfig.update({ + where: { id: config.id }, + data: { isActive: false }, + }); + } + + async updateMinuteBurnRate(newRate: Money): Promise { + const config = await this.prisma.tradingConfig.findFirst(); + if (!config) { + throw new Error('Trading config not initialized'); + } + + await this.prisma.tradingConfig.update({ + where: { id: config.id }, + data: { minuteBurnRate: newRate.value }, + }); + } + + private toDomain(record: any): TradingConfigEntity { + return { + id: record.id, + totalShares: new Money(record.totalShares), + burnTarget: new Money(record.burnTarget), + burnPeriodMinutes: record.burnPeriodMinutes, + minuteBurnRate: new Money(record.minuteBurnRate), + isActive: record.isActive, + activatedAt: record.activatedAt, + createdAt: record.createdAt, + updatedAt: record.updatedAt, + }; + } +}