diff --git a/backend/services/mining-service/src/application/services/mining-distribution.service.ts b/backend/services/mining-service/src/application/services/mining-distribution.service.ts index b754a76c..96103c2d 100644 --- a/backend/services/mining-service/src/application/services/mining-distribution.service.ts +++ b/backend/services/mining-service/src/application/services/mining-distribution.service.ts @@ -102,56 +102,107 @@ export class MiningDistributionService { let systemParticipantCount = 0; let pendingParticipantCount = 0; - // 1. 分配给用户账户 - let page = 1; - const pageSize = 1000; + // 收集需要累积到Redis的数据(事务外执行) + const redisAccumulateData: Array<{ + accountSequence: string; + reward: ShareAmount; + accountContribution: ShareAmount; + }> = []; - while (true) { - const { data: accounts, total } = await this.miningAccountRepository.findAllWithContribution(page, pageSize); - if (accounts.length === 0) break; + // 使用统一事务执行所有数据库操作 + const result = await this.unitOfWork.executeInTransaction( + async (tx) => { + // 1. 分配给用户账户 + let page = 1; + const pageSize = 1000; - for (const account of accounts) { - const reward = this.calculator.calculateUserMiningReward( - account.totalContribution, + while (true) { + const { data: accounts, total } = await this.miningAccountRepository.findAllWithContribution(page, pageSize); + if (accounts.length === 0) break; + + for (const account of accounts) { + const reward = this.calculator.calculateUserMiningReward( + account.totalContribution, + networkTotalContribution, + secondDistribution, + ); + + if (!reward.isZero()) { + // 每秒更新账户余额 + account.mine(reward, `秒挖矿 ${currentSecond.getTime()}`); + await this.miningAccountRepository.save(account, tx); + + // 收集Redis累积数据(事务外执行) + redisAccumulateData.push({ + accountSequence: account.accountSequence, + reward, + accountContribution: account.totalContribution, + }); + + totalDistributed = totalDistributed.add(reward); + userParticipantCount++; + } + } + + if (page * pageSize >= total) break; + page++; + } + + // 2. 分配给系统账户(运营/省/市)和待解锁算力 - 复用同一事务 + const systemAndPendingResult = await this.distributeToSystemAndPendingInTx( + tx, + currentSecond, + currentMinute, networkTotalContribution, secondDistribution, ); - if (!reward.isZero()) { - // 每秒更新账户余额 - account.mine(reward, `秒挖矿 ${currentSecond.getTime()}`); - await this.miningAccountRepository.save(account); + return systemAndPendingResult; + }, + { + maxWait: 10000, // 10秒等待获取事务 + timeout: 60000, // 60秒事务超时 + }, + ); - // 累积每分钟的挖矿数据到Redis - await this.accumulateMinuteData( - account.accountSequence, - currentMinute, - reward, - account.totalContribution, - networkTotalContribution, - secondDistribution, - ); + totalDistributed = totalDistributed.add(result.systemDistributed); + totalDistributed = totalDistributed.add(result.pendingDistributed); + systemParticipantCount = result.systemParticipantCount; + pendingParticipantCount = result.pendingParticipantCount; - totalDistributed = totalDistributed.add(reward); - userParticipantCount++; - } - } - - if (page * pageSize >= total) break; - page++; + // 事务成功后,批量累积Redis数据(非事务性操作) + for (const data of redisAccumulateData) { + await this.accumulateMinuteData( + data.accountSequence, + currentMinute, + data.reward, + data.accountContribution, + networkTotalContribution, + secondDistribution, + ); } - // 2. 分配给系统账户(运营/省/市)和待解锁算力 - 在单个事务中执行 - const systemAndPendingResult = await this.distributeToSystemAndPending( - currentSecond, - currentMinute, - networkTotalContribution, - secondDistribution, - ); - totalDistributed = totalDistributed.add(systemAndPendingResult.systemDistributed); - totalDistributed = totalDistributed.add(systemAndPendingResult.pendingDistributed); - systemParticipantCount = systemAndPendingResult.systemParticipantCount; - pendingParticipantCount = systemAndPendingResult.pendingParticipantCount; + // 累积系统账户和待解锁算力的Redis数据 + for (const data of result.systemRedisData) { + await this.accumulateSystemMinuteData( + data.accountType, + currentMinute, + data.reward, + data.contribution, + networkTotalContribution, + secondDistribution, + ); + } + for (const data of result.pendingRedisData) { + await this.accumulatePendingMinuteData( + data.pendingId, + currentMinute, + data.reward, + data.pending, + networkTotalContribution, + secondDistribution, + ); + } // 每分钟结束时,写入汇总的MiningRecord if (isMinuteEnd) { @@ -398,6 +449,164 @@ export class MiningDistributionService { return { systemDistributed, pendingDistributed, systemParticipantCount, pendingParticipantCount }; } + /** + * 在给定事务中分配系统账户和待解锁算力的挖矿收益 + * 返回分配结果和需要累积到Redis的数据 + */ + private async distributeToSystemAndPendingInTx( + tx: TransactionClient, + currentSecond: Date, + currentMinute: Date, + networkTotalContribution: ShareAmount, + secondDistribution: ShareAmount, + ): Promise<{ + systemDistributed: ShareAmount; + pendingDistributed: ShareAmount; + systemParticipantCount: number; + pendingParticipantCount: number; + systemRedisData: Array<{ + accountType: SystemAccountType; + reward: ShareAmount; + contribution: ShareAmount; + }>; + pendingRedisData: Array<{ + pendingId: bigint; + reward: ShareAmount; + pending: any; + }>; + }> { + let systemDistributed = ShareAmount.zero(); + let pendingDistributed = ShareAmount.zero(); + let systemParticipantCount = 0; + let pendingParticipantCount = 0; + const systemRedisData: Array<{ + accountType: SystemAccountType; + reward: ShareAmount; + contribution: ShareAmount; + }> = []; + const pendingRedisData: Array<{ + pendingId: bigint; + reward: ShareAmount; + pending: any; + }> = []; + + // 预先获取所有需要的数据 + const systemAccounts = await this.systemMiningAccountRepository.findAll(); + const pendingContributions = await tx.pendingContributionMining.findMany({ + where: { isExpired: false }, + }); + + // 计算所有系统账户的挖矿奖励 + const systemRewards: Array<{ + accountType: SystemAccountType; + reward: ShareAmount; + contribution: ShareAmount; + memo: string; + }> = []; + + for (const systemAccount of systemAccounts) { + // 总部账户不直接参与挖矿,它只接收未解锁算力的收益 + if (systemAccount.accountType === SystemAccountType.HEADQUARTERS) { + continue; + } + + if (systemAccount.totalContribution.isZero()) { + continue; + } + + const reward = this.calculator.calculateUserMiningReward( + systemAccount.totalContribution, + networkTotalContribution, + secondDistribution, + ); + + if (!reward.isZero()) { + systemRewards.push({ + accountType: systemAccount.accountType, + reward, + contribution: systemAccount.totalContribution, + memo: `秒挖矿 ${currentSecond.getTime()}`, + }); + } + } + + // 计算所有待解锁算力的挖矿奖励(归总部账户) + const pendingRewards: Array<{ + pending: any; + reward: ShareAmount; + memo: string; + }> = []; + + for (const pending of pendingContributions) { + const contribution = new ShareAmount(pending.amount); + if (contribution.isZero()) continue; + + const reward = this.calculator.calculateUserMiningReward( + contribution, + networkTotalContribution, + secondDistribution, + ); + + if (!reward.isZero()) { + pendingRewards.push({ + pending, + reward, + memo: `未解锁算力挖矿 - 来源:${pending.sourceAccountSequence} 类型:${pending.contributionType} 应归:${pending.wouldBeAccountSequence || '无'}`, + }); + } + } + + // 如果没有需要处理的数据,直接返回 + if (systemRewards.length === 0 && pendingRewards.length === 0) { + return { + systemDistributed, + pendingDistributed, + systemParticipantCount, + pendingParticipantCount, + systemRedisData, + pendingRedisData, + }; + } + + // 处理系统账户挖矿(复用外部事务) + for (const { accountType, reward, contribution, memo } of systemRewards) { + await this.systemMiningAccountRepository.mine(accountType, reward, memo, tx); + systemDistributed = systemDistributed.add(reward); + systemParticipantCount++; + systemRedisData.push({ accountType, reward, contribution }); + } + + // 处理待解锁算力挖矿(归总部账户) + // 计算总部账户的总收益 + let headquartersTotal = ShareAmount.zero(); + + for (const { pending, reward } of pendingRewards) { + headquartersTotal = headquartersTotal.add(reward); + pendingDistributed = pendingDistributed.add(reward); + pendingParticipantCount++; + pendingRedisData.push({ pendingId: pending.id, reward, pending }); + } + + // 一次性更新总部账户(而不是每个待解锁算力单独更新) + if (!headquartersTotal.isZero()) { + await this.systemMiningAccountRepository.mine( + SystemAccountType.HEADQUARTERS, + headquartersTotal, + `秒挖矿 ${currentSecond.getTime()} - 待解锁算力汇总 (${pendingRewards.length}笔)`, + tx, + ); + } + + return { + systemDistributed, + pendingDistributed, + systemParticipantCount, + pendingParticipantCount, + systemRedisData, + pendingRedisData, + }; + } + /** * 累积每分钟的挖矿数据到Redis */ diff --git a/backend/services/mining-service/src/infrastructure/persistence/unit-of-work/unit-of-work.ts b/backend/services/mining-service/src/infrastructure/persistence/unit-of-work/unit-of-work.ts index c679d2e2..71e6a322 100644 --- a/backend/services/mining-service/src/infrastructure/persistence/unit-of-work/unit-of-work.ts +++ b/backend/services/mining-service/src/infrastructure/persistence/unit-of-work/unit-of-work.ts @@ -2,10 +2,11 @@ import { Injectable } from '@nestjs/common'; import { PrismaService } from '../prisma/prisma.service'; import { Prisma } from '@prisma/client'; -export type TransactionClient = Omit< - PrismaService, - '$connect' | '$disconnect' | '$on' | '$transaction' | '$use' | '$extends' ->; +/** + * 事务客户端类型 + * 使用 Prisma 官方的 TransactionClient 类型 + */ +export type TransactionClient = Prisma.TransactionClient; /** * 工作单元模式 @@ -19,8 +20,9 @@ export class UnitOfWork { /** * 获取当前事务客户端,如果没有活跃事务则返回普通客户端 + * 注意:返回 PrismaService 时可以兼容 TransactionClient 的操作 */ - getClient(): TransactionClient { + getClient(): TransactionClient | PrismaService { return this.transactionClient || this.prisma; }