diff --git a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts index 8b47b16b..537650a2 100644 --- a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts @@ -21,6 +21,9 @@ export class AdoptionSyncedHandler { async handle(event: CDCEvent): Promise { const { op, before, after } = event.payload; + this.logger.log(`[CDC] Adoption event received: op=${op}, seq=${event.sequenceNum}`); + this.logger.debug(`[CDC] Adoption event payload: ${JSON.stringify(after || before)}`); + try { switch (op) { case 'c': // create @@ -34,16 +37,19 @@ export class AdoptionSyncedHandler { await this.handleDelete(before); break; default: - this.logger.warn(`Unknown CDC operation: ${op}`); + this.logger.warn(`[CDC] Unknown CDC operation: ${op}`); } } catch (error) { - this.logger.error(`Failed to handle adoption CDC event`, error); + this.logger.error(`[CDC] Failed to handle adoption CDC event, op=${op}, seq=${event.sequenceNum}`, error); throw error; } } private async handleCreate(data: any, sequenceNum: bigint): Promise { - if (!data) return; + if (!data) { + this.logger.warn(`[CDC] Adoption create: empty data received`); + return; + } // planting_orders表字段: order_id, account_sequence, tree_count, created_at, status, selected_province, selected_city const orderId = data.order_id || data.id; @@ -53,7 +59,15 @@ export class AdoptionSyncedHandler { const selectedProvince = data.selected_province || data.selectedProvince || null; const selectedCity = data.selected_city || data.selectedCity || null; + this.logger.log(`[CDC] Adoption create: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`); + + if (!orderId || !accountSequence) { + this.logger.warn(`[CDC] Invalid adoption data: missing order_id or account_sequence`, { data }); + return; + } + // 第一步:保存同步的认种订单数据 + this.logger.log(`[CDC] Upserting synced adoption: ${orderId}`); await this.syncedDataRepository.upsertSyncedAdoption({ originalAdoptionId: BigInt(orderId), accountSequence: accountSequence, @@ -67,24 +81,29 @@ export class AdoptionSyncedHandler { }); // 第二步:触发算力计算(在单独的事务中执行) + this.logger.log(`[CDC] Triggering contribution calculation for adoption: ${orderId}`); try { await this.contributionCalculationService.calculateForAdoption(BigInt(orderId)); + this.logger.log(`[CDC] Contribution calculation completed for adoption: ${orderId}`); } catch (error) { // 算力计算失败不影响数据同步,后续可通过批量任务重试 - this.logger.error(`Failed to calculate contribution for order ${orderId}`, error); + this.logger.error(`[CDC] Failed to calculate contribution for order ${orderId}`, error); } - this.logger.log( - `Planting order synced and contribution calculated: ${orderId}, account: ${accountSequence}`, - ); + this.logger.log(`[CDC] Adoption synced successfully: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}`); } private async handleUpdate(after: any, before: any, sequenceNum: bigint): Promise { - if (!after) return; + if (!after) { + this.logger.warn(`[CDC] Adoption update: empty after data received`); + return; + } const orderId = after.order_id || after.id; const originalAdoptionId = BigInt(orderId); + this.logger.log(`[CDC] Adoption update: orderId=${orderId}`); + // 检查是否已经处理过 const existingAdoption = await this.syncedDataRepository.findSyncedAdoptionByOriginalId(originalAdoptionId); @@ -93,9 +112,11 @@ export class AdoptionSyncedHandler { const newTreeCount = after.tree_count || after.treeCount; if (existingAdoption.treeCount !== newTreeCount) { this.logger.warn( - `Planting order tree count changed after processing: ${originalAdoptionId}. This requires special handling.`, + `[CDC] Adoption tree count changed after processing: ${originalAdoptionId}, old=${existingAdoption.treeCount}, new=${newTreeCount}. This requires special handling.`, ); // TODO: 实现树数量变化的处理逻辑 + } else { + this.logger.debug(`[CDC] Adoption ${orderId} already distributed, skipping update`); } return; } @@ -106,6 +127,8 @@ export class AdoptionSyncedHandler { const selectedProvince = after.selected_province || after.selectedProvince || null; const selectedCity = after.selected_city || after.selectedCity || null; + this.logger.log(`[CDC] Adoption update data: account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`); + // 第一步:保存同步的认种订单数据 await this.syncedDataRepository.upsertSyncedAdoption({ originalAdoptionId: originalAdoptionId, @@ -121,20 +144,26 @@ export class AdoptionSyncedHandler { // 第二步:触发算力计算(在单独的事务中执行) if (!existingAdoption?.contributionDistributed) { + this.logger.log(`[CDC] Triggering contribution calculation for updated adoption: ${orderId}`); try { await this.contributionCalculationService.calculateForAdoption(originalAdoptionId); + this.logger.log(`[CDC] Contribution calculation completed for updated adoption: ${orderId}`); } catch (error) { - this.logger.error(`Failed to calculate contribution for order ${orderId}`, error); + this.logger.error(`[CDC] Failed to calculate contribution for order ${orderId}`, error); } } - this.logger.debug(`Planting order updated: ${originalAdoptionId}`); + this.logger.log(`[CDC] Adoption updated successfully: ${originalAdoptionId}`); } private async handleDelete(data: any): Promise { - if (!data) return; + if (!data) { + this.logger.warn(`[CDC] Adoption delete: empty data received`); + return; + } + const orderId = data.order_id || data.id; // 认种删除需要特殊处理(回滚算力) // 但通常不会发生删除操作 - this.logger.warn(`Adoption delete event received: ${data.id}. This may require contribution rollback.`); + this.logger.warn(`[CDC] Adoption delete event received: ${orderId}. This may require contribution rollback.`); } } diff --git a/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts index bf4344d7..af32b509 100644 --- a/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts @@ -32,6 +32,9 @@ export class ReferralSyncedHandler { async handle(event: CDCEvent): Promise { const { op, before, after } = event.payload; + this.logger.log(`[CDC] Referral event received: op=${op}, seq=${event.sequenceNum}`); + this.logger.debug(`[CDC] Referral event payload: ${JSON.stringify(after || before)}`); + try { switch (op) { case 'c': // create @@ -45,16 +48,19 @@ export class ReferralSyncedHandler { await this.handleDelete(before); break; default: - this.logger.warn(`Unknown CDC operation: ${op}`); + this.logger.warn(`[CDC] Unknown CDC operation: ${op}`); } } catch (error) { - this.logger.error(`Failed to handle referral CDC event`, error); + this.logger.error(`[CDC] Failed to handle referral CDC event, op=${op}, seq=${event.sequenceNum}`, error); throw error; } } private async handleCreate(data: any, sequenceNum: bigint): Promise { - if (!data) return; + if (!data) { + this.logger.warn(`[CDC] Referral create: empty data received`); + return; + } // 1.0 字段映射 const accountSequence = data.account_sequence || data.accountSequence; @@ -63,8 +69,16 @@ export class ReferralSyncedHandler { const ancestorPathArray = data.ancestor_path || data.ancestorPath; const depth = data.depth || 0; + this.logger.log(`[CDC] Referral create: account=${accountSequence}, userId=${originalUserId}, referrerId=${referrerUserId}, depth=${depth}`); + + if (!accountSequence) { + this.logger.warn(`[CDC] Invalid referral data: missing account_sequence`, { data }); + return; + } + // 将 BigInt[] 转换为逗号分隔的字符串 const ancestorPath = this.convertAncestorPath(ancestorPathArray); + this.logger.debug(`[CDC] Referral ancestorPath converted: ${ancestorPath}`); // 尝试查找推荐人的 account_sequence let referrerAccountSequence: string | null = null; @@ -72,14 +86,14 @@ export class ReferralSyncedHandler { const referrer = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(BigInt(referrerUserId)); if (referrer) { referrerAccountSequence = referrer.accountSequence; + this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence} for referrer_id: ${referrerUserId}`); } else { - this.logger.debug( - `Referrer user_id ${referrerUserId} not found yet for ${accountSequence}, will resolve later`, - ); + this.logger.log(`[CDC] Referrer user_id ${referrerUserId} not found yet for ${accountSequence}, will resolve later`); } } await this.unitOfWork.executeInTransaction(async () => { + this.logger.log(`[CDC] Upserting synced referral: ${accountSequence}`); await this.syncedDataRepository.upsertSyncedReferral({ accountSequence, referrerAccountSequence, @@ -91,13 +105,14 @@ export class ReferralSyncedHandler { }); }); - this.logger.log( - `Referral synced: ${accountSequence} (user_id: ${originalUserId}) -> referrer_id: ${referrerUserId || 'none'}`, - ); + this.logger.log(`[CDC] Referral synced successfully: ${accountSequence} (user_id: ${originalUserId}) -> referrer_id: ${referrerUserId || 'none'}, depth: ${depth}`); } private async handleUpdate(data: any, sequenceNum: bigint): Promise { - if (!data) return; + if (!data) { + this.logger.warn(`[CDC] Referral update: empty data received`); + return; + } const accountSequence = data.account_sequence || data.accountSequence; const originalUserId = data.user_id || data.userId; @@ -105,6 +120,13 @@ export class ReferralSyncedHandler { const ancestorPathArray = data.ancestor_path || data.ancestorPath; const depth = data.depth || 0; + this.logger.log(`[CDC] Referral update: account=${accountSequence}, referrerId=${referrerUserId}, depth=${depth}`); + + if (!accountSequence) { + this.logger.warn(`[CDC] Invalid referral update data: missing account_sequence`, { data }); + return; + } + const ancestorPath = this.convertAncestorPath(ancestorPathArray); // 尝试查找推荐人的 account_sequence @@ -113,6 +135,7 @@ export class ReferralSyncedHandler { const referrer = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(BigInt(referrerUserId)); if (referrer) { referrerAccountSequence = referrer.accountSequence; + this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence}`); } } @@ -126,13 +149,17 @@ export class ReferralSyncedHandler { sourceSequenceNum: sequenceNum, }); - this.logger.debug(`Referral updated: ${accountSequence}`); + this.logger.log(`[CDC] Referral updated successfully: ${accountSequence}`); } private async handleDelete(data: any): Promise { - if (!data) return; + if (!data) { + this.logger.warn(`[CDC] Referral delete: empty data received`); + return; + } + const accountSequence = data.account_sequence || data.accountSequence; // 引荐关系删除需要特殊处理 - this.logger.warn(`Referral delete event received: ${data.account_sequence || data.accountSequence}`); + this.logger.warn(`[CDC] Referral delete event received: ${accountSequence} (not processed, keeping history)`); } /** diff --git a/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts index 9d8cc817..7adb506c 100644 --- a/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts @@ -22,6 +22,9 @@ export class UserSyncedHandler { async handle(event: CDCEvent): Promise { const { op, before, after } = event.payload; + this.logger.log(`[CDC] User event received: op=${op}, seq=${event.sequenceNum}`); + this.logger.debug(`[CDC] User event payload: ${JSON.stringify(after || before)}`); + try { switch (op) { case 'c': // create @@ -35,16 +38,19 @@ export class UserSyncedHandler { await this.handleDelete(before); break; default: - this.logger.warn(`Unknown CDC operation: ${op}`); + this.logger.warn(`[CDC] Unknown CDC operation: ${op}`); } } catch (error) { - this.logger.error(`Failed to handle user CDC event`, error); + this.logger.error(`[CDC] Failed to handle user CDC event, op=${op}, seq=${event.sequenceNum}`, error); throw error; } } private async handleCreate(data: any, sequenceNum: bigint): Promise { - if (!data) return; + if (!data) { + this.logger.warn(`[CDC] User create: empty data received`); + return; + } // 兼容不同的字段命名(CDC 使用 snake_case) const userId = data.user_id ?? data.id; @@ -52,13 +58,16 @@ export class UserSyncedHandler { const phone = data.phone_number ?? data.phone ?? null; const status = data.status ?? 'ACTIVE'; + this.logger.log(`[CDC] User create: userId=${userId}, accountSequence=${accountSequence}, phone=${phone}, status=${status}`); + if (!userId || !accountSequence) { - this.logger.warn(`Invalid user data: missing user_id or account_sequence`, { data }); + this.logger.warn(`[CDC] Invalid user data: missing user_id or account_sequence`, { data }); return; } await this.unitOfWork.executeInTransaction(async () => { // 保存同步的用户数据 + this.logger.log(`[CDC] Upserting synced user: ${accountSequence}`); await this.syncedDataRepository.upsertSyncedUser({ originalUserId: BigInt(userId), accountSequence, @@ -73,15 +82,20 @@ export class UserSyncedHandler { if (!existingAccount) { const newAccount = ContributionAccountAggregate.create(accountSequence); await this.contributionAccountRepository.save(newAccount); - this.logger.log(`Created contribution account for user: ${accountSequence}`); + this.logger.log(`[CDC] Created contribution account for user: ${accountSequence}`); + } else { + this.logger.debug(`[CDC] Contribution account already exists for user: ${accountSequence}`); } }); - this.logger.debug(`User synced: ${accountSequence}`); + this.logger.log(`[CDC] User synced successfully: ${accountSequence}`); } private async handleUpdate(data: any, sequenceNum: bigint): Promise { - if (!data) return; + if (!data) { + this.logger.warn(`[CDC] User update: empty data received`); + return; + } // 兼容不同的字段命名(CDC 使用 snake_case) const userId = data.user_id ?? data.id; @@ -89,8 +103,10 @@ export class UserSyncedHandler { const phone = data.phone_number ?? data.phone ?? null; const status = data.status ?? 'ACTIVE'; + this.logger.log(`[CDC] User update: userId=${userId}, accountSequence=${accountSequence}, status=${status}`); + if (!userId || !accountSequence) { - this.logger.warn(`Invalid user update data: missing user_id or account_sequence`, { data }); + this.logger.warn(`[CDC] Invalid user update data: missing user_id or account_sequence`, { data }); return; } @@ -102,13 +118,16 @@ export class UserSyncedHandler { sourceSequenceNum: sequenceNum, }); - this.logger.debug(`User updated: ${accountSequence}`); + this.logger.log(`[CDC] User updated successfully: ${accountSequence}`); } private async handleDelete(data: any): Promise { - if (!data) return; + if (!data) { + this.logger.warn(`[CDC] User delete: empty data received`); + return; + } const accountSequence = data.account_sequence ?? data.accountSequence; // 用户删除一般不处理,保留历史数据 - this.logger.debug(`User delete event received: ${accountSequence}`); + this.logger.log(`[CDC] User delete event received: ${accountSequence} (not processed, keeping history)`); } } diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index 01192aff..ac05b917 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -136,10 +136,14 @@ export class CDCConsumerService implements OnModuleInit { try { if (!message.value) { + this.logger.warn(`[CDC] Empty message received from topic ${topic}, partition ${partition}, offset ${message.offset}`); return; } - const rawData = JSON.parse(message.value.toString()); + const rawValue = message.value.toString(); + this.logger.debug(`[CDC] Raw message from ${topic}: ${rawValue.substring(0, 500)}${rawValue.length > 500 ? '...' : ''}`); + + const rawData = JSON.parse(rawValue); // Debezium ExtractNewRecordState 转换后的扁平化格式 // 元数据字段: __op, __table, __source_ts_ms, __deleted @@ -149,6 +153,8 @@ export class CDCConsumerService implements OnModuleInit { const sourceTsMs = rawData.__source_ts_ms || 0; const deleted = rawData.__deleted === 'true' || rawData.__deleted === true; + this.logger.log(`[CDC] Message parsed: topic=${topic}, op=${op}, table=${table}, offset=${message.offset}`); + // 从原始数据中移除元数据字段,剩下的就是业务数据 const { __op, __table, __source_ts_ms, __deleted, ...businessData } = rawData; @@ -171,16 +177,18 @@ export class CDCConsumerService implements OnModuleInit { const parts = topic.split('.'); const tableName = table || parts[parts.length - 1]; + this.logger.log(`[CDC] Dispatching to handler: tableName=${tableName}, op=${op}`); + const handler = this.handlers.get(tableName); if (handler) { await handler(event); - this.logger.debug(`Processed CDC event for table ${tableName}, op: ${op}`); + this.logger.log(`[CDC] Successfully processed event for table ${tableName}, op=${op}, offset=${message.offset}`); } else { - this.logger.warn(`No handler registered for table: ${tableName}`); + this.logger.warn(`[CDC] No handler registered for table: ${tableName}. Available handlers: ${Array.from(this.handlers.keys()).join(', ')}`); } } catch (error) { this.logger.error( - `Error processing CDC message from topic ${topic}, partition ${partition}`, + `[CDC] Error processing message from topic ${topic}, partition ${partition}, offset ${message.offset}`, error, ); // 根据业务需求决定是否重试或记录到死信队列