From 68841abbf4182fe4226b40673dc86725d6d593c8 Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 8 Jan 2026 06:01:08 -0800 Subject: [PATCH] =?UTF-8?q?fix(reporting-service):=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E9=94=99=E8=AF=AF=E7=9A=84=20Kafka=20topic=20=E8=AE=A2?= =?UTF-8?q?=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 充值事件:blockchain.deposit.credited → wallet.acks (过滤 wallet.deposit.credited) - 权益事件:authorization.benefit.applied → 整合到 authorization-events (过滤 benefit.activated) 原来订阅的 topic 不存在,导致事件无法消费。现已修复为正确的 topic。 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../activity-event-consumer.controller.ts | 137 +++++++++--------- 1 file changed, 66 insertions(+), 71 deletions(-) diff --git a/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts b/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts index 620fa3a3..efde5806 100644 --- a/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts +++ b/backend/services/reporting-service/src/infrastructure/kafka/activity-event-consumer.controller.ts @@ -207,6 +207,7 @@ export class ActivityEventConsumerController { * 监听授权角色事件 (authorization-service) * Topic: authorization-events * [2026-01-08] 更新:添加幂等性检查,防止重复记录 + * [2026-01-08] 更新:添加权益激活事件处理(authorization.benefit.activated) */ @MessagePattern('authorization-events') async handleAuthorizationEvent(@Payload() message: AuthorizationRoleEvent) { @@ -215,6 +216,12 @@ export class ActivityEventConsumerController { try { const { payload, eventType } = message; + // 处理权益激活事件 + if (eventType.includes('benefit.activated')) { + await this.handleBenefitActivatedInternal(message); + return; + } + // 根据事件类型决定是否记录活动 if (eventType.includes('authorized') || eventType.includes('Authorized')) { const roleTypeLabel = this.getRoleTypeLabel(payload.roleType); @@ -272,6 +279,42 @@ export class ActivityEventConsumerController { } } + /** + * 内部方法:处理权益激活事件 + */ + private async handleBenefitActivatedInternal(message: AuthorizationRoleEvent) { + try { + const { payload, aggregateId } = message; + const roleTypeLabel = this.getRoleTypeLabel(payload.roleType); + + const created = await this.activityRepo.createIfNotExists({ + activityType: 'benefit_applied' as ActivityType, + title: '权益激活', + description: `${payload.regionName} ${roleTypeLabel} 激活权益`, + icon: '🎁', + relatedUserId: BigInt(payload.userId), + relatedEntityType: 'benefit', + relatedEntityId: `benefit_${aggregateId}`, + metadata: { + authorizationId: payload.authorizationId, + accountSequence: payload.accountSequence, + roleType: payload.roleType, + regionCode: payload.regionCode, + regionName: payload.regionName, + activatedAt: message.occurredAt, + }, + }); + + if (created) { + this.logger.log(`Activity recorded for benefit activation: ${payload.accountSequence}`); + } else { + this.logger.log(`Skipped duplicate benefit activation event: ${aggregateId}`); + } + } catch (error) { + this.logger.error(`Error recording benefit activation activity:`, error); + } + } + /** * 监听认种订单支付成功事件 (referral-service 或 planting-service) * Topic: planting.order.paid @@ -609,29 +652,40 @@ export class ActivityEventConsumerController { } /** - * 监听充值到账事件 (blockchain-service -> wallet-service) - * Topic: blockchain.deposit.credited + * 监听充值到账事件 (wallet-service 发布到 wallet.acks) + * Topic: wallet.acks (eventType: wallet.deposit.credited) */ - @MessagePattern('blockchain.deposit.credited') - async handleDepositCredited( + @MessagePattern('wallet.acks') + async handleWalletAcks( @Payload() message: { - depositId: string; - userId: string; + eventType: string; + depositId?: string; + userId?: string; accountSequence?: string; - amount: string; - assetType: string; - txHash: string; - creditedAt: string; + amount?: string; + assetType?: string; + txHash?: string; + creditedAt?: string; }, ) { - this.logger.log(`Received blockchain.deposit.credited event: ${message.depositId}`); + // 只处理充值到账事件 + if (message.eventType !== 'wallet.deposit.credited') { + return; + } + + this.logger.log(`Received wallet.deposit.credited event: ${message.depositId}`); try { + if (!message.depositId || !message.userId) { + this.logger.warn(`Missing required fields in deposit event`); + return; + } + const created = await this.activityRepo.createIfNotExists({ activityType: 'deposit' as ActivityType, title: '充值到账', - description: `用户充值 ${message.amount} ${message.assetType}`, + description: `用户充值 ${message.amount || '0'} ${message.assetType || 'USDT'}`, icon: '💰', relatedUserId: BigInt(message.userId), relatedEntityType: 'deposit', @@ -712,63 +766,4 @@ export class ActivityEventConsumerController { } } - /** - * 监听权益申请事件 (authorization-service) - * Topic: authorization-events (eventType 包含 benefit 或 applied) - * 注意:此处理器复用 authorization-events topic,在 handleAuthorizationEvent 之外单独处理权益相关事件 - */ - @MessagePattern('authorization.benefit.applied') - async handleBenefitApplied( - @Payload() - message: { - eventId: string; - eventType: string; - aggregateId: string; - occurredAt: string; - payload: { - authorizationId: string; - userId: string; - accountSequence: string; - roleType: string; - regionCode: string; - regionName: string; - benefitType?: string; - appliedAt: string; - }; - }, - ) { - this.logger.log(`Received authorization.benefit.applied event`); - - try { - const { payload, aggregateId } = message; - const roleTypeLabel = this.getRoleTypeLabel(payload.roleType); - - const created = await this.activityRepo.createIfNotExists({ - activityType: 'benefit_applied' as ActivityType, - title: '权益申请', - description: `${payload.regionName} ${roleTypeLabel} 申请权益`, - icon: '🎁', - relatedUserId: BigInt(payload.userId), - relatedEntityType: 'benefit', - relatedEntityId: `benefit_${aggregateId}`, - metadata: { - authorizationId: payload.authorizationId, - accountSequence: payload.accountSequence, - roleType: payload.roleType, - regionCode: payload.regionCode, - regionName: payload.regionName, - benefitType: payload.benefitType, - appliedAt: payload.appliedAt, - }, - }); - - if (created) { - this.logger.log(`Activity recorded for benefit application: ${payload.accountSequence}`); - } else { - this.logger.log(`Skipped duplicate benefit application event: ${aggregateId}`); - } - } catch (error) { - this.logger.error(`Error recording benefit application activity:`, error); - } - } }