fix(mining): use unified transaction to prevent timeout errors

- Wrap all database operations in executeSecondDistribution with
  UnitOfWork.executeInTransaction
- Pass transaction client to repository save methods
- Use longer transaction timeout (60s) for batch operations
- Move Redis operations outside transaction (non-ACID)
- Add distributeToSystemAndPendingInTx method that accepts tx client

This resolves the "Unable to start a transaction in the given time"
error caused by multiple concurrent transactions competing for
database connections.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-17 00:31:20 -08:00
parent 725fb80f80
commit 49949ff979
2 changed files with 256 additions and 45 deletions

View File

@ -102,56 +102,107 @@ export class MiningDistributionService {
let systemParticipantCount = 0; let systemParticipantCount = 0;
let pendingParticipantCount = 0; let pendingParticipantCount = 0;
// 1. 分配给用户账户 // 收集需要累积到Redis的数据事务外执行
let page = 1; const redisAccumulateData: Array<{
const pageSize = 1000; accountSequence: string;
reward: ShareAmount;
accountContribution: ShareAmount;
}> = [];
while (true) { // 使用统一事务执行所有数据库操作
const { data: accounts, total } = await this.miningAccountRepository.findAllWithContribution(page, pageSize); const result = await this.unitOfWork.executeInTransaction(
if (accounts.length === 0) break; async (tx) => {
// 1. 分配给用户账户
let page = 1;
const pageSize = 1000;
for (const account of accounts) { while (true) {
const reward = this.calculator.calculateUserMiningReward( const { data: accounts, total } = await this.miningAccountRepository.findAllWithContribution(page, pageSize);
account.totalContribution, if (accounts.length === 0) break;
for (const account of accounts) {
const reward = this.calculator.calculateUserMiningReward(
account.totalContribution,
networkTotalContribution,
secondDistribution,
);
if (!reward.isZero()) {
// 每秒更新账户余额
account.mine(reward, `秒挖矿 ${currentSecond.getTime()}`);
await this.miningAccountRepository.save(account, tx);
// 收集Redis累积数据事务外执行
redisAccumulateData.push({
accountSequence: account.accountSequence,
reward,
accountContribution: account.totalContribution,
});
totalDistributed = totalDistributed.add(reward);
userParticipantCount++;
}
}
if (page * pageSize >= total) break;
page++;
}
// 2. 分配给系统账户(运营/省/市)和待解锁算力 - 复用同一事务
const systemAndPendingResult = await this.distributeToSystemAndPendingInTx(
tx,
currentSecond,
currentMinute,
networkTotalContribution, networkTotalContribution,
secondDistribution, secondDistribution,
); );
if (!reward.isZero()) { return systemAndPendingResult;
// 每秒更新账户余额 },
account.mine(reward, `秒挖矿 ${currentSecond.getTime()}`); {
await this.miningAccountRepository.save(account); maxWait: 10000, // 10秒等待获取事务
timeout: 60000, // 60秒事务超时
},
);
// 累积每分钟的挖矿数据到Redis totalDistributed = totalDistributed.add(result.systemDistributed);
await this.accumulateMinuteData( totalDistributed = totalDistributed.add(result.pendingDistributed);
account.accountSequence, systemParticipantCount = result.systemParticipantCount;
currentMinute, pendingParticipantCount = result.pendingParticipantCount;
reward,
account.totalContribution,
networkTotalContribution,
secondDistribution,
);
totalDistributed = totalDistributed.add(reward); // 事务成功后批量累积Redis数据非事务性操作
userParticipantCount++; for (const data of redisAccumulateData) {
} await this.accumulateMinuteData(
} data.accountSequence,
currentMinute,
if (page * pageSize >= total) break; data.reward,
page++; data.accountContribution,
networkTotalContribution,
secondDistribution,
);
} }
// 2. 分配给系统账户(运营/省/市)和待解锁算力 - 在单个事务中执行 // 累积系统账户和待解锁算力的Redis数据
const systemAndPendingResult = await this.distributeToSystemAndPending( for (const data of result.systemRedisData) {
currentSecond, await this.accumulateSystemMinuteData(
currentMinute, data.accountType,
networkTotalContribution, currentMinute,
secondDistribution, data.reward,
); data.contribution,
totalDistributed = totalDistributed.add(systemAndPendingResult.systemDistributed); networkTotalContribution,
totalDistributed = totalDistributed.add(systemAndPendingResult.pendingDistributed); secondDistribution,
systemParticipantCount = systemAndPendingResult.systemParticipantCount; );
pendingParticipantCount = systemAndPendingResult.pendingParticipantCount; }
for (const data of result.pendingRedisData) {
await this.accumulatePendingMinuteData(
data.pendingId,
currentMinute,
data.reward,
data.pending,
networkTotalContribution,
secondDistribution,
);
}
// 每分钟结束时写入汇总的MiningRecord // 每分钟结束时写入汇总的MiningRecord
if (isMinuteEnd) { if (isMinuteEnd) {
@ -398,6 +449,164 @@ export class MiningDistributionService {
return { systemDistributed, pendingDistributed, systemParticipantCount, pendingParticipantCount }; return { systemDistributed, pendingDistributed, systemParticipantCount, pendingParticipantCount };
} }
/**
*
* Redis的数据
*/
private async distributeToSystemAndPendingInTx(
tx: TransactionClient,
currentSecond: Date,
currentMinute: Date,
networkTotalContribution: ShareAmount,
secondDistribution: ShareAmount,
): Promise<{
systemDistributed: ShareAmount;
pendingDistributed: ShareAmount;
systemParticipantCount: number;
pendingParticipantCount: number;
systemRedisData: Array<{
accountType: SystemAccountType;
reward: ShareAmount;
contribution: ShareAmount;
}>;
pendingRedisData: Array<{
pendingId: bigint;
reward: ShareAmount;
pending: any;
}>;
}> {
let systemDistributed = ShareAmount.zero();
let pendingDistributed = ShareAmount.zero();
let systemParticipantCount = 0;
let pendingParticipantCount = 0;
const systemRedisData: Array<{
accountType: SystemAccountType;
reward: ShareAmount;
contribution: ShareAmount;
}> = [];
const pendingRedisData: Array<{
pendingId: bigint;
reward: ShareAmount;
pending: any;
}> = [];
// 预先获取所有需要的数据
const systemAccounts = await this.systemMiningAccountRepository.findAll();
const pendingContributions = await tx.pendingContributionMining.findMany({
where: { isExpired: false },
});
// 计算所有系统账户的挖矿奖励
const systemRewards: Array<{
accountType: SystemAccountType;
reward: ShareAmount;
contribution: ShareAmount;
memo: string;
}> = [];
for (const systemAccount of systemAccounts) {
// 总部账户不直接参与挖矿,它只接收未解锁算力的收益
if (systemAccount.accountType === SystemAccountType.HEADQUARTERS) {
continue;
}
if (systemAccount.totalContribution.isZero()) {
continue;
}
const reward = this.calculator.calculateUserMiningReward(
systemAccount.totalContribution,
networkTotalContribution,
secondDistribution,
);
if (!reward.isZero()) {
systemRewards.push({
accountType: systemAccount.accountType,
reward,
contribution: systemAccount.totalContribution,
memo: `秒挖矿 ${currentSecond.getTime()}`,
});
}
}
// 计算所有待解锁算力的挖矿奖励(归总部账户)
const pendingRewards: Array<{
pending: any;
reward: ShareAmount;
memo: string;
}> = [];
for (const pending of pendingContributions) {
const contribution = new ShareAmount(pending.amount);
if (contribution.isZero()) continue;
const reward = this.calculator.calculateUserMiningReward(
contribution,
networkTotalContribution,
secondDistribution,
);
if (!reward.isZero()) {
pendingRewards.push({
pending,
reward,
memo: `未解锁算力挖矿 - 来源:${pending.sourceAccountSequence} 类型:${pending.contributionType} 应归:${pending.wouldBeAccountSequence || '无'}`,
});
}
}
// 如果没有需要处理的数据,直接返回
if (systemRewards.length === 0 && pendingRewards.length === 0) {
return {
systemDistributed,
pendingDistributed,
systemParticipantCount,
pendingParticipantCount,
systemRedisData,
pendingRedisData,
};
}
// 处理系统账户挖矿(复用外部事务)
for (const { accountType, reward, contribution, memo } of systemRewards) {
await this.systemMiningAccountRepository.mine(accountType, reward, memo, tx);
systemDistributed = systemDistributed.add(reward);
systemParticipantCount++;
systemRedisData.push({ accountType, reward, contribution });
}
// 处理待解锁算力挖矿(归总部账户)
// 计算总部账户的总收益
let headquartersTotal = ShareAmount.zero();
for (const { pending, reward } of pendingRewards) {
headquartersTotal = headquartersTotal.add(reward);
pendingDistributed = pendingDistributed.add(reward);
pendingParticipantCount++;
pendingRedisData.push({ pendingId: pending.id, reward, pending });
}
// 一次性更新总部账户(而不是每个待解锁算力单独更新)
if (!headquartersTotal.isZero()) {
await this.systemMiningAccountRepository.mine(
SystemAccountType.HEADQUARTERS,
headquartersTotal,
`秒挖矿 ${currentSecond.getTime()} - 待解锁算力汇总 (${pendingRewards.length}笔)`,
tx,
);
}
return {
systemDistributed,
pendingDistributed,
systemParticipantCount,
pendingParticipantCount,
systemRedisData,
pendingRedisData,
};
}
/** /**
* Redis * Redis
*/ */

View File

@ -2,10 +2,11 @@ import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service'; import { PrismaService } from '../prisma/prisma.service';
import { Prisma } from '@prisma/client'; import { Prisma } from '@prisma/client';
export type TransactionClient = Omit< /**
PrismaService, *
'$connect' | '$disconnect' | '$on' | '$transaction' | '$use' | '$extends' * 使 Prisma TransactionClient
>; */
export type TransactionClient = Prisma.TransactionClient;
/** /**
* *
@ -19,8 +20,9 @@ export class UnitOfWork {
/** /**
* *
* PrismaService TransactionClient
*/ */
getClient(): TransactionClient { getClient(): TransactionClient | PrismaService {
return this.transactionClient || this.prisma; return this.transactionClient || this.prisma;
} }