From 416867b1d586a3ed62d4a4882e54ef7b54c6acd0 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 12 Jan 2026 23:17:45 -0800 Subject: [PATCH] fix(deploy): delete Debezium connector offsets during full-reset This fixes an issue where Debezium would skip initial snapshot after full-reset because the connector offset persisted in Kafka Connect's internal connect-offsets topic. The fix adds two strategies to delete connector offsets: 1. REST API method (Kafka Connect 3.6+): DELETE /connectors//offsets 2. Tombstone method (Kafka Connect 3.5-): Send NULL messages via kafkacat Reference: https://debezium.io/documentation/faq/#how_to_remove_committed_offsets_for_a_connector Co-Authored-By: Claude Opus 4.5 --- backend/services/deploy-mining.sh | 90 ++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index b63d4a54..69697486 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -927,7 +927,7 @@ full_reset() { fi done - log_step "Step 4/18: Deleting Debezium outbox connectors and Kafka topics..." + log_step "Step 4/18: Deleting Debezium outbox connectors, offsets, and Kafka topics..." # Delete connectors BEFORE dropping databases to release replication slots local connectors=("auth-outbox-connector" "contribution-outbox-connector" "mining-outbox-connector" "trading-outbox-connector" "mining-wallet-outbox-connector") for connector in "${connectors[@]}"; do @@ -937,6 +937,94 @@ full_reset() { log_info "Waiting 5 seconds for connectors to be fully removed..." sleep 5 + # Delete connector offsets from Kafka Connect internal storage + # This is CRITICAL: without this, Debezium will skip initial snapshot on re-registration + # because it thinks the snapshot was already completed (offset exists in connect-offsets topic) + # + # Reference: https://debezium.io/documentation/faq/#how_to_remove_committed_offsets_for_a_connector + # Strategy 1: Kafka Connect 3.6+ REST API: DELETE /connectors//offsets (after connector deleted) + # Strategy 2: Kafka Connect 3.5-: Send tombstone messages to connect-offsets topic via kafkacat + log_info "Deleting connector offsets from Kafka Connect internal storage..." + + # Strategy 1: Try REST API method (Kafka Connect 3.6+) + # Note: Connector must be deleted first (which we did above) + local rest_api_worked=false + for connector in "${connectors[@]}"; do + log_info "Attempting to delete offset via REST API: $connector" + local delete_result + delete_result=$(curl -s -w "\n%{http_code}" -X DELETE "http://localhost:8084/connectors/$connector/offsets" 2>/dev/null) + local http_code=$(echo "$delete_result" | tail -1) + if [ "$http_code" = "200" ] || [ "$http_code" = "204" ]; then + log_success "Deleted offset via REST API: $connector" + rest_api_worked=true + else + log_warn "REST API offset deletion returned HTTP $http_code for $connector" + fi + done + + # Strategy 2: Fallback - send tombstones via kafkacat (Kafka Connect 3.5 or earlier) + # Only use this if REST API didn't work + if [ "$rest_api_worked" = false ]; then + log_info "REST API method not available, trying tombstone method..." + + # The offset key format is: ["connector-name",{"server":"topic.prefix"}] + for connector in "${connectors[@]}"; do + # Map connector name to topic prefix + local topic_prefix="" + case "$connector" in + "auth-outbox-connector") topic_prefix="cdc.auth" ;; + "contribution-outbox-connector") topic_prefix="cdc.contribution" ;; + "mining-outbox-connector") topic_prefix="cdc.mining" ;; + "trading-outbox-connector") topic_prefix="cdc.trading" ;; + "mining-wallet-outbox-connector") topic_prefix="cdc.mining-wallet" ;; + esac + + local offset_key="[\"$connector\",{\"server\":\"$topic_prefix\"}]" + log_info "Sending tombstone for: $connector (key: $offset_key)" + + # Try using kafkacat/kcat to send tombstone (NULL message) + # -Z flag enables NULL message delivery + # -K| sets key separator to | + # First try Debezium Connect container (might have kafkacat), then Kafka container + local tombstone_sent=false + + # Try Debezium Connect container + local connect_container="rwa-debezium-connect" + if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${connect_container}$"; then + if docker exec "$connect_container" which kafkacat &>/dev/null; then + if echo "${offset_key}|" | docker exec -i "$connect_container" kafkacat -P -Z -b kafka:9092 -t connect-offsets -K \| 2>/dev/null; then + log_success "Sent tombstone via kafkacat (debezium-connect) for: $connector" + tombstone_sent=true + fi + fi + fi + + # Try Kafka container if Debezium Connect didn't work + if [ "$tombstone_sent" = false ]; then + if docker exec "$KAFKA_CONTAINER" which kafkacat &>/dev/null; then + if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kafkacat -P -Z -b localhost:9092 -t connect-offsets -K \| 2>/dev/null; then + log_success "Sent tombstone via kafkacat (kafka) for: $connector" + tombstone_sent=true + fi + elif docker exec "$KAFKA_CONTAINER" which kcat &>/dev/null; then + if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kcat -P -Z -b localhost:9092 -t connect-offsets -K \| 2>/dev/null; then + log_success "Sent tombstone via kcat for: $connector" + tombstone_sent=true + fi + fi + fi + + if [ "$tombstone_sent" = false ]; then + log_warn "Could not send tombstone for $connector" + log_warn "Neither kafkacat nor kcat available, offset may persist" + fi + done + fi + + # Wait for offset changes to be processed + log_info "Waiting 5 seconds for offset deletions to be processed..." + sleep 5 + # Delete Kafka outbox topics to clear old messages # This is critical: old messages in Kafka will corrupt the sync if not cleared log_info "Deleting Kafka outbox topics to clear old messages..."