feat(trading-service): sync trading account creation with wallet service

- Add CDC consumer to listen for UserWalletCreated events from mining-wallet-service
- Create trading accounts when user contribution wallets are created (lazy creation)
- Add WalletSystemAccountCreated handler for province/city system accounts
- Add seed script for core system accounts (HQ, operation, cost, pool)
- Keep auth.user.registered listener for V2 new user registration

This ensures trading accounts are created in sync with wallet accounts,
supporting both V2 new users and V1 migrated users.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-15 04:27:14 -08:00
parent 183b2bef59
commit 19428a8cb7
4 changed files with 515 additions and 1 deletions

View File

@ -15,7 +15,11 @@
"test:cov": "jest --coverage",
"prisma:generate": "prisma generate",
"prisma:migrate": "prisma migrate dev",
"prisma:migrate:prod": "prisma migrate deploy"
"prisma:migrate:prod": "prisma migrate deploy",
"prisma:seed": "ts-node prisma/seed.ts"
},
"prisma": {
"seed": "ts-node prisma/seed.ts"
},
"dependencies": {
"@nestjs/common": "^10.3.0",

View File

@ -0,0 +1,137 @@
import { PrismaClient } from '@prisma/client';
import Decimal from 'decimal.js';
const prisma = new PrismaClient();
async function main() {
console.log('Seeding trading-service database...');
// 1. 初始化系统账户的交易账户
// 这些账户序列号对应 mining-wallet-service 中的系统账户
const systemAccounts = [
{ accountSequence: 'S0000000001', name: '总部社区' },
{ accountSequence: 'S0000000002', name: '成本费账户' },
{ accountSequence: 'S0000000003', name: '运营费账户' },
{ accountSequence: 'S0000000004', name: 'RWAD底池账户' },
];
for (const account of systemAccounts) {
const existing = await prisma.tradingAccount.findUnique({
where: { accountSequence: account.accountSequence },
});
if (!existing) {
const created = await prisma.tradingAccount.create({
data: {
accountSequence: account.accountSequence,
shareBalance: new Decimal(0),
cashBalance: new Decimal(0),
frozenShares: new Decimal(0),
frozenCash: new Decimal(0),
totalBought: new Decimal(0),
totalSold: new Decimal(0),
},
});
// 发布交易账户创建事件到 Outbox用于 mining-admin-service 同步)
await prisma.outboxEvent.create({
data: {
aggregateType: 'TradingAccount',
aggregateId: created.id,
eventType: 'TradingAccountCreated',
topic: 'cdc.trading.outbox',
key: created.accountSequence,
payload: {
accountId: created.id,
accountSequence: created.accountSequence,
shareBalance: '0',
cashBalance: '0',
frozenShares: '0',
frozenCash: '0',
totalBought: '0',
totalSold: '0',
createdAt: created.createdAt.toISOString(),
},
},
});
console.log(`Created trading account for system: ${account.name} (${account.accountSequence})`);
} else {
console.log(`Trading account already exists: ${account.name} (${account.accountSequence})`);
}
}
// 2. 初始化交易配置
const existingConfig = await prisma.tradingConfig.findFirst();
if (!existingConfig) {
await prisma.tradingConfig.create({
data: {
totalShares: new Decimal('100020000000'), // 100.02B 总积分股
burnTarget: new Decimal('10000000000'), // 100亿目标销毁量
burnPeriodMinutes: 2102400, // 4年 = 365*4*1440 分钟
minuteBurnRate: new Decimal('4756.468797564687'), // 每分钟销毁率
isActive: false,
},
});
console.log('Created trading config');
} else {
console.log('Trading config already exists');
}
// 3. 初始化黑洞账户
const existingBlackHole = await prisma.blackHole.findFirst();
if (!existingBlackHole) {
await prisma.blackHole.create({
data: {
totalBurned: new Decimal(0),
targetBurn: new Decimal('10000000000'), // 100亿目标销毁
remainingBurn: new Decimal('10000000000'),
},
});
console.log('Created black hole account');
} else {
console.log('Black hole account already exists');
}
// 4. 初始化积分股池
const existingSharePool = await prisma.sharePool.findFirst();
if (!existingSharePool) {
await prisma.sharePool.create({
data: {
greenPoints: new Decimal(0), // 初始绿积分为 0
totalInflow: new Decimal(0),
totalOutflow: new Decimal(0),
},
});
console.log('Created share pool');
} else {
console.log('Share pool already exists');
}
// 5. 初始化流通池
const existingCirculationPool = await prisma.circulationPool.findFirst();
if (!existingCirculationPool) {
await prisma.circulationPool.create({
data: {
totalShares: new Decimal(0),
totalCash: new Decimal(0),
totalInflow: new Decimal(0),
totalOutflow: new Decimal(0),
},
});
console.log('Created circulation pool');
} else {
console.log('Circulation pool already exists');
}
console.log('Seeding completed!');
}
main()
.catch((e) => {
console.error('Seeding failed:', e);
process.exit(1);
})
.finally(async () => {
await prisma.$disconnect();
});

View File

@ -14,6 +14,7 @@ import { ProcessedEventRepository } from './persistence/repositories/processed-e
import { RedisService } from './redis/redis.service';
import { KafkaProducerService } from './kafka/kafka-producer.service';
import { UserRegisteredConsumer } from './kafka/consumers/user-registered.consumer';
import { CdcConsumerService } from './kafka/cdc-consumer.service';
@Global()
@Module({
@ -51,6 +52,7 @@ import { UserRegisteredConsumer } from './kafka/consumers/user-registered.consum
PriceSnapshotRepository,
ProcessedEventRepository,
KafkaProducerService,
CdcConsumerService,
{
provide: 'REDIS_OPTIONS',
useFactory: (configService: ConfigService) => ({

View File

@ -0,0 +1,371 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { TradingAccountRepository } from '../persistence/repositories/trading-account.repository';
import { OutboxRepository } from '../persistence/repositories/outbox.repository';
import { ProcessedEventRepository } from '../persistence/repositories/processed-event.repository';
import { RedisService } from '../redis/redis.service';
import { TradingAccountAggregate } from '../../domain/aggregates/trading-account.aggregate';
import {
TradingEventTypes,
TradingTopics,
TradingAccountCreatedPayload,
} from '../../domain/events/trading.events';
// 4小时 TTL
const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60;
/**
* CDC Consumer Service
* mining-wallet-service outbox
*/
@Injectable()
export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(CdcConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private isRunning = false;
constructor(
private readonly configService: ConfigService,
private readonly tradingAccountRepository: TradingAccountRepository,
private readonly outboxRepository: OutboxRepository,
private readonly processedEventRepository: ProcessedEventRepository,
private readonly redis: RedisService,
) {
const brokers = this.configService
.get<string>('KAFKA_BROKERS', 'localhost:9092')
.split(',');
this.kafka = new Kafka({
clientId: 'trading-service-cdc',
brokers,
});
this.consumer = this.kafka.consumer({
groupId: this.configService.get<string>(
'CDC_CONSUMER_GROUP',
'trading-service-cdc-group',
),
});
}
async onModuleInit() {
await this.start();
}
async onModuleDestroy() {
await this.stop();
}
async start(): Promise<void> {
if (this.isRunning) {
this.logger.warn('CDC consumer is already running');
return;
}
const walletTopic = this.configService.get<string>(
'CDC_TOPIC_WALLET_OUTBOX',
'cdc.mining-wallet.outbox',
);
try {
await this.consumer.connect();
this.logger.log('CDC consumer connected');
await this.consumer.subscribe({
topics: [walletTopic],
fromBeginning: false, // 不需要处理历史消息
});
this.logger.log(`Subscribed to topic: ${walletTopic}`);
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
},
});
this.isRunning = true;
this.logger.log('CDC consumer started - listening for UserWalletCreated and WalletSystemAccountCreated events');
} catch (error) {
this.logger.error('Failed to start CDC consumer', error);
}
}
async stop(): Promise<void> {
if (!this.isRunning) {
return;
}
try {
await this.consumer.disconnect();
this.isRunning = false;
this.logger.log('CDC consumer stopped');
} catch (error) {
this.logger.error('Failed to stop CDC consumer', error);
}
}
private async handleMessage(payload: EachMessagePayload): Promise<void> {
const { topic, message } = payload;
try {
if (!message.value) {
return;
}
const eventData = JSON.parse(message.value.toString());
// 忽略心跳消息
if (this.isHeartbeatMessage(eventData)) {
return;
}
// 处理 Debezium outbox 事件
if (this.isDebeziumOutboxEvent(eventData)) {
const op = eventData.payload?.op;
// 只处理 create 操作
if (op !== 'c' && op !== 'r') {
return;
}
const after = eventData.payload.after;
await this.handleOutboxEvent(after);
}
} catch (error) {
this.logger.error(`Error processing message from topic ${topic}`, error);
}
}
private isHeartbeatMessage(data: any): boolean {
const keys = Object.keys(data);
return keys.length === 1 && keys[0] === 'ts_ms';
}
private isDebeziumOutboxEvent(data: any): boolean {
const after = data.payload?.after;
if (!after) return false;
return after.event_type && after.aggregate_type;
}
private async handleOutboxEvent(data: any): Promise<void> {
const eventType = data.event_type;
const eventId = data.id || data.outbox_id;
// 处理不同类型的事件
if (eventType === 'UserWalletCreated') {
await this.handleUserWalletCreated(eventId, data);
} else if (eventType === 'WalletSystemAccountCreated') {
await this.handleSystemAccountCreated(eventId, data);
}
}
/**
* -
*/
private async handleUserWalletCreated(eventId: string, data: any): Promise<void> {
const payload = typeof data.payload === 'string'
? JSON.parse(data.payload)
: data.payload;
const accountSequence = payload?.accountSequence;
if (!accountSequence) {
this.logger.warn(`UserWalletCreated event ${eventId} missing accountSequence`);
return;
}
// 只处理 CONTRIBUTION 类型的钱包
if (payload?.walletType !== 'CONTRIBUTION') {
return;
}
this.logger.debug(
`Processing UserWalletCreated event: ${eventId}, accountSequence: ${accountSequence}`,
);
// 幂等性检查
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`Event ${eventId} already processed, skipping`);
return;
}
try {
// 检查交易账户是否已存在
const existingAccount =
await this.tradingAccountRepository.findByAccountSequence(accountSequence);
if (existingAccount) {
this.logger.debug(`Trading account ${accountSequence} already exists`);
await this.markEventProcessed(eventId);
return;
}
// 创建交易账户
const account = TradingAccountAggregate.create(accountSequence);
const accountId = await this.tradingAccountRepository.save(account);
// 发布交易账户创建事件
await this.publishAccountCreatedEvent(accountId, accountSequence);
// 标记为已处理
await this.markEventProcessed(eventId);
this.logger.log(
`Trading account created for user ${accountSequence} (triggered by UserWalletCreated)`,
);
} catch (error) {
if (error instanceof Error && error.message.includes('Unique constraint')) {
this.logger.debug(
`Trading account already exists for ${accountSequence}, marking as processed`,
);
await this.markEventProcessed(eventId, 'UserWalletCreated');
return;
}
this.logger.error(
`Failed to create trading account for ${accountSequence}`,
error instanceof Error ? error.stack : error,
);
throw error;
}
}
/**
* - /
* accountSequence code PROV-440000, CITY-440100
*/
private async handleSystemAccountCreated(eventId: string, data: any): Promise<void> {
const payload = typeof data.payload === 'string'
? JSON.parse(data.payload)
: data.payload;
// 系统账户使用 code 作为 accountSequence
const accountCode = payload?.code;
if (!accountCode) {
this.logger.warn(`WalletSystemAccountCreated event ${eventId} missing code`);
return;
}
// 只处理省/市级系统账户
const accountType = payload?.accountType;
if (accountType !== 'PROVINCE' && accountType !== 'CITY') {
this.logger.debug(
`Skipping non-regional system account: ${accountCode}, type: ${accountType}`,
);
return;
}
this.logger.debug(
`Processing WalletSystemAccountCreated event: ${eventId}, code: ${accountCode}, type: ${accountType}`,
);
// 幂等性检查
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`Event ${eventId} already processed, skipping`);
return;
}
try {
// 检查交易账户是否已存在
const existingAccount =
await this.tradingAccountRepository.findByAccountSequence(accountCode);
if (existingAccount) {
this.logger.debug(`Trading account ${accountCode} already exists`);
await this.markEventProcessed(eventId, 'WalletSystemAccountCreated');
return;
}
// 创建交易账户
const account = TradingAccountAggregate.create(accountCode);
const accountId = await this.tradingAccountRepository.save(account);
// 发布交易账户创建事件
await this.publishAccountCreatedEvent(accountId, accountCode);
// 标记为已处理
await this.markEventProcessed(eventId, 'WalletSystemAccountCreated');
this.logger.log(
`Trading account created for system ${accountType}: ${accountCode} (triggered by WalletSystemAccountCreated)`,
);
} catch (error) {
if (error instanceof Error && error.message.includes('Unique constraint')) {
this.logger.debug(
`Trading account already exists for ${accountCode}, marking as processed`,
);
await this.markEventProcessed(eventId, 'WalletSystemAccountCreated');
return;
}
this.logger.error(
`Failed to create trading account for system ${accountCode}`,
error instanceof Error ? error.stack : error,
);
throw error;
}
}
private async isEventProcessed(eventId: string): Promise<boolean> {
const redisKey = `trading:processed-event:wallet:${eventId}`;
const cached = await this.redis.get(redisKey);
if (cached) return true;
const dbRecord = await this.processedEventRepository.findByEventId(String(eventId));
if (dbRecord) {
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
return true;
}
return false;
}
private async markEventProcessed(
eventId: string,
eventType: string = 'UserWalletCreated',
): Promise<void> {
const redisKey = `trading:processed-event:wallet:${eventId}`;
try {
await this.processedEventRepository.create({
eventId: String(eventId),
eventType,
sourceService: 'mining-wallet-service',
});
} catch (error) {
if (!(error instanceof Error && error.message.includes('Unique constraint'))) {
throw error;
}
}
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
}
private async publishAccountCreatedEvent(
accountId: string,
accountSequence: string,
): Promise<void> {
try {
const payload: TradingAccountCreatedPayload = {
accountId,
accountSequence,
createdAt: new Date().toISOString(),
};
await this.outboxRepository.create({
aggregateType: 'TradingAccount',
aggregateId: accountId,
eventType: TradingEventTypes.TRADING_ACCOUNT_CREATED,
payload,
topic: TradingTopics.ACCOUNTS,
key: accountSequence,
});
this.logger.debug(`Published TradingAccountCreated event for ${accountSequence}`);
} catch (error) {
this.logger.error(`Failed to publish TradingAccountCreated event: ${error}`);
}
}
}