diff --git a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts index 042d8a18..fa2378fe 100644 --- a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts @@ -15,19 +15,11 @@ export interface AdoptionSyncResult { * 认种订单 CDC 事件处理器 * 处理从1.0 planting-service同步过来的planting_orders数据 * - * 重要设计说明(符合业界最佳实践): + * 设计说明: * =========================================== - * - handle() 方法在事务内执行,只负责数据同步(synced_adoptions 表) - * - 返回 AdoptionSyncResult,包含需要计算算力的认种ID - * - 算力计算(calculateForAdoption)必须在事务提交后单独执行 - * - * 为什么不能在事务内调用 calculateForAdoption: - * 1. calculateForAdoption 内部使用独立的数据库连接查询数据 - * 2. 在 Serializable 隔离级别下,内部查询无法看到外部事务未提交的数据 - * 3. 这会导致 "Adoption not found" 错误,因为 synced_adoptions 还未提交 - * - * 参考:Kafka Idempotent Consumer & Transactional Outbox Pattern - * https://www.lydtechconsulting.com/blog/kafka-idempotent-consumer-transactional-outbox + * - handle() 方法100%同步数据,不跳过任何更新 + * - 算力计算只在 status 变为 MINING_ENABLED 时触发 + * - 算力计算在事务提交后执行(避免 Serializable 隔离级别的可见性问题) */ @Injectable() export class AdoptionSyncedHandler { @@ -92,15 +84,15 @@ export class AdoptionSyncedHandler { return null; } - // planting_orders表字段: order_id, account_sequence, tree_count, created_at, status, selected_province, selected_city const orderId = data.order_id || data.id; const accountSequence = data.account_sequence || data.accountSequence; const treeCount = data.tree_count || data.treeCount; const createdAt = data.created_at || data.createdAt || data.paid_at || data.paidAt; const selectedProvince = data.selected_province || data.selectedProvince || null; const selectedCity = data.selected_city || data.selectedCity || null; + const status = data.status ?? null; - this.logger.log(`[CDC] Adoption create: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`); + this.logger.log(`[CDC] Adoption create: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}, status=${status}`); if (!orderId || !accountSequence) { this.logger.warn(`[CDC] Invalid adoption data: missing order_id or account_sequence`, { data }); @@ -109,8 +101,7 @@ export class AdoptionSyncedHandler { const originalAdoptionId = BigInt(orderId); - // 在事务中保存同步的认种订单数据 - this.logger.log(`[CDC] Upserting synced adoption: ${orderId}`); + // 100%同步数据 await tx.syncedAdoption.upsert({ where: { originalAdoptionId }, create: { @@ -118,10 +109,10 @@ export class AdoptionSyncedHandler { accountSequence, treeCount, adoptionDate: new Date(createdAt), - status: data.status ?? null, + status, selectedProvince, selectedCity, - contributionPerTree: new Decimal('1'), // 每棵树1算力 + contributionPerTree: new Decimal('1'), sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, @@ -129,21 +120,22 @@ export class AdoptionSyncedHandler { accountSequence, treeCount, adoptionDate: new Date(createdAt), - status: data.status ?? undefined, - selectedProvince: selectedProvince ?? undefined, - selectedCity: selectedCity ?? undefined, + status, + selectedProvince, + selectedCity, contributionPerTree: new Decimal('1'), sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, }); - this.logger.log(`[CDC] Adoption synced successfully: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}`); + this.logger.log(`[CDC] Adoption synced: orderId=${orderId}, status=${status}`); - // 返回结果,供事务提交后计算算力 + // 只有 MINING_ENABLED 状态才触发算力计算 + const needsCalculation = status === 'MINING_ENABLED'; return { originalAdoptionId, - needsCalculation: true, + needsCalculation, }; } @@ -155,37 +147,22 @@ export class AdoptionSyncedHandler { const orderId = after.order_id || after.id; const originalAdoptionId = BigInt(orderId); - - this.logger.log(`[CDC] Adoption update: orderId=${orderId}`); - - // 检查是否已经处理过(使用事务客户端) - const existingAdoption = await tx.syncedAdoption.findUnique({ - where: { originalAdoptionId }, - }); - - if (existingAdoption?.contributionDistributed) { - // 如果树数量发生变化,需要重新计算(这种情况较少) - const newTreeCount = after.tree_count || after.treeCount; - if (existingAdoption.treeCount !== newTreeCount) { - this.logger.warn( - `[CDC] Adoption tree count changed after processing: ${originalAdoptionId}, old=${existingAdoption.treeCount}, new=${newTreeCount}. This requires special handling.`, - ); - // TODO: 实现树数量变化的处理逻辑 - } else { - this.logger.debug(`[CDC] Adoption ${orderId} already distributed, skipping update`); - } - return null; - } - const accountSequence = after.account_sequence || after.accountSequence; const treeCount = after.tree_count || after.treeCount; const createdAt = after.created_at || after.createdAt || after.paid_at || after.paidAt; const selectedProvince = after.selected_province || after.selectedProvince || null; const selectedCity = after.selected_city || after.selectedCity || null; + const newStatus = after.status ?? null; + const oldStatus = before?.status ?? null; - this.logger.log(`[CDC] Adoption update data: account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`); + this.logger.log(`[CDC] Adoption update: orderId=${orderId}, status=${oldStatus} -> ${newStatus}`); - // 在事务中保存同步的认种订单数据 + // 查询现有记录 + const existingAdoption = await tx.syncedAdoption.findUnique({ + where: { originalAdoptionId }, + }); + + // 100%同步数据,不跳过任何更新 await tx.syncedAdoption.upsert({ where: { originalAdoptionId }, create: { @@ -193,7 +170,7 @@ export class AdoptionSyncedHandler { accountSequence, treeCount, adoptionDate: new Date(createdAt), - status: after.status ?? null, + status: newStatus, selectedProvince, selectedCity, contributionPerTree: new Decimal('1'), @@ -204,21 +181,24 @@ export class AdoptionSyncedHandler { accountSequence, treeCount, adoptionDate: new Date(createdAt), - status: after.status ?? undefined, - selectedProvince: selectedProvince ?? undefined, - selectedCity: selectedCity ?? undefined, + status: newStatus, + selectedProvince, + selectedCity, contributionPerTree: new Decimal('1'), sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, }); - this.logger.log(`[CDC] Adoption updated successfully: ${originalAdoptionId}`); + this.logger.log(`[CDC] Adoption synced: orderId=${orderId}, status=${newStatus}`); + + // 只有当 status 变为 MINING_ENABLED 且尚未计算过算力时,才触发算力计算 + const statusChangedToMiningEnabled = newStatus === 'MINING_ENABLED' && oldStatus !== 'MINING_ENABLED'; + const needsCalculation = statusChangedToMiningEnabled && !existingAdoption?.contributionDistributed; - // 只有尚未分配算力的认种才需要计算 return { originalAdoptionId, - needsCalculation: !existingAdoption?.contributionDistributed, + needsCalculation, }; } diff --git a/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts index 765e97f6..834243e4 100644 --- a/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts @@ -5,22 +5,7 @@ import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-cons * 引荐关系 CDC 事件处理器 * 处理从1.0 referral-service同步过来的referral_relationships数据 * - * 1.0 表结构 (referral_relationships): - * - user_id: BigInt (用户ID) - * - account_sequence: String (账户序列号) - * - referrer_id: BigInt (推荐人用户ID, 注意:不是 account_sequence) - * - ancestor_path: BigInt[] (祖先路径数组,存储 user_id) - * - depth: Int (层级深度) - * - * 2.0 存储策略: - * - 保存 original_user_id (1.0 的 user_id) - * - 保存 referrer_user_id (1.0 的 referrer_id) - * - 尝试查找 referrer 的 account_sequence 并保存 - * - ancestor_path 转换为逗号分隔的字符串 - * - * 注意:此 handler 现在接收外部传入的事务客户端(tx), - * 所有数据库操作都必须使用此事务客户端执行, - * 以确保幂等记录和业务数据在同一事务中处理。 + * 设计说明:100%同步数据,不跳过任何字段更新 */ @Injectable() export class ReferralSyncedHandler { @@ -61,12 +46,11 @@ export class ReferralSyncedHandler { return; } - // 1.0 字段映射 const accountSequence = data.account_sequence || data.accountSequence; const originalUserId = data.user_id || data.userId; const referrerUserId = data.referrer_id || data.referrerId; const ancestorPathArray = data.ancestor_path || data.ancestorPath; - const depth = data.depth || 0; + const depth = data.depth ?? 0; this.logger.log(`[CDC] Referral create: account=${accountSequence}, userId=${originalUserId}, referrerId=${referrerUserId}, depth=${depth}`); @@ -75,11 +59,9 @@ export class ReferralSyncedHandler { return; } - // 将 BigInt[] 转换为逗号分隔的字符串 const ancestorPath = this.convertAncestorPath(ancestorPathArray); - this.logger.debug(`[CDC] Referral ancestorPath converted: ${ancestorPath}`); - // 尝试查找推荐人的 account_sequence(使用事务客户端) + // 尝试查找推荐人的 account_sequence let referrerAccountSequence: string | null = null; if (referrerUserId) { const referrer = await tx.syncedReferral.findFirst({ @@ -87,14 +69,10 @@ export class ReferralSyncedHandler { }); if (referrer) { referrerAccountSequence = referrer.accountSequence; - this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence} for referrer_id: ${referrerUserId}`); - } else { - this.logger.log(`[CDC] Referrer user_id ${referrerUserId} not found yet for ${accountSequence}, will resolve later`); } } - // 使用外部事务客户端执行所有操作 - this.logger.log(`[CDC] Upserting synced referral: ${accountSequence}`); + // 100%同步数据 await tx.syncedReferral.upsert({ where: { accountSequence }, create: { @@ -108,17 +86,17 @@ export class ReferralSyncedHandler { syncedAt: new Date(), }, update: { - referrerAccountSequence: referrerAccountSequence ?? undefined, - referrerUserId: referrerUserId ? BigInt(referrerUserId) : undefined, - originalUserId: originalUserId ? BigInt(originalUserId) : undefined, - ancestorPath: ancestorPath ?? undefined, - depth: depth ?? undefined, + referrerAccountSequence, + referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, + originalUserId: originalUserId ? BigInt(originalUserId) : null, + ancestorPath, + depth, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, }); - this.logger.log(`[CDC] Referral synced successfully: ${accountSequence} (user_id: ${originalUserId}) -> referrer_id: ${referrerUserId || 'none'}, depth: ${depth}`); + this.logger.log(`[CDC] Referral synced: ${accountSequence}, referrerId=${referrerUserId || 'none'}, depth=${depth}`); } private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { @@ -131,7 +109,7 @@ export class ReferralSyncedHandler { const originalUserId = data.user_id || data.userId; const referrerUserId = data.referrer_id || data.referrerId; const ancestorPathArray = data.ancestor_path || data.ancestorPath; - const depth = data.depth || 0; + const depth = data.depth ?? 0; this.logger.log(`[CDC] Referral update: account=${accountSequence}, referrerId=${referrerUserId}, depth=${depth}`); @@ -142,7 +120,7 @@ export class ReferralSyncedHandler { const ancestorPath = this.convertAncestorPath(ancestorPathArray); - // 尝试查找推荐人的 account_sequence(使用事务客户端) + // 尝试查找推荐人的 account_sequence let referrerAccountSequence: string | null = null; if (referrerUserId) { const referrer = await tx.syncedReferral.findFirst({ @@ -150,10 +128,10 @@ export class ReferralSyncedHandler { }); if (referrer) { referrerAccountSequence = referrer.accountSequence; - this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence}`); } } + // 100%同步数据 await tx.syncedReferral.upsert({ where: { accountSequence }, create: { @@ -167,17 +145,17 @@ export class ReferralSyncedHandler { syncedAt: new Date(), }, update: { - referrerAccountSequence: referrerAccountSequence ?? undefined, - referrerUserId: referrerUserId ? BigInt(referrerUserId) : undefined, - originalUserId: originalUserId ? BigInt(originalUserId) : undefined, - ancestorPath: ancestorPath ?? undefined, - depth: depth ?? undefined, + referrerAccountSequence, + referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, + originalUserId: originalUserId ? BigInt(originalUserId) : null, + ancestorPath, + depth, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, }); - this.logger.log(`[CDC] Referral updated successfully: ${accountSequence}`); + this.logger.log(`[CDC] Referral synced: ${accountSequence}`); } private async handleDelete(data: any): Promise { diff --git a/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts index c436fe99..cfb61ba5 100644 --- a/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts @@ -6,9 +6,7 @@ import { ContributionAccountAggregate } from '../../domain/aggregates/contributi * 用户 CDC 事件处理器 * 处理从身份服务同步过来的用户数据 * - * 注意:此 handler 现在接收外部传入的事务客户端(tx), - * 所有数据库操作都必须使用此事务客户端执行, - * 以确保幂等记录和业务数据在同一事务中处理。 + * 设计说明:100%同步数据,不跳过任何字段更新 */ @Injectable() export class UserSyncedHandler { @@ -49,22 +47,19 @@ export class UserSyncedHandler { return; } - // 兼容不同的字段命名(CDC 使用 snake_case) const userId = data.user_id ?? data.id; const accountSequence = data.account_sequence ?? data.accountSequence; const phone = data.phone_number ?? data.phone ?? null; - const status = data.status ?? 'ACTIVE'; + const status = data.status ?? null; - this.logger.log(`[CDC] User create: userId=${userId}, accountSequence=${accountSequence}, phone=${phone}, status=${status}`); + this.logger.log(`[CDC] User create: userId=${userId}, accountSequence=${accountSequence}, status=${status}`); if (!userId || !accountSequence) { this.logger.warn(`[CDC] Invalid user data: missing user_id or account_sequence`, { data }); return; } - // 使用外部事务客户端执行所有操作 - // 保存同步的用户数据 - this.logger.log(`[CDC] Upserting synced user: ${accountSequence}`); + // 100%同步数据 await tx.syncedUser.upsert({ where: { accountSequence }, create: { @@ -76,8 +71,9 @@ export class UserSyncedHandler { syncedAt: new Date(), }, update: { - phone: phone ?? undefined, - status: status ?? undefined, + originalUserId: BigInt(userId), + phone, + status, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, @@ -95,11 +91,9 @@ export class UserSyncedHandler { data: persistData, }); this.logger.log(`[CDC] Created contribution account for user: ${accountSequence}`); - } else { - this.logger.debug(`[CDC] Contribution account already exists for user: ${accountSequence}`); } - this.logger.log(`[CDC] User synced successfully: ${accountSequence}`); + this.logger.log(`[CDC] User synced: ${accountSequence}`); } private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { @@ -108,11 +102,10 @@ export class UserSyncedHandler { return; } - // 兼容不同的字段命名(CDC 使用 snake_case) const userId = data.user_id ?? data.id; const accountSequence = data.account_sequence ?? data.accountSequence; const phone = data.phone_number ?? data.phone ?? null; - const status = data.status ?? 'ACTIVE'; + const status = data.status ?? null; this.logger.log(`[CDC] User update: userId=${userId}, accountSequence=${accountSequence}, status=${status}`); @@ -121,6 +114,7 @@ export class UserSyncedHandler { return; } + // 100%同步数据 await tx.syncedUser.upsert({ where: { accountSequence }, create: { @@ -132,14 +126,15 @@ export class UserSyncedHandler { syncedAt: new Date(), }, update: { - phone: phone ?? undefined, - status: status ?? undefined, + originalUserId: BigInt(userId), + phone, + status, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, }); - this.logger.log(`[CDC] User updated successfully: ${accountSequence}`); + this.logger.log(`[CDC] User synced: ${accountSequence}`); } private async handleDelete(data: any): Promise {