feat(contribution): add CDC sync status API and fix deploy script timing

- Add initialSyncCompleted flag to track CDC sequential sync completion
- Add getSyncStatus() method to CDCConsumerService
- Add /health/cdc-sync endpoint to expose sync status
- Update deploy-mining.sh to wait for CDC sync completion before calling publish APIs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-13 21:34:58 -08:00
parent aef6feb2cd
commit 8199bc4d66
3 changed files with 73 additions and 13 deletions

View File

@ -2,6 +2,7 @@ import { Controller, Get } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger'; import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { RedisService } from '../../infrastructure/redis/redis.service'; import { RedisService } from '../../infrastructure/redis/redis.service';
import { CDCConsumerService } from '../../infrastructure/kafka/cdc-consumer.service';
import { Public } from '../../shared/guards/jwt-auth.guard'; import { Public } from '../../shared/guards/jwt-auth.guard';
interface HealthStatus { interface HealthStatus {
@ -20,6 +21,7 @@ export class HealthController {
constructor( constructor(
private readonly prisma: PrismaService, private readonly prisma: PrismaService,
private readonly redis: RedisService, private readonly redis: RedisService,
private readonly cdcConsumer: CDCConsumerService,
) {} ) {}
@Get() @Get()
@ -68,4 +70,15 @@ export class HealthController {
async live(): Promise<{ alive: boolean }> { async live(): Promise<{ alive: boolean }> {
return { alive: true }; return { alive: true };
} }
@Get('cdc-sync')
@ApiOperation({ summary: 'CDC 同步状态检查' })
@ApiResponse({ status: 200, description: 'CDC 同步状态' })
async cdcSyncStatus(): Promise<{
isRunning: boolean;
sequentialMode: boolean;
allPhasesCompleted: boolean;
}> {
return this.cdcConsumer.getSyncStatus();
}
} }

View File

@ -72,6 +72,9 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
private currentPhaseIndex = 0; private currentPhaseIndex = 0;
private sequentialMode = false; private sequentialMode = false;
// 初始同步完成标记(只有顺序同步全部完成后才为 true
private initialSyncCompleted = false;
constructor( constructor(
private readonly configService: ConfigService, private readonly configService: ConfigService,
private readonly prisma: PrismaService, private readonly prisma: PrismaService,
@ -356,10 +359,10 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
return; return;
} }
// 使用唯一的 group id每次启动都重新消费 // 使用固定的 group id首次从头消费重启后从上次位置继续
const uniqueGroupId = `contribution-service-cdc-phase-${phase.tableName}-${Date.now()}`; const phaseGroupId = `contribution-service-cdc-phase-${phase.tableName}`;
const phaseConsumer = this.kafka.consumer({ const phaseConsumer = this.kafka.consumer({
groupId: uniqueGroupId, groupId: phaseGroupId,
}); });
try { try {
@ -418,13 +421,6 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
// 停止消费 // 停止消费
await phaseConsumer.stop(); await phaseConsumer.stop();
await phaseConsumer.disconnect(); await phaseConsumer.disconnect();
// 删除临时 consumer group
try {
await admin.deleteGroups([uniqueGroupId]);
} catch {
// 忽略删除失败
}
await admin.disconnect(); await admin.disconnect();
} catch (error) { } catch (error) {
@ -440,6 +436,7 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
*/ */
private async startContinuousMode(): Promise<void> { private async startContinuousMode(): Promise<void> {
this.sequentialMode = false; this.sequentialMode = false;
this.initialSyncCompleted = true; // 标记初始同步完成
const topics = this.topicPhases.map(p => p.topic); const topics = this.topicPhases.map(p => p.topic);
@ -459,6 +456,18 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
this.logger.log('[CDC] Continuous mode started - all topics being consumed in parallel'); this.logger.log('[CDC] Continuous mode started - all topics being consumed in parallel');
} }
/**
* CDC
* - initialSyncCompleted = true:
*/
getSyncStatus(): { isRunning: boolean; sequentialMode: boolean; allPhasesCompleted: boolean } {
return {
isRunning: this.isRunning,
sequentialMode: this.sequentialMode,
allPhasesCompleted: this.initialSyncCompleted,
};
}
/** /**
* *
*/ */

View File

@ -1102,9 +1102,47 @@ full_reset() {
service_start "$service" service_start "$service"
done done
log_step "Step 10/18: Waiting for services to be ready and sync from 1.0..." log_step "Step 10/18: Waiting for contribution-service CDC sync to complete..."
log_info "Waiting 30 seconds for all services to start and sync data from 1.0 CDC..." log_info "Waiting for contribution-service to complete CDC sync (user_accounts -> referral_relationships -> planting_orders)..."
sleep 30
# 等待 contribution-service 的 CDC 顺序同步完成
# 通过 /health/cdc-sync API 检查同步状态
local max_wait=600 # 最多等待 10 分钟
local wait_count=0
local sync_completed=false
local cdc_sync_url="http://localhost:3020/health/cdc-sync"
while [ "$wait_count" -lt "$max_wait" ] && [ "$sync_completed" = false ]; do
# 调用 API 检查同步状态
local sync_status
sync_status=$(curl -s "$cdc_sync_url" 2>/dev/null || echo '{}')
if echo "$sync_status" | grep -q '"allPhasesCompleted":true'; then
sync_completed=true
log_success "CDC sync completed - all phases finished"
else
# 显示当前状态
local is_running
local sequential_mode
is_running=$(echo "$sync_status" | grep -o '"isRunning":[^,}]*' | cut -d':' -f2)
sequential_mode=$(echo "$sync_status" | grep -o '"sequentialMode":[^,}]*' | cut -d':' -f2)
if [ "$is_running" = "true" ] && [ "$sequential_mode" = "true" ]; then
log_info "CDC sync in progress (sequential mode)... (waited ${wait_count}s)"
elif [ "$is_running" = "true" ]; then
log_info "CDC consumer running... (waited ${wait_count}s)"
else
log_info "Waiting for CDC consumer to start... (waited ${wait_count}s)"
fi
sleep 5
wait_count=$((wait_count + 5))
fi
done
if [ "$sync_completed" = false ]; then
log_warn "CDC sync did not complete within ${max_wait}s, proceeding anyway..."
log_info "You may need to wait longer or check: curl $cdc_sync_url"
fi
log_step "Step 11/18: Registering Debezium outbox connectors..." log_step "Step 11/18: Registering Debezium outbox connectors..."
# Register outbox connectors AFTER services are running and have synced data # Register outbox connectors AFTER services are running and have synced data