From 65bd6a857fd6e9381689c9de84e07ffc80620cb5 Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 8 Jan 2026 05:46:43 -0800 Subject: [PATCH] =?UTF-8?q?fix(reporting-service):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=B4=BB=E5=8A=A8=E8=AE=B0=E5=BD=95=E5=B9=82=E7=AD=89=E6=80=A7?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题:Kafka 消息重试或重复消费时,同一事件会被记录多次活动, 导致"最近活动"显示重复条目,统计数据也会被重复累加。 修复: 1. 仓储层新增 exists() 和 createIfNotExists() 方法 2. 所有事件消费者改用幂等创建,仅首次创建时累加统计 3. 添加数据库唯一约束 uk_sa_entity_activity 作为最后防线 4. 迁移脚本会自动清理历史重复数据 影响的事件: - identity.UserAccountCreated - identity.UserAccountAutoCreated - authorization-events - planting.order.paid - reporting.report.generated 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../migration.sql | 20 +++ .../reporting-service/prisma/schema.prisma | 2 + .../system-activity.repository.interface.ts | 18 +++ .../activity-event-consumer.controller.ts | 146 +++++++++++------- .../system-activity.repository.impl.ts | 31 ++++ 5 files changed, 158 insertions(+), 59 deletions(-) create mode 100644 backend/services/reporting-service/prisma/migrations/20260108100000_add_activity_unique_constraint/migration.sql diff --git a/backend/services/reporting-service/prisma/migrations/20260108100000_add_activity_unique_constraint/migration.sql b/backend/services/reporting-service/prisma/migrations/20260108100000_add_activity_unique_constraint/migration.sql new file mode 100644 index 00000000..a0eed36e --- /dev/null +++ b/backend/services/reporting-service/prisma/migrations/20260108100000_add_activity_unique_constraint/migration.sql @@ -0,0 +1,20 @@ +-- [2026-01-08] 添加活动记录唯一约束 +-- 防止同一实体的相同类型活动被重复记录(Kafka 消息重试场景) +-- 注意:需要先清理已有重复数据 + +-- 清理已有的重复数据(保留最早的记录) +DELETE FROM system_activities a +USING ( + SELECT activity_type, related_entity_type, related_entity_id, MIN(activity_id) as min_id + FROM system_activities + WHERE related_entity_type IS NOT NULL AND related_entity_id IS NOT NULL + GROUP BY activity_type, related_entity_type, related_entity_id + HAVING COUNT(*) > 1 +) b +WHERE a.activity_type = b.activity_type + AND a.related_entity_type = b.related_entity_type + AND a.related_entity_id = b.related_entity_id + AND a.activity_id != b.min_id; + +-- 创建唯一约束 +CREATE UNIQUE INDEX "uk_sa_entity_activity" ON "system_activities"("activity_type", "related_entity_type", "related_entity_id"); diff --git a/backend/services/reporting-service/prisma/schema.prisma b/backend/services/reporting-service/prisma/schema.prisma index 9100bd1f..159d3204 100644 --- a/backend/services/reporting-service/prisma/schema.prisma +++ b/backend/services/reporting-service/prisma/schema.prisma @@ -472,4 +472,6 @@ model SystemActivity { @@index([createdAt(sort: Desc)], name: "idx_sa_created") @@index([relatedUserId], name: "idx_sa_user") @@index([relatedEntityType, relatedEntityId], name: "idx_sa_entity") + // [2026-01-08] 新增:唯一约束防止同一实体的重复活动记录 + @@unique([activityType, relatedEntityType, relatedEntityId], name: "uk_sa_entity_activity") } diff --git a/backend/services/reporting-service/src/domain/repositories/system-activity.repository.interface.ts b/backend/services/reporting-service/src/domain/repositories/system-activity.repository.interface.ts index 6a4bac9d..0c434794 100644 --- a/backend/services/reporting-service/src/domain/repositories/system-activity.repository.interface.ts +++ b/backend/services/reporting-service/src/domain/repositories/system-activity.repository.interface.ts @@ -84,4 +84,22 @@ export interface ISystemActivityRepository { * 统计活动数量 */ countByType(type: ActivityType | string): Promise; + + /** + * 检查活动是否已存在(用于幂等性检查) + * @param activityType 活动类型 + * @param entityType 实体类型 + * @param entityId 实体ID + */ + exists( + activityType: ActivityType | string, + entityType: string, + entityId: string, + ): Promise; + + /** + * 幂等创建活动记录(如果不存在则创建) + * @returns 创建的活动记录,如果已存在则返回 null + */ + createIfNotExists(activity: SystemActivityData): Promise; } diff --git a/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts b/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts index 606724fb..4cd475ee 100644 --- a/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts +++ b/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts @@ -113,6 +113,7 @@ export class ActivityEventConsumerController { /** * 监听用户账户创建事件 (identity-service) * Topic: identity.UserAccountCreated + * [2026-01-08] 更新:添加幂等性检查,防止重复记录 */ @MessagePattern('identity.UserAccountCreated') async handleUserAccountCreated(@Payload() message: UserAccountCreatedEvent) { @@ -121,8 +122,8 @@ export class ActivityEventConsumerController { try { const { payload } = message; - // 记录活动日志 - await this.activityRepo.create({ + // 幂等创建活动日志(如果已存在则跳过) + const created = await this.activityRepo.createIfNotExists({ activityType: 'user_register' as ActivityType, title: '新用户注册', description: `用户 ${this.maskPhone(payload.phoneNumber)} 完成注册`, @@ -139,14 +140,17 @@ export class ActivityEventConsumerController { }, }); - // 累加统计数据 - const today = new Date(); - await Promise.all([ - this.realtimeStatsRepo.incrementNewUser(today), - this.globalStatsRepo.incrementUser(), - ]); - - this.logger.log(`Activity and stats recorded for user registration: ${payload.userId}`); + if (created) { + // 仅在首次创建时累加统计数据 + const today = new Date(); + await Promise.all([ + this.realtimeStatsRepo.incrementNewUser(today), + this.globalStatsRepo.incrementUser(), + ]); + this.logger.log(`Activity and stats recorded for user registration: ${payload.userId}`); + } else { + this.logger.log(`Skipped duplicate user registration event: ${payload.userId}`); + } } catch (error) { this.logger.error(`Error recording user registration activity:`, error); } @@ -155,6 +159,7 @@ export class ActivityEventConsumerController { /** * 监听用户账户自动创建事件 (identity-service) * Topic: identity.UserAccountAutoCreated + * [2026-01-08] 更新:添加幂等性检查,防止重复记录 */ @MessagePattern('identity.UserAccountAutoCreated') async handleUserAccountAutoCreated(@Payload() message: UserAccountCreatedEvent) { @@ -163,8 +168,8 @@ export class ActivityEventConsumerController { try { const { payload } = message; - // 记录活动日志 - await this.activityRepo.create({ + // 幂等创建活动日志(如果已存在则跳过) + const created = await this.activityRepo.createIfNotExists({ activityType: 'user_register' as ActivityType, title: '新用户自动注册', description: `用户通过推荐链接完成注册`, @@ -182,14 +187,17 @@ export class ActivityEventConsumerController { }, }); - // 累加统计数据 - const today = new Date(); - await Promise.all([ - this.realtimeStatsRepo.incrementNewUser(today), - this.globalStatsRepo.incrementUser(), - ]); - - this.logger.log(`Activity and stats recorded for auto user registration: ${payload.userId}`); + if (created) { + // 仅在首次创建时累加统计数据 + const today = new Date(); + await Promise.all([ + this.realtimeStatsRepo.incrementNewUser(today), + this.globalStatsRepo.incrementUser(), + ]); + this.logger.log(`Activity and stats recorded for auto user registration: ${payload.userId}`); + } else { + this.logger.log(`Skipped duplicate auto user registration event: ${payload.userId}`); + } } catch (error) { this.logger.error(`Error recording auto user registration activity:`, error); } @@ -198,6 +206,7 @@ export class ActivityEventConsumerController { /** * 监听授权角色事件 (authorization-service) * Topic: authorization-events + * [2026-01-08] 更新:添加幂等性检查,防止重复记录 */ @MessagePattern('authorization-events') async handleAuthorizationEvent(@Payload() message: AuthorizationRoleEvent) { @@ -209,16 +218,17 @@ export class ActivityEventConsumerController { // 根据事件类型决定是否记录活动 if (eventType.includes('authorized') || eventType.includes('Authorized')) { const roleTypeLabel = this.getRoleTypeLabel(payload.roleType); + const entityId = payload.authorizationId || message.aggregateId; - // 记录活动日志 - await this.activityRepo.create({ + // 幂等创建活动日志(如果已存在则跳过) + const created = await this.activityRepo.createIfNotExists({ activityType: 'company_authorization' as ActivityType, title: '授权成功', description: `${payload.regionName} ${roleTypeLabel} 完成授权`, icon: '🏢', relatedUserId: BigInt(payload.userId), relatedEntityType: 'authorization', - relatedEntityId: payload.authorizationId || message.aggregateId, + relatedEntityId: entityId, metadata: { roleType: payload.roleType, regionCode: payload.regionCode, @@ -228,30 +238,34 @@ export class ActivityEventConsumerController { }, }); - // 累加统计数据 - 区分省公司和市公司 - const today = new Date(); - const isProvinceCompany = - payload.roleType === 'PROVINCE_COMPANY' || - payload.roleType === 'AUTH_PROVINCE_COMPANY'; - const isCityCompany = - payload.roleType === 'CITY_COMPANY' || - payload.roleType === 'AUTH_CITY_COMPANY'; + if (created) { + // 仅在首次创建时累加统计数据 - 区分省公司和市公司 + const today = new Date(); + const isProvinceCompany = + payload.roleType === 'PROVINCE_COMPANY' || + payload.roleType === 'AUTH_PROVINCE_COMPANY'; + const isCityCompany = + payload.roleType === 'CITY_COMPANY' || + payload.roleType === 'AUTH_CITY_COMPANY'; - if (isProvinceCompany) { - await Promise.all([ - this.realtimeStatsRepo.incrementProvinceAuth(today), - this.globalStatsRepo.incrementProvinceCompany(), - ]); - this.logger.log(`Province company stats incremented: ${payload.regionCode}`); - } else if (isCityCompany) { - await Promise.all([ - this.realtimeStatsRepo.incrementCityAuth(today), - this.globalStatsRepo.incrementCityCompany(), - ]); - this.logger.log(`City company stats incremented: ${payload.regionCode}`); + if (isProvinceCompany) { + await Promise.all([ + this.realtimeStatsRepo.incrementProvinceAuth(today), + this.globalStatsRepo.incrementProvinceCompany(), + ]); + this.logger.log(`Province company stats incremented: ${payload.regionCode}`); + } else if (isCityCompany) { + await Promise.all([ + this.realtimeStatsRepo.incrementCityAuth(today), + this.globalStatsRepo.incrementCityCompany(), + ]); + this.logger.log(`City company stats incremented: ${payload.regionCode}`); + } + + this.logger.log(`Activity and stats recorded for authorization: ${payload.accountSequence}`); + } else { + this.logger.log(`Skipped duplicate authorization event: ${entityId}`); } - - this.logger.log(`Activity and stats recorded for authorization: ${payload.accountSequence}`); } } catch (error) { this.logger.error(`Error recording authorization activity:`, error); @@ -262,6 +276,7 @@ export class ActivityEventConsumerController { * 监听认种订单支付成功事件 (referral-service 或 planting-service) * Topic: planting.order.paid * [2026-01-07] 更新:兼容两种消息格式 + * [2026-01-08] 更新:添加幂等性检查,防止重复记录 * - referral-service: { eventName, data: {...} } * - planting-service: { orderId, treeCount, ... } */ @@ -282,8 +297,8 @@ export class ActivityEventConsumerController { paidAt: message.paidAt || new Date().toISOString(), }; - // 记录活动日志 - 不依赖 userId,使用 accountSequence - await this.activityRepo.create({ + // 幂等创建活动日志(如果已存在则跳过) + const created = await this.activityRepo.createIfNotExists({ activityType: 'planting_order' as ActivityType, title: '认种订单', description: `用户认种了 ${data.treeCount} 棵榴莲树`, @@ -302,15 +317,18 @@ export class ActivityEventConsumerController { }, }); - // 累加统计数据 - const today = new Date(); - const amount = new Decimal(data.totalAmount || '0'); - await Promise.all([ - this.realtimeStatsRepo.incrementPlanting(today, data.treeCount, amount), - this.globalStatsRepo.incrementPlanting(data.treeCount, amount), - ]); - - this.logger.log(`Activity and stats recorded for planting order: ${data.orderId}`); + if (created) { + // 仅在首次创建时累加统计数据 + const today = new Date(); + const amount = new Decimal(data.totalAmount || '0'); + await Promise.all([ + this.realtimeStatsRepo.incrementPlanting(today, data.treeCount, amount), + this.globalStatsRepo.incrementPlanting(data.treeCount, amount), + ]); + this.logger.log(`Activity and stats recorded for planting order: ${data.orderId}`); + } else { + this.logger.log(`Skipped duplicate planting order event: ${data.orderId}`); + } } catch (error) { this.logger.error(`Error recording planting order activity:`, error); } @@ -319,6 +337,7 @@ export class ActivityEventConsumerController { /** * 监听报表生成完成事件 * Topic: reporting.report.generated + * [2026-01-08] 更新:添加幂等性检查,防止重复记录 */ @MessagePattern('reporting.report.generated') async handleReportGenerated( @@ -334,21 +353,30 @@ export class ActivityEventConsumerController { this.logger.log(`Received reporting.report.generated event`); try { - await this.activityRepo.create({ + // 使用 reportCode + periodKey 作为唯一标识 + const entityId = `${message.reportCode}_${message.periodKey}`; + + // 幂等创建活动日志(如果已存在则跳过) + const created = await this.activityRepo.createIfNotExists({ activityType: 'report_generated' as ActivityType, title: '报表生成', description: `${message.reportName} 已生成`, icon: '📊', relatedEntityType: 'report', - relatedEntityId: message.reportCode, + relatedEntityId: entityId, metadata: { + reportCode: message.reportCode, reportType: message.reportType, periodKey: message.periodKey, generatedAt: message.generatedAt, }, }); - this.logger.log(`Activity recorded for report generation: ${message.reportCode}`); + if (created) { + this.logger.log(`Activity recorded for report generation: ${message.reportCode}`); + } else { + this.logger.log(`Skipped duplicate report generation event: ${entityId}`); + } } catch (error) { this.logger.error(`Error recording report generation activity:`, error); } diff --git a/backend/services/reporting-service/src/infrastructure/persistence/repositories/system-activity.repository.impl.ts b/backend/services/reporting-service/src/infrastructure/persistence/repositories/system-activity.repository.impl.ts index dddc5db3..86f3b2ce 100644 --- a/backend/services/reporting-service/src/infrastructure/persistence/repositories/system-activity.repository.impl.ts +++ b/backend/services/reporting-service/src/infrastructure/persistence/repositories/system-activity.repository.impl.ts @@ -134,6 +134,37 @@ export class SystemActivityRepository implements ISystemActivityRepository { }); } + async exists( + activityType: ActivityType | string, + entityType: string, + entityId: string, + ): Promise { + const count = await this.prisma.systemActivity.count({ + where: { + activityType, + relatedEntityType: entityType, + relatedEntityId: entityId, + }, + }); + return count > 0; + } + + async createIfNotExists(activity: SystemActivityData): Promise { + // 先检查是否已存在 + if (activity.relatedEntityType && activity.relatedEntityId) { + const alreadyExists = await this.exists( + activity.activityType, + activity.relatedEntityType, + activity.relatedEntityId, + ); + if (alreadyExists) { + return null; // 已存在,跳过创建 + } + } + // 不存在则创建 + return this.create(activity); + } + private toDomain( record: Awaited>, ): SystemActivityData {