From 2b2e1efc7ae04af02d730c31e2d73d6172d56385 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 3 Mar 2026 07:24:52 -0800 Subject: [PATCH] =?UTF-8?q?fix(mining):=20=E4=BF=AE=E5=A4=8D=E6=8C=96?= =?UTF-8?q?=E7=9F=BF=E5=88=86=E9=85=8D=E5=B9=B6=E5=8F=91=E8=A6=86=E7=9B=96?= =?UTF-8?q?=E8=B4=A1=E7=8C=AE=E5=80=BC=E5=90=8C=E6=AD=A5=E7=9A=84=20Lost?= =?UTF-8?q?=20Update=20=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 挖矿分配每秒运行的 save() 无条件写回所有字段(含 totalContribution), 导致贡献值同步刚更新的正确值被立即覆盖回旧值。 同时修复 DailySnapshot 全量同步一直 synced 0 accounts 的安全网失效问题。 - repository save() 增加 skipContributionUpdate 选项 - 挖矿分配路径传入 skipContributionUpdate: true - contribution-service DailySnapshot 事件 payload 补全字段 - mining-service 适配字段名差异并修复 API 解析 bug Co-Authored-By: Claude Opus 4.6 --- .../application/services/snapshot.service.ts | 3 + .../contribution-event.handler.ts | 6 +- .../services/contribution-sync.service.ts | 9 +- .../services/mining-distribution.service.ts | 4 +- .../repositories/mining-account.repository.ts | 27 +- .../mining-contribution-sync-lost-update.md | 110 ++++++++ .../mining-contribution-sync-solutions.md | 256 ++++++++++++++++++ 7 files changed, 398 insertions(+), 17 deletions(-) create mode 100644 docs/issues/mining-contribution-sync-lost-update.md create mode 100644 docs/issues/mining-contribution-sync-solutions.md diff --git a/backend/services/contribution-service/src/application/services/snapshot.service.ts b/backend/services/contribution-service/src/application/services/snapshot.service.ts index 3a09d2ff..c7f4d1dd 100644 --- a/backend/services/contribution-service/src/application/services/snapshot.service.ts +++ b/backend/services/contribution-service/src/application/services/snapshot.service.ts @@ -101,9 +101,12 @@ export class SnapshotService { aggregateId: dateStr, eventType: 'DailySnapshotCreated', payload: { + snapshotId: dateStr, snapshotDate: dateStr, + totalContribution: networkTotalContribution.toString(), 
networkTotalContribution: networkTotalContribution.toString(), activeAccounts: activeAccounts.length, + accountCount: activeAccounts.length, totalAccounts, createdAt: new Date().toISOString(), }, diff --git a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts index 71ea8ebc..4a3821f7 100644 --- a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts +++ b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts @@ -82,10 +82,10 @@ export class ContributionEventHandler implements OnModuleInit { this.logger.log(`Received DailySnapshotCreated for ${eventPayload.snapshotDate}`); await this.syncService.handleDailySnapshotCreated({ - snapshotId: eventPayload.snapshotId, + snapshotId: eventPayload.snapshotId || eventPayload.snapshotDate || 'unknown', snapshotDate: eventPayload.snapshotDate, - totalContribution: eventPayload.totalContribution, - activeAccounts: eventPayload.activeAccounts, + totalContribution: eventPayload.totalContribution || eventPayload.networkTotalContribution || '0', + activeAccounts: eventPayload.activeAccounts || eventPayload.accountCount || 0, }); } else if (eventType === 'SystemAccountSynced') { this.logger.log( diff --git a/backend/services/mining-service/src/application/services/contribution-sync.service.ts b/backend/services/mining-service/src/application/services/contribution-sync.service.ts index 48120ac8..e101c090 100644 --- a/backend/services/mining-service/src/application/services/contribution-sync.service.ts +++ b/backend/services/mining-service/src/application/services/contribution-sync.service.ts @@ -7,7 +7,7 @@ import { RedisService } from '../../infrastructure/redis/redis.service'; interface ContributionData { accountSequence: string; - totalContribution: string; + contribution: string; } /** @@ -94,11 +94,11 @@ export class 
ContributionSyncService { pageSize, ); - if (!response || response.data.length === 0) break; + if (!response || !response.data || response.data.length === 0) break; // 批量更新 for (const item of response.data) { - await this.syncAccountContribution(item.accountSequence, item.totalContribution); + await this.syncAccountContribution(item.accountSequence, item.contribution); syncedCount++; } @@ -133,7 +133,8 @@ export class ContributionSyncService { } const result = await response.json(); - return result.data; + // API 直接返回 { data: [...], total, totalContribution } + return result; } catch (error) { this.logger.error('Failed to fetch contribution ratios', error); return null; diff --git a/backend/services/mining-service/src/application/services/mining-distribution.service.ts b/backend/services/mining-service/src/application/services/mining-distribution.service.ts index be197f12..8151353b 100644 --- a/backend/services/mining-service/src/application/services/mining-distribution.service.ts +++ b/backend/services/mining-service/src/application/services/mining-distribution.service.ts @@ -132,7 +132,7 @@ export class MiningDistributionService { if (!reward.isZero()) { // 每秒更新账户余额 account.mine(reward, `秒挖矿 ${currentSecond.getTime()}`); - await this.miningAccountRepository.save(account, tx); + await this.miningAccountRepository.save(account, tx, { skipContributionUpdate: true }); // 收集Redis累积数据(事务外执行) redisAccumulateData.push({ @@ -288,7 +288,7 @@ export class MiningDistributionService { if (!reward.isZero()) { account.mine(reward, `秒挖矿 ${currentSecond.getTime()}`); - await this.miningAccountRepository.save(account); + await this.miningAccountRepository.save(account, undefined, { skipContributionUpdate: true }); await this.accumulateMinuteData( account.accountSequence, diff --git a/backend/services/mining-service/src/infrastructure/persistence/repositories/mining-account.repository.ts 
b/backend/services/mining-service/src/infrastructure/persistence/repositories/mining-account.repository.ts index d7481de7..1a87db52 100644 --- a/backend/services/mining-service/src/infrastructure/persistence/repositories/mining-account.repository.ts +++ b/backend/services/mining-service/src/infrastructure/persistence/repositories/mining-account.repository.ts @@ -37,12 +37,29 @@ export class MiningAccountRepository { * 保存账户(带外部事务支持) * @param aggregate 账户聚合 * @param tx 可选的外部事务客户端,如果不传则自动创建事务 + * @param options.skipContributionUpdate 为 true 时不写 totalContribution/lastSyncedAt, + * 用于挖矿分配路径避免覆盖贡献值同步的更新(解决并发 Lost Update 问题) */ - async save(aggregate: MiningAccountAggregate, tx?: TransactionClient): Promise { + async save( + aggregate: MiningAccountAggregate, + tx?: TransactionClient, + options?: { skipContributionUpdate?: boolean }, + ): Promise { const snapshot = aggregate.toSnapshot(); const transactions = aggregate.pendingTransactions; const executeInTx = async (client: TransactionClient) => { + // 构建 update 数据:挖矿分配路径不写 totalContribution/lastSyncedAt + const updateData: Record = { + totalMined: snapshot.totalMined.value, + availableBalance: snapshot.availableBalance.value, + frozenBalance: snapshot.frozenBalance.value, + }; + if (!options?.skipContributionUpdate) { + updateData.totalContribution = snapshot.totalContribution.value; + updateData.lastSyncedAt = snapshot.lastSyncedAt; + } + // 保存账户 await client.miningAccount.upsert({ where: { accountSequence: snapshot.accountSequence }, @@ -54,13 +71,7 @@ export class MiningAccountRepository { totalContribution: snapshot.totalContribution.value, lastSyncedAt: snapshot.lastSyncedAt, }, - update: { - totalMined: snapshot.totalMined.value, - availableBalance: snapshot.availableBalance.value, - frozenBalance: snapshot.frozenBalance.value, - totalContribution: snapshot.totalContribution.value, - lastSyncedAt: snapshot.lastSyncedAt, - }, + update: updateData, }); // 保存交易流水 diff --git 
a/docs/issues/mining-contribution-sync-lost-update.md b/docs/issues/mining-contribution-sync-lost-update.md new file mode 100644 index 00000000..ce15ab8f --- /dev/null +++ b/docs/issues/mining-contribution-sync-lost-update.md @@ -0,0 +1,110 @@ +# BUG: 挖矿分配并发覆盖贡献值同步 (Lost Update) + +**状态**: 待解决 +**严重程度**: 高 - 影响用户挖矿收益 +**发现日期**: 2026-03-03 +**影响范围**: 所有预种用户 (D2602260xxxx),12个账户数据异常 + +--- + +## 1. 问题描述 + +用户购买多份预种后,contribution-service 正确记录了累计贡献值,但 mining-service 中只显示第一份的贡献值,导致挖矿收益严重偏低。 + +**示例:** +- D26022600018 购买 4 份预种 → contribution 记录 13,117.860 → mining 只显示 3,279.465 +- D26022600001 购买 5 份合成 1 棵树 → contribution 记录 16,397.325 → mining 只显示 3,279.465 + +--- + +## 2. 根因分析 + +### 主因:MiningDistributionService 的 save() 覆盖 totalContribution + +mining-service 存在两个并发写入路径共享同一个 `save()` 方法: + +**路径 A - 贡献值同步**(Kafka 事件触发,不定时): +``` +ContributionEventHandler → ContributionSyncService.handleContributionCalculated() + → account.updateContribution(newValue) // 更新 totalContribution + → miningAccountRepository.save(account) // upsert 所有字段 +``` + +**路径 B - 挖矿分配**(每秒执行): +``` +MiningDistributionService.executeSecondDistribution() + → findAllWithContribution() // 加载账户到内存(含 totalContribution) + → account.mine(reward) // 只改 totalMined/availableBalance + → miningAccountRepository.save(account) // upsert 所有字段(含 stale totalContribution) +``` + +**竞争时序:** +``` +T0 挖矿分配加载 D26022600018, totalContribution=3279.465 +T1 贡献值同步更新 D26022600018 → totalContribution=13117.860 ✓ (DB 已更新) +T2 挖矿分配保存 D26022600018, 写回 totalContribution=3279.465 ← 覆盖! 
+``` + +**日志证据:** +``` +03/01 02:50:30 - Updated contribution for D26022600001: 16397.325 ← 同步成功 +03/03 当前 DB - D26022600001 totalContribution = 3279.465 ← 被覆盖回旧值 +``` + +挖矿分配每秒运行,贡献值同步后几乎必然在下一秒被覆盖。 + +**代码位置:** +- `mining-account.repository.ts:57-62` — save() 的 upsert 无条件写入所有字段 +- `mining-distribution.service.ts:122-135` — 每秒加载→mine()→save() +- `mining-account.aggregate.ts:126-141` — mine() 只改余额但 save() 会写回所有字段 + +### 次因:DailySnapshot 全量同步完全失效 + +本应作为安全网的每日全量同步,自上线以来一直 synced 0 accounts: + +``` +03/01 01:00 - DailySnapshotCreated for 2026-02-27 → synced 0 accounts +03/02 01:00 - DailySnapshotCreated for 2026-02-28 → synced 0 accounts +03/03 01:00 - DailySnapshotCreated for 2026-03-01 → synced 0 accounts +``` + +**原因:** +1. contribution-service 发布的事件 payload 缺少 `snapshotId` 字段 → mining-service 读到 undefined +2. 字段名不匹配:contribution 发 `networkTotalContribution`,mining 读 `totalContribution` +3. `fetchContributionRatios()` 返回数据解析失败(`result.data` 路径不对) + +**代码位置:** +- `contribution-service/snapshot.service.ts:99-110` — payload 缺少 snapshotId +- `mining-service/contribution-sync.service.ts:63-68` — 期望 snapshotId +- `mining-service/contribution-sync.service.ts:126-141` — API 调用和解析 + +--- + +## 3. 
受影响用户 + +| 用户 | contribution 正确值 | mining 当前值 | 差额 | +|------|-------------------|-------------|------| +| D26022600001 | 16,397.325 | 3,279.465 | -13,117.860 | +| D26022600014 | 13,253.562 | 3,279.465 | -9,974.097 | +| D26022600015 | 6,649.398 | 3,279.465 | -3,369.933 | +| D26022600016 | 13,117.860 | 3,279.465 | -9,838.395 | +| D26022600018 | 13,117.860 | 3,279.465 | -9,838.395 | +| D26022600033 | 3,754.422 | 3,279.465 | -474.957 | +| D26022600034 | 6,988.653 | 3,279.465 | -3,709.188 | +| D26022600035 | 3,799.656 | 3,392.550 | -407.106 | +| D26022600036 | 17,211.537 | 3,279.465 | -13,932.072 | +| D26022600037 | 16,397.325 | 3,279.465 | -13,117.860 | +| D26022600038 | 19,676.790 | 3,279.465 | -16,397.325 | +| D26022600043 | 6,558.930 | 3,279.465 | -3,279.465 | + +D26022600000 是唯一正常的预种用户。旧用户 (D2512xxxx) 不受影响。 + +--- + +## 4. 为什么旧用户不受影响 + +旧用户在系统稳定后完成了一次性同步,之后没有新的贡献值变更事件。 +挖矿分配虽然每秒覆写 totalContribution,但写回的值恰好等于正确值(没有新同步来改变它)。 + +预种用户是分批多次产生贡献值的(先 1 份 → 2 份 → ... → N 份), +所以挖矿分配总是把后续累加的正确值覆盖回最初同步的旧值。 diff --git a/docs/issues/mining-contribution-sync-solutions.md b/docs/issues/mining-contribution-sync-solutions.md new file mode 100644 index 00000000..5dca364a --- /dev/null +++ b/docs/issues/mining-contribution-sync-solutions.md @@ -0,0 +1,256 @@ +# 修复方案评估:挖矿分配并发覆盖贡献值 + +## 概览 + +需要解决三个问题: +1. **主因修复** — 消除挖矿分配对 totalContribution 的并发覆盖 +2. **安全网修复** — 恢复 DailySnapshot 全量同步功能 +3. **数据修复** — 将 12 个受影响用户的 mining 数据校正 + +--- + +## 方案对比 + +| 方案 | 改动范围 | 风险 | 可靠性 | 实施难度 | +|------|---------|------|-------|---------| +| A. 挖矿分配只更新挖矿字段 | 1个文件 | **低** | **高** | **低** | +| B. Repository 分方法 | 2个文件 | **低** | **高** | 低 | +| C. 数据库层面原子更新 | 1个文件 | 中 | 高 | 中 | +| D. 乐观锁 version 字段 | 4个文件+迁移 | 中高 | 高 | 高 | +| E. 
贡献值独立表 | 多文件+迁移 | **高** | 高 | **高** | + +--- + +## 方案 A:挖矿分配路径只更新挖矿字段(推荐) + +**思路**:挖矿分配调用 save() 时,不写 totalContribution 和 lastSyncedAt。 + +**改动**:只改 `mining-account.repository.ts`,给 save() 加一个选项参数。 + +```typescript +// mining-account.repository.ts +async save( + aggregate: MiningAccountAggregate, + tx?: TransactionClient, + options?: { skipContributionUpdate?: boolean }, +): Promise<void> { + const snapshot = aggregate.toSnapshot(); + + const updateData: any = { + totalMined: snapshot.totalMined.value, + availableBalance: snapshot.availableBalance.value, + frozenBalance: snapshot.frozenBalance.value, + }; + + // 仅在贡献值同步路径才写入这两个字段 + if (!options?.skipContributionUpdate) { + updateData.totalContribution = snapshot.totalContribution.value; + updateData.lastSyncedAt = snapshot.lastSyncedAt; + } + + await client.miningAccount.upsert({ + where: { accountSequence: snapshot.accountSequence }, + create: { /* 所有字段 - 新账户需要全部 */ }, + update: updateData, + }); +} +``` + +**调用方改动**:只需在 mining-distribution.service.ts 的 save 调用加一个参数: +```typescript +await this.miningAccountRepository.save(account, tx, { skipContributionUpdate: true }); +``` + +**优点**: +- 改动极小(1个文件加参数,1个调用方加选项) +- 向后兼容 — 不传 options 的调用方行为不变 +- 完全消除竞争条件 — 挖矿分配路径不再触碰 totalContribution +- 不需要数据库迁移 + +**缺点**: +- 语义上 save() 变得不纯粹(不总是保存所有字段) +- 新增调用方需要知道传不传 options + +**风险:低** +- 逻辑简单直观,不引入新概念 +- mine() 本身就不修改 totalContribution,save() 不写它是逻辑自洽的 +- 可通过单元测试充分验证 + +--- + +## 方案 B:拆分 Repository 方法 + +**思路**:创建专用的 `saveMiningResult()` 方法,只更新挖矿相关字段。 + +```typescript +// mining-account.repository.ts 新增方法 +async saveMiningResult(aggregate: MiningAccountAggregate, tx?: TransactionClient): Promise<void> { + const snapshot = aggregate.toSnapshot(); + const transactions = aggregate.pendingTransactions; + + const executeInTx = async (client: TransactionClient) => { + await client.miningAccount.update({ + where: { accountSequence: snapshot.accountSequence }, + data: { + totalMined: snapshot.totalMined.value, + availableBalance: 
snapshot.availableBalance.value, + frozenBalance: snapshot.frozenBalance.value, + // 不写 totalContribution 和 lastSyncedAt + }, + }); + + if (transactions.length > 0) { + await client.miningTransaction.createMany({ /* 同原逻辑 */ }); + } + }; + // ... +} +``` + +**优点**: +- 方法职责清晰 — save() 保存全量,saveMiningResult() 只保存挖矿结果 +- 不修改现有 save() 签名 + +**缺点**: +- 新增一个方法,代码量稍多 +- 需要在 mining-distribution.service.ts 改调用方 +- 用 update 而非 upsert,如果账户不存在会报错(但实际场景不会,因为有贡献值才会参与挖矿) + +**风险:低** +- 与方案 A 本质相同,只是组织方式不同 + +--- + +## 方案 C:数据库层面原子 increment + +**思路**:挖矿分配不加载整个 aggregate,直接用 Prisma 的 increment 操作。 + +```typescript +// mining-distribution.service.ts 中 +await tx.miningAccount.update({ + where: { accountSequence: account.accountSequence }, + data: { + totalMined: { increment: reward.value }, + availableBalance: { increment: reward.value }, + // 不写 totalContribution + }, +}); +``` + +**优点**: +- 完全无竞争 — 数据库层面原子操作 +- 不需要加载 aggregate + +**缺点**: +- 绕过了 DDD aggregate 模式 — mine() 方法和 pendingTransactions 不再被使用 +- 需要单独处理 MiningTransaction 的写入(不能从 aggregate 获取 balanceBefore/After) +- 改动较大,需要重构挖矿分配的核心逻辑 +- 丢失了 aggregate 内的业务校验 + +**风险:中** +- 需要重构核心挖矿循环 +- 需要额外逻辑计算 balanceBefore/After 用于交易流水 +- 不再经过 aggregate 校验 + +--- + +## 方案 D:乐观锁 (version 字段) + +**思路**:给 mining_accounts 加 version 字段,save 时检查版本号。 + +```prisma +model MiningAccount { + // ... 
existing fields + version Int @default(0) +} +``` + +```typescript +await client.miningAccount.update({ + where: { accountSequence: snapshot.accountSequence, version: snapshot.version }, + data: { ...fields, version: { increment: 1 } }, +}); +// 如果 version 不匹配,抛出异常,调用方重试 +``` + +**优点**: +- 标准并发控制模式 +- 适用于所有写入路径 + +**缺点**: +- 需要数据库迁移 +- 需要在所有写入路径添加重试逻辑 +- 挖矿分配每秒运行,重试频率可能很高 +- 增加系统复杂度 + +**风险:中高** +- 数据库迁移需要在生产环境执行 +- 重试逻辑引入额外复杂度 +- 高频写入场景下乐观锁冲突率高,可能影响性能 + +--- + +## 方案 E:贡献值独立表 + +**思路**:将 totalContribution 从 mining_accounts 移出,或在查询时 JOIN contribution 表。 + +**不推荐** — 改动过大,涉及 schema 迁移、多处查询改动、跨服务依赖增加。 + +--- + +## DailySnapshot 安全网修复(独立于主因修复) + +无论选择哪个主因方案,都需要修复 DailySnapshot 同步: + +### 改动点 1:contribution-service 事件 payload 补全 +```typescript +// snapshot.service.ts +payload: { + snapshotDate: dateStr, + snapshotId: snapshot.id?.toString(), // 补充 + totalContribution: networkTotalContribution.toString(), // 字段名对齐 + networkTotalContribution: networkTotalContribution.toString(), + activeAccounts: activeAccounts.length, + totalAccounts, + createdAt: new Date().toISOString(), +}, +``` + +### 改动点 2:mining-service 解析适配 +```typescript +// contribution-sync.service.ts handleDailySnapshotCreated +const normalized = { + snapshotId: data.snapshotId || 'unknown', + snapshotDate: data.snapshotDate, + totalContribution: data.totalContribution || data.networkTotalContribution || '0', + activeAccounts: data.activeAccounts || data.accountCount || 0, +}; +``` + +### 改动点 3:验证 API 端点 +确认 `GET /api/v1/snapshots/{date}/ratios` 的返回格式与 `fetchContributionRatios()` 的解析匹配。 + +--- + +## 数据修复方案 + +主因修复部署后,需要一次性修正 12 个受影响用户的数据: + +### 方式 1:触发全量 DailySnapshot 同步(推荐) +修好 DailySnapshot 后,手动触发一次全量同步,自动从 contribution-service 拉取所有正确值。 + +### 方式 2:手动 API 调用 +对每个受影响用户,调用 contribution-service 获取 effectiveContribution,更新到 mining_accounts。 + +### 方式 3:直接 SQL 更新 +从 contribution_accounts 表查出正确值,更新 mining_accounts(需要用户明确授权)。 + +--- + +## 推荐实施顺序 + +1. **方案 A**(主因修复)— 改动最小、风险最低、逻辑自洽 +2. **DailySnapshot 修复** — 恢复安全网 +3. 
**数据修复** — 部署后触发全量同步 + +三步均可在同一次部署中完成。