fix(mining): 修复挖矿分配并发覆盖贡献值同步的 Lost Update 问题

挖矿分配每秒运行的 save() 无条件写回所有字段(含 totalContribution),
导致贡献值同步刚更新的正确值被立即覆盖回旧值。
同时修复 DailySnapshot 全量同步一直 synced 0 accounts 的安全网失效问题。

- repository save() 增加 skipContributionUpdate 选项
- 挖矿分配路径传入 skipContributionUpdate: true
- contribution-service DailySnapshot 事件 payload 补全字段
- mining-service 适配字段名差异并修复 API 解析 bug

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-03 07:24:52 -08:00
parent 92c305c749
commit 2b2e1efc7a
7 changed files with 398 additions and 17 deletions

View File

@ -101,9 +101,12 @@ export class SnapshotService {
aggregateId: dateStr,
eventType: 'DailySnapshotCreated',
payload: {
snapshotId: dateStr,
snapshotDate: dateStr,
totalContribution: networkTotalContribution.toString(),
networkTotalContribution: networkTotalContribution.toString(),
activeAccounts: activeAccounts.length,
accountCount: activeAccounts.length,
totalAccounts,
createdAt: new Date().toISOString(),
},

View File

@ -82,10 +82,10 @@ export class ContributionEventHandler implements OnModuleInit {
this.logger.log(`Received DailySnapshotCreated for ${eventPayload.snapshotDate}`);
await this.syncService.handleDailySnapshotCreated({
snapshotId: eventPayload.snapshotId,
snapshotId: eventPayload.snapshotId || eventPayload.snapshotDate || 'unknown',
snapshotDate: eventPayload.snapshotDate,
totalContribution: eventPayload.totalContribution,
activeAccounts: eventPayload.activeAccounts,
totalContribution: eventPayload.totalContribution || eventPayload.networkTotalContribution || '0',
activeAccounts: eventPayload.activeAccounts || eventPayload.accountCount || 0,
});
} else if (eventType === 'SystemAccountSynced') {
this.logger.log(

View File

@ -7,7 +7,7 @@ import { RedisService } from '../../infrastructure/redis/redis.service';
interface ContributionData {
accountSequence: string;
totalContribution: string;
contribution: string;
}
/**
@ -94,11 +94,11 @@ export class ContributionSyncService {
pageSize,
);
if (!response || response.data.length === 0) break;
if (!response || !response.data || response.data.length === 0) break;
// 批量更新
for (const item of response.data) {
await this.syncAccountContribution(item.accountSequence, item.totalContribution);
await this.syncAccountContribution(item.accountSequence, item.contribution);
syncedCount++;
}
@ -133,7 +133,8 @@ export class ContributionSyncService {
}
const result = await response.json();
return result.data;
// API 直接返回 { data: [...], total, totalContribution }
return result;
} catch (error) {
this.logger.error('Failed to fetch contribution ratios', error);
return null;

View File

@ -132,7 +132,7 @@ export class MiningDistributionService {
if (!reward.isZero()) {
// 每秒更新账户余额
account.mine(reward, `秒挖矿 ${currentSecond.getTime()}`);
await this.miningAccountRepository.save(account, tx);
await this.miningAccountRepository.save(account, tx, { skipContributionUpdate: true });
// 收集Redis累积数据事务外执行
redisAccumulateData.push({
@ -288,7 +288,7 @@ export class MiningDistributionService {
if (!reward.isZero()) {
account.mine(reward, `秒挖矿 ${currentSecond.getTime()}`);
await this.miningAccountRepository.save(account);
await this.miningAccountRepository.save(account, undefined, { skipContributionUpdate: true });
await this.accumulateMinuteData(
account.accountSequence,

View File

@ -37,12 +37,29 @@ export class MiningAccountRepository {
*
* @param aggregate
* @param tx
* @param options.skipContributionUpdate true totalContribution/lastSyncedAt
* Lost Update
*/
async save(aggregate: MiningAccountAggregate, tx?: TransactionClient): Promise<void> {
async save(
aggregate: MiningAccountAggregate,
tx?: TransactionClient,
options?: { skipContributionUpdate?: boolean },
): Promise<void> {
const snapshot = aggregate.toSnapshot();
const transactions = aggregate.pendingTransactions;
const executeInTx = async (client: TransactionClient) => {
// 构建 update 数据:挖矿分配路径不写 totalContribution/lastSyncedAt
const updateData: Record<string, any> = {
totalMined: snapshot.totalMined.value,
availableBalance: snapshot.availableBalance.value,
frozenBalance: snapshot.frozenBalance.value,
};
if (!options?.skipContributionUpdate) {
updateData.totalContribution = snapshot.totalContribution.value;
updateData.lastSyncedAt = snapshot.lastSyncedAt;
}
// 保存账户
await client.miningAccount.upsert({
where: { accountSequence: snapshot.accountSequence },
@ -54,13 +71,7 @@ export class MiningAccountRepository {
totalContribution: snapshot.totalContribution.value,
lastSyncedAt: snapshot.lastSyncedAt,
},
update: {
totalMined: snapshot.totalMined.value,
availableBalance: snapshot.availableBalance.value,
frozenBalance: snapshot.frozenBalance.value,
totalContribution: snapshot.totalContribution.value,
lastSyncedAt: snapshot.lastSyncedAt,
},
update: updateData,
});
// 保存交易流水

View File

@ -0,0 +1,110 @@
# BUG: 挖矿分配并发覆盖贡献值同步 (Lost Update)
**状态**: 待解决
**严重程度**: 高 - 影响用户挖矿收益
**发现日期**: 2026-03-03
**影响范围**: 所有预种用户 (D2602260xxxx)12个账户数据异常
---
## 1. 问题描述
用户购买多份预种后contribution-service 正确记录了累计贡献值,但 mining-service 中只显示第一份的贡献值,导致挖矿收益严重偏低。
**示例:**
- D26022600018 购买 4 份预种 → contribution 记录 13,117.860 → mining 只显示 3,279.465
- D26022600001 购买 5 份合成 1 棵树 → contribution 记录 16,397.325 → mining 只显示 3,279.465
---
## 2. 根因分析
### 主因MiningDistributionService 的 save() 覆盖 totalContribution
mining-service 存在两个并发写入路径共享同一个 `save()` 方法:
**路径 A - 贡献值同步**Kafka 事件触发,不定时):
```
ContributionEventHandler → ContributionSyncService.handleContributionCalculated()
→ account.updateContribution(newValue) // 更新 totalContribution
→ miningAccountRepository.save(account) // upsert 所有字段
```
**路径 B - 挖矿分配**(每秒执行):
```
MiningDistributionService.executeSecondDistribution()
→ findAllWithContribution() // 加载账户到内存(含 totalContribution
→ account.mine(reward) // 只改 totalMined/availableBalance
→ miningAccountRepository.save(account) // upsert 所有字段(含 stale totalContribution
```
**竞争时序:**
```
T0 挖矿分配加载 D26022600018, totalContribution=3279.465
T1 贡献值同步更新 D26022600018 → totalContribution=13117.860 ✓ (DB 已更新)
T2 挖矿分配保存 D26022600018, 写回 totalContribution=3279.465 ← 覆盖!
```
**日志证据:**
```
03/01 02:50:30 - Updated contribution for D26022600001: 16397.325 ← 同步成功
03/03 当前 DB - D26022600001 totalContribution = 3279.465 ← 被覆盖回旧值
```
挖矿分配每秒运行,贡献值同步后几乎必然在下一秒被覆盖。
**代码位置:**
- `mining-account.repository.ts:57-62` — save() 的 upsert 无条件写入所有字段
- `mining-distribution.service.ts:122-135` — 每秒加载→mine()→save()
- `mining-account.aggregate.ts:126-141` — mine() 只改余额但 save() 会写回所有字段
### 次因DailySnapshot 全量同步完全失效
本应作为安全网的每日全量同步,自上线以来一直 synced 0 accounts
```
03/01 01:00 - DailySnapshotCreated for 2026-02-27 → synced 0 accounts
03/02 01:00 - DailySnapshotCreated for 2026-02-28 → synced 0 accounts
03/03 01:00 - DailySnapshotCreated for 2026-03-01 → synced 0 accounts
```
**原因:**
1. contribution-service 发布的事件 payload 缺少 `snapshotId` 字段 → mining-service 读到 undefined
2. 字段名不匹配contribution 发 `networkTotalContribution`mining 读 `totalContribution`
3. `fetchContributionRatios()` 返回数据解析失败(`result.data` 路径不对)
**代码位置:**
- `contribution-service/snapshot.service.ts:99-110` — payload 缺少 snapshotId
- `mining-service/contribution-sync.service.ts:63-68` — 期望 snapshotId
- `mining-service/contribution-sync.service.ts:126-141` — API 调用和解析
---
## 3. 受影响用户
| 用户 | contribution 正确值 | mining 当前值 | 差额 |
|------|-------------------|-------------|------|
| D26022600001 | 16,397.325 | 3,279.465 | -13,117.860 |
| D26022600014 | 13,253.562 | 3,279.465 | -9,974.097 |
| D26022600015 | 6,649.398 | 3,279.465 | -3,369.933 |
| D26022600016 | 13,117.860 | 3,279.465 | -9,838.395 |
| D26022600018 | 13,117.860 | 3,279.465 | -9,838.395 |
| D26022600033 | 3,754.422 | 3,279.465 | -474.957 |
| D26022600034 | 6,988.653 | 3,279.465 | -3,709.188 |
| D26022600035 | 3,799.656 | 3,392.550 | -407.106 |
| D26022600036 | 17,211.537 | 3,279.465 | -13,932.072 |
| D26022600037 | 16,397.325 | 3,279.465 | -13,117.860 |
| D26022600038 | 19,676.790 | 3,279.465 | -16,397.325 |
| D26022600043 | 6,558.930 | 3,279.465 | -3,279.465 |
D26022600000 是唯一正常的预种用户。旧用户 (D2512xxxx) 不受影响。
---
## 4. 为什么旧用户不受影响
旧用户在系统稳定后完成了一次性同步,之后没有新的贡献值变更事件。
挖矿分配虽然每秒覆写 totalContribution但写回的值恰好等于正确值没有新同步来改变它
预种用户是分批多次产生贡献值的(先 1 份 → 2 份 → ... → N 份),
所以挖矿分配总是把后续累加的正确值覆盖回最初同步的旧值。

View File

@ -0,0 +1,256 @@
# 修复方案评估:挖矿分配并发覆盖贡献值
## 概览
需要解决三个问题:
1. **主因修复** — 消除挖矿分配对 totalContribution 的并发覆盖
2. **安全网修复** — 恢复 DailySnapshot 全量同步功能
3. **数据修复** — 将 12 个受影响用户的 mining 数据校正
---
## 方案对比
| 方案 | 改动范围 | 风险 | 可靠性 | 实施难度 |
|------|---------|------|-------|---------|
| A. 挖矿分配只更新挖矿字段 | 1个文件 | **低** | **高** | **低** |
| B. Repository 分方法 | 2个文件 | **低** | **高** | 低 |
| C. 数据库层面原子更新 | 1个文件 | 中 | 高 | 中 |
| D. 乐观锁 version 字段 | 4个文件+迁移 | 中高 | 高 | 高 |
| E. 贡献值独立表 | 多文件+迁移 | **高** | 高 | **高** |
---
## 方案 A挖矿分配路径只更新挖矿字段推荐
**思路**:挖矿分配调用 save() 时,不写 totalContribution 和 lastSyncedAt。
**改动**:只改 `mining-account.repository.ts`,给 save() 加一个选项参数。
```typescript
// mining-account.repository.ts
async save(
aggregate: MiningAccountAggregate,
tx?: TransactionClient,
options?: { skipContributionUpdate?: boolean },
): Promise<void> {
const snapshot = aggregate.toSnapshot();
const updateData: any = {
totalMined: snapshot.totalMined.value,
availableBalance: snapshot.availableBalance.value,
frozenBalance: snapshot.frozenBalance.value,
};
// 仅在贡献值同步路径才写入这两个字段
if (!options?.skipContributionUpdate) {
updateData.totalContribution = snapshot.totalContribution.value;
updateData.lastSyncedAt = snapshot.lastSyncedAt;
}
await client.miningAccount.upsert({
where: { accountSequence: snapshot.accountSequence },
create: { /* 所有字段 - 新账户需要全部 */ },
update: updateData,
});
}
```
**调用方改动**:只需在 mining-distribution.service.ts 的 save 调用加一个参数:
```typescript
await this.miningAccountRepository.save(account, tx, { skipContributionUpdate: true });
```
**优点**
- 改动极小1个文件加参数1个调用方加选项
- 向后兼容 — 不传 options 的调用方行为不变
- 完全消除竞争条件 — 挖矿分配路径不再触碰 totalContribution
- 不需要数据库迁移
**缺点**
- 语义上 save() 变得不纯粹(不总是保存所有字段)
- 新增调用方需要知道传不传 options
**风险:低**
- 逻辑简单直观,不引入新概念
- mine() 本身就不修改 totalContributionsave() 不写它是逻辑自洽的
- 可通过单元测试充分验证
---
## 方案 B拆分 Repository 方法
**思路**:创建专用的 `saveMiningResult()` 方法,只更新挖矿相关字段。
```typescript
// mining-account.repository.ts 新增方法
async saveMiningResult(aggregate: MiningAccountAggregate, tx?: TransactionClient): Promise<void> {
const snapshot = aggregate.toSnapshot();
const transactions = aggregate.pendingTransactions;
const executeInTx = async (client: TransactionClient) => {
await client.miningAccount.update({
where: { accountSequence: snapshot.accountSequence },
data: {
totalMined: snapshot.totalMined.value,
availableBalance: snapshot.availableBalance.value,
frozenBalance: snapshot.frozenBalance.value,
// 不写 totalContribution 和 lastSyncedAt
},
});
if (transactions.length > 0) {
await client.miningTransaction.createMany({ /* 同原逻辑 */ });
}
};
// ...
}
```
**优点**
- 方法职责清晰 — save() 保存全量saveMiningResult() 只保存挖矿结果
- 不修改现有 save() 签名
**缺点**
- 新增一个方法,代码量稍多
- 需要在 mining-distribution.service.ts 改调用方
- 用 update 而非 upsert如果账户不存在会报错但实际场景不会因为有贡献值才会参与挖矿
**风险:低**
- 与方案 A 本质相同,只是组织方式不同
---
## 方案 C数据库层面原子 increment
**思路**:挖矿分配不加载整个 aggregate直接用 Prisma 的 increment 操作。
```typescript
// mining-distribution.service.ts 中
await tx.miningAccount.update({
where: { accountSequence: account.accountSequence },
data: {
totalMined: { increment: reward.value },
availableBalance: { increment: reward.value },
// 不写 totalContribution
},
});
```
**优点**
- 完全无竞争 — 数据库层面原子操作
- 不需要加载 aggregate
**缺点**
- 绕过了 DDD aggregate 模式 — mine() 方法和 pendingTransactions 不再被使用
- 需要单独处理 MiningTransaction 的写入(不能从 aggregate 获取 balanceBefore/After
- 改动较大,需要重构挖矿分配的核心逻辑
- 丢失了 aggregate 内的业务校验
**风险:中**
- 需要重构核心挖矿循环
- 需要额外逻辑计算 balanceBefore/After 用于交易流水
- 不再经过 aggregate 校验
---
## 方案 D乐观锁 (version 字段)
**思路**:给 mining_accounts 加 version 字段save 时检查版本号。
```prisma
model MiningAccount {
// ... existing fields
version Int @default(0)
}
```
```typescript
await client.miningAccount.update({
where: { accountSequence: snapshot.accountSequence, version: snapshot.version },
data: { ...fields, version: { increment: 1 } },
});
// 如果 version 不匹配,抛出异常,调用方重试
```
**优点**
- 标准并发控制模式
- 适用于所有写入路径
**缺点**
- 需要数据库迁移
- 需要在所有写入路径添加重试逻辑
- 挖矿分配每秒运行,重试频率可能很高
- 增加系统复杂度
**风险:中高**
- 数据库迁移需要在生产环境执行
- 重试逻辑引入额外复杂度
- 高频写入场景下乐观锁冲突率高,可能影响性能
---
## 方案 E贡献值独立表
**思路**:将 totalContribution 从 mining_accounts 移出,或在查询时 JOIN contribution 表。
**不推荐** — 改动过大,涉及 schema 迁移、多处查询改动、跨服务依赖增加。
---
## DailySnapshot 安全网修复(独立于主因修复)
无论选择哪个主因方案,都需要修复 DailySnapshot 同步:
### 改动点 1contribution-service 事件 payload 补全
```typescript
// snapshot.service.ts
payload: {
snapshotDate: dateStr,
snapshotId: snapshot.id?.toString(), // 补充
totalContribution: networkTotalContribution.toString(), // 字段名对齐
networkTotalContribution: networkTotalContribution.toString(),
activeAccounts: activeAccounts.length,
totalAccounts,
createdAt: new Date().toISOString(),
},
```
### 改动点 2mining-service 解析适配
```typescript
// contribution-sync.service.ts handleDailySnapshotCreated
const data = {
snapshotId: data.snapshotId || 'unknown',
snapshotDate: data.snapshotDate,
totalContribution: data.totalContribution || data.networkTotalContribution || '0',
activeAccounts: data.activeAccounts || data.accountCount || 0,
};
```
### 改动点 3验证 API 端点
确认 `GET /api/v1/snapshots/{date}/ratios` 的返回格式与 `fetchContributionRatios()` 的解析匹配。
---
## 数据修复方案
主因修复部署后,需要一次性修正 12 个受影响用户的数据:
### 方式 1触发全量 DailySnapshot 同步(推荐)
修好 DailySnapshot 后,手动触发一次全量同步,自动从 contribution-service 拉取所有正确值。
### 方式 2手动 API 调用
对每个受影响用户,调用 contribution-service 获取 effectiveContribution更新到 mining_accounts。
### 方式 3直接 SQL 更新
从 contribution_accounts 表查出正确值,更新 mining_accounts需要用户明确授权
---
## 推荐实施顺序
1. **方案 A**(主因修复)— 改动最小、风险最低、逻辑自洽
2. **DailySnapshot 修复** — 恢复安全网
3. **数据修复** — 部署后触发全量同步
三步均可在同一次部署中完成。