diff --git a/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts b/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts index eeed68c7..5bcc92b1 100644 --- a/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts +++ b/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts @@ -2,6 +2,8 @@ import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/commo import { ConfigService } from '@nestjs/config'; import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; +import { OutboxService } from '@/application/services/outbox.service'; +import { LegacyUserMigratedEvent } from '@/domain'; /** * ExtractNewRecordState 转换后的消息格式 @@ -37,6 +39,7 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy { constructor( private readonly configService: ConfigService, private readonly prisma: PrismaService, + private readonly outboxService: OutboxService, ) { const brokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092').split(','); @@ -118,13 +121,19 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy { case 'c': // Create case 'r': // Read (snapshot) case 'u': // Update - await this.upsertLegacyUser(event, sequenceNum); + await this.upsertLegacyUser(event, sequenceNum, op); break; } } - private async upsertLegacyUser(user: UnwrappedCdcUser, sequenceNum: bigint) { + private async upsertLegacyUser(user: UnwrappedCdcUser, sequenceNum: bigint, op: string) { try { + // 检查是否是新用户(不存在于数据库中) + const existingUser = await this.prisma.syncedLegacyUser.findUnique({ + where: { legacyId: BigInt(user.user_id) }, + }); + const isNewUser = !existingUser; + await this.prisma.syncedLegacyUser.upsert({ where: { legacyId: BigInt(user.user_id) }, update: { @@ -146,6 +155,18 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy { }, }); + // 只有新创建的用户才发布事件到 outbox(供 mining-admin-service 消费) + // 快照读取 (r) 不发布事件,因为 full-reset 时会通过 publish-all-legacy-users API 统一发布 + if (isNewUser && op === 'c') { + const event = new LegacyUserMigratedEvent( + user.account_sequence, + user.phone_number || '', + new Date(user.registered_at), + ); + await this.outboxService.publish(event); + this.logger.log(`Published LegacyUserMigratedEvent for new user: ${user.account_sequence}`); + } + this.logger.debug(`Synced legacy user: ${user.account_sequence}`); } catch (error) { this.logger.error(`Failed to upsert legacy user ${user.user_id}`, error); diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 75a0ce2a..20761cfc 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -747,16 +747,16 @@ full_reset() { fi echo "" - log_step "Step 1/8: Stopping 2.0 services..." + log_step "Step 1/10: Stopping 2.0 services..." for service in "${MINING_SERVICES[@]}"; do service_stop "$service" done - log_step "Step 2/8: Waiting for Kafka consumers to become inactive..." + log_step "Step 2/10: Waiting for Kafka consumers to become inactive..." log_info "Waiting 15 seconds for consumer group session timeout..." sleep 15 - log_step "Step 3/8: Resetting CDC consumer offsets..." + log_step "Step 3/10: Resetting CDC consumer offsets..." # Reset offsets BEFORE migrations (which may start containers) for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting consumer group: $group" @@ -793,17 +793,17 @@ full_reset() { fi done - log_step "Step 4/8: Dropping 2.0 databases..." + log_step "Step 4/10: Dropping 2.0 databases..." db_drop - log_step "Step 5/8: Creating 2.0 databases..." + log_step "Step 5/10: Creating 2.0 databases..." db_create - log_step "Step 6/8: Running migrations..." + log_step "Step 6/10: Running migrations..." db_migrate # Stop any containers that were started during migration - log_step "Step 7/8: Stopping containers and resetting CDC offsets again..." + log_step "Step 7/10: Stopping containers and resetting CDC offsets again..." log_info "Migration may have started CDC consumers, stopping them now..." for service in "${MINING_SERVICES[@]}"; do docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true @@ -847,16 +847,40 @@ full_reset() { fi done - log_step "Step 8/8: Starting 2.0 services..." + log_step "Step 8/10: Starting 2.0 services..." for service in "${MINING_SERVICES[@]}"; do service_start "$service" done + log_step "Step 9/10: Waiting for services to be ready..." + log_info "Waiting 20 seconds for all services to start and sync from 1.0 CDC..." + sleep 20 + + log_step "Step 10/10: Publishing legacy users to mining-admin-service..." + # 调用 auth-service API 发布所有旧用户事件到 outbox + # 这样 mining-admin-service 才能通过 Debezium 收到用户数据 + local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all" + local publish_result + publish_result=$(curl -s -X POST "$publish_url" 2>/dev/null || echo '{"error": "curl failed"}') + + if echo "$publish_result" | grep -q '"success":true'; then + local published_count + published_count=$(echo "$publish_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*') + log_success "Published $published_count legacy user events to outbox" + else + log_warn "Failed to publish legacy users: $publish_result" + log_info "You may need to manually call: curl -X POST $publish_url" + fi + + # 等待 mining-admin-service 消费 outbox 事件 + log_info "Waiting 10 seconds for mining-admin-service to sync..." + sleep 10 + echo "" echo -e "${GREEN}${BOLD}╔════════════════════════════════════════════════════════════╗${NC}" echo -e "${GREEN}${BOLD}║ Full reset completed successfully! ║${NC}" echo -e "${GREEN}${BOLD}║ ║${NC}" - echo -e "${GREEN}${BOLD}║ The 2.0 system will now sync all data from 1.0 via CDC. ║${NC}" + echo -e "${GREEN}${BOLD}║ The 2.0 system has synced all data from 1.0 via CDC. ║${NC}" echo -e "${GREEN}${BOLD}║ Monitor with: ./deploy-mining.sh logs contribution-service║${NC}" echo -e "${GREEN}${BOLD}╚════════════════════════════════════════════════════════════╝${NC}" }