fix(deploy): use kafka-console-producer for tombstone messages
kafkacat/kcat not available in containers. Switch to kafka-console-producer with null.marker property to send tombstone messages for Debezium offset deletion. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
fe2d4c3bcf
commit
f84e8b4700
|
|
@ -983,40 +983,28 @@ full_reset() {
|
||||||
local offset_key="[\"$connector\",{\"server\":\"$topic_prefix\"}]"
|
local offset_key="[\"$connector\",{\"server\":\"$topic_prefix\"}]"
|
||||||
log_info "Sending tombstone for: $connector (key: $offset_key)"
|
log_info "Sending tombstone for: $connector (key: $offset_key)"
|
||||||
|
|
||||||
# Try using kafkacat/kcat to send tombstone (NULL message)
|
# Send tombstone (NULL value) using kafka-console-producer
|
||||||
# -Z flag enables NULL message delivery
|
# Use null.marker to mark __NULL__ as NULL value
|
||||||
# -K| sets key separator to |
|
# Format: key\t__NULL__ with parse.key=true and null.marker=__NULL__
|
||||||
local tombstone_sent=false
|
local tombstone_sent=false
|
||||||
|
|
||||||
# Try Debezium Connect container first (has kafkacat)
|
# Use kafka-console-producer which is available in Kafka container
|
||||||
local connect_container="rwa-debezium-connect"
|
# --property parse.key=true: Enable key parsing
|
||||||
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${connect_container}$"; then
|
# --property key.separator=<TAB>: Use literal tab as key-value separator
|
||||||
if docker exec "$connect_container" which kafkacat &>/dev/null; then
|
# --property null.marker=__NULL__: Treat __NULL__ as null value (tombstone)
|
||||||
if echo "${offset_key}|" | docker exec -i "$connect_container" kafkacat -P -Z -b kafka:9092 -t "$offset_topic" -K \| 2>/dev/null; then
|
# Note: Must use printf to properly pass tab character through SSH/docker
|
||||||
log_success "Sent tombstone via kafkacat (debezium-connect) for: $connector"
|
if printf '%s\t%s\n' "$offset_key" "__NULL__" | docker exec -i "$KAFKA_CONTAINER" kafka-console-producer \
|
||||||
tombstone_sent=true
|
--bootstrap-server localhost:9092 \
|
||||||
fi
|
--topic "$offset_topic" \
|
||||||
fi
|
--property parse.key=true \
|
||||||
fi
|
--property "key.separator= " \
|
||||||
|
--property "null.marker=__NULL__" 2>/dev/null; then
|
||||||
# Try Kafka container if Debezium Connect didn't work
|
log_success "Sent tombstone via kafka-console-producer for: $connector"
|
||||||
if [ "$tombstone_sent" = false ]; then
|
tombstone_sent=true
|
||||||
if docker exec "$KAFKA_CONTAINER" which kafkacat &>/dev/null; then
|
|
||||||
if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kafkacat -P -Z -b localhost:9092 -t "$offset_topic" -K \| 2>/dev/null; then
|
|
||||||
log_success "Sent tombstone via kafkacat (kafka) for: $connector"
|
|
||||||
tombstone_sent=true
|
|
||||||
fi
|
|
||||||
elif docker exec "$KAFKA_CONTAINER" which kcat &>/dev/null; then
|
|
||||||
if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kcat -P -Z -b localhost:9092 -t "$offset_topic" -K \| 2>/dev/null; then
|
|
||||||
log_success "Sent tombstone via kcat for: $connector"
|
|
||||||
tombstone_sent=true
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ "$tombstone_sent" = false ]; then
|
if [ "$tombstone_sent" = false ]; then
|
||||||
log_warn "Could not send tombstone for $connector"
|
log_warn "Could not send tombstone for $connector"
|
||||||
log_warn "Neither kafkacat nor kcat available, offset may persist"
|
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue