import { Injectable, Logger } from '@nestjs/common'; import Decimal from 'decimal.js'; import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service'; import { ContributionCalculationService } from '../services/contribution-calculation.service'; import { ContributionRateService } from '../services/contribution-rate.service'; /** * 认种同步结果,用于事务提交后的算力计算 */ export interface AdoptionSyncResult { originalAdoptionId: bigint; needsCalculation: boolean; } /** * 认种订单 CDC 事件处理器 * 处理从1.0 planting-service同步过来的planting_orders数据 * * 设计说明: * =========================================== * - handle() 方法100%同步数据,不跳过任何更新 * - 算力计算只在 status 变为 MINING_ENABLED 时触发 * - 算力计算在事务提交后执行(避免 Serializable 隔离级别的可见性问题) */ @Injectable() export class AdoptionSyncedHandler { private readonly logger = new Logger(AdoptionSyncedHandler.name); constructor( private readonly contributionCalculationService: ContributionCalculationService, private readonly contributionRateService: ContributionRateService, ) {} /** * 处理认种 CDC 事件(在事务内执行) * 只负责数据同步,不调用算力计算 * @returns AdoptionSyncResult 包含需要计算算力的认种ID */ async handle(event: CDCEvent, tx: TransactionClient): Promise { const { op, before, after } = event.payload; this.logger.log(`[CDC] Adoption event received: op=${op}, seq=${event.sequenceNum}`); this.logger.debug(`[CDC] Adoption event payload: ${JSON.stringify(after || before)}`); // 获取认种日期,用于查询当日贡献值 const data = after || before; const adoptionDate = data?.created_at || data?.createdAt || data?.paid_at || data?.paidAt; // 在事务外获取当日每棵树的贡献值 let contributionPerTree = new Decimal('22617'); // 默认值 if (adoptionDate) { try { contributionPerTree = await this.contributionRateService.getContributionPerTree(new Date(adoptionDate)); this.logger.log(`[CDC] Got contributionPerTree for ${adoptionDate}: ${contributionPerTree.toString()}`); } catch (error) { this.logger.warn(`[CDC] Failed to get contributionPerTree, using default 22617`, error); } } try { switch (op) { case 'c': // create case 'r': // read (snapshot) return await this.handleCreate(after, event.sequenceNum, tx, contributionPerTree); case 'u': // update return await this.handleUpdate(after, before, event.sequenceNum, tx, contributionPerTree); case 'd': // delete await this.handleDelete(before); return null; default: this.logger.warn(`[CDC] Unknown CDC operation: ${op}`); return null; } } catch (error) { this.logger.error(`[CDC] Failed to handle adoption CDC event, op=${op}, seq=${event.sequenceNum}`, error); throw error; } } /** * 在事务提交后计算算力(由 CDC dispatcher 在事务外调用) */ async calculateContributionAfterCommit(result: AdoptionSyncResult): Promise { if (!result || !result.needsCalculation) { return; } this.logger.log(`[CDC] Triggering contribution calculation for adoption: ${result.originalAdoptionId}`); try { await this.contributionCalculationService.calculateForAdoption(result.originalAdoptionId); this.logger.log(`[CDC] Contribution calculation completed for adoption: ${result.originalAdoptionId}`); } catch (error) { // 算力计算失败不影响数据同步,后续可通过批量任务重试 this.logger.error(`[CDC] Failed to calculate contribution for adoption ${result.originalAdoptionId}`, error); } } private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient, contributionPerTree: Decimal): Promise { if (!data) { this.logger.warn(`[CDC] Adoption create: empty data received`); return null; } const orderId = data.order_id || data.id; const accountSequence = data.account_sequence || data.accountSequence; const treeCount = data.tree_count || data.treeCount; const createdAt = data.created_at || data.createdAt || data.paid_at || data.paidAt; const selectedProvince = data.selected_province || data.selectedProvince || null; const selectedCity = data.selected_city || data.selectedCity || null; const status = data.status ?? null; this.logger.log(`[CDC] Adoption create: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}, status=${status}, contributionPerTree=${contributionPerTree.toString()}`); if (!orderId || !accountSequence) { this.logger.warn(`[CDC] Invalid adoption data: missing order_id or account_sequence`, { data }); return null; } const originalAdoptionId = BigInt(orderId); // 100%同步数据,使用真实的每棵树贡献值 await tx.syncedAdoption.upsert({ where: { originalAdoptionId }, create: { originalAdoptionId, accountSequence, treeCount, adoptionDate: new Date(createdAt), status, selectedProvince, selectedCity, contributionPerTree, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, update: { accountSequence, treeCount, adoptionDate: new Date(createdAt), status, selectedProvince, selectedCity, contributionPerTree, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, }); this.logger.log(`[CDC] Adoption synced: orderId=${orderId}, status=${status}`); // 只有 MINING_ENABLED 状态才触发算力计算 const needsCalculation = status === 'MINING_ENABLED'; return { originalAdoptionId, needsCalculation, }; } private async handleUpdate(after: any, before: any, sequenceNum: bigint, tx: TransactionClient, contributionPerTree: Decimal): Promise { if (!after) { this.logger.warn(`[CDC] Adoption update: empty after data received`); return null; } const orderId = after.order_id || after.id; const originalAdoptionId = BigInt(orderId); const accountSequence = after.account_sequence || after.accountSequence; const treeCount = after.tree_count || after.treeCount; const createdAt = after.created_at || after.createdAt || after.paid_at || after.paidAt; const selectedProvince = after.selected_province || after.selectedProvince || null; const selectedCity = after.selected_city || after.selectedCity || null; const newStatus = after.status ?? null; const oldStatus = before?.status ?? null; this.logger.log(`[CDC] Adoption update: orderId=${orderId}, status=${oldStatus} -> ${newStatus}, contributionPerTree=${contributionPerTree.toString()}`); // 查询现有记录 const existingAdoption = await tx.syncedAdoption.findUnique({ where: { originalAdoptionId }, }); // 100%同步数据,使用真实的每棵树贡献值 await tx.syncedAdoption.upsert({ where: { originalAdoptionId }, create: { originalAdoptionId, accountSequence, treeCount, adoptionDate: new Date(createdAt), status: newStatus, selectedProvince, selectedCity, contributionPerTree, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, update: { accountSequence, treeCount, adoptionDate: new Date(createdAt), status: newStatus, selectedProvince, selectedCity, contributionPerTree, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, }); this.logger.log(`[CDC] Adoption synced: orderId=${orderId}, status=${newStatus}`); // 只有当 status 变为 MINING_ENABLED 且尚未计算过算力时,才触发算力计算 const statusChangedToMiningEnabled = newStatus === 'MINING_ENABLED' && oldStatus !== 'MINING_ENABLED'; const needsCalculation = statusChangedToMiningEnabled && !existingAdoption?.contributionDistributed; return { originalAdoptionId, needsCalculation, }; } private async handleDelete(data: any): Promise { if (!data) { this.logger.warn(`[CDC] Adoption delete: empty data received`); return; } const orderId = data.order_id || data.id; // 认种删除需要特殊处理(回滚算力) // 但通常不会发生删除操作 this.logger.warn(`[CDC] Adoption delete event received: ${orderId}. This may require contribution rollback.`); } }