/*
 * rwadurian/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts
 *
 * 879 lines
 * 32 KiB
 * TypeScript
 * Raw Blame History
 *
 * This file contains ambiguous Unicode characters
 *
 * This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
 */

import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Prisma } from '@prisma/client';
import { PrismaService } from '../persistence/prisma/prisma.service';
import {
CdcConsumerService,
CdcEvent,
ServiceEvent,
ServiceEventHandler,
TransactionalServiceEventHandler,
} from './cdc-consumer.service';
import { WalletSyncHandlers } from './wallet-sync.handlers';
/** Alias for the Prisma client type that is passed to interactive-transaction callbacks. */
type TransactionClient = Prisma.TransactionClient;
/**
 * CDC sync service.
 * Synchronizes data from the individual 2.0 services into mining-admin-service.
 *
 * Events are consumed with a transactional, idempotent pattern:
 * 1. Each event is processed inside a single database transaction.
 * 2. Within that transaction the idempotency record is inserted first and the
 *    business logic runs afterwards.
 * 3. A failure in any step rolls the whole transaction back, so the idempotency
 *    record and the synced data stay consistent with each other.
 */
@Injectable()
export class CdcSyncService implements OnModuleInit {
private readonly logger = new Logger(CdcSyncService.name);
constructor(
private readonly configService: ConfigService,
private readonly prisma: PrismaService,
private readonly cdcConsumer: CdcConsumerService,
private readonly walletHandlers: WalletSyncHandlers,
) {}
/**
 * Module-init hook: registers every event handler first, then starts the CDC
 * consumer, so handlers are in place before `start()` is called.
 */
async onModuleInit() {
await this.registerHandlers();
await this.cdcConsumer.start();
}
/**
 * Wraps a transactional handler so that each event is applied at most once.
 *
 * Inside one Serializable database transaction the wrapper:
 * 1. Inserts a `processedEvent` row for the event. A unique-constraint
 *    violation (Prisma `P2002`) at this step means the event was already
 *    processed, so the business logic is skipped.
 * 2. Runs the business handler with the transaction client.
 *
 * Any other failure rolls the whole transaction back (idempotency row
 * included), is logged, and is rethrown to the caller.
 *
 * A `P2002` raised *after* the business handler has started is NOT treated as
 * a duplicate event: it comes from the handler's own writes (or from commit),
 * and swallowing it would silently drop the event because the transaction has
 * already been rolled back.
 *
 * @param handler Business logic to run with the transaction client.
 * @returns A handler that takes only the event and manages the transaction itself.
 */
private withIdempotency(handler: TransactionalServiceEventHandler): ServiceEventHandler {
  // Structural check rather than `instanceof`, mirroring how the error code is
  // the only thing this wrapper relies on.
  const isUniqueViolation = (err: unknown): boolean =>
    typeof err === 'object' && err !== null && (err as { code?: unknown }).code === 'P2002';

  return async (event: ServiceEvent) => {
    const idempotencyKey = `${event.sourceTopic}:${event.id}`;
    // Flipped right before the business handler runs; lets the outer catch tell
    // an idempotency-insert conflict apart from a handler constraint violation.
    let handlerStarted = false;

    try {
      await this.prisma.$transaction(async (tx) => {
        // 1. Claim the event by inserting the idempotency row.
        try {
          await tx.processedEvent.create({
            data: {
              eventId: event.id,
              eventType: event.eventType,
              sourceService: event.sourceTopic,
            },
          });
        } catch (error: unknown) {
          // Unique-constraint conflict: the event was already processed.
          if (isUniqueViolation(error)) {
            this.logger.debug(`Skipping duplicate event: ${idempotencyKey} (${event.eventType})`);
            return;
          }
          throw error;
        }

        // 2. Run the business logic on the same transaction client.
        handlerStarted = true;
        await handler(event, tx);
        this.logger.debug(`Processed event in transaction: ${idempotencyKey} (${event.eventType})`);
      }, {
        // Serializable isolation to guard against concurrent processing.
        isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
        timeout: 30000, // 30-second transaction timeout
      });
    } catch (error: unknown) {
      // Only a conflict that happened before the handler ran can be a duplicate event.
      if (!handlerStarted && isUniqueViolation(error)) {
        this.logger.debug(`Skipping duplicate event (concurrent): ${idempotencyKey}`);
        return;
      }
      this.logger.error(
        `Failed to process event: ${idempotencyKey}`,
        error instanceof Error ? error.stack : String(error),
      );
      throw error;
    }
  };
}
/**
 * Subscribes to every upstream outbox topic and registers one
 * idempotency-wrapped handler per event type.
 *
 * Topics are processed in declaration order: the topic name is read from
 * configuration (falling back to the listed default), added to the consumer,
 * and then that group's handlers are registered.
 */
private async registerHandlers(): Promise<void> {
  type HandlerEntry = [string, TransactionalServiceEventHandler];
  const wallet = this.walletHandlers;

  const subscriptions: Array<{
    configKey: string;
    defaultTopic: string;
    handlers: HandlerEntry[];
  }> = [
    // User data from auth-service (outbox_events table via Debezium CDC).
    {
      configKey: 'CDC_TOPIC_AUTH_OUTBOX',
      defaultTopic: 'cdc.auth.outbox',
      handlers: [
        ['UserCreated', this.handleUserCreated.bind(this)],
        // user.registered event published by auth-service
        ['user.registered', this.handleUserRegistered.bind(this)],
        ['UserUpdated', this.handleUserUpdated.bind(this)],
        ['KycStatusChanged', this.handleKycStatusChanged.bind(this)],
        // user.kyc_verified event published by auth-service
        ['user.kyc_verified', this.handleKycStatusChanged.bind(this)],
        // user.legacy.migrated event (1.0 user logging into 2.0 for the first time)
        ['user.legacy.migrated', this.handleLegacyUserMigrated.bind(this)],
      ],
    },
    // Contribution data from contribution-service (outbox_events table via Debezium CDC).
    {
      configKey: 'CDC_TOPIC_CONTRIBUTION_OUTBOX',
      defaultTopic: 'cdc.contribution.outbox',
      handlers: [
        ['ContributionAccountUpdated', this.handleContributionAccountUpdated.bind(this)],
        // ContributionAccountSynced is used for the initial full sync
        ['ContributionAccountSynced', this.handleContributionAccountUpdated.bind(this)],
        // ContributionCalculated is published when a contribution calculation completes
        ['ContributionCalculated', this.handleContributionCalculated.bind(this)],
        ['SystemContributionUpdated', this.handleSystemContributionUpdated.bind(this)],
        // Referral relationships
        ['ReferralSynced', this.handleReferralSynced.bind(this)],
        // Adoption records
        ['AdoptionSynced', this.handleAdoptionSynced.bind(this)],
        // Contribution detail records
        ['ContributionRecordSynced', this.handleContributionRecordSynced.bind(this)],
        // Network-wide contribution progress
        ['NetworkProgressUpdated', this.handleNetworkProgressUpdated.bind(this)],
      ],
    },
    // Mining data from mining-service (outbox_events table via Debezium CDC).
    {
      configKey: 'CDC_TOPIC_MINING_OUTBOX',
      defaultTopic: 'cdc.mining.outbox',
      handlers: [
        ['MiningAccountUpdated', this.handleMiningAccountUpdated.bind(this)],
        ['MiningConfigUpdated', this.handleMiningConfigUpdated.bind(this)],
        ['DailyMiningStatCreated', this.handleDailyMiningStatCreated.bind(this)],
      ],
    },
    // Trading data from trading-service (outbox_events table via Debezium CDC).
    {
      configKey: 'CDC_TOPIC_TRADING_OUTBOX',
      defaultTopic: 'cdc.trading.outbox',
      handlers: [
        ['TradingAccountUpdated', this.handleTradingAccountUpdated.bind(this)],
        ['DayKLineCreated', this.handleDayKLineCreated.bind(this)],
        ['CirculationPoolUpdated', this.handleCirculationPoolUpdated.bind(this)],
      ],
    },
    // Wallet data from mining-wallet-service (outbox_events table via Debezium CDC).
    {
      configKey: 'CDC_TOPIC_WALLET_OUTBOX',
      defaultTopic: 'cdc.mining-wallet.outbox',
      handlers: [
        // Region data
        ['ProvinceCreated', wallet.handleProvinceCreated.bind(wallet)],
        ['ProvinceUpdated', wallet.handleProvinceUpdated.bind(wallet)],
        ['CityCreated', wallet.handleCityCreated.bind(wallet)],
        ['CityUpdated', wallet.handleCityUpdated.bind(wallet)],
        ['UserRegionMappingCreated', wallet.handleUserRegionMappingCreated.bind(wallet)],
        ['UserRegionMappingUpdated', wallet.handleUserRegionMappingUpdated.bind(wallet)],
        // System accounts
        ['WalletSystemAccountCreated', wallet.handleWalletSystemAccountCreated.bind(wallet)],
        ['WalletSystemAccountUpdated', wallet.handleWalletSystemAccountUpdated.bind(wallet)],
        // Pool accounts
        ['WalletPoolAccountCreated', wallet.handleWalletPoolAccountCreated.bind(wallet)],
        ['WalletPoolAccountUpdated', wallet.handleWalletPoolAccountUpdated.bind(wallet)],
        // User wallets
        ['UserWalletCreated', wallet.handleUserWalletCreated.bind(wallet)],
        ['UserWalletUpdated', wallet.handleUserWalletUpdated.bind(wallet)],
        // Withdrawal requests
        ['WithdrawRequestCreated', wallet.handleWithdrawRequestCreated.bind(wallet)],
        ['WithdrawRequestUpdated', wallet.handleWithdrawRequestUpdated.bind(wallet)],
        // Deposit records
        ['DepositRecordCreated', wallet.handleDepositRecordCreated.bind(wallet)],
        ['DepositRecordUpdated', wallet.handleDepositRecordUpdated.bind(wallet)],
        // DEX swaps
        ['DexSwapRecordCreated', wallet.handleDexSwapRecordCreated.bind(wallet)],
        ['DexSwapRecordUpdated', wallet.handleDexSwapRecordUpdated.bind(wallet)],
        // Address bindings
        ['BlockchainAddressBindingCreated', wallet.handleBlockchainAddressBindingCreated.bind(wallet)],
        ['BlockchainAddressBindingUpdated', wallet.handleBlockchainAddressBindingUpdated.bind(wallet)],
        // Black-hole contracts
        ['BlackHoleContractCreated', wallet.handleBlackHoleContractCreated.bind(wallet)],
        ['BlackHoleContractUpdated', wallet.handleBlackHoleContractUpdated.bind(wallet)],
        // Burn records
        ['BurnToBlackHoleRecordCreated', wallet.handleBurnToBlackHoleRecordCreated.bind(wallet)],
        // Fee configuration
        ['FeeConfigCreated', wallet.handleFeeConfigCreated.bind(wallet)],
        ['FeeConfigUpdated', wallet.handleFeeConfigUpdated.bind(wallet)],
        // CONTRIBUTION_CREDITED - updates the user wallet when contribution is credited
        ['CONTRIBUTION_CREDITED', this.handleContributionCredited.bind(this)],
      ],
    },
  ];

  for (const { configKey, defaultTopic, handlers } of subscriptions) {
    const topic = this.configService.get<string>(configKey, defaultTopic);
    this.cdcConsumer.addTopic(topic);
    for (const [eventType, handler] of handlers) {
      this.cdcConsumer.registerServiceHandler(eventType, this.withIdempotency(handler));
    }
  }

  this.logger.log('CDC sync handlers registered with idempotency protection');
}
// ===========================================================================
// User event handlers
// ===========================================================================
/**
 * Upserts the synced user keyed by `originalUserId` (taken from `payload.id`).
 * `status` and `kycStatus` fall back to 'ACTIVE' / 'PENDING' when falsy, on
 * both insert and update.
 */
private async handleUserCreated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const user = event.payload;
  // Columns written identically on insert and on update.
  const mutableFields = {
    phone: user.phone,
    status: user.status || 'ACTIVE',
    kycStatus: user.kycStatus || 'PENDING',
    realName: user.realName,
  };
  await tx.syncedUser.upsert({
    where: { originalUserId: user.id },
    create: {
      originalUserId: user.id,
      accountSequence: user.accountSequence,
      ...mutableFields,
      isLegacyUser: user.isLegacyUser || false,
      createdAt: new Date(user.createdAt),
    },
    update: mutableFields,
  });
}
/**
 * Handles the `user.registered` event published by auth-service.
 * payload: { accountSequence, phone, source, registeredAt }
 *
 * Upserts by `accountSequence`; a new row reuses `accountSequence` as
 * `originalUserId`, and the user is flagged legacy when `source` is 'V1'.
 */
private async handleUserRegistered(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const { accountSequence, phone, source, registeredAt } = event.payload;
  const isLegacyUser = source === 'V1';
  await tx.syncedUser.upsert({
    where: { accountSequence },
    create: {
      originalUserId: accountSequence,
      accountSequence,
      phone,
      status: 'ACTIVE',
      kycStatus: 'PENDING',
      realName: null,
      isLegacyUser,
      createdAt: new Date(registeredAt),
    },
    update: { phone, isLegacyUser },
  });
}
/**
 * Handles the `user.legacy.migrated` event published by auth-service.
 * payload: { accountSequence, phone, nickname, migratedAt }
 *
 * Upserts by `accountSequence` and always marks the user as legacy; a falsy
 * nickname is stored as null.
 */
private async handleLegacyUserMigrated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const { accountSequence, phone, migratedAt } = event.payload;
  const nickname = event.payload.nickname || null;
  await tx.syncedUser.upsert({
    where: { accountSequence },
    create: {
      originalUserId: accountSequence,
      accountSequence,
      phone,
      nickname,
      status: 'ACTIVE',
      kycStatus: 'PENDING',
      realName: null,
      isLegacyUser: true,
      createdAt: new Date(migratedAt),
    },
    update: {
      phone,
      nickname,
      isLegacyUser: true,
    },
  });
}
/**
 * Copies phone, status, kycStatus and realName from the payload onto every
 * synced user whose `originalUserId` equals `payload.id`.
 */
private async handleUserUpdated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const { id, phone, status, kycStatus, realName } = event.payload;
  await tx.syncedUser.updateMany({
    where: { originalUserId: id },
    data: { phone, status, kycStatus, realName },
  });
}
/**
 * Writes the payload's kycStatus and realName onto every synced user with the
 * given `accountSequence`.
 */
private async handleKycStatusChanged(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const { accountSequence, kycStatus, realName } = event.payload;
  await tx.syncedUser.updateMany({
    where: { accountSequence },
    data: { kycStatus, realName },
  });
}
// ===========================================================================
// Contribution account event handlers
// ===========================================================================
/**
 * Upserts the synced contribution account for `payload.accountSequence`.
 * On insert, falsy values fall back to 0 / false; on update the payload values
 * are passed through unchanged. `directReferralCount` is sourced from
 * `payload.directReferralAdoptedCount`.
 */
private async handleContributionAccountUpdated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const {
    accountSequence,
    personalContribution,
    teamLevelContribution,
    teamBonusContribution,
    totalContribution,
    effectiveContribution,
    hasAdopted,
    directReferralAdoptedCount,
    unlockedLevelDepth,
    unlockedBonusTiers,
  } = event.payload;

  await tx.syncedContributionAccount.upsert({
    where: { accountSequence },
    create: {
      accountSequence,
      personalContribution: personalContribution || 0,
      teamLevelContribution: teamLevelContribution || 0,
      teamBonusContribution: teamBonusContribution || 0,
      totalContribution: totalContribution || 0,
      effectiveContribution: effectiveContribution || 0,
      hasAdopted: hasAdopted || false,
      directReferralCount: directReferralAdoptedCount || 0,
      unlockedLevelDepth: unlockedLevelDepth || 0,
      unlockedBonusTiers: unlockedBonusTiers || 0,
    },
    update: {
      personalContribution,
      teamLevelContribution,
      teamBonusContribution,
      totalContribution,
      effectiveContribution,
      hasAdopted,
      directReferralCount: directReferralAdoptedCount,
      unlockedLevelDepth,
      unlockedBonusTiers,
    },
  });
}
/**
 * Handles the `ContributionCalculated` event, published by contribution-service
 * when it calculates contribution; applied as an incremental update.
 *
 * Existing account: `personalContribution` is incremented by the parsed payload
 * value (0 when it does not parse to a non-zero number) and `hasAdopted` is set.
 * Missing account: a row is created with the payload's personal contribution,
 * `hasAdopted` true, and every other counter at 0.
 */
private async handleContributionCalculated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const { accountSequence, personalContribution } = event.payload;
  const delta = parseFloat(personalContribution) || 0;

  await tx.syncedContributionAccount.upsert({
    where: { accountSequence },
    create: {
      accountSequence,
      personalContribution: personalContribution || 0,
      teamLevelContribution: 0,
      teamBonusContribution: 0,
      totalContribution: 0,
      effectiveContribution: 0,
      hasAdopted: true,
      directReferralCount: 0,
      unlockedLevelDepth: 0,
      unlockedBonusTiers: 0,
    },
    update: {
      personalContribution: { increment: delta },
      hasAdopted: true,
    },
  });
}
/**
 * Upserts the synced system contribution row keyed by `accountType`.
 * Insert defaults the balance to 0 and the never-expires flag to false when
 * falsy; update passes the payload values through unchanged.
 */
private async handleSystemContributionUpdated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const { accountType, name, contributionBalance, contributionNeverExpires } = event.payload;
  await tx.syncedSystemContribution.upsert({
    where: { accountType },
    create: {
      accountType,
      name,
      contributionBalance: contributionBalance || 0,
      contributionNeverExpires: contributionNeverExpires || false,
    },
    update: { name, contributionBalance, contributionNeverExpires },
  });
}
/**
 * Handles the `ReferralSynced` event - syncs a referral relationship.
 *
 * Upserts by `accountSequence`. The two user ids are converted to BigInt when
 * truthy and stored as null otherwise; a falsy depth is stored as 0.
 */
private async handleReferralSynced(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const referral = event.payload;
  // Identical on insert and update.
  const relation = {
    referrerAccountSequence: referral.referrerAccountSequence,
    referrerUserId: referral.referrerUserId ? BigInt(referral.referrerUserId) : null,
    originalUserId: referral.originalUserId ? BigInt(referral.originalUserId) : null,
    ancestorPath: referral.ancestorPath,
    depth: referral.depth || 0,
  };
  await tx.syncedReferral.upsert({
    where: { accountSequence: referral.accountSequence },
    create: { accountSequence: referral.accountSequence, ...relation },
    update: relation,
  });
}
/**
 * Handles the `AdoptionSynced` event - syncs an adoption record.
 *
 * Upserts by `originalAdoptionId` (converted to BigInt); all other columns are
 * written with the same values on insert and update.
 */
private async handleAdoptionSynced(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const adoption = event.payload;
  const originalAdoptionId = BigInt(adoption.originalAdoptionId);
  const details = {
    accountSequence: adoption.accountSequence,
    treeCount: adoption.treeCount,
    adoptionDate: new Date(adoption.adoptionDate),
    status: adoption.status,
    contributionPerTree: adoption.contributionPerTree,
  };
  await tx.syncedAdoption.upsert({
    where: { originalAdoptionId },
    create: { originalAdoptionId, ...details },
    update: details,
  });
}
/**
 * Handles the `ContributionRecordSynced` event - syncs a contribution detail record.
 *
 * Upserts by `originalRecordId` (converted to BigInt). `createdAt` is only
 * written on insert; every other column is written on both insert and update.
 */
private async handleContributionRecordSynced(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const record = event.payload;
  const originalRecordId = BigInt(record.originalRecordId);
  // Columns shared by the insert and update branches.
  const details = {
    accountSequence: record.accountSequence,
    sourceType: record.sourceType,
    sourceAdoptionId: BigInt(record.sourceAdoptionId),
    sourceAccountSequence: record.sourceAccountSequence,
    treeCount: record.treeCount,
    baseContribution: record.baseContribution,
    distributionRate: record.distributionRate,
    levelDepth: record.levelDepth,
    bonusTier: record.bonusTier,
    amount: record.amount,
    effectiveDate: new Date(record.effectiveDate),
    expireDate: new Date(record.expireDate),
    isExpired: record.isExpired || false,
  };
  await tx.syncedContributionRecord.upsert({
    where: { originalRecordId },
    create: {
      originalRecordId,
      ...details,
      createdAt: new Date(record.createdAt),
    },
    update: details,
  });
}
/**
 * Handles the `NetworkProgressUpdated` event - syncs network-wide contribution progress.
 *
 * Only one progress row is kept: the first existing row is updated in place,
 * and a row is created only when none exists yet.
 */
private async handleNetworkProgressUpdated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const progress = event.payload;
  const current = await tx.syncedNetworkProgress.findFirst();

  const snapshot = {
    totalTreeCount: progress.totalTreeCount,
    totalAdoptionOrders: progress.totalAdoptionOrders,
    totalAdoptedUsers: progress.totalAdoptedUsers,
    currentUnit: progress.currentUnit,
    currentMultiplier: progress.currentMultiplier,
    currentContributionPerTree: progress.currentContributionPerTree,
    nextUnitTreeCount: progress.nextUnitTreeCount,
  };

  if (!current) {
    await tx.syncedNetworkProgress.create({ data: snapshot });
    return;
  }
  await tx.syncedNetworkProgress.update({
    where: { id: current.id },
    data: snapshot,
  });
}
// ===========================================================================
// Mining account event handlers
// ===========================================================================
/**
 * Upserts the synced mining account for `payload.accountSequence`.
 * Insert defaults falsy amounts to 0; update passes payload values through.
 */
private async handleMiningAccountUpdated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const {
    accountSequence,
    totalMined,
    availableBalance,
    frozenBalance,
    totalContribution,
  } = event.payload;

  await tx.syncedMiningAccount.upsert({
    where: { accountSequence },
    create: {
      accountSequence,
      totalMined: totalMined || 0,
      availableBalance: availableBalance || 0,
      frozenBalance: frozenBalance || 0,
      totalContribution: totalContribution || 0,
    },
    update: { totalMined, availableBalance, frozenBalance, totalContribution },
  });
}
/**
 * Replaces the synced mining configuration: all existing rows are deleted and
 * a single row is created from the payload, so only one config row is kept.
 * `currentEra` defaults to 1 and `isActive` to false when falsy; a falsy
 * `activatedAt` is stored as null.
 */
private async handleMiningConfigUpdated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const config = event.payload;

  // Clear first so the insert below leaves exactly one row.
  await tx.syncedMiningConfig.deleteMany({});

  const activatedAt = config.activatedAt ? new Date(config.activatedAt) : null;
  await tx.syncedMiningConfig.create({
    data: {
      totalShares: config.totalShares,
      distributionPool: config.distributionPool,
      remainingDistribution: config.remainingDistribution,
      halvingPeriodYears: config.halvingPeriodYears,
      currentEra: config.currentEra || 1,
      minuteDistribution: config.minuteDistribution,
      isActive: config.isActive || false,
      activatedAt,
    },
  });
}
/**
 * Upserts the daily mining statistic keyed by `statDate` (built from `payload.date`).
 * Insert defaults falsy figures to 0; update passes payload values through.
 */
private async handleDailyMiningStatCreated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const {
    date,
    totalContribution,
    totalDistributed,
    totalBurned,
    participantCount,
    avgContributionRate,
  } = event.payload;

  await tx.syncedDailyMiningStat.upsert({
    where: { statDate: new Date(date) },
    create: {
      statDate: new Date(date),
      totalContribution: totalContribution || 0,
      totalDistributed: totalDistributed || 0,
      totalBurned: totalBurned || 0,
      participantCount: participantCount || 0,
      avgContributionRate: avgContributionRate || 0,
    },
    update: {
      totalContribution,
      totalDistributed,
      totalBurned,
      participantCount,
      avgContributionRate,
    },
  });
}
// ===========================================================================
// Trading account event handlers
// ===========================================================================
/**
 * Upserts the synced trading account for `payload.accountSequence` and logs
 * the synced account at debug level.
 * Insert defaults falsy amounts to 0; update passes payload values through.
 */
private async handleTradingAccountUpdated(
  event: ServiceEvent,
  tx: TransactionClient,
): Promise<void> {
  const {
    accountSequence,
    shareBalance,
    cashBalance,
    frozenShares,
    frozenCash,
    totalBought,
    totalSold,
  } = event.payload;

  await tx.syncedTradingAccount.upsert({
    where: { accountSequence },
    create: {
      accountSequence,
      shareBalance: shareBalance || 0,
      cashBalance: cashBalance || 0,
      frozenShares: frozenShares || 0,
      frozenCash: frozenCash || 0,
      totalBought: totalBought || 0,
      totalSold: totalSold || 0,
    },
    update: {
      shareBalance,
      cashBalance,
      frozenShares,
      frozenCash,
      totalBought,
      totalSold,
    },
  });

  this.logger.debug(`Synced trading account: ${accountSequence}`);
}
/**
 * Upserts the daily K-line keyed by `klineDate` (built from `payload.date`)
 * and logs the synced date at debug level.
 * Insert defaults falsy volume/amount/tradeCount to 0; update passes payload
 * values through.
 */
private async handleDayKLineCreated(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const { date, open, high, low, close, volume, amount, tradeCount } = event.payload;

  await tx.syncedDayKLine.upsert({
    where: { klineDate: new Date(date) },
    create: {
      klineDate: new Date(date),
      open,
      high,
      low,
      close,
      volume: volume || 0,
      amount: amount || 0,
      tradeCount: tradeCount || 0,
    },
    update: { open, high, low, close, volume, amount, tradeCount },
  });

  this.logger.debug(`Synced day K-line: ${date}`);
}
/**
 * Replaces the synced circulation pool: all existing rows are deleted and a
 * single row is created from the payload (falsy totals become 0), so only one
 * pool row is kept.
 */
private async handleCirculationPoolUpdated(
  event: ServiceEvent,
  tx: TransactionClient,
): Promise<void> {
  const { totalShares, totalCash } = event.payload;

  // Clear first so the insert below leaves exactly one row.
  await tx.syncedCirculationPool.deleteMany({});
  await tx.syncedCirculationPool.create({
    data: {
      totalShares: totalShares || 0,
      totalCash: totalCash || 0,
    },
  });

  this.logger.debug('Synced circulation pool');
}
// ===========================================================================
// Wallet event handlers (mining-wallet-service)
// ===========================================================================
/**
 * Handles the `CONTRIBUTION_CREDITED` event, published by mining-wallet-service
 * when it credits contribution to a user.
 * payload: { accountSequence, walletType, amount, balanceAfter, transactionId, ... }
 *
 * Looks up the wallet by (accountSequence, walletType), with walletType
 * defaulting to 'CONTRIBUTION'. An existing wallet gets its balance set to
 * `balanceAfter` and `totalInflow` incremented by the credited amount; a
 * missing wallet is created with those values as its starting figures.
 */
private async handleContributionCredited(event: ServiceEvent, tx: TransactionClient): Promise<void> {
  const { accountSequence, amount, balanceAfter } = event.payload;
  const walletType = event.payload.walletType || 'CONTRIBUTION';
  // Unparseable or zero amounts contribute 0 to the inflow total.
  const creditedAmount = parseFloat(amount) || 0;

  const wallet = await tx.syncedUserWallet.findUnique({
    where: {
      accountSequence_walletType: { accountSequence, walletType },
    },
  });

  if (!wallet) {
    // No wallet row yet: create one. originalId is derived from
    // accountSequence + walletType so it is stable for the pair.
    await tx.syncedUserWallet.create({
      data: {
        originalId: `wallet-${accountSequence}-${walletType}`,
        accountSequence,
        walletType,
        balance: balanceAfter || 0,
        frozenBalance: 0,
        totalInflow: creditedAmount,
        totalOutflow: 0,
        isActive: true,
      },
    });
  } else {
    // Wallet exists: take the latest balanceAfter and accumulate the inflow.
    await tx.syncedUserWallet.update({
      where: { id: wallet.id },
      data: {
        balance: balanceAfter,
        totalInflow: { increment: creditedAmount },
      },
    });
  }

  this.logger.debug(`Synced user wallet from CONTRIBUTION_CREDITED: ${accountSequence}, balance: ${balanceAfter}`);
}
}