fix(reporting-service): 添加活动记录幂等性处理

问题:Kafka 消息重试或重复消费时,同一事件会被记录多次活动,
导致"最近活动"显示重复条目,统计数据也会被重复累加。

修复:
1. 仓储层新增 exists() 和 createIfNotExists() 方法
2. 所有事件消费者改用幂等创建,仅首次创建时累加统计
3. 添加数据库唯一约束 uk_sa_entity_activity 作为最后防线
4. 迁移脚本会自动清理历史重复数据

影响的事件:
- identity.UserAccountCreated
- identity.UserAccountAutoCreated
- authorization-events
- planting.order.paid
- reporting.report.generated

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-08 05:46:43 -08:00
parent c65d02ebea
commit 65bd6a857f
5 changed files with 158 additions and 59 deletions

View File

@ -0,0 +1,20 @@
-- [2026-01-08] 添加活动记录唯一约束
-- 防止同一实体的相同类型活动被重复记录Kafka 消息重试场景)
-- 注意:需要先清理已有重复数据
-- 清理已有的重复数据(保留最早的记录)
DELETE FROM system_activities a
USING (
SELECT activity_type, related_entity_type, related_entity_id, MIN(activity_id) as min_id
FROM system_activities
WHERE related_entity_type IS NOT NULL AND related_entity_id IS NOT NULL
GROUP BY activity_type, related_entity_type, related_entity_id
HAVING COUNT(*) > 1
) b
WHERE a.activity_type = b.activity_type
AND a.related_entity_type = b.related_entity_type
AND a.related_entity_id = b.related_entity_id
AND a.activity_id != b.min_id;
-- 创建唯一约束
CREATE UNIQUE INDEX "uk_sa_entity_activity" ON "system_activities"("activity_type", "related_entity_type", "related_entity_id");

View File

@ -472,4 +472,6 @@ model SystemActivity {
@@index([createdAt(sort: Desc)], name: "idx_sa_created")
@@index([relatedUserId], name: "idx_sa_user")
@@index([relatedEntityType, relatedEntityId], name: "idx_sa_entity")
// [2026-01-08] 新增:唯一约束防止同一实体的重复活动记录
@@unique([activityType, relatedEntityType, relatedEntityId], name: "uk_sa_entity_activity")
}

View File

@ -84,4 +84,22 @@ export interface ISystemActivityRepository {
*
*/
countByType(type: ActivityType | string): Promise<number>;
/**
*
* @param activityType
* @param entityType
* @param entityId ID
*/
exists(
activityType: ActivityType | string,
entityType: string,
entityId: string,
): Promise<boolean>;
/**
*
* @returns null
*/
createIfNotExists(activity: SystemActivityData): Promise<SystemActivityData | null>;
}

View File

@ -113,6 +113,7 @@ export class ActivityEventConsumerController {
/**
* (identity-service)
* Topic: identity.UserAccountCreated
* [2026-01-08]
*/
@MessagePattern('identity.UserAccountCreated')
async handleUserAccountCreated(@Payload() message: UserAccountCreatedEvent) {
@ -121,8 +122,8 @@ export class ActivityEventConsumerController {
try {
const { payload } = message;
// 记录活动日志
await this.activityRepo.create({
// 幂等创建活动日志(如果已存在则跳过)
const created = await this.activityRepo.createIfNotExists({
activityType: 'user_register' as ActivityType,
title: '新用户注册',
description: `用户 ${this.maskPhone(payload.phoneNumber)} 完成注册`,
@ -139,14 +140,17 @@ export class ActivityEventConsumerController {
},
});
// 累加统计数据
const today = new Date();
await Promise.all([
this.realtimeStatsRepo.incrementNewUser(today),
this.globalStatsRepo.incrementUser(),
]);
this.logger.log(`Activity and stats recorded for user registration: ${payload.userId}`);
if (created) {
// 仅在首次创建时累加统计数据
const today = new Date();
await Promise.all([
this.realtimeStatsRepo.incrementNewUser(today),
this.globalStatsRepo.incrementUser(),
]);
this.logger.log(`Activity and stats recorded for user registration: ${payload.userId}`);
} else {
this.logger.log(`Skipped duplicate user registration event: ${payload.userId}`);
}
} catch (error) {
this.logger.error(`Error recording user registration activity:`, error);
}
@ -155,6 +159,7 @@ export class ActivityEventConsumerController {
/**
* (identity-service)
* Topic: identity.UserAccountAutoCreated
* [2026-01-08]
*/
@MessagePattern('identity.UserAccountAutoCreated')
async handleUserAccountAutoCreated(@Payload() message: UserAccountCreatedEvent) {
@ -163,8 +168,8 @@ export class ActivityEventConsumerController {
try {
const { payload } = message;
// 记录活动日志
await this.activityRepo.create({
// 幂等创建活动日志(如果已存在则跳过)
const created = await this.activityRepo.createIfNotExists({
activityType: 'user_register' as ActivityType,
title: '新用户自动注册',
description: `用户通过推荐链接完成注册`,
@ -182,14 +187,17 @@ export class ActivityEventConsumerController {
},
});
// 累加统计数据
const today = new Date();
await Promise.all([
this.realtimeStatsRepo.incrementNewUser(today),
this.globalStatsRepo.incrementUser(),
]);
this.logger.log(`Activity and stats recorded for auto user registration: ${payload.userId}`);
if (created) {
// 仅在首次创建时累加统计数据
const today = new Date();
await Promise.all([
this.realtimeStatsRepo.incrementNewUser(today),
this.globalStatsRepo.incrementUser(),
]);
this.logger.log(`Activity and stats recorded for auto user registration: ${payload.userId}`);
} else {
this.logger.log(`Skipped duplicate auto user registration event: ${payload.userId}`);
}
} catch (error) {
this.logger.error(`Error recording auto user registration activity:`, error);
}
@ -198,6 +206,7 @@ export class ActivityEventConsumerController {
/**
* (authorization-service)
* Topic: authorization-events
* [2026-01-08]
*/
@MessagePattern('authorization-events')
async handleAuthorizationEvent(@Payload() message: AuthorizationRoleEvent) {
@ -209,16 +218,17 @@ export class ActivityEventConsumerController {
// 根据事件类型决定是否记录活动
if (eventType.includes('authorized') || eventType.includes('Authorized')) {
const roleTypeLabel = this.getRoleTypeLabel(payload.roleType);
const entityId = payload.authorizationId || message.aggregateId;
// 记录活动日志
await this.activityRepo.create({
// 幂等创建活动日志(如果已存在则跳过)
const created = await this.activityRepo.createIfNotExists({
activityType: 'company_authorization' as ActivityType,
title: '授权成功',
description: `${payload.regionName} ${roleTypeLabel} 完成授权`,
icon: '🏢',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'authorization',
relatedEntityId: payload.authorizationId || message.aggregateId,
relatedEntityId: entityId,
metadata: {
roleType: payload.roleType,
regionCode: payload.regionCode,
@ -228,30 +238,34 @@ export class ActivityEventConsumerController {
},
});
// 累加统计数据 - 区分省公司和市公司
const today = new Date();
const isProvinceCompany =
payload.roleType === 'PROVINCE_COMPANY' ||
payload.roleType === 'AUTH_PROVINCE_COMPANY';
const isCityCompany =
payload.roleType === 'CITY_COMPANY' ||
payload.roleType === 'AUTH_CITY_COMPANY';
if (created) {
// 仅在首次创建时累加统计数据 - 区分省公司和市公司
const today = new Date();
const isProvinceCompany =
payload.roleType === 'PROVINCE_COMPANY' ||
payload.roleType === 'AUTH_PROVINCE_COMPANY';
const isCityCompany =
payload.roleType === 'CITY_COMPANY' ||
payload.roleType === 'AUTH_CITY_COMPANY';
if (isProvinceCompany) {
await Promise.all([
this.realtimeStatsRepo.incrementProvinceAuth(today),
this.globalStatsRepo.incrementProvinceCompany(),
]);
this.logger.log(`Province company stats incremented: ${payload.regionCode}`);
} else if (isCityCompany) {
await Promise.all([
this.realtimeStatsRepo.incrementCityAuth(today),
this.globalStatsRepo.incrementCityCompany(),
]);
this.logger.log(`City company stats incremented: ${payload.regionCode}`);
if (isProvinceCompany) {
await Promise.all([
this.realtimeStatsRepo.incrementProvinceAuth(today),
this.globalStatsRepo.incrementProvinceCompany(),
]);
this.logger.log(`Province company stats incremented: ${payload.regionCode}`);
} else if (isCityCompany) {
await Promise.all([
this.realtimeStatsRepo.incrementCityAuth(today),
this.globalStatsRepo.incrementCityCompany(),
]);
this.logger.log(`City company stats incremented: ${payload.regionCode}`);
}
this.logger.log(`Activity and stats recorded for authorization: ${payload.accountSequence}`);
} else {
this.logger.log(`Skipped duplicate authorization event: ${entityId}`);
}
this.logger.log(`Activity and stats recorded for authorization: ${payload.accountSequence}`);
}
} catch (error) {
this.logger.error(`Error recording authorization activity:`, error);
@ -262,6 +276,7 @@ export class ActivityEventConsumerController {
* (referral-service planting-service)
* Topic: planting.order.paid
* [2026-01-07]
* [2026-01-08]
* - referral-service: { eventName, data: {...} }
* - planting-service: { orderId, treeCount, ... }
*/
@ -282,8 +297,8 @@ export class ActivityEventConsumerController {
paidAt: message.paidAt || new Date().toISOString(),
};
// 记录活动日志 - 不依赖 userId使用 accountSequence
await this.activityRepo.create({
// 幂等创建活动日志(如果已存在则跳过)
const created = await this.activityRepo.createIfNotExists({
activityType: 'planting_order' as ActivityType,
title: '认种订单',
description: `用户认种了 ${data.treeCount} 棵榴莲树`,
@ -302,15 +317,18 @@ export class ActivityEventConsumerController {
},
});
// 累加统计数据
const today = new Date();
const amount = new Decimal(data.totalAmount || '0');
await Promise.all([
this.realtimeStatsRepo.incrementPlanting(today, data.treeCount, amount),
this.globalStatsRepo.incrementPlanting(data.treeCount, amount),
]);
this.logger.log(`Activity and stats recorded for planting order: ${data.orderId}`);
if (created) {
// 仅在首次创建时累加统计数据
const today = new Date();
const amount = new Decimal(data.totalAmount || '0');
await Promise.all([
this.realtimeStatsRepo.incrementPlanting(today, data.treeCount, amount),
this.globalStatsRepo.incrementPlanting(data.treeCount, amount),
]);
this.logger.log(`Activity and stats recorded for planting order: ${data.orderId}`);
} else {
this.logger.log(`Skipped duplicate planting order event: ${data.orderId}`);
}
} catch (error) {
this.logger.error(`Error recording planting order activity:`, error);
}
@ -319,6 +337,7 @@ export class ActivityEventConsumerController {
/**
*
* Topic: reporting.report.generated
* [2026-01-08]
*/
@MessagePattern('reporting.report.generated')
async handleReportGenerated(
@ -334,21 +353,30 @@ export class ActivityEventConsumerController {
this.logger.log(`Received reporting.report.generated event`);
try {
await this.activityRepo.create({
// 使用 reportCode + periodKey 作为唯一标识
const entityId = `${message.reportCode}_${message.periodKey}`;
// 幂等创建活动日志(如果已存在则跳过)
const created = await this.activityRepo.createIfNotExists({
activityType: 'report_generated' as ActivityType,
title: '报表生成',
description: `${message.reportName} 已生成`,
icon: '📊',
relatedEntityType: 'report',
relatedEntityId: message.reportCode,
relatedEntityId: entityId,
metadata: {
reportCode: message.reportCode,
reportType: message.reportType,
periodKey: message.periodKey,
generatedAt: message.generatedAt,
},
});
this.logger.log(`Activity recorded for report generation: ${message.reportCode}`);
if (created) {
this.logger.log(`Activity recorded for report generation: ${message.reportCode}`);
} else {
this.logger.log(`Skipped duplicate report generation event: ${entityId}`);
}
} catch (error) {
this.logger.error(`Error recording report generation activity:`, error);
}

View File

@ -134,6 +134,37 @@ export class SystemActivityRepository implements ISystemActivityRepository {
});
}
async exists(
activityType: ActivityType | string,
entityType: string,
entityId: string,
): Promise<boolean> {
const count = await this.prisma.systemActivity.count({
where: {
activityType,
relatedEntityType: entityType,
relatedEntityId: entityId,
},
});
return count > 0;
}
async createIfNotExists(activity: SystemActivityData): Promise<SystemActivityData | null> {
// 先检查是否已存在
if (activity.relatedEntityType && activity.relatedEntityId) {
const alreadyExists = await this.exists(
activity.activityType,
activity.relatedEntityType,
activity.relatedEntityId,
);
if (alreadyExists) {
return null; // 已存在,跳过创建
}
}
// 不存在则创建
return this.create(activity);
}
private toDomain(
record: Awaited<ReturnType<typeof this.prisma.systemActivity.findFirst>>,
): SystemActivityData {