diff --git a/backend/services/planting-service/src/application/services/planting-application.service.ts b/backend/services/planting-service/src/application/services/planting-application.service.ts index 745d272d..b573c3aa 100644 --- a/backend/services/planting-service/src/application/services/planting-application.service.ts +++ b/backend/services/planting-service/src/application/services/planting-application.service.ts @@ -477,9 +477,12 @@ export class PlantingApplicationService { * 构建 Outbox 事件数据 * * 为以下服务创建事件: - * - reward-service: 分配奖励 (planting.order.paid) * - referral-service: 更新团队统计 (planting.planting.created) + * 统计更新后,referral-service 会发送 planting.order.paid 事件给 reward-service * - authorization-service: 更新KPI考核 (planting-events) + * + * 注意:planting.order.paid 事件已移至 referral-service 发送, + * 以确保统计数据更新后再计算奖励分配,避免竞态条件。 */ private buildOutboxEvents( order: PlantingOrder, @@ -489,27 +492,8 @@ export class PlantingApplicationService { this.logger.debug(`[OUTBOX] Building outbox events for order ${order.orderNo}`); - // 1. planting.order.paid 事件 (reward-service 消费) - events.push({ - eventType: 'planting.order.paid', - topic: 'planting.order.paid', - key: order.userId.toString(), - payload: { - eventName: 'planting.order.paid', - data: { - orderId: order.orderNo, - userId: order.userId.toString(), - treeCount: order.treeCount.value, - provinceCode: selection.provinceCode, - cityCode: selection.cityCode, - paidAt: order.paidAt!.toISOString(), - }, - }, - aggregateId: order.orderNo, - aggregateType: 'PlantingOrder', - }); - - // 2. planting.planting.created 事件 (referral-service 消费) + // 1. planting.planting.created 事件 (referral-service 消费) + // referral-service 处理完统计更新后,会发送 planting.order.paid 事件给 reward-service events.push({ eventType: 'planting.planting.created', topic: 'planting.planting.created', @@ -521,13 +505,16 @@ export class PlantingApplicationService { treeCount: order.treeCount.value, provinceCode: selection.provinceCode, cityCode: selection.cityCode, + // 新增:订单信息,供 referral-service 转发给 reward-service + orderId: order.orderNo, + paidAt: order.paidAt!.toISOString(), }, }, aggregateId: order.orderNo, aggregateType: 'PlantingOrder', }); - // 3. planting-events 事件 (authorization-service 消费) + // 2. planting-events 事件 (authorization-service 消费) // 发布所有领域事件 for (const domainEvent of order.domainEvents) { events.push({ diff --git a/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts b/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts index aab33c8e..2cb8aaf4 100644 --- a/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts +++ b/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts @@ -11,6 +11,9 @@ interface PlantingCreatedEvent { treeCount: number; provinceCode: string; cityCode: string; + // 新增:订单信息,用于转发给 reward-service + orderId: string; + paidAt: string; }; _outbox?: { id: string; @@ -22,6 +25,12 @@ interface PlantingCreatedEvent { /** * 认种创建事件处理器 * 监听 planting-service 发出的认种事件 + * + * 处理流程: + * 1. 更新团队统计数据 + * 2. 发送 planting.order.paid 事件给 reward-service + * + * 这样确保统计数据已更新后再计算奖励分配,避免竞态条件。 */ @Injectable() export class PlantingCreatedHandler implements OnModuleInit { @@ -71,6 +80,7 @@ export class PlantingCreatedHandler implements OnModuleInit { } try { + // 步骤1:更新团队统计 const command = new UpdateTeamStatisticsCommand( BigInt(event.data.userId), event.data.treeCount, @@ -80,6 +90,14 @@ export class PlantingCreatedHandler implements OnModuleInit { await this.teamStatisticsService.handlePlantingEvent(command); + this.logger.log( + `Updated team statistics for user ${event.data.userId}, count: ${event.data.treeCount}`, + ); + + // 步骤2:发送 planting.order.paid 事件给 reward-service + // 统计更新完成后再触发奖励计算,确保数据一致性 + await this.publishOrderPaidEvent(event); + // 记录已处理的事件 if (eventId !== 'unknown') { await this.prisma.processedEvent.create({ @@ -90,17 +108,13 @@ export class PlantingCreatedHandler implements OnModuleInit { }); } - this.logger.log( - `Updated team statistics for user ${event.data.userId}, count: ${event.data.treeCount}`, - ); - // B方案:发送处理成功确认 if (outboxInfo) { await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType); } } catch (error) { this.logger.error( - `Failed to update team statistics for user ${event.data.userId}:`, + `Failed to process planting event for user ${event.data.userId}:`, error, ); @@ -111,4 +125,57 @@ export class PlantingCreatedHandler implements OnModuleInit { } } } + + /** + * 发送 planting.order.paid 事件给 reward-service + * 这个事件会触发奖励计算和分配 + */ + private async publishOrderPaidEvent(event: PlantingCreatedEvent): Promise { + const maxRetries = 3; + let lastError: Error | null = null; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + await this.kafkaService.publish({ + topic: 'planting.order.paid', + key: event.data.userId, + value: { + eventName: 'planting.order.paid', + data: { + orderId: event.data.orderId, + userId: event.data.userId, + treeCount: event.data.treeCount, + provinceCode: event.data.provinceCode, + cityCode: event.data.cityCode, + paidAt: event.data.paidAt, + }, + }, + }); + + this.logger.log( + `Published planting.order.paid event for order ${event.data.orderId}, user ${event.data.userId}`, + ); + return; // 成功后退出 + } catch (error) { + lastError = error instanceof Error ? error : new Error(String(error)); + this.logger.warn( + `Failed to publish planting.order.paid event (attempt ${attempt}/${maxRetries}): ${lastError.message}`, + ); + + if (attempt < maxRetries) { + // 等待后重试,使用指数退避 + await this.sleep(1000 * attempt); + } + } + } + + // 所有重试都失败了,抛出错误 + throw new Error( + `Failed to publish planting.order.paid event after ${maxRetries} attempts: ${lastError?.message}`, + ); + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } } diff --git a/backend/services/referral-service/src/domain/aggregates/team-statistics/team-statistics.aggregate.ts b/backend/services/referral-service/src/domain/aggregates/team-statistics/team-statistics.aggregate.ts index 87c7da76..fba11b87 100644 --- a/backend/services/referral-service/src/domain/aggregates/team-statistics/team-statistics.aggregate.ts +++ b/backend/services/referral-service/src/domain/aggregates/team-statistics/team-statistics.aggregate.ts @@ -194,7 +194,7 @@ export class TeamStatistics { */ addPersonalPlanting(count: number, provinceCode: string, cityCode: string): void { this._personalPlantingCount += count; - this._totalTeamCount += count; + this._teamPlantingCount += count; this._provinceCityDistribution = this._provinceCityDistribution.add( provinceCode, cityCode, @@ -207,7 +207,7 @@ export class TeamStatistics { this._domainEvents.push( new TeamStatisticsUpdatedEvent( this._userId.value, - this._totalTeamCount, + this._teamPlantingCount, this._directReferralCount, this._leaderboardScore.score, 'planting_added',