From d53c2212a62290b44b000a060da79ddb98f49ae2 Mon Sep 17 00:00:00 2001 From: hailin Date: Sun, 1 Feb 2026 22:33:28 -0800 Subject: [PATCH] =?UTF-8?q?fix(deploy-mining):=20=E4=BF=AE=E5=A4=8D=20stan?= =?UTF-8?q?dalone=20=E6=A8=A1=E5=BC=8F=E8=B7=A8=E6=9C=8D=E5=8A=A1=E5=99=A8?= =?UTF-8?q?=20full-reset=20=E5=8D=A1=E4=BD=8F=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - run_kafka_cmd 添加 timeout 30 防止跨服务器 Kafka 命令无限挂起 - CDC consumer offset 重置前先检查 group 是否存在,不存在则跳过 - 等待时间从 15s/20s 增加到 30s(匹配 KafkaJS 默认 sessionTimeout) - max_retries 从 3 增加到 5,并输出实际 Kafka 错误信息 - 重试失败后 fallback 删除 consumer group(服务重启时自动重建) - 同步修复 sync_reset() 和 full_reset() Step 3/Step 8 三处逻辑 Co-Authored-By: Claude Opus 4.5 --- backend/services/deploy-mining.sh | 118 +++++++++++++++++++++++------- 1 file changed, 92 insertions(+), 26 deletions(-) diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 15103a3d..46298963 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -336,7 +336,8 @@ run_kafka_cmd() { if [ "$DEPLOY_MODE" = "standalone" ]; then # Standalone: Kafka is remote, use temporary container with host networking - docker run --rm --network host confluentinc/cp-kafka:7.5.0 \ + # Add timeout to prevent hangs when Kafka is unreachable across servers + timeout 30 docker run --rm --network host confluentinc/cp-kafka:7.5.0 \ $cmd --bootstrap-server "$KAFKA_BROKERS" "$@" return $? fi @@ -363,7 +364,7 @@ run_kafka_cmd_stdin() { shift if [ "$DEPLOY_MODE" = "standalone" ]; then - docker run --rm -i --network host confluentinc/cp-kafka:7.5.0 \ + timeout 30 docker run --rm -i --network host confluentinc/cp-kafka:7.5.0 \ $cmd --bootstrap-server "$KAFKA_BROKERS" "$@" return $? fi @@ -823,32 +824,47 @@ sync_reset() { # Wait for consumer groups to become inactive log_step "Waiting for Kafka consumers to become inactive..." - log_info "Waiting 20 seconds for consumer group session timeout..." - sleep 20 + log_info "Waiting 30 seconds for consumer group session timeout (cross-server)..." + sleep 30 # Reset offsets for all consumer groups with retry logic log_step "Resetting consumer group offsets" for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting: $group" + + # Check if the group exists on the Kafka broker + local group_info + group_info=$(run_kafka_cmd kafka-consumer-groups --describe --group "$group" 2>&1 || true) + + if echo "$group_info" | grep -qi "does not exist\|missing\|Error:.*GroupIdNotFoundException"; then + log_info "Consumer group $group does not exist, skipping" + continue + fi + local reset_success=false local retry_count=0 - local max_retries=3 + local max_retries=5 while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do - if run_kafka_cmd kafka-consumer-groups \ + local reset_output + reset_output=$(run_kafka_cmd kafka-consumer-groups \ --group "$group" \ --reset-offsets \ --to-earliest \ --all-topics \ - --execute 2>&1 | grep -q "NEW-OFFSET"; then - reset_success=true - fi + --execute 2>&1 || true) - if [ "$reset_success" = false ]; then + if echo "$reset_output" | grep -q "NEW-OFFSET"; then + reset_success=true + elif echo "$reset_output" | grep -qi "does not exist"; then + log_info "Consumer group $group does not exist, skipping" + reset_success=true + else retry_count=$((retry_count + 1)) if [ $retry_count -lt $max_retries ]; then - log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..." + log_warn "Reset failed for $group (retry $retry_count/$max_retries), waiting 10s..." + log_info "Kafka output: $(echo "$reset_output" | head -3)" sleep 10 fi fi @@ -857,7 +873,12 @@ sync_reset() { if [ "$reset_success" = true ]; then log_success "Offsets reset for $group" else - log_warn "Could not reset offsets for $group after $max_retries attempts" + log_warn "Could not reset offsets for $group after $max_retries attempts, trying delete..." + if run_kafka_cmd kafka-consumer-groups --delete --group "$group" 2>&1 | grep -qi "deletion.*successful\|Deletion.*$group"; then + log_success "Deleted consumer group $group (will be recreated on service start)" + else + log_warn "Could not delete consumer group $group either, proceeding anyway" + fi fi done @@ -1255,37 +1276,61 @@ full_reset() { done log_step "Step 2/19: Waiting for Kafka consumers to become inactive..." - log_info "Waiting 15 seconds for consumer group session timeout..." - sleep 15 + log_info "Waiting 30 seconds for consumer group session timeout (cross-server)..." + sleep 30 log_step "Step 3/19: Resetting CDC consumer offsets..." # Reset offsets BEFORE migrations (which may start containers) for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting consumer group: $group" + + # First check if the group exists on the remote Kafka broker + local group_info + group_info=$(run_kafka_cmd kafka-consumer-groups --describe --group "$group" 2>&1 || true) + + if echo "$group_info" | grep -qi "does not exist\|missing\|Error:.*GroupIdNotFoundException"; then + log_info "Consumer group $group does not exist (fresh setup), skipping reset" + continue + fi + local reset_success=false local retry_count=0 - local max_retries=3 + local max_retries=5 while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do - if run_kafka_cmd kafka-consumer-groups \ + local reset_output + reset_output=$(run_kafka_cmd kafka-consumer-groups \ --group "$group" \ --reset-offsets \ --to-earliest \ --all-topics \ - --execute 2>&1 | grep -q "NEW-OFFSET"; then + --execute 2>&1 || true) + + if echo "$reset_output" | grep -q "NEW-OFFSET"; then log_success "CDC offsets reset for $group" reset_success=true + elif echo "$reset_output" | grep -qi "does not exist"; then + log_info "Consumer group $group does not exist, skipping" + reset_success=true else retry_count=$((retry_count + 1)) if [ $retry_count -lt $max_retries ]; then - log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..." + log_warn "Reset failed for $group (retry $retry_count/$max_retries), waiting 10s..." + log_info "Kafka output: $(echo "$reset_output" | head -3)" sleep 10 fi fi done if [ "$reset_success" = false ]; then - log_warn "Could not reset offsets for $group after $max_retries attempts" + log_warn "Could not reset offsets for $group after $max_retries attempts, trying delete..." + # Fallback: delete the consumer group entirely + # When the service restarts with fromBeginning: true, it creates a new group + if run_kafka_cmd kafka-consumer-groups --delete --group "$group" 2>&1 | grep -qi "deletion.*successful\|Deletion.*$group"; then + log_success "Deleted consumer group $group (will be recreated on service start)" + else + log_warn "Could not delete consumer group $group either, proceeding anyway" + fi fi done @@ -1446,36 +1491,57 @@ full_reset() { docker compose $COMPOSE_ARGS stop "$service" 2>/dev/null || true done - log_info "Waiting 20 seconds for consumer groups to become inactive..." - sleep 20 + log_info "Waiting 30 seconds for consumer groups to become inactive (cross-server)..." + sleep 30 # Reset CDC offsets again after migration for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting consumer group: $group" + + local group_info + group_info=$(run_kafka_cmd kafka-consumer-groups --describe --group "$group" 2>&1 || true) + + if echo "$group_info" | grep -qi "does not exist\|missing\|Error:.*GroupIdNotFoundException"; then + log_info "Consumer group $group does not exist, skipping reset" + continue + fi + local reset_success=false local retry_count=0 - local max_retries=3 + local max_retries=5 while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do - if run_kafka_cmd kafka-consumer-groups \ + local reset_output + reset_output=$(run_kafka_cmd kafka-consumer-groups \ --group "$group" \ --reset-offsets \ --to-earliest \ --all-topics \ - --execute 2>&1 | grep -q "NEW-OFFSET"; then + --execute 2>&1 || true) + + if echo "$reset_output" | grep -q "NEW-OFFSET"; then log_success "CDC offsets reset for $group" reset_success=true + elif echo "$reset_output" | grep -qi "does not exist"; then + log_info "Consumer group $group does not exist, skipping" + reset_success=true else retry_count=$((retry_count + 1)) if [ $retry_count -lt $max_retries ]; then - log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..." + log_warn "Reset failed for $group (retry $retry_count/$max_retries), waiting 10s..." + log_info "Kafka output: $(echo "$reset_output" | head -3)" sleep 10 fi fi done if [ "$reset_success" = false ]; then - log_warn "Could not reset offsets for $group after $max_retries attempts" + log_warn "Could not reset offsets for $group after $max_retries attempts, trying delete..." + if run_kafka_cmd kafka-consumer-groups --delete --group "$group" 2>&1 | grep -qi "deletion.*successful\|Deletion.*$group"; then + log_success "Deleted consumer group $group (will be recreated on service start)" + else + log_warn "Could not delete consumer group $group either, proceeding anyway" + fi fi done