diff --git a/backend/services/reporting-service/src/domain/repositories/system-activity.repository.interface.ts b/backend/services/reporting-service/src/domain/repositories/system-activity.repository.interface.ts index 0c434794..c8c9c022 100644 --- a/backend/services/reporting-service/src/domain/repositories/system-activity.repository.interface.ts +++ b/backend/services/reporting-service/src/domain/repositories/system-activity.repository.interface.ts @@ -4,11 +4,18 @@ export const SYSTEM_ACTIVITY_REPOSITORY = Symbol('SYSTEM_ACTIVITY_REPOSITORY'); export type ActivityType = - | 'user_register' - | 'company_authorization' - | 'planting_order' - | 'system_update' - | 'report_generated'; + | 'user_register' // 用户注册 + | 'planting_order' // 认种订单 + | 'contract_signed' // 合同签署 + | 'kyc_submitted' // KYC提交 + | 'kyc_approved' // KYC通过 + | 'kyc_rejected' // KYC拒绝 + | 'deposit' // 充值 + | 'withdrawal' // 提现 + | 'company_authorization' // 公司授权 + | 'benefit_applied' // 权益申请 + | 'system_update' // 系统更新 + | 'report_generated'; // 报表生成 export interface SystemActivityData { id?: bigint; diff --git a/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts b/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts index 4cd475ee..620fa3a3 100644 --- a/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts +++ b/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts @@ -405,4 +405,370 @@ export class ActivityEventConsumerController { } return phone.slice(0, 3) + '****' + phone.slice(-4); } + + // ============================================================================ + // [2026-01-08] 新增:更多活动事件消费者 + // ============================================================================ + + /** + * 监听 KYC 提交事件 (identity-service) + * Topic: identity.KYCSubmitted + */ + @MessagePattern('identity.KYCSubmitted') + async handleKYCSubmitted( + @Payload() + message: { + eventId: string; + eventType: string; + aggregateId: string; + occurredAt: string; + payload: { + userId: string; + accountSequence?: string; + realName?: string; + idNumber?: string; + submittedAt: string; + }; + }, + ) { + this.logger.log(`Received identity.KYCSubmitted event`); + + try { + const { payload, aggregateId } = message; + + const created = await this.activityRepo.createIfNotExists({ + activityType: 'kyc_submitted' as ActivityType, + title: 'KYC认证提交', + description: `用户提交了实名认证申请`, + icon: '📋', + relatedUserId: BigInt(payload.userId), + relatedEntityType: 'kyc', + relatedEntityId: `kyc_submit_${aggregateId}`, + metadata: { + accountSequence: payload.accountSequence, + submittedAt: payload.submittedAt, + }, + }); + + if (created) { + this.logger.log(`Activity recorded for KYC submission: ${payload.userId}`); + } else { + this.logger.log(`Skipped duplicate KYC submission event: ${payload.userId}`); + } + } catch (error) { + this.logger.error(`Error recording KYC submission activity:`, error); + } + } + + /** + * 监听 KYC 通过事件 (identity-service) + * Topic: identity.KYCApproved + */ + @MessagePattern('identity.KYCApproved') + async handleKYCApproved( + @Payload() + message: { + eventId: string; + eventType: string; + aggregateId: string; + occurredAt: string; + payload: { + userId: string; + accountSequence?: string; + approvedAt: string; + }; + }, + ) { + this.logger.log(`Received identity.KYCApproved event`); + + try { + const { payload, aggregateId } = message; + + const created = await this.activityRepo.createIfNotExists({ + activityType: 'kyc_approved' as ActivityType, + title: 'KYC认证通过', + description: `用户实名认证已通过`, + icon: '✅', + relatedUserId: BigInt(payload.userId), + relatedEntityType: 'kyc', + relatedEntityId: `kyc_approve_${aggregateId}`, + metadata: { + accountSequence: payload.accountSequence, + approvedAt: payload.approvedAt, + }, + }); + + if (created) { + this.logger.log(`Activity recorded for KYC approval: ${payload.userId}`); + } else { + this.logger.log(`Skipped duplicate KYC approval event: ${payload.userId}`); + } + } catch (error) { + this.logger.error(`Error recording KYC approval activity:`, error); + } + } + + /** + * 监听 KYC 拒绝事件 (identity-service) + * Topic: identity.KYCRejected + */ + @MessagePattern('identity.KYCRejected') + async handleKYCRejected( + @Payload() + message: { + eventId: string; + eventType: string; + aggregateId: string; + occurredAt: string; + payload: { + userId: string; + accountSequence?: string; + reason?: string; + rejectedAt: string; + }; + }, + ) { + this.logger.log(`Received identity.KYCRejected event`); + + try { + const { payload, aggregateId } = message; + + const created = await this.activityRepo.createIfNotExists({ + activityType: 'kyc_rejected' as ActivityType, + title: 'KYC认证拒绝', + description: `用户实名认证被拒绝`, + icon: '❌', + relatedUserId: BigInt(payload.userId), + relatedEntityType: 'kyc', + relatedEntityId: `kyc_reject_${aggregateId}`, + metadata: { + accountSequence: payload.accountSequence, + reason: payload.reason, + rejectedAt: payload.rejectedAt, + }, + }); + + if (created) { + this.logger.log(`Activity recorded for KYC rejection: ${payload.userId}`); + } else { + this.logger.log(`Skipped duplicate KYC rejection event: ${payload.userId}`); + } + } catch (error) { + this.logger.error(`Error recording KYC rejection activity:`, error); + } + } + + /** + * 监听合同签署事件 (planting-service) + * Topic: contract.signed + */ + @MessagePattern('contract.signed') + async handleContractSigned( + @Payload() + message: { + orderNo: string; + userId: string; + accountSequence: string; + treeCount: number; + totalAmount: number; + provinceCode: string; + cityCode: string; + signedAt: string; + }, + ) { + this.logger.log(`Received contract.signed event: ${message.orderNo}`); + + try { + const created = await this.activityRepo.createIfNotExists({ + activityType: 'contract_signed' as ActivityType, + title: '合同签署', + description: `用户签署了 ${message.treeCount} 棵榴莲树认种合同`, + icon: '📝', + relatedUserId: BigInt(message.userId), + relatedEntityType: 'contract', + relatedEntityId: message.orderNo, + metadata: { + orderNo: message.orderNo, + accountSequence: message.accountSequence, + treeCount: message.treeCount, + totalAmount: message.totalAmount, + provinceCode: message.provinceCode, + cityCode: message.cityCode, + signedAt: message.signedAt, + }, + }); + + if (created) { + this.logger.log(`Activity recorded for contract signing: ${message.orderNo}`); + } else { + this.logger.log(`Skipped duplicate contract signing event: ${message.orderNo}`); + } + } catch (error) { + this.logger.error(`Error recording contract signing activity:`, error); + } + } + + /** + * 监听充值到账事件 (blockchain-service -> wallet-service) + * Topic: blockchain.deposit.credited + */ + @MessagePattern('blockchain.deposit.credited') + async handleDepositCredited( + @Payload() + message: { + depositId: string; + userId: string; + accountSequence?: string; + amount: string; + assetType: string; + txHash: string; + creditedAt: string; + }, + ) { + this.logger.log(`Received blockchain.deposit.credited event: ${message.depositId}`); + + try { + const created = await this.activityRepo.createIfNotExists({ + activityType: 'deposit' as ActivityType, + title: '充值到账', + description: `用户充值 ${message.amount} ${message.assetType}`, + icon: '💰', + relatedUserId: BigInt(message.userId), + relatedEntityType: 'deposit', + relatedEntityId: message.depositId, + metadata: { + depositId: message.depositId, + accountSequence: message.accountSequence, + amount: message.amount, + assetType: message.assetType, + txHash: message.txHash, + creditedAt: message.creditedAt, + }, + }); + + if (created) { + this.logger.log(`Activity recorded for deposit: ${message.depositId}`); + } else { + this.logger.log(`Skipped duplicate deposit event: ${message.depositId}`); + } + } catch (error) { + this.logger.error(`Error recording deposit activity:`, error); + } + } + + /** + * 监听提现完成事件 (wallet-service) + * Topic: wallet.withdrawals (eventType: wallet.withdrawal.completed) + */ + @MessagePattern('wallet.withdrawals') + async handleWithdrawalEvent( + @Payload() + message: { + eventType: string; + withdrawalId: string; + userId: string; + accountSequence?: string; + amount: string; + assetType: string; + status: string; + txHash?: string; + completedAt?: string; + requestedAt?: string; + }, + ) { + // 只处理提现完成事件 + if (message.eventType !== 'wallet.withdrawal.completed') { + return; + } + + this.logger.log(`Received wallet.withdrawal.completed event: ${message.withdrawalId}`); + + try { + const created = await this.activityRepo.createIfNotExists({ + activityType: 'withdrawal' as ActivityType, + title: '提现成功', + description: `用户提现 ${message.amount} ${message.assetType}`, + icon: '💸', + relatedUserId: BigInt(message.userId), + relatedEntityType: 'withdrawal', + relatedEntityId: message.withdrawalId, + metadata: { + withdrawalId: message.withdrawalId, + accountSequence: message.accountSequence, + amount: message.amount, + assetType: message.assetType, + txHash: message.txHash, + completedAt: message.completedAt, + }, + }); + + if (created) { + this.logger.log(`Activity recorded for withdrawal: ${message.withdrawalId}`); + } else { + this.logger.log(`Skipped duplicate withdrawal event: ${message.withdrawalId}`); + } + } catch (error) { + this.logger.error(`Error recording withdrawal activity:`, error); + } + } + + /** + * 监听权益申请事件 (authorization-service) + * Topic: authorization-events (eventType 包含 benefit 或 applied) + * 注意:此处理器复用 authorization-events topic,在 handleAuthorizationEvent 之外单独处理权益相关事件 + */ + @MessagePattern('authorization.benefit.applied') + async handleBenefitApplied( + @Payload() + message: { + eventId: string; + eventType: string; + aggregateId: string; + occurredAt: string; + payload: { + authorizationId: string; + userId: string; + accountSequence: string; + roleType: string; + regionCode: string; + regionName: string; + benefitType?: string; + appliedAt: string; + }; + }, + ) { + this.logger.log(`Received authorization.benefit.applied event`); + + try { + const { payload, aggregateId } = message; + const roleTypeLabel = this.getRoleTypeLabel(payload.roleType); + + const created = await this.activityRepo.createIfNotExists({ + activityType: 'benefit_applied' as ActivityType, + title: '权益申请', + description: `${payload.regionName} ${roleTypeLabel} 申请权益`, + icon: '🎁', + relatedUserId: BigInt(payload.userId), + relatedEntityType: 'benefit', + relatedEntityId: `benefit_${aggregateId}`, + metadata: { + authorizationId: payload.authorizationId, + accountSequence: payload.accountSequence, + roleType: payload.roleType, + regionCode: payload.regionCode, + regionName: payload.regionName, + benefitType: payload.benefitType, + appliedAt: payload.appliedAt, + }, + }); + + if (created) { + this.logger.log(`Activity recorded for benefit application: ${payload.accountSequence}`); + } else { + this.logger.log(`Skipped duplicate benefit application event: ${aggregateId}`); + } + } catch (error) { + this.logger.error(`Error recording benefit application activity:`, error); + } + } }