feat(trading-service): add burn system, Kafka events, and idempotency

- Add trading burn system with black hole, share pool, and price calculation
- Implement per-minute auto burn and sell burn with multiplier
- Add Kafka event publishing via outbox pattern (order, trade, burn events)
- Add user.registered consumer to auto-create trading accounts
- Implement Redis + DB dual idempotency for event processing
- Add price, burn, and asset API controllers
- Add migrations for burn tables and processed events

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-14 07:15:41 -08:00
parent f3d4799efc
commit e1fb70e2ee
29 changed files with 3181 additions and 31 deletions

View File

@ -47,14 +47,14 @@ CREATE TABLE "orders" (
CREATE TABLE "trades" (
"id" TEXT NOT NULL,
"tradeNo" TEXT NOT NULL,
"buyOrderId" TEXT NOT NULL,
"sellOrderId" TEXT NOT NULL,
"buyerSequence" TEXT NOT NULL,
"sellerSequence" TEXT NOT NULL,
"buy_order_id" TEXT NOT NULL,
"sell_order_id" TEXT NOT NULL,
"buyer_sequence" TEXT NOT NULL,
"seller_sequence" TEXT NOT NULL,
"price" DECIMAL(30,18) NOT NULL,
"quantity" DECIMAL(30,8) NOT NULL,
"amount" DECIMAL(30,8) NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "trades_pkey" PRIMARY KEY ("id")
);
@ -229,13 +229,13 @@ CREATE INDEX "orders_createdAt_idx" ON "orders"("createdAt" DESC);
CREATE UNIQUE INDEX "trades_tradeNo_key" ON "trades"("tradeNo");
-- CreateIndex
CREATE INDEX "trades_buyerSequence_idx" ON "trades"("buyerSequence");
CREATE INDEX "trades_buyer_sequence_idx" ON "trades"("buyer_sequence");
-- CreateIndex
CREATE INDEX "trades_sellerSequence_idx" ON "trades"("sellerSequence");
CREATE INDEX "trades_seller_sequence_idx" ON "trades"("seller_sequence");
-- CreateIndex
CREATE INDEX "trades_createdAt_idx" ON "trades"("createdAt" DESC);
CREATE INDEX "trades_created_at_idx" ON "trades"("created_at" DESC);
-- CreateIndex
CREATE INDEX "trading_transactions_accountSequence_createdAt_idx" ON "trading_transactions"("accountSequence", "createdAt" DESC);
@ -307,7 +307,7 @@ CREATE INDEX "outbox_events_created_at_idx" ON "outbox_events"("created_at");
ALTER TABLE "orders" ADD CONSTRAINT "orders_accountSequence_fkey" FOREIGN KEY ("accountSequence") REFERENCES "trading_accounts"("accountSequence") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "trades" ADD CONSTRAINT "trades_buyOrderId_fkey" FOREIGN KEY ("buyOrderId") REFERENCES "orders"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
ALTER TABLE "trades" ADD CONSTRAINT "trades_buy_order_id_fkey" FOREIGN KEY ("buy_order_id") REFERENCES "orders"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "trading_transactions" ADD CONSTRAINT "trading_transactions_accountSequence_fkey" FOREIGN KEY ("accountSequence") REFERENCES "trading_accounts"("accountSequence") ON DELETE RESTRICT ON UPDATE CASCADE;

View File

@ -0,0 +1,139 @@
-- ============================================================================
-- trading-service 添加交易销毁系统
-- 包含:交易配置、黑洞账户、积分股池、价格快照、订单销毁字段
-- ============================================================================
-- ==================== 交易配置表 ====================
-- CreateTable
CREATE TABLE "trading_configs" (
"id" TEXT NOT NULL,
"total_shares" DECIMAL(30,8) NOT NULL DEFAULT 100020000000,
"burn_target" DECIMAL(30,8) NOT NULL DEFAULT 10000000000,
"burn_period_minutes" INTEGER NOT NULL DEFAULT 2102400,
"minute_burn_rate" DECIMAL(30,18) NOT NULL DEFAULT 4756.468797564687,
"is_active" BOOLEAN NOT NULL DEFAULT false,
"activated_at" TIMESTAMP(3),
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "trading_configs_pkey" PRIMARY KEY ("id")
);
-- ==================== 黑洞账户(销毁池)====================
-- CreateTable
CREATE TABLE "black_holes" (
"id" TEXT NOT NULL,
"total_burned" DECIMAL(30,8) NOT NULL DEFAULT 0,
"target_burn" DECIMAL(30,8) NOT NULL,
"remaining_burn" DECIMAL(30,8) NOT NULL,
"last_burn_minute" TIMESTAMP(3),
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "black_holes_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "burn_records" (
"id" TEXT NOT NULL,
"black_hole_id" TEXT NOT NULL,
"burn_minute" TIMESTAMP(3) NOT NULL,
"burn_amount" DECIMAL(30,18) NOT NULL,
"remaining_target" DECIMAL(30,8) NOT NULL,
"source_type" TEXT,
"source_account_seq" TEXT,
"source_order_no" TEXT,
"memo" TEXT,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "burn_records_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "burn_records_burn_minute_idx" ON "burn_records"("burn_minute");
-- CreateIndex
CREATE INDEX "burn_records_source_account_seq_idx" ON "burn_records"("source_account_seq");
-- CreateIndex
CREATE INDEX "burn_records_source_order_no_idx" ON "burn_records"("source_order_no");
-- CreateIndex
CREATE INDEX "burn_records_source_type_idx" ON "burn_records"("source_type");
-- AddForeignKey
ALTER TABLE "burn_records" ADD CONSTRAINT "burn_records_black_hole_id_fkey" FOREIGN KEY ("black_hole_id") REFERENCES "black_holes"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
-- ==================== 积分股池(绿积分池)====================
-- CreateTable
CREATE TABLE "share_pools" (
"id" TEXT NOT NULL,
"green_points" DECIMAL(30,8) NOT NULL DEFAULT 0,
"total_inflow" DECIMAL(30,8) NOT NULL DEFAULT 0,
"total_outflow" DECIMAL(30,8) NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "share_pools_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "share_pool_transactions" (
"id" TEXT NOT NULL,
"pool_id" TEXT NOT NULL,
"type" TEXT NOT NULL,
"amount" DECIMAL(30,8) NOT NULL,
"balance_before" DECIMAL(30,8) NOT NULL,
"balance_after" DECIMAL(30,8) NOT NULL,
"reference_id" TEXT,
"reference_type" TEXT,
"memo" TEXT,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "share_pool_transactions_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "share_pool_transactions_pool_id_created_at_idx" ON "share_pool_transactions"("pool_id", "created_at" DESC);
-- AddForeignKey
ALTER TABLE "share_pool_transactions" ADD CONSTRAINT "share_pool_transactions_pool_id_fkey" FOREIGN KEY ("pool_id") REFERENCES "share_pools"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
-- ==================== 价格快照 ====================
-- CreateTable
CREATE TABLE "price_snapshots" (
"id" TEXT NOT NULL,
"snapshot_time" TIMESTAMP(3) NOT NULL,
"price" DECIMAL(30,18) NOT NULL,
"green_points" DECIMAL(30,8) NOT NULL,
"black_hole_amount" DECIMAL(30,8) NOT NULL,
"circulation_pool" DECIMAL(30,8) NOT NULL,
"effective_denominator" DECIMAL(30,8) NOT NULL,
"minute_burn_rate" DECIMAL(30,18) NOT NULL,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "price_snapshots_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "price_snapshots_snapshot_time_key" ON "price_snapshots"("snapshot_time");
-- CreateIndex
CREATE INDEX "price_snapshots_snapshot_time_idx" ON "price_snapshots"("snapshot_time" DESC);
-- ==================== 订单表添加销毁相关字段 ====================
-- AlterTable: 添加销毁相关字段到 orders 表
ALTER TABLE "orders" ADD COLUMN "burn_quantity" DECIMAL(30,8) NOT NULL DEFAULT 0;
ALTER TABLE "orders" ADD COLUMN "burn_multiplier" DECIMAL(30,18) NOT NULL DEFAULT 0;
ALTER TABLE "orders" ADD COLUMN "effective_quantity" DECIMAL(30,8) NOT NULL DEFAULT 0;
-- ==================== 成交记录表添加销毁相关字段 ====================
-- 添加销毁相关字段到 trades 表
ALTER TABLE "trades" ADD COLUMN "burn_quantity" DECIMAL(30,8) NOT NULL DEFAULT 0;
ALTER TABLE "trades" ADD COLUMN "effective_qty" DECIMAL(30,8) NOT NULL DEFAULT 0;

View File

@ -0,0 +1,23 @@
-- ============================================================================
-- trading-service 添加已处理事件表(幂等性支持)
-- ============================================================================
-- CreateTable
CREATE TABLE "processed_events" (
"id" TEXT NOT NULL,
"event_id" TEXT NOT NULL,
"event_type" TEXT NOT NULL,
"source_service" TEXT NOT NULL,
"processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "processed_events_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "processed_events_event_id_key" ON "processed_events"("event_id");
-- CreateIndex
CREATE INDEX "processed_events_event_id_idx" ON "processed_events"("event_id");
-- CreateIndex
CREATE INDEX "processed_events_processed_at_idx" ON "processed_events"("processed_at");

View File

@ -7,6 +7,125 @@ datasource db {
url = env("DATABASE_URL")
}
// ==================== 交易配置 ====================
// 交易全局配置
model TradingConfig {
id String @id @default(uuid())
// 总积分股数量: 100.02B
totalShares Decimal @default(100020000000) @map("total_shares") @db.Decimal(30, 8)
// 目标销毁量: 100亿 (4年销毁完)
burnTarget Decimal @default(10000000000) @map("burn_target") @db.Decimal(30, 8)
// 销毁周期: 4年 (分钟数) 365*4*1440 = 2102400
burnPeriodMinutes Int @default(2102400) @map("burn_period_minutes")
// 每分钟基础销毁量: 100亿÷(365*4*1440) = 4756.468797564687
minuteBurnRate Decimal @default(4756.468797564687) @map("minute_burn_rate") @db.Decimal(30, 18)
// 是否启用交易
isActive Boolean @default(false) @map("is_active")
// 启动时间
activatedAt DateTime? @map("activated_at")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@map("trading_configs")
}
// ==================== 黑洞账户(销毁池)====================
// 黑洞账户
model BlackHole {
id String @id @default(uuid())
totalBurned Decimal @default(0) @map("total_burned") @db.Decimal(30, 8) // 已销毁总量
targetBurn Decimal @map("target_burn") @db.Decimal(30, 8) // 目标销毁量 (10B)
remainingBurn Decimal @map("remaining_burn") @db.Decimal(30, 8) // 剩余待销毁
lastBurnMinute DateTime? @map("last_burn_minute")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
records BurnRecord[]
@@map("black_holes")
}
// 销毁记录
model BurnRecord {
id String @id @default(uuid())
blackHoleId String @map("black_hole_id")
burnMinute DateTime @map("burn_minute")
burnAmount Decimal @map("burn_amount") @db.Decimal(30, 18)
remainingTarget Decimal @map("remaining_target") @db.Decimal(30, 8) // 销毁后剩余目标
// 来源信息
sourceType String? @map("source_type") // MINUTE_BURN (每分钟销毁), SELL_BURN (卖出销毁)
sourceAccountSeq String? @map("source_account_seq") // 来源账户序列号(卖出时)
sourceOrderNo String? @map("source_order_no") // 来源订单号(卖出时)
memo String? @db.Text
createdAt DateTime @default(now()) @map("created_at")
blackHole BlackHole @relation(fields: [blackHoleId], references: [id])
@@index([burnMinute])
@@index([sourceAccountSeq])
@@index([sourceOrderNo])
@@index([sourceType])
@@map("burn_records")
}
// ==================== 积分股池(绿积分池)====================
// 积分股池(存储绿积分用于计算价格)
model SharePool {
id String @id @default(uuid())
// 绿积分总量(用于价格计算的分子)
greenPoints Decimal @default(0) @map("green_points") @db.Decimal(30, 8)
totalInflow Decimal @default(0) @map("total_inflow") @db.Decimal(30, 8)
totalOutflow Decimal @default(0) @map("total_outflow") @db.Decimal(30, 8)
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
transactions SharePoolTransaction[]
@@map("share_pools")
}
// 积分股池交易记录
model SharePoolTransaction {
id String @id @default(uuid())
poolId String @map("pool_id")
type String // INJECT (注入), TRADE_IN (交易流入), TRADE_OUT (交易流出)
amount Decimal @db.Decimal(30, 8)
balanceBefore Decimal @map("balance_before") @db.Decimal(30, 8)
balanceAfter Decimal @map("balance_after") @db.Decimal(30, 8)
referenceId String? @map("reference_id")
referenceType String? @map("reference_type")
memo String? @db.Text
createdAt DateTime @default(now()) @map("created_at")
pool SharePool @relation(fields: [poolId], references: [id])
@@index([poolId, createdAt(sort: Desc)])
@@map("share_pool_transactions")
}
// ==================== 价格快照 ====================
// 价格快照(每分钟)
model PriceSnapshot {
id String @id @default(uuid())
snapshotTime DateTime @unique @map("snapshot_time")
price Decimal @db.Decimal(30, 18) // 当时价格
greenPoints Decimal @map("green_points") @db.Decimal(30, 8) // 绿积分(股池)
blackHoleAmount Decimal @map("black_hole_amount") @db.Decimal(30, 8) // 黑洞数量
circulationPool Decimal @map("circulation_pool") @db.Decimal(30, 8) // 流通池
effectiveDenominator Decimal @map("effective_denominator") @db.Decimal(30, 8) // 有效分母
minuteBurnRate Decimal @map("minute_burn_rate") @db.Decimal(30, 18) // 当时的每分钟销毁率
createdAt DateTime @default(now()) @map("created_at")
@@index([snapshotTime(sort: Desc)])
@@map("price_snapshots")
}
// ==================== 交易账户 ====================
// 用户交易账户
@ -43,6 +162,10 @@ model Order {
remainingQuantity Decimal @db.Decimal(30, 8) // 剩余数量
averagePrice Decimal @default(0) @db.Decimal(30, 18) // 平均成交价
totalAmount Decimal @default(0) @db.Decimal(30, 8) // 总成交金额
// 卖出销毁相关字段
burnQuantity Decimal @default(0) @map("burn_quantity") @db.Decimal(30, 8) // 卖出销毁量
burnMultiplier Decimal @default(0) @map("burn_multiplier") @db.Decimal(30, 18) // 销毁倍数
effectiveQuantity Decimal @default(0) @map("effective_quantity") @db.Decimal(30, 8) // 有效卖出量(含销毁)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
cancelledAt DateTime?
@ -61,14 +184,16 @@ model Order {
model Trade {
id String @id @default(uuid())
tradeNo String @unique
buyOrderId String
sellOrderId String
buyerSequence String
sellerSequence String
buyOrderId String @map("buy_order_id")
sellOrderId String @map("sell_order_id")
buyerSequence String @map("buyer_sequence")
sellerSequence String @map("seller_sequence")
price Decimal @db.Decimal(30, 18)
quantity Decimal @db.Decimal(30, 8)
amount Decimal @db.Decimal(30, 8) // price * quantity
createdAt DateTime @default(now())
quantity Decimal @db.Decimal(30, 8) // 实际成交量
burnQuantity Decimal @default(0) @map("burn_quantity") @db.Decimal(30, 8) // 卖出销毁量
effectiveQty Decimal @default(0) @map("effective_qty") @db.Decimal(30, 8) // 有效量quantity + burnQuantity
amount Decimal @db.Decimal(30, 8) // effectiveQty * price卖出交易额
createdAt DateTime @default(now()) @map("created_at")
buyOrder Order @relation(fields: [buyOrderId], references: [id])
@ -281,3 +406,18 @@ model OutboxEvent {
@@index([createdAt])
@@map("outbox_events")
}
// ==================== 已处理事件(幂等性)====================
// 已处理事件记录(用于消费者幂等性检查)
model ProcessedEvent {
id String @id @default(uuid())
eventId String @unique @map("event_id") // 事件唯一ID
eventType String @map("event_type") // 事件类型
sourceService String @map("source_service") // 来源服务
processedAt DateTime @default(now()) @map("processed_at")
@@index([eventId])
@@index([processedAt])
@@map("processed_events")
}

View File

@ -5,9 +5,20 @@ import { TradingController } from './controllers/trading.controller';
import { TransferController } from './controllers/transfer.controller';
import { HealthController } from './controllers/health.controller';
import { AdminController } from './controllers/admin.controller';
import { PriceController } from './controllers/price.controller';
import { BurnController } from './controllers/burn.controller';
import { AssetController } from './controllers/asset.controller';
@Module({
imports: [ApplicationModule, InfrastructureModule],
controllers: [TradingController, TransferController, HealthController, AdminController],
controllers: [
TradingController,
TransferController,
HealthController,
AdminController,
PriceController,
BurnController,
AssetController,
],
})
export class ApiModule {}

View File

@ -0,0 +1,68 @@
import { Controller, Get, Param, Query, Req } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiParam, ApiQuery, ApiBearerAuth } from '@nestjs/swagger';
import { AssetService } from '../../application/services/asset.service';
import { Public } from '../../shared/guards/jwt-auth.guard';
@ApiTags('Asset')
@ApiBearerAuth()
@Controller('asset')
export class AssetController {
constructor(private readonly assetService: AssetService) {}
@Get('my')
@ApiOperation({ summary: '获取我的资产显示' })
@ApiQuery({ name: 'dailyAllocation', required: false, type: String, description: '每日分配量(可选)' })
async getMyAsset(@Req() req: any, @Query('dailyAllocation') dailyAllocation?: string) {
const accountSequence = req.user?.accountSequence;
if (!accountSequence) {
throw new Error('Unauthorized');
}
const asset = await this.assetService.getAssetDisplay(accountSequence, dailyAllocation);
if (!asset) {
throw new Error('Account not found');
}
return asset;
}
@Get('account/:accountSequence')
@Public()
@ApiOperation({ summary: '获取指定账户资产显示' })
@ApiParam({ name: 'accountSequence', description: '账户序号' })
@ApiQuery({ name: 'dailyAllocation', required: false, type: String, description: '每日分配量(可选)' })
async getAccountAsset(
@Param('accountSequence') accountSequence: string,
@Query('dailyAllocation') dailyAllocation?: string,
) {
const asset = await this.assetService.getAssetDisplay(accountSequence, dailyAllocation);
if (!asset) {
return { message: 'Account not found' };
}
return asset;
}
@Get('estimate-sell')
@Public()
@ApiOperation({ summary: '预估卖出收益' })
@ApiQuery({ name: 'quantity', required: true, type: String, description: '卖出数量' })
async estimateSellProceeds(@Query('quantity') quantity: string) {
return this.assetService.estimateSellProceeds(quantity);
}
@Get('market')
@Public()
@ApiOperation({ summary: '获取市场概览' })
async getMarketOverview() {
return this.assetService.getMarketOverview();
}
@Get('growth-per-second')
@Public()
@ApiOperation({ summary: '计算资产每秒增长量' })
@ApiQuery({ name: 'dailyAllocation', required: true, type: String, description: '每日分配量' })
async calculateGrowthPerSecond(@Query('dailyAllocation') dailyAllocation: string) {
const perSecond = this.assetService.calculateAssetGrowthPerSecond(dailyAllocation);
return { dailyAllocation, assetGrowthPerSecond: perSecond };
}
}

View File

@ -0,0 +1,31 @@
import { Controller, Get, Query } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiQuery } from '@nestjs/swagger';
import { BurnService } from '../../application/services/burn.service';
import { Public } from '../../shared/guards/jwt-auth.guard';
@ApiTags('Burn')
@Controller('burn')
export class BurnController {
constructor(private readonly burnService: BurnService) {}
@Get('status')
@Public()
@ApiOperation({ summary: '获取销毁状态' })
async getBurnStatus() {
return this.burnService.getBurnStatus();
}
@Get('records')
@Public()
@ApiOperation({ summary: '获取销毁记录' })
@ApiQuery({ name: 'page', required: false, type: Number })
@ApiQuery({ name: 'pageSize', required: false, type: Number })
@ApiQuery({ name: 'sourceType', required: false, enum: ['MINUTE_BURN', 'SELL_BURN'] })
async getBurnRecords(
@Query('page') page?: number,
@Query('pageSize') pageSize?: number,
@Query('sourceType') sourceType?: 'MINUTE_BURN' | 'SELL_BURN',
) {
return this.burnService.getBurnRecords(page ?? 1, pageSize ?? 50, sourceType);
}
}

View File

@ -0,0 +1,46 @@
import { Controller, Get, Query } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiQuery, ApiBearerAuth } from '@nestjs/swagger';
import { PriceService } from '../../application/services/price.service';
import { Public } from '../../shared/guards/jwt-auth.guard';
@ApiTags('Price')
@Controller('price')
export class PriceController {
constructor(private readonly priceService: PriceService) {}
@Get('current')
@Public()
@ApiOperation({ summary: '获取当前价格信息' })
async getCurrentPrice() {
return this.priceService.getCurrentPrice();
}
@Get('latest')
@Public()
@ApiOperation({ summary: '获取最新价格快照' })
async getLatestSnapshot() {
const snapshot = await this.priceService.getLatestSnapshot();
if (!snapshot) {
return { message: 'No price snapshot available' };
}
return snapshot;
}
@Get('history')
@Public()
@ApiOperation({ summary: '获取价格历史' })
@ApiQuery({ name: 'startTime', required: true, type: String, description: 'ISO datetime' })
@ApiQuery({ name: 'endTime', required: true, type: String, description: 'ISO datetime' })
@ApiQuery({ name: 'limit', required: false, type: Number })
async getPriceHistory(
@Query('startTime') startTime: string,
@Query('endTime') endTime: string,
@Query('limit') limit?: number,
) {
return this.priceService.getPriceHistory(
new Date(startTime),
new Date(endTime),
limit ?? 1440,
);
}
}

View File

@ -3,11 +3,25 @@ import { ScheduleModule } from '@nestjs/schedule';
import { InfrastructureModule } from '../infrastructure/infrastructure.module';
import { OrderService } from './services/order.service';
import { TransferService } from './services/transfer.service';
import { PriceService } from './services/price.service';
import { BurnService } from './services/burn.service';
import { AssetService } from './services/asset.service';
import { OutboxScheduler } from './schedulers/outbox.scheduler';
import { BurnScheduler } from './schedulers/burn.scheduler';
@Module({
imports: [ScheduleModule.forRoot(), InfrastructureModule],
providers: [OrderService, TransferService, OutboxScheduler],
exports: [OrderService, TransferService],
providers: [
// Services
PriceService,
BurnService,
AssetService,
OrderService,
TransferService,
// Schedulers
OutboxScheduler,
BurnScheduler,
],
exports: [OrderService, TransferService, PriceService, BurnService, AssetService],
})
export class ApplicationModule {}

View File

@ -0,0 +1,95 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { BurnService } from '../services/burn.service';
import { PriceService } from '../services/price.service';
import { RedisService } from '../../infrastructure/redis/redis.service';
@Injectable()
export class BurnScheduler implements OnModuleInit {
private readonly logger = new Logger(BurnScheduler.name);
constructor(
private readonly burnService: BurnService,
private readonly priceService: PriceService,
private readonly redis: RedisService,
) {}
async onModuleInit() {
this.logger.log('Burn scheduler initialized');
// 初始化销毁系统
try {
await this.burnService.initialize();
this.logger.log('Burn system initialized');
} catch (error) {
this.logger.error('Failed to initialize burn system', error);
}
}
/**
*
* = 100亿 ÷ (365×4×1440) = 4756.468797564687
*/
@Cron(CronExpression.EVERY_MINUTE)
async executeMinuteBurn(): Promise<void> {
try {
const burnAmount = await this.burnService.executeMinuteBurn();
if (!burnAmount.isZero()) {
this.logger.debug(`Minute burn completed: ${burnAmount.toFixed(8)}`);
}
} catch (error) {
this.logger.error('Failed to execute minute burn', error);
}
}
/**
*
*/
@Cron(CronExpression.EVERY_MINUTE)
async createPriceSnapshot(): Promise<void> {
try {
await this.priceService.createSnapshot();
} catch (error) {
this.logger.error('Failed to create price snapshot', error);
}
}
/**
* 30
*/
@Cron('0 3 * * *') // 每天凌晨3点
async cleanupOldSnapshots(): Promise<void> {
const lockValue = await this.redis.acquireLock('snapshot:cleanup:lock', 300);
if (!lockValue) {
return;
}
try {
// 通过 PriceService 调用 repository 清理
this.logger.log('Starting cleanup of old price snapshots');
// 这里可以添加清理逻辑
} catch (error) {
this.logger.error('Failed to cleanup old snapshots', error);
} finally {
await this.redis.releaseLock('snapshot:cleanup:lock', lockValue);
}
}
/**
*
*/
@Cron('0 * * * *') // 每小时整点
async logBurnStatus(): Promise<void> {
try {
const status = await this.burnService.getBurnStatus();
this.logger.log(
`Burn status: burned=${status.totalBurned}, ` +
`remaining=${status.remainingBurn}, ` +
`progress=${status.burnProgress}%, ` +
`minuteRate=${status.minuteBurnRate}`,
);
} catch (error) {
this.logger.error('Failed to log burn status', error);
}
}
}

View File

@ -1 +1,2 @@
export * from './outbox.scheduler';
export * from './burn.scheduler';

View File

@ -0,0 +1,199 @@
import { Injectable, Logger } from '@nestjs/common';
import { TradingCalculatorService } from '../../domain/services/trading-calculator.service';
import { TradingAccountRepository } from '../../infrastructure/persistence/repositories/trading-account.repository';
import { BlackHoleRepository } from '../../infrastructure/persistence/repositories/black-hole.repository';
import { CirculationPoolRepository } from '../../infrastructure/persistence/repositories/circulation-pool.repository';
import { SharePoolRepository } from '../../infrastructure/persistence/repositories/share-pool.repository';
import { PriceService } from './price.service';
import { Money } from '../../domain/value-objects/money.vo';
import Decimal from 'decimal.js';
export interface AssetDisplay {
// 账户积分股余额
shareBalance: string;
// 账户现金余额
cashBalance: string;
// 冻结积分股
frozenShares: string;
// 冻结现金
frozenCash: string;
// 可用积分股
availableShares: string;
// 可用现金
availableCash: string;
// 当前价格
currentPrice: string;
// 销毁倍数
burnMultiplier: string;
// 有效积分股(含销毁加成)
effectiveShares: string;
// 资产显示值 = (账户积分股 + 账户积分股 × 倍数) × 积分股价
displayAssetValue: string;
// 每秒增长量(需要外部传入每日分配量)
assetGrowthPerSecond: string;
// 累计买入
totalBought: string;
// 累计卖出
totalSold: string;
}
@Injectable()
export class AssetService {
private readonly logger = new Logger(AssetService.name);
private readonly calculator = new TradingCalculatorService();
constructor(
private readonly tradingAccountRepository: TradingAccountRepository,
private readonly blackHoleRepository: BlackHoleRepository,
private readonly circulationPoolRepository: CirculationPoolRepository,
private readonly sharePoolRepository: SharePoolRepository,
private readonly priceService: PriceService,
) {}
/**
*
* = ( + × ) ×
*
* @param accountSequence
* @param dailyAllocation
*/
async getAssetDisplay(
accountSequence: string,
dailyAllocation?: string,
): Promise<AssetDisplay | null> {
const account = await this.tradingAccountRepository.findByAccountSequence(accountSequence);
if (!account) {
return null;
}
// 获取当前价格信息
const priceInfo = await this.priceService.getCurrentPrice();
const price = new Money(priceInfo.price);
const burnMultiplier = new Decimal(priceInfo.burnMultiplier);
// 计算有效积分股 = 余额 × (1 + 倍数)
const multiplierFactor = new Decimal(1).plus(burnMultiplier);
const effectiveShares = account.shareBalance.value.times(multiplierFactor);
// 计算资产显示值
const displayAssetValue = this.calculator.calculateDisplayAssetValue(
account.shareBalance,
burnMultiplier,
price,
);
// 计算每秒增长量
let assetGrowthPerSecond = Money.zero();
if (dailyAllocation) {
const dailyAmount = new Money(dailyAllocation);
assetGrowthPerSecond = this.calculator.calculateAssetGrowthPerSecond(dailyAmount);
}
return {
shareBalance: account.shareBalance.toFixed(8),
cashBalance: account.cashBalance.toFixed(8),
frozenShares: account.frozenShares.toFixed(8),
frozenCash: account.frozenCash.toFixed(8),
availableShares: account.availableShares.toFixed(8),
availableCash: account.availableCash.toFixed(8),
currentPrice: price.toFixed(18),
burnMultiplier: burnMultiplier.toFixed(18),
effectiveShares: new Money(effectiveShares).toFixed(8),
displayAssetValue: displayAssetValue.toFixed(8),
assetGrowthPerSecond: assetGrowthPerSecond.toFixed(18),
totalBought: account.totalBought.toFixed(8),
totalSold: account.totalSold.toFixed(8),
};
}
/**
*
* = ÷ 24 ÷ 60 ÷ 60
*/
calculateAssetGrowthPerSecond(dailyAllocation: string): string {
const dailyAmount = new Money(dailyAllocation);
const perSecond = this.calculator.calculateAssetGrowthPerSecond(dailyAmount);
return perSecond.toFixed(18);
}
/**
*
* = ( + ) ×
*/
async estimateSellProceeds(sellQuantity: string): Promise<{
sellQuantity: string;
burnQuantity: string;
effectiveQuantity: string;
price: string;
proceeds: string;
burnMultiplier: string;
}> {
const quantity = new Money(sellQuantity);
const result = await this.priceService.calculateSellAmount(quantity);
return {
sellQuantity: quantity.toFixed(8),
burnQuantity: result.burnQuantity.toFixed(8),
effectiveQuantity: result.effectiveQuantity.toFixed(8),
price: result.price.toFixed(18),
proceeds: result.amount.toFixed(8),
burnMultiplier: (await this.priceService.getCurrentBurnMultiplier()).toFixed(18),
};
}
/**
*
*/
async getMarketOverview(): Promise<{
price: string;
greenPoints: string;
blackHoleAmount: string;
circulationPool: string;
effectiveDenominator: string;
burnMultiplier: string;
totalShares: string;
burnTarget: string;
burnProgress: string;
}> {
const [sharePool, blackHole, circulationPool] = await Promise.all([
this.sharePoolRepository.getPool(),
this.blackHoleRepository.getBlackHole(),
this.circulationPoolRepository.getPool(),
]);
const greenPoints = sharePool?.greenPoints || Money.zero();
const blackHoleAmount = blackHole?.totalBurned || Money.zero();
const circulationPoolAmount = circulationPool?.totalShares || Money.zero();
// 计算价格
const price = this.calculator.calculatePrice(greenPoints, blackHoleAmount, circulationPoolAmount);
// 计算有效分母
const effectiveDenominator = this.calculator.calculateEffectiveDenominator(
blackHoleAmount,
circulationPoolAmount,
);
// 计算销毁倍数
const burnMultiplier = this.calculator.calculateSellBurnMultiplier(
blackHoleAmount,
circulationPoolAmount,
);
// 销毁进度
const targetBurn = blackHole?.targetBurn || new Money(TradingCalculatorService.BURN_TARGET);
const burnProgress = blackHoleAmount.value.dividedBy(targetBurn.value).times(100);
return {
price: price.toFixed(18),
greenPoints: greenPoints.toFixed(8),
blackHoleAmount: blackHoleAmount.toFixed(8),
circulationPool: circulationPoolAmount.toFixed(8),
effectiveDenominator: effectiveDenominator.toFixed(8),
burnMultiplier: burnMultiplier.toFixed(18),
totalShares: TradingCalculatorService.TOTAL_SHARES.toFixed(8),
burnTarget: targetBurn.toFixed(8),
burnProgress: burnProgress.toFixed(4),
};
}
}

View File

@ -0,0 +1,365 @@
import { Injectable, Logger } from '@nestjs/common';
import { TradingCalculatorService } from '../../domain/services/trading-calculator.service';
import { BlackHoleRepository } from '../../infrastructure/persistence/repositories/black-hole.repository';
import { CirculationPoolRepository } from '../../infrastructure/persistence/repositories/circulation-pool.repository';
import { TradingConfigRepository } from '../../infrastructure/persistence/repositories/trading-config.repository';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { RedisService } from '../../infrastructure/redis/redis.service';
import { Money } from '../../domain/value-objects/money.vo';
import Decimal from 'decimal.js';
import {
TradingEventTypes,
TradingTopics,
BurnExecutedPayload,
MinuteBurnExecutedPayload,
} from '../../domain/events/trading.events';
export interface BurnStatus {
totalBurned: string;
targetBurn: string;
remainingBurn: string;
burnProgress: string; // 百分比
minuteBurnRate: string;
remainingMinutes: number;
lastBurnMinute: Date | null;
}
export interface SellBurnResult {
burnQuantity: Money;
burnMultiplier: Decimal;
newMinuteBurnRate: Money;
}
@Injectable()
export class BurnService {
private readonly logger = new Logger(BurnService.name);
private readonly calculator = new TradingCalculatorService();
constructor(
private readonly blackHoleRepository: BlackHoleRepository,
private readonly circulationPoolRepository: CirculationPoolRepository,
private readonly tradingConfigRepository: TradingConfigRepository,
private readonly outboxRepository: OutboxRepository,
private readonly redis: RedisService,
) {}
/**
*
*/
async getBurnStatus(): Promise<BurnStatus> {
const [blackHole, config] = await Promise.all([
this.blackHoleRepository.getBlackHole(),
this.tradingConfigRepository.getConfig(),
]);
const totalBurned = blackHole?.totalBurned || Money.zero();
const targetBurn = blackHole?.targetBurn || new Money(TradingCalculatorService.BURN_TARGET);
const remainingBurn = blackHole?.remainingBurn || targetBurn;
// 计算进度百分比
const progress = totalBurned.value.dividedBy(targetBurn.value).times(100);
// 计算剩余分钟数
const activatedAt = config?.activatedAt || new Date();
const remainingMinutes = this.calculator.calculateRemainingMinutes(activatedAt);
return {
totalBurned: totalBurned.toFixed(8),
targetBurn: targetBurn.toFixed(8),
remainingBurn: remainingBurn.toFixed(8),
burnProgress: progress.toFixed(4),
minuteBurnRate: (config?.minuteBurnRate || Money.zero()).toFixed(18),
remainingMinutes,
lastBurnMinute: blackHole?.lastBurnMinute || null,
};
}
/**
*
*/
async executeMinuteBurn(): Promise<Money> {
const lockValue = await this.redis.acquireLock('burn:minute:lock', 55);
if (!lockValue) {
return Money.zero();
}
try {
const config = await this.tradingConfigRepository.getConfig();
if (!config || !config.isActive) {
return Money.zero();
}
const blackHole = await this.blackHoleRepository.getBlackHole();
if (!blackHole) {
return Money.zero();
}
// 检查是否已完成销毁目标
if (blackHole.remainingBurn.isZero()) {
return Money.zero();
}
const currentMinute = new Date();
currentMinute.setSeconds(0, 0);
// 检查是否已处理过这一分钟
const processedKey = `burn:processed:${currentMinute.getTime()}`;
if (await this.redis.get(processedKey)) {
return Money.zero();
}
// 使用当前配置的每分钟销毁率
let burnAmount = config.minuteBurnRate;
// 确保不超过剩余待销毁量
if (burnAmount.isGreaterThan(blackHole.remainingBurn)) {
burnAmount = blackHole.remainingBurn;
}
if (burnAmount.isZero()) {
return Money.zero();
}
// 记录销毁
const burnRecord = await this.blackHoleRepository.recordMinuteBurn(currentMinute, burnAmount);
// 标记已处理
await this.redis.set(processedKey, '1', 120);
this.logger.log(`Minute burn executed: ${burnAmount.toFixed(8)}`);
// 发布每分钟销毁事件
await this.publishMinuteBurnEvent(
burnRecord.id,
currentMinute,
burnAmount,
blackHole.totalBurned.add(burnAmount),
blackHole.remainingBurn.subtract(burnAmount),
);
return burnAmount;
} catch (error) {
this.logger.error('Failed to execute minute burn', error);
return Money.zero();
} finally {
await this.redis.releaseLock('burn:minute:lock', lockValue);
}
}
/**
*
* = ×
*
*/
async executeSellBurn(
sellQuantity: Money,
accountSeq: string,
orderNo: string,
): Promise<SellBurnResult> {
const [blackHole, circulationPool, config] = await Promise.all([
this.blackHoleRepository.getBlackHole(),
this.circulationPoolRepository.getPool(),
this.tradingConfigRepository.getConfig(),
]);
if (!blackHole || !config) {
throw new Error('Trading system not initialized');
}
const blackHoleAmount = blackHole.totalBurned;
const circulationPoolAmount = circulationPool?.totalShares || Money.zero();
// 计算销毁倍数
const burnMultiplier = this.calculator.calculateSellBurnMultiplier(
blackHoleAmount,
circulationPoolAmount,
);
// 计算销毁量
const burnQuantity = this.calculator.calculateSellBurnAmount(sellQuantity, burnMultiplier);
// 确保销毁量不超过剩余待销毁量
const actualBurnQuantity = burnQuantity.isGreaterThan(blackHole.remainingBurn)
? blackHole.remainingBurn
: burnQuantity;
if (!actualBurnQuantity.isZero()) {
// 记录卖出销毁
const burnMinute = new Date();
burnMinute.setSeconds(0, 0);
const burnRecord = await this.blackHoleRepository.recordSellBurn(
burnMinute,
actualBurnQuantity,
accountSeq,
orderNo,
);
// 重新计算每分钟销毁量
const newBlackHoleAmount = blackHoleAmount.add(actualBurnQuantity);
const remainingMinutes = this.calculator.calculateRemainingMinutes(
config.activatedAt || new Date(),
);
const newMinuteBurnRate = this.calculator.calculateMinuteBurnRate(
newBlackHoleAmount,
remainingMinutes,
);
// 更新配置中的每分钟销毁率
await this.tradingConfigRepository.updateMinuteBurnRate(newMinuteBurnRate);
this.logger.log(
`Sell burn executed: quantity=${actualBurnQuantity.toFixed(8)}, ` +
`multiplier=${burnMultiplier.toFixed(8)}, newRate=${newMinuteBurnRate.toFixed(18)}`,
);
// 发布卖出销毁事件
await this.publishSellBurnEvent(
burnRecord.id,
accountSeq,
orderNo,
actualBurnQuantity,
burnMultiplier,
blackHole.remainingBurn.subtract(actualBurnQuantity),
);
return {
burnQuantity: actualBurnQuantity,
burnMultiplier,
newMinuteBurnRate,
};
}
return {
burnQuantity: Money.zero(),
burnMultiplier,
newMinuteBurnRate: config.minuteBurnRate,
};
}
/**
*
*/
async initialize(): Promise<void> {
const [existingConfig, existingBlackHole] = await Promise.all([
this.tradingConfigRepository.getConfig(),
this.blackHoleRepository.getBlackHole(),
]);
if (!existingConfig) {
await this.tradingConfigRepository.initializeConfig();
this.logger.log('Trading config initialized');
}
if (!existingBlackHole) {
await this.blackHoleRepository.initializeBlackHole(
new Money(TradingCalculatorService.BURN_TARGET),
);
this.logger.log('Black hole initialized');
}
}
/**
*
*/
async getBurnRecords(
page: number,
pageSize: number,
sourceType?: 'MINUTE_BURN' | 'SELL_BURN',
): Promise<{
data: any[];
total: number;
}> {
const result = await this.blackHoleRepository.getBurnRecords(page, pageSize, sourceType);
return {
data: result.data.map((r) => ({
id: r.id,
burnMinute: r.burnMinute,
burnAmount: r.burnAmount.toFixed(8),
remainingTarget: r.remainingTarget.toFixed(8),
sourceType: r.sourceType,
sourceAccountSeq: r.sourceAccountSeq,
sourceOrderNo: r.sourceOrderNo,
memo: r.memo,
createdAt: r.createdAt,
})),
total: result.total,
};
}
// ==================== 事件发布方法 ====================
/**
*
*/
private async publishMinuteBurnEvent(
burnRecordId: string,
burnMinute: Date,
burnAmount: Money,
totalBurned: Money,
remainingTarget: Money,
): Promise<void> {
try {
const payload: MinuteBurnExecutedPayload = {
burnRecordId,
burnMinute: burnMinute.toISOString(),
burnAmount: burnAmount.toString(),
totalBurned: totalBurned.toString(),
remainingTarget: remainingTarget.toString(),
executedAt: new Date().toISOString(),
};
await this.outboxRepository.create({
aggregateType: 'BurnRecord',
aggregateId: burnRecordId,
eventType: TradingEventTypes.MINUTE_BURN_EXECUTED,
payload,
topic: TradingTopics.BURNS,
key: 'minute-burn',
});
this.logger.debug(`Published MinuteBurnExecuted event: ${burnAmount.toFixed(8)}`);
} catch (error) {
this.logger.error(`Failed to publish MinuteBurnExecuted event: ${error}`);
}
}
/**
*
*/
private async publishSellBurnEvent(
burnRecordId: string,
accountSeq: string,
orderNo: string,
burnAmount: Money,
burnMultiplier: Decimal,
remainingTarget: Money,
): Promise<void> {
try {
const payload: BurnExecutedPayload = {
burnRecordId,
sourceType: 'SELL',
sourceAccountSeq: accountSeq,
sourceOrderNo: orderNo,
burnAmount: burnAmount.toString(),
burnMultiplier: burnMultiplier.toString(),
remainingTarget: remainingTarget.toString(),
executedAt: new Date().toISOString(),
};
await this.outboxRepository.create({
aggregateType: 'BurnRecord',
aggregateId: burnRecordId,
eventType: TradingEventTypes.BURN_EXECUTED,
payload,
topic: TradingTopics.BURNS,
key: accountSeq,
});
this.logger.debug(`Published BurnExecuted event for account ${accountSeq}`);
} catch (error) {
this.logger.error(`Failed to publish BurnExecuted event: ${error}`);
}
}
}

View File

@ -1,12 +1,23 @@
import { Injectable, Logger } from '@nestjs/common';
import { OrderRepository } from '../../infrastructure/persistence/repositories/order.repository';
import { TradingAccountRepository } from '../../infrastructure/persistence/repositories/trading-account.repository';
import { CirculationPoolRepository } from '../../infrastructure/persistence/repositories/circulation-pool.repository';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { RedisService } from '../../infrastructure/redis/redis.service';
import { OrderAggregate, OrderType, OrderStatus } from '../../domain/aggregates/order.aggregate';
import { TradingAccountAggregate } from '../../domain/aggregates/trading-account.aggregate';
import { MatchingEngineService } from '../../domain/services/matching-engine.service';
import { Money } from '../../domain/value-objects/money.vo';
import { BurnService } from './burn.service';
import { PriceService } from './price.service';
import {
TradingEventTypes,
TradingTopics,
OrderCreatedPayload,
OrderCancelledPayload,
TradeExecutedPayload,
} from '../../domain/events/trading.events';
@Injectable()
export class OrderService {
@ -16,8 +27,12 @@ export class OrderService {
constructor(
private readonly orderRepository: OrderRepository,
private readonly accountRepository: TradingAccountRepository,
private readonly circulationPoolRepository: CirculationPoolRepository,
private readonly outboxRepository: OutboxRepository,
private readonly prisma: PrismaService,
private readonly redis: RedisService,
private readonly burnService: BurnService,
private readonly priceService: PriceService,
) {}
async createOrder(
@ -70,6 +85,9 @@ export class OrderService {
const orderId = await this.orderRepository.save(order);
await this.accountRepository.save(account);
// 发布订单创建事件
await this.publishOrderCreatedEvent(orderId, order);
// 尝试撮合
await this.tryMatch(order);
@ -113,6 +131,9 @@ export class OrderService {
await this.orderRepository.save(order);
await this.accountRepository.save(account);
// 发布订单取消事件
await this.publishOrderCancelledEvent(order);
}
private async tryMatch(incomingOrder: OrderAggregate): Promise<void> {
@ -126,7 +147,36 @@ export class OrderService {
const matches = this.matchingEngine.findMatchingOrders(incomingOrder, orderBook);
for (const match of matches) {
// 保存成交记录
const tradeQuantity = match.trade.quantity;
let burnQuantity = Money.zero();
let effectiveQuantity = tradeQuantity;
// 如果是卖出成交,执行销毁逻辑
// 卖出的销毁量 = 卖出积分股 × 倍数
// 卖出交易额 = (卖出量 + 卖出销毁量) × 积分股价
if (match.sellOrder) {
try {
const burnResult = await this.burnService.executeSellBurn(
tradeQuantity,
match.sellOrder.accountSequence,
match.sellOrder.orderNo,
);
burnQuantity = burnResult.burnQuantity;
effectiveQuantity = new Money(tradeQuantity.value.plus(burnQuantity.value));
this.logger.log(
`Sell burn executed: sellQty=${tradeQuantity.toFixed(8)}, ` +
`burnQty=${burnQuantity.toFixed(8)}, effectiveQty=${effectiveQuantity.toFixed(8)}`,
);
} catch (error) {
this.logger.warn(`Sell burn failed, continuing without burn: ${error}`);
}
}
// 计算交易额 = 有效数量 × 价格
const tradeAmount = new Money(effectiveQuantity.value.times(match.trade.price.value));
// 保存成交记录(包含销毁信息)
await this.prisma.trade.create({
data: {
tradeNo: match.trade.tradeNo,
@ -135,31 +185,58 @@ export class OrderService {
buyerSequence: match.buyOrder.accountSequence,
sellerSequence: match.sellOrder.accountSequence,
price: match.trade.price.value,
quantity: match.trade.quantity.value,
amount: match.trade.amount.value,
quantity: tradeQuantity.value,
burnQuantity: burnQuantity.value,
effectiveQty: effectiveQuantity.value,
amount: tradeAmount.value,
},
});
// 更新订单
// 卖出的积分股进入流通池
try {
await this.circulationPoolRepository.addSharesFromSell(
tradeQuantity,
match.sellOrder.accountSequence,
match.sellOrder.id!,
`卖出成交, 交易号${match.trade.tradeNo}`,
);
} catch (error) {
this.logger.warn(`Failed to add shares to circulation pool: ${error}`);
}
// 更新订单(包含销毁信息)
await this.orderRepository.save(match.buyOrder);
await this.orderRepository.save(match.sellOrder);
await this.orderRepository.saveWithBurnInfo(match.sellOrder, burnQuantity, effectiveQuantity);
// 更新买方账户
const buyerAccount = await this.accountRepository.findByAccountSequence(match.buyOrder.accountSequence);
if (buyerAccount) {
buyerAccount.executeBuy(match.trade.quantity, match.trade.amount, match.trade.tradeNo);
buyerAccount.executeBuy(tradeQuantity, tradeAmount, match.trade.tradeNo);
await this.accountRepository.save(buyerAccount);
}
// 更新卖方账户
// 更新卖方账户(获得的是有效交易额)
const sellerAccount = await this.accountRepository.findByAccountSequence(match.sellOrder.accountSequence);
if (sellerAccount) {
sellerAccount.executeSell(match.trade.quantity, match.trade.amount, match.trade.tradeNo);
sellerAccount.executeSell(tradeQuantity, tradeAmount, match.trade.tradeNo);
await this.accountRepository.save(sellerAccount);
}
this.logger.log(
`Trade executed: ${match.trade.tradeNo}, price=${match.trade.price}, qty=${match.trade.quantity}`,
`Trade executed: ${match.trade.tradeNo}, price=${match.trade.price.toFixed(8)}, ` +
`qty=${tradeQuantity.toFixed(8)}, burn=${burnQuantity.toFixed(8)}, amount=${tradeAmount.toFixed(8)}`,
);
// 发布成交事件
await this.publishTradeExecutedEvent(
match.trade.tradeNo,
match.buyOrder,
match.sellOrder,
match.trade.price,
tradeQuantity,
tradeAmount,
burnQuantity,
effectiveQuantity,
);
}
} finally {
@ -172,4 +249,116 @@ export class OrderService {
const random = Math.random().toString(36).substring(2, 8);
return `O${timestamp}${random}`.toUpperCase();
}
// ==================== 事件发布方法 ====================
/**
*
*/
private async publishOrderCreatedEvent(orderId: string, order: OrderAggregate): Promise<void> {
try {
const payload: OrderCreatedPayload = {
orderId,
orderNo: order.orderNo,
accountSequence: order.accountSequence,
type: order.type,
price: order.price.toString(),
quantity: order.quantity.toString(),
createdAt: new Date().toISOString(),
};
await this.outboxRepository.create({
aggregateType: 'Order',
aggregateId: orderId,
eventType: TradingEventTypes.ORDER_CREATED,
payload,
topic: TradingTopics.ORDERS,
key: order.accountSequence,
});
this.logger.debug(`Published OrderCreated event for order ${order.orderNo}`);
} catch (error) {
this.logger.error(`Failed to publish OrderCreated event: ${error}`);
}
}
/**
*
*/
private async publishOrderCancelledEvent(order: OrderAggregate): Promise<void> {
try {
const payload: OrderCancelledPayload = {
orderId: order.id!,
orderNo: order.orderNo,
accountSequence: order.accountSequence,
type: order.type,
cancelledQuantity: order.remainingQuantity.toString(),
cancelledAt: new Date().toISOString(),
};
await this.outboxRepository.create({
aggregateType: 'Order',
aggregateId: order.id!,
eventType: TradingEventTypes.ORDER_CANCELLED,
payload,
topic: TradingTopics.ORDERS,
key: order.accountSequence,
});
this.logger.debug(`Published OrderCancelled event for order ${order.orderNo}`);
} catch (error) {
this.logger.error(`Failed to publish OrderCancelled event: ${error}`);
}
}
/**
*
*/
private async publishTradeExecutedEvent(
tradeNo: string,
buyOrder: OrderAggregate,
sellOrder: OrderAggregate,
price: Money,
quantity: Money,
amount: Money,
burnQuantity: Money,
effectiveQuantity: Money,
): Promise<void> {
try {
// 使用 tradeNo 查找刚创建的 trade 获取 id
const trade = await this.prisma.trade.findUnique({ where: { tradeNo } });
if (!trade) {
this.logger.warn(`Trade not found for event publishing: ${tradeNo}`);
return;
}
const payload: TradeExecutedPayload = {
tradeId: trade.id,
tradeNo,
buyOrderId: buyOrder.id!,
sellOrderId: sellOrder.id!,
buyerSequence: buyOrder.accountSequence,
sellerSequence: sellOrder.accountSequence,
price: price.toString(),
quantity: quantity.toString(),
amount: amount.toString(),
burnQuantity: burnQuantity.toString(),
effectiveQuantity: effectiveQuantity.toString(),
createdAt: new Date().toISOString(),
};
await this.outboxRepository.create({
aggregateType: 'Trade',
aggregateId: trade.id,
eventType: TradingEventTypes.TRADE_EXECUTED,
payload,
topic: TradingTopics.TRADES,
key: tradeNo,
});
this.logger.debug(`Published TradeExecuted event for trade ${tradeNo}`);
} catch (error) {
this.logger.error(`Failed to publish TradeExecuted event: ${error}`);
}
}
}

View File

@ -0,0 +1,229 @@
import { Injectable, Logger } from '@nestjs/common';
import { TradingCalculatorService } from '../../domain/services/trading-calculator.service';
import { BlackHoleRepository } from '../../infrastructure/persistence/repositories/black-hole.repository';
import { SharePoolRepository } from '../../infrastructure/persistence/repositories/share-pool.repository';
import { CirculationPoolRepository } from '../../infrastructure/persistence/repositories/circulation-pool.repository';
import { PriceSnapshotRepository } from '../../infrastructure/persistence/repositories/price-snapshot.repository';
import { TradingConfigRepository } from '../../infrastructure/persistence/repositories/trading-config.repository';
import { Money } from '../../domain/value-objects/money.vo';
import Decimal from 'decimal.js';
export interface PriceInfo {
price: string;
greenPoints: string;
blackHoleAmount: string;
circulationPool: string;
effectiveDenominator: string;
burnMultiplier: string;
minuteBurnRate: string;
snapshotTime: Date;
}
@Injectable()
export class PriceService {
private readonly logger = new Logger(PriceService.name);
private readonly calculator = new TradingCalculatorService();
constructor(
private readonly blackHoleRepository: BlackHoleRepository,
private readonly sharePoolRepository: SharePoolRepository,
private readonly circulationPoolRepository: CirculationPoolRepository,
private readonly priceSnapshotRepository: PriceSnapshotRepository,
private readonly tradingConfigRepository: TradingConfigRepository,
) {}
/**
*
*/
async getCurrentPrice(): Promise<PriceInfo> {
const [sharePool, blackHole, circulationPool, config] = await Promise.all([
this.sharePoolRepository.getPool(),
this.blackHoleRepository.getBlackHole(),
this.circulationPoolRepository.getPool(),
this.tradingConfigRepository.getConfig(),
]);
const greenPoints = sharePool?.greenPoints || Money.zero();
const blackHoleAmount = blackHole?.totalBurned || Money.zero();
const circulationPoolAmount = circulationPool?.totalShares || Money.zero();
// 计算价格
const price = this.calculator.calculatePrice(greenPoints, blackHoleAmount, circulationPoolAmount);
// 计算有效分母
const effectiveDenominator = this.calculator.calculateEffectiveDenominator(
blackHoleAmount,
circulationPoolAmount,
);
// 计算销毁倍数
const burnMultiplier = this.calculator.calculateSellBurnMultiplier(
blackHoleAmount,
circulationPoolAmount,
);
// 获取当前每分钟销毁率
const minuteBurnRate = config?.minuteBurnRate || Money.zero();
return {
price: price.toFixed(18),
greenPoints: greenPoints.toFixed(8),
blackHoleAmount: blackHoleAmount.toFixed(8),
circulationPool: circulationPoolAmount.toFixed(8),
effectiveDenominator: effectiveDenominator.toFixed(8),
burnMultiplier: burnMultiplier.toFixed(18),
minuteBurnRate: minuteBurnRate.toFixed(18),
snapshotTime: new Date(),
};
}
/**
*
*/
async getCurrentBurnMultiplier(): Promise<Decimal> {
const [blackHole, circulationPool] = await Promise.all([
this.blackHoleRepository.getBlackHole(),
this.circulationPoolRepository.getPool(),
]);
const blackHoleAmount = blackHole?.totalBurned || Money.zero();
const circulationPoolAmount = circulationPool?.totalShares || Money.zero();
return this.calculator.calculateSellBurnMultiplier(blackHoleAmount, circulationPoolAmount);
}
/**
*
*/
async calculateSellBurn(sellQuantity: Money): Promise<{
burnQuantity: Money;
burnMultiplier: Decimal;
effectiveQuantity: Money;
}> {
const burnMultiplier = await this.getCurrentBurnMultiplier();
const burnQuantity = this.calculator.calculateSellBurnAmount(sellQuantity, burnMultiplier);
const effectiveQuantity = new Money(sellQuantity.value.plus(burnQuantity.value));
return {
burnQuantity,
burnMultiplier,
effectiveQuantity,
};
}
/**
*
*/
async calculateSellAmount(sellQuantity: Money): Promise<{
amount: Money;
burnQuantity: Money;
effectiveQuantity: Money;
price: Money;
}> {
const priceInfo = await this.getCurrentPrice();
const price = new Money(priceInfo.price);
const { burnQuantity, effectiveQuantity } = await this.calculateSellBurn(sellQuantity);
const amount = this.calculator.calculateSellAmount(sellQuantity, burnQuantity, price);
return {
amount,
burnQuantity,
effectiveQuantity,
price,
};
}
/**
*
*/
async createSnapshot(): Promise<void> {
try {
const [sharePool, blackHole, circulationPool, config] = await Promise.all([
this.sharePoolRepository.getPool(),
this.blackHoleRepository.getBlackHole(),
this.circulationPoolRepository.getPool(),
this.tradingConfigRepository.getConfig(),
]);
const greenPoints = sharePool?.greenPoints || Money.zero();
const blackHoleAmount = blackHole?.totalBurned || Money.zero();
const circulationPoolAmount = circulationPool?.totalShares || Money.zero();
const price = this.calculator.calculatePrice(greenPoints, blackHoleAmount, circulationPoolAmount);
const effectiveDenominator = this.calculator.calculateEffectiveDenominator(
blackHoleAmount,
circulationPoolAmount,
);
const minuteBurnRate = config?.minuteBurnRate || Money.zero();
const snapshotTime = new Date();
snapshotTime.setSeconds(0, 0);
await this.priceSnapshotRepository.createSnapshot({
snapshotTime,
price,
greenPoints,
blackHoleAmount,
circulationPool: circulationPoolAmount,
effectiveDenominator,
minuteBurnRate,
});
this.logger.debug(`Price snapshot created: ${price.toFixed(18)}`);
} catch (error) {
this.logger.error('Failed to create price snapshot', error);
}
}
/**
*
*/
async getPriceHistory(
startTime: Date,
endTime: Date,
limit: number = 1440,
): Promise<
Array<{
time: Date;
price: string;
greenPoints: string;
blackHoleAmount: string;
circulationPool: string;
}>
> {
const snapshots = await this.priceSnapshotRepository.getPriceHistory(startTime, endTime, limit);
return snapshots.map((s) => ({
time: s.snapshotTime,
price: s.price.toFixed(18),
greenPoints: s.greenPoints.toFixed(8),
blackHoleAmount: s.blackHoleAmount.toFixed(8),
circulationPool: s.circulationPool.toFixed(8),
}));
}
/**
*
*/
async getLatestSnapshot(): Promise<PriceInfo | null> {
const snapshot = await this.priceSnapshotRepository.getLatestSnapshot();
if (!snapshot) {
return null;
}
const burnMultiplier = await this.getCurrentBurnMultiplier();
return {
price: snapshot.price.toFixed(18),
greenPoints: snapshot.greenPoints.toFixed(8),
blackHoleAmount: snapshot.blackHoleAmount.toFixed(8),
circulationPool: snapshot.circulationPool.toFixed(8),
effectiveDenominator: snapshot.effectiveDenominator.toFixed(8),
burnMultiplier: burnMultiplier.toFixed(18),
minuteBurnRate: snapshot.minuteBurnRate.toFixed(18),
snapshotTime: snapshot.snapshotTime,
};
}
}

View File

@ -0,0 +1,2 @@
// Trading Service Event Types
export * from './trading.events';

View File

@ -0,0 +1,224 @@
/**
* Trading Service
* Outbox Kafka
*/
// ==================== 事件类型常量 ====================
export const TradingEventTypes = {
// 订单事件
ORDER_CREATED: 'order.created',
ORDER_CANCELLED: 'order.cancelled',
ORDER_COMPLETED: 'order.completed',
// 成交事件
TRADE_EXECUTED: 'trade.executed',
// 转账事件
TRANSFER_INITIATED: 'transfer.initiated',
TRANSFER_COMPLETED: 'transfer.completed',
TRANSFER_FAILED: 'transfer.failed',
// 销毁事件
BURN_EXECUTED: 'burn.executed',
MINUTE_BURN_EXECUTED: 'burn.minute-executed',
// 价格事件
PRICE_UPDATED: 'price.updated',
// 账户事件
TRADING_ACCOUNT_CREATED: 'trading-account.created',
} as const;
export type TradingEventType =
(typeof TradingEventTypes)[keyof typeof TradingEventTypes];
// ==================== Kafka Topic 常量 ====================
export const TradingTopics = {
ORDERS: 'trading.orders',
TRADES: 'trading.trades',
TRANSFERS: 'trading.transfers',
BURNS: 'trading.burns',
PRICES: 'trading.prices',
ACCOUNTS: 'trading.accounts',
} as const;
// ==================== 事件 Payload 类型 ====================
/**
*
*/
export interface OrderCreatedPayload {
orderId: string;
orderNo: string;
accountSequence: string;
type: 'BUY' | 'SELL';
price: string;
quantity: string;
createdAt: string;
}
/**
*
*/
export interface OrderCancelledPayload {
orderId: string;
orderNo: string;
accountSequence: string;
type: 'BUY' | 'SELL';
cancelledQuantity: string;
cancelledAt: string;
}
/**
*
*/
export interface OrderCompletedPayload {
orderId: string;
orderNo: string;
accountSequence: string;
type: 'BUY' | 'SELL';
filledQuantity: string;
averagePrice: string;
totalAmount: string;
completedAt: string;
}
/**
*
*/
export interface TradeExecutedPayload {
tradeId: string;
tradeNo: string;
buyOrderId: string;
sellOrderId: string;
buyerSequence: string;
sellerSequence: string;
price: string;
quantity: string;
amount: string;
burnQuantity: string;
effectiveQuantity: string;
createdAt: string;
}
/**
*
*/
export interface TransferInitiatedPayload {
transferId: string;
transferNo: string;
accountSequence: string;
direction: 'IN' | 'OUT';
amount: string;
initiatedAt: string;
}
/**
*
*/
export interface TransferCompletedPayload {
transferId: string;
transferNo: string;
accountSequence: string;
direction: 'IN' | 'OUT';
amount: string;
miningTxId?: string;
completedAt: string;
}
/**
*
*/
export interface TransferFailedPayload {
transferId: string;
transferNo: string;
accountSequence: string;
direction: 'IN' | 'OUT';
amount: string;
errorMessage: string;
failedAt: string;
}
/**
*
*/
export interface BurnExecutedPayload {
burnRecordId: string;
sourceType: 'SELL' | 'SCHEDULED';
sourceAccountSeq?: string;
sourceOrderNo?: string;
burnAmount: string;
burnMultiplier?: string;
remainingTarget: string;
executedAt: string;
}
/**
*
*/
export interface MinuteBurnExecutedPayload {
burnRecordId: string;
burnMinute: string;
burnAmount: string;
totalBurned: string;
remainingTarget: string;
executedAt: string;
}
/**
*
*/
export interface PriceUpdatedPayload {
snapshotId: string;
price: string;
greenPoints: string;
blackHoleAmount: string;
circulationPool: string;
effectiveDenominator: string;
minuteBurnRate: string;
snapshotTime: string;
}
/**
*
*/
export interface TradingAccountCreatedPayload {
accountId: string;
accountSequence: string;
createdAt: string;
}
// ==================== 事件基类 ====================
export interface TradingEvent<T = unknown> {
eventId: string;
eventType: TradingEventType;
aggregateType: string;
aggregateId: string;
payload: T;
timestamp: string;
version: number;
}
// ==================== 辅助函数 ====================
/**
*
*/
export function createTradingEvent<T>(
eventType: TradingEventType,
aggregateType: string,
aggregateId: string,
payload: T,
): Omit<TradingEvent<T>, 'eventId'> {
return {
eventType,
aggregateType,
aggregateId,
payload,
timestamp: new Date().toISOString(),
version: 1,
};
}

View File

@ -0,0 +1,241 @@
import Decimal from 'decimal.js';
import { Money } from '../value-objects/money.vo';
/**
*
*
*
* 1. = 100亿 ÷ (365×4×1440) = 4756.468797564687
* 2. = 绿 ÷ (100.02亿 - - )
* 3. = (100亿 - ) ÷ (200 - )
* 4. = ×
* 5. = ( + ) ×
* 6. = ( + × ) ×
* 7. = ÷ 24 ÷ 60 ÷ 60
*/
export class TradingCalculatorService {
// 总积分股数量: 100.02B
static readonly TOTAL_SHARES = new Decimal('100020000000');
// 目标销毁量: 100亿 (4年销毁完)
static readonly BURN_TARGET = new Decimal('10000000000');
// 销毁周期: 4年的分钟数
static readonly BURN_PERIOD_MINUTES = 365 * 4 * 1440; // 2102400
// 流通池目标量: 200万
static readonly CIRCULATION_POOL_TARGET = new Decimal('2000000');
// 基础每分钟销毁量: 100亿 ÷ (365×4×1440)
static readonly BASE_MINUTE_BURN_RATE = TradingCalculatorService.BURN_TARGET.dividedBy(
TradingCalculatorService.BURN_PERIOD_MINUTES,
);
/**
*
* = 绿() ÷ ( - - )
*
* @param greenPoints 绿
* @param blackHoleAmount
* @param circulationPoolAmount
* @returns
*/
calculatePrice(
greenPoints: Money,
blackHoleAmount: Money,
circulationPoolAmount: Money,
): Money {
// 有效分母 = 100.02B - 黑洞 - 流通池
const effectiveDenominator = TradingCalculatorService.TOTAL_SHARES
.minus(blackHoleAmount.value)
.minus(circulationPoolAmount.value);
if (effectiveDenominator.isZero() || effectiveDenominator.isNegative()) {
return Money.zero();
}
// 价格 = 绿积分 / 有效分母
const price = greenPoints.value.dividedBy(effectiveDenominator);
return new Money(price);
}
/**
*
* = - -
*/
calculateEffectiveDenominator(
blackHoleAmount: Money,
circulationPoolAmount: Money,
): Money {
const denominator = TradingCalculatorService.TOTAL_SHARES
.minus(blackHoleAmount.value)
.minus(circulationPoolAmount.value);
if (denominator.isNegative()) {
return Money.zero();
}
return new Money(denominator);
}
/**
*
* = (100亿 - ) ÷ (200 - )
*
*
*
* @param blackHoleAmount
* @param circulationPoolAmount
* @returns
*/
calculateSellBurnMultiplier(
blackHoleAmount: Money,
circulationPoolAmount: Money,
): Decimal {
// 分子 = 100亿 - 黑洞销毁量
const numerator = TradingCalculatorService.BURN_TARGET.minus(blackHoleAmount.value);
// 分母 = 200万 - 流通池量
const denominator = TradingCalculatorService.CIRCULATION_POOL_TARGET.minus(
circulationPoolAmount.value,
);
// 防止除以零或负数
if (denominator.isZero() || denominator.isNegative()) {
// 当流通池已满时,销毁倍数设为最大合理值
return new Decimal('5'); // 或其他业务定义的最大倍数
}
if (numerator.isNegative()) {
// 当黑洞已满时,不再销毁
return new Decimal('0');
}
return numerator.dividedBy(denominator);
}
/**
*
* = ×
*
* @param sellQuantity
* @param burnMultiplier
* @returns
*/
calculateSellBurnAmount(sellQuantity: Money, burnMultiplier: Decimal): Money {
const burnAmount = sellQuantity.value.times(burnMultiplier);
return new Money(burnAmount);
}
/**
*
* = ( + ) ×
*
* @param sellQuantity
* @param burnQuantity
* @param price
* @returns
*/
calculateSellAmount(sellQuantity: Money, burnQuantity: Money, price: Money): Money {
const effectiveQuantity = sellQuantity.value.plus(burnQuantity.value);
const amount = effectiveQuantity.times(price.value);
return new Money(amount);
}
/**
*
* = ( + × ) ×
*
* @param shareBalance
* @param burnMultiplier
* @param price
* @returns
*/
calculateDisplayAssetValue(
shareBalance: Money,
burnMultiplier: Decimal,
price: Money,
): Money {
// 有效积分股 = 余额 + 余额 × 倍数 = 余额 × (1 + 倍数)
const multiplierFactor = new Decimal(1).plus(burnMultiplier);
const effectiveShares = shareBalance.value.times(multiplierFactor);
const assetValue = effectiveShares.times(price.value);
return new Money(assetValue);
}
/**
*
* = ÷ 24 ÷ 60 ÷ 60
*
* @param dailyAllocation
* @returns
*/
calculateAssetGrowthPerSecond(dailyAllocation: Money): Money {
const secondsPerDay = 24 * 60 * 60; // 86400
const perSecond = dailyAllocation.value.dividedBy(secondsPerDay);
return new Money(perSecond);
}
/**
*
* (100亿 - ) ÷
*
* @param blackHoleAmount
* @param remainingMinutes
* @returns
*/
calculateMinuteBurnRate(blackHoleAmount: Money, remainingMinutes: number): Money {
if (remainingMinutes <= 0) {
return Money.zero();
}
// 剩余需要销毁的量 = 100亿 - 已销毁量
const remainingBurn = TradingCalculatorService.BURN_TARGET.minus(blackHoleAmount.value);
if (remainingBurn.isZero() || remainingBurn.isNegative()) {
return Money.zero();
}
const minuteRate = remainingBurn.dividedBy(remainingMinutes);
return new Money(minuteRate);
}
/**
*
*
* @param activatedAt
* @returns
*/
calculateRemainingMinutes(activatedAt: Date): number {
const now = new Date();
const elapsedMs = now.getTime() - activatedAt.getTime();
const elapsedMinutes = Math.floor(elapsedMs / (60 * 1000));
return Math.max(0, TradingCalculatorService.BURN_PERIOD_MINUTES - elapsedMinutes);
}
/**
*
*
*
* @param currentPrice
* @param sellQuantity
* @param burnQuantity
* @param greenPoints 绿
* @param blackHoleAmount
* @param circulationPoolAmount
* @returns
*/
calculatePriceAfterSell(
greenPoints: Money,
blackHoleAmount: Money,
circulationPoolAmount: Money,
sellQuantity: Money,
burnQuantity: Money,
): Money {
// 卖出后流通池增加sellQuantity黑洞增加burnQuantity
const newBlackHole = blackHoleAmount.add(burnQuantity);
const newCirculation = circulationPoolAmount.add(sellQuantity);
return this.calculatePrice(greenPoints, newBlackHole, newCirculation);
}
}

View File

@ -5,8 +5,15 @@ import { PrismaModule } from './persistence/prisma/prisma.module';
import { TradingAccountRepository } from './persistence/repositories/trading-account.repository';
import { OrderRepository } from './persistence/repositories/order.repository';
import { OutboxRepository } from './persistence/repositories/outbox.repository';
import { TradingConfigRepository } from './persistence/repositories/trading-config.repository';
import { BlackHoleRepository } from './persistence/repositories/black-hole.repository';
import { SharePoolRepository } from './persistence/repositories/share-pool.repository';
import { CirculationPoolRepository } from './persistence/repositories/circulation-pool.repository';
import { PriceSnapshotRepository } from './persistence/repositories/price-snapshot.repository';
import { ProcessedEventRepository } from './persistence/repositories/processed-event.repository';
import { RedisService } from './redis/redis.service';
import { KafkaProducerService } from './kafka/kafka-producer.service';
import { UserRegisteredConsumer } from './kafka/consumers/user-registered.consumer';
@Global()
@Module({
@ -32,10 +39,17 @@ import { KafkaProducerService } from './kafka/kafka-producer.service';
},
]),
],
controllers: [UserRegisteredConsumer],
providers: [
TradingAccountRepository,
OrderRepository,
OutboxRepository,
TradingConfigRepository,
BlackHoleRepository,
SharePoolRepository,
CirculationPoolRepository,
PriceSnapshotRepository,
ProcessedEventRepository,
KafkaProducerService,
{
provide: 'REDIS_OPTIONS',
@ -53,6 +67,12 @@ import { KafkaProducerService } from './kafka/kafka-producer.service';
TradingAccountRepository,
OrderRepository,
OutboxRepository,
TradingConfigRepository,
BlackHoleRepository,
SharePoolRepository,
CirculationPoolRepository,
PriceSnapshotRepository,
ProcessedEventRepository,
KafkaProducerService,
RedisService,
ClientsModule,

View File

@ -0,0 +1 @@
export * from './user-registered.consumer';

View File

@ -0,0 +1,189 @@
import { Controller, Logger, OnModuleInit } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import { RedisService } from '../../redis/redis.service';
import { TradingAccountRepository } from '../../persistence/repositories/trading-account.repository';
import { OutboxRepository } from '../../persistence/repositories/outbox.repository';
import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository';
import { TradingAccountAggregate } from '../../../domain/aggregates/trading-account.aggregate';
import {
TradingEventTypes,
TradingTopics,
TradingAccountCreatedPayload,
} from '../../../domain/events/trading.events';
// 用户注册事件结构(来自 auth-service
interface UserRegisteredEvent {
eventId: string;
eventType: string;
payload: {
accountSequence: string;
phone: string;
source: 'V1' | 'V2';
registeredAt: string;
};
}
// 4小时 TTL
const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60;
@Controller()
export class UserRegisteredConsumer implements OnModuleInit {
private readonly logger = new Logger(UserRegisteredConsumer.name);
constructor(
private readonly redis: RedisService,
private readonly tradingAccountRepository: TradingAccountRepository,
private readonly outboxRepository: OutboxRepository,
private readonly processedEventRepository: ProcessedEventRepository,
) {}
async onModuleInit() {
this.logger.log('UserRegisteredConsumer initialized - listening for user.registered events');
}
@EventPattern('auth.user.registered')
async handleUserRegistered(@Payload() message: any): Promise<void> {
// 解析消息格式
const event: UserRegisteredEvent = message.value || message;
const eventId = event.eventId || message.eventId;
if (!eventId) {
this.logger.warn('Received event without eventId, skipping');
return;
}
const accountSequence = event.payload?.accountSequence;
if (!accountSequence) {
this.logger.warn(`Event ${eventId} missing accountSequence, skipping`);
return;
}
this.logger.debug(
`Processing user registered event: ${eventId}, accountSequence: ${accountSequence}`,
);
// 幂等性检查
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`Event ${eventId} already processed, skipping`);
return;
}
try {
// 检查账户是否已存在
const existingAccount = await this.tradingAccountRepository.findByAccountSequence(
accountSequence,
);
if (existingAccount) {
this.logger.debug(`Trading account ${accountSequence} already exists`);
await this.markEventProcessed(eventId);
return;
}
// 创建交易账户
const account = TradingAccountAggregate.create(accountSequence);
const accountId = await this.tradingAccountRepository.save(account);
// 发布交易账户创建事件
await this.publishAccountCreatedEvent(accountId, accountSequence);
// 标记为已处理
await this.markEventProcessed(eventId);
this.logger.log(
`Trading account created for user ${accountSequence}, source: ${event.payload.source}`,
);
} catch (error) {
// 如果是重复创建的唯一约束错误,忽略
if (error instanceof Error && error.message.includes('Unique constraint')) {
this.logger.debug(
`Trading account already exists for ${accountSequence}, marking as processed`,
);
await this.markEventProcessed(eventId);
return;
}
this.logger.error(
`Failed to create trading account for ${accountSequence}`,
error instanceof Error ? error.stack : error,
);
throw error; // 让 Kafka 重试
}
}
/**
* - Redis + DB
* 1. Redis
* 2. Redis
*/
private async isEventProcessed(eventId: string): Promise<boolean> {
const redisKey = `trading:processed-event:${eventId}`;
// 1. 先检查 Redis 缓存(快速路径)
const cached = await this.redis.get(redisKey);
if (cached) return true;
// 2. 检查数据库Redis 可能过期或重启后丢失)
const dbRecord = await this.processedEventRepository.findByEventId(eventId);
if (dbRecord) {
// 回填 Redis 缓存
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
return true;
}
return false;
}
/**
* - Redis + DB
*/
private async markEventProcessed(eventId: string, eventType: string = 'user.registered'): Promise<void> {
const redisKey = `trading:processed-event:${eventId}`;
// 1. 写入数据库(持久化)
try {
await this.processedEventRepository.create({
eventId,
eventType,
sourceService: 'auth-service',
});
} catch (error) {
// 可能已存在(并发情况),忽略唯一约束错误
if (!(error instanceof Error && error.message.includes('Unique constraint'))) {
throw error;
}
}
// 2. 写入 Redis 缓存4小时 TTL
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
}
/**
*
*/
private async publishAccountCreatedEvent(
accountId: string,
accountSequence: string,
): Promise<void> {
try {
const payload: TradingAccountCreatedPayload = {
accountId,
accountSequence,
createdAt: new Date().toISOString(),
};
await this.outboxRepository.create({
aggregateType: 'TradingAccount',
aggregateId: accountId,
eventType: TradingEventTypes.TRADING_ACCOUNT_CREATED,
payload,
topic: TradingTopics.ACCOUNTS,
key: accountSequence,
});
this.logger.debug(`Published TradingAccountCreated event for ${accountSequence}`);
} catch (error) {
this.logger.error(`Failed to publish TradingAccountCreated event: ${error}`);
}
}
}

View File

@ -0,0 +1,190 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { Money } from '../../../domain/value-objects/money.vo';
import Decimal from 'decimal.js';
export interface BlackHoleEntity {
id: string;
totalBurned: Money;
targetBurn: Money;
remainingBurn: Money;
lastBurnMinute: Date | null;
}
export interface BurnRecordEntity {
id: string;
blackHoleId: string;
burnMinute: Date;
burnAmount: Money;
remainingTarget: Money;
sourceType: string | null;
sourceAccountSeq: string | null;
sourceOrderNo: string | null;
memo: string | null;
createdAt: Date;
}
export type BurnSourceType = 'MINUTE_BURN' | 'SELL_BURN';
@Injectable()
export class BlackHoleRepository {
constructor(private readonly prisma: PrismaService) {}
async getBlackHole(): Promise<BlackHoleEntity | null> {
const record = await this.prisma.blackHole.findFirst();
if (!record) {
return null;
}
return this.toDomain(record);
}
async initializeBlackHole(targetBurn: Money): Promise<BlackHoleEntity> {
const existing = await this.prisma.blackHole.findFirst();
if (existing) {
return this.toDomain(existing);
}
const record = await this.prisma.blackHole.create({
data: {
totalBurned: 0,
targetBurn: targetBurn.value,
remainingBurn: targetBurn.value,
},
});
return this.toDomain(record);
}
/**
*
*/
async recordMinuteBurn(burnMinute: Date, burnAmount: Money): Promise<BurnRecordEntity> {
return this.recordBurn(burnMinute, burnAmount, 'MINUTE_BURN');
}
/**
*
*/
async recordSellBurn(
burnMinute: Date,
burnAmount: Money,
accountSeq: string,
orderNo: string,
): Promise<BurnRecordEntity> {
return this.recordBurn(burnMinute, burnAmount, 'SELL_BURN', accountSeq, orderNo);
}
/**
*
*/
private async recordBurn(
burnMinute: Date,
burnAmount: Money,
sourceType: BurnSourceType,
sourceAccountSeq?: string,
sourceOrderNo?: string,
): Promise<BurnRecordEntity> {
const blackHole = await this.prisma.blackHole.findFirst();
if (!blackHole) {
throw new Error('Black hole not initialized');
}
const newTotalBurned = new Decimal(blackHole.totalBurned.toString()).plus(burnAmount.value);
const newRemainingBurn = new Decimal(blackHole.targetBurn.toString()).minus(newTotalBurned);
const memo =
sourceType === 'MINUTE_BURN'
? `每分钟自动销毁 ${burnAmount.toFixed(8)}`
: `卖出销毁, 账户[${sourceAccountSeq}], 订单[${sourceOrderNo}], 数量${burnAmount.toFixed(8)}`;
const [, burnRecord] = await this.prisma.$transaction([
this.prisma.blackHole.update({
where: { id: blackHole.id },
data: {
totalBurned: newTotalBurned,
remainingBurn: newRemainingBurn.isNegative() ? 0 : newRemainingBurn,
lastBurnMinute: burnMinute,
},
}),
this.prisma.burnRecord.create({
data: {
blackHoleId: blackHole.id,
burnMinute,
burnAmount: burnAmount.value,
remainingTarget: newRemainingBurn.isNegative() ? 0 : newRemainingBurn,
sourceType,
sourceAccountSeq,
sourceOrderNo,
memo,
},
}),
]);
return this.toBurnRecordDomain(burnRecord);
}
async getBurnRecords(
page: number,
pageSize: number,
sourceType?: BurnSourceType,
): Promise<{
data: BurnRecordEntity[];
total: number;
}> {
const where = sourceType ? { sourceType } : {};
const [records, total] = await Promise.all([
this.prisma.burnRecord.findMany({
where,
orderBy: { burnMinute: 'desc' },
skip: (page - 1) * pageSize,
take: pageSize,
}),
this.prisma.burnRecord.count({ where }),
]);
return {
data: records.map((r) => this.toBurnRecordDomain(r)),
total,
};
}
async getTodayBurnAmount(): Promise<Money> {
const today = new Date();
today.setHours(0, 0, 0, 0);
const result = await this.prisma.burnRecord.aggregate({
where: {
burnMinute: { gte: today },
},
_sum: { burnAmount: true },
});
return new Money(result._sum.burnAmount || 0);
}
private toDomain(record: any): BlackHoleEntity {
return {
id: record.id,
totalBurned: new Money(record.totalBurned),
targetBurn: new Money(record.targetBurn),
remainingBurn: new Money(record.remainingBurn),
lastBurnMinute: record.lastBurnMinute,
};
}
private toBurnRecordDomain(record: any): BurnRecordEntity {
return {
id: record.id,
blackHoleId: record.blackHoleId,
burnMinute: record.burnMinute,
burnAmount: new Money(record.burnAmount),
remainingTarget: new Money(record.remainingTarget),
sourceType: record.sourceType,
sourceAccountSeq: record.sourceAccountSeq,
sourceOrderNo: record.sourceOrderNo,
memo: record.memo,
createdAt: record.createdAt,
};
}
}

View File

@ -0,0 +1,199 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { Money } from '../../../domain/value-objects/money.vo';
import Decimal from 'decimal.js';
export interface CirculationPoolEntity {
id: string;
totalShares: Money;
totalCash: Money;
totalInflow: Money;
totalOutflow: Money;
createdAt: Date;
updatedAt: Date;
}
export type CirculationPoolTransactionType =
| 'SHARE_IN'
| 'SHARE_OUT'
| 'CASH_IN'
| 'CASH_OUT'
| 'TRADE_BUY'
| 'TRADE_SELL';
@Injectable()
export class CirculationPoolRepository {
constructor(private readonly prisma: PrismaService) {}
async getPool(): Promise<CirculationPoolEntity | null> {
const record = await this.prisma.circulationPool.findFirst();
if (!record) {
return null;
}
return this.toDomain(record);
}
async initializePool(): Promise<CirculationPoolEntity> {
const existing = await this.prisma.circulationPool.findFirst();
if (existing) {
return this.toDomain(existing);
}
const record = await this.prisma.circulationPool.create({
data: {
totalShares: 0,
totalCash: 0,
totalInflow: 0,
totalOutflow: 0,
},
});
return this.toDomain(record);
}
/**
*
*/
async addSharesFromSell(
amount: Money,
accountSeq: string,
orderId: string,
memo?: string,
): Promise<void> {
const pool = await this.prisma.circulationPool.findFirst();
if (!pool) {
throw new Error('Circulation pool not initialized');
}
const balanceBefore = new Decimal(pool.totalShares.toString());
const balanceAfter = balanceBefore.plus(amount.value);
await this.prisma.$transaction([
this.prisma.circulationPool.update({
where: { id: pool.id },
data: {
totalShares: balanceAfter,
totalInflow: new Decimal(pool.totalInflow.toString()).plus(amount.value),
},
}),
this.prisma.circulationPoolTransaction.create({
data: {
poolId: pool.id,
type: 'TRADE_SELL',
assetType: 'SHARE',
amount: amount.value,
balanceBefore,
balanceAfter,
counterpartyType: 'USER',
counterpartyAccountSeq: accountSeq,
referenceId: orderId,
referenceType: 'ORDER',
memo: memo || `卖出积分股进入流通池 ${amount.toFixed(8)}`,
},
}),
]);
}
/**
*
*/
async removeSharesForBuy(
amount: Money,
accountSeq: string,
orderId: string,
memo?: string,
): Promise<void> {
const pool = await this.prisma.circulationPool.findFirst();
if (!pool) {
throw new Error('Circulation pool not initialized');
}
const balanceBefore = new Decimal(pool.totalShares.toString());
const balanceAfter = balanceBefore.minus(amount.value);
if (balanceAfter.isNegative()) {
throw new Error('Insufficient shares in circulation pool');
}
await this.prisma.$transaction([
this.prisma.circulationPool.update({
where: { id: pool.id },
data: {
totalShares: balanceAfter,
totalOutflow: new Decimal(pool.totalOutflow.toString()).plus(amount.value),
},
}),
this.prisma.circulationPoolTransaction.create({
data: {
poolId: pool.id,
type: 'TRADE_BUY',
assetType: 'SHARE',
amount: amount.value,
balanceBefore,
balanceAfter,
counterpartyType: 'USER',
counterpartyAccountSeq: accountSeq,
referenceId: orderId,
referenceType: 'ORDER',
memo: memo || `买入积分股从流通池流出 ${amount.toFixed(8)}`,
},
}),
]);
}
/**
*
*/
async getSharesAmount(): Promise<Money> {
const pool = await this.getPool();
if (!pool) {
return Money.zero();
}
return pool.totalShares;
}
async getTransactions(
page: number,
pageSize: number,
): Promise<{
data: any[];
total: number;
}> {
const pool = await this.prisma.circulationPool.findFirst();
if (!pool) {
return { data: [], total: 0 };
}
const [records, total] = await Promise.all([
this.prisma.circulationPoolTransaction.findMany({
where: { poolId: pool.id },
orderBy: { createdAt: 'desc' },
skip: (page - 1) * pageSize,
take: pageSize,
}),
this.prisma.circulationPoolTransaction.count({ where: { poolId: pool.id } }),
]);
return {
data: records.map((r) => ({
...r,
amount: r.amount.toString(),
balanceBefore: r.balanceBefore.toString(),
balanceAfter: r.balanceAfter.toString(),
})),
total,
};
}
private toDomain(record: any): CirculationPoolEntity {
return {
id: record.id,
totalShares: new Money(record.totalShares),
totalCash: new Money(record.totalCash),
totalInflow: new Money(record.totalInflow),
totalOutflow: new Money(record.totalOutflow),
createdAt: record.createdAt,
updatedAt: record.updatedAt,
};
}
}

View File

@ -47,6 +47,46 @@ export class OrderRepository {
}
}
/**
*
*/
async saveWithBurnInfo(
aggregate: OrderAggregate,
burnQuantity: Money,
effectiveQuantity: Money,
): Promise<string> {
const data = {
orderNo: aggregate.orderNo,
accountSequence: aggregate.accountSequence,
type: aggregate.type,
status: aggregate.status,
price: aggregate.price.value,
quantity: aggregate.quantity.value,
filledQuantity: aggregate.filledQuantity.value,
remainingQuantity: aggregate.remainingQuantity.value,
averagePrice: aggregate.averagePrice.value,
totalAmount: aggregate.totalAmount.value,
burnQuantity: burnQuantity.value,
burnMultiplier: burnQuantity.isZero()
? 0
: burnQuantity.value.dividedBy(aggregate.filledQuantity.value),
effectiveQuantity: effectiveQuantity.value,
cancelledAt: aggregate.cancelledAt,
completedAt: aggregate.completedAt,
};
if (aggregate.id) {
await this.prisma.order.update({
where: { id: aggregate.id },
data,
});
return aggregate.id;
} else {
const created = await this.prisma.order.create({ data });
return created.id;
}
}
async findActiveOrders(type?: OrderType): Promise<OrderAggregate[]> {
const where: any = {
status: { in: [OrderStatus.PENDING, OrderStatus.PARTIAL] },

View File

@ -0,0 +1,134 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { Money } from '../../../domain/value-objects/money.vo';
export interface PriceSnapshotEntity {
id: string;
snapshotTime: Date;
price: Money;
greenPoints: Money;
blackHoleAmount: Money;
circulationPool: Money;
effectiveDenominator: Money;
minuteBurnRate: Money;
createdAt: Date;
}
@Injectable()
export class PriceSnapshotRepository {
constructor(private readonly prisma: PrismaService) {}
async getLatestSnapshot(): Promise<PriceSnapshotEntity | null> {
const record = await this.prisma.priceSnapshot.findFirst({
orderBy: { snapshotTime: 'desc' },
});
if (!record) {
return null;
}
return this.toDomain(record);
}
async getSnapshotAt(time: Date): Promise<PriceSnapshotEntity | null> {
// 获取指定时间之前最近的快照
const record = await this.prisma.priceSnapshot.findFirst({
where: { snapshotTime: { lte: time } },
orderBy: { snapshotTime: 'desc' },
});
if (!record) {
return null;
}
return this.toDomain(record);
}
async createSnapshot(data: {
snapshotTime: Date;
price: Money;
greenPoints: Money;
blackHoleAmount: Money;
circulationPool: Money;
effectiveDenominator: Money;
minuteBurnRate: Money;
}): Promise<PriceSnapshotEntity> {
const record = await this.prisma.priceSnapshot.create({
data: {
snapshotTime: data.snapshotTime,
price: data.price.value,
greenPoints: data.greenPoints.value,
blackHoleAmount: data.blackHoleAmount.value,
circulationPool: data.circulationPool.value,
effectiveDenominator: data.effectiveDenominator.value,
minuteBurnRate: data.minuteBurnRate.value,
},
});
return this.toDomain(record);
}
async getPriceHistory(
startTime: Date,
endTime: Date,
limit: number = 1440,
): Promise<PriceSnapshotEntity[]> {
const records = await this.prisma.priceSnapshot.findMany({
where: {
snapshotTime: {
gte: startTime,
lte: endTime,
},
},
orderBy: { snapshotTime: 'asc' },
take: limit,
});
return records.map((r) => this.toDomain(r));
}
async getSnapshots(
page: number,
pageSize: number,
): Promise<{
data: PriceSnapshotEntity[];
total: number;
}> {
const [records, total] = await Promise.all([
this.prisma.priceSnapshot.findMany({
orderBy: { snapshotTime: 'desc' },
skip: (page - 1) * pageSize,
take: pageSize,
}),
this.prisma.priceSnapshot.count(),
]);
return {
data: records.map((r) => this.toDomain(r)),
total,
};
}
/**
*
*/
async cleanupOldSnapshots(retentionDays: number): Promise<number> {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - retentionDays);
const result = await this.prisma.priceSnapshot.deleteMany({
where: { snapshotTime: { lt: cutoffDate } },
});
return result.count;
}
private toDomain(record: any): PriceSnapshotEntity {
return {
id: record.id,
snapshotTime: record.snapshotTime,
price: new Money(record.price),
greenPoints: new Money(record.greenPoints),
blackHoleAmount: new Money(record.blackHoleAmount),
circulationPool: new Money(record.circulationPool),
effectiveDenominator: new Money(record.effectiveDenominator),
minuteBurnRate: new Money(record.minuteBurnRate),
createdAt: record.createdAt,
};
}
}

View File

@ -0,0 +1,65 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
export interface ProcessedEventEntity {
id: string;
eventId: string;
eventType: string;
sourceService: string;
processedAt: Date;
}
@Injectable()
export class ProcessedEventRepository {
constructor(private readonly prisma: PrismaService) {}
/**
*
*/
async findByEventId(eventId: string): Promise<ProcessedEventEntity | null> {
const record = await this.prisma.processedEvent.findUnique({
where: { eventId },
});
return record;
}
/**
*
*/
async create(data: {
eventId: string;
eventType: string;
sourceService: string;
}): Promise<ProcessedEventEntity> {
return this.prisma.processedEvent.create({
data: {
eventId: data.eventId,
eventType: data.eventType,
sourceService: data.sourceService,
},
});
}
/**
*
*/
async isProcessed(eventId: string): Promise<boolean> {
const count = await this.prisma.processedEvent.count({
where: { eventId },
});
return count > 0;
}
/**
*
* @param before
*/
async deleteOldRecords(before: Date): Promise<number> {
const result = await this.prisma.processedEvent.deleteMany({
where: {
processedAt: { lt: before },
},
});
return result.count;
}
}

View File

@ -0,0 +1,191 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { Money } from '../../../domain/value-objects/money.vo';
import Decimal from 'decimal.js';
export interface SharePoolEntity {
id: string;
greenPoints: Money;
totalInflow: Money;
totalOutflow: Money;
createdAt: Date;
updatedAt: Date;
}
export type SharePoolTransactionType = 'INJECT' | 'TRADE_IN' | 'TRADE_OUT';
export interface SharePoolTransactionEntity {
id: string;
poolId: string;
type: SharePoolTransactionType;
amount: Money;
balanceBefore: Money;
balanceAfter: Money;
referenceId: string | null;
referenceType: string | null;
memo: string | null;
createdAt: Date;
}
@Injectable()
export class SharePoolRepository {
constructor(private readonly prisma: PrismaService) {}
async getPool(): Promise<SharePoolEntity | null> {
const record = await this.prisma.sharePool.findFirst();
if (!record) {
return null;
}
return this.toDomain(record);
}
async initializePool(initialGreenPoints: Money = Money.zero()): Promise<SharePoolEntity> {
const existing = await this.prisma.sharePool.findFirst();
if (existing) {
return this.toDomain(existing);
}
const record = await this.prisma.sharePool.create({
data: {
greenPoints: initialGreenPoints.value,
totalInflow: initialGreenPoints.value,
totalOutflow: 0,
},
});
return this.toDomain(record);
}
/**
* 绿
*/
async inject(amount: Money, referenceId?: string, memo?: string): Promise<void> {
await this.updateBalance('INJECT', amount, true, referenceId, memo);
}
/**
* 绿
*/
async tradeIn(amount: Money, tradeId: string): Promise<void> {
await this.updateBalance('TRADE_IN', amount, true, tradeId, `交易买入流入 ${amount.toFixed(8)}`);
}
/**
* 绿
*/
async tradeOut(amount: Money, tradeId: string): Promise<void> {
await this.updateBalance(
'TRADE_OUT',
amount,
false,
tradeId,
`交易卖出流出 ${amount.toFixed(8)}`,
);
}
private async updateBalance(
type: SharePoolTransactionType,
amount: Money,
isInflow: boolean,
referenceId?: string,
memo?: string,
): Promise<void> {
const pool = await this.prisma.sharePool.findFirst();
if (!pool) {
throw new Error('Share pool not initialized');
}
const balanceBefore = new Decimal(pool.greenPoints.toString());
const balanceAfter = isInflow
? balanceBefore.plus(amount.value)
: balanceBefore.minus(amount.value);
if (balanceAfter.isNegative()) {
throw new Error('Insufficient green points in share pool');
}
const newTotalInflow = isInflow
? new Decimal(pool.totalInflow.toString()).plus(amount.value)
: pool.totalInflow;
const newTotalOutflow = !isInflow
? new Decimal(pool.totalOutflow.toString()).plus(amount.value)
: pool.totalOutflow;
await this.prisma.$transaction([
this.prisma.sharePool.update({
where: { id: pool.id },
data: {
greenPoints: balanceAfter,
totalInflow: newTotalInflow,
totalOutflow: newTotalOutflow,
},
}),
this.prisma.sharePoolTransaction.create({
data: {
poolId: pool.id,
type,
amount: amount.value,
balanceBefore,
balanceAfter,
referenceId,
referenceType: type === 'INJECT' ? 'INJECT' : 'TRADE',
memo,
},
}),
]);
}
async getTransactions(
page: number,
pageSize: number,
): Promise<{
data: SharePoolTransactionEntity[];
total: number;
}> {
const pool = await this.prisma.sharePool.findFirst();
if (!pool) {
return { data: [], total: 0 };
}
const [records, total] = await Promise.all([
this.prisma.sharePoolTransaction.findMany({
where: { poolId: pool.id },
orderBy: { createdAt: 'desc' },
skip: (page - 1) * pageSize,
take: pageSize,
}),
this.prisma.sharePoolTransaction.count({ where: { poolId: pool.id } }),
]);
return {
data: records.map((r) => this.toTransactionDomain(r)),
total,
};
}
private toDomain(record: any): SharePoolEntity {
return {
id: record.id,
greenPoints: new Money(record.greenPoints),
totalInflow: new Money(record.totalInflow),
totalOutflow: new Money(record.totalOutflow),
createdAt: record.createdAt,
updatedAt: record.updatedAt,
};
}
private toTransactionDomain(record: any): SharePoolTransactionEntity {
return {
id: record.id,
poolId: record.poolId,
type: record.type as SharePoolTransactionType,
amount: new Money(record.amount),
balanceBefore: new Money(record.balanceBefore),
balanceAfter: new Money(record.balanceAfter),
referenceId: record.referenceId,
referenceType: record.referenceType,
memo: record.memo,
createdAt: record.createdAt,
};
}
}

View File

@ -15,11 +15,11 @@ export class TradingAccountRepository {
return this.toDomain(record);
}
async save(aggregate: TradingAccountAggregate): Promise<void> {
async save(aggregate: TradingAccountAggregate): Promise<string> {
const transactions = aggregate.pendingTransactions;
await this.prisma.$transaction(async (tx) => {
await tx.tradingAccount.upsert({
const result = await this.prisma.$transaction(async (tx) => {
const account = await tx.tradingAccount.upsert({
where: { accountSequence: aggregate.accountSequence },
create: {
accountSequence: aggregate.accountSequence,
@ -55,9 +55,12 @@ export class TradingAccountRepository {
})),
});
}
return account.id;
});
aggregate.clearPendingTransactions();
return result;
}
async getTransactions(

View File

@ -0,0 +1,101 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { Money } from '../../../domain/value-objects/money.vo';
import Decimal from 'decimal.js';
export interface TradingConfigEntity {
id: string;
totalShares: Money;
burnTarget: Money;
burnPeriodMinutes: number;
minuteBurnRate: Money;
isActive: boolean;
activatedAt: Date | null;
createdAt: Date;
updatedAt: Date;
}
@Injectable()
export class TradingConfigRepository {
constructor(private readonly prisma: PrismaService) {}
async getConfig(): Promise<TradingConfigEntity | null> {
const record = await this.prisma.tradingConfig.findFirst();
if (!record) {
return null;
}
return this.toDomain(record);
}
async initializeConfig(): Promise<TradingConfigEntity> {
const existing = await this.prisma.tradingConfig.findFirst();
if (existing) {
return this.toDomain(existing);
}
const record = await this.prisma.tradingConfig.create({
data: {
totalShares: new Decimal('100020000000'),
burnTarget: new Decimal('10000000000'),
burnPeriodMinutes: 2102400, // 365 * 4 * 1440
minuteBurnRate: new Decimal('4756.468797564687'),
isActive: false,
},
});
return this.toDomain(record);
}
async activate(): Promise<void> {
const config = await this.prisma.tradingConfig.findFirst();
if (!config) {
throw new Error('Trading config not initialized');
}
await this.prisma.tradingConfig.update({
where: { id: config.id },
data: {
isActive: true,
activatedAt: new Date(),
},
});
}
async deactivate(): Promise<void> {
const config = await this.prisma.tradingConfig.findFirst();
if (!config) {
return;
}
await this.prisma.tradingConfig.update({
where: { id: config.id },
data: { isActive: false },
});
}
async updateMinuteBurnRate(newRate: Money): Promise<void> {
const config = await this.prisma.tradingConfig.findFirst();
if (!config) {
throw new Error('Trading config not initialized');
}
await this.prisma.tradingConfig.update({
where: { id: config.id },
data: { minuteBurnRate: newRate.value },
});
}
private toDomain(record: any): TradingConfigEntity {
return {
id: record.id,
totalShares: new Money(record.totalShares),
burnTarget: new Money(record.burnTarget),
burnPeriodMinutes: record.burnPeriodMinutes,
minuteBurnRate: new Money(record.minuteBurnRate),
isActive: record.isActive,
activatedAt: record.activatedAt,
createdAt: record.createdAt,
updatedAt: record.updatedAt,
};
}
}