fix(deploy): use correct offset topic name (debezium_offsets)
The Debezium Connect container uses OFFSET_STORAGE_TOPIC=debezium_offsets, not the default connect-offsets. This fix updates the tombstone method to use the correct topic name. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
416867b1d5
commit
fe2d4c3bcf
|
|
@ -962,10 +962,11 @@ full_reset() {
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# Strategy 2: Fallback - send tombstones via kafkacat (Kafka Connect 3.5 or earlier)
|
# Strategy 2: Always try tombstone method as primary approach
|
||||||
# Only use this if REST API didn't work
|
# The offset topic name is configured via OFFSET_STORAGE_TOPIC env var in Debezium Connect
|
||||||
if [ "$rest_api_worked" = false ]; then
|
# Default is "debezium_offsets" (not "connect-offsets"!)
|
||||||
log_info "REST API method not available, trying tombstone method..."
|
local offset_topic="debezium_offsets"
|
||||||
|
log_info "Sending tombstones to offset topic: $offset_topic"
|
||||||
|
|
||||||
# The offset key format is: ["connector-name",{"server":"topic.prefix"}]
|
# The offset key format is: ["connector-name",{"server":"topic.prefix"}]
|
||||||
for connector in "${connectors[@]}"; do
|
for connector in "${connectors[@]}"; do
|
||||||
|
|
@ -985,14 +986,13 @@ full_reset() {
|
||||||
# Try using kafkacat/kcat to send tombstone (NULL message)
|
# Try using kafkacat/kcat to send tombstone (NULL message)
|
||||||
# -Z flag enables NULL message delivery
|
# -Z flag enables NULL message delivery
|
||||||
# -K| sets key separator to |
|
# -K| sets key separator to |
|
||||||
# First try Debezium Connect container (might have kafkacat), then Kafka container
|
|
||||||
local tombstone_sent=false
|
local tombstone_sent=false
|
||||||
|
|
||||||
# Try Debezium Connect container
|
# Try Debezium Connect container first (has kafkacat)
|
||||||
local connect_container="rwa-debezium-connect"
|
local connect_container="rwa-debezium-connect"
|
||||||
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${connect_container}$"; then
|
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${connect_container}$"; then
|
||||||
if docker exec "$connect_container" which kafkacat &>/dev/null; then
|
if docker exec "$connect_container" which kafkacat &>/dev/null; then
|
||||||
if echo "${offset_key}|" | docker exec -i "$connect_container" kafkacat -P -Z -b kafka:9092 -t connect-offsets -K \| 2>/dev/null; then
|
if echo "${offset_key}|" | docker exec -i "$connect_container" kafkacat -P -Z -b kafka:9092 -t "$offset_topic" -K \| 2>/dev/null; then
|
||||||
log_success "Sent tombstone via kafkacat (debezium-connect) for: $connector"
|
log_success "Sent tombstone via kafkacat (debezium-connect) for: $connector"
|
||||||
tombstone_sent=true
|
tombstone_sent=true
|
||||||
fi
|
fi
|
||||||
|
|
@ -1002,12 +1002,12 @@ full_reset() {
|
||||||
# Try Kafka container if Debezium Connect didn't work
|
# Try Kafka container if Debezium Connect didn't work
|
||||||
if [ "$tombstone_sent" = false ]; then
|
if [ "$tombstone_sent" = false ]; then
|
||||||
if docker exec "$KAFKA_CONTAINER" which kafkacat &>/dev/null; then
|
if docker exec "$KAFKA_CONTAINER" which kafkacat &>/dev/null; then
|
||||||
if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kafkacat -P -Z -b localhost:9092 -t connect-offsets -K \| 2>/dev/null; then
|
if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kafkacat -P -Z -b localhost:9092 -t "$offset_topic" -K \| 2>/dev/null; then
|
||||||
log_success "Sent tombstone via kafkacat (kafka) for: $connector"
|
log_success "Sent tombstone via kafkacat (kafka) for: $connector"
|
||||||
tombstone_sent=true
|
tombstone_sent=true
|
||||||
fi
|
fi
|
||||||
elif docker exec "$KAFKA_CONTAINER" which kcat &>/dev/null; then
|
elif docker exec "$KAFKA_CONTAINER" which kcat &>/dev/null; then
|
||||||
if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kcat -P -Z -b localhost:9092 -t connect-offsets -K \| 2>/dev/null; then
|
if echo "${offset_key}|" | docker exec -i "$KAFKA_CONTAINER" kcat -P -Z -b localhost:9092 -t "$offset_topic" -K \| 2>/dev/null; then
|
||||||
log_success "Sent tombstone via kcat for: $connector"
|
log_success "Sent tombstone via kcat for: $connector"
|
||||||
tombstone_sent=true
|
tombstone_sent=true
|
||||||
fi
|
fi
|
||||||
|
|
@ -1019,7 +1019,6 @@ full_reset() {
|
||||||
log_warn "Neither kafkacat nor kcat available, offset may persist"
|
log_warn "Neither kafkacat nor kcat available, offset may persist"
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
fi
|
|
||||||
|
|
||||||
# Wait for offset changes to be processed
|
# Wait for offset changes to be processed
|
||||||
log_info "Waiting 5 seconds for offset deletions to be processed..."
|
log_info "Waiting 5 seconds for offset deletions to be processed..."
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue