// Source path (truncated by the repository viewer):
//   rwadurian/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.con...
// Viewer metadata: 803 lines, 25 KiB, TypeScript.
// Viewer warning: this file contains Unicode characters that might be confused
// with other characters. If that is intentional, the warning can be ignored.


import { Controller, Logger, Inject } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { Decimal } from '@prisma/client/runtime/library';
import {
ISystemActivityRepository,
SYSTEM_ACTIVITY_REPOSITORY,
ActivityType,
IRealtimeStatsRepository,
REALTIME_STATS_REPOSITORY,
IGlobalStatsRepository,
GLOBAL_STATS_REPOSITORY,
} from '../../domain/repositories';
/**
* Outbox metadata (option B).
*
* Attached to incoming events as the optional `_outbox` property.
* NOTE(review): none of the handlers visible in this file read these fields —
* confirm who consumes them before changing the shape.
*/
interface OutboxMeta {
id: string;
aggregateId: string;
eventType: string;
}
/**
* User account created event (identity-service).
* Topic: identity.UserAccountCreated or identity.UserAccountAutoCreated
*
* The consumers in this file only read `payload`; the envelope fields are
* declared but not used here.
*/
interface UserAccountCreatedEvent {
eventId: string;
eventType: string;
occurredAt: string;
aggregateId: string;
aggregateType: string;
payload: {
// Passed to BigInt() by the consumers, so it must be an integer string.
userId: string;
accountSequence: string;
// Only used in masked form inside the activity description.
phoneNumber?: string;
nickname?: string;
referralCode: string;
inviterSequence?: string;
registeredAt: string;
};
_outbox?: OutboxMeta;
}
/**
* Authorization role created/updated event (authorization-service).
* Topic: authorization-events
*
* The consumer branches on `eventType` by substring, so the same shape is used
* for authorization events and for benefit-activation events.
*/
interface AuthorizationRoleEvent {
eventId: string;
eventType: string;
aggregateId: string;
occurredAt: string;
payload: {
authorizationId?: string;
// Passed to BigInt() by the consumer, so it must be an integer string.
userId: string;
accountSequence?: string;
roleType?: string;
// Different event types use different field names for the region; the
// consumer takes the first non-empty one of region*/province*/city*
// (and communityName for the display name).
regionCode?: string;
regionName?: string;
provinceCode?: string;
provinceName?: string;
cityCode?: string;
cityName?: string;
communityName?: string;
status?: string;
authorizedAt?: string;
authorizedBy?: string | null;
};
_outbox?: OutboxMeta;
}
/**
* Planting order paid event (referral-service or planting-service).
* Topic: planting.order.paid
* [2026-01-07] Update: accepts two message formats
* - referral-service sends: { eventName, data: {...} }
* - planting-service sends: { orderId, treeCount, ... } (flat data)
*
* Every field is optional because a single message carries only one of the two
* formats; the consumer prefers `data` when it is present.
*/
interface PlantingOrderPaidEvent {
// referral-service format
eventName?: string;
data?: PlantingOrderPaidData;
// planting-service flat format
orderId?: string;
userId?: string;
accountSequence?: string;
treeCount?: number;
provinceCode?: string;
cityCode?: string;
totalAmount?: string;
paidAt?: string;
}
/**
* Normalized planting-order payload that the consumer works with, whichever
* of the two wire formats the message arrived in.
*/
interface PlantingOrderPaidData {
// Used as the activity's related entity id (the idempotency key).
orderId: string;
userId?: string;
accountSequence?: string;
treeCount: number;
provinceCode?: string;
cityCode?: string;
// Parsed with Prisma's Decimal by the consumer; treated as '0' when absent.
totalAmount?: string;
paidAt: string;
}
/**
 * Kafka consumer that records events from other services as system-activity
 * entries and, for registrations, company authorizations and planting orders,
 * also increments the realtime and global statistics counters.
 *
 * Conventions shared by the handlers:
 * - Activities are written with `createIfNotExists`; counters are incremented
 *   only when that call reports a newly created row, so a redelivered event
 *   is not counted twice.
 * - Errors raised while recording are logged and swallowed, not rethrown.
 *
 * NOTE(review): the activity write and the counter increments are separate
 * calls. If an increment fails after the activity row exists, a redelivery is
 * presumably skipped as a duplicate and the counters stay behind — confirm
 * this is acceptable.
 */
@Controller()
export class ActivityEventConsumerController {
  /** Display labels for known role types; unknown role types are shown as-is. */
  private static readonly ROLE_TYPE_LABELS: ReadonlyMap<string, string> = new Map<string, string>([
    ['COMMUNITY', '社区'],
    ['AUTH_PROVINCE_COMPANY', '授权省公司'],
    ['PROVINCE_COMPANY', '正式省公司'],
    ['AUTH_CITY_COMPANY', '授权市公司'],
    ['CITY_COMPANY', '正式市公司'],
  ]);

  private readonly logger = new Logger(ActivityEventConsumerController.name);

  constructor(
    @Inject(SYSTEM_ACTIVITY_REPOSITORY)
    private readonly activityRepo: ISystemActivityRepository,
    @Inject(REALTIME_STATS_REPOSITORY)
    private readonly realtimeStatsRepo: IRealtimeStatsRepository,
    @Inject(GLOBAL_STATS_REPOSITORY)
    private readonly globalStatsRepo: IGlobalStatsRepository,
  ) {}

  /**
   * Handles the user account created event (identity-service).
   * Topic: identity.UserAccountCreated
   * [2026-01-08] Update: idempotent write to avoid duplicate records.
   *
   * @param message Event envelope; only `payload` is read.
   */
  @MessagePattern('identity.UserAccountCreated')
  async handleUserAccountCreated(@Payload() message: UserAccountCreatedEvent): Promise<void> {
    this.logger.log(`Received identity.UserAccountCreated event`);
    try {
      const { payload } = message;
      await this.recordUserRegistration(payload, {
        title: '新用户注册',
        description: `用户 ${this.maskPhone(payload.phoneNumber)} 完成注册`,
        autoCreated: false,
        logLabel: 'user registration',
      });
    } catch (error) {
      this.logger.error(`Error recording user registration activity:`, this.stackOf(error));
    }
  }

  /**
   * Handles the user account auto-created event (identity-service).
   * Topic: identity.UserAccountAutoCreated
   * [2026-01-08] Update: idempotent write to avoid duplicate records.
   *
   * Shares activity type and entity id with `handleUserAccountCreated`, so a
   * user is recorded once regardless of which of the two topics fires first.
   *
   * @param message Event envelope; only `payload` is read.
   */
  @MessagePattern('identity.UserAccountAutoCreated')
  async handleUserAccountAutoCreated(@Payload() message: UserAccountCreatedEvent): Promise<void> {
    this.logger.log(`Received identity.UserAccountAutoCreated event`);
    try {
      const { payload } = message;
      await this.recordUserRegistration(payload, {
        title: '新用户自动注册',
        description: `用户通过推荐链接完成注册`,
        autoCreated: true,
        logLabel: 'auto user registration',
      });
    } catch (error) {
      this.logger.error(`Error recording auto user registration activity:`, this.stackOf(error));
    }
  }

  /**
   * Handles authorization role events (authorization-service).
   * Topic: authorization-events
   * [2026-01-08] Update: idempotent write to avoid duplicate records.
   * [2026-01-08] Update: also handles benefit activation events
   *              (authorization.benefit.activated).
   *
   * Events whose type contains `benefit.activated` are delegated to
   * `handleBenefitActivatedInternal`. Events whose type contains `authorized`,
   * `Authorized` or `granted` are recorded as company authorizations; any
   * other event type is ignored.
   *
   * @param message Event envelope.
   */
  @MessagePattern('authorization-events')
  async handleAuthorizationEvent(@Payload() message: AuthorizationRoleEvent): Promise<void> {
    this.logger.log(`Received authorization-events: ${message.eventType}`);
    try {
      const { payload, eventType } = message;

      if (eventType.includes('benefit.activated')) {
        await this.handleBenefitActivatedInternal(message);
        return;
      }

      const isAuthorizationGrant =
        eventType.includes('authorized') ||
        eventType.includes('Authorized') ||
        eventType.includes('granted');
      if (!isAuthorizationGrant) {
        return;
      }

      const roleTypeLabel = this.getRoleTypeLabel(payload.roleType || '');
      const entityId = payload.authorizationId || message.aggregateId;
      const { regionCode, regionName } = this.resolveRegion(payload);

      const created = await this.activityRepo.createIfNotExists({
        activityType: 'company_authorization' as ActivityType,
        title: '授权成功',
        description: `${regionName} ${roleTypeLabel} 完成授权`,
        icon: '🏢',
        relatedUserId: BigInt(payload.userId),
        relatedEntityType: 'authorization',
        relatedEntityId: entityId,
        metadata: {
          roleType: payload.roleType,
          regionCode: regionCode,
          regionName: regionName,
          accountSequence: payload.accountSequence,
          authorizedAt: payload.authorizedAt || message.occurredAt,
        },
      });
      if (!created) {
        this.logger.log(`Skipped duplicate authorization event: ${entityId}`);
        return;
      }

      // Counters are only touched on first creation, and only for province
      // and city companies; other role types get an activity row only.
      const today = new Date();
      const isProvinceCompany =
        payload.roleType === 'PROVINCE_COMPANY' || payload.roleType === 'AUTH_PROVINCE_COMPANY';
      const isCityCompany =
        payload.roleType === 'CITY_COMPANY' || payload.roleType === 'AUTH_CITY_COMPANY';

      if (isProvinceCompany) {
        await Promise.all([
          this.realtimeStatsRepo.incrementProvinceAuth(today),
          this.globalStatsRepo.incrementProvinceCompany(),
        ]);
        this.logger.log(`Province company stats incremented: ${regionCode}`);
      } else if (isCityCompany) {
        await Promise.all([
          this.realtimeStatsRepo.incrementCityAuth(today),
          this.globalStatsRepo.incrementCityCompany(),
        ]);
        this.logger.log(`City company stats incremented: ${regionCode}`);
      }
      this.logger.log(`Activity and stats recorded for authorization: ${payload.accountSequence}`);
    } catch (error) {
      this.logger.error(`Error recording authorization activity:`, this.stackOf(error));
    }
  }

  /**
   * Internal: records a benefit activation event as an activity.
   * No statistics counters are changed.
   *
   * @param message The authorization-events envelope whose type matched
   *                `benefit.activated`.
   */
  private async handleBenefitActivatedInternal(message: AuthorizationRoleEvent): Promise<void> {
    try {
      const { payload, aggregateId } = message;
      const roleTypeLabel = this.getRoleTypeLabel(payload.roleType || '');
      const { regionCode, regionName } = this.resolveRegion(payload);

      const created = await this.activityRepo.createIfNotExists({
        activityType: 'benefit_applied' as ActivityType,
        title: '权益激活',
        description: `${regionName} ${roleTypeLabel} 激活权益`,
        icon: '🎁',
        relatedUserId: BigInt(payload.userId),
        relatedEntityType: 'benefit',
        relatedEntityId: `benefit_${aggregateId}`,
        metadata: {
          authorizationId: payload.authorizationId,
          accountSequence: payload.accountSequence,
          roleType: payload.roleType,
          regionCode: regionCode,
          regionName: regionName,
          activatedAt: message.occurredAt,
        },
      });
      if (created) {
        this.logger.log(`Activity recorded for benefit activation: ${payload.accountSequence}`);
      } else {
        this.logger.log(`Skipped duplicate benefit activation event: ${aggregateId}`);
      }
    } catch (error) {
      this.logger.error(`Error recording benefit activation activity:`, this.stackOf(error));
    }
  }

  /**
   * Handles the planting order paid event (referral-service or planting-service).
   * Topic: planting.order.paid
   * [2026-01-07] Update: accepts two message formats
   * [2026-01-08] Update: idempotent write to avoid duplicate records.
   * - referral-service: { eventName, data: {...} }
   * - planting-service: { orderId, treeCount, ... }
   *
   * Messages without an order id are skipped with a warning: the order id is
   * the idempotency key, and an empty key would make unrelated messages
   * collide with each other.
   *
   * @param message Event in either of the two formats.
   */
  @MessagePattern('planting.order.paid')
  async handlePlantingOrderPaid(@Payload() message: PlantingOrderPaidEvent): Promise<void> {
    try {
      // Logged inside the try so an unserializable or missing payload is
      // reported like any other failure instead of escaping the handler.
      this.logger.log(`Received planting.order.paid event: ${JSON.stringify(message).substring(0, 200)}`);

      // [2026-01-07] Prefer the wrapped referral-service `data`; otherwise
      // assemble the same shape from the flat planting-service fields.
      const data: PlantingOrderPaidData = message.data || {
        orderId: message.orderId || '',
        userId: message.userId,
        accountSequence: message.accountSequence,
        treeCount: message.treeCount || 0,
        provinceCode: message.provinceCode,
        cityCode: message.cityCode,
        totalAmount: message.totalAmount,
        paidAt: message.paidAt || new Date().toISOString(),
      };

      if (!data.orderId) {
        this.logger.warn(`Missing orderId in planting.order.paid event`);
        return;
      }

      const created = await this.activityRepo.createIfNotExists({
        activityType: 'planting_order' as ActivityType,
        title: '认种订单',
        description: `用户认种了 ${data.treeCount} 棵榴莲树`,
        icon: '🌳',
        // relatedUserId is intentionally not set because userId may be empty.
        relatedEntityType: 'order',
        relatedEntityId: data.orderId,
        metadata: {
          orderId: data.orderId,
          accountSequence: data.accountSequence,
          treeCount: data.treeCount,
          totalAmount: data.totalAmount,
          provinceCode: data.provinceCode,
          cityCode: data.cityCode,
          paidAt: data.paidAt,
        },
      });
      if (!created) {
        this.logger.log(`Skipped duplicate planting order event: ${data.orderId}`);
        return;
      }

      // Counters are only touched on first creation.
      const today = new Date();
      const amount = new Decimal(data.totalAmount || '0');
      await Promise.all([
        this.realtimeStatsRepo.incrementPlanting(today, data.treeCount, amount),
        this.globalStatsRepo.incrementPlanting(data.treeCount, amount),
      ]);
      this.logger.log(`Activity and stats recorded for planting order: ${data.orderId}`);
    } catch (error) {
      this.logger.error(`Error recording planting order activity:`, this.stackOf(error));
    }
  }

  /**
   * Handles the report generated event.
   * Topic: reporting.report.generated
   * [2026-01-08] Update: idempotent write to avoid duplicate records.
   *
   * @param message Report descriptor; `reportCode` + `periodKey` form the
   *                idempotency key.
   */
  @MessagePattern('reporting.report.generated')
  async handleReportGenerated(
    @Payload()
    message: {
      reportCode: string;
      reportName: string;
      reportType: string;
      periodKey: string;
      generatedAt: string;
    },
  ): Promise<void> {
    this.logger.log(`Received reporting.report.generated event`);
    try {
      const entityId = `${message.reportCode}_${message.periodKey}`;
      const created = await this.activityRepo.createIfNotExists({
        activityType: 'report_generated' as ActivityType,
        title: '报表生成',
        description: `${message.reportName} 已生成`,
        icon: '📊',
        relatedEntityType: 'report',
        relatedEntityId: entityId,
        metadata: {
          reportCode: message.reportCode,
          reportType: message.reportType,
          periodKey: message.periodKey,
          generatedAt: message.generatedAt,
        },
      });
      if (created) {
        this.logger.log(`Activity recorded for report generation: ${message.reportCode}`);
      } else {
        this.logger.log(`Skipped duplicate report generation event: ${entityId}`);
      }
    } catch (error) {
      this.logger.error(`Error recording report generation activity:`, this.stackOf(error));
    }
  }

  // ==========================================================================
  // [2026-01-08] Added: consumers for more activity events
  // ==========================================================================

  /**
   * Handles the KYC submitted event (identity-service).
   * Topic: identity.KYCSubmitted
   *
   * `realName` and `idNumber` are declared on the payload but are not copied
   * into the activity record.
   *
   * NOTE(review): the idempotency key is built from `aggregateId`. If that is
   * a per-user id rather than a per-submission id, a resubmission would be
   * skipped as a duplicate — confirm against the producer.
   *
   * @param message Event envelope.
   */
  @MessagePattern('identity.KYCSubmitted')
  async handleKYCSubmitted(
    @Payload()
    message: {
      eventId: string;
      eventType: string;
      aggregateId: string;
      occurredAt: string;
      payload: {
        userId: string;
        accountSequence?: string;
        realName?: string;
        idNumber?: string;
        submittedAt: string;
      };
    },
  ): Promise<void> {
    this.logger.log(`Received identity.KYCSubmitted event`);
    try {
      const { payload, aggregateId } = message;
      const created = await this.activityRepo.createIfNotExists({
        activityType: 'kyc_submitted' as ActivityType,
        title: 'KYC认证提交',
        description: `用户提交了实名认证申请`,
        icon: '📋',
        relatedUserId: BigInt(payload.userId),
        relatedEntityType: 'kyc',
        relatedEntityId: `kyc_submit_${aggregateId}`,
        metadata: {
          accountSequence: payload.accountSequence,
          submittedAt: payload.submittedAt,
        },
      });
      if (created) {
        this.logger.log(`Activity recorded for KYC submission: ${payload.userId}`);
      } else {
        this.logger.log(`Skipped duplicate KYC submission event: ${payload.userId}`);
      }
    } catch (error) {
      this.logger.error(`Error recording KYC submission activity:`, this.stackOf(error));
    }
  }

  /**
   * Handles the KYC approved event (identity-service).
   * Topic: identity.KYCApproved
   *
   * @param message Event envelope.
   */
  @MessagePattern('identity.KYCApproved')
  async handleKYCApproved(
    @Payload()
    message: {
      eventId: string;
      eventType: string;
      aggregateId: string;
      occurredAt: string;
      payload: {
        userId: string;
        accountSequence?: string;
        approvedAt: string;
      };
    },
  ): Promise<void> {
    this.logger.log(`Received identity.KYCApproved event`);
    try {
      const { payload, aggregateId } = message;
      const created = await this.activityRepo.createIfNotExists({
        activityType: 'kyc_approved' as ActivityType,
        title: 'KYC认证通过',
        description: `用户实名认证已通过`,
        icon: '✅',
        relatedUserId: BigInt(payload.userId),
        relatedEntityType: 'kyc',
        relatedEntityId: `kyc_approve_${aggregateId}`,
        metadata: {
          accountSequence: payload.accountSequence,
          approvedAt: payload.approvedAt,
        },
      });
      if (created) {
        this.logger.log(`Activity recorded for KYC approval: ${payload.userId}`);
      } else {
        this.logger.log(`Skipped duplicate KYC approval event: ${payload.userId}`);
      }
    } catch (error) {
      this.logger.error(`Error recording KYC approval activity:`, this.stackOf(error));
    }
  }

  /**
   * Handles the KYC rejected event (identity-service).
   * Topic: identity.KYCRejected
   *
   * @param message Event envelope; `payload.reason` is stored in metadata.
   */
  @MessagePattern('identity.KYCRejected')
  async handleKYCRejected(
    @Payload()
    message: {
      eventId: string;
      eventType: string;
      aggregateId: string;
      occurredAt: string;
      payload: {
        userId: string;
        accountSequence?: string;
        reason?: string;
        rejectedAt: string;
      };
    },
  ): Promise<void> {
    this.logger.log(`Received identity.KYCRejected event`);
    try {
      const { payload, aggregateId } = message;
      const created = await this.activityRepo.createIfNotExists({
        activityType: 'kyc_rejected' as ActivityType,
        title: 'KYC认证拒绝',
        description: `用户实名认证被拒绝`,
        icon: '❌',
        relatedUserId: BigInt(payload.userId),
        relatedEntityType: 'kyc',
        relatedEntityId: `kyc_reject_${aggregateId}`,
        metadata: {
          accountSequence: payload.accountSequence,
          reason: payload.reason,
          rejectedAt: payload.rejectedAt,
        },
      });
      if (created) {
        this.logger.log(`Activity recorded for KYC rejection: ${payload.userId}`);
      } else {
        this.logger.log(`Skipped duplicate KYC rejection event: ${payload.userId}`);
      }
    } catch (error) {
      this.logger.error(`Error recording KYC rejection activity:`, this.stackOf(error));
    }
  }

  /**
   * Handles the contract signed event (planting-service).
   * Topic: contract.signed
   *
   * @param message Flat event; `orderNo` is the idempotency key.
   */
  @MessagePattern('contract.signed')
  async handleContractSigned(
    @Payload()
    message: {
      orderNo: string;
      userId: string;
      accountSequence: string;
      treeCount: number;
      totalAmount: number;
      provinceCode: string;
      cityCode: string;
      signedAt: string;
    },
  ): Promise<void> {
    this.logger.log(`Received contract.signed event: ${message.orderNo}`);
    try {
      const created = await this.activityRepo.createIfNotExists({
        activityType: 'contract_signed' as ActivityType,
        title: '合同签署',
        description: `用户签署了 ${message.treeCount} 棵榴莲树认种合同`,
        icon: '📝',
        relatedUserId: BigInt(message.userId),
        relatedEntityType: 'contract',
        relatedEntityId: message.orderNo,
        metadata: {
          orderNo: message.orderNo,
          accountSequence: message.accountSequence,
          treeCount: message.treeCount,
          totalAmount: message.totalAmount,
          provinceCode: message.provinceCode,
          cityCode: message.cityCode,
          signedAt: message.signedAt,
        },
      });
      if (created) {
        this.logger.log(`Activity recorded for contract signing: ${message.orderNo}`);
      } else {
        this.logger.log(`Skipped duplicate contract signing event: ${message.orderNo}`);
      }
    } catch (error) {
      this.logger.error(`Error recording contract signing activity:`, this.stackOf(error));
    }
  }

  /**
   * Handles deposit credited events (published by wallet-service to wallet.acks).
   * Topic: wallet.acks (eventType: wallet.deposit.credited)
   *
   * Any other event type on the topic is ignored without logging. Events
   * lacking `depositId` or `userId` are skipped with a warning.
   *
   * @param message Flat event from the wallet.acks topic.
   */
  @MessagePattern('wallet.acks')
  async handleWalletAcks(
    @Payload()
    message: {
      eventType: string;
      depositId?: string;
      userId?: string;
      accountSequence?: string;
      amount?: string;
      assetType?: string;
      txHash?: string;
      creditedAt?: string;
    },
  ): Promise<void> {
    if (message.eventType !== 'wallet.deposit.credited') {
      return;
    }
    this.logger.log(`Received wallet.deposit.credited event: ${message.depositId}`);
    try {
      if (!message.depositId || !message.userId) {
        this.logger.warn(`Missing required fields in deposit event`);
        return;
      }
      const created = await this.activityRepo.createIfNotExists({
        activityType: 'deposit' as ActivityType,
        title: '充值到账',
        description: `用户充值 ${message.amount || '0'} ${message.assetType || 'USDT'}`,
        icon: '💰',
        relatedUserId: BigInt(message.userId),
        relatedEntityType: 'deposit',
        relatedEntityId: message.depositId,
        metadata: {
          depositId: message.depositId,
          accountSequence: message.accountSequence,
          amount: message.amount,
          assetType: message.assetType,
          txHash: message.txHash,
          creditedAt: message.creditedAt,
        },
      });
      if (created) {
        this.logger.log(`Activity recorded for deposit: ${message.depositId}`);
      } else {
        this.logger.log(`Skipped duplicate deposit event: ${message.depositId}`);
      }
    } catch (error) {
      this.logger.error(`Error recording deposit activity:`, this.stackOf(error));
    }
  }

  /**
   * Handles withdrawal completed events (wallet-service).
   * Topic: wallet.withdrawals (eventType: wallet.withdrawal.completed)
   *
   * Any other event type on the topic is ignored without logging. Events
   * lacking `withdrawalId` or `userId` are skipped with a warning, matching
   * the deposit handler.
   *
   * @param message Flat event from the wallet.withdrawals topic.
   */
  @MessagePattern('wallet.withdrawals')
  async handleWithdrawalEvent(
    @Payload()
    message: {
      eventType: string;
      withdrawalId: string;
      userId: string;
      accountSequence?: string;
      amount: string;
      assetType: string;
      status: string;
      txHash?: string;
      completedAt?: string;
      requestedAt?: string;
    },
  ): Promise<void> {
    if (message.eventType !== 'wallet.withdrawal.completed') {
      return;
    }
    this.logger.log(`Received wallet.withdrawal.completed event: ${message.withdrawalId}`);
    try {
      // The declared type marks these as required, but the payload arrives
      // from Kafka unvalidated, so check before building the idempotency key.
      if (!message.withdrawalId || !message.userId) {
        this.logger.warn(`Missing required fields in withdrawal event`);
        return;
      }
      const created = await this.activityRepo.createIfNotExists({
        activityType: 'withdrawal' as ActivityType,
        title: '提现成功',
        description: `用户提现 ${message.amount} ${message.assetType}`,
        icon: '💸',
        relatedUserId: BigInt(message.userId),
        relatedEntityType: 'withdrawal',
        relatedEntityId: message.withdrawalId,
        metadata: {
          withdrawalId: message.withdrawalId,
          accountSequence: message.accountSequence,
          amount: message.amount,
          assetType: message.assetType,
          txHash: message.txHash,
          completedAt: message.completedAt,
        },
      });
      if (created) {
        this.logger.log(`Activity recorded for withdrawal: ${message.withdrawalId}`);
      } else {
        this.logger.log(`Skipped duplicate withdrawal event: ${message.withdrawalId}`);
      }
    } catch (error) {
      this.logger.error(`Error recording withdrawal activity:`, this.stackOf(error));
    }
  }

  /**
   * Writes the `user_register` activity shared by both registration topics
   * and, on first creation only, increments the new-user counters.
   *
   * @param payload Registration payload from the event.
   * @param variant Per-topic title/description, whether to flag the metadata
   *                as auto-created, and the wording used in log lines.
   */
  private async recordUserRegistration(
    payload: UserAccountCreatedEvent['payload'],
    variant: { title: string; description: string; autoCreated: boolean; logLabel: string },
  ): Promise<void> {
    const created = await this.activityRepo.createIfNotExists({
      activityType: 'user_register' as ActivityType,
      title: variant.title,
      description: variant.description,
      icon: '👤',
      relatedUserId: BigInt(payload.userId),
      relatedEntityType: 'user',
      relatedEntityId: payload.userId,
      metadata: {
        accountSequence: payload.accountSequence,
        nickname: payload.nickname,
        referralCode: payload.referralCode,
        inviterSequence: payload.inviterSequence,
        registeredAt: payload.registeredAt,
        // The key is only present for auto-created accounts.
        ...(variant.autoCreated ? { autoCreated: true } : {}),
      },
    });
    if (!created) {
      this.logger.log(`Skipped duplicate ${variant.logLabel} event: ${payload.userId}`);
      return;
    }
    const today = new Date();
    await Promise.all([
      this.realtimeStatsRepo.incrementNewUser(today),
      this.globalStatsRepo.incrementUser(),
    ]);
    this.logger.log(`Activity and stats recorded for ${variant.logLabel}: ${payload.userId}`);
  }

  /**
   * Picks the region code and display name from whichever fields the event
   * populated.
   * - province events use provinceCode/provinceName
   * - city events use cityCode/cityName
   * - community events use communityName
   * - everything else uses regionCode/regionName
   *
   * @returns The first non-empty code (or `''`) and the first non-empty name,
   *          falling back to the code and finally to '未知区域'.
   */
  private resolveRegion(payload: AuthorizationRoleEvent['payload']): {
    regionCode: string;
    regionName: string;
  } {
    const regionCode = payload.regionCode || payload.provinceCode || payload.cityCode || '';
    const regionName =
      payload.regionName ||
      payload.provinceName ||
      payload.cityName ||
      payload.communityName ||
      regionCode ||
      '未知区域';
    return { regionCode, regionName };
  }

  /**
   * Returns the display label for a role type.
   *
   * @param roleType Role type code from the event.
   * @returns The mapped label, or `roleType` itself when it is not a known code.
   */
  private getRoleTypeLabel(roleType: string): string {
    // A Map lookup cannot resolve inherited object members such as
    // 'constructor', which a plain-object index would.
    return ActivityEventConsumerController.ROLE_TYPE_LABELS.get(roleType) ?? roleType;
  }

  /**
   * Masks a phone number for display.
   *
   * @param phone Raw phone number, if any.
   * @returns First 3 and last 4 characters around `****`, or '***用户' when
   *          the number is missing or too short to hide anything.
   */
  private maskPhone(phone?: string): string {
    // 3 leading + 4 trailing characters are shown, so an input of 7 or fewer
    // characters would be printed in full; use the placeholder instead.
    if (!phone || phone.length <= 7) {
      return '***用户';
    }
    return `${phone.slice(0, 3)}****${phone.slice(-4)}`;
  }

  /**
   * Converts a caught value into the stack string that `Logger.error` takes
   * as its second argument.
   *
   * @param error Whatever was thrown.
   * @returns The stack (or `name: message`) for Error instances, the value
   *          itself for strings, otherwise a JSON or String rendering.
   */
  private stackOf(error: unknown): string {
    if (error instanceof Error) {
      return error.stack ?? `${error.name}: ${error.message}`;
    }
    if (typeof error === 'string') {
      return error;
    }
    try {
      return JSON.stringify(error) ?? String(error);
    } catch {
      // JSON.stringify rejects bigint and circular values.
      return String(error);
    }
  }
}