fix(mining): resolve transaction timeout by using single transaction for system accounts

Problem:
- Multiple concurrent transactions updating system_mining_accounts caused row lock contention
- 16+ transactions waiting for tuple/transactionid locks led to timeout errors
- This prevented writeMinuteRecords() from executing, leaving mining_records empty

Solution:
- Modified SystemMiningAccountRepository.mine() to accept optional external transaction client
- Created new distributeToSystemAndPending() method that processes all system accounts
  and pending contributions in a single transaction
- Pre-calculate all rewards before transaction, then execute updates sequentially
- Aggregate all pending contribution rewards into single HEADQUARTERS update
- Move Redis accumulation outside transaction to avoid blocking

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-16 18:53:53 -08:00
parent d5e5bf642c
commit 9deffe2565
2 changed files with 152 additions and 75 deletions

View File

@ -139,55 +139,17 @@ export class MiningDistributionService {
page++;
}
// 2. 分配给系统账户(运营/省/市)
const systemAccounts = await this.systemMiningAccountRepository.findAll();
for (const systemAccount of systemAccounts) {
// 总部账户不直接参与挖矿,它只接收未解锁算力的收益
if (systemAccount.accountType === SystemAccountType.HEADQUARTERS) {
continue;
}
if (systemAccount.totalContribution.isZero()) {
continue;
}
const reward = this.calculator.calculateUserMiningReward(
systemAccount.totalContribution,
networkTotalContribution,
secondDistribution,
);
if (!reward.isZero()) {
await this.systemMiningAccountRepository.mine(
systemAccount.accountType,
reward,
`秒挖矿 ${currentSecond.getTime()}`,
);
// 累积系统账户每分钟的挖矿数据到Redis
await this.accumulateSystemMinuteData(
systemAccount.accountType,
currentMinute,
reward,
systemAccount.totalContribution,
networkTotalContribution,
secondDistribution,
);
totalDistributed = totalDistributed.add(reward);
systemParticipantCount++;
}
}
// 3. 分配未解锁算力的收益给总部账户
const pendingMiningResult = await this.distributePendingContributions(
// 2. 分配给系统账户(运营/省/市)和待解锁算力 - 在单个事务中执行
const systemAndPendingResult = await this.distributeToSystemAndPending(
currentSecond,
currentMinute,
networkTotalContribution,
secondDistribution,
);
totalDistributed = totalDistributed.add(pendingMiningResult.totalDistributed);
pendingParticipantCount = pendingMiningResult.participantCount;
totalDistributed = totalDistributed.add(systemAndPendingResult.systemDistributed);
totalDistributed = totalDistributed.add(systemAndPendingResult.pendingDistributed);
systemParticipantCount = systemAndPendingResult.systemParticipantCount;
pendingParticipantCount = systemAndPendingResult.pendingParticipantCount;
// 每分钟结束时写入汇总的MiningRecord
if (isMinuteEnd) {
@ -286,22 +248,72 @@ export class MiningDistributionService {
}
/**
*
*
*
*/
private async distributePendingContributions(
private async distributeToSystemAndPending(
currentSecond: Date,
currentMinute: Date,
networkTotalContribution: ShareAmount,
secondDistribution: ShareAmount,
): Promise<{ totalDistributed: ShareAmount; participantCount: number }> {
let totalDistributed = ShareAmount.zero();
let participantCount = 0;
): Promise<{
systemDistributed: ShareAmount;
pendingDistributed: ShareAmount;
systemParticipantCount: number;
pendingParticipantCount: number;
}> {
let systemDistributed = ShareAmount.zero();
let pendingDistributed = ShareAmount.zero();
let systemParticipantCount = 0;
let pendingParticipantCount = 0;
// 获取所有未过期的待解锁算力
// 预先获取所有需要的数据
const systemAccounts = await this.systemMiningAccountRepository.findAll();
const pendingContributions = await this.prisma.pendingContributionMining.findMany({
where: { isExpired: false },
});
// 计算所有系统账户的挖矿奖励
const systemRewards: Array<{
accountType: SystemAccountType;
reward: ShareAmount;
contribution: ShareAmount;
memo: string;
}> = [];
for (const systemAccount of systemAccounts) {
// 总部账户不直接参与挖矿,它只接收未解锁算力的收益
if (systemAccount.accountType === SystemAccountType.HEADQUARTERS) {
continue;
}
if (systemAccount.totalContribution.isZero()) {
continue;
}
const reward = this.calculator.calculateUserMiningReward(
systemAccount.totalContribution,
networkTotalContribution,
secondDistribution,
);
if (!reward.isZero()) {
systemRewards.push({
accountType: systemAccount.accountType,
reward,
contribution: systemAccount.totalContribution,
memo: `秒挖矿 ${currentSecond.getTime()}`,
});
}
}
// 计算所有待解锁算力的挖矿奖励(归总部账户)
const pendingRewards: Array<{
pending: any;
reward: ShareAmount;
memo: string;
}> = [];
for (const pending of pendingContributions) {
const contribution = new ShareAmount(pending.amount);
if (contribution.isZero()) continue;
@ -313,29 +325,75 @@ export class MiningDistributionService {
);
if (!reward.isZero()) {
// 收益归总部账户
await this.systemMiningAccountRepository.mine(
SystemAccountType.HEADQUARTERS,
reward,
`未解锁算力挖矿 - 来源:${pending.sourceAccountSequence} 类型:${pending.contributionType} 应归:${pending.wouldBeAccountSequence || '无'}`,
);
// 累积待解锁算力每分钟的挖矿数据到Redis
await this.accumulatePendingMinuteData(
pending.id,
currentMinute,
reward,
pendingRewards.push({
pending,
networkTotalContribution,
secondDistribution,
);
totalDistributed = totalDistributed.add(reward);
participantCount++;
reward,
memo: `未解锁算力挖矿 - 来源:${pending.sourceAccountSequence} 类型:${pending.contributionType} 应归:${pending.wouldBeAccountSequence || '无'}`,
});
}
}
return { totalDistributed, participantCount };
// 如果没有需要处理的数据,直接返回
if (systemRewards.length === 0 && pendingRewards.length === 0) {
return { systemDistributed, pendingDistributed, systemParticipantCount, pendingParticipantCount };
}
// 在单个事务中执行所有系统账户和待解锁算力的挖矿
await this.prisma.$transaction(async (tx) => {
// 处理系统账户挖矿
for (const { accountType, reward, memo } of systemRewards) {
await this.systemMiningAccountRepository.mine(accountType, reward, memo, tx);
systemDistributed = systemDistributed.add(reward);
systemParticipantCount++;
}
// 处理待解锁算力挖矿(归总部账户)
// 计算总部账户的总收益
let headquartersTotal = ShareAmount.zero();
const headquartersMemos: string[] = [];
for (const { reward, memo } of pendingRewards) {
headquartersTotal = headquartersTotal.add(reward);
headquartersMemos.push(memo);
pendingDistributed = pendingDistributed.add(reward);
pendingParticipantCount++;
}
// 一次性更新总部账户(而不是每个待解锁算力单独更新)
if (!headquartersTotal.isZero()) {
await this.systemMiningAccountRepository.mine(
SystemAccountType.HEADQUARTERS,
headquartersTotal,
`秒挖矿 ${currentSecond.getTime()} - 待解锁算力汇总 (${pendingRewards.length}笔)`,
tx,
);
}
});
// 事务成功后,累积 Redis 数据Redis 操作不需要在事务内)
for (const { accountType, reward, contribution } of systemRewards) {
await this.accumulateSystemMinuteData(
accountType,
currentMinute,
reward,
contribution,
networkTotalContribution,
secondDistribution,
);
}
for (const { pending, reward } of pendingRewards) {
await this.accumulatePendingMinuteData(
pending.id,
currentMinute,
reward,
pending,
networkTotalContribution,
secondDistribution,
);
}
return { systemDistributed, pendingDistributed, systemParticipantCount, pendingParticipantCount };
}
/**

View File

@ -1,7 +1,10 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { ShareAmount } from '../../../domain/value-objects/share-amount.vo';
import { SystemAccountType } from '@prisma/client';
import { SystemAccountType, Prisma } from '@prisma/client';
// 事务客户端类型
type TransactionClient = Prisma.TransactionClient;
export interface SystemMiningAccountSnapshot {
accountType: SystemAccountType;
@ -86,13 +89,21 @@ export class SystemMiningAccountRepository {
return new ShareAmount(result._sum.totalContribution || 0);
}
/**
*
* @param accountType
* @param amount
* @param memo
* @param tx
*/
async mine(
accountType: SystemAccountType,
amount: ShareAmount,
memo: string,
tx?: TransactionClient,
): Promise<void> {
await this.prisma.$transaction(async (tx) => {
const account = await tx.systemMiningAccount.findUnique({
const executeInTx = async (client: TransactionClient) => {
const account = await client.systemMiningAccount.findUnique({
where: { accountType },
});
@ -104,7 +115,7 @@ export class SystemMiningAccountRepository {
const balanceAfter = balanceBefore.plus(amount.value);
const totalMined = account.totalMined.plus(amount.value);
await tx.systemMiningAccount.update({
await client.systemMiningAccount.update({
where: { accountType },
data: {
totalMined,
@ -112,7 +123,7 @@ export class SystemMiningAccountRepository {
},
});
await tx.systemMiningTransaction.create({
await client.systemMiningTransaction.create({
data: {
accountType,
type: 'MINE',
@ -122,7 +133,15 @@ export class SystemMiningAccountRepository {
memo,
},
});
});
};
if (tx) {
// 使用外部事务
await executeInTx(tx);
} else {
// 自动创建事务(向后兼容)
await this.prisma.$transaction(executeInTx);
}
}
async saveMinuteRecord(