From a40e314c94f96384de5ef7e96e2476db21fe7587 Mon Sep 17 00:00:00 2001 From: hailin Date: Sun, 11 Jan 2026 09:52:00 -0800 Subject: [PATCH] =?UTF-8?q?fix(contribution-service):=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?adoption=20handler=E4=BA=8B=E5=8A=A1=E5=B5=8C=E5=A5=97=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将upsertSyncedAdoption和calculateForAdoption分离为两个独立操作, 避免嵌套事务导致内层事务看不到外层事务尚未提交的数据 Co-Authored-By: Claude Opus 4.5 --- .../event-handlers/adoption-synced.handler.ts | 65 ++++++++++--------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts index a25d7f0d..6d797061 100644 --- a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts @@ -3,7 +3,6 @@ import Decimal from 'decimal.js'; import { CDCEvent } from '../../infrastructure/kafka/cdc-consumer.service'; import { SyncedDataRepository } from '../../infrastructure/persistence/repositories/synced-data.repository'; import { ContributionCalculationService } from '../services/contribution-calculation.service'; -import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; /** * 认种订单 CDC 事件处理器 @@ -17,7 +16,6 @@ export class AdoptionSyncedHandler { constructor( private readonly syncedDataRepository: SyncedDataRepository, private readonly contributionCalculationService: ContributionCalculationService, - private readonly unitOfWork: UnitOfWork, ) {} async handle(event: CDCEvent): Promise { @@ -53,22 +51,25 @@ export class AdoptionSyncedHandler { const treeCount = data.tree_count || data.treeCount; const createdAt = data.created_at || data.createdAt || data.paid_at || data.paidAt; - await this.unitOfWork.executeInTransaction(async () => { - // 保存同步的认种订单数据 - const adoption = await this.syncedDataRepository.upsertSyncedAdoption({ - originalAdoptionId: BigInt(orderId), - accountSequence: accountSequence, - treeCount: treeCount, - adoptionDate: new Date(createdAt), - status: data.status ?? null, - contributionPerTree: new Decimal('1'), // 每棵树1算力 - sourceSequenceNum: sequenceNum, - }); - - // 触发算力计算 - await this.contributionCalculationService.calculateForAdoption(adoption.originalAdoptionId); + // 第一步:保存同步的认种订单数据 + await this.syncedDataRepository.upsertSyncedAdoption({ + originalAdoptionId: BigInt(orderId), + accountSequence: accountSequence, + treeCount: treeCount, + adoptionDate: new Date(createdAt), + status: data.status ?? null, + contributionPerTree: new Decimal('1'), // 每棵树1算力 + sourceSequenceNum: sequenceNum, }); + // 第二步:触发算力计算(在单独的事务中执行) + try { + await this.contributionCalculationService.calculateForAdoption(BigInt(orderId)); + } catch (error) { + // 算力计算失败不影响数据同步,后续可通过批量任务重试 + this.logger.error(`Failed to calculate contribution for order ${orderId}`, error); + } + this.logger.log( `Planting order synced and contribution calculated: ${orderId}, account: ${accountSequence}`, ); @@ -99,22 +100,26 @@ export class AdoptionSyncedHandler { const treeCount = after.tree_count || after.treeCount; const createdAt = after.created_at || after.createdAt || after.paid_at || after.paidAt; - await this.unitOfWork.executeInTransaction(async () => { - const adoption = await this.syncedDataRepository.upsertSyncedAdoption({ - originalAdoptionId: originalAdoptionId, - accountSequence: accountSequence, - treeCount: treeCount, - adoptionDate: new Date(createdAt), - status: after.status ?? null, - contributionPerTree: new Decimal('1'), - sourceSequenceNum: sequenceNum, - }); - - if (!existingAdoption?.contributionDistributed) { - await this.contributionCalculationService.calculateForAdoption(adoption.originalAdoptionId); - } + // 第一步:保存同步的认种订单数据 + await this.syncedDataRepository.upsertSyncedAdoption({ + originalAdoptionId: originalAdoptionId, + accountSequence: accountSequence, + treeCount: treeCount, + adoptionDate: new Date(createdAt), + status: after.status ?? null, + contributionPerTree: new Decimal('1'), + sourceSequenceNum: sequenceNum, }); + // 第二步:触发算力计算(在单独的事务中执行) + if (!existingAdoption?.contributionDistributed) { + try { + await this.contributionCalculationService.calculateForAdoption(originalAdoptionId); + } catch (error) { + this.logger.error(`Failed to calculate contribution for order ${orderId}`, error); + } + } + this.logger.debug(`Planting order updated: ${originalAdoptionId}`); }