fix(contribution-service): 修复adoption handler事务嵌套问题

将upsertSyncedAdoption和calculateForAdoption分离为两个独立操作,
避免嵌套事务导致内层事务看不到外层事务尚未提交的数据

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-11 09:52:00 -08:00
parent 5006a5a170
commit a40e314c94
1 changed files with 35 additions and 30 deletions

View File

@ -3,7 +3,6 @@ import Decimal from 'decimal.js';
import { CDCEvent } from '../../infrastructure/kafka/cdc-consumer.service';
import { SyncedDataRepository } from '../../infrastructure/persistence/repositories/synced-data.repository';
import { ContributionCalculationService } from '../services/contribution-calculation.service';
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
/**
* CDC
@ -17,7 +16,6 @@ export class AdoptionSyncedHandler {
constructor(
private readonly syncedDataRepository: SyncedDataRepository,
private readonly contributionCalculationService: ContributionCalculationService,
private readonly unitOfWork: UnitOfWork,
) {}
async handle(event: CDCEvent): Promise<void> {
@ -53,22 +51,25 @@ export class AdoptionSyncedHandler {
const treeCount = data.tree_count || data.treeCount;
const createdAt = data.created_at || data.createdAt || data.paid_at || data.paidAt;
await this.unitOfWork.executeInTransaction(async () => {
// 保存同步的认种订单数据
const adoption = await this.syncedDataRepository.upsertSyncedAdoption({
originalAdoptionId: BigInt(orderId),
accountSequence: accountSequence,
treeCount: treeCount,
adoptionDate: new Date(createdAt),
status: data.status ?? null,
contributionPerTree: new Decimal('1'), // 每棵树1算力
sourceSequenceNum: sequenceNum,
});
// 触发算力计算
await this.contributionCalculationService.calculateForAdoption(adoption.originalAdoptionId);
// 第一步:保存同步的认种订单数据
await this.syncedDataRepository.upsertSyncedAdoption({
originalAdoptionId: BigInt(orderId),
accountSequence: accountSequence,
treeCount: treeCount,
adoptionDate: new Date(createdAt),
status: data.status ?? null,
contributionPerTree: new Decimal('1'), // 每棵树1算力
sourceSequenceNum: sequenceNum,
});
// 第二步:触发算力计算(在单独的事务中执行)
try {
await this.contributionCalculationService.calculateForAdoption(BigInt(orderId));
} catch (error) {
// 算力计算失败不影响数据同步,后续可通过批量任务重试
this.logger.error(`Failed to calculate contribution for order ${orderId}`, error);
}
this.logger.log(
`Planting order synced and contribution calculated: ${orderId}, account: ${accountSequence}`,
);
@ -99,22 +100,26 @@ export class AdoptionSyncedHandler {
const treeCount = after.tree_count || after.treeCount;
const createdAt = after.created_at || after.createdAt || after.paid_at || after.paidAt;
await this.unitOfWork.executeInTransaction(async () => {
const adoption = await this.syncedDataRepository.upsertSyncedAdoption({
originalAdoptionId: originalAdoptionId,
accountSequence: accountSequence,
treeCount: treeCount,
adoptionDate: new Date(createdAt),
status: after.status ?? null,
contributionPerTree: new Decimal('1'),
sourceSequenceNum: sequenceNum,
});
if (!existingAdoption?.contributionDistributed) {
await this.contributionCalculationService.calculateForAdoption(adoption.originalAdoptionId);
}
// 第一步:保存同步的认种订单数据
await this.syncedDataRepository.upsertSyncedAdoption({
originalAdoptionId: originalAdoptionId,
accountSequence: accountSequence,
treeCount: treeCount,
adoptionDate: new Date(createdAt),
status: after.status ?? null,
contributionPerTree: new Decimal('1'),
sourceSequenceNum: sequenceNum,
});
// 第二步:触发算力计算(在单独的事务中执行)
if (!existingAdoption?.contributionDistributed) {
try {
await this.contributionCalculationService.calculateForAdoption(originalAdoptionId);
} catch (error) {
this.logger.error(`Failed to calculate contribution for order ${orderId}`, error);
}
}
this.logger.debug(`Planting order updated: ${originalAdoptionId}`);
}