fix(contribution): 100% sync CDC data and fix calculation trigger timing

- Remove conditional skip logic in CDC handlers
- Always sync all field updates (including status changes)
- Trigger contribution calculation only when status becomes MINING_ENABLED
- Fix user and referral handlers to sync all fields without skipping

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-13 16:55:25 -08:00
parent 20eabbb85f
commit 3999d7cc51
3 changed files with 68 additions and 115 deletions

View File

@ -15,19 +15,11 @@ export interface AdoptionSyncResult {
* CDC
* 1.0 planting-service同步过来的planting_orders数据
*
*
*
* ===========================================
* - handle() synced_adoptions
* - AdoptionSyncResultID
* - calculateForAdoption
*
* calculateForAdoption
* 1. calculateForAdoption 使
* 2. Serializable
* 3. "Adoption not found" synced_adoptions
*
* Kafka Idempotent Consumer & Transactional Outbox Pattern
* https://www.lydtechconsulting.com/blog/kafka-idempotent-consumer-transactional-outbox
* - handle() 100%
* - status MINING_ENABLED
* - Serializable
*/
@Injectable()
export class AdoptionSyncedHandler {
@ -92,15 +84,15 @@ export class AdoptionSyncedHandler {
return null;
}
// planting_orders表字段: order_id, account_sequence, tree_count, created_at, status, selected_province, selected_city
const orderId = data.order_id || data.id;
const accountSequence = data.account_sequence || data.accountSequence;
const treeCount = data.tree_count || data.treeCount;
const createdAt = data.created_at || data.createdAt || data.paid_at || data.paidAt;
const selectedProvince = data.selected_province || data.selectedProvince || null;
const selectedCity = data.selected_city || data.selectedCity || null;
const status = data.status ?? null;
this.logger.log(`[CDC] Adoption create: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`);
this.logger.log(`[CDC] Adoption create: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}, status=${status}`);
if (!orderId || !accountSequence) {
this.logger.warn(`[CDC] Invalid adoption data: missing order_id or account_sequence`, { data });
@ -109,8 +101,7 @@ export class AdoptionSyncedHandler {
const originalAdoptionId = BigInt(orderId);
// 在事务中保存同步的认种订单数据
this.logger.log(`[CDC] Upserting synced adoption: ${orderId}`);
// 100%同步数据
await tx.syncedAdoption.upsert({
where: { originalAdoptionId },
create: {
@ -118,10 +109,10 @@ export class AdoptionSyncedHandler {
accountSequence,
treeCount,
adoptionDate: new Date(createdAt),
status: data.status ?? null,
status,
selectedProvince,
selectedCity,
contributionPerTree: new Decimal('1'), // 每棵树1算力
contributionPerTree: new Decimal('1'),
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
@ -129,21 +120,22 @@ export class AdoptionSyncedHandler {
accountSequence,
treeCount,
adoptionDate: new Date(createdAt),
status: data.status ?? undefined,
selectedProvince: selectedProvince ?? undefined,
selectedCity: selectedCity ?? undefined,
status,
selectedProvince,
selectedCity,
contributionPerTree: new Decimal('1'),
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
this.logger.log(`[CDC] Adoption synced successfully: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}`);
this.logger.log(`[CDC] Adoption synced: orderId=${orderId}, status=${status}`);
// 返回结果,供事务提交后计算算力
// 只有 MINING_ENABLED 状态才触发算力计算
const needsCalculation = status === 'MINING_ENABLED';
return {
originalAdoptionId,
needsCalculation: true,
needsCalculation,
};
}
@ -155,37 +147,22 @@ export class AdoptionSyncedHandler {
const orderId = after.order_id || after.id;
const originalAdoptionId = BigInt(orderId);
this.logger.log(`[CDC] Adoption update: orderId=${orderId}`);
// 检查是否已经处理过(使用事务客户端)
const existingAdoption = await tx.syncedAdoption.findUnique({
where: { originalAdoptionId },
});
if (existingAdoption?.contributionDistributed) {
// 如果树数量发生变化,需要重新计算(这种情况较少)
const newTreeCount = after.tree_count || after.treeCount;
if (existingAdoption.treeCount !== newTreeCount) {
this.logger.warn(
`[CDC] Adoption tree count changed after processing: ${originalAdoptionId}, old=${existingAdoption.treeCount}, new=${newTreeCount}. This requires special handling.`,
);
// TODO: 实现树数量变化的处理逻辑
} else {
this.logger.debug(`[CDC] Adoption ${orderId} already distributed, skipping update`);
}
return null;
}
const accountSequence = after.account_sequence || after.accountSequence;
const treeCount = after.tree_count || after.treeCount;
const createdAt = after.created_at || after.createdAt || after.paid_at || after.paidAt;
const selectedProvince = after.selected_province || after.selectedProvince || null;
const selectedCity = after.selected_city || after.selectedCity || null;
const newStatus = after.status ?? null;
const oldStatus = before?.status ?? null;
this.logger.log(`[CDC] Adoption update data: account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`);
this.logger.log(`[CDC] Adoption update: orderId=${orderId}, status=${oldStatus} -> ${newStatus}`);
// 在事务中保存同步的认种订单数据
// 查询现有记录
const existingAdoption = await tx.syncedAdoption.findUnique({
where: { originalAdoptionId },
});
// 100%同步数据,不跳过任何更新
await tx.syncedAdoption.upsert({
where: { originalAdoptionId },
create: {
@ -193,7 +170,7 @@ export class AdoptionSyncedHandler {
accountSequence,
treeCount,
adoptionDate: new Date(createdAt),
status: after.status ?? null,
status: newStatus,
selectedProvince,
selectedCity,
contributionPerTree: new Decimal('1'),
@ -204,21 +181,24 @@ export class AdoptionSyncedHandler {
accountSequence,
treeCount,
adoptionDate: new Date(createdAt),
status: after.status ?? undefined,
selectedProvince: selectedProvince ?? undefined,
selectedCity: selectedCity ?? undefined,
status: newStatus,
selectedProvince,
selectedCity,
contributionPerTree: new Decimal('1'),
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
this.logger.log(`[CDC] Adoption updated successfully: ${originalAdoptionId}`);
this.logger.log(`[CDC] Adoption synced: orderId=${orderId}, status=${newStatus}`);
// 只有当 status 变为 MINING_ENABLED 且尚未计算过算力时,才触发算力计算
const statusChangedToMiningEnabled = newStatus === 'MINING_ENABLED' && oldStatus !== 'MINING_ENABLED';
const needsCalculation = statusChangedToMiningEnabled && !existingAdoption?.contributionDistributed;
// 只有尚未分配算力的认种才需要计算
return {
originalAdoptionId,
needsCalculation: !existingAdoption?.contributionDistributed,
needsCalculation,
};
}

View File

@ -5,22 +5,7 @@ import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-cons
* CDC
* 1.0 referral-service同步过来的referral_relationships数据
*
* 1.0 (referral_relationships):
* - user_id: BigInt (ID)
* - account_sequence: String ()
* - referrer_id: BigInt (ID, account_sequence)
* - ancestor_path: BigInt[] ( user_id)
* - depth: Int ()
*
* 2.0 :
* - original_user_id (1.0 user_id)
* - referrer_user_id (1.0 referrer_id)
* - referrer account_sequence
* - ancestor_path
*
* handler tx
* 使
*
* 100%
*/
@Injectable()
export class ReferralSyncedHandler {
@ -61,12 +46,11 @@ export class ReferralSyncedHandler {
return;
}
// 1.0 字段映射
const accountSequence = data.account_sequence || data.accountSequence;
const originalUserId = data.user_id || data.userId;
const referrerUserId = data.referrer_id || data.referrerId;
const ancestorPathArray = data.ancestor_path || data.ancestorPath;
const depth = data.depth || 0;
const depth = data.depth ?? 0;
this.logger.log(`[CDC] Referral create: account=${accountSequence}, userId=${originalUserId}, referrerId=${referrerUserId}, depth=${depth}`);
@ -75,11 +59,9 @@ export class ReferralSyncedHandler {
return;
}
// 将 BigInt[] 转换为逗号分隔的字符串
const ancestorPath = this.convertAncestorPath(ancestorPathArray);
this.logger.debug(`[CDC] Referral ancestorPath converted: ${ancestorPath}`);
// 尝试查找推荐人的 account_sequence(使用事务客户端)
// 尝试查找推荐人的 account_sequence
let referrerAccountSequence: string | null = null;
if (referrerUserId) {
const referrer = await tx.syncedReferral.findFirst({
@ -87,14 +69,10 @@ export class ReferralSyncedHandler {
});
if (referrer) {
referrerAccountSequence = referrer.accountSequence;
this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence} for referrer_id: ${referrerUserId}`);
} else {
this.logger.log(`[CDC] Referrer user_id ${referrerUserId} not found yet for ${accountSequence}, will resolve later`);
}
}
// 使用外部事务客户端执行所有操作
this.logger.log(`[CDC] Upserting synced referral: ${accountSequence}`);
// 100%同步数据
await tx.syncedReferral.upsert({
where: { accountSequence },
create: {
@ -108,17 +86,17 @@ export class ReferralSyncedHandler {
syncedAt: new Date(),
},
update: {
referrerAccountSequence: referrerAccountSequence ?? undefined,
referrerUserId: referrerUserId ? BigInt(referrerUserId) : undefined,
originalUserId: originalUserId ? BigInt(originalUserId) : undefined,
ancestorPath: ancestorPath ?? undefined,
depth: depth ?? undefined,
referrerAccountSequence,
referrerUserId: referrerUserId ? BigInt(referrerUserId) : null,
originalUserId: originalUserId ? BigInt(originalUserId) : null,
ancestorPath,
depth,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
this.logger.log(`[CDC] Referral synced successfully: ${accountSequence} (user_id: ${originalUserId}) -> referrer_id: ${referrerUserId || 'none'}, depth: ${depth}`);
this.logger.log(`[CDC] Referral synced: ${accountSequence}, referrerId=${referrerUserId || 'none'}, depth=${depth}`);
}
private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
@ -131,7 +109,7 @@ export class ReferralSyncedHandler {
const originalUserId = data.user_id || data.userId;
const referrerUserId = data.referrer_id || data.referrerId;
const ancestorPathArray = data.ancestor_path || data.ancestorPath;
const depth = data.depth || 0;
const depth = data.depth ?? 0;
this.logger.log(`[CDC] Referral update: account=${accountSequence}, referrerId=${referrerUserId}, depth=${depth}`);
@ -142,7 +120,7 @@ export class ReferralSyncedHandler {
const ancestorPath = this.convertAncestorPath(ancestorPathArray);
// 尝试查找推荐人的 account_sequence(使用事务客户端)
// 尝试查找推荐人的 account_sequence
let referrerAccountSequence: string | null = null;
if (referrerUserId) {
const referrer = await tx.syncedReferral.findFirst({
@ -150,10 +128,10 @@ export class ReferralSyncedHandler {
});
if (referrer) {
referrerAccountSequence = referrer.accountSequence;
this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence}`);
}
}
// 100%同步数据
await tx.syncedReferral.upsert({
where: { accountSequence },
create: {
@ -167,17 +145,17 @@ export class ReferralSyncedHandler {
syncedAt: new Date(),
},
update: {
referrerAccountSequence: referrerAccountSequence ?? undefined,
referrerUserId: referrerUserId ? BigInt(referrerUserId) : undefined,
originalUserId: originalUserId ? BigInt(originalUserId) : undefined,
ancestorPath: ancestorPath ?? undefined,
depth: depth ?? undefined,
referrerAccountSequence,
referrerUserId: referrerUserId ? BigInt(referrerUserId) : null,
originalUserId: originalUserId ? BigInt(originalUserId) : null,
ancestorPath,
depth,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
this.logger.log(`[CDC] Referral updated successfully: ${accountSequence}`);
this.logger.log(`[CDC] Referral synced: ${accountSequence}`);
}
private async handleDelete(data: any): Promise<void> {

View File

@ -6,9 +6,7 @@ import { ContributionAccountAggregate } from '../../domain/aggregates/contributi
* CDC
*
*
* handler tx
* 使
*
* 100%
*/
@Injectable()
export class UserSyncedHandler {
@ -49,22 +47,19 @@ export class UserSyncedHandler {
return;
}
// 兼容不同的字段命名CDC 使用 snake_case
const userId = data.user_id ?? data.id;
const accountSequence = data.account_sequence ?? data.accountSequence;
const phone = data.phone_number ?? data.phone ?? null;
const status = data.status ?? 'ACTIVE';
const status = data.status ?? null;
this.logger.log(`[CDC] User create: userId=${userId}, accountSequence=${accountSequence}, phone=${phone}, status=${status}`);
this.logger.log(`[CDC] User create: userId=${userId}, accountSequence=${accountSequence}, status=${status}`);
if (!userId || !accountSequence) {
this.logger.warn(`[CDC] Invalid user data: missing user_id or account_sequence`, { data });
return;
}
// 使用外部事务客户端执行所有操作
// 保存同步的用户数据
this.logger.log(`[CDC] Upserting synced user: ${accountSequence}`);
// 100%同步数据
await tx.syncedUser.upsert({
where: { accountSequence },
create: {
@ -76,8 +71,9 @@ export class UserSyncedHandler {
syncedAt: new Date(),
},
update: {
phone: phone ?? undefined,
status: status ?? undefined,
originalUserId: BigInt(userId),
phone,
status,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
@ -95,11 +91,9 @@ export class UserSyncedHandler {
data: persistData,
});
this.logger.log(`[CDC] Created contribution account for user: ${accountSequence}`);
} else {
this.logger.debug(`[CDC] Contribution account already exists for user: ${accountSequence}`);
}
this.logger.log(`[CDC] User synced successfully: ${accountSequence}`);
this.logger.log(`[CDC] User synced: ${accountSequence}`);
}
private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
@ -108,11 +102,10 @@ export class UserSyncedHandler {
return;
}
// 兼容不同的字段命名CDC 使用 snake_case
const userId = data.user_id ?? data.id;
const accountSequence = data.account_sequence ?? data.accountSequence;
const phone = data.phone_number ?? data.phone ?? null;
const status = data.status ?? 'ACTIVE';
const status = data.status ?? null;
this.logger.log(`[CDC] User update: userId=${userId}, accountSequence=${accountSequence}, status=${status}`);
@ -121,6 +114,7 @@ export class UserSyncedHandler {
return;
}
// 100%同步数据
await tx.syncedUser.upsert({
where: { accountSequence },
create: {
@ -132,14 +126,15 @@ export class UserSyncedHandler {
syncedAt: new Date(),
},
update: {
phone: phone ?? undefined,
status: status ?? undefined,
originalUserId: BigInt(userId),
phone,
status,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
this.logger.log(`[CDC] User updated successfully: ${accountSequence}`);
this.logger.log(`[CDC] User synced: ${accountSequence}`);
}
private async handleDelete(data: any): Promise<void> {