From 8199bc4d66bf44ac856bbe781ce24e9a910e751d Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 13 Jan 2026 21:34:58 -0800 Subject: [PATCH] feat(contribution): add CDC sync status API and fix deploy script timing - Add initialSyncCompleted flag to track CDC sequential sync completion - Add getSyncStatus() method to CDCConsumerService - Add /health/cdc-sync endpoint to expose sync status - Update deploy-mining.sh to wait for CDC sync completion before calling publish APIs Co-Authored-By: Claude Opus 4.5 --- .../src/api/controllers/health.controller.ts | 13 ++++++ .../kafka/cdc-consumer.service.ts | 29 +++++++----- backend/services/deploy-mining.sh | 44 +++++++++++++++++-- 3 files changed, 73 insertions(+), 13 deletions(-) diff --git a/backend/services/contribution-service/src/api/controllers/health.controller.ts b/backend/services/contribution-service/src/api/controllers/health.controller.ts index 186ec926..efb5a2d1 100644 --- a/backend/services/contribution-service/src/api/controllers/health.controller.ts +++ b/backend/services/contribution-service/src/api/controllers/health.controller.ts @@ -2,6 +2,7 @@ import { Controller, Get } from '@nestjs/common'; import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; import { RedisService } from '../../infrastructure/redis/redis.service'; +import { CDCConsumerService } from '../../infrastructure/kafka/cdc-consumer.service'; import { Public } from '../../shared/guards/jwt-auth.guard'; interface HealthStatus { @@ -20,6 +21,7 @@ export class HealthController { constructor( private readonly prisma: PrismaService, private readonly redis: RedisService, + private readonly cdcConsumer: CDCConsumerService, ) {} @Get() @@ -68,4 +70,15 @@ export class HealthController { async live(): Promise<{ alive: boolean }> { return { alive: true }; } + + @Get('cdc-sync') + @ApiOperation({ summary: 'CDC 同步状态检查' }) + @ApiResponse({ status: 200, description: 'CDC 同步状态' }) + async cdcSyncStatus(): Promise<{ + isRunning: boolean; + sequentialMode: boolean; + allPhasesCompleted: boolean; + }> { + return this.cdcConsumer.getSyncStatus(); + } } diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index d2c09171..a8156d7c 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -72,6 +72,9 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { private currentPhaseIndex = 0; private sequentialMode = false; + // 初始同步完成标记(只有顺序同步全部完成后才为 true) + private initialSyncCompleted = false; + constructor( private readonly configService: ConfigService, private readonly prisma: PrismaService, @@ -356,10 +359,10 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { return; } - // 使用唯一的 group id(每次启动都重新消费) - const uniqueGroupId = `contribution-service-cdc-phase-${phase.tableName}-${Date.now()}`; + // 使用固定的 group id(首次从头消费,重启后从上次位置继续) + const phaseGroupId = `contribution-service-cdc-phase-${phase.tableName}`; const phaseConsumer = this.kafka.consumer({ - groupId: uniqueGroupId, + groupId: phaseGroupId, }); try { @@ -418,13 +421,6 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { // 停止消费 await phaseConsumer.stop(); await phaseConsumer.disconnect(); - - // 删除临时 consumer group - try { - await admin.deleteGroups([uniqueGroupId]); - } catch { - // 忽略删除失败 - } await admin.disconnect(); } catch (error) { @@ -440,6 +436,7 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { */ private async startContinuousMode(): Promise { this.sequentialMode = false; + this.initialSyncCompleted = true; // 标记初始同步完成 const topics = this.topicPhases.map(p => p.topic); @@ -459,6 +456,18 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { this.logger.log('[CDC] Continuous mode started - all topics being consumed in parallel'); } + /** + * 获取 CDC 同步状态 + * - initialSyncCompleted = true: 初始顺序同步已完成 + */ + getSyncStatus(): { isRunning: boolean; sequentialMode: boolean; allPhasesCompleted: boolean } { + return { + isRunning: this.isRunning, + sequentialMode: this.sequentialMode, + allPhasesCompleted: this.initialSyncCompleted, + }; + } + /** * 停止消费者 */ diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index f4792098..01622356 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -1102,9 +1102,47 @@ full_reset() { service_start "$service" done - log_step "Step 10/18: Waiting for services to be ready and sync from 1.0..." - log_info "Waiting 30 seconds for all services to start and sync data from 1.0 CDC..." - sleep 30 + log_step "Step 10/18: Waiting for contribution-service CDC sync to complete..." + log_info "Waiting for contribution-service to complete CDC sync (user_accounts -> referral_relationships -> planting_orders)..." + + # 等待 contribution-service 的 CDC 顺序同步完成 + # 通过 /health/cdc-sync API 检查同步状态 + local max_wait=600 # 最多等待 10 分钟 + local wait_count=0 + local sync_completed=false + local cdc_sync_url="http://localhost:3020/health/cdc-sync" + + while [ "$wait_count" -lt "$max_wait" ] && [ "$sync_completed" = false ]; do + # 调用 API 检查同步状态 + local sync_status + sync_status=$(curl -s "$cdc_sync_url" 2>/dev/null || echo '{}') + + if echo "$sync_status" | grep -q '"allPhasesCompleted":true'; then + sync_completed=true + log_success "CDC sync completed - all phases finished" + else + # 显示当前状态 + local is_running + local sequential_mode + is_running=$(echo "$sync_status" | grep -o '"isRunning":[^,}]*' | cut -d':' -f2) + sequential_mode=$(echo "$sync_status" | grep -o '"sequentialMode":[^,}]*' | cut -d':' -f2) + + if [ "$is_running" = "true" ] && [ "$sequential_mode" = "true" ]; then + log_info "CDC sync in progress (sequential mode)... (waited ${wait_count}s)" + elif [ "$is_running" = "true" ]; then + log_info "CDC consumer running... (waited ${wait_count}s)" + else + log_info "Waiting for CDC consumer to start... (waited ${wait_count}s)" + fi + sleep 5 + wait_count=$((wait_count + 5)) + fi + done + + if [ "$sync_completed" = false ]; then + log_warn "CDC sync did not complete within ${max_wait}s, proceeding anyway..." + log_info "You may need to wait longer or check: curl $cdc_sync_url" + fi log_step "Step 11/18: Registering Debezium outbox connectors..." # Register outbox connectors AFTER services are running and have synced data