diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 15103a3d..46298963 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -336,7 +336,8 @@ run_kafka_cmd() { if [ "$DEPLOY_MODE" = "standalone" ]; then # Standalone: Kafka is remote, use temporary container with host networking - docker run --rm --network host confluentinc/cp-kafka:7.5.0 \ + # Add timeout to prevent hangs when Kafka is unreachable across servers + timeout 30 docker run --rm --network host confluentinc/cp-kafka:7.5.0 \ $cmd --bootstrap-server "$KAFKA_BROKERS" "$@" return $? fi @@ -363,7 +364,7 @@ run_kafka_cmd_stdin() { shift if [ "$DEPLOY_MODE" = "standalone" ]; then - docker run --rm -i --network host confluentinc/cp-kafka:7.5.0 \ + timeout 30 docker run --rm -i --network host confluentinc/cp-kafka:7.5.0 \ $cmd --bootstrap-server "$KAFKA_BROKERS" "$@" return $? fi @@ -823,32 +824,47 @@ sync_reset() { # Wait for consumer groups to become inactive log_step "Waiting for Kafka consumers to become inactive..." - log_info "Waiting 20 seconds for consumer group session timeout..." - sleep 20 + log_info "Waiting 30 seconds for consumer group session timeout (cross-server)..." + sleep 30 # Reset offsets for all consumer groups with retry logic log_step "Resetting consumer group offsets" for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting: $group" + + # Check if the group exists on the Kafka broker + local group_info + group_info=$(run_kafka_cmd kafka-consumer-groups --describe --group "$group" 2>&1 || true) + + if echo "$group_info" | grep -qi "does not exist\|missing\|Error:.*GroupIdNotFoundException"; then + log_info "Consumer group $group does not exist, skipping" + continue + fi + local reset_success=false local retry_count=0 - local max_retries=3 + local max_retries=5 while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do - if run_kafka_cmd kafka-consumer-groups \ + local reset_output + reset_output=$(run_kafka_cmd kafka-consumer-groups \ --group "$group" \ --reset-offsets \ --to-earliest \ --all-topics \ - --execute 2>&1 | grep -q "NEW-OFFSET"; then - reset_success=true - fi + --execute 2>&1 || true) - if [ "$reset_success" = false ]; then + if echo "$reset_output" | grep -q "NEW-OFFSET"; then + reset_success=true + elif echo "$reset_output" | grep -qi "does not exist"; then + log_info "Consumer group $group does not exist, skipping" + reset_success=true + else retry_count=$((retry_count + 1)) if [ $retry_count -lt $max_retries ]; then - log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..." + log_warn "Reset failed for $group (retry $retry_count/$max_retries), waiting 10s..." + log_info "Kafka output: $(echo "$reset_output" | head -3)" sleep 10 fi fi @@ -857,7 +873,12 @@ sync_reset() { if [ "$reset_success" = true ]; then log_success "Offsets reset for $group" else - log_warn "Could not reset offsets for $group after $max_retries attempts" + log_warn "Could not reset offsets for $group after $max_retries attempts, trying delete..." + if run_kafka_cmd kafka-consumer-groups --delete --group "$group" 2>&1 | grep -qi "deletion.*successful\|Deletion.*$group"; then + log_success "Deleted consumer group $group (will be recreated on service start)" + else + log_warn "Could not delete consumer group $group either, proceeding anyway" + fi fi done @@ -1255,37 +1276,61 @@ full_reset() { done log_step "Step 2/19: Waiting for Kafka consumers to become inactive..." - log_info "Waiting 15 seconds for consumer group session timeout..." - sleep 15 + log_info "Waiting 30 seconds for consumer group session timeout (cross-server)..." + sleep 30 log_step "Step 3/19: Resetting CDC consumer offsets..." # Reset offsets BEFORE migrations (which may start containers) for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting consumer group: $group" + + # First check if the group exists on the remote Kafka broker + local group_info + group_info=$(run_kafka_cmd kafka-consumer-groups --describe --group "$group" 2>&1 || true) + + if echo "$group_info" | grep -qi "does not exist\|missing\|Error:.*GroupIdNotFoundException"; then + log_info "Consumer group $group does not exist (fresh setup), skipping reset" + continue + fi + local reset_success=false local retry_count=0 - local max_retries=3 + local max_retries=5 while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do - if run_kafka_cmd kafka-consumer-groups \ + local reset_output + reset_output=$(run_kafka_cmd kafka-consumer-groups \ --group "$group" \ --reset-offsets \ --to-earliest \ --all-topics \ - --execute 2>&1 | grep -q "NEW-OFFSET"; then + --execute 2>&1 || true) + + if echo "$reset_output" | grep -q "NEW-OFFSET"; then log_success "CDC offsets reset for $group" reset_success=true + elif echo "$reset_output" | grep -qi "does not exist"; then + log_info "Consumer group $group does not exist, skipping" + reset_success=true else retry_count=$((retry_count + 1)) if [ $retry_count -lt $max_retries ]; then - log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..." + log_warn "Reset failed for $group (retry $retry_count/$max_retries), waiting 10s..." + log_info "Kafka output: $(echo "$reset_output" | head -3)" sleep 10 fi fi done if [ "$reset_success" = false ]; then - log_warn "Could not reset offsets for $group after $max_retries attempts" + log_warn "Could not reset offsets for $group after $max_retries attempts, trying delete..." + # Fallback: delete the consumer group entirely + # When the service restarts with fromBeginning: true, it creates a new group + if run_kafka_cmd kafka-consumer-groups --delete --group "$group" 2>&1 | grep -qi "deletion.*successful\|Deletion.*$group"; then + log_success "Deleted consumer group $group (will be recreated on service start)" + else + log_warn "Could not delete consumer group $group either, proceeding anyway" + fi fi done @@ -1446,36 +1491,57 @@ full_reset() { docker compose $COMPOSE_ARGS stop "$service" 2>/dev/null || true done - log_info "Waiting 20 seconds for consumer groups to become inactive..." - sleep 20 + log_info "Waiting 30 seconds for consumer groups to become inactive (cross-server)..." + sleep 30 # Reset CDC offsets again after migration for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting consumer group: $group" + + local group_info + group_info=$(run_kafka_cmd kafka-consumer-groups --describe --group "$group" 2>&1 || true) + + if echo "$group_info" | grep -qi "does not exist\|missing\|Error:.*GroupIdNotFoundException"; then + log_info "Consumer group $group does not exist, skipping reset" + continue + fi + local reset_success=false local retry_count=0 - local max_retries=3 + local max_retries=5 while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do - if run_kafka_cmd kafka-consumer-groups \ + local reset_output + reset_output=$(run_kafka_cmd kafka-consumer-groups \ --group "$group" \ --reset-offsets \ --to-earliest \ --all-topics \ - --execute 2>&1 | grep -q "NEW-OFFSET"; then + --execute 2>&1 || true) + + if echo "$reset_output" | grep -q "NEW-OFFSET"; then log_success "CDC offsets reset for $group" reset_success=true + elif echo "$reset_output" | grep -qi "does not exist"; then + log_info "Consumer group $group does not exist, skipping" + reset_success=true else retry_count=$((retry_count + 1)) if [ $retry_count -lt $max_retries ]; then - log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..." + log_warn "Reset failed for $group (retry $retry_count/$max_retries), waiting 10s..." + log_info "Kafka output: $(echo "$reset_output" | head -3)" sleep 10 fi fi done if [ "$reset_success" = false ]; then - log_warn "Could not reset offsets for $group after $max_retries attempts" + log_warn "Could not reset offsets for $group after $max_retries attempts, trying delete..." + if run_kafka_cmd kafka-consumer-groups --delete --group "$group" 2>&1 | grep -qi "deletion.*successful\|Deletion.*$group"; then + log_success "Deleted consumer group $group (will be recreated on service start)" + else + log_warn "Could not delete consumer group $group either, proceeding anyway" + fi fi done