fix(contribution): move calculateForAdoption out of CDC transaction
Root cause: calculateForAdoption uses separate DB connections, which cannot see uncommitted data in Serializable isolation level, causing "Adoption not found" errors. Solution (following Kafka Idempotent Consumer best practice): - Add TransactionalCDCHandlerWithResult<T> type for handlers with return - Add withIdempotencyAndCallback() wrapper for post-commit callbacks - Add registerTransactionalHandlerWithCallback() registration method - AdoptionSyncedHandler.handle() now returns AdoptionSyncResult - Contribution calculation runs AFTER transaction commits via callback Reference: Lydtech Consulting - Kafka Idempotent Consumer Pattern https://www.lydtechconsulting.com/blog/kafka-idempotent-consumer-transactional-outbox Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
2a4cb829fe
commit
5447545486
|
|
@ -757,7 +757,15 @@
|
|||
"Bash(cmd /c \"set DATABASE_URL=postgresql://user:pass@localhost:5432/db && npx prisma migrate dev --name add_nickname_to_synced_legacy_users --create-only\")",
|
||||
"Bash(dir \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\\\\frontend\")",
|
||||
"Bash(git commit -m \"$\\(cat <<''EOF''\nfeat\\(mining-app\\): fix login bugs and connect contribution page to real API\n\nLogin fixes:\n- Add AuthEventBus for global 401 error handling with auto-logout\n- Add route guards with GoRouter redirect to protect authenticated routes\n- Remove setMockUser\\(\\) security vulnerability and legacy login\\(\\) dead code\n- Remove unused AuthInterceptor class\n\nContribution page:\n- Add ContributionRecord entity and model for records API\n- Connect contribution details card to GET /accounts/{id}/records endpoint\n- Display real team stats \\(direct referrals, unlocked levels/tiers\\)\n- Calculate expiration countdown from actual record data\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
|
||||
"Bash(dependency of a provider changed\" error when 401 responses triggered\nlogout during provider rebuilds.\n\nNow 401 handling is done through normal exception flow in splash page\nand route guards respond to isLoggedInProvider state changes.\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")"
|
||||
"Bash(dependency of a provider changed\" error when 401 responses triggered\nlogout during provider rebuilds.\n\nNow 401 handling is done through normal exception flow in splash page\nand route guards respond to isLoggedInProvider state changes.\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
|
||||
"Bash(ssh ceshi@rwa-colocation-1-lan:*)",
|
||||
"Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" diff frontend/mining-app/lib/presentation/pages/)",
|
||||
"Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" add frontend/mining-app/lib/presentation/pages/asset/asset_page.dart frontend/mining-app/lib/presentation/pages/auth/login_page.dart frontend/mining-app/lib/presentation/pages/auth/register_page.dart frontend/mining-app/lib/presentation/pages/contribution/contribution_page.dart frontend/mining-app/lib/presentation/pages/profile/profile_page.dart frontend/mining-app/lib/presentation/pages/trading/trading_page.dart)",
|
||||
"Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" commit -m \"$\\(cat <<''EOF''\nfix\\(mining-app\\): unify color scheme and fix scroll issues\n\n- Update login/register pages to use orange color scheme \\(#FF6B00\\)\n matching the navigation pages design\n- Fix SafeArea bottom: false on all navigation pages since MainShell\n handles bottom safe area via bottomNavigationBar\n- Add AlwaysScrollableScrollPhysics to asset page for consistent scroll\n- Increase bottom padding to 100px on all navigation pages to clear\n the navigation bar\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
|
||||
"Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" push)",
|
||||
"Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" add frontend/mining-app/lib/presentation/pages/splash/splash_page.dart frontend/mining-app/lib/presentation/providers/user_providers.dart)",
|
||||
"Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" commit -m \"$\\(cat <<''EOF''\nfix\\(mining-app\\): update splash page theme and fix token refresh\n\n- Update splash_page.dart to orange theme \\(#FF6B00\\) matching other pages\n- Change app name from \"榴莲挖矿\" to \"榴莲生态\"\n- Fix refreshTokenIfNeeded to properly throw on failure instead of\n silently calling logout \\(which caused Riverpod ref errors\\)\n- Clear local storage directly on refresh failure without remote API call\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
|
||||
"Bash(python3 -c \" import sys content = sys.stdin.read\\(\\) old = '''''' done # 清空 processed_cdc_events 表(因为 migration 时可能已经消费了一些消息) # 这是事务性幂等消费的关键:重置 Kafka offset 后必须同时清空幂等记录 log_info \"\"Truncating processed_cdc_events tables to allow re-consumption...\"\" for db in \"\"rwa_contribution\"\" \"\"rwa_auth\"\"; do if run_psql \"\"$db\"\" \"\"TRUNCATE TABLE processed_cdc_events;\"\" 2>/dev/null; then log_success \"\"Truncated processed_cdc_events in $db\"\" else log_warn \"\"Could not truncate processed_cdc_events in $db \\(table may not exist yet\\)\"\" fi done log_step \"\"Step 9/18: Starting 2.0 services...\"\"'''''' new = '''''' done # 清空 processed_cdc_events 表(因为 migration 时可能已经消费了一些消息) # 这是事务性幂等消费的关键:重置 Kafka offset 后必须同时清空幂等记录 log_info \"\"Truncating processed_cdc_events tables to allow re-consumption...\"\" for db in \"\"rwa_contribution\"\" \"\"rwa_auth\"\"; do if run_psql \"\"$db\"\" \"\"TRUNCATE TABLE processed_cdc_events;\"\" 2>/dev/null; then log_success \"\"Truncated processed_cdc_events in $db\"\" else log_warn \"\"Could not truncate processed_cdc_events in $db \\(table may not exist yet\\)\"\" fi done log_step \"\"Step 9/18: Starting 2.0 services...\"\"'''''' print\\(content.replace\\(old, new\\)\\) \")"
|
||||
],
|
||||
"deny": [],
|
||||
"ask": []
|
||||
|
|
|
|||
|
|
@ -3,18 +3,31 @@ import Decimal from 'decimal.js';
|
|||
import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
|
||||
import { ContributionCalculationService } from '../services/contribution-calculation.service';
|
||||
|
||||
/**
|
||||
* 认种同步结果,用于事务提交后的算力计算
|
||||
*/
|
||||
export interface AdoptionSyncResult {
|
||||
originalAdoptionId: bigint;
|
||||
needsCalculation: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* 认种订单 CDC 事件处理器
|
||||
* 处理从1.0 planting-service同步过来的planting_orders数据
|
||||
* 认种订单是触发算力计算的核心事件
|
||||
*
|
||||
* 注意:此 handler 现在接收外部传入的事务客户端(tx),
|
||||
* 所有数据库操作都必须使用此事务客户端执行,
|
||||
* 以确保幂等记录和业务数据在同一事务中处理。
|
||||
* 重要设计说明(符合业界最佳实践):
|
||||
* ===========================================
|
||||
* - handle() 方法在事务内执行,只负责数据同步(synced_adoptions 表)
|
||||
* - 返回 AdoptionSyncResult,包含需要计算算力的认种ID
|
||||
* - 算力计算(calculateForAdoption)必须在事务提交后单独执行
|
||||
*
|
||||
* 重要:算力计算(calculateForAdoption)会在外部事务提交后单独执行,
|
||||
* 因为算力计算服务内部有自己的事务管理,且已有自己的幂等检查
|
||||
* (通过 existsBySourceAdoptionId 检查)。
|
||||
* 为什么不能在事务内调用 calculateForAdoption:
|
||||
* 1. calculateForAdoption 内部使用独立的数据库连接查询数据
|
||||
* 2. 在 Serializable 隔离级别下,内部查询无法看到外部事务未提交的数据
|
||||
* 3. 这会导致 "Adoption not found" 错误,因为 synced_adoptions 还未提交
|
||||
*
|
||||
* 参考:Kafka Idempotent Consumer & Transactional Outbox Pattern
|
||||
* https://www.lydtechconsulting.com/blog/kafka-idempotent-consumer-transactional-outbox
|
||||
*/
|
||||
@Injectable()
|
||||
export class AdoptionSyncedHandler {
|
||||
|
|
@ -24,7 +37,12 @@ export class AdoptionSyncedHandler {
|
|||
private readonly contributionCalculationService: ContributionCalculationService,
|
||||
) {}
|
||||
|
||||
async handle(event: CDCEvent, tx: TransactionClient): Promise<void> {
|
||||
/**
|
||||
* 处理认种 CDC 事件(在事务内执行)
|
||||
* 只负责数据同步,不调用算力计算
|
||||
* @returns AdoptionSyncResult 包含需要计算算力的认种ID
|
||||
*/
|
||||
async handle(event: CDCEvent, tx: TransactionClient): Promise<AdoptionSyncResult | null> {
|
||||
const { op, before, after } = event.payload;
|
||||
|
||||
this.logger.log(`[CDC] Adoption event received: op=${op}, seq=${event.sequenceNum}`);
|
||||
|
|
@ -34,16 +52,15 @@ export class AdoptionSyncedHandler {
|
|||
switch (op) {
|
||||
case 'c': // create
|
||||
case 'r': // read (snapshot)
|
||||
await this.handleCreate(after, event.sequenceNum, tx);
|
||||
break;
|
||||
return await this.handleCreate(after, event.sequenceNum, tx);
|
||||
case 'u': // update
|
||||
await this.handleUpdate(after, before, event.sequenceNum, tx);
|
||||
break;
|
||||
return await this.handleUpdate(after, before, event.sequenceNum, tx);
|
||||
case 'd': // delete
|
||||
await this.handleDelete(before);
|
||||
break;
|
||||
return null;
|
||||
default:
|
||||
this.logger.warn(`[CDC] Unknown CDC operation: ${op}`);
|
||||
return null;
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`[CDC] Failed to handle adoption CDC event, op=${op}, seq=${event.sequenceNum}`, error);
|
||||
|
|
@ -51,10 +68,28 @@ export class AdoptionSyncedHandler {
|
|||
}
|
||||
}
|
||||
|
||||
private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
|
||||
/**
|
||||
* 在事务提交后计算算力(由 CDC dispatcher 在事务外调用)
|
||||
*/
|
||||
async calculateContributionAfterCommit(result: AdoptionSyncResult): Promise<void> {
|
||||
if (!result || !result.needsCalculation) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.log(`[CDC] Triggering contribution calculation for adoption: ${result.originalAdoptionId}`);
|
||||
try {
|
||||
await this.contributionCalculationService.calculateForAdoption(result.originalAdoptionId);
|
||||
this.logger.log(`[CDC] Contribution calculation completed for adoption: ${result.originalAdoptionId}`);
|
||||
} catch (error) {
|
||||
// 算力计算失败不影响数据同步,后续可通过批量任务重试
|
||||
this.logger.error(`[CDC] Failed to calculate contribution for adoption ${result.originalAdoptionId}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<AdoptionSyncResult | null> {
|
||||
if (!data) {
|
||||
this.logger.warn(`[CDC] Adoption create: empty data received`);
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
// planting_orders表字段: order_id, account_sequence, tree_count, created_at, status, selected_province, selected_city
|
||||
|
|
@ -69,12 +104,12 @@ export class AdoptionSyncedHandler {
|
|||
|
||||
if (!orderId || !accountSequence) {
|
||||
this.logger.warn(`[CDC] Invalid adoption data: missing order_id or account_sequence`, { data });
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
const originalAdoptionId = BigInt(orderId);
|
||||
|
||||
// 第一步:在外部事务中保存同步的认种订单数据
|
||||
// 在事务中保存同步的认种订单数据
|
||||
this.logger.log(`[CDC] Upserting synced adoption: ${orderId}`);
|
||||
await tx.syncedAdoption.upsert({
|
||||
where: { originalAdoptionId },
|
||||
|
|
@ -103,25 +138,19 @@ export class AdoptionSyncedHandler {
|
|||
},
|
||||
});
|
||||
|
||||
// 第二步:触发算力计算
|
||||
// 注意:calculateForAdoption 有自己的幂等检查(existsBySourceAdoptionId),
|
||||
// 所以即使这里重复调用也是安全的
|
||||
this.logger.log(`[CDC] Triggering contribution calculation for adoption: ${orderId}`);
|
||||
try {
|
||||
await this.contributionCalculationService.calculateForAdoption(originalAdoptionId);
|
||||
this.logger.log(`[CDC] Contribution calculation completed for adoption: ${orderId}`);
|
||||
} catch (error) {
|
||||
// 算力计算失败不影响数据同步,后续可通过批量任务重试
|
||||
this.logger.error(`[CDC] Failed to calculate contribution for order ${orderId}`, error);
|
||||
}
|
||||
|
||||
this.logger.log(`[CDC] Adoption synced successfully: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}`);
|
||||
|
||||
// 返回结果,供事务提交后计算算力
|
||||
return {
|
||||
originalAdoptionId,
|
||||
needsCalculation: true,
|
||||
};
|
||||
}
|
||||
|
||||
private async handleUpdate(after: any, before: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
|
||||
private async handleUpdate(after: any, before: any, sequenceNum: bigint, tx: TransactionClient): Promise<AdoptionSyncResult | null> {
|
||||
if (!after) {
|
||||
this.logger.warn(`[CDC] Adoption update: empty after data received`);
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
const orderId = after.order_id || after.id;
|
||||
|
|
@ -145,7 +174,7 @@ export class AdoptionSyncedHandler {
|
|||
} else {
|
||||
this.logger.debug(`[CDC] Adoption ${orderId} already distributed, skipping update`);
|
||||
}
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
const accountSequence = after.account_sequence || after.accountSequence;
|
||||
|
|
@ -156,7 +185,7 @@ export class AdoptionSyncedHandler {
|
|||
|
||||
this.logger.log(`[CDC] Adoption update data: account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`);
|
||||
|
||||
// 第一步:在外部事务中保存同步的认种订单数据
|
||||
// 在事务中保存同步的认种订单数据
|
||||
await tx.syncedAdoption.upsert({
|
||||
where: { originalAdoptionId },
|
||||
create: {
|
||||
|
|
@ -184,18 +213,13 @@ export class AdoptionSyncedHandler {
|
|||
},
|
||||
});
|
||||
|
||||
// 第二步:触发算力计算
|
||||
if (!existingAdoption?.contributionDistributed) {
|
||||
this.logger.log(`[CDC] Triggering contribution calculation for updated adoption: ${orderId}`);
|
||||
try {
|
||||
await this.contributionCalculationService.calculateForAdoption(originalAdoptionId);
|
||||
this.logger.log(`[CDC] Contribution calculation completed for updated adoption: ${orderId}`);
|
||||
} catch (error) {
|
||||
this.logger.error(`[CDC] Failed to calculate contribution for order ${orderId}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`[CDC] Adoption updated successfully: ${originalAdoptionId}`);
|
||||
|
||||
// 只有尚未分配算力的认种才需要计算
|
||||
return {
|
||||
originalAdoptionId,
|
||||
needsCalculation: !existingAdoption?.contributionDistributed,
|
||||
};
|
||||
}
|
||||
|
||||
private async handleDelete(data: any): Promise<void> {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
|
|||
import { CDCConsumerService, CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
|
||||
import { UserSyncedHandler } from './user-synced.handler';
|
||||
import { ReferralSyncedHandler } from './referral-synced.handler';
|
||||
import { AdoptionSyncedHandler } from './adoption-synced.handler';
|
||||
import { AdoptionSyncedHandler, AdoptionSyncResult } from './adoption-synced.handler';
|
||||
|
||||
/**
|
||||
* CDC 事件分发器
|
||||
|
|
@ -12,6 +12,10 @@ import { AdoptionSyncedHandler } from './adoption-synced.handler';
|
|||
* - 每个 CDC 事件只处理一次(exactly-once 语义)
|
||||
* - 幂等记录和业务逻辑在同一事务中执行
|
||||
* - 任何失败都会导致整个事务回滚
|
||||
*
|
||||
* 对于认种事件,使用带后置回调的模式:
|
||||
* - 数据同步在事务内完成
|
||||
* - 算力计算在事务提交后执行(避免 Serializable 隔离级别下的可见性问题)
|
||||
*/
|
||||
@Injectable()
|
||||
export class CDCEventDispatcher implements OnModuleInit {
|
||||
|
|
@ -33,14 +37,19 @@ export class CDCEventDispatcher implements OnModuleInit {
|
|||
// - 用户数据 (identity-service: user_accounts)
|
||||
// - 推荐关系 (referral-service: referral_relationships)
|
||||
// - 认种订单 (planting-service: planting_orders)
|
||||
//
|
||||
// 使用 registerTransactionalHandler 确保:
|
||||
// 1. CDC 事件幂等记录(processed_cdc_events)
|
||||
// 2. 业务数据处理
|
||||
// 都在同一个 Serializable 事务中完成
|
||||
this.cdcConsumer.registerTransactionalHandler('user_accounts', this.handleUserEvent.bind(this)); // identity-service
|
||||
this.cdcConsumer.registerTransactionalHandler('referral_relationships', this.handleReferralEvent.bind(this)); // referral-service
|
||||
this.cdcConsumer.registerTransactionalHandler('planting_orders', this.handleAdoptionEvent.bind(this)); // planting-service
|
||||
|
||||
// 用户和推荐关系:简单的事务性处理
|
||||
this.cdcConsumer.registerTransactionalHandler('user_accounts', this.handleUserEvent.bind(this));
|
||||
this.cdcConsumer.registerTransactionalHandler('referral_relationships', this.handleReferralEvent.bind(this));
|
||||
|
||||
// 认种订单:使用带后置回调的处理模式
|
||||
// - 事务内:同步认种数据到 synced_adoptions 表
|
||||
// - 事务后:计算算力(需要读取已提交的数据)
|
||||
this.cdcConsumer.registerTransactionalHandlerWithCallback<AdoptionSyncResult | null>(
|
||||
'planting_orders',
|
||||
this.handleAdoptionEvent.bind(this),
|
||||
this.handleAdoptionPostCommit.bind(this),
|
||||
);
|
||||
|
||||
// 启动 CDC 消费者
|
||||
try {
|
||||
|
|
@ -60,7 +69,16 @@ export class CDCEventDispatcher implements OnModuleInit {
|
|||
await this.referralHandler.handle(event, tx);
|
||||
}
|
||||
|
||||
private async handleAdoptionEvent(event: CDCEvent, tx: TransactionClient): Promise<void> {
|
||||
await this.adoptionHandler.handle(event, tx);
|
||||
private async handleAdoptionEvent(event: CDCEvent, tx: TransactionClient): Promise<AdoptionSyncResult | null> {
|
||||
return await this.adoptionHandler.handle(event, tx);
|
||||
}
|
||||
|
||||
/**
|
||||
* 认种事件的后置回调 - 在事务提交后执行算力计算
|
||||
*/
|
||||
private async handleAdoptionPostCommit(result: AdoptionSyncResult | null): Promise<void> {
|
||||
if (result) {
|
||||
await this.adoptionHandler.calculateContributionAfterCommit(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,6 +47,12 @@ export type CDCHandler = (event: CDCEvent) => Promise<void>;
|
|||
/** 事务性 handler(支持在事务中执行) */
|
||||
export type TransactionalCDCHandler = (event: CDCEvent, tx: TransactionClient) => Promise<void>;
|
||||
|
||||
/** 事务性 handler 返回结果,用于事务提交后的回调 */
|
||||
export type TransactionalCDCHandlerWithResult<T> = (event: CDCEvent, tx: TransactionClient) => Promise<T>;
|
||||
|
||||
/** 事务提交后的回调函数 */
|
||||
export type PostCommitCallback<T> = (result: T) => Promise<void>;
|
||||
|
||||
@Injectable()
|
||||
export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(CDCConsumerService.name);
|
||||
|
|
@ -134,6 +140,76 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 事务性幂等包装器(带后置回调)- 100% 保证 exactly-once 语义
|
||||
*
|
||||
* 在同一个数据库事务中完成:
|
||||
* 1. 尝试插入幂等记录(使用唯一约束防止重复)
|
||||
* 2. 执行业务逻辑
|
||||
* 3. 事务提交后执行后置回调(如算力计算)
|
||||
*
|
||||
* 后置回调在事务提交后执行,失败不影响数据同步
|
||||
*/
|
||||
withIdempotencyAndCallback<T>(
|
||||
handler: TransactionalCDCHandlerWithResult<T>,
|
||||
postCommitCallback?: PostCommitCallback<T>,
|
||||
): CDCHandler {
|
||||
return async (event: CDCEvent) => {
|
||||
const idempotencyKey = `${event.topic}:${event.offset}`;
|
||||
let result: T | null = null;
|
||||
let shouldExecuteCallback = false;
|
||||
|
||||
try {
|
||||
await this.prisma.$transaction(async (tx) => {
|
||||
// 1. 尝试插入幂等记录(使用唯一约束防止重复)
|
||||
try {
|
||||
await tx.processedCdcEvent.create({
|
||||
data: {
|
||||
sourceTopic: event.topic,
|
||||
offset: event.offset,
|
||||
tableName: event.payload.table,
|
||||
operation: event.payload.op,
|
||||
},
|
||||
});
|
||||
} catch (error: any) {
|
||||
// 唯一约束冲突 = 事件已处理,直接返回(不执行业务逻辑)
|
||||
if (error.code === 'P2002') {
|
||||
this.logger.debug(`[CDC] Skipping duplicate event: ${idempotencyKey}`);
|
||||
return;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
|
||||
// 2. 执行业务逻辑(传入事务客户端)
|
||||
result = await handler(event, tx);
|
||||
shouldExecuteCallback = true;
|
||||
|
||||
this.logger.debug(`[CDC] Processed event in transaction: ${idempotencyKey}`);
|
||||
}, {
|
||||
isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
|
||||
timeout: 60000,
|
||||
});
|
||||
|
||||
// 3. 事务提交后执行后置回调
|
||||
if (shouldExecuteCallback && postCommitCallback && result !== null) {
|
||||
try {
|
||||
await postCommitCallback(result);
|
||||
} catch (callbackError) {
|
||||
// 后置回调失败不影响数据同步,只记录错误
|
||||
this.logger.error(`[CDC] Post-commit callback failed for: ${idempotencyKey}`, callbackError);
|
||||
}
|
||||
}
|
||||
} catch (error: any) {
|
||||
if (error.code === 'P2002') {
|
||||
this.logger.debug(`[CDC] Skipping duplicate event (concurrent): ${idempotencyKey}`);
|
||||
return;
|
||||
}
|
||||
this.logger.error(`[CDC] Failed to process event: ${idempotencyKey}`, error);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册 CDC 事件处理器(普通模式,不保证幂等)
|
||||
* @deprecated 使用 registerTransactionalHandler 代替
|
||||
|
|
@ -153,6 +229,23 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
|
|||
this.logger.log(`Registered transactional CDC handler for table: ${tableName}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册事务性 CDC 事件处理器(带后置回调)
|
||||
* 适用于需要在事务提交后执行额外操作的场景(如算力计算)
|
||||
*
|
||||
* @param tableName 表名
|
||||
* @param handler 事务性处理函数,返回后置回调需要的数据
|
||||
* @param postCommitCallback 事务提交后执行的回调函数
|
||||
*/
|
||||
registerTransactionalHandlerWithCallback<T>(
|
||||
tableName: string,
|
||||
handler: TransactionalCDCHandlerWithResult<T>,
|
||||
postCommitCallback: PostCommitCallback<T>,
|
||||
): void {
|
||||
this.handlers.set(tableName, this.withIdempotencyAndCallback(handler, postCommitCallback));
|
||||
this.logger.log(`Registered transactional CDC handler with post-commit callback for table: ${tableName}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动消费者
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -0,0 +1,10 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<vector xmlns:android="http://schemas.android.com/apk/res/android"
|
||||
android:width="24dp"
|
||||
android:height="24dp"
|
||||
android:viewportWidth="24"
|
||||
android:viewportHeight="24">
|
||||
<path
|
||||
android:fillColor="#FFFF6B00"
|
||||
android:pathData="M18.98 5.02h-1.96V3.98c0-0.51-0.47-0.98-1.04-0.98H8.02C7.45 3 6.98 3.47 6.98 3.98v1.04H5.02C3.89 5.02 3 5.9 3 6.98v1.04c0 2.53 1.92 4.59 4.4 4.92 0.62 1.5 1.97 2.62 3.62 2.95V19h-3c-0.57 0-1.04 0.46-1.04 1.03 0 0.51 0.47 0.98 1.04 0.98h7.96c0.57 0 1.04-0.47 1.04-0.98 0-0.57-0.47-1.04-1.04-1.04h-3V15.9c1.64-0.33 3-1.45 3.61-2.95C19.08 12.6 21 10.54 21 8.02V6.98c0-1.07-0.9-1.96-2.02-1.96Zm-13.96 3V6.98h1.96v3.85C5.86 10.4 5.02 9.28 5.02 8.02Zm13.96 0c0 1.26-0.84 2.39-1.96 2.8V6.99h1.96v1.04Z"/>
|
||||
</vector>
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<vector xmlns:android="http://schemas.android.com/apk/res/android"
|
||||
android:width="24dp"
|
||||
android:height="24dp"
|
||||
android:viewportWidth="24"
|
||||
android:viewportHeight="24">
|
||||
<path
|
||||
android:fillColor="#FF9CA3AF"
|
||||
android:pathData="M6.14 11.86l-2.76 2.81c-0.2 0.19-0.2 0.47 0 0.7l2.76 2.77c0.33 0.33 0.84 0.1 0.84-0.33v-1.83h6c0.57 0 1.04-0.42 1.04-0.98s-0.47-0.98-1.04-0.98h-6v-1.83c0-0.42-0.51-0.66-0.84-0.33Zm14.53-3.19l-2.81-2.81c-0.28-0.33-0.84-0.1-0.84 0.33v1.83h-6c-0.57 0-1.04 0.42-1.04 0.98s0.47 0.98 1.04 0.98h6v1.83c0 0.42 0.51 0.66 0.84 0.33l2.77-2.81c0.23-0.19 0.23-0.47 0.04-0.66Z"/>
|
||||
</vector>
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<vector xmlns:android="http://schemas.android.com/apk/res/android"
|
||||
android:width="24dp"
|
||||
android:height="24dp"
|
||||
android:viewportWidth="24"
|
||||
android:viewportHeight="24">
|
||||
<path
|
||||
android:fillColor="#FF9CA3AF"
|
||||
android:pathData="M9.98 15.98V8.02C9.98 6.89 10.88 6 12 6h9V5.02C21 3.89 20.1 3 18.98 3H5.02C3.89 3 3 3.9 3 5.02v13.96C3 20.11 3.9 21 5.02 21h13.96c1.13 0 2.02-0.9 2.02-2.02V18h-9c-1.13 0-2.02-0.9-2.02-2.02Zm3-7.96C12.47 8.02 12 8.44 12 9v6c0 0.56 0.47 0.98 0.98 0.98h9V8.02h-9Zm3 5.48c-0.8 0-1.5-0.66-1.5-1.5s0.7-1.5 1.5-1.5c0.85 0 1.5 0.66 1.5 1.5s-0.65 1.5-1.5 1.5Z"/>
|
||||
</vector>
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<vector xmlns:android="http://schemas.android.com/apk/res/android"
|
||||
android:width="24dp"
|
||||
android:height="24dp"
|
||||
android:viewportWidth="24"
|
||||
android:viewportHeight="24">
|
||||
<path
|
||||
android:fillColor="#FF9CA3AF"
|
||||
android:pathData="M12 12c2.2 0 3.98-1.78 3.98-3.98S14.2 3.98 12 3.98 8.02 5.81 8.02 8.02C8.02 10.22 9.8 12 12 12Zm0 2.02c-2.67 0-8.02 1.3-8.02 3.98v0.98c0 0.57 0.47 1.04 1.04 1.04h13.96c0.57 0 1.04-0.47 1.04-1.04V18c0-2.67-5.35-3.98-8.02-3.98Z"/>
|
||||
</vector>
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<vector xmlns:android="http://schemas.android.com/apk/res/android"
|
||||
android:width="24dp"
|
||||
android:height="24dp"
|
||||
android:viewportWidth="24"
|
||||
android:viewportHeight="24">
|
||||
<path
|
||||
android:fillColor="#FFFF6B00"
|
||||
android:pathData="M8.02 2.02C6.89 2.02 6 2.9 6 3.98v3.2c0 0.5 0.23 1.02 0.6 1.4L9.99 12l-3.37 3.42C6.23 15.8 6 16.32 6 16.82v3.2c0 1.07 0.9 1.96 2.02 1.96h7.96c1.13 0 2.02-0.89 2.02-1.96v-3.2c0-0.5-0.19-1.02-0.56-1.4L14.02 12l3.37-3.42C17.81 8.2 18 7.68 18 7.18v-3.2c0-1.07-0.9-1.96-2.02-1.96H8.02Zm7.96 14.9v2.06c0 0.57-0.42 1.04-0.98 1.04H9c-0.56 0-0.98-0.47-0.98-1.04v-2.06c0-0.28 0.09-0.51 0.28-0.7l3.7-3.7 3.7 3.7c0.2 0.19 0.28 0.42 0.28 0.7Z"/>
|
||||
</vector>
|
||||
Loading…
Reference in New Issue