fix(deploy): 在 migration 后再次重置 CDC offset
问题:migration 会启动容器执行迁移,导致 CDC consumer 自动启动并消费消息。在数据库重建后启动服务时,消息 已经被消费完毕。 解决方案:在 migration 后增加 Step 7,停止容器并 再次重置 CDC offset,确保最终启动时能重新消费。 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
350ce28c40
commit
273f2f1d96
|
|
@ -746,16 +746,16 @@ full_reset() {
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo ""
|
echo ""
|
||||||
log_step "Step 1/7: Stopping 2.0 services..."
|
log_step "Step 1/8: Stopping 2.0 services..."
|
||||||
for service in "${MINING_SERVICES[@]}"; do
|
for service in "${MINING_SERVICES[@]}"; do
|
||||||
service_stop "$service"
|
service_stop "$service"
|
||||||
done
|
done
|
||||||
|
|
||||||
log_step "Step 2/7: Waiting for Kafka consumers to become inactive..."
|
log_step "Step 2/8: Waiting for Kafka consumers to become inactive..."
|
||||||
log_info "Waiting 15 seconds for consumer group session timeout..."
|
log_info "Waiting 15 seconds for consumer group session timeout..."
|
||||||
sleep 15
|
sleep 15
|
||||||
|
|
||||||
log_step "Step 3/7: Resetting CDC consumer offsets..."
|
log_step "Step 3/8: Resetting CDC consumer offsets..."
|
||||||
# Reset offsets BEFORE migrations (which may start containers)
|
# Reset offsets BEFORE migrations (which may start containers)
|
||||||
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
|
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
|
||||||
log_info "Resetting consumer group: $group"
|
log_info "Resetting consumer group: $group"
|
||||||
|
|
@ -792,16 +792,61 @@ full_reset() {
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
log_step "Step 4/7: Dropping 2.0 databases..."
|
log_step "Step 4/8: Dropping 2.0 databases..."
|
||||||
db_drop
|
db_drop
|
||||||
|
|
||||||
log_step "Step 5/7: Creating 2.0 databases..."
|
log_step "Step 5/8: Creating 2.0 databases..."
|
||||||
db_create
|
db_create
|
||||||
|
|
||||||
log_step "Step 6/7: Running migrations..."
|
log_step "Step 6/8: Running migrations..."
|
||||||
db_migrate
|
db_migrate
|
||||||
|
|
||||||
log_step "Step 7/7: Starting 2.0 services..."
|
# Stop any containers that were started during migration
|
||||||
|
log_step "Step 7/8: Stopping containers and resetting CDC offsets again..."
|
||||||
|
log_info "Migration may have started CDC consumers, stopping them now..."
|
||||||
|
for service in "${MINING_SERVICES[@]}"; do
|
||||||
|
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true
|
||||||
|
done
|
||||||
|
|
||||||
|
log_info "Waiting 20 seconds for consumer groups to become inactive..."
|
||||||
|
sleep 20
|
||||||
|
|
||||||
|
# Reset CDC offsets again after migration
|
||||||
|
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
|
||||||
|
log_info "Resetting consumer group: $group"
|
||||||
|
local reset_success=false
|
||||||
|
local retry_count=0
|
||||||
|
local max_retries=3
|
||||||
|
|
||||||
|
while [ "$reset_success" = false ] && [ $retry_count -lt $max_retries ]; do
|
||||||
|
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then
|
||||||
|
if docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \
|
||||||
|
--group "$group" \
|
||||||
|
--reset-offsets \
|
||||||
|
--to-earliest \
|
||||||
|
--all-topics \
|
||||||
|
--execute 2>&1 | grep -q "NEW-OFFSET"; then
|
||||||
|
log_success "CDC offsets reset for $group"
|
||||||
|
reset_success=true
|
||||||
|
else
|
||||||
|
retry_count=$((retry_count + 1))
|
||||||
|
if [ $retry_count -lt $max_retries ]; then
|
||||||
|
log_warn "Consumer group still active, waiting 10s (retry $retry_count/$max_retries)..."
|
||||||
|
sleep 10
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
log_warn "Kafka container '$KAFKA_CONTAINER' not found"
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
if [ "$reset_success" = false ]; then
|
||||||
|
log_warn "Could not reset offsets for $group after $max_retries attempts"
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
log_step "Step 8/8: Starting 2.0 services..."
|
||||||
for service in "${MINING_SERVICES[@]}"; do
|
for service in "${MINING_SERVICES[@]}"; do
|
||||||
service_start "$service"
|
service_start "$service"
|
||||||
done
|
done
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue