From 273f2f1d96a2a63106a7bbbf10524e53f2da0343 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 12 Jan 2026 00:53:55 -0800 Subject: [PATCH] =?UTF-8?q?fix(deploy):=20=E5=9C=A8=20migration=20?= =?UTF-8?q?=E5=90=8E=E5=86=8D=E6=AC=A1=E9=87=8D=E7=BD=AE=20CDC=20offset?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题:migration 会启动容器执行迁移,导致 CDC consumer 自动启动并消费消息。在数据库重建后启动服务时,消息 已经被消费完毕。 解决方案:在 migration 后增加 Step 7,停止容器并 再次重置 CDC offset,确保最终启动时能重新消费。 Co-Authored-By: Claude Opus 4.5 --- backend/services/deploy-mining.sh | 59 +++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 8127d71c..07d154a4 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -746,16 +746,16 @@ full_reset() { fi echo "" - log_step "Step 1/7: Stopping 2.0 services..." + log_step "Step 1/8: Stopping 2.0 services..." for service in "${MINING_SERVICES[@]}"; do service_stop "$service" done - log_step "Step 2/7: Waiting for Kafka consumers to become inactive..." + log_step "Step 2/8: Waiting for Kafka consumers to become inactive..." log_info "Waiting 15 seconds for consumer group session timeout..." sleep 15 - log_step "Step 3/7: Resetting CDC consumer offsets..." + log_step "Step 3/8: Resetting CDC consumer offsets..." # Reset offsets BEFORE migrations (which may start containers) for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting consumer group: $group" @@ -792,16 +792,61 @@ full_reset() { fi done - log_step "Step 4/7: Dropping 2.0 databases..." + log_step "Step 4/8: Dropping 2.0 databases..." db_drop - log_step "Step 5/7: Creating 2.0 databases..." + log_step "Step 5/8: Creating 2.0 databases..." db_create - log_step "Step 6/7: Running migrations..." + log_step "Step 6/8: Running migrations..." db_migrate - log_step "Step 7/7: Starting 2.0 services..." + # Stop any containers that were started during migration + log_step "Step 7/8: Stopping containers and resetting CDC offsets again..." + log_info "Migration may have started CDC consumers, stopping them now..." + for service in "${MINING_SERVICES[@]}"; do + docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true + done + + log_info "Waiting 20 seconds for consumer groups to become inactive..." + sleep 20 + + # Reset CDC offsets again after migration + for group in "${CDC_CONSUMER_GROUPS[@]}"; do + log_info "Resetting consumer group: $group" + local reset_success=false + local retry_count=0 + local max_retries=3 + + while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do + if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then + if docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \ + --group "$group" \ + --reset-offsets \ + --to-earliest \ + --all-topics \ + --execute 2>&1 | grep -q "NEW-OFFSET"; then + log_success "CDC offsets reset for $group" + reset_success=true + else + retry_count=$((retry_count + 1)) + if [ $retry_count -lt $max_retries ]; then + log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..." + sleep 10 + fi + fi + else + log_warn "Kafka container '$KAFKA_CONTAINER' not found" + break + fi + done + + if [ "$reset_success" = false ]; then + log_warn "Could not reset offsets for $group after $max_retries attempts" + fi + done + + log_step "Step 8/8: Starting 2.0 services..." for service in "${MINING_SERVICES[@]}"; do service_start "$service" done