import { Injectable, Logger } from '@nestjs/common'; import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service'; /** * 引荐关系 CDC 事件处理器 * 处理从1.0 referral-service同步过来的referral_relationships数据 * * 设计说明:100%同步数据,不跳过任何字段更新 */ @Injectable() export class ReferralSyncedHandler { private readonly logger = new Logger(ReferralSyncedHandler.name); constructor() {} async handle(event: CDCEvent, tx: TransactionClient): Promise { const { op, before, after } = event.payload; this.logger.log(`[CDC] Referral event received: op=${op}, seq=${event.sequenceNum}`); this.logger.debug(`[CDC] Referral event payload: ${JSON.stringify(after || before)}`); try { switch (op) { case 'c': // create case 'r': // read (snapshot) await this.handleCreate(after, event.sequenceNum, tx); break; case 'u': // update await this.handleUpdate(after, event.sequenceNum, tx); break; case 'd': // delete await this.handleDelete(before); break; default: this.logger.warn(`[CDC] Unknown CDC operation: ${op}`); } } catch (error) { this.logger.error(`[CDC] Failed to handle referral CDC event, op=${op}, seq=${event.sequenceNum}`, error); throw error; } } private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { if (!data) { this.logger.warn(`[CDC] Referral create: empty data received`); return; } const accountSequence = data.account_sequence || data.accountSequence; const originalUserId = data.user_id || data.userId; const referrerUserId = data.referrer_id || data.referrerId; const ancestorPathArray = data.ancestor_path || data.ancestorPath; const depth = data.depth ?? 0; this.logger.log(`[CDC] Referral create: account=${accountSequence}, userId=${originalUserId}, referrerId=${referrerUserId}, depth=${depth}`); if (!accountSequence) { this.logger.warn(`[CDC] Invalid referral data: missing account_sequence`, { data }); return; } const ancestorPath = this.convertAncestorPath(ancestorPathArray); // 尝试查找推荐人的 account_sequence let referrerAccountSequence: string | null = null; if (referrerUserId) { const referrer = await tx.syncedReferral.findFirst({ where: { originalUserId: BigInt(referrerUserId) }, }); if (referrer) { referrerAccountSequence = referrer.accountSequence; } } // 100%同步数据 await tx.syncedReferral.upsert({ where: { accountSequence }, create: { accountSequence, referrerAccountSequence, referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, originalUserId: originalUserId ? BigInt(originalUserId) : null, ancestorPath, depth, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, update: { referrerAccountSequence, referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, originalUserId: originalUserId ? BigInt(originalUserId) : null, ancestorPath, depth, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, }); this.logger.log(`[CDC] Referral synced: ${accountSequence}, referrerId=${referrerUserId || 'none'}, depth=${depth}`); } private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { if (!data) { this.logger.warn(`[CDC] Referral update: empty data received`); return; } const accountSequence = data.account_sequence || data.accountSequence; const originalUserId = data.user_id || data.userId; const referrerUserId = data.referrer_id || data.referrerId; const ancestorPathArray = data.ancestor_path || data.ancestorPath; const depth = data.depth ?? 0; this.logger.log(`[CDC] Referral update: account=${accountSequence}, referrerId=${referrerUserId}, depth=${depth}`); if (!accountSequence) { this.logger.warn(`[CDC] Invalid referral update data: missing account_sequence`, { data }); return; } const ancestorPath = this.convertAncestorPath(ancestorPathArray); // 尝试查找推荐人的 account_sequence let referrerAccountSequence: string | null = null; if (referrerUserId) { const referrer = await tx.syncedReferral.findFirst({ where: { originalUserId: BigInt(referrerUserId) }, }); if (referrer) { referrerAccountSequence = referrer.accountSequence; } } // 100%同步数据 await tx.syncedReferral.upsert({ where: { accountSequence }, create: { accountSequence, referrerAccountSequence, referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, originalUserId: originalUserId ? BigInt(originalUserId) : null, ancestorPath, depth, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, update: { referrerAccountSequence, referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, originalUserId: originalUserId ? BigInt(originalUserId) : null, ancestorPath, depth, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, }); this.logger.log(`[CDC] Referral synced: ${accountSequence}`); } private async handleDelete(data: any): Promise { if (!data) { this.logger.warn(`[CDC] Referral delete: empty data received`); return; } const accountSequence = data.account_sequence || data.accountSequence; // 引荐关系删除需要特殊处理 this.logger.warn(`[CDC] Referral delete event received: ${accountSequence} (not processed, keeping history)`); } /** * 将 BigInt[] 数组转换为逗号分隔的字符串 * @param ancestorPath BigInt 数组或 null * @returns 逗号分隔的字符串或 null */ private convertAncestorPath(ancestorPath: any): string | null { if (!ancestorPath) return null; // 处理可能的数组格式 if (Array.isArray(ancestorPath)) { return ancestorPath.map((id) => String(id)).join(','); } // 如果已经是字符串 (可能是 PostgreSQL 数组的字符串表示) if (typeof ancestorPath === 'string') { // PostgreSQL 数组格式: {1,2,3} 或 [1,2,3] const cleaned = ancestorPath.replace(/[{}\[\]]/g, ''); return cleaned || null; } return null; } }