feat(reporting-service): 新增多种活动事件类型

扩展仪表板"最近活动"功能,新增以下活动类型:

活动类型新增:
- kyc_submitted: KYC认证提交
- kyc_approved: KYC认证通过
- kyc_rejected: KYC认证拒绝
- contract_signed: 合同签署
- deposit: 充值到账
- withdrawal: 提现成功
- benefit_applied: 权益申请

监听的 Kafka Topics:
- identity.KYCSubmitted
- identity.KYCApproved
- identity.KYCRejected
- contract.signed
- blockchain.deposit.credited
- wallet.withdrawals (仅处理 completed 事件)
- authorization.benefit.applied

所有新增事件处理器均使用幂等创建,防止重复记录。

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-08 05:51:26 -08:00
parent 65bd6a857f
commit 38fff077dd
2 changed files with 378 additions and 5 deletions

View File

@ -4,11 +4,18 @@
export const SYSTEM_ACTIVITY_REPOSITORY = Symbol('SYSTEM_ACTIVITY_REPOSITORY');
export type ActivityType =
| 'user_register'
| 'company_authorization'
| 'planting_order'
| 'system_update'
| 'report_generated';
| 'user_register' // 用户注册
| 'planting_order' // 认种订单
| 'contract_signed' // 合同签署
| 'kyc_submitted' // KYC提交
| 'kyc_approved' // KYC通过
| 'kyc_rejected' // KYC拒绝
| 'deposit' // 充值
| 'withdrawal' // 提现
| 'company_authorization' // 公司授权
| 'benefit_applied' // 权益申请
| 'system_update' // 系统更新
| 'report_generated'; // 报表生成
export interface SystemActivityData {
id?: bigint;

View File

@ -405,4 +405,370 @@ export class ActivityEventConsumerController {
}
return phone.slice(0, 3) + '****' + phone.slice(-4);
}
// ============================================================================
// [2026-01-08] 新增:更多活动事件消费者
// ============================================================================
/**
* KYC (identity-service)
* Topic: identity.KYCSubmitted
*/
@MessagePattern('identity.KYCSubmitted')
async handleKYCSubmitted(
@Payload()
message: {
eventId: string;
eventType: string;
aggregateId: string;
occurredAt: string;
payload: {
userId: string;
accountSequence?: string;
realName?: string;
idNumber?: string;
submittedAt: string;
};
},
) {
this.logger.log(`Received identity.KYCSubmitted event`);
try {
const { payload, aggregateId } = message;
const created = await this.activityRepo.createIfNotExists({
activityType: 'kyc_submitted' as ActivityType,
title: 'KYC认证提交',
description: `用户提交了实名认证申请`,
icon: '📋',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'kyc',
relatedEntityId: `kyc_submit_${aggregateId}`,
metadata: {
accountSequence: payload.accountSequence,
submittedAt: payload.submittedAt,
},
});
if (created) {
this.logger.log(`Activity recorded for KYC submission: ${payload.userId}`);
} else {
this.logger.log(`Skipped duplicate KYC submission event: ${payload.userId}`);
}
} catch (error) {
this.logger.error(`Error recording KYC submission activity:`, error);
}
}
/**
* KYC (identity-service)
* Topic: identity.KYCApproved
*/
@MessagePattern('identity.KYCApproved')
async handleKYCApproved(
@Payload()
message: {
eventId: string;
eventType: string;
aggregateId: string;
occurredAt: string;
payload: {
userId: string;
accountSequence?: string;
approvedAt: string;
};
},
) {
this.logger.log(`Received identity.KYCApproved event`);
try {
const { payload, aggregateId } = message;
const created = await this.activityRepo.createIfNotExists({
activityType: 'kyc_approved' as ActivityType,
title: 'KYC认证通过',
description: `用户实名认证已通过`,
icon: '✅',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'kyc',
relatedEntityId: `kyc_approve_${aggregateId}`,
metadata: {
accountSequence: payload.accountSequence,
approvedAt: payload.approvedAt,
},
});
if (created) {
this.logger.log(`Activity recorded for KYC approval: ${payload.userId}`);
} else {
this.logger.log(`Skipped duplicate KYC approval event: ${payload.userId}`);
}
} catch (error) {
this.logger.error(`Error recording KYC approval activity:`, error);
}
}
/**
* KYC (identity-service)
* Topic: identity.KYCRejected
*/
@MessagePattern('identity.KYCRejected')
async handleKYCRejected(
@Payload()
message: {
eventId: string;
eventType: string;
aggregateId: string;
occurredAt: string;
payload: {
userId: string;
accountSequence?: string;
reason?: string;
rejectedAt: string;
};
},
) {
this.logger.log(`Received identity.KYCRejected event`);
try {
const { payload, aggregateId } = message;
const created = await this.activityRepo.createIfNotExists({
activityType: 'kyc_rejected' as ActivityType,
title: 'KYC认证拒绝',
description: `用户实名认证被拒绝`,
icon: '❌',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'kyc',
relatedEntityId: `kyc_reject_${aggregateId}`,
metadata: {
accountSequence: payload.accountSequence,
reason: payload.reason,
rejectedAt: payload.rejectedAt,
},
});
if (created) {
this.logger.log(`Activity recorded for KYC rejection: ${payload.userId}`);
} else {
this.logger.log(`Skipped duplicate KYC rejection event: ${payload.userId}`);
}
} catch (error) {
this.logger.error(`Error recording KYC rejection activity:`, error);
}
}
/**
* (planting-service)
* Topic: contract.signed
*/
@MessagePattern('contract.signed')
async handleContractSigned(
@Payload()
message: {
orderNo: string;
userId: string;
accountSequence: string;
treeCount: number;
totalAmount: number;
provinceCode: string;
cityCode: string;
signedAt: string;
},
) {
this.logger.log(`Received contract.signed event: ${message.orderNo}`);
try {
const created = await this.activityRepo.createIfNotExists({
activityType: 'contract_signed' as ActivityType,
title: '合同签署',
description: `用户签署了 ${message.treeCount} 棵榴莲树认种合同`,
icon: '📝',
relatedUserId: BigInt(message.userId),
relatedEntityType: 'contract',
relatedEntityId: message.orderNo,
metadata: {
orderNo: message.orderNo,
accountSequence: message.accountSequence,
treeCount: message.treeCount,
totalAmount: message.totalAmount,
provinceCode: message.provinceCode,
cityCode: message.cityCode,
signedAt: message.signedAt,
},
});
if (created) {
this.logger.log(`Activity recorded for contract signing: ${message.orderNo}`);
} else {
this.logger.log(`Skipped duplicate contract signing event: ${message.orderNo}`);
}
} catch (error) {
this.logger.error(`Error recording contract signing activity:`, error);
}
}
/**
* (blockchain-service -> wallet-service)
* Topic: blockchain.deposit.credited
*/
@MessagePattern('blockchain.deposit.credited')
async handleDepositCredited(
@Payload()
message: {
depositId: string;
userId: string;
accountSequence?: string;
amount: string;
assetType: string;
txHash: string;
creditedAt: string;
},
) {
this.logger.log(`Received blockchain.deposit.credited event: ${message.depositId}`);
try {
const created = await this.activityRepo.createIfNotExists({
activityType: 'deposit' as ActivityType,
title: '充值到账',
description: `用户充值 ${message.amount} ${message.assetType}`,
icon: '💰',
relatedUserId: BigInt(message.userId),
relatedEntityType: 'deposit',
relatedEntityId: message.depositId,
metadata: {
depositId: message.depositId,
accountSequence: message.accountSequence,
amount: message.amount,
assetType: message.assetType,
txHash: message.txHash,
creditedAt: message.creditedAt,
},
});
if (created) {
this.logger.log(`Activity recorded for deposit: ${message.depositId}`);
} else {
this.logger.log(`Skipped duplicate deposit event: ${message.depositId}`);
}
} catch (error) {
this.logger.error(`Error recording deposit activity:`, error);
}
}
/**
* (wallet-service)
* Topic: wallet.withdrawals (eventType: wallet.withdrawal.completed)
*/
@MessagePattern('wallet.withdrawals')
async handleWithdrawalEvent(
@Payload()
message: {
eventType: string;
withdrawalId: string;
userId: string;
accountSequence?: string;
amount: string;
assetType: string;
status: string;
txHash?: string;
completedAt?: string;
requestedAt?: string;
},
) {
// 只处理提现完成事件
if (message.eventType !== 'wallet.withdrawal.completed') {
return;
}
this.logger.log(`Received wallet.withdrawal.completed event: ${message.withdrawalId}`);
try {
const created = await this.activityRepo.createIfNotExists({
activityType: 'withdrawal' as ActivityType,
title: '提现成功',
description: `用户提现 ${message.amount} ${message.assetType}`,
icon: '💸',
relatedUserId: BigInt(message.userId),
relatedEntityType: 'withdrawal',
relatedEntityId: message.withdrawalId,
metadata: {
withdrawalId: message.withdrawalId,
accountSequence: message.accountSequence,
amount: message.amount,
assetType: message.assetType,
txHash: message.txHash,
completedAt: message.completedAt,
},
});
if (created) {
this.logger.log(`Activity recorded for withdrawal: ${message.withdrawalId}`);
} else {
this.logger.log(`Skipped duplicate withdrawal event: ${message.withdrawalId}`);
}
} catch (error) {
this.logger.error(`Error recording withdrawal activity:`, error);
}
}
/**
* (authorization-service)
* Topic: authorization-events (eventType benefit applied)
* authorization-events topic handleAuthorizationEvent
*/
@MessagePattern('authorization.benefit.applied')
async handleBenefitApplied(
@Payload()
message: {
eventId: string;
eventType: string;
aggregateId: string;
occurredAt: string;
payload: {
authorizationId: string;
userId: string;
accountSequence: string;
roleType: string;
regionCode: string;
regionName: string;
benefitType?: string;
appliedAt: string;
};
},
) {
this.logger.log(`Received authorization.benefit.applied event`);
try {
const { payload, aggregateId } = message;
const roleTypeLabel = this.getRoleTypeLabel(payload.roleType);
const created = await this.activityRepo.createIfNotExists({
activityType: 'benefit_applied' as ActivityType,
title: '权益申请',
description: `${payload.regionName} ${roleTypeLabel} 申请权益`,
icon: '🎁',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'benefit',
relatedEntityId: `benefit_${aggregateId}`,
metadata: {
authorizationId: payload.authorizationId,
accountSequence: payload.accountSequence,
roleType: payload.roleType,
regionCode: payload.regionCode,
regionName: payload.regionName,
benefitType: payload.benefitType,
appliedAt: payload.appliedAt,
},
});
if (created) {
this.logger.log(`Activity recorded for benefit application: ${payload.accountSequence}`);
} else {
this.logger.log(`Skipped duplicate benefit application event: ${aggregateId}`);
}
} catch (error) {
this.logger.error(`Error recording benefit application activity:`, error);
}
}
}