fix(reporting-service): 修复错误的 Kafka topic 订阅

- 充值事件:blockchain.deposit.credited → wallet.acks (过滤 wallet.deposit.credited)
- 权益事件:authorization.benefit.applied → 整合到 authorization-events (过滤 benefit.activated)

原来订阅的 topic 不存在,导致事件无法消费。现已修复为正确的 topic。

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-08 06:01:08 -08:00
parent 38fff077dd
commit 68841abbf4
1 changed files with 66 additions and 71 deletions

View File

@ -207,6 +207,7 @@ export class ActivityEventConsumerController {
* (authorization-service) * (authorization-service)
* Topic: authorization-events * Topic: authorization-events
* [2026-01-08] * [2026-01-08]
* [2026-01-08] authorization.benefit.activated
*/ */
@MessagePattern('authorization-events') @MessagePattern('authorization-events')
async handleAuthorizationEvent(@Payload() message: AuthorizationRoleEvent) { async handleAuthorizationEvent(@Payload() message: AuthorizationRoleEvent) {
@ -215,6 +216,12 @@ export class ActivityEventConsumerController {
try { try {
const { payload, eventType } = message; const { payload, eventType } = message;
// 处理权益激活事件
if (eventType.includes('benefit.activated')) {
await this.handleBenefitActivatedInternal(message);
return;
}
// 根据事件类型决定是否记录活动 // 根据事件类型决定是否记录活动
if (eventType.includes('authorized') || eventType.includes('Authorized')) { if (eventType.includes('authorized') || eventType.includes('Authorized')) {
const roleTypeLabel = this.getRoleTypeLabel(payload.roleType); const roleTypeLabel = this.getRoleTypeLabel(payload.roleType);
@ -272,6 +279,42 @@ export class ActivityEventConsumerController {
} }
} }
/**
*
*/
private async handleBenefitActivatedInternal(message: AuthorizationRoleEvent) {
try {
const { payload, aggregateId } = message;
const roleTypeLabel = this.getRoleTypeLabel(payload.roleType);
const created = await this.activityRepo.createIfNotExists({
activityType: 'benefit_applied' as ActivityType,
title: '权益激活',
description: `${payload.regionName} ${roleTypeLabel} 激活权益`,
icon: '🎁',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'benefit',
relatedEntityId: `benefit_${aggregateId}`,
metadata: {
authorizationId: payload.authorizationId,
accountSequence: payload.accountSequence,
roleType: payload.roleType,
regionCode: payload.regionCode,
regionName: payload.regionName,
activatedAt: message.occurredAt,
},
});
if (created) {
this.logger.log(`Activity recorded for benefit activation: ${payload.accountSequence}`);
} else {
this.logger.log(`Skipped duplicate benefit activation event: ${aggregateId}`);
}
} catch (error) {
this.logger.error(`Error recording benefit activation activity:`, error);
}
}
/** /**
* (referral-service planting-service) * (referral-service planting-service)
* Topic: planting.order.paid * Topic: planting.order.paid
@ -609,29 +652,40 @@ export class ActivityEventConsumerController {
} }
/** /**
* (blockchain-service -> wallet-service) * (wallet-service wallet.acks)
* Topic: blockchain.deposit.credited * Topic: wallet.acks (eventType: wallet.deposit.credited)
*/ */
@MessagePattern('blockchain.deposit.credited') @MessagePattern('wallet.acks')
async handleDepositCredited( async handleWalletAcks(
@Payload() @Payload()
message: { message: {
depositId: string; eventType: string;
userId: string; depositId?: string;
userId?: string;
accountSequence?: string; accountSequence?: string;
amount: string; amount?: string;
assetType: string; assetType?: string;
txHash: string; txHash?: string;
creditedAt: string; creditedAt?: string;
}, },
) { ) {
this.logger.log(`Received blockchain.deposit.credited event: ${message.depositId}`); // 只处理充值到账事件
if (message.eventType !== 'wallet.deposit.credited') {
return;
}
this.logger.log(`Received wallet.deposit.credited event: ${message.depositId}`);
try { try {
if (!message.depositId || !message.userId) {
this.logger.warn(`Missing required fields in deposit event`);
return;
}
const created = await this.activityRepo.createIfNotExists({ const created = await this.activityRepo.createIfNotExists({
activityType: 'deposit' as ActivityType, activityType: 'deposit' as ActivityType,
title: '充值到账', title: '充值到账',
description: `用户充值 ${message.amount} ${message.assetType}`, description: `用户充值 ${message.amount || '0'} ${message.assetType || 'USDT'}`,
icon: '💰', icon: '💰',
relatedUserId: BigInt(message.userId), relatedUserId: BigInt(message.userId),
relatedEntityType: 'deposit', relatedEntityType: 'deposit',
@ -712,63 +766,4 @@ export class ActivityEventConsumerController {
} }
} }
/**
* (authorization-service)
* Topic: authorization-events (eventType benefit applied)
* authorization-events topic handleAuthorizationEvent
*/
@MessagePattern('authorization.benefit.applied')
async handleBenefitApplied(
@Payload()
message: {
eventId: string;
eventType: string;
aggregateId: string;
occurredAt: string;
payload: {
authorizationId: string;
userId: string;
accountSequence: string;
roleType: string;
regionCode: string;
regionName: string;
benefitType?: string;
appliedAt: string;
};
},
) {
this.logger.log(`Received authorization.benefit.applied event`);
try {
const { payload, aggregateId } = message;
const roleTypeLabel = this.getRoleTypeLabel(payload.roleType);
const created = await this.activityRepo.createIfNotExists({
activityType: 'benefit_applied' as ActivityType,
title: '权益申请',
description: `${payload.regionName} ${roleTypeLabel} 申请权益`,
icon: '🎁',
relatedUserId: BigInt(payload.userId),
relatedEntityType: 'benefit',
relatedEntityId: `benefit_${aggregateId}`,
metadata: {
authorizationId: payload.authorizationId,
accountSequence: payload.accountSequence,
roleType: payload.roleType,
regionCode: payload.regionCode,
regionName: payload.regionName,
benefitType: payload.benefitType,
appliedAt: payload.appliedAt,
},
});
if (created) {
this.logger.log(`Activity recorded for benefit application: ${payload.accountSequence}`);
} else {
this.logger.log(`Skipped duplicate benefit application event: ${aggregateId}`);
}
} catch (error) {
this.logger.error(`Error recording benefit application activity:`, error);
}
}
} }