From 19428a8cb7da0e3b8dcf5291809dab7020ff9518 Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 15 Jan 2026 04:27:14 -0800 Subject: [PATCH] feat(trading-service): sync trading account creation with wallet service - Add CDC consumer to listen for UserWalletCreated events from mining-wallet-service - Create trading accounts when user contribution wallets are created (lazy creation) - Add WalletSystemAccountCreated handler for province/city system accounts - Add seed script for core system accounts (HQ, operation, cost, pool) - Keep auth.user.registered listener for V2 new user registration This ensures trading accounts are created in sync with wallet accounts, supporting both V2 new users and V1 migrated users. Co-Authored-By: Claude Opus 4.5 --- backend/services/trading-service/package.json | 6 +- .../services/trading-service/prisma/seed.ts | 137 +++++++ .../infrastructure/infrastructure.module.ts | 2 + .../kafka/cdc-consumer.service.ts | 371 ++++++++++++++++++ 4 files changed, 515 insertions(+), 1 deletion(-) create mode 100644 backend/services/trading-service/prisma/seed.ts create mode 100644 backend/services/trading-service/src/infrastructure/kafka/cdc-consumer.service.ts diff --git a/backend/services/trading-service/package.json b/backend/services/trading-service/package.json index aa9a8329..f09ff781 100644 --- a/backend/services/trading-service/package.json +++ b/backend/services/trading-service/package.json @@ -15,7 +15,11 @@ "test:cov": "jest --coverage", "prisma:generate": "prisma generate", "prisma:migrate": "prisma migrate dev", - "prisma:migrate:prod": "prisma migrate deploy" + "prisma:migrate:prod": "prisma migrate deploy", + "prisma:seed": "ts-node prisma/seed.ts" + }, + "prisma": { + "seed": "ts-node prisma/seed.ts" }, "dependencies": { "@nestjs/common": "^10.3.0", diff --git a/backend/services/trading-service/prisma/seed.ts b/backend/services/trading-service/prisma/seed.ts new file mode 100644 index 00000000..3a3a8861 --- /dev/null +++ b/backend/services/trading-service/prisma/seed.ts @@ -0,0 +1,137 @@ +import { PrismaClient } from '@prisma/client'; +import Decimal from 'decimal.js'; + +const prisma = new PrismaClient(); + +async function main() { + console.log('Seeding trading-service database...'); + + // 1. 初始化系统账户的交易账户 + // 这些账户序列号对应 mining-wallet-service 中的系统账户 + const systemAccounts = [ + { accountSequence: 'S0000000001', name: '总部社区' }, + { accountSequence: 'S0000000002', name: '成本费账户' }, + { accountSequence: 'S0000000003', name: '运营费账户' }, + { accountSequence: 'S0000000004', name: 'RWAD底池账户' }, + ]; + + for (const account of systemAccounts) { + const existing = await prisma.tradingAccount.findUnique({ + where: { accountSequence: account.accountSequence }, + }); + + if (!existing) { + const created = await prisma.tradingAccount.create({ + data: { + accountSequence: account.accountSequence, + shareBalance: new Decimal(0), + cashBalance: new Decimal(0), + frozenShares: new Decimal(0), + frozenCash: new Decimal(0), + totalBought: new Decimal(0), + totalSold: new Decimal(0), + }, + }); + + // 发布交易账户创建事件到 Outbox(用于 mining-admin-service 同步) + await prisma.outboxEvent.create({ + data: { + aggregateType: 'TradingAccount', + aggregateId: created.id, + eventType: 'TradingAccountCreated', + topic: 'cdc.trading.outbox', + key: created.accountSequence, + payload: { + accountId: created.id, + accountSequence: created.accountSequence, + shareBalance: '0', + cashBalance: '0', + frozenShares: '0', + frozenCash: '0', + totalBought: '0', + totalSold: '0', + createdAt: created.createdAt.toISOString(), + }, + }, + }); + + console.log(`Created trading account for system: ${account.name} (${account.accountSequence})`); + } else { + console.log(`Trading account already exists: ${account.name} (${account.accountSequence})`); + } + } + + // 2. 初始化交易配置 + const existingConfig = await prisma.tradingConfig.findFirst(); + if (!existingConfig) { + await prisma.tradingConfig.create({ + data: { + totalShares: new Decimal('100020000000'), // 100.02B 总积分股 + burnTarget: new Decimal('10000000000'), // 100亿目标销毁量 + burnPeriodMinutes: 2102400, // 4年 = 365*4*1440 分钟 + minuteBurnRate: new Decimal('4756.468797564687'), // 每分钟销毁率 + isActive: false, + }, + }); + console.log('Created trading config'); + } else { + console.log('Trading config already exists'); + } + + // 3. 初始化黑洞账户 + const existingBlackHole = await prisma.blackHole.findFirst(); + if (!existingBlackHole) { + await prisma.blackHole.create({ + data: { + totalBurned: new Decimal(0), + targetBurn: new Decimal('10000000000'), // 100亿目标销毁 + remainingBurn: new Decimal('10000000000'), + }, + }); + console.log('Created black hole account'); + } else { + console.log('Black hole account already exists'); + } + + // 4. 初始化积分股池 + const existingSharePool = await prisma.sharePool.findFirst(); + if (!existingSharePool) { + await prisma.sharePool.create({ + data: { + greenPoints: new Decimal(0), // 初始绿积分为 0 + totalInflow: new Decimal(0), + totalOutflow: new Decimal(0), + }, + }); + console.log('Created share pool'); + } else { + console.log('Share pool already exists'); + } + + // 5. 初始化流通池 + const existingCirculationPool = await prisma.circulationPool.findFirst(); + if (!existingCirculationPool) { + await prisma.circulationPool.create({ + data: { + totalShares: new Decimal(0), + totalCash: new Decimal(0), + totalInflow: new Decimal(0), + totalOutflow: new Decimal(0), + }, + }); + console.log('Created circulation pool'); + } else { + console.log('Circulation pool already exists'); + } + + console.log('Seeding completed!'); +} + +main() + .catch((e) => { + console.error('Seeding failed:', e); + process.exit(1); + }) + .finally(async () => { + await prisma.$disconnect(); + }); diff --git a/backend/services/trading-service/src/infrastructure/infrastructure.module.ts b/backend/services/trading-service/src/infrastructure/infrastructure.module.ts index 106b093e..2e792552 100644 --- a/backend/services/trading-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/trading-service/src/infrastructure/infrastructure.module.ts @@ -14,6 +14,7 @@ import { ProcessedEventRepository } from './persistence/repositories/processed-e import { RedisService } from './redis/redis.service'; import { KafkaProducerService } from './kafka/kafka-producer.service'; import { UserRegisteredConsumer } from './kafka/consumers/user-registered.consumer'; +import { CdcConsumerService } from './kafka/cdc-consumer.service'; @Global() @Module({ @@ -51,6 +52,7 @@ import { UserRegisteredConsumer } from './kafka/consumers/user-registered.consum PriceSnapshotRepository, ProcessedEventRepository, KafkaProducerService, + CdcConsumerService, { provide: 'REDIS_OPTIONS', useFactory: (configService: ConfigService) => ({ diff --git a/backend/services/trading-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/trading-service/src/infrastructure/kafka/cdc-consumer.service.ts new file mode 100644 index 00000000..667e4e18 --- /dev/null +++ b/backend/services/trading-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -0,0 +1,371 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; +import { TradingAccountRepository } from '../persistence/repositories/trading-account.repository'; +import { OutboxRepository } from '../persistence/repositories/outbox.repository'; +import { ProcessedEventRepository } from '../persistence/repositories/processed-event.repository'; +import { RedisService } from '../redis/redis.service'; +import { TradingAccountAggregate } from '../../domain/aggregates/trading-account.aggregate'; +import { + TradingEventTypes, + TradingTopics, + TradingAccountCreatedPayload, +} from '../../domain/events/trading.events'; + +// 4小时 TTL(秒) +const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; + +/** + * CDC Consumer Service + * 监听 mining-wallet-service 的 outbox 事件,在用户钱包创建时同步创建交易账户 + */ +@Injectable() +export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(CdcConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isRunning = false; + + constructor( + private readonly configService: ConfigService, + private readonly tradingAccountRepository: TradingAccountRepository, + private readonly outboxRepository: OutboxRepository, + private readonly processedEventRepository: ProcessedEventRepository, + private readonly redis: RedisService, + ) { + const brokers = this.configService + .get('KAFKA_BROKERS', 'localhost:9092') + .split(','); + + this.kafka = new Kafka({ + clientId: 'trading-service-cdc', + brokers, + }); + + this.consumer = this.kafka.consumer({ + groupId: this.configService.get( + 'CDC_CONSUMER_GROUP', + 'trading-service-cdc-group', + ), + }); + } + + async onModuleInit() { + await this.start(); + } + + async onModuleDestroy() { + await this.stop(); + } + + async start(): Promise { + if (this.isRunning) { + this.logger.warn('CDC consumer is already running'); + return; + } + + const walletTopic = this.configService.get( + 'CDC_TOPIC_WALLET_OUTBOX', + 'cdc.mining-wallet.outbox', + ); + + try { + await this.consumer.connect(); + this.logger.log('CDC consumer connected'); + + await this.consumer.subscribe({ + topics: [walletTopic], + fromBeginning: false, // 不需要处理历史消息 + }); + this.logger.log(`Subscribed to topic: ${walletTopic}`); + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + + this.isRunning = true; + this.logger.log('CDC consumer started - listening for UserWalletCreated and WalletSystemAccountCreated events'); + } catch (error) { + this.logger.error('Failed to start CDC consumer', error); + } + } + + async stop(): Promise { + if (!this.isRunning) { + return; + } + + try { + await this.consumer.disconnect(); + this.isRunning = false; + this.logger.log('CDC consumer stopped'); + } catch (error) { + this.logger.error('Failed to stop CDC consumer', error); + } + } + + private async handleMessage(payload: EachMessagePayload): Promise { + const { topic, message } = payload; + + try { + if (!message.value) { + return; + } + + const eventData = JSON.parse(message.value.toString()); + + // 忽略心跳消息 + if (this.isHeartbeatMessage(eventData)) { + return; + } + + // 处理 Debezium outbox 事件 + if (this.isDebeziumOutboxEvent(eventData)) { + const op = eventData.payload?.op; + // 只处理 create 操作 + if (op !== 'c' && op !== 'r') { + return; + } + + const after = eventData.payload.after; + await this.handleOutboxEvent(after); + } + } catch (error) { + this.logger.error(`Error processing message from topic ${topic}`, error); + } + } + + private isHeartbeatMessage(data: any): boolean { + const keys = Object.keys(data); + return keys.length === 1 && keys[0] === 'ts_ms'; + } + + private isDebeziumOutboxEvent(data: any): boolean { + const after = data.payload?.after; + if (!after) return false; + return after.event_type && after.aggregate_type; + } + + private async handleOutboxEvent(data: any): Promise { + const eventType = data.event_type; + const eventId = data.id || data.outbox_id; + + // 处理不同类型的事件 + if (eventType === 'UserWalletCreated') { + await this.handleUserWalletCreated(eventId, data); + } else if (eventType === 'WalletSystemAccountCreated') { + await this.handleSystemAccountCreated(eventId, data); + } + } + + /** + * 处理用户钱包创建事件 - 创建用户交易账户 + */ + private async handleUserWalletCreated(eventId: string, data: any): Promise { + + const payload = typeof data.payload === 'string' + ? JSON.parse(data.payload) + : data.payload; + + const accountSequence = payload?.accountSequence; + if (!accountSequence) { + this.logger.warn(`UserWalletCreated event ${eventId} missing accountSequence`); + return; + } + + // 只处理 CONTRIBUTION 类型的钱包 + if (payload?.walletType !== 'CONTRIBUTION') { + return; + } + + this.logger.debug( + `Processing UserWalletCreated event: ${eventId}, accountSequence: ${accountSequence}`, + ); + + // 幂等性检查 + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`Event ${eventId} already processed, skipping`); + return; + } + + try { + // 检查交易账户是否已存在 + const existingAccount = + await this.tradingAccountRepository.findByAccountSequence(accountSequence); + + if (existingAccount) { + this.logger.debug(`Trading account ${accountSequence} already exists`); + await this.markEventProcessed(eventId); + return; + } + + // 创建交易账户 + const account = TradingAccountAggregate.create(accountSequence); + const accountId = await this.tradingAccountRepository.save(account); + + // 发布交易账户创建事件 + await this.publishAccountCreatedEvent(accountId, accountSequence); + + // 标记为已处理 + await this.markEventProcessed(eventId); + + this.logger.log( + `Trading account created for user ${accountSequence} (triggered by UserWalletCreated)`, + ); + } catch (error) { + if (error instanceof Error && error.message.includes('Unique constraint')) { + this.logger.debug( + `Trading account already exists for ${accountSequence}, marking as processed`, + ); + await this.markEventProcessed(eventId, 'UserWalletCreated'); + return; + } + + this.logger.error( + `Failed to create trading account for ${accountSequence}`, + error instanceof Error ? error.stack : error, + ); + throw error; + } + } + + /** + * 处理系统账户创建事件 - 为省/市系统账户创建交易账户 + * 系统账户的 accountSequence 格式为 code(如 PROV-440000, CITY-440100) + */ + private async handleSystemAccountCreated(eventId: string, data: any): Promise { + const payload = typeof data.payload === 'string' + ? JSON.parse(data.payload) + : data.payload; + + // 系统账户使用 code 作为 accountSequence + const accountCode = payload?.code; + if (!accountCode) { + this.logger.warn(`WalletSystemAccountCreated event ${eventId} missing code`); + return; + } + + // 只处理省/市级系统账户 + const accountType = payload?.accountType; + if (accountType !== 'PROVINCE' && accountType !== 'CITY') { + this.logger.debug( + `Skipping non-regional system account: ${accountCode}, type: ${accountType}`, + ); + return; + } + + this.logger.debug( + `Processing WalletSystemAccountCreated event: ${eventId}, code: ${accountCode}, type: ${accountType}`, + ); + + // 幂等性检查 + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`Event ${eventId} already processed, skipping`); + return; + } + + try { + // 检查交易账户是否已存在 + const existingAccount = + await this.tradingAccountRepository.findByAccountSequence(accountCode); + + if (existingAccount) { + this.logger.debug(`Trading account ${accountCode} already exists`); + await this.markEventProcessed(eventId, 'WalletSystemAccountCreated'); + return; + } + + // 创建交易账户 + const account = TradingAccountAggregate.create(accountCode); + const accountId = await this.tradingAccountRepository.save(account); + + // 发布交易账户创建事件 + await this.publishAccountCreatedEvent(accountId, accountCode); + + // 标记为已处理 + await this.markEventProcessed(eventId, 'WalletSystemAccountCreated'); + + this.logger.log( + `Trading account created for system ${accountType}: ${accountCode} (triggered by WalletSystemAccountCreated)`, + ); + } catch (error) { + if (error instanceof Error && error.message.includes('Unique constraint')) { + this.logger.debug( + `Trading account already exists for ${accountCode}, marking as processed`, + ); + await this.markEventProcessed(eventId, 'WalletSystemAccountCreated'); + return; + } + + this.logger.error( + `Failed to create trading account for system ${accountCode}`, + error instanceof Error ? error.stack : error, + ); + throw error; + } + } + + private async isEventProcessed(eventId: string): Promise { + const redisKey = `trading:processed-event:wallet:${eventId}`; + + const cached = await this.redis.get(redisKey); + if (cached) return true; + + const dbRecord = await this.processedEventRepository.findByEventId(String(eventId)); + if (dbRecord) { + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + return true; + } + + return false; + } + + private async markEventProcessed( + eventId: string, + eventType: string = 'UserWalletCreated', + ): Promise { + const redisKey = `trading:processed-event:wallet:${eventId}`; + + try { + await this.processedEventRepository.create({ + eventId: String(eventId), + eventType, + sourceService: 'mining-wallet-service', + }); + } catch (error) { + if (!(error instanceof Error && error.message.includes('Unique constraint'))) { + throw error; + } + } + + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + } + + private async publishAccountCreatedEvent( + accountId: string, + accountSequence: string, + ): Promise { + try { + const payload: TradingAccountCreatedPayload = { + accountId, + accountSequence, + createdAt: new Date().toISOString(), + }; + + await this.outboxRepository.create({ + aggregateType: 'TradingAccount', + aggregateId: accountId, + eventType: TradingEventTypes.TRADING_ACCOUNT_CREATED, + payload, + topic: TradingTopics.ACCOUNTS, + key: accountSequence, + }); + + this.logger.debug(`Published TradingAccountCreated event for ${accountSequence}`); + } catch (error) { + this.logger.error(`Failed to publish TradingAccountCreated event: ${error}`); + } + } +}