fix(deploy-mining): 修复 standalone 模式跨服务器 full-reset 卡住问题
- run_kafka_cmd 添加 timeout 30 防止跨服务器 Kafka 命令无限挂起 - CDC consumer offset 重置前先检查 group 是否存在,不存在则跳过 - 等待时间从 15s/20s 增加到 30s(匹配 KafkaJS 默认 sessionTimeout) - max_retries 从 3 增加到 5,并输出实际 Kafka 错误信息 - 重试失败后 fallback 删除 consumer group(服务重启时自动重建) - 同步修复 sync_reset() 和 full_reset() Step 3/Step 8 三处逻辑 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
394f2529cd
commit
d53c2212a6
|
|
@ -336,7 +336,8 @@ run_kafka_cmd() {
|
|||
|
||||
if [ "$DEPLOY_MODE" = "standalone" ]; then
|
||||
# Standalone: Kafka is remote, use temporary container with host networking
|
||||
docker run --rm --network host confluentinc/cp-kafka:7.5.0 \
|
||||
# Add timeout to prevent hangs when Kafka is unreachable across servers
|
||||
timeout 30 docker run --rm --network host confluentinc/cp-kafka:7.5.0 \
|
||||
$cmd --bootstrap-server "$KAFKA_BROKERS" "$@"
|
||||
return $?
|
||||
fi
|
||||
|
|
@ -363,7 +364,7 @@ run_kafka_cmd_stdin() {
|
|||
shift
|
||||
|
||||
if [ "$DEPLOY_MODE" = "standalone" ]; then
|
||||
docker run --rm -i --network host confluentinc/cp-kafka:7.5.0 \
|
||||
timeout 30 docker run --rm -i --network host confluentinc/cp-kafka:7.5.0 \
|
||||
$cmd --bootstrap-server "$KAFKA_BROKERS" "$@"
|
||||
return $?
|
||||
fi
|
||||
|
|
@ -823,32 +824,47 @@ sync_reset() {
|
|||
|
||||
# Wait for consumer groups to become inactive
|
||||
log_step "Waiting for Kafka consumers to become inactive..."
|
||||
log_info "Waiting 20 seconds for consumer group session timeout..."
|
||||
sleep 20
|
||||
log_info "Waiting 30 seconds for consumer group session timeout (cross-server)..."
|
||||
sleep 30
|
||||
|
||||
# Reset offsets for all consumer groups with retry logic
|
||||
log_step "Resetting consumer group offsets"
|
||||
|
||||
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
|
||||
log_info "Resetting: $group"
|
||||
|
||||
# Check if the group exists on the Kafka broker
|
||||
local group_info
|
||||
group_info=$(run_kafka_cmd kafka-consumer-groups --describe --group "$group" 2>&1 || true)
|
||||
|
||||
if echo "$group_info" | grep -qi "does not exist\|missing\|Error:.*GroupIdNotFoundException"; then
|
||||
log_info "Consumer group $group does not exist, skipping"
|
||||
continue
|
||||
fi
|
||||
|
||||
local reset_success=false
|
||||
local retry_count=0
|
||||
local max_retries=3
|
||||
local max_retries=5
|
||||
|
||||
while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do
|
||||
if run_kafka_cmd kafka-consumer-groups \
|
||||
local reset_output
|
||||
reset_output=$(run_kafka_cmd kafka-consumer-groups \
|
||||
--group "$group" \
|
||||
--reset-offsets \
|
||||
--to-earliest \
|
||||
--all-topics \
|
||||
--execute 2>&1 | grep -q "NEW-OFFSET"; then
|
||||
reset_success=true
|
||||
fi
|
||||
--execute 2>&1 || true)
|
||||
|
||||
if [ "$reset_success" = false ]; then
|
||||
if echo "$reset_output" | grep -q "NEW-OFFSET"; then
|
||||
reset_success=true
|
||||
elif echo "$reset_output" | grep -qi "does not exist"; then
|
||||
log_info "Consumer group $group does not exist, skipping"
|
||||
reset_success=true
|
||||
else
|
||||
retry_count=$((retry_count + 1))
|
||||
if [ $retry_count -lt $max_retries ]; then
|
||||
log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..."
|
||||
log_warn "Reset failed for $group (retry $retry_count/$max_retries), waiting 10s..."
|
||||
log_info "Kafka output: $(echo "$reset_output" | head -3)"
|
||||
sleep 10
|
||||
fi
|
||||
fi
|
||||
|
|
@ -857,7 +873,12 @@ sync_reset() {
|
|||
if [ "$reset_success" = true ]; then
|
||||
log_success "Offsets reset for $group"
|
||||
else
|
||||
log_warn "Could not reset offsets for $group after $max_retries attempts"
|
||||
log_warn "Could not reset offsets for $group after $max_retries attempts, trying delete..."
|
||||
if run_kafka_cmd kafka-consumer-groups --delete --group "$group" 2>&1 | grep -qi "deletion.*successful\|Deletion.*$group"; then
|
||||
log_success "Deleted consumer group $group (will be recreated on service start)"
|
||||
else
|
||||
log_warn "Could not delete consumer group $group either, proceeding anyway"
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
|
|
@ -1255,37 +1276,61 @@ full_reset() {
|
|||
done
|
||||
|
||||
log_step "Step 2/19: Waiting for Kafka consumers to become inactive..."
|
||||
log_info "Waiting 15 seconds for consumer group session timeout..."
|
||||
sleep 15
|
||||
log_info "Waiting 30 seconds for consumer group session timeout (cross-server)..."
|
||||
sleep 30
|
||||
|
||||
log_step "Step 3/19: Resetting CDC consumer offsets..."
|
||||
# Reset offsets BEFORE migrations (which may start containers)
|
||||
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
|
||||
log_info "Resetting consumer group: $group"
|
||||
|
||||
# First check if the group exists on the remote Kafka broker
|
||||
local group_info
|
||||
group_info=$(run_kafka_cmd kafka-consumer-groups --describe --group "$group" 2>&1 || true)
|
||||
|
||||
if echo "$group_info" | grep -qi "does not exist\|missing\|Error:.*GroupIdNotFoundException"; then
|
||||
log_info "Consumer group $group does not exist (fresh setup), skipping reset"
|
||||
continue
|
||||
fi
|
||||
|
||||
local reset_success=false
|
||||
local retry_count=0
|
||||
local max_retries=3
|
||||
local max_retries=5
|
||||
|
||||
while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do
|
||||
if run_kafka_cmd kafka-consumer-groups \
|
||||
local reset_output
|
||||
reset_output=$(run_kafka_cmd kafka-consumer-groups \
|
||||
--group "$group" \
|
||||
--reset-offsets \
|
||||
--to-earliest \
|
||||
--all-topics \
|
||||
--execute 2>&1 | grep -q "NEW-OFFSET"; then
|
||||
--execute 2>&1 || true)
|
||||
|
||||
if echo "$reset_output" | grep -q "NEW-OFFSET"; then
|
||||
log_success "CDC offsets reset for $group"
|
||||
reset_success=true
|
||||
elif echo "$reset_output" | grep -qi "does not exist"; then
|
||||
log_info "Consumer group $group does not exist, skipping"
|
||||
reset_success=true
|
||||
else
|
||||
retry_count=$((retry_count + 1))
|
||||
if [ $retry_count -lt $max_retries ]; then
|
||||
log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..."
|
||||
log_warn "Reset failed for $group (retry $retry_count/$max_retries), waiting 10s..."
|
||||
log_info "Kafka output: $(echo "$reset_output" | head -3)"
|
||||
sleep 10
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "$reset_success" = false ]; then
|
||||
log_warn "Could not reset offsets for $group after $max_retries attempts"
|
||||
log_warn "Could not reset offsets for $group after $max_retries attempts, trying delete..."
|
||||
# Fallback: delete the consumer group entirely
|
||||
# When the service restarts with fromBeginning: true, it creates a new group
|
||||
if run_kafka_cmd kafka-consumer-groups --delete --group "$group" 2>&1 | grep -qi "deletion.*successful\|Deletion.*$group"; then
|
||||
log_success "Deleted consumer group $group (will be recreated on service start)"
|
||||
else
|
||||
log_warn "Could not delete consumer group $group either, proceeding anyway"
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
|
|
@ -1446,36 +1491,57 @@ full_reset() {
|
|||
docker compose $COMPOSE_ARGS stop "$service" 2>/dev/null || true
|
||||
done
|
||||
|
||||
log_info "Waiting 20 seconds for consumer groups to become inactive..."
|
||||
sleep 20
|
||||
log_info "Waiting 30 seconds for consumer groups to become inactive (cross-server)..."
|
||||
sleep 30
|
||||
|
||||
# Reset CDC offsets again after migration
|
||||
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
|
||||
log_info "Resetting consumer group: $group"
|
||||
|
||||
local group_info
|
||||
group_info=$(run_kafka_cmd kafka-consumer-groups --describe --group "$group" 2>&1 || true)
|
||||
|
||||
if echo "$group_info" | grep -qi "does not exist\|missing\|Error:.*GroupIdNotFoundException"; then
|
||||
log_info "Consumer group $group does not exist, skipping reset"
|
||||
continue
|
||||
fi
|
||||
|
||||
local reset_success=false
|
||||
local retry_count=0
|
||||
local max_retries=3
|
||||
local max_retries=5
|
||||
|
||||
while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do
|
||||
if run_kafka_cmd kafka-consumer-groups \
|
||||
local reset_output
|
||||
reset_output=$(run_kafka_cmd kafka-consumer-groups \
|
||||
--group "$group" \
|
||||
--reset-offsets \
|
||||
--to-earliest \
|
||||
--all-topics \
|
||||
--execute 2>&1 | grep -q "NEW-OFFSET"; then
|
||||
--execute 2>&1 || true)
|
||||
|
||||
if echo "$reset_output" | grep -q "NEW-OFFSET"; then
|
||||
log_success "CDC offsets reset for $group"
|
||||
reset_success=true
|
||||
elif echo "$reset_output" | grep -qi "does not exist"; then
|
||||
log_info "Consumer group $group does not exist, skipping"
|
||||
reset_success=true
|
||||
else
|
||||
retry_count=$((retry_count + 1))
|
||||
if [ $retry_count -lt $max_retries ]; then
|
||||
log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..."
|
||||
log_warn "Reset failed for $group (retry $retry_count/$max_retries), waiting 10s..."
|
||||
log_info "Kafka output: $(echo "$reset_output" | head -3)"
|
||||
sleep 10
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "$reset_success" = false ]; then
|
||||
log_warn "Could not reset offsets for $group after $max_retries attempts"
|
||||
log_warn "Could not reset offsets for $group after $max_retries attempts, trying delete..."
|
||||
if run_kafka_cmd kafka-consumer-groups --delete --group "$group" 2>&1 | grep -qi "deletion.*successful\|Deletion.*$group"; then
|
||||
log_success "Deleted consumer group $group (will be recreated on service start)"
|
||||
else
|
||||
log_warn "Could not delete consumer group $group either, proceeding anyway"
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue