diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 69697486..48b4b2a7 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -962,64 +962,63 @@ full_reset() { fi done - # Strategy 2: Fallback - send tombstones via kafkacat (Kafka Connect 3.5 or earlier) - # Only use this if REST API didn't work - if [ "$rest_api_worked" = false ]; then - log_info "REST API method not available, trying tombstone method..." + # Strategy 2: Always try tombstone method as primary approach + # The offset topic name is configured via OFFSET_STORAGE_TOPIC env var in Debezium Connect + # Default is "debezium_offsets" (not "connect-offsets"!) + local offset_topic="debezium_offsets" + log_info "Sending tombstones to offset topic: $offset_topic" - # The offset key format is: ["connector-name",{"server":"topic.prefix"}] - for connector in "${connectors[@]}"; do - # Map connector name to topic prefix - local topic_prefix="" - case "$connector" in - "auth-outbox-connector") topic_prefix="cdc.auth" ;; - "contribution-outbox-connector") topic_prefix="cdc.contribution" ;; - "mining-outbox-connector") topic_prefix="cdc.mining" ;; - "trading-outbox-connector") topic_prefix="cdc.trading" ;; - "mining-wallet-outbox-connector") topic_prefix="cdc.mining-wallet" ;; - esac + # The offset key format is: ["connector-name",{"server":"topic.prefix"}] + for connector in "${connectors[@]}"; do + # Map connector name to topic prefix + local topic_prefix="" + case "$connector" in + "auth-outbox-connector") topic_prefix="cdc.auth" ;; + "contribution-outbox-connector") topic_prefix="cdc.contribution" ;; + "mining-outbox-connector") topic_prefix="cdc.mining" ;; + "trading-outbox-connector") topic_prefix="cdc.trading" ;; + "mining-wallet-outbox-connector") topic_prefix="cdc.mining-wallet" ;; + esac - local offset_key="[\"$connector\",{\"server\":\"$topic_prefix\"}]" - log_info "Sending tombstone for: $connector (key: $offset_key)" + local offset_key="[\"$connector\",{\"server\":\"$topic_prefix\"}]" + log_info "Sending tombstone for: $connector (key: $offset_key)" - # Try using kafkacat/kcat to send tombstone (NULL message) - # -Z flag enables NULL message delivery - # -K| sets key separator to | - # First try Debezium Connect container (might have kafkacat), then Kafka container - local tombstone_sent=false + # Try using kafkacat/kcat to send tombstone (NULL message) + # -Z flag enables NULL message delivery + # -K| sets key separator to | + local tombstone_sent=false - # Try Debezium Connect container - local connect_container="rwa-debezium-connect" - if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${connect_container}$"; then - if docker exec "$connect_container" which kafkacat &>/dev/null; then - if echo "${offset_key}|" | docker exec -i "$connect_container" kafkacat -P -Z -b kafka:9092 -t connect-offsets -K \| 2>/dev/null; then - log_success "Sent tombstone via kafkacat (debezium-connect) for: $connector" - tombstone_sent=true - fi + # Try Debezium Connect container first (has kafkacat) + local connect_container="rwa-debezium-connect" + if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${connect_container}$"; then + if docker exec "$connect_container" which kafkacat &>/dev/null; then + if echo "${offset_key}|" | docker exec -i "$connect_container" kafkacat -P -Z -b kafka:9092 -t "$offset_topic" -K \| 2>/dev/null; then + log_success "Sent tombstone via kafkacat (debezium-connect) for: $connector" + tombstone_sent=true fi fi + fi - # Try Kafka container if Debezium Connect didn't work - if [ "$tombstone_sent" = false ]; then - if docker exec "$KAFKA_CONTAINER" which kafkacat &>/dev/null; then - if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kafkacat -P -Z -b localhost:9092 -t connect-offsets -K \| 2>/dev/null; then - log_success "Sent tombstone via kafkacat (kafka) for: $connector" - tombstone_sent=true - fi - elif docker exec "$KAFKA_CONTAINER" which kcat &>/dev/null; then - if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kcat -P -Z -b localhost:9092 -t connect-offsets -K \| 2>/dev/null; then - log_success "Sent tombstone via kcat for: $connector" - tombstone_sent=true - fi + # Try Kafka container if Debezium Connect didn't work + if [ "$tombstone_sent" = false ]; then + if docker exec "$KAFKA_CONTAINER" which kafkacat &>/dev/null; then + if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kafkacat -P -Z -b localhost:9092 -t "$offset_topic" -K \| 2>/dev/null; then + log_success "Sent tombstone via kafkacat (kafka) for: $connector" + tombstone_sent=true + fi + elif docker exec "$KAFKA_CONTAINER" which kcat &>/dev/null; then + if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kcat -P -Z -b localhost:9092 -t "$offset_topic" -K \| 2>/dev/null; then + log_success "Sent tombstone via kcat for: $connector" + tombstone_sent=true fi fi + fi - if [ "$tombstone_sent" = false ]; then - log_warn "Could not send tombstone for $connector" - log_warn "Neither kafkacat nor kcat available, offset may persist" - fi - done - fi + if [ "$tombstone_sent" = false ]; then + log_warn "Could not send tombstone for $connector" + log_warn "Neither kafkacat nor kcat available, offset may persist" + fi + done # Wait for offset changes to be processed log_info "Waiting 5 seconds for offset deletions to be processed..."