rwadurian/backend/services/planting-service/src/infrastructure/kafka/kyc-verified-event.consumer.ts

132 lines
4.3 KiB
TypeScript

import { Controller, Logger, Inject } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import { ContractSigningService } from '../../application/services/contract-signing.service';
import {
IPlantingOrderRepository,
PLANTING_ORDER_REPOSITORY,
} from '../../domain/repositories';
import { IdentityServiceClient } from '../external/identity-service.client';
/**
* KYCVerified 事件消息结构
*/
interface KYCVerifiedEventMessage {
eventId: string;
eventType: string;
occurredAt: string;
aggregateId: string;
payload: {
userId: string;
accountSequence: string;
verifiedAt: string;
};
_outbox?: {
id: string;
aggregateId: string;
eventType: string;
};
}
/**
* KYC 认证完成事件消费者
*
* 监听 identity.KYCVerified 事件
* 当用户完成实名认证后,为其之前已支付但未创建合同的订单补创建合同
*/
@Controller()
export class KycVerifiedEventConsumer {
private readonly logger = new Logger(KycVerifiedEventConsumer.name);
constructor(
private readonly contractSigningService: ContractSigningService,
@Inject(PLANTING_ORDER_REPOSITORY)
private readonly orderRepo: IPlantingOrderRepository,
private readonly identityServiceClient: IdentityServiceClient,
) {}
/**
* 监听 KYCVerified 事件
*/
@EventPattern('identity.KYCVerified')
async handleKycVerified(@Payload() message: KYCVerifiedEventMessage): Promise<void> {
const userId = message.payload?.userId || message.aggregateId;
const accountSequence = message.payload?.accountSequence;
this.logger.log(`[KYC-VERIFIED] Received KYCVerified event for user: ${userId}, accountSequence: ${accountSequence}`);
if (!accountSequence) {
this.logger.error(`[KYC-VERIFIED] Missing accountSequence in event payload for userId: ${userId}`);
return;
}
try {
const userIdBigint = BigInt(userId);
this.logger.log(
`[KYC-VERIFIED] User ${accountSequence} completed KYC, checking for pending contracts...`,
);
// 2. 获取用户 KYC 信息
const kycInfo = await this.identityServiceClient.getUserKycInfo(accountSequence);
if (!kycInfo) {
this.logger.warn(`[KYC-VERIFIED] KYC info not available for user: ${accountSequence}`);
return;
}
// 3. 查找用户已支付但未创建合同的订单
const ordersWithoutContract = await this.orderRepo.findPaidOrdersWithoutContract(userIdBigint);
if (ordersWithoutContract.length === 0) {
this.logger.log(`[KYC-VERIFIED] No pending orders for user: ${accountSequence}`);
return;
}
this.logger.log(
`[KYC-VERIFIED] Found ${ordersWithoutContract.length} orders without contract for user: ${accountSequence}`,
);
// 4. 为每个订单创建合同签署任务
for (const order of ordersWithoutContract) {
try {
const provinceCitySelection = order.provinceCitySelection;
if (!provinceCitySelection) {
this.logger.warn(
`[KYC-VERIFIED] Order ${order.orderNo} has no province/city selection, skipping`,
);
continue;
}
await this.contractSigningService.createSigningTask({
orderNo: order.orderNo,
userId: order.userId,
accountSequence,
treeCount: order.treeCount.value,
totalAmount: order.totalAmount,
provinceCode: provinceCitySelection.provinceCode,
provinceName: provinceCitySelection.provinceName,
cityCode: provinceCitySelection.cityCode,
cityName: provinceCitySelection.cityName,
userPhoneNumber: kycInfo.phoneNumber,
userRealName: kycInfo.realName,
userIdCardNumber: kycInfo.idCardNumber,
});
this.logger.log(`[KYC-VERIFIED] Created contract for order: ${order.orderNo}`);
} catch (error) {
this.logger.error(
`[KYC-VERIFIED] Failed to create contract for order ${order.orderNo}:`,
error,
);
// 继续处理下一个订单
}
}
this.logger.log(
`[KYC-VERIFIED] Completed processing KYCVerified event for user: ${accountSequence}`,
);
} catch (error) {
this.logger.error(`[KYC-VERIFIED] Error processing KYCVerified event for user ${userId}:`, error);
}
}
}