fix(deploy): delete Debezium connector offsets during full-reset

This fixes an issue where Debezium would skip the initial snapshot after a
full-reset because the connector offsets persisted in Kafka Connect's
internal connect-offsets topic.

The fix adds two strategies for deleting connector offsets (sketched below):
1. REST API method (Kafka Connect 3.6+): DELETE /connectors/<name>/offsets
2. Tombstone method (Kafka Connect 3.5 and earlier): send tombstone (NULL-value) messages to the connect-offsets topic via kafkacat

Reference: https://debezium.io/documentation/faq/#how_to_remove_committed_offsets_for_a_connector
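
For reference, the two approaches boil down to the following (a minimal
sketch for the auth connector only; the script iterates over all connectors
and picks whichever container/tool is available):

    # Kafka Connect 3.6+: clear stored offsets after the connector is deleted
    curl -X DELETE http://localhost:8084/connectors/auth-outbox-connector/offsets

    # Kafka Connect 3.5 and earlier: tombstone the offset key in connect-offsets
    # (-Z sends the empty value as a NULL message, -K '|' sets the key separator)
    echo '["auth-outbox-connector",{"server":"cdc.auth"}]|' | \
      kafkacat -P -Z -b localhost:9092 -t connect-offsets -K '|'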

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
hailin 2026-01-12 23:17:45 -08:00
parent 5af39193e4
commit 416867b1d5
1 changed file with 89 additions and 1 deletion

@@ -927,7 +927,7 @@ full_reset() {
fi
done
log_step "Step 4/18: Deleting Debezium outbox connectors and Kafka topics..."
log_step "Step 4/18: Deleting Debezium outbox connectors, offsets, and Kafka topics..."
# Delete connectors BEFORE dropping databases to release replication slots
local connectors=("auth-outbox-connector" "contribution-outbox-connector" "mining-outbox-connector" "trading-outbox-connector" "mining-wallet-outbox-connector")
for connector in "${connectors[@]}"; do
@@ -937,6 +937,94 @@ full_reset() {
log_info "Waiting 5 seconds for connectors to be fully removed..."
sleep 5
# Delete connector offsets from Kafka Connect internal storage
# This is CRITICAL: without this, Debezium will skip initial snapshot on re-registration
# because it thinks the snapshot was already completed (offset exists in connect-offsets topic)
#
# Reference: https://debezium.io/documentation/faq/#how_to_remove_committed_offsets_for_a_connector
# Strategy 1: Kafka Connect 3.6+ REST API: DELETE /connectors/<name>/offsets (after connector deleted)
# Strategy 2: Kafka Connect 3.5-: Send tombstone messages to connect-offsets topic via kafkacat
log_info "Deleting connector offsets from Kafka Connect internal storage..."
# Strategy 1: Try REST API method (Kafka Connect 3.6+)
# Note: Connector must be deleted first (which we did above)
local rest_api_worked=false
for connector in "${connectors[@]}"; do
log_info "Attempting to delete offset via REST API: $connector"
local delete_result
delete_result=$(curl -s -w "\n%{http_code}" -X DELETE "http://localhost:8084/connectors/$connector/offsets" 2>/dev/null)
local http_code=$(echo "$delete_result" | tail -1)
if [ "$http_code" = "200" ] || [ "$http_code" = "204" ]; then
log_success "Deleted offset via REST API: $connector"
rest_api_worked=true
else
log_warn "REST API offset deletion returned HTTP $http_code for $connector"
fi
done
# Strategy 2: Fallback - send tombstones via kafkacat (Kafka Connect 3.5 or earlier)
# Only use this if REST API didn't work
if [ "$rest_api_worked" = false ]; then
log_info "REST API method not available, trying tombstone method..."
# The offset key format is: ["connector-name",{"server":"topic.prefix"}]
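# e.g. for auth-outbox-connector the key is ["auth-outbox-connector",{"server":"cdc.auth"}]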
for connector in "${connectors[@]}"; do
# Map connector name to topic prefix
local topic_prefix=""
case "$connector" in
"auth-outbox-connector") topic_prefix="cdc.auth" ;;
"contribution-outbox-connector") topic_prefix="cdc.contribution" ;;
"mining-outbox-connector") topic_prefix="cdc.mining" ;;
"trading-outbox-connector") topic_prefix="cdc.trading" ;;
"mining-wallet-outbox-connector") topic_prefix="cdc.mining-wallet" ;;
esac
local offset_key="[\"$connector\",{\"server\":\"$topic_prefix\"}]"
log_info "Sending tombstone for: $connector (key: $offset_key)"
# Try using kafkacat/kcat to send tombstone (NULL message)
# -Z flag enables NULL message delivery
# -K| sets key separator to |
# First try Debezium Connect container (might have kafkacat), then Kafka container
local tombstone_sent=false
# Try Debezium Connect container
local connect_container="rwa-debezium-connect"
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${connect_container}$"; then
if docker exec "$connect_container" which kafkacat &>/dev/null; then
if echo "${offset_key}|" | docker exec -i "$connect_container" kafkacat -P -Z -b kafka:9092 -t connect-offsets -K \| 2>/dev/null; then
log_success "Sent tombstone via kafkacat (debezium-connect) for: $connector"
tombstone_sent=true
fi
fi
fi
# Try Kafka container if Debezium Connect didn't work
if [ "$tombstone_sent" = false ]; then
if docker exec "$KAFKA_CONTAINER" which kafkacat &>/dev/null; then
if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kafkacat -P -Z -b localhost:9092 -t connect-offsets -K \| 2>/dev/null; then
log_success "Sent tombstone via kafkacat (kafka) for: $connector"
tombstone_sent=true
fi
elif docker exec "$KAFKA_CONTAINER" which kcat &>/dev/null; then
if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kcat -P -Z -b localhost:9092 -t connect-offsets -K \| 2>/dev/null; then
log_success "Sent tombstone via kcat for: $connector"
tombstone_sent=true
fi
fi
fi
if [ "$tombstone_sent" = false ]; then
log_warn "Could not send tombstone for $connector"
log_warn "Neither kafkacat nor kcat available, offset may persist"
fi
done
fi
# Wait for offset changes to be processed
log_info "Waiting 5 seconds for offset deletions to be processed..."
sleep 5
# Delete Kafka outbox topics to clear old messages
# This is critical: old messages in Kafka will corrupt the sync if not cleared
log_info "Deleting Kafka outbox topics to clear old messages..."