diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 14573e6f..04a89ee5 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -757,7 +757,15 @@ "Bash(cmd /c \"set DATABASE_URL=postgresql://user:pass@localhost:5432/db && npx prisma migrate dev --name add_nickname_to_synced_legacy_users --create-only\")", "Bash(dir \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\\\\frontend\")", "Bash(git commit -m \"$\\(cat <<''EOF''\nfeat\\(mining-app\\): fix login bugs and connect contribution page to real API\n\nLogin fixes:\n- Add AuthEventBus for global 401 error handling with auto-logout\n- Add route guards with GoRouter redirect to protect authenticated routes\n- Remove setMockUser\\(\\) security vulnerability and legacy login\\(\\) dead code\n- Remove unused AuthInterceptor class\n\nContribution page:\n- Add ContributionRecord entity and model for records API\n- Connect contribution details card to GET /accounts/{id}/records endpoint\n- Display real team stats \\(direct referrals, unlocked levels/tiers\\)\n- Calculate expiration countdown from actual record data\n\nCo-Authored-By: Claude Opus 4.5 \nEOF\n\\)\")", - "Bash(dependency of a provider changed\" error when 401 responses triggered\nlogout during provider rebuilds.\n\nNow 401 handling is done through normal exception flow in splash page\nand route guards respond to isLoggedInProvider state changes.\n\nCo-Authored-By: Claude Opus 4.5 \nEOF\n\\)\")" + "Bash(dependency of a provider changed\" error when 401 responses triggered\nlogout during provider rebuilds.\n\nNow 401 handling is done through normal exception flow in splash page\nand route guards respond to isLoggedInProvider state changes.\n\nCo-Authored-By: Claude Opus 4.5 \nEOF\n\\)\")", + "Bash(ssh ceshi@rwa-colocation-1-lan:*)", + "Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" diff frontend/mining-app/lib/presentation/pages/)", + "Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" add frontend/mining-app/lib/presentation/pages/asset/asset_page.dart frontend/mining-app/lib/presentation/pages/auth/login_page.dart frontend/mining-app/lib/presentation/pages/auth/register_page.dart frontend/mining-app/lib/presentation/pages/contribution/contribution_page.dart frontend/mining-app/lib/presentation/pages/profile/profile_page.dart frontend/mining-app/lib/presentation/pages/trading/trading_page.dart)", + "Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" commit -m \"$\\(cat <<''EOF''\nfix\\(mining-app\\): unify color scheme and fix scroll issues\n\n- Update login/register pages to use orange color scheme \\(#FF6B00\\)\n matching the navigation pages design\n- Fix SafeArea bottom: false on all navigation pages since MainShell\n handles bottom safe area via bottomNavigationBar\n- Add AlwaysScrollableScrollPhysics to asset page for consistent scroll\n- Increase bottom padding to 100px on all navigation pages to clear\n the navigation bar\n\nCo-Authored-By: Claude Opus 4.5 \nEOF\n\\)\")", + "Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" push)", + "Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" add frontend/mining-app/lib/presentation/pages/splash/splash_page.dart frontend/mining-app/lib/presentation/providers/user_providers.dart)", + "Bash(git -C \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\" commit -m \"$\\(cat <<''EOF''\nfix\\(mining-app\\): update splash page theme and fix token refresh\n\n- Update splash_page.dart to orange theme \\(#FF6B00\\) matching other pages\n- Change app name from \"榴莲挖矿\" to \"榴莲生态\"\n- Fix refreshTokenIfNeeded to properly throw on failure instead of\n silently calling logout \\(which caused Riverpod ref errors\\)\n- Clear local storage directly on refresh failure without remote API call\n\nCo-Authored-By: Claude Opus 4.5 \nEOF\n\\)\")", + "Bash(python3 -c \" import sys content = sys.stdin.read\\(\\) old = '''''' done # 清空 processed_cdc_events 表(因为 migration 时可能已经消费了一些消息) # 这是事务性幂等消费的关键:重置 Kafka offset 后必须同时清空幂等记录 log_info \"\"Truncating processed_cdc_events tables to allow re-consumption...\"\" for db in \"\"rwa_contribution\"\" \"\"rwa_auth\"\"; do if run_psql \"\"$db\"\" \"\"TRUNCATE TABLE processed_cdc_events;\"\" 2>/dev/null; then log_success \"\"Truncated processed_cdc_events in $db\"\" else log_warn \"\"Could not truncate processed_cdc_events in $db \\(table may not exist yet\\)\"\" fi done log_step \"\"Step 9/18: Starting 2.0 services...\"\"'''''' new = '''''' done # 清空 processed_cdc_events 表(因为 migration 时可能已经消费了一些消息) # 这是事务性幂等消费的关键:重置 Kafka offset 后必须同时清空幂等记录 log_info \"\"Truncating processed_cdc_events tables to allow re-consumption...\"\" for db in \"\"rwa_contribution\"\" \"\"rwa_auth\"\"; do if run_psql \"\"$db\"\" \"\"TRUNCATE TABLE processed_cdc_events;\"\" 2>/dev/null; then log_success \"\"Truncated processed_cdc_events in $db\"\" else log_warn \"\"Could not truncate processed_cdc_events in $db \\(table may not exist yet\\)\"\" fi done log_step \"\"Step 9/18: Starting 2.0 services...\"\"'''''' print\\(content.replace\\(old, new\\)\\) \")" ], "deny": [], "ask": [] diff --git a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts index dc3e8323..042d8a18 100644 --- a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts @@ -3,18 +3,31 @@ import Decimal from 'decimal.js'; import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service'; import { ContributionCalculationService } from '../services/contribution-calculation.service'; +/** + * 认种同步结果,用于事务提交后的算力计算 + */ +export interface AdoptionSyncResult { + originalAdoptionId: bigint; + needsCalculation: boolean; +} + /** * 认种订单 CDC 事件处理器 * 处理从1.0 planting-service同步过来的planting_orders数据 - * 认种订单是触发算力计算的核心事件 * - * 注意:此 handler 现在接收外部传入的事务客户端(tx), - * 所有数据库操作都必须使用此事务客户端执行, - * 以确保幂等记录和业务数据在同一事务中处理。 + * 重要设计说明(符合业界最佳实践): + * =========================================== + * - handle() 方法在事务内执行,只负责数据同步(synced_adoptions 表) + * - 返回 AdoptionSyncResult,包含需要计算算力的认种ID + * - 算力计算(calculateForAdoption)必须在事务提交后单独执行 * - * 重要:算力计算(calculateForAdoption)会在外部事务提交后单独执行, - * 因为算力计算服务内部有自己的事务管理,且已有自己的幂等检查 - * (通过 existsBySourceAdoptionId 检查)。 + * 为什么不能在事务内调用 calculateForAdoption: + * 1. calculateForAdoption 内部使用独立的数据库连接查询数据 + * 2. 在 Serializable 隔离级别下,内部查询无法看到外部事务未提交的数据 + * 3. 这会导致 "Adoption not found" 错误,因为 synced_adoptions 还未提交 + * + * 参考:Kafka Idempotent Consumer & Transactional Outbox Pattern + * https://www.lydtechconsulting.com/blog/kafka-idempotent-consumer-transactional-outbox */ @Injectable() export class AdoptionSyncedHandler { @@ -24,7 +37,12 @@ export class AdoptionSyncedHandler { private readonly contributionCalculationService: ContributionCalculationService, ) {} - async handle(event: CDCEvent, tx: TransactionClient): Promise { + /** + * 处理认种 CDC 事件(在事务内执行) + * 只负责数据同步,不调用算力计算 + * @returns AdoptionSyncResult 包含需要计算算力的认种ID + */ + async handle(event: CDCEvent, tx: TransactionClient): Promise { const { op, before, after } = event.payload; this.logger.log(`[CDC] Adoption event received: op=${op}, seq=${event.sequenceNum}`); @@ -34,16 +52,15 @@ export class AdoptionSyncedHandler { switch (op) { case 'c': // create case 'r': // read (snapshot) - await this.handleCreate(after, event.sequenceNum, tx); - break; + return await this.handleCreate(after, event.sequenceNum, tx); case 'u': // update - await this.handleUpdate(after, before, event.sequenceNum, tx); - break; + return await this.handleUpdate(after, before, event.sequenceNum, tx); case 'd': // delete await this.handleDelete(before); - break; + return null; default: this.logger.warn(`[CDC] Unknown CDC operation: ${op}`); + return null; } } catch (error) { this.logger.error(`[CDC] Failed to handle adoption CDC event, op=${op}, seq=${event.sequenceNum}`, error); @@ -51,10 +68,28 @@ export class AdoptionSyncedHandler { } } - private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { + /** + * 在事务提交后计算算力(由 CDC dispatcher 在事务外调用) + */ + async calculateContributionAfterCommit(result: AdoptionSyncResult): Promise { + if (!result || !result.needsCalculation) { + return; + } + + this.logger.log(`[CDC] Triggering contribution calculation for adoption: ${result.originalAdoptionId}`); + try { + await this.contributionCalculationService.calculateForAdoption(result.originalAdoptionId); + this.logger.log(`[CDC] Contribution calculation completed for adoption: ${result.originalAdoptionId}`); + } catch (error) { + // 算力计算失败不影响数据同步,后续可通过批量任务重试 + this.logger.error(`[CDC] Failed to calculate contribution for adoption ${result.originalAdoptionId}`, error); + } + } + + private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { if (!data) { this.logger.warn(`[CDC] Adoption create: empty data received`); - return; + return null; } // planting_orders表字段: order_id, account_sequence, tree_count, created_at, status, selected_province, selected_city @@ -69,12 +104,12 @@ export class AdoptionSyncedHandler { if (!orderId || !accountSequence) { this.logger.warn(`[CDC] Invalid adoption data: missing order_id or account_sequence`, { data }); - return; + return null; } const originalAdoptionId = BigInt(orderId); - // 第一步:在外部事务中保存同步的认种订单数据 + // 在事务中保存同步的认种订单数据 this.logger.log(`[CDC] Upserting synced adoption: ${orderId}`); await tx.syncedAdoption.upsert({ where: { originalAdoptionId }, @@ -103,25 +138,19 @@ export class AdoptionSyncedHandler { }, }); - // 第二步:触发算力计算 - // 注意:calculateForAdoption 有自己的幂等检查(existsBySourceAdoptionId), - // 所以即使这里重复调用也是安全的 - this.logger.log(`[CDC] Triggering contribution calculation for adoption: ${orderId}`); - try { - await this.contributionCalculationService.calculateForAdoption(originalAdoptionId); - this.logger.log(`[CDC] Contribution calculation completed for adoption: ${orderId}`); - } catch (error) { - // 算力计算失败不影响数据同步,后续可通过批量任务重试 - this.logger.error(`[CDC] Failed to calculate contribution for order ${orderId}`, error); - } - this.logger.log(`[CDC] Adoption synced successfully: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}`); + + // 返回结果,供事务提交后计算算力 + return { + originalAdoptionId, + needsCalculation: true, + }; } - private async handleUpdate(after: any, before: any, sequenceNum: bigint, tx: TransactionClient): Promise { + private async handleUpdate(after: any, before: any, sequenceNum: bigint, tx: TransactionClient): Promise { if (!after) { this.logger.warn(`[CDC] Adoption update: empty after data received`); - return; + return null; } const orderId = after.order_id || after.id; @@ -145,7 +174,7 @@ export class AdoptionSyncedHandler { } else { this.logger.debug(`[CDC] Adoption ${orderId} already distributed, skipping update`); } - return; + return null; } const accountSequence = after.account_sequence || after.accountSequence; @@ -156,7 +185,7 @@ export class AdoptionSyncedHandler { this.logger.log(`[CDC] Adoption update data: account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`); - // 第一步:在外部事务中保存同步的认种订单数据 + // 在事务中保存同步的认种订单数据 await tx.syncedAdoption.upsert({ where: { originalAdoptionId }, create: { @@ -184,18 +213,13 @@ export class AdoptionSyncedHandler { }, }); - // 第二步:触发算力计算 - if (!existingAdoption?.contributionDistributed) { - this.logger.log(`[CDC] Triggering contribution calculation for updated adoption: ${orderId}`); - try { - await this.contributionCalculationService.calculateForAdoption(originalAdoptionId); - this.logger.log(`[CDC] Contribution calculation completed for updated adoption: ${orderId}`); - } catch (error) { - this.logger.error(`[CDC] Failed to calculate contribution for order ${orderId}`, error); - } - } - this.logger.log(`[CDC] Adoption updated successfully: ${originalAdoptionId}`); + + // 只有尚未分配算力的认种才需要计算 + return { + originalAdoptionId, + needsCalculation: !existingAdoption?.contributionDistributed, + }; } private async handleDelete(data: any): Promise { diff --git a/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts b/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts index cefa3971..b79d78fd 100644 --- a/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts +++ b/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts @@ -2,7 +2,7 @@ import { Injectable, OnModuleInit, Logger } from '@nestjs/common'; import { CDCConsumerService, CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service'; import { UserSyncedHandler } from './user-synced.handler'; import { ReferralSyncedHandler } from './referral-synced.handler'; -import { AdoptionSyncedHandler } from './adoption-synced.handler'; +import { AdoptionSyncedHandler, AdoptionSyncResult } from './adoption-synced.handler'; /** * CDC 事件分发器 @@ -12,6 +12,10 @@ import { AdoptionSyncedHandler } from './adoption-synced.handler'; * - 每个 CDC 事件只处理一次(exactly-once 语义) * - 幂等记录和业务逻辑在同一事务中执行 * - 任何失败都会导致整个事务回滚 + * + * 对于认种事件,使用带后置回调的模式: + * - 数据同步在事务内完成 + * - 算力计算在事务提交后执行(避免 Serializable 隔离级别下的可见性问题) */ @Injectable() export class CDCEventDispatcher implements OnModuleInit { @@ -33,14 +37,19 @@ export class CDCEventDispatcher implements OnModuleInit { // - 用户数据 (identity-service: user_accounts) // - 推荐关系 (referral-service: referral_relationships) // - 认种订单 (planting-service: planting_orders) - // - // 使用 registerTransactionalHandler 确保: - // 1. CDC 事件幂等记录(processed_cdc_events) - // 2. 业务数据处理 - // 都在同一个 Serializable 事务中完成 - this.cdcConsumer.registerTransactionalHandler('user_accounts', this.handleUserEvent.bind(this)); // identity-service - this.cdcConsumer.registerTransactionalHandler('referral_relationships', this.handleReferralEvent.bind(this)); // referral-service - this.cdcConsumer.registerTransactionalHandler('planting_orders', this.handleAdoptionEvent.bind(this)); // planting-service + + // 用户和推荐关系:简单的事务性处理 + this.cdcConsumer.registerTransactionalHandler('user_accounts', this.handleUserEvent.bind(this)); + this.cdcConsumer.registerTransactionalHandler('referral_relationships', this.handleReferralEvent.bind(this)); + + // 认种订单:使用带后置回调的处理模式 + // - 事务内:同步认种数据到 synced_adoptions 表 + // - 事务后:计算算力(需要读取已提交的数据) + this.cdcConsumer.registerTransactionalHandlerWithCallback( + 'planting_orders', + this.handleAdoptionEvent.bind(this), + this.handleAdoptionPostCommit.bind(this), + ); // 启动 CDC 消费者 try { @@ -60,7 +69,16 @@ export class CDCEventDispatcher implements OnModuleInit { await this.referralHandler.handle(event, tx); } - private async handleAdoptionEvent(event: CDCEvent, tx: TransactionClient): Promise { - await this.adoptionHandler.handle(event, tx); + private async handleAdoptionEvent(event: CDCEvent, tx: TransactionClient): Promise { + return await this.adoptionHandler.handle(event, tx); + } + + /** + * 认种事件的后置回调 - 在事务提交后执行算力计算 + */ + private async handleAdoptionPostCommit(result: AdoptionSyncResult | null): Promise { + if (result) { + await this.adoptionHandler.calculateContributionAfterCommit(result); + } } } diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index 056cd2fb..7d4bb1e3 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -47,6 +47,12 @@ export type CDCHandler = (event: CDCEvent) => Promise; /** 事务性 handler(支持在事务中执行) */ export type TransactionalCDCHandler = (event: CDCEvent, tx: TransactionClient) => Promise; +/** 事务性 handler 返回结果,用于事务提交后的回调 */ +export type TransactionalCDCHandlerWithResult = (event: CDCEvent, tx: TransactionClient) => Promise; + +/** 事务提交后的回调函数 */ +export type PostCommitCallback = (result: T) => Promise; + @Injectable() export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(CDCConsumerService.name); @@ -134,6 +140,76 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { }; } + /** + * 事务性幂等包装器(带后置回调)- 100% 保证 exactly-once 语义 + * + * 在同一个数据库事务中完成: + * 1. 尝试插入幂等记录(使用唯一约束防止重复) + * 2. 执行业务逻辑 + * 3. 事务提交后执行后置回调(如算力计算) + * + * 后置回调在事务提交后执行,失败不影响数据同步 + */ + withIdempotencyAndCallback( + handler: TransactionalCDCHandlerWithResult, + postCommitCallback?: PostCommitCallback, + ): CDCHandler { + return async (event: CDCEvent) => { + const idempotencyKey = `${event.topic}:${event.offset}`; + let result: T | null = null; + let shouldExecuteCallback = false; + + try { + await this.prisma.$transaction(async (tx) => { + // 1. 尝试插入幂等记录(使用唯一约束防止重复) + try { + await tx.processedCdcEvent.create({ + data: { + sourceTopic: event.topic, + offset: event.offset, + tableName: event.payload.table, + operation: event.payload.op, + }, + }); + } catch (error: any) { + // 唯一约束冲突 = 事件已处理,直接返回(不执行业务逻辑) + if (error.code === 'P2002') { + this.logger.debug(`[CDC] Skipping duplicate event: ${idempotencyKey}`); + return; + } + throw error; + } + + // 2. 执行业务逻辑(传入事务客户端) + result = await handler(event, tx); + shouldExecuteCallback = true; + + this.logger.debug(`[CDC] Processed event in transaction: ${idempotencyKey}`); + }, { + isolationLevel: Prisma.TransactionIsolationLevel.Serializable, + timeout: 60000, + }); + + // 3. 事务提交后执行后置回调 + if (shouldExecuteCallback && postCommitCallback && result !== null) { + try { + await postCommitCallback(result); + } catch (callbackError) { + // 后置回调失败不影响数据同步,只记录错误 + this.logger.error(`[CDC] Post-commit callback failed for: ${idempotencyKey}`, callbackError); + } + } + } catch (error: any) { + if (error.code === 'P2002') { + this.logger.debug(`[CDC] Skipping duplicate event (concurrent): ${idempotencyKey}`); + return; + } + this.logger.error(`[CDC] Failed to process event: ${idempotencyKey}`, error); + throw error; + } + }; + } + /** * 注册 CDC 事件处理器(普通模式,不保证幂等) * @deprecated 使用 registerTransactionalHandler 代替 @@ -153,6 +229,23 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { this.logger.log(`Registered transactional CDC handler for table: ${tableName}`); } + /** + * 注册事务性 CDC 事件处理器(带后置回调) + * 适用于需要在事务提交后执行额外操作的场景(如算力计算) + * + * @param tableName 表名 + * @param handler 事务性处理函数,返回后置回调需要的数据 + * @param postCommitCallback 事务提交后执行的回调函数 + */ + registerTransactionalHandlerWithCallback( + tableName: string, + handler: TransactionalCDCHandlerWithResult, + postCommitCallback: PostCommitCallback, + ): void { + this.handlers.set(tableName, this.withIdempotencyAndCallback(handler, postCommitCallback)); + this.logger.log(`Registered transactional CDC handler with post-commit callback for table: ${tableName}`); + } + /** * 启动消费者 */ diff --git a/frontend/admin-web/public/drawable/container1.xml b/frontend/admin-web/public/drawable/container1.xml new file mode 100644 index 00000000..8506b426 --- /dev/null +++ b/frontend/admin-web/public/drawable/container1.xml @@ -0,0 +1,10 @@ + + + + diff --git a/frontend/admin-web/public/drawable/container2.xml b/frontend/admin-web/public/drawable/container2.xml new file mode 100644 index 00000000..81588103 --- /dev/null +++ b/frontend/admin-web/public/drawable/container2.xml @@ -0,0 +1,10 @@ + + + + diff --git a/frontend/admin-web/public/drawable/container3.xml b/frontend/admin-web/public/drawable/container3.xml new file mode 100644 index 00000000..98773d7f --- /dev/null +++ b/frontend/admin-web/public/drawable/container3.xml @@ -0,0 +1,10 @@ + + + + diff --git a/frontend/admin-web/public/drawable/container4.xml b/frontend/admin-web/public/drawable/container4.xml new file mode 100644 index 00000000..89f51a10 --- /dev/null +++ b/frontend/admin-web/public/drawable/container4.xml @@ -0,0 +1,10 @@ + + + + diff --git a/frontend/admin-web/public/drawable/container5.xml b/frontend/admin-web/public/drawable/container5.xml new file mode 100644 index 00000000..feee424c --- /dev/null +++ b/frontend/admin-web/public/drawable/container5.xml @@ -0,0 +1,10 @@ + + + +