diff --git a/backend/services/mining-service/src/application/services/mining-distribution.service.ts b/backend/services/mining-service/src/application/services/mining-distribution.service.ts index d8ba6497..28069b61 100644 --- a/backend/services/mining-service/src/application/services/mining-distribution.service.ts +++ b/backend/services/mining-service/src/application/services/mining-distribution.service.ts @@ -139,55 +139,17 @@ export class MiningDistributionService { page++; } - // 2. 分配给系统账户(运营/省/市) - const systemAccounts = await this.systemMiningAccountRepository.findAll(); - for (const systemAccount of systemAccounts) { - // 总部账户不直接参与挖矿,它只接收未解锁算力的收益 - if (systemAccount.accountType === SystemAccountType.HEADQUARTERS) { - continue; - } - - if (systemAccount.totalContribution.isZero()) { - continue; - } - - const reward = this.calculator.calculateUserMiningReward( - systemAccount.totalContribution, - networkTotalContribution, - secondDistribution, - ); - - if (!reward.isZero()) { - await this.systemMiningAccountRepository.mine( - systemAccount.accountType, - reward, - `秒挖矿 ${currentSecond.getTime()}`, - ); - - // 累积系统账户每分钟的挖矿数据到Redis - await this.accumulateSystemMinuteData( - systemAccount.accountType, - currentMinute, - reward, - systemAccount.totalContribution, - networkTotalContribution, - secondDistribution, - ); - - totalDistributed = totalDistributed.add(reward); - systemParticipantCount++; - } - } - - // 3. 分配未解锁算力的收益给总部账户 - const pendingMiningResult = await this.distributePendingContributions( + // 2. 分配给系统账户(运营/省/市)和待解锁算力 - 在单个事务中执行 + const systemAndPendingResult = await this.distributeToSystemAndPending( currentSecond, currentMinute, networkTotalContribution, secondDistribution, ); - totalDistributed = totalDistributed.add(pendingMiningResult.totalDistributed); - pendingParticipantCount = pendingMiningResult.participantCount; + totalDistributed = totalDistributed.add(systemAndPendingResult.systemDistributed); + totalDistributed = totalDistributed.add(systemAndPendingResult.pendingDistributed); + systemParticipantCount = systemAndPendingResult.systemParticipantCount; + pendingParticipantCount = systemAndPendingResult.pendingParticipantCount; // 每分钟结束时,写入汇总的MiningRecord if (isMinuteEnd) { @@ -286,22 +248,72 @@ export class MiningDistributionService { } /** - * 分配未解锁算力的挖矿收益给总部账户 + * 在单个事务中分配系统账户和待解锁算力的挖矿收益 + * 解决并发事务更新同一行导致的行锁竞争问题 */ - private async distributePendingContributions( + private async distributeToSystemAndPending( currentSecond: Date, currentMinute: Date, networkTotalContribution: ShareAmount, secondDistribution: ShareAmount, - ): Promise<{ totalDistributed: ShareAmount; participantCount: number }> { - let totalDistributed = ShareAmount.zero(); - let participantCount = 0; + ): Promise<{ + systemDistributed: ShareAmount; + pendingDistributed: ShareAmount; + systemParticipantCount: number; + pendingParticipantCount: number; + }> { + let systemDistributed = ShareAmount.zero(); + let pendingDistributed = ShareAmount.zero(); + let systemParticipantCount = 0; + let pendingParticipantCount = 0; - // 获取所有未过期的待解锁算力 + // 预先获取所有需要的数据 + const systemAccounts = await this.systemMiningAccountRepository.findAll(); const pendingContributions = await this.prisma.pendingContributionMining.findMany({ where: { isExpired: false }, }); + // 计算所有系统账户的挖矿奖励 + const systemRewards: Array<{ + accountType: SystemAccountType; + reward: ShareAmount; + contribution: ShareAmount; + memo: string; + }> = []; + + for (const systemAccount of systemAccounts) { + // 总部账户不直接参与挖矿,它只接收未解锁算力的收益 + if (systemAccount.accountType === SystemAccountType.HEADQUARTERS) { + continue; + } + + if (systemAccount.totalContribution.isZero()) { + continue; + } + + const reward = this.calculator.calculateUserMiningReward( + systemAccount.totalContribution, + networkTotalContribution, + secondDistribution, + ); + + if (!reward.isZero()) { + systemRewards.push({ + accountType: systemAccount.accountType, + reward, + contribution: systemAccount.totalContribution, + memo: `秒挖矿 ${currentSecond.getTime()}`, + }); + } + } + + // 计算所有待解锁算力的挖矿奖励(归总部账户) + const pendingRewards: Array<{ + pending: any; + reward: ShareAmount; + memo: string; + }> = []; + for (const pending of pendingContributions) { const contribution = new ShareAmount(pending.amount); if (contribution.isZero()) continue; @@ -313,29 +325,75 @@ export class MiningDistributionService { ); if (!reward.isZero()) { - // 收益归总部账户 - await this.systemMiningAccountRepository.mine( - SystemAccountType.HEADQUARTERS, - reward, - `未解锁算力挖矿 - 来源:${pending.sourceAccountSequence} 类型:${pending.contributionType} 应归:${pending.wouldBeAccountSequence || '无'}`, - ); - - // 累积待解锁算力每分钟的挖矿数据到Redis - await this.accumulatePendingMinuteData( - pending.id, - currentMinute, - reward, + pendingRewards.push({ pending, - networkTotalContribution, - secondDistribution, - ); - - totalDistributed = totalDistributed.add(reward); - participantCount++; + reward, + memo: `未解锁算力挖矿 - 来源:${pending.sourceAccountSequence} 类型:${pending.contributionType} 应归:${pending.wouldBeAccountSequence || '无'}`, + }); } } - return { totalDistributed, participantCount }; + // 如果没有需要处理的数据,直接返回 + if (systemRewards.length === 0 && pendingRewards.length === 0) { + return { systemDistributed, pendingDistributed, systemParticipantCount, pendingParticipantCount }; + } + + // 在单个事务中执行所有系统账户和待解锁算力的挖矿 + await this.prisma.$transaction(async (tx) => { + // 处理系统账户挖矿 + for (const { accountType, reward, memo } of systemRewards) { + await this.systemMiningAccountRepository.mine(accountType, reward, memo, tx); + systemDistributed = systemDistributed.add(reward); + systemParticipantCount++; + } + + // 处理待解锁算力挖矿(归总部账户) + // 计算总部账户的总收益 + let headquartersTotal = ShareAmount.zero(); + const headquartersMemos: string[] = []; + + for (const { reward, memo } of pendingRewards) { + headquartersTotal = headquartersTotal.add(reward); + headquartersMemos.push(memo); + pendingDistributed = pendingDistributed.add(reward); + pendingParticipantCount++; + } + + // 一次性更新总部账户(而不是每个待解锁算力单独更新) + if (!headquartersTotal.isZero()) { + await this.systemMiningAccountRepository.mine( + SystemAccountType.HEADQUARTERS, + headquartersTotal, + `秒挖矿 ${currentSecond.getTime()} - 待解锁算力汇总 (${pendingRewards.length}笔)`, + tx, + ); + } + }); + + // 事务成功后,累积 Redis 数据(Redis 操作不需要在事务内) + for (const { accountType, reward, contribution } of systemRewards) { + await this.accumulateSystemMinuteData( + accountType, + currentMinute, + reward, + contribution, + networkTotalContribution, + secondDistribution, + ); + } + + for (const { pending, reward } of pendingRewards) { + await this.accumulatePendingMinuteData( + pending.id, + currentMinute, + reward, + pending, + networkTotalContribution, + secondDistribution, + ); + } + + return { systemDistributed, pendingDistributed, systemParticipantCount, pendingParticipantCount }; } /** diff --git a/backend/services/mining-service/src/infrastructure/persistence/repositories/system-mining-account.repository.ts b/backend/services/mining-service/src/infrastructure/persistence/repositories/system-mining-account.repository.ts index 0ff88234..cc5df892 100644 --- a/backend/services/mining-service/src/infrastructure/persistence/repositories/system-mining-account.repository.ts +++ b/backend/services/mining-service/src/infrastructure/persistence/repositories/system-mining-account.repository.ts @@ -1,7 +1,10 @@ import { Injectable } from '@nestjs/common'; import { PrismaService } from '../prisma/prisma.service'; import { ShareAmount } from '../../../domain/value-objects/share-amount.vo'; -import { SystemAccountType } from '@prisma/client'; +import { SystemAccountType, Prisma } from '@prisma/client'; + +// 事务客户端类型 +type TransactionClient = Prisma.TransactionClient; export interface SystemMiningAccountSnapshot { accountType: SystemAccountType; @@ -86,13 +89,21 @@ export class SystemMiningAccountRepository { return new ShareAmount(result._sum.totalContribution || 0); } + /** + * 执行系统账户挖矿(带外部事务支持) + * @param accountType 账户类型 + * @param amount 挖矿数量 + * @param memo 备注 + * @param tx 可选的外部事务客户端,如果不传则自动创建事务 + */ async mine( accountType: SystemAccountType, amount: ShareAmount, memo: string, + tx?: TransactionClient, ): Promise { - await this.prisma.$transaction(async (tx) => { - const account = await tx.systemMiningAccount.findUnique({ + const executeInTx = async (client: TransactionClient) => { + const account = await client.systemMiningAccount.findUnique({ where: { accountType }, }); @@ -104,7 +115,7 @@ export class SystemMiningAccountRepository { const balanceAfter = balanceBefore.plus(amount.value); const totalMined = account.totalMined.plus(amount.value); - await tx.systemMiningAccount.update({ + await client.systemMiningAccount.update({ where: { accountType }, data: { totalMined, @@ -112,7 +123,7 @@ export class SystemMiningAccountRepository { }, }); - await tx.systemMiningTransaction.create({ + await client.systemMiningTransaction.create({ data: { accountType, type: 'MINE', @@ -122,7 +133,15 @@ export class SystemMiningAccountRepository { memo, }, }); - }); + }; + + if (tx) { + // 使用外部事务 + await executeInTx(tx); + } else { + // 自动创建事务(向后兼容) + await this.prisma.$transaction(executeInTx); + } } async saveMinuteRecord(