From 471702d5625d35fdc0ea89d9c33e9825e331c805 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 13 Jan 2026 20:14:46 -0800 Subject: [PATCH] fix(contribution): use ancestor_path to build upline chain for TEAM_LEVEL distribution Root cause: CDC sync order issue caused referrerAccountSequence to be null, resulting in empty ancestor chain and all TEAM_LEVEL contributions going to unallocated. Changes: - buildAncestorChainFromReferral: Uses ancestor_path (contains complete user_id chain) to build upline chain - getDirectReferrer: Gets direct referrer using ancestor_path as fallback - findAncestorChain: Updated to use ancestor_path when available Co-Authored-By: Claude Opus 4.5 --- .../contribution-calculation.service.ts | 78 +++++++++++++++++-- .../repositories/synced-data.repository.ts | 55 +++++++++---- 2 files changed, 114 insertions(+), 19 deletions(-) diff --git a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts index dca61bc0..74185c3a 100644 --- a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts +++ b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts @@ -56,9 +56,11 @@ export class ContributionCalculationService { const userReferral = await this.syncedDataRepository.findSyncedReferralByAccountSequence(adoption.accountSequence); // 获取上线链条(最多15级) + // 注意:不依赖 referrerAccountSequence(可能为 null 由于 CDC 同步顺序问题) + // 改用 ancestorPath 来构建上线链 let ancestorChain: SyncedReferral[] = []; - if (userReferral?.referrerAccountSequence) { - ancestorChain = await this.buildAncestorChain(userReferral.referrerAccountSequence); + if (userReferral) { + ancestorChain = await this.buildAncestorChainFromReferral(userReferral); } // 获取认种人的算力账户(用于判断 TEAM_BONUS 解锁状态) @@ -88,8 +90,10 @@ export class ContributionCalculationService { await this.updateAdopterUnlockStatus(adoption.accountSequence); // 更新直接上线的解锁状态 - if (userReferral?.referrerAccountSequence) { - await this.updateReferrerUnlockStatus(userReferral.referrerAccountSequence); + // 使用 ancestorPath 或 referrerAccountSequence 来获取直接上线 + const directReferrer = await this.getDirectReferrer(userReferral); + if (directReferrer) { + await this.updateReferrerUnlockStatus(directReferrer); } // 发布分配结果到 Kafka(通过 Outbox 模式) @@ -148,7 +152,71 @@ export class ContributionCalculationService { } /** - * 构建上线链条 + * 获取直接上线的 accountSequence + * 优先使用 ancestorPath,因为 referrerAccountSequence 可能为 null(CDC 同步顺序问题) + */ + private async getDirectReferrer(userReferral: SyncedReferral | null): Promise { + if (!userReferral) { + return null; + } + + // 优先使用 referrerAccountSequence + if (userReferral.referrerAccountSequence) { + return userReferral.referrerAccountSequence; + } + + // 回退:使用 ancestorPath 的第一个元素(直接上线的 userId) + if (userReferral.ancestorPath) { + const ancestorUserIds = userReferral.ancestorPath + .split(',') + .filter((id) => id.trim()); + + if (ancestorUserIds.length > 0) { + const directReferrerUserId = BigInt(ancestorUserIds[0].trim()); + const referrerReferral = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(directReferrerUserId); + if (referrerReferral) { + return referrerReferral.accountSequence; + } + } + } + + return null; + } + + /** + * 从 referral 记录构建上线链条 + * 使用 ancestor_path 而不是 referrerAccountSequence,以避免 CDC 同步顺序问题 + */ + private async buildAncestorChainFromReferral(userReferral: SyncedReferral): Promise { + const ancestors: SyncedReferral[] = []; + + // 优先使用 ancestor_path 来构建上线链 + if (userReferral.ancestorPath) { + const ancestorUserIds = userReferral.ancestorPath + .split(',') + .filter((id) => id.trim()) + .map((id) => BigInt(id.trim())); + + // ancestor_path 存储的是从直接上线到根的 user_id 列表 + for (let i = 0; i < ancestorUserIds.length && ancestors.length < 15; i++) { + const ancestorUserId = ancestorUserIds[i]; + const ancestorReferral = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(ancestorUserId); + if (ancestorReferral) { + ancestors.push(ancestorReferral); + } else { + this.logger.warn(`Ancestor referral not found for userId: ${ancestorUserId}`); + } + } + } else if (userReferral.referrerAccountSequence) { + // 回退方案:使用 referrerAccountSequence 逐级查找 + return await this.syncedDataRepository.findAncestorChain(userReferral.referrerAccountSequence, 15); + } + + return ancestors; + } + + /** + * 构建上线链条(保留向后兼容) */ private async buildAncestorChain(startAccountSequence: string): Promise { return await this.syncedDataRepository.findAncestorChain(startAccountSequence, 15); diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts index c8f986f1..e784a29e 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts @@ -255,25 +255,52 @@ export class SyncedDataRepository implements ISyncedDataRepository { async findAncestorChain(accountSequence: string, maxLevel: number = 15): Promise { const ancestors: SyncedReferral[] = []; - let currentSequence = accountSequence; - for (let i = 0; i < maxLevel; i++) { - // 获取当前账户的 referral 记录(包含该账户作为上线的信息) - const referral = await this.findSyncedReferralByAccountSequence(currentSequence); - if (!referral) { - break; + // 首先获取起始账户的 referral 记录,以获取 ancestor_path + const startReferral = await this.findSyncedReferralByAccountSequence(accountSequence); + if (!startReferral) { + // 如果起始账户没有 referral 记录,尝试直接返回该账户作为第一级上线 + // (用于处理 CDC 顺序问题:上线的 referral 记录可能存在但 referrerAccountSequence 为空) + return ancestors; + } + + // 把起始账户(直接上线)加入链条 + ancestors.push(startReferral); + + // 如果有 ancestor_path,使用它来构建完整的上线链 + if (startReferral.ancestorPath) { + const ancestorUserIds = startReferral.ancestorPath + .split(',') + .filter((id) => id.trim()) + .map((id) => BigInt(id.trim())); + + // ancestor_path 是从直接上线到根的 user_id 列表 + // 我们需要找到对应的 SyncedReferral 记录 + for (let i = 0; i < ancestorUserIds.length && ancestors.length < maxLevel; i++) { + const ancestorUserId = ancestorUserIds[i]; + const ancestorReferral = await this.findSyncedReferralByOriginalUserId(ancestorUserId); + if (ancestorReferral) { + ancestors.push(ancestorReferral); + } } + } else { + // 如果没有 ancestor_path,回退到使用 referrerAccountSequence 逐级查找 + let currentSequence = startReferral.referrerAccountSequence; - // 把当前账户加入上线链条 - ancestors.push(referral); + while (currentSequence && ancestors.length < maxLevel) { + const referral = await this.findSyncedReferralByAccountSequence(currentSequence); + if (!referral) { + break; + } - // 如果没有更上级的推荐人,终止 - if (!referral.referrerAccountSequence) { - break; + ancestors.push(referral); + + if (!referral.referrerAccountSequence) { + break; + } + + currentSequence = referral.referrerAccountSequence; } - - // 继续向上追溯 - currentSequence = referral.referrerAccountSequence; } return ancestors;