diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 48b4b2a7..f4792098 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -983,40 +983,28 @@ full_reset() { local offset_key="[\"$connector\",{\"server\":\"$topic_prefix\"}]" log_info "Sending tombstone for: $connector (key: $offset_key)" - # Try using kafkacat/kcat to send tombstone (NULL message) - # -Z flag enables NULL message delivery - # -K| sets key separator to | + # Send tombstone (NULL value) using kafka-console-producer + # Use null.marker to mark __NULL__ as NULL value + # Format: key\t__NULL__ with parse.key=true and null.marker=__NULL__ local tombstone_sent=false - # Try Debezium Connect container first (has kafkacat) - local connect_container="rwa-debezium-connect" - if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${connect_container}$"; then - if docker exec "$connect_container" which kafkacat &>/dev/null; then - if echo "${offset_key}|" | docker exec -i "$connect_container" kafkacat -P -Z -b kafka:9092 -t "$offset_topic" -K \| 2>/dev/null; then - log_success "Sent tombstone via kafkacat (debezium-connect) for: $connector" - tombstone_sent=true - fi - fi - fi - - # Try Kafka container if Debezium Connect didn't work - if [ "$tombstone_sent" = false ]; then - if docker exec "$KAFKA_CONTAINER" which kafkacat &>/dev/null; then - if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kafkacat -P -Z -b localhost:9092 -t "$offset_topic" -K \| 2>/dev/null; then - log_success "Sent tombstone via kafkacat (kafka) for: $connector" - tombstone_sent=true - fi - elif docker exec "$KAFKA_CONTAINER" which kcat &>/dev/null; then - if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kcat -P -Z -b localhost:9092 -t "$offset_topic" -K \| 2>/dev/null; then - log_success "Sent tombstone via kcat for: $connector" - tombstone_sent=true - fi - fi + # Use kafka-console-producer which is available in Kafka container + # --property parse.key=true: Enable key parsing + # --property key.separator=: Use literal tab as key-value separator + # --property null.marker=__NULL__: Treat __NULL__ as null value (tombstone) + # Note: Must use printf to properly pass tab character through SSH/docker + if printf '%s\t%s\n' "$offset_key" "__NULL__" | docker exec -i "$KAFKA_CONTAINER" kafka-console-producer \ + --bootstrap-server localhost:9092 \ + --topic "$offset_topic" \ + --property parse.key=true \ + --property "key.separator= " \ + --property "null.marker=__NULL__" 2>/dev/null; then + log_success "Sent tombstone via kafka-console-producer for: $connector" + tombstone_sent=true fi if [ "$tombstone_sent" = false ]; then log_warn "Could not send tombstone for $connector" - log_warn "Neither kafkacat nor kcat available, offset may persist" fi done