feat(auth): 新 1.0 用户自动发布事件到 mining-admin-service

- auth-service CDC consumer 在同步新用户时自动发布 LegacyUserMigratedEvent
- 只有 op='c' (create) 的新用户才发布事件,snapshot 由 publish-all API 处理
- deploy-mining.sh full-reset 更新步骤编号为 10 步

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 01:25:01 -08:00
parent 50854f04d5
commit 489966fae9
2 changed files with 56 additions and 11 deletions

View File

@ -2,6 +2,8 @@ import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/commo
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { OutboxService } from '@/application/services/outbox.service';
import { LegacyUserMigratedEvent } from '@/domain';
/**
* ExtractNewRecordState
@ -37,6 +39,7 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
constructor(
private readonly configService: ConfigService,
private readonly prisma: PrismaService,
private readonly outboxService: OutboxService,
) {
const brokers = this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(',');
@ -118,13 +121,19 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
case 'c': // Create
case 'r': // Read (snapshot)
case 'u': // Update
await this.upsertLegacyUser(event, sequenceNum);
await this.upsertLegacyUser(event, sequenceNum, op);
break;
}
}
private async upsertLegacyUser(user: UnwrappedCdcUser, sequenceNum: bigint) {
private async upsertLegacyUser(user: UnwrappedCdcUser, sequenceNum: bigint, op: string) {
try {
// 检查是否是新用户(不存在于数据库中)
const existingUser = await this.prisma.syncedLegacyUser.findUnique({
where: { legacyId: BigInt(user.user_id) },
});
const isNewUser = !existingUser;
await this.prisma.syncedLegacyUser.upsert({
where: { legacyId: BigInt(user.user_id) },
update: {
@ -146,6 +155,18 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
},
});
// 只有新创建的用户才发布事件到 outbox供 mining-admin-service 消费)
// 快照读取 (r) 不发布事件,因为 full-reset 时会通过 publish-all-legacy-users API 统一发布
if (isNewUser && op === 'c') {
const event = new LegacyUserMigratedEvent(
user.account_sequence,
user.phone_number || '',
new Date(user.registered_at),
);
await this.outboxService.publish(event);
this.logger.log(`Published LegacyUserMigratedEvent for new user: ${user.account_sequence}`);
}
this.logger.debug(`Synced legacy user: ${user.account_sequence}`);
} catch (error) {
this.logger.error(`Failed to upsert legacy user ${user.user_id}`, error);

View File

@ -747,16 +747,16 @@ full_reset() {
fi
echo ""
log_step "Step 1/8: Stopping 2.0 services..."
log_step "Step 1/10: Stopping 2.0 services..."
for service in "${MINING_SERVICES[@]}"; do
service_stop "$service"
done
log_step "Step 2/8: Waiting for Kafka consumers to become inactive..."
log_step "Step 2/10: Waiting for Kafka consumers to become inactive..."
log_info "Waiting 15 seconds for consumer group session timeout..."
sleep 15
log_step "Step 3/8: Resetting CDC consumer offsets..."
log_step "Step 3/10: Resetting CDC consumer offsets..."
# Reset offsets BEFORE migrations (which may start containers)
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
log_info "Resetting consumer group: $group"
@ -793,17 +793,17 @@ full_reset() {
fi
done
log_step "Step 4/8: Dropping 2.0 databases..."
log_step "Step 4/10: Dropping 2.0 databases..."
db_drop
log_step "Step 5/8: Creating 2.0 databases..."
log_step "Step 5/10: Creating 2.0 databases..."
db_create
log_step "Step 6/8: Running migrations..."
log_step "Step 6/10: Running migrations..."
db_migrate
# Stop any containers that were started during migration
log_step "Step 7/8: Stopping containers and resetting CDC offsets again..."
log_step "Step 7/10: Stopping containers and resetting CDC offsets again..."
log_info "Migration may have started CDC consumers, stopping them now..."
for service in "${MINING_SERVICES[@]}"; do
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true
@ -847,16 +847,40 @@ full_reset() {
fi
done
log_step "Step 8/8: Starting 2.0 services..."
log_step "Step 8/10: Starting 2.0 services..."
for service in "${MINING_SERVICES[@]}"; do
service_start "$service"
done
log_step "Step 9/10: Waiting for services to be ready..."
log_info "Waiting 20 seconds for all services to start and sync from 1.0 CDC..."
sleep 20
log_step "Step 10/10: Publishing legacy users to mining-admin-service..."
# 调用 auth-service API 发布所有旧用户事件到 outbox
# 这样 mining-admin-service 才能通过 Debezium 收到用户数据
local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all"
local publish_result
publish_result=$(curl -s -X POST "$publish_url" 2>/dev/null || echo '{"error": "curl failed"}')
if echo "$publish_result" | grep -q '"success":true'; then
local published_count
published_count=$(echo "$publish_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*')
log_success "Published $published_count legacy user events to outbox"
else
log_warn "Failed to publish legacy users: $publish_result"
log_info "You may need to manually call: curl -X POST $publish_url"
fi
# 等待 mining-admin-service 消费 outbox 事件
log_info "Waiting 10 seconds for mining-admin-service to sync..."
sleep 10
echo ""
echo -e "${GREEN}${BOLD}╔════════════════════════════════════════════════════════════╗${NC}"
echo -e "${GREEN}${BOLD}║ Full reset completed successfully! ║${NC}"
echo -e "${GREEN}${BOLD}║ ║${NC}"
echo -e "${GREEN}${BOLD}║ The 2.0 system will now sync all data from 1.0 via CDC. ║${NC}"
echo -e "${GREEN}${BOLD}║ The 2.0 system has synced all data from 1.0 via CDC. ${NC}"
echo -e "${GREEN}${BOLD}║ Monitor with: ./deploy-mining.sh logs contribution-service║${NC}"
echo -e "${GREEN}${BOLD}╚════════════════════════════════════════════════════════════╝${NC}"
}