import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { PrismaService } from '../persistence/prisma/prisma.service';
import {
  CdcConsumerService,
  CdcEvent,
  ServiceEvent,
} from './cdc-consumer.service';
import { WalletSyncHandlers } from './wallet-sync.handlers';

/**
 * CDC sync service.
 *
 * Responsible for syncing data from the various 2.0 services into
 * mining-admin-service. On module init it registers one handler per service
 * event type on the CDC consumer and then starts the consumer.
 *
 * Every handler in this class follows the same shape: write the synced row,
 * record the event as processed, log at debug level. A failure inside a
 * handler is logged and NOT rethrown.
 * NOTE(review): because failures are not rethrown, the consumer cannot tell a
 * failed event from a successful one — confirm this is the intended
 * best-effort contract.
 */
@Injectable()
export class CdcSyncService implements OnModuleInit {
  private readonly logger = new Logger(CdcSyncService.name);

  constructor(
    private readonly configService: ConfigService,
    private readonly prisma: PrismaService,
    private readonly cdcConsumer: CdcConsumerService,
    private readonly walletHandlers: WalletSyncHandlers,
  ) {}

  /**
   * Registers all handlers first, then starts the consumer, so that no
   * handler registration happens after `start()` has been called.
   */
  async onModuleInit(): Promise<void> {
    await this.registerHandlers();
    await this.cdcConsumer.start();
  }

  /**
   * Subscribes to the five CDC topics (names are configurable, with defaults)
   * and registers a handler for every supported service event type.
   */
  private async registerHandlers(): Promise<void> {
    // ===========================================================================
    // Sync user data from auth-service
    // ===========================================================================
    const usersTopic = this.configService.get<string>(
      'CDC_TOPIC_USERS',
      'mining-admin.auth.users',
    );
    this.cdcConsumer.addTopic(usersTopic);
    this.cdcConsumer.registerServiceHandler(
      'UserCreated',
      this.handleUserCreated.bind(this),
    );
    this.cdcConsumer.registerServiceHandler(
      'UserUpdated',
      this.handleUserUpdated.bind(this),
    );
    this.cdcConsumer.registerServiceHandler(
      'KycStatusChanged',
      this.handleKycStatusChanged.bind(this),
    );

    // ===========================================================================
    // Sync contribution (hash-power) data from contribution-service
    // ===========================================================================
    const contributionTopic = this.configService.get<string>(
      'CDC_TOPIC_CONTRIBUTION',
      'mining-admin.contribution.accounts',
    );
    this.cdcConsumer.addTopic(contributionTopic);
    this.cdcConsumer.registerServiceHandler(
      'ContributionAccountUpdated',
      this.handleContributionAccountUpdated.bind(this),
    );
    this.cdcConsumer.registerServiceHandler(
      'SystemContributionUpdated',
      this.handleSystemContributionUpdated.bind(this),
    );

    // ===========================================================================
    // Sync mining data from mining-service
    // ===========================================================================
    const miningTopic = this.configService.get<string>(
      'CDC_TOPIC_MINING',
      'mining-admin.mining.accounts',
    );
    this.cdcConsumer.addTopic(miningTopic);
    this.cdcConsumer.registerServiceHandler(
      'MiningAccountUpdated',
      this.handleMiningAccountUpdated.bind(this),
    );
    this.cdcConsumer.registerServiceHandler(
      'MiningConfigUpdated',
      this.handleMiningConfigUpdated.bind(this),
    );
    this.cdcConsumer.registerServiceHandler(
      'DailyMiningStatCreated',
      this.handleDailyMiningStatCreated.bind(this),
    );

    // ===========================================================================
    // Sync trading data from trading-service
    // ===========================================================================
    const tradingTopic = this.configService.get<string>(
      'CDC_TOPIC_TRADING',
      'mining-admin.trading.accounts',
    );
    this.cdcConsumer.addTopic(tradingTopic);
    this.cdcConsumer.registerServiceHandler(
      'TradingAccountUpdated',
      this.handleTradingAccountUpdated.bind(this),
    );
    this.cdcConsumer.registerServiceHandler(
      'DayKLineCreated',
      this.handleDayKLineCreated.bind(this),
    );
    this.cdcConsumer.registerServiceHandler(
      'CirculationPoolUpdated',
      this.handleCirculationPoolUpdated.bind(this),
    );

    // ===========================================================================
    // Sync wallet data from mining-wallet-service
    // ===========================================================================
    const walletTopic = this.configService.get<string>(
      'CDC_TOPIC_WALLET',
      'mining-admin.wallet.accounts',
    );
    this.cdcConsumer.addTopic(walletTopic);

    // Region data
    this.cdcConsumer.registerServiceHandler(
      'ProvinceCreated',
      this.walletHandlers.handleProvinceCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'ProvinceUpdated',
      this.walletHandlers.handleProvinceUpdated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'CityCreated',
      this.walletHandlers.handleCityCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'CityUpdated',
      this.walletHandlers.handleCityUpdated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'UserRegionMappingCreated',
      this.walletHandlers.handleUserRegionMappingCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'UserRegionMappingUpdated',
      this.walletHandlers.handleUserRegionMappingUpdated.bind(this.walletHandlers),
    );

    // System accounts
    this.cdcConsumer.registerServiceHandler(
      'WalletSystemAccountCreated',
      this.walletHandlers.handleWalletSystemAccountCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'WalletSystemAccountUpdated',
      this.walletHandlers.handleWalletSystemAccountUpdated.bind(this.walletHandlers),
    );

    // Pool accounts
    this.cdcConsumer.registerServiceHandler(
      'WalletPoolAccountCreated',
      this.walletHandlers.handleWalletPoolAccountCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'WalletPoolAccountUpdated',
      this.walletHandlers.handleWalletPoolAccountUpdated.bind(this.walletHandlers),
    );

    // User wallets
    this.cdcConsumer.registerServiceHandler(
      'UserWalletCreated',
      this.walletHandlers.handleUserWalletCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'UserWalletUpdated',
      this.walletHandlers.handleUserWalletUpdated.bind(this.walletHandlers),
    );

    // Withdrawal requests
    this.cdcConsumer.registerServiceHandler(
      'WithdrawRequestCreated',
      this.walletHandlers.handleWithdrawRequestCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'WithdrawRequestUpdated',
      this.walletHandlers.handleWithdrawRequestUpdated.bind(this.walletHandlers),
    );

    // Deposit records
    this.cdcConsumer.registerServiceHandler(
      'DepositRecordCreated',
      this.walletHandlers.handleDepositRecordCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'DepositRecordUpdated',
      this.walletHandlers.handleDepositRecordUpdated.bind(this.walletHandlers),
    );

    // DEX Swap
    this.cdcConsumer.registerServiceHandler(
      'DexSwapRecordCreated',
      this.walletHandlers.handleDexSwapRecordCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'DexSwapRecordUpdated',
      this.walletHandlers.handleDexSwapRecordUpdated.bind(this.walletHandlers),
    );

    // Address bindings
    this.cdcConsumer.registerServiceHandler(
      'BlockchainAddressBindingCreated',
      this.walletHandlers.handleBlockchainAddressBindingCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'BlockchainAddressBindingUpdated',
      this.walletHandlers.handleBlockchainAddressBindingUpdated.bind(this.walletHandlers),
    );

    // Black-hole contracts
    this.cdcConsumer.registerServiceHandler(
      'BlackHoleContractCreated',
      this.walletHandlers.handleBlackHoleContractCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'BlackHoleContractUpdated',
      this.walletHandlers.handleBlackHoleContractUpdated.bind(this.walletHandlers),
    );

    // Burn records (only a "Created" handler is registered)
    this.cdcConsumer.registerServiceHandler(
      'BurnToBlackHoleRecordCreated',
      this.walletHandlers.handleBurnToBlackHoleRecordCreated.bind(this.walletHandlers),
    );

    // Fee configuration
    this.cdcConsumer.registerServiceHandler(
      'FeeConfigCreated',
      this.walletHandlers.handleFeeConfigCreated.bind(this.walletHandlers),
    );
    this.cdcConsumer.registerServiceHandler(
      'FeeConfigUpdated',
      this.walletHandlers.handleFeeConfigUpdated.bind(this.walletHandlers),
    );

    this.logger.log('CDC sync handlers registered');
  }

  // ===========================================================================
  // User event handlers
  // ===========================================================================

  /**
   * Upserts the synced user keyed by `originalUserId`. When the row already
   * exists only phone, status, kycStatus and realName are refreshed.
   */
  private async handleUserCreated(event: ServiceEvent): Promise<void> {
    const { payload } = event;

    try {
      await this.prisma.syncedUser.upsert({
        where: { originalUserId: payload.id },
        create: {
          originalUserId: payload.id,
          accountSequence: payload.accountSequence,
          phone: payload.phone,
          status: payload.status || 'ACTIVE',
          kycStatus: payload.kycStatus || 'PENDING',
          realName: payload.realName,
          isLegacyUser: payload.isLegacyUser || false,
          createdAt: new Date(payload.createdAt),
        },
        update: {
          phone: payload.phone,
          status: payload.status || 'ACTIVE',
          kycStatus: payload.kycStatus || 'PENDING',
          realName: payload.realName,
        },
      });

      await this.recordProcessedEvent(event);
      this.logger.debug(`Synced user: ${payload.accountSequence}`);
    } catch (error) {
      this.logger.error(`Failed to sync user: ${payload.id}`, error);
    }
  }

  /**
   * Updates phone, status, kycStatus and realName on every synced user whose
   * `originalUserId` matches. Uses `updateMany`, so no matching row is not an
   * error.
   */
  private async handleUserUpdated(event: ServiceEvent): Promise<void> {
    const { payload } = event;

    try {
      await this.prisma.syncedUser.updateMany({
        where: { originalUserId: payload.id },
        data: {
          phone: payload.phone,
          status: payload.status,
          kycStatus: payload.kycStatus,
          realName: payload.realName,
        },
      });

      await this.recordProcessedEvent(event);
      this.logger.debug(`Updated user: ${payload.id}`);
    } catch (error) {
      this.logger.error(`Failed to update user: ${payload.id}`, error);
    }
  }

  /**
   * Updates kycStatus and realName on synced users matched by
   * `accountSequence` (not by `originalUserId`, unlike the other user
   * handlers).
   */
  private async handleKycStatusChanged(event: ServiceEvent): Promise<void> {
    const { payload } = event;

    try {
      await this.prisma.syncedUser.updateMany({
        where: { accountSequence: payload.accountSequence },
        data: {
          kycStatus: payload.kycStatus,
          realName: payload.realName,
        },
      });

      await this.recordProcessedEvent(event);
      this.logger.debug(`Updated KYC status: ${payload.accountSequence}`);
    } catch (error) {
      this.logger.error(
        `Failed to update KYC status: ${payload.accountSequence}`,
        error,
      );
    }
  }

  // ===========================================================================
  // Contribution account event handlers
  // ===========================================================================

  /**
   * Upserts the synced contribution account keyed by `accountSequence`.
   * The stored `directReferralCount` is read from the payload's
   * `directReferralAdoptedCount` field.
   */
  private async handleContributionAccountUpdated(
    event: ServiceEvent,
  ): Promise<void> {
    const { payload } = event;

    try {
      await this.prisma.syncedContributionAccount.upsert({
        where: { accountSequence: payload.accountSequence },
        create: {
          accountSequence: payload.accountSequence,
          personalContribution: payload.personalContribution || 0,
          teamLevelContribution: payload.teamLevelContribution || 0,
          teamBonusContribution: payload.teamBonusContribution || 0,
          totalContribution: payload.totalContribution || 0,
          effectiveContribution: payload.effectiveContribution || 0,
          hasAdopted: payload.hasAdopted || false,
          directReferralCount: payload.directReferralAdoptedCount || 0,
          unlockedLevelDepth: payload.unlockedLevelDepth || 0,
        },
        update: {
          personalContribution: payload.personalContribution,
          teamLevelContribution: payload.teamLevelContribution,
          teamBonusContribution: payload.teamBonusContribution,
          totalContribution: payload.totalContribution,
          effectiveContribution: payload.effectiveContribution,
          hasAdopted: payload.hasAdopted,
          directReferralCount: payload.directReferralAdoptedCount,
          unlockedLevelDepth: payload.unlockedLevelDepth,
        },
      });

      await this.recordProcessedEvent(event);
      this.logger.debug(
        `Synced contribution account: ${payload.accountSequence}`,
      );
    } catch (error) {
      this.logger.error(
        `Failed to sync contribution account: ${payload.accountSequence}`,
        error,
      );
    }
  }

  /** Upserts the synced system contribution row keyed by `accountType`. */
  private async handleSystemContributionUpdated(
    event: ServiceEvent,
  ): Promise<void> {
    const { payload } = event;

    try {
      await this.prisma.syncedSystemContribution.upsert({
        where: { accountType: payload.accountType },
        create: {
          accountType: payload.accountType,
          name: payload.name,
          contributionBalance: payload.contributionBalance || 0,
          contributionNeverExpires: payload.contributionNeverExpires || false,
        },
        update: {
          name: payload.name,
          contributionBalance: payload.contributionBalance,
          contributionNeverExpires: payload.contributionNeverExpires,
        },
      });

      await this.recordProcessedEvent(event);
      this.logger.debug(
        `Synced system contribution: ${payload.accountType}`,
      );
    } catch (error) {
      this.logger.error(
        `Failed to sync system contribution: ${payload.accountType}`,
        error,
      );
    }
  }

  // ===========================================================================
  // Mining account event handlers
  // ===========================================================================

  /** Upserts the synced mining account keyed by `accountSequence`. */
  private async handleMiningAccountUpdated(event: ServiceEvent): Promise<void> {
    const { payload } = event;

    try {
      await this.prisma.syncedMiningAccount.upsert({
        where: { accountSequence: payload.accountSequence },
        create: {
          accountSequence: payload.accountSequence,
          totalMined: payload.totalMined || 0,
          availableBalance: payload.availableBalance || 0,
          frozenBalance: payload.frozenBalance || 0,
          totalContribution: payload.totalContribution || 0,
        },
        update: {
          totalMined: payload.totalMined,
          availableBalance: payload.availableBalance,
          frozenBalance: payload.frozenBalance,
          totalContribution: payload.totalContribution,
        },
      });

      await this.recordProcessedEvent(event);
      this.logger.debug(`Synced mining account: ${payload.accountSequence}`);
    } catch (error) {
      this.logger.error(
        `Failed to sync mining account: ${payload.accountSequence}`,
        error,
      );
    }
  }

  /**
   * Replaces the single mining-config row with the values from the event.
   *
   * The delete and the create run in one transaction so that a failing
   * `create` rolls the delete back instead of leaving the table empty.
   */
  private async handleMiningConfigUpdated(event: ServiceEvent): Promise<void> {
    const { payload } = event;

    try {
      // Keep only one mining-config record: wipe, then insert, atomically.
      await this.prisma.$transaction([
        this.prisma.syncedMiningConfig.deleteMany({}),
        this.prisma.syncedMiningConfig.create({
          data: {
            totalShares: payload.totalShares,
            distributionPool: payload.distributionPool,
            remainingDistribution: payload.remainingDistribution,
            halvingPeriodYears: payload.halvingPeriodYears,
            currentEra: payload.currentEra || 1,
            minuteDistribution: payload.minuteDistribution,
            isActive: payload.isActive || false,
            activatedAt: payload.activatedAt
              ? new Date(payload.activatedAt)
              : null,
          },
        }),
      ]);

      await this.recordProcessedEvent(event);
      this.logger.debug('Synced mining config');
    } catch (error) {
      this.logger.error('Failed to sync mining config', error);
    }
  }

  /**
   * Upserts the daily mining statistic keyed by `statDate`, which is built
   * from the payload's `date` field.
   */
  private async handleDailyMiningStatCreated(
    event: ServiceEvent,
  ): Promise<void> {
    const { payload } = event;

    try {
      await this.prisma.syncedDailyMiningStat.upsert({
        where: { statDate: new Date(payload.date) },
        create: {
          statDate: new Date(payload.date),
          totalContribution: payload.totalContribution || 0,
          totalDistributed: payload.totalDistributed || 0,
          totalBurned: payload.totalBurned || 0,
          participantCount: payload.participantCount || 0,
          avgContributionRate: payload.avgContributionRate || 0,
        },
        update: {
          totalContribution: payload.totalContribution,
          totalDistributed: payload.totalDistributed,
          totalBurned: payload.totalBurned,
          participantCount: payload.participantCount,
          avgContributionRate: payload.avgContributionRate,
        },
      });

      await this.recordProcessedEvent(event);
      this.logger.debug(`Synced daily mining stat: ${payload.date}`);
    } catch (error) {
      this.logger.error(
        `Failed to sync daily mining stat: ${payload.date}`,
        error,
      );
    }
  }

  // ===========================================================================
  // Trading account event handlers
  // ===========================================================================

  /** Upserts the synced trading account keyed by `accountSequence`. */
  private async handleTradingAccountUpdated(
    event: ServiceEvent,
  ): Promise<void> {
    const { payload } = event;

    try {
      await this.prisma.syncedTradingAccount.upsert({
        where: { accountSequence: payload.accountSequence },
        create: {
          accountSequence: payload.accountSequence,
          shareBalance: payload.shareBalance || 0,
          cashBalance: payload.cashBalance || 0,
          frozenShares: payload.frozenShares || 0,
          frozenCash: payload.frozenCash || 0,
          totalBought: payload.totalBought || 0,
          totalSold: payload.totalSold || 0,
        },
        update: {
          shareBalance: payload.shareBalance,
          cashBalance: payload.cashBalance,
          frozenShares: payload.frozenShares,
          frozenCash: payload.frozenCash,
          totalBought: payload.totalBought,
          totalSold: payload.totalSold,
        },
      });

      await this.recordProcessedEvent(event);
      this.logger.debug(`Synced trading account: ${payload.accountSequence}`);
    } catch (error) {
      this.logger.error(
        `Failed to sync trading account: ${payload.accountSequence}`,
        error,
      );
    }
  }

  /**
   * Upserts the daily K-line keyed by `klineDate`, which is built from the
   * payload's `date` field.
   */
  private async handleDayKLineCreated(event: ServiceEvent): Promise<void> {
    const { payload } = event;

    try {
      await this.prisma.syncedDayKLine.upsert({
        where: { klineDate: new Date(payload.date) },
        create: {
          klineDate: new Date(payload.date),
          open: payload.open,
          high: payload.high,
          low: payload.low,
          close: payload.close,
          volume: payload.volume || 0,
          amount: payload.amount || 0,
          tradeCount: payload.tradeCount || 0,
        },
        update: {
          open: payload.open,
          high: payload.high,
          low: payload.low,
          close: payload.close,
          volume: payload.volume,
          amount: payload.amount,
          tradeCount: payload.tradeCount,
        },
      });

      await this.recordProcessedEvent(event);
      this.logger.debug(`Synced day K-line: ${payload.date}`);
    } catch (error) {
      this.logger.error(`Failed to sync day K-line: ${payload.date}`, error);
    }
  }

  /**
   * Replaces the single circulation-pool row with the values from the event.
   *
   * The delete and the create run in one transaction so that a failing
   * `create` rolls the delete back instead of leaving the table empty.
   */
  private async handleCirculationPoolUpdated(
    event: ServiceEvent,
  ): Promise<void> {
    const { payload } = event;

    try {
      // Keep only one circulation-pool record: wipe, then insert, atomically.
      await this.prisma.$transaction([
        this.prisma.syncedCirculationPool.deleteMany({}),
        this.prisma.syncedCirculationPool.create({
          data: {
            totalShares: payload.totalShares || 0,
            totalCash: payload.totalCash || 0,
          },
        }),
      ]);

      await this.recordProcessedEvent(event);
      this.logger.debug('Synced circulation pool');
    } catch (error) {
      this.logger.error('Failed to sync circulation pool', error);
    }
  }

  // ===========================================================================
  // Helpers
  // ===========================================================================

  /**
   * Records that `event` has been handled by upserting a `processedEvent` row
   * keyed by `eventId`; an existing row is left untouched (`update: {}`).
   *
   * Best effort: a failure here is logged as a warning, including the reason,
   * and never propagates to the calling handler.
   */
  private async recordProcessedEvent(event: ServiceEvent): Promise<void> {
    try {
      await this.prisma.processedEvent.upsert({
        where: { eventId: event.id },
        create: {
          eventId: event.id,
          eventType: event.eventType,
          sourceService: event.aggregateType,
        },
        update: {},
      });
    } catch (error) {
      // A failure to write the idempotency record is deliberately non-fatal,
      // but the reason is kept in the log so it can be diagnosed.
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.warn(
        `Failed to record processed event: ${event.id} (${reason})`,
      );
    }
  }
}