/*
 * Source: rwadurian/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.con... (path truncated)
 * Size as reported by the repository viewer: 381 lines, 12 KiB, TypeScript.
 *
 * Viewer warning: this file contains Unicode characters that might be confused
 * with other characters. If that is intentional, the warning can be safely ignored.
 */

import { Controller, Logger, Inject } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { Decimal } from '@prisma/client/runtime/library';
import {
ISystemActivityRepository,
SYSTEM_ACTIVITY_REPOSITORY,
ActivityType,
IRealtimeStatsRepository,
REALTIME_STATS_REPOSITORY,
IGlobalStatsRepository,
GLOBAL_STATS_REPOSITORY,
} from '../../domain/repositories';
/**
 * Outbox metadata ("plan B").
 *
 * Carried as the optional `_outbox` field of the event interfaces declared in
 * this file.
 */
interface OutboxMeta {
  id: string;
  eventType: string;
  aggregateId: string;
}
/**
 * User-account-created event published by identity-service.
 *
 * Topic: `identity.UserAccountCreated` or `identity.UserAccountAutoCreated`.
 */
interface UserAccountCreatedEvent {
  // Event envelope.
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  occurredAt: string;

  // Registration details.
  payload: {
    userId: string;
    accountSequence: string;
    referralCode: string;
    registeredAt: string;
    phoneNumber?: string;
    nickname?: string;
    inviterSequence?: string;
  };

  // Optional outbox metadata.
  _outbox?: OutboxMeta;
}
/**
 * Authorization-role created/updated event published by authorization-service.
 *
 * Topic: `authorization-events`.
 */
interface AuthorizationRoleEvent {
  // Event envelope.
  eventId: string;
  eventType: string;
  aggregateId: string;
  occurredAt: string;

  // Role details.
  payload: {
    userId: string;
    accountSequence: string;
    roleType: string;
    regionCode: string;
    regionName: string;
    authorizationId?: string;
    status?: string;
    authorizedAt?: string;
  };

  // Optional outbox metadata.
  _outbox?: OutboxMeta;
}
/**
 * Planting-order-paid event (from referral-service or planting-service).
 *
 * Topic: `planting.order.paid`.
 *
 * [2026-01-07] Update: two message formats are accepted.
 *   - referral-service sends a wrapped form: `{ eventName, data: {...} }`
 *   - planting-service sends the order fields directly: `{ orderId, treeCount, ... }`
 *
 * The direct form is every field of `PlantingOrderPaidData`, each optional,
 * which is what `Partial<PlantingOrderPaidData>` contributes here.
 */
interface PlantingOrderPaidEvent extends Partial<PlantingOrderPaidData> {
  // referral-service (wrapped) format.
  eventName?: string;
  data?: PlantingOrderPaidData;
}
/**
 * Order details of a paid planting order.
 *
 * Only `orderId`, `treeCount` and `paidAt` are required; every other field
 * may be absent.
 */
interface PlantingOrderPaidData {
  // Required fields.
  orderId: string;
  treeCount: number;
  paidAt: string;

  // Optional fields.
  userId?: string;
  accountSequence?: string;
  provinceCode?: string;
  cityCode?: string;
  totalAmount?: string;
}
/**
 * Kafka message handlers that turn events from other services into
 * system-activity records and running statistics.
 *
 * Each handler writes one activity row through `activityRepo.create` and then
 * bumps the matching counters on the realtime and global stats repositories.
 * Handlers are best-effort: any failure is logged and swallowed, nothing is
 * rethrown.
 *
 * Statistics are dated with the time the message is handled (`new Date()`),
 * not with a timestamp carried by the event.
 */
@Controller()
export class ActivityEventConsumerController {
  /**
   * Display labels for known role types. A Map is used instead of an object
   * literal so that role types such as "constructor" or "toString" cannot
   * resolve to inherited Object.prototype members.
   */
  private static readonly ROLE_TYPE_LABELS: ReadonlyMap<string, string> = new Map([
    ['COMMUNITY', '社区'],
    ['AUTH_PROVINCE_COMPANY', '授权省公司'],
    ['PROVINCE_COMPANY', '正式省公司'],
    ['AUTH_CITY_COMPANY', '授权市公司'],
    ['CITY_COMPANY', '正式市公司'],
  ]);

  private readonly logger = new Logger(ActivityEventConsumerController.name);

  constructor(
    @Inject(SYSTEM_ACTIVITY_REPOSITORY)
    private readonly activityRepo: ISystemActivityRepository,
    @Inject(REALTIME_STATS_REPOSITORY)
    private readonly realtimeStatsRepo: IRealtimeStatsRepository,
    @Inject(GLOBAL_STATS_REPOSITORY)
    private readonly globalStatsRepo: IGlobalStatsRepository,
  ) {}

  /**
   * Handles user-account-created events from identity-service.
   * Topic: identity.UserAccountCreated
   *
   * @param message Event whose `payload` describes the new account.
   */
  @MessagePattern('identity.UserAccountCreated')
  async handleUserAccountCreated(@Payload() message: UserAccountCreatedEvent) {
    this.logger.log(`Received identity.UserAccountCreated event`);
    try {
      const { payload } = message;
      await this.recordUserRegistration(payload, {
        title: '新用户注册',
        // Only a masked form of the phone number goes into the description.
        description: `用户 ${this.maskPhone(payload.phoneNumber)} 完成注册`,
        autoCreated: false,
      });
      this.logger.log(`Activity and stats recorded for user registration: ${payload.userId}`);
    } catch (error) {
      this.logFailure(`Error recording user registration activity:`, error);
    }
  }

  /**
   * Handles automatically-created user-account events from identity-service.
   * Topic: identity.UserAccountAutoCreated
   *
   * Same bookkeeping as `handleUserAccountCreated`, with a fixed description
   * and `autoCreated: true` added to the activity metadata.
   *
   * @param message Event whose `payload` describes the new account.
   */
  @MessagePattern('identity.UserAccountAutoCreated')
  async handleUserAccountAutoCreated(@Payload() message: UserAccountCreatedEvent) {
    this.logger.log(`Received identity.UserAccountAutoCreated event`);
    try {
      const { payload } = message;
      await this.recordUserRegistration(payload, {
        title: '新用户自动注册',
        description: `用户通过推荐链接完成注册`,
        autoCreated: true,
      });
      this.logger.log(`Activity and stats recorded for auto user registration: ${payload.userId}`);
    } catch (error) {
      this.logFailure(`Error recording auto user registration activity:`, error);
    }
  }

  /**
   * Handles authorization-role events from authorization-service.
   * Topic: authorization-events
   *
   * Only events whose `eventType` contains "authorized" or "Authorized" are
   * recorded; all other event types are ignored. Province/city counters are
   * bumped only for the four company role types checked below.
   *
   * @param message Event carrying the role and region that was authorized.
   */
  @MessagePattern('authorization-events')
  async handleAuthorizationEvent(@Payload() message: AuthorizationRoleEvent) {
    try {
      // Logged inside the try: reading `message.eventType` throws when the
      // payload is null/undefined, and that must not escape the handler.
      this.logger.log(`Received authorization-events: ${message.eventType}`);

      const { payload, eventType } = message;

      // NOTE(review): this is a substring match, so a type such as
      // "Unauthorized" or "Deauthorized" would also pass; confirm that no
      // such event type is published on this topic.
      const isAuthorizedEvent =
        eventType.includes('authorized') || eventType.includes('Authorized');
      if (!isAuthorizedEvent) {
        return;
      }

      const roleTypeLabel = this.getRoleTypeLabel(payload.roleType);

      // Record the activity entry.
      await this.activityRepo.create({
        activityType: 'company_authorization' as ActivityType,
        title: '授权成功',
        description: `${payload.regionName} ${roleTypeLabel} 完成授权`,
        icon: '🏢',
        relatedUserId: BigInt(payload.userId),
        relatedEntityType: 'authorization',
        relatedEntityId: payload.authorizationId || message.aggregateId,
        metadata: {
          roleType: payload.roleType,
          regionCode: payload.regionCode,
          regionName: payload.regionName,
          accountSequence: payload.accountSequence,
          authorizedAt: payload.authorizedAt || message.occurredAt,
        },
      });

      // Accumulate statistics, distinguishing province companies from city companies.
      const today = new Date();
      const isProvinceCompany =
        payload.roleType === 'PROVINCE_COMPANY' ||
        payload.roleType === 'AUTH_PROVINCE_COMPANY';
      const isCityCompany =
        payload.roleType === 'CITY_COMPANY' ||
        payload.roleType === 'AUTH_CITY_COMPANY';

      if (isProvinceCompany) {
        await Promise.all([
          this.realtimeStatsRepo.incrementProvinceAuth(today),
          this.globalStatsRepo.incrementProvinceCompany(),
        ]);
        this.logger.log(`Province company stats incremented: ${payload.regionCode}`);
      } else if (isCityCompany) {
        await Promise.all([
          this.realtimeStatsRepo.incrementCityAuth(today),
          this.globalStatsRepo.incrementCityCompany(),
        ]);
        this.logger.log(`City company stats incremented: ${payload.regionCode}`);
      }

      this.logger.log(`Activity and stats recorded for authorization: ${payload.accountSequence}`);
    } catch (error) {
      this.logFailure(`Error recording authorization activity:`, error);
    }
  }

  /**
   * Handles planting-order-paid events (referral-service or planting-service).
   * Topic: planting.order.paid
   *
   * [2026-01-07] Update: two message formats are accepted.
   *   - referral-service: `{ eventName, data: {...} }`
   *   - planting-service: `{ orderId, treeCount, ... }`
   *
   * @param message Event in either the wrapped or the direct format.
   */
  @MessagePattern('planting.order.paid')
  async handlePlantingOrderPaid(@Payload() message: PlantingOrderPaidEvent) {
    try {
      // Logged inside the try, and String() guards the case where
      // JSON.stringify returns undefined (undefined payload), so the preview
      // itself can no longer throw out of the handler.
      const preview = String(JSON.stringify(message)).substring(0, 200);
      this.logger.log(`Received planting.order.paid event: ${preview}`);

      // [2026-01-07] Accept both formats: use the `data` wrapper when present,
      // otherwise build the order data from the top-level fields.
      const data: PlantingOrderPaidData = message.data || {
        orderId: message.orderId || '',
        userId: message.userId,
        accountSequence: message.accountSequence,
        treeCount: message.treeCount || 0,
        provinceCode: message.provinceCode,
        cityCode: message.cityCode,
        totalAmount: message.totalAmount,
        paidAt: message.paidAt || new Date().toISOString(),
      };

      // Record the activity entry. It does not depend on userId; the account
      // is identified by accountSequence in the metadata.
      await this.activityRepo.create({
        activityType: 'planting_order' as ActivityType,
        title: '认种订单',
        description: `用户认种了 ${data.treeCount} 棵榴莲树`,
        icon: '🌳',
        // relatedUserId is intentionally not set because userId may be empty.
        relatedEntityType: 'order',
        relatedEntityId: data.orderId,
        metadata: {
          orderId: data.orderId,
          accountSequence: data.accountSequence,
          treeCount: data.treeCount,
          totalAmount: data.totalAmount,
          provinceCode: data.provinceCode,
          cityCode: data.cityCode,
          paidAt: data.paidAt,
        },
      });

      // Accumulate statistics. `||` (not `??`) so that an empty totalAmount
      // string is also treated as zero.
      const today = new Date();
      const amount = new Decimal(data.totalAmount || '0');
      await Promise.all([
        this.realtimeStatsRepo.incrementPlanting(today, data.treeCount, amount),
        this.globalStatsRepo.incrementPlanting(data.treeCount, amount),
      ]);

      this.logger.log(`Activity and stats recorded for planting order: ${data.orderId}`);
    } catch (error) {
      this.logFailure(`Error recording planting order activity:`, error);
    }
  }

  /**
   * Handles report-generated events.
   * Topic: reporting.report.generated
   *
   * Records an activity entry only; no statistics are updated.
   *
   * @param message Identification and period of the generated report.
   */
  @MessagePattern('reporting.report.generated')
  async handleReportGenerated(
    @Payload()
    message: {
      reportCode: string;
      reportName: string;
      reportType: string;
      periodKey: string;
      generatedAt: string;
    },
  ) {
    this.logger.log(`Received reporting.report.generated event`);
    try {
      await this.activityRepo.create({
        activityType: 'report_generated' as ActivityType,
        title: '报表生成',
        description: `${message.reportName} 已生成`,
        icon: '📊',
        relatedEntityType: 'report',
        relatedEntityId: message.reportCode,
        metadata: {
          reportType: message.reportType,
          periodKey: message.periodKey,
          generatedAt: message.generatedAt,
        },
      });
      this.logger.log(`Activity recorded for report generation: ${message.reportCode}`);
    } catch (error) {
      this.logFailure(`Error recording report generation activity:`, error);
    }
  }

  /**
   * Writes the activity entry for a user registration and bumps the new-user
   * counters. Shared by both registration handlers.
   *
   * @param payload  Registration payload taken from the event.
   * @param activity Title and description to store, and whether to add
   *                 `autoCreated: true` to the metadata.
   * @throws Whatever `BigInt(payload.userId)` or the repositories throw;
   *         callers are expected to catch.
   */
  private async recordUserRegistration(
    payload: UserAccountCreatedEvent['payload'],
    activity: { title: string; description: string; autoCreated: boolean },
  ): Promise<void> {
    await this.activityRepo.create({
      activityType: 'user_register' as ActivityType,
      title: activity.title,
      description: activity.description,
      icon: '👤',
      relatedUserId: BigInt(payload.userId),
      relatedEntityType: 'user',
      relatedEntityId: payload.userId,
      metadata: {
        accountSequence: payload.accountSequence,
        nickname: payload.nickname,
        referralCode: payload.referralCode,
        inviterSequence: payload.inviterSequence,
        registeredAt: payload.registeredAt,
        // The key is present only for auto-created accounts.
        ...(activity.autoCreated ? { autoCreated: true } : {}),
      },
    });

    // Accumulate statistics once the activity entry has been written.
    const today = new Date();
    await Promise.all([
      this.realtimeStatsRepo.incrementNewUser(today),
      this.globalStatsRepo.incrementUser(),
    ]);
  }

  /**
   * Logs a handler failure without rethrowing. The stack string is passed as
   * the second argument, which is the form `Logger.error` takes for a trace.
   *
   * @param message Log message describing which operation failed.
   * @param error   The caught value; may be anything that was thrown.
   */
  private logFailure(message: string, error: unknown): void {
    const trace = error instanceof Error ? error.stack ?? String(error) : String(error);
    this.logger.error(message, trace);
  }

  /**
   * Returns the display label for a role type, or the role type itself when
   * no label is defined for it.
   */
  private getRoleTypeLabel(roleType: string): string {
    return ActivityEventConsumerController.ROLE_TYPE_LABELS.get(roleType) ?? roleType;
  }

  /**
   * Masks a phone number as first 3 characters + "****" + last 4 characters.
   *
   * @param phone Phone number; may be undefined.
   * @returns The masked number, or a generic placeholder when the number is
   *          missing or too short to mask.
   */
  private maskPhone(phone?: string): string {
    // 3 leading + 4 trailing characters are kept, so an input of 7 characters
    // or fewer would be shown in full; use the placeholder instead.
    if (!phone || phone.length <= 7) {
      return '***用户';
    }
    return phone.slice(0, 3) + '****' + phone.slice(-4);
  }
}