feat(contribution-service): enhance CDC event logging for debugging

Add detailed [CDC] prefixed logs to all CDC handlers:
- cdc-consumer.service.ts: log message parsing, handler dispatch
- user-synced.handler.ts: log user sync operations with field details
- adoption-synced.handler.ts: log adoption sync and contribution calc
- referral-synced.handler.ts: log referral relationship sync

All logs use consistent [CDC] prefix for easy filtering.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 06:41:24 -08:00
parent 9642901710
commit c0d0088b8e
4 changed files with 124 additions and 41 deletions

View File

@ -21,6 +21,9 @@ export class AdoptionSyncedHandler {
async handle(event: CDCEvent): Promise<void> {
const { op, before, after } = event.payload;
this.logger.log(`[CDC] Adoption event received: op=${op}, seq=${event.sequenceNum}`);
this.logger.debug(`[CDC] Adoption event payload: ${JSON.stringify(after || before)}`);
try {
switch (op) {
case 'c': // create
@ -34,16 +37,19 @@ export class AdoptionSyncedHandler {
await this.handleDelete(before);
break;
default:
this.logger.warn(`Unknown CDC operation: ${op}`);
this.logger.warn(`[CDC] Unknown CDC operation: ${op}`);
}
} catch (error) {
this.logger.error(`Failed to handle adoption CDC event`, error);
this.logger.error(`[CDC] Failed to handle adoption CDC event, op=${op}, seq=${event.sequenceNum}`, error);
throw error;
}
}
private async handleCreate(data: any, sequenceNum: bigint): Promise<void> {
if (!data) return;
if (!data) {
this.logger.warn(`[CDC] Adoption create: empty data received`);
return;
}
// planting_orders表字段: order_id, account_sequence, tree_count, created_at, status, selected_province, selected_city
const orderId = data.order_id || data.id;
@ -53,7 +59,15 @@ export class AdoptionSyncedHandler {
const selectedProvince = data.selected_province || data.selectedProvince || null;
const selectedCity = data.selected_city || data.selectedCity || null;
this.logger.log(`[CDC] Adoption create: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`);
if (!orderId || !accountSequence) {
this.logger.warn(`[CDC] Invalid adoption data: missing order_id or account_sequence`, { data });
return;
}
// 第一步:保存同步的认种订单数据
this.logger.log(`[CDC] Upserting synced adoption: ${orderId}`);
await this.syncedDataRepository.upsertSyncedAdoption({
originalAdoptionId: BigInt(orderId),
accountSequence: accountSequence,
@ -67,24 +81,29 @@ export class AdoptionSyncedHandler {
});
// 第二步:触发算力计算(在单独的事务中执行)
this.logger.log(`[CDC] Triggering contribution calculation for adoption: ${orderId}`);
try {
await this.contributionCalculationService.calculateForAdoption(BigInt(orderId));
this.logger.log(`[CDC] Contribution calculation completed for adoption: ${orderId}`);
} catch (error) {
// 算力计算失败不影响数据同步,后续可通过批量任务重试
this.logger.error(`Failed to calculate contribution for order ${orderId}`, error);
this.logger.error(`[CDC] Failed to calculate contribution for order ${orderId}`, error);
}
this.logger.log(
`Planting order synced and contribution calculated: ${orderId}, account: ${accountSequence}`,
);
this.logger.log(`[CDC] Adoption synced successfully: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}`);
}
private async handleUpdate(after: any, before: any, sequenceNum: bigint): Promise<void> {
if (!after) return;
if (!after) {
this.logger.warn(`[CDC] Adoption update: empty after data received`);
return;
}
const orderId = after.order_id || after.id;
const originalAdoptionId = BigInt(orderId);
this.logger.log(`[CDC] Adoption update: orderId=${orderId}`);
// 检查是否已经处理过
const existingAdoption = await this.syncedDataRepository.findSyncedAdoptionByOriginalId(originalAdoptionId);
@ -93,9 +112,11 @@ export class AdoptionSyncedHandler {
const newTreeCount = after.tree_count || after.treeCount;
if (existingAdoption.treeCount !== newTreeCount) {
this.logger.warn(
`Planting order tree count changed after processing: ${originalAdoptionId}. This requires special handling.`,
`[CDC] Adoption tree count changed after processing: ${originalAdoptionId}, old=${existingAdoption.treeCount}, new=${newTreeCount}. This requires special handling.`,
);
// TODO: 实现树数量变化的处理逻辑
} else {
this.logger.debug(`[CDC] Adoption ${orderId} already distributed, skipping update`);
}
return;
}
@ -106,6 +127,8 @@ export class AdoptionSyncedHandler {
const selectedProvince = after.selected_province || after.selectedProvince || null;
const selectedCity = after.selected_city || after.selectedCity || null;
this.logger.log(`[CDC] Adoption update data: account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`);
// 第一步:保存同步的认种订单数据
await this.syncedDataRepository.upsertSyncedAdoption({
originalAdoptionId: originalAdoptionId,
@ -121,20 +144,26 @@ export class AdoptionSyncedHandler {
// 第二步:触发算力计算(在单独的事务中执行)
if (!existingAdoption?.contributionDistributed) {
this.logger.log(`[CDC] Triggering contribution calculation for updated adoption: ${orderId}`);
try {
await this.contributionCalculationService.calculateForAdoption(originalAdoptionId);
this.logger.log(`[CDC] Contribution calculation completed for updated adoption: ${orderId}`);
} catch (error) {
this.logger.error(`Failed to calculate contribution for order ${orderId}`, error);
this.logger.error(`[CDC] Failed to calculate contribution for order ${orderId}`, error);
}
}
this.logger.debug(`Planting order updated: ${originalAdoptionId}`);
this.logger.log(`[CDC] Adoption updated successfully: ${originalAdoptionId}`);
}
private async handleDelete(data: any): Promise<void> {
if (!data) return;
if (!data) {
this.logger.warn(`[CDC] Adoption delete: empty data received`);
return;
}
const orderId = data.order_id || data.id;
// 认种删除需要特殊处理(回滚算力)
// 但通常不会发生删除操作
this.logger.warn(`Adoption delete event received: ${data.id}. This may require contribution rollback.`);
this.logger.warn(`[CDC] Adoption delete event received: ${orderId}. This may require contribution rollback.`);
}
}

View File

@ -32,6 +32,9 @@ export class ReferralSyncedHandler {
async handle(event: CDCEvent): Promise<void> {
const { op, before, after } = event.payload;
this.logger.log(`[CDC] Referral event received: op=${op}, seq=${event.sequenceNum}`);
this.logger.debug(`[CDC] Referral event payload: ${JSON.stringify(after || before)}`);
try {
switch (op) {
case 'c': // create
@ -45,16 +48,19 @@ export class ReferralSyncedHandler {
await this.handleDelete(before);
break;
default:
this.logger.warn(`Unknown CDC operation: ${op}`);
this.logger.warn(`[CDC] Unknown CDC operation: ${op}`);
}
} catch (error) {
this.logger.error(`Failed to handle referral CDC event`, error);
this.logger.error(`[CDC] Failed to handle referral CDC event, op=${op}, seq=${event.sequenceNum}`, error);
throw error;
}
}
private async handleCreate(data: any, sequenceNum: bigint): Promise<void> {
if (!data) return;
if (!data) {
this.logger.warn(`[CDC] Referral create: empty data received`);
return;
}
// 1.0 字段映射
const accountSequence = data.account_sequence || data.accountSequence;
@ -63,8 +69,16 @@ export class ReferralSyncedHandler {
const ancestorPathArray = data.ancestor_path || data.ancestorPath;
const depth = data.depth || 0;
this.logger.log(`[CDC] Referral create: account=${accountSequence}, userId=${originalUserId}, referrerId=${referrerUserId}, depth=${depth}`);
if (!accountSequence) {
this.logger.warn(`[CDC] Invalid referral data: missing account_sequence`, { data });
return;
}
// 将 BigInt[] 转换为逗号分隔的字符串
const ancestorPath = this.convertAncestorPath(ancestorPathArray);
this.logger.debug(`[CDC] Referral ancestorPath converted: ${ancestorPath}`);
// 尝试查找推荐人的 account_sequence
let referrerAccountSequence: string | null = null;
@ -72,14 +86,14 @@ export class ReferralSyncedHandler {
const referrer = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(BigInt(referrerUserId));
if (referrer) {
referrerAccountSequence = referrer.accountSequence;
this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence} for referrer_id: ${referrerUserId}`);
} else {
this.logger.debug(
`Referrer user_id ${referrerUserId} not found yet for ${accountSequence}, will resolve later`,
);
this.logger.log(`[CDC] Referrer user_id ${referrerUserId} not found yet for ${accountSequence}, will resolve later`);
}
}
await this.unitOfWork.executeInTransaction(async () => {
this.logger.log(`[CDC] Upserting synced referral: ${accountSequence}`);
await this.syncedDataRepository.upsertSyncedReferral({
accountSequence,
referrerAccountSequence,
@ -91,13 +105,14 @@ export class ReferralSyncedHandler {
});
});
this.logger.log(
`Referral synced: ${accountSequence} (user_id: ${originalUserId}) -> referrer_id: ${referrerUserId || 'none'}`,
);
this.logger.log(`[CDC] Referral synced successfully: ${accountSequence} (user_id: ${originalUserId}) -> referrer_id: ${referrerUserId || 'none'}, depth: ${depth}`);
}
private async handleUpdate(data: any, sequenceNum: bigint): Promise<void> {
if (!data) return;
if (!data) {
this.logger.warn(`[CDC] Referral update: empty data received`);
return;
}
const accountSequence = data.account_sequence || data.accountSequence;
const originalUserId = data.user_id || data.userId;
@ -105,6 +120,13 @@ export class ReferralSyncedHandler {
const ancestorPathArray = data.ancestor_path || data.ancestorPath;
const depth = data.depth || 0;
this.logger.log(`[CDC] Referral update: account=${accountSequence}, referrerId=${referrerUserId}, depth=${depth}`);
if (!accountSequence) {
this.logger.warn(`[CDC] Invalid referral update data: missing account_sequence`, { data });
return;
}
const ancestorPath = this.convertAncestorPath(ancestorPathArray);
// 尝试查找推荐人的 account_sequence
@ -113,6 +135,7 @@ export class ReferralSyncedHandler {
const referrer = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(BigInt(referrerUserId));
if (referrer) {
referrerAccountSequence = referrer.accountSequence;
this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence}`);
}
}
@ -126,13 +149,17 @@ export class ReferralSyncedHandler {
sourceSequenceNum: sequenceNum,
});
this.logger.debug(`Referral updated: ${accountSequence}`);
this.logger.log(`[CDC] Referral updated successfully: ${accountSequence}`);
}
private async handleDelete(data: any): Promise<void> {
if (!data) return;
if (!data) {
this.logger.warn(`[CDC] Referral delete: empty data received`);
return;
}
const accountSequence = data.account_sequence || data.accountSequence;
// 引荐关系删除需要特殊处理
this.logger.warn(`Referral delete event received: ${data.account_sequence || data.accountSequence}`);
this.logger.warn(`[CDC] Referral delete event received: ${accountSequence} (not processed, keeping history)`);
}
/**

View File

@ -22,6 +22,9 @@ export class UserSyncedHandler {
async handle(event: CDCEvent): Promise<void> {
const { op, before, after } = event.payload;
this.logger.log(`[CDC] User event received: op=${op}, seq=${event.sequenceNum}`);
this.logger.debug(`[CDC] User event payload: ${JSON.stringify(after || before)}`);
try {
switch (op) {
case 'c': // create
@ -35,16 +38,19 @@ export class UserSyncedHandler {
await this.handleDelete(before);
break;
default:
this.logger.warn(`Unknown CDC operation: ${op}`);
this.logger.warn(`[CDC] Unknown CDC operation: ${op}`);
}
} catch (error) {
this.logger.error(`Failed to handle user CDC event`, error);
this.logger.error(`[CDC] Failed to handle user CDC event, op=${op}, seq=${event.sequenceNum}`, error);
throw error;
}
}
private async handleCreate(data: any, sequenceNum: bigint): Promise<void> {
if (!data) return;
if (!data) {
this.logger.warn(`[CDC] User create: empty data received`);
return;
}
// 兼容不同的字段命名CDC 使用 snake_case
const userId = data.user_id ?? data.id;
@ -52,13 +58,16 @@ export class UserSyncedHandler {
const phone = data.phone_number ?? data.phone ?? null;
const status = data.status ?? 'ACTIVE';
this.logger.log(`[CDC] User create: userId=${userId}, accountSequence=${accountSequence}, phone=${phone}, status=${status}`);
if (!userId || !accountSequence) {
this.logger.warn(`Invalid user data: missing user_id or account_sequence`, { data });
this.logger.warn(`[CDC] Invalid user data: missing user_id or account_sequence`, { data });
return;
}
await this.unitOfWork.executeInTransaction(async () => {
// 保存同步的用户数据
this.logger.log(`[CDC] Upserting synced user: ${accountSequence}`);
await this.syncedDataRepository.upsertSyncedUser({
originalUserId: BigInt(userId),
accountSequence,
@ -73,15 +82,20 @@ export class UserSyncedHandler {
if (!existingAccount) {
const newAccount = ContributionAccountAggregate.create(accountSequence);
await this.contributionAccountRepository.save(newAccount);
this.logger.log(`Created contribution account for user: ${accountSequence}`);
this.logger.log(`[CDC] Created contribution account for user: ${accountSequence}`);
} else {
this.logger.debug(`[CDC] Contribution account already exists for user: ${accountSequence}`);
}
});
this.logger.debug(`User synced: ${accountSequence}`);
this.logger.log(`[CDC] User synced successfully: ${accountSequence}`);
}
private async handleUpdate(data: any, sequenceNum: bigint): Promise<void> {
if (!data) return;
if (!data) {
this.logger.warn(`[CDC] User update: empty data received`);
return;
}
// 兼容不同的字段命名CDC 使用 snake_case
const userId = data.user_id ?? data.id;
@ -89,8 +103,10 @@ export class UserSyncedHandler {
const phone = data.phone_number ?? data.phone ?? null;
const status = data.status ?? 'ACTIVE';
this.logger.log(`[CDC] User update: userId=${userId}, accountSequence=${accountSequence}, status=${status}`);
if (!userId || !accountSequence) {
this.logger.warn(`Invalid user update data: missing user_id or account_sequence`, { data });
this.logger.warn(`[CDC] Invalid user update data: missing user_id or account_sequence`, { data });
return;
}
@ -102,13 +118,16 @@ export class UserSyncedHandler {
sourceSequenceNum: sequenceNum,
});
this.logger.debug(`User updated: ${accountSequence}`);
this.logger.log(`[CDC] User updated successfully: ${accountSequence}`);
}
private async handleDelete(data: any): Promise<void> {
if (!data) return;
if (!data) {
this.logger.warn(`[CDC] User delete: empty data received`);
return;
}
const accountSequence = data.account_sequence ?? data.accountSequence;
// 用户删除一般不处理,保留历史数据
this.logger.debug(`User delete event received: ${accountSequence}`);
this.logger.log(`[CDC] User delete event received: ${accountSequence} (not processed, keeping history)`);
}
}

View File

@ -136,10 +136,14 @@ export class CDCConsumerService implements OnModuleInit {
try {
if (!message.value) {
this.logger.warn(`[CDC] Empty message received from topic ${topic}, partition ${partition}, offset ${message.offset}`);
return;
}
const rawData = JSON.parse(message.value.toString());
const rawValue = message.value.toString();
this.logger.debug(`[CDC] Raw message from ${topic}: ${rawValue.substring(0, 500)}${rawValue.length > 500 ? '...' : ''}`);
const rawData = JSON.parse(rawValue);
// Debezium ExtractNewRecordState 转换后的扁平化格式
// 元数据字段: __op, __table, __source_ts_ms, __deleted
@ -149,6 +153,8 @@ export class CDCConsumerService implements OnModuleInit {
const sourceTsMs = rawData.__source_ts_ms || 0;
const deleted = rawData.__deleted === 'true' || rawData.__deleted === true;
this.logger.log(`[CDC] Message parsed: topic=${topic}, op=${op}, table=${table}, offset=${message.offset}`);
// 从原始数据中移除元数据字段,剩下的就是业务数据
const { __op, __table, __source_ts_ms, __deleted, ...businessData } = rawData;
@ -171,16 +177,18 @@ export class CDCConsumerService implements OnModuleInit {
const parts = topic.split('.');
const tableName = table || parts[parts.length - 1];
this.logger.log(`[CDC] Dispatching to handler: tableName=${tableName}, op=${op}`);
const handler = this.handlers.get(tableName);
if (handler) {
await handler(event);
this.logger.debug(`Processed CDC event for table ${tableName}, op: ${op}`);
this.logger.log(`[CDC] Successfully processed event for table ${tableName}, op=${op}, offset=${message.offset}`);
} else {
this.logger.warn(`No handler registered for table: ${tableName}`);
this.logger.warn(`[CDC] No handler registered for table: ${tableName}. Available handlers: ${Array.from(this.handlers.keys()).join(', ')}`);
}
} catch (error) {
this.logger.error(
`Error processing CDC message from topic ${topic}, partition ${partition}`,
`[CDC] Error processing message from topic ${topic}, partition ${partition}, offset ${message.offset}`,
error,
);
// 根据业务需求决定是否重试或记录到死信队列