fix(contribution): use ancestor_path to build upline chain for TEAM_LEVEL distribution

Root cause: CDC sync order issue caused referrerAccountSequence to be null,
resulting in empty ancestor chain and all TEAM_LEVEL contributions going to unallocated.

Changes:
- buildAncestorChainFromReferral: Uses ancestor_path (contains complete user_id chain) to build upline chain
- getDirectReferrer: Gets direct referrer using ancestor_path as fallback
- findAncestorChain: Updated to use ancestor_path when available

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-13 20:14:46 -08:00
parent dbf97ae487
commit 471702d562
2 changed files with 114 additions and 19 deletions

View File

@ -56,9 +56,11 @@ export class ContributionCalculationService {
const userReferral = await this.syncedDataRepository.findSyncedReferralByAccountSequence(adoption.accountSequence); const userReferral = await this.syncedDataRepository.findSyncedReferralByAccountSequence(adoption.accountSequence);
// 获取上线链条最多15级 // 获取上线链条最多15级
// 注意:不依赖 referrerAccountSequence可能为 null 由于 CDC 同步顺序问题)
// 改用 ancestorPath 来构建上线链
let ancestorChain: SyncedReferral[] = []; let ancestorChain: SyncedReferral[] = [];
if (userReferral?.referrerAccountSequence) { if (userReferral) {
ancestorChain = await this.buildAncestorChain(userReferral.referrerAccountSequence); ancestorChain = await this.buildAncestorChainFromReferral(userReferral);
} }
// 获取认种人的算力账户(用于判断 TEAM_BONUS 解锁状态) // 获取认种人的算力账户(用于判断 TEAM_BONUS 解锁状态)
@ -88,8 +90,10 @@ export class ContributionCalculationService {
await this.updateAdopterUnlockStatus(adoption.accountSequence); await this.updateAdopterUnlockStatus(adoption.accountSequence);
// 更新直接上线的解锁状态 // 更新直接上线的解锁状态
if (userReferral?.referrerAccountSequence) { // 使用 ancestorPath 或 referrerAccountSequence 来获取直接上线
await this.updateReferrerUnlockStatus(userReferral.referrerAccountSequence); const directReferrer = await this.getDirectReferrer(userReferral);
if (directReferrer) {
await this.updateReferrerUnlockStatus(directReferrer);
} }
// 发布分配结果到 Kafka通过 Outbox 模式) // 发布分配结果到 Kafka通过 Outbox 模式)
@ -148,7 +152,71 @@ export class ContributionCalculationService {
} }
/** /**
* 线 * 线 accountSequence
* 使 ancestorPath referrerAccountSequence nullCDC
*/
private async getDirectReferrer(userReferral: SyncedReferral | null): Promise<string | null> {
if (!userReferral) {
return null;
}
// 优先使用 referrerAccountSequence
if (userReferral.referrerAccountSequence) {
return userReferral.referrerAccountSequence;
}
// 回退:使用 ancestorPath 的第一个元素(直接上线的 userId
if (userReferral.ancestorPath) {
const ancestorUserIds = userReferral.ancestorPath
.split(',')
.filter((id) => id.trim());
if (ancestorUserIds.length > 0) {
const directReferrerUserId = BigInt(ancestorUserIds[0].trim());
const referrerReferral = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(directReferrerUserId);
if (referrerReferral) {
return referrerReferral.accountSequence;
}
}
}
return null;
}
/**
* referral 线
* 使 ancestor_path referrerAccountSequence CDC
*/
private async buildAncestorChainFromReferral(userReferral: SyncedReferral): Promise<SyncedReferral[]> {
const ancestors: SyncedReferral[] = [];
// 优先使用 ancestor_path 来构建上线链
if (userReferral.ancestorPath) {
const ancestorUserIds = userReferral.ancestorPath
.split(',')
.filter((id) => id.trim())
.map((id) => BigInt(id.trim()));
// ancestor_path 存储的是从直接上线到根的 user_id 列表
for (let i = 0; i < ancestorUserIds.length && ancestors.length < 15; i++) {
const ancestorUserId = ancestorUserIds[i];
const ancestorReferral = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(ancestorUserId);
if (ancestorReferral) {
ancestors.push(ancestorReferral);
} else {
this.logger.warn(`Ancestor referral not found for userId: ${ancestorUserId}`);
}
}
} else if (userReferral.referrerAccountSequence) {
// 回退方案:使用 referrerAccountSequence 逐级查找
return await this.syncedDataRepository.findAncestorChain(userReferral.referrerAccountSequence, 15);
}
return ancestors;
}
/**
* 线
*/ */
private async buildAncestorChain(startAccountSequence: string): Promise<SyncedReferral[]> { private async buildAncestorChain(startAccountSequence: string): Promise<SyncedReferral[]> {
return await this.syncedDataRepository.findAncestorChain(startAccountSequence, 15); return await this.syncedDataRepository.findAncestorChain(startAccountSequence, 15);

View File

@ -255,25 +255,52 @@ export class SyncedDataRepository implements ISyncedDataRepository {
async findAncestorChain(accountSequence: string, maxLevel: number = 15): Promise<SyncedReferral[]> { async findAncestorChain(accountSequence: string, maxLevel: number = 15): Promise<SyncedReferral[]> {
const ancestors: SyncedReferral[] = []; const ancestors: SyncedReferral[] = [];
let currentSequence = accountSequence;
for (let i = 0; i < maxLevel; i++) { // 首先获取起始账户的 referral 记录,以获取 ancestor_path
// 获取当前账户的 referral 记录(包含该账户作为上线的信息) const startReferral = await this.findSyncedReferralByAccountSequence(accountSequence);
const referral = await this.findSyncedReferralByAccountSequence(currentSequence); if (!startReferral) {
if (!referral) { // 如果起始账户没有 referral 记录,尝试直接返回该账户作为第一级上线
break; // (用于处理 CDC 顺序问题:上线的 referral 记录可能存在但 referrerAccountSequence 为空)
return ancestors;
}
// 把起始账户(直接上线)加入链条
ancestors.push(startReferral);
// 如果有 ancestor_path使用它来构建完整的上线链
if (startReferral.ancestorPath) {
const ancestorUserIds = startReferral.ancestorPath
.split(',')
.filter((id) => id.trim())
.map((id) => BigInt(id.trim()));
// ancestor_path 是从直接上线到根的 user_id 列表
// 我们需要找到对应的 SyncedReferral 记录
for (let i = 0; i < ancestorUserIds.length && ancestors.length < maxLevel; i++) {
const ancestorUserId = ancestorUserIds[i];
const ancestorReferral = await this.findSyncedReferralByOriginalUserId(ancestorUserId);
if (ancestorReferral) {
ancestors.push(ancestorReferral);
}
} }
} else {
// 如果没有 ancestor_path回退到使用 referrerAccountSequence 逐级查找
let currentSequence = startReferral.referrerAccountSequence;
// 把当前账户加入上线链条 while (currentSequence && ancestors.length < maxLevel) {
ancestors.push(referral); const referral = await this.findSyncedReferralByAccountSequence(currentSequence);
if (!referral) {
break;
}
// 如果没有更上级的推荐人,终止 ancestors.push(referral);
if (!referral.referrerAccountSequence) {
break; if (!referral.referrerAccountSequence) {
break;
}
currentSequence = referral.referrerAccountSequence;
} }
// 继续向上追溯
currentSequence = referral.referrerAccountSequence;
} }
return ancestors; return ancestors;