fix(cdc): 修复用户同步字段映射和多 consumer group 重置

contribution-service:
- 修复 UserSyncedHandler 使用错误字段名 (data.id -> data.user_id)
- 兼容 CDC snake_case 字段命名 (user_id, account_sequence, phone_number)
- 添加数据验证,跳过无效记录

deploy-mining.sh:
- 添加 auth-service-cdc-group 到 CDC_CONSUMER_GROUPS
- full-reset 现在重置所有 CDC consumer groups
- sync_reset 和 sync_status 支持多个 consumer groups

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-11 23:59:39 -08:00
parent 5cab38c7f1
commit ff27195be2
2 changed files with 119 additions and 81 deletions

View File

@ -46,18 +46,28 @@ export class UserSyncedHandler {
private async handleCreate(data: any, sequenceNum: bigint): Promise<void> {
if (!data) return;
// 兼容不同的字段命名CDC 使用 snake_case
const userId = data.user_id ?? data.id;
const accountSequence = data.account_sequence ?? data.accountSequence;
const phone = data.phone_number ?? data.phone ?? null;
const status = data.status ?? 'ACTIVE';
if (!userId || !accountSequence) {
this.logger.warn(`Invalid user data: missing user_id or account_sequence`, { data });
return;
}
await this.unitOfWork.executeInTransaction(async () => {
// 保存同步的用户数据
await this.syncedDataRepository.upsertSyncedUser({
originalUserId: BigInt(data.id),
accountSequence: data.account_sequence || data.accountSequence,
phone: data.phone || null,
status: data.status || 'ACTIVE',
originalUserId: BigInt(userId),
accountSequence,
phone,
status,
sourceSequenceNum: sequenceNum,
});
// 为用户创建算力账户(如果不存在)
const accountSequence = data.account_sequence || data.accountSequence;
const existingAccount = await this.contributionAccountRepository.findByAccountSequence(accountSequence);
if (!existingAccount) {
@ -67,26 +77,38 @@ export class UserSyncedHandler {
}
});
this.logger.debug(`User synced: ${data.account_sequence || data.accountSequence}`);
this.logger.debug(`User synced: ${accountSequence}`);
}
private async handleUpdate(data: any, sequenceNum: bigint): Promise<void> {
if (!data) return;
// 兼容不同的字段命名CDC 使用 snake_case
const userId = data.user_id ?? data.id;
const accountSequence = data.account_sequence ?? data.accountSequence;
const phone = data.phone_number ?? data.phone ?? null;
const status = data.status ?? 'ACTIVE';
if (!userId || !accountSequence) {
this.logger.warn(`Invalid user update data: missing user_id or account_sequence`, { data });
return;
}
await this.syncedDataRepository.upsertSyncedUser({
originalUserId: BigInt(data.id),
accountSequence: data.account_sequence || data.accountSequence,
phone: data.phone || null,
status: data.status || 'ACTIVE',
originalUserId: BigInt(userId),
accountSequence,
phone,
status,
sourceSequenceNum: sequenceNum,
});
this.logger.debug(`User updated: ${data.account_sequence || data.accountSequence}`);
this.logger.debug(`User updated: ${accountSequence}`);
}
private async handleDelete(data: any): Promise<void> {
if (!data) return;
const accountSequence = data.account_sequence ?? data.accountSequence;
// 用户删除一般不处理,保留历史数据
this.logger.debug(`User delete event received: ${data.account_sequence || data.accountSequence}`);
this.logger.debug(`User delete event received: ${accountSequence}`);
}
}

View File

@ -102,8 +102,11 @@ declare -A SERVICE_PORTS=(
["mining-wallet-service"]="3025"
)
# CDC Consumer Group
CDC_CONSUMER_GROUP="contribution-service-cdc-group"
# CDC Consumer Groups (all groups that need to be reset during full-reset)
CDC_CONSUMER_GROUPS=(
"contribution-service-cdc-group"
"auth-service-cdc-group"
)
# Colors
RED='\033[0;31m'
@ -614,8 +617,11 @@ services_logs() {
sync_reset() {
print_section "Resetting CDC Consumer Offsets"
echo -e "${YELLOW}This will reset the CDC consumer to read from the beginning.${NC}"
echo "Consumer Group: $CDC_CONSUMER_GROUP"
echo -e "${YELLOW}This will reset CDC consumers to read from the beginning.${NC}"
echo "Consumer Groups:"
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
echo " - $group"
done
echo ""
read -p "Continue? (y/n): " confirm
@ -624,65 +630,71 @@ sync_reset() {
return 1
fi
# Stop contribution-service first
log_step "Stopping contribution-service"
# Stop services that use CDC
log_step "Stopping CDC consumer services"
service_stop "contribution-service"
service_stop "auth-service"
# Reset offsets
# Reset offsets for all consumer groups
log_step "Resetting consumer group offsets"
local reset_success=false
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
log_info "Resetting: $group"
local reset_success=false
# Try local kafka-consumer-groups.sh first
if command -v kafka-consumer-groups.sh &>/dev/null; then
kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \
--group "$CDC_CONSUMER_GROUP" \
--reset-offsets \
--to-earliest \
--all-topics \
--execute 2>/dev/null && reset_success=true
fi
# Try local kafka-consumer-groups.sh first
if command -v kafka-consumer-groups.sh &>/dev/null; then
kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \
--group "$group" \
--reset-offsets \
--to-earliest \
--all-topics \
--execute 2>/dev/null && reset_success=true
fi
# Try docker exec if local failed
if [ "$reset_success" = false ] && docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then
log_info "Using Docker container: $KAFKA_CONTAINER"
docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \
--group "$CDC_CONSUMER_GROUP" \
--reset-offsets \
--to-earliest \
--all-topics \
--execute 2>&1 && reset_success=true
fi
# Try docker exec if local failed
if [ "$reset_success" = false ] && docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then
docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \
--group "$group" \
--reset-offsets \
--to-earliest \
--all-topics \
--execute 2>&1 && reset_success=true
fi
if [ "$reset_success" = true ]; then
log_success "CDC consumer offsets reset to beginning"
else
log_warn "Could not reset offsets automatically"
fi
if [ "$reset_success" = true ]; then
log_success "Offsets reset for $group"
else
log_warn "Could not reset offsets for $group"
fi
done
log_info "Start contribution-service to begin syncing from the beginning"
log_info "Start services to begin syncing from the beginning"
}
sync_status() {
print_section "CDC Sync Status"
echo -e "${BOLD}Consumer Group:${NC} $CDC_CONSUMER_GROUP"
echo ""
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
echo -e "${BOLD}Consumer Group:${NC} $group"
echo ""
# Try local first, then docker
if command -v kafka-consumer-groups.sh &>/dev/null; then
kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \
--group "$CDC_CONSUMER_GROUP" \
--describe 2>/dev/null && return 0
fi
# Try local first, then docker
if command -v kafka-consumer-groups.sh &>/dev/null; then
kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \
--group "$group" \
--describe 2>/dev/null && echo "" && continue
fi
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then
docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \
--group "$CDC_CONSUMER_GROUP" \
--describe 2>&1 || log_warn "Could not get consumer group status"
else
log_warn "Kafka container '$KAFKA_CONTAINER' not found"
fi
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then
docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \
--group "$group" \
--describe 2>&1 || log_warn "Could not get status for $group"
else
log_warn "Kafka container '$KAFKA_CONTAINER' not found"
fi
echo ""
done
}
# ===========================================================================
@ -727,28 +739,32 @@ full_reset() {
db_migrate
log_step "Step 5/6: Resetting CDC consumer offsets..."
# Try local kafka-consumer-groups.sh first, then docker exec
if command -v kafka-consumer-groups.sh &>/dev/null; then
kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \
--group "$CDC_CONSUMER_GROUP" \
--reset-offsets \
--to-earliest \
--all-topics \
--execute 2>/dev/null && log_success "CDC offsets reset" || log_warn "Local kafka-consumer-groups.sh failed"
fi
# Reset all CDC consumer groups
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
log_info "Resetting consumer group: $group"
# Try docker exec with the correct container name
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then
log_info "Resetting offsets via Docker container: $KAFKA_CONTAINER"
docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \
--group "$CDC_CONSUMER_GROUP" \
--reset-offsets \
--to-earliest \
--all-topics \
--execute 2>&1 || log_warn "Could not reset offsets via Docker"
else
log_warn "Kafka container '$KAFKA_CONTAINER' not found. Manual offset reset may be needed."
fi
# Try local kafka-consumer-groups.sh first
if command -v kafka-consumer-groups.sh &>/dev/null; then
kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \
--group "$group" \
--reset-offsets \
--to-earliest \
--all-topics \
--execute 2>/dev/null && log_success "CDC offsets reset for $group" && continue
fi
# Try docker exec with the correct container name
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then
docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \
--group "$group" \
--reset-offsets \
--to-earliest \
--all-topics \
--execute 2>&1 && log_success "CDC offsets reset for $group" || log_warn "Could not reset offsets for $group"
else
log_warn "Kafka container '$KAFKA_CONTAINER' not found. Manual offset reset may be needed."
fi
done
log_step "Step 6/6: Starting 2.0 services..."
for service in "${MINING_SERVICES[@]}"; do