fix: 修复权益分配竞态条件和统计数据bug

1. 事件流重构:将 planting.order.paid 事件从 planting-service 移至 referral-service 发送
   - 确保统计数据更新后再触发奖励计算,避免竞态条件
   - planting-service 只发送 planting.planting.created 事件(包含订单信息)
   - referral-service 处理完统计更新后转发 planting.order.paid 给 reward-service

2. 修复 addPersonalPlanting 方法:
   - 原代码错误地更新 _totalTeamCount(团队人数)而非 _teamPlantingCount(团队认种数)
   - 导致 subordinateTeamPlantingCount 计算错误,权益无法正确分配

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-12 05:35:31 -08:00
parent 3eeef11197
commit 8148d1d127
3 changed files with 84 additions and 30 deletions

View File

@ -477,9 +477,12 @@ export class PlantingApplicationService {
* Outbox
*
* :
* - reward-service: 分配奖励 (planting.order.paid)
* - referral-service: 更新团队统计 (planting.planting.created)
* referral-service planting.order.paid reward-service
* - authorization-service: 更新KPI考核 (planting-events)
*
* planting.order.paid referral-service
*
*/
private buildOutboxEvents(
order: PlantingOrder,
@ -489,27 +492,8 @@ export class PlantingApplicationService {
this.logger.debug(`[OUTBOX] Building outbox events for order ${order.orderNo}`);
// 1. planting.order.paid 事件 (reward-service 消费)
events.push({
eventType: 'planting.order.paid',
topic: 'planting.order.paid',
key: order.userId.toString(),
payload: {
eventName: 'planting.order.paid',
data: {
orderId: order.orderNo,
userId: order.userId.toString(),
treeCount: order.treeCount.value,
provinceCode: selection.provinceCode,
cityCode: selection.cityCode,
paidAt: order.paidAt!.toISOString(),
},
},
aggregateId: order.orderNo,
aggregateType: 'PlantingOrder',
});
// 2. planting.planting.created 事件 (referral-service 消费)
// 1. planting.planting.created 事件 (referral-service 消费)
// referral-service 处理完统计更新后,会发送 planting.order.paid 事件给 reward-service
events.push({
eventType: 'planting.planting.created',
topic: 'planting.planting.created',
@ -521,13 +505,16 @@ export class PlantingApplicationService {
treeCount: order.treeCount.value,
provinceCode: selection.provinceCode,
cityCode: selection.cityCode,
// 新增:订单信息,供 referral-service 转发给 reward-service
orderId: order.orderNo,
paidAt: order.paidAt!.toISOString(),
},
},
aggregateId: order.orderNo,
aggregateType: 'PlantingOrder',
});
// 3. planting-events 事件 (authorization-service 消费)
// 2. planting-events 事件 (authorization-service 消费)
// 发布所有领域事件
for (const domainEvent of order.domainEvents) {
events.push({

View File

@ -11,6 +11,9 @@ interface PlantingCreatedEvent {
treeCount: number;
provinceCode: string;
cityCode: string;
// 新增:订单信息,用于转发给 reward-service
orderId: string;
paidAt: string;
};
_outbox?: {
id: string;
@ -22,6 +25,12 @@ interface PlantingCreatedEvent {
/**
*
* planting-service
*
*
* 1.
* 2. planting.order.paid reward-service
*
*
*/
@Injectable()
export class PlantingCreatedHandler implements OnModuleInit {
@ -71,6 +80,7 @@ export class PlantingCreatedHandler implements OnModuleInit {
}
try {
// 步骤1更新团队统计
const command = new UpdateTeamStatisticsCommand(
BigInt(event.data.userId),
event.data.treeCount,
@ -80,6 +90,14 @@ export class PlantingCreatedHandler implements OnModuleInit {
await this.teamStatisticsService.handlePlantingEvent(command);
this.logger.log(
`Updated team statistics for user ${event.data.userId}, count: ${event.data.treeCount}`,
);
// 步骤2发送 planting.order.paid 事件给 reward-service
// 统计更新完成后再触发奖励计算,确保数据一致性
await this.publishOrderPaidEvent(event);
// 记录已处理的事件
if (eventId !== 'unknown') {
await this.prisma.processedEvent.create({
@ -90,17 +108,13 @@ export class PlantingCreatedHandler implements OnModuleInit {
});
}
this.logger.log(
`Updated team statistics for user ${event.data.userId}, count: ${event.data.treeCount}`,
);
// B方案发送处理成功确认
if (outboxInfo) {
await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType);
}
} catch (error) {
this.logger.error(
`Failed to update team statistics for user ${event.data.userId}:`,
`Failed to process planting event for user ${event.data.userId}:`,
error,
);
@ -111,4 +125,57 @@ export class PlantingCreatedHandler implements OnModuleInit {
}
}
}
/**
* planting.order.paid reward-service
*
*/
private async publishOrderPaidEvent(event: PlantingCreatedEvent): Promise<void> {
const maxRetries = 3;
let lastError: Error | null = null;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await this.kafkaService.publish({
topic: 'planting.order.paid',
key: event.data.userId,
value: {
eventName: 'planting.order.paid',
data: {
orderId: event.data.orderId,
userId: event.data.userId,
treeCount: event.data.treeCount,
provinceCode: event.data.provinceCode,
cityCode: event.data.cityCode,
paidAt: event.data.paidAt,
},
},
});
this.logger.log(
`Published planting.order.paid event for order ${event.data.orderId}, user ${event.data.userId}`,
);
return; // 成功后退出
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error));
this.logger.warn(
`Failed to publish planting.order.paid event (attempt ${attempt}/${maxRetries}): ${lastError.message}`,
);
if (attempt < maxRetries) {
// 等待后重试,使用指数退避
await this.sleep(1000 * attempt);
}
}
}
// 所有重试都失败了,抛出错误
throw new Error(
`Failed to publish planting.order.paid event after ${maxRetries} attempts: ${lastError?.message}`,
);
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}

View File

@ -194,7 +194,7 @@ export class TeamStatistics {
*/
addPersonalPlanting(count: number, provinceCode: string, cityCode: string): void {
this._personalPlantingCount += count;
this._totalTeamCount += count;
this._teamPlantingCount += count;
this._provinceCityDistribution = this._provinceCityDistribution.add(
provinceCode,
cityCode,
@ -207,7 +207,7 @@ export class TeamStatistics {
this._domainEvents.push(
new TeamStatisticsUpdatedEvent(
this._userId.value,
this._totalTeamCount,
this._teamPlantingCount,
this._directReferralCount,
this._leaderboardScore.score,
'planting_added',