feat: harden Debezium CDC security + WAL protection (lessons from the rwadurian production incident)

## Background
The rwadurian 2.0 server suffered a Debezium CDC incident:
- DBZ-7316 bug: in Debezium ≤2.4, the searchWalPosition loop fails to advance confirmed_flush_lsn
- PostgreSQL WAL grew from normal levels to 306GB; disk usage hit 23%, dropping back to 6% after the fix
- The Kafka Connect REST API was exposed to the public internet, and 3 malicious SSRF connectors were injected
- pg_logical_emit_message heartbeats were ineffective: they write to WAL but do not pass through the publication

## Changes

### 1. Pin the Debezium version (docker-compose.yml)
- debezium/connect:2.5 → debezium/connect:2.5.4.Final
- 2.5.1 fixed DBZ-7316; 2.5.4 is the final stable release of that series

### 2. PostgreSQL WAL safety valve (docker-compose.yml)
- Added max_slot_wal_keep_size=10GB
- Caps the WAL a single replication slot can retain at 10GB
- Past the limit, PostgreSQL invalidates the slot, so a stuck slot cannot fill the disk
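The safety valve above can be watched before it fires. A minimal monitoring sketch, assuming PostgreSQL 13+ where `pg_replication_slots` exposes `wal_status` and `safe_wal_size`; the sample rows and the 2GB warning threshold are made up for illustration:

```python
# Sketch: how max_slot_wal_keep_size shows up in pg_replication_slots.
# wal_status is 'reserved'/'extended'/'unreserved'/'lost'; a slot becomes
# 'lost' once PostgreSQL invalidates it for exceeding max_slot_wal_keep_size.
# The rows below are mock query results, not real slot data.

def slot_alerts(rows, warn_bytes=2 * 1024**3):
    """Return slots that need attention before (or after) the valve fires."""
    alerts = []
    for r in rows:
        if r["wal_status"] == "lost":
            alerts.append((r["slot_name"], "invalidated: recreate slot + resync"))
        elif r["wal_status"] == "unreserved" or (
            r["safe_wal_size"] is not None and r["safe_wal_size"] < warn_bytes
        ):
            alerts.append((r["slot_name"], "close to max_slot_wal_keep_size"))
    return alerts

rows = [
    {"slot_name": "debezium_genex_outbox_slot", "wal_status": "extended",
     "safe_wal_size": 1 * 1024**3},
    {"slot_name": "healthy_slot", "wal_status": "reserved",
     "safe_wal_size": 9 * 1024**3},
]
print(slot_alerts(rows))
```

An invalidated slot means the connector must be re-snapshotted, so alerting before `safe_wal_size` reaches zero is the cheaper path.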

### 3. Port hardening (docker-compose.yml)
Bind to 127.0.0.1 and block public access:
- PostgreSQL 5432 (direct database connections)
- Redis 6379 (no password protection)
- Kafka Connect 8083 (SSRF injection risk)
- Kafka 29092 (external access port)
- Kong Admin 8001 (route-tampering risk)
- MinIO Console 9001 (default credentials)
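The distinction the list relies on is that a two-part compose mapping like `"8083:8083"` binds 0.0.0.0, while a three-part mapping with an explicit host IP binds loopback only. A minimal check sketch (IPv4 short syntax only; the sample mappings are hypothetical):

```python
# Sketch: verify a docker-compose port mapping binds the host side to loopback.
# Handles the common "HOST_IP:HOST_PORT:CONTAINER_PORT" / "HOST:CONTAINER"
# short syntax; IPv6 host addresses are out of scope for this sketch.

def is_loopback_bound(mapping: str) -> bool:
    parts = mapping.split(":")
    # 3 parts means an explicit host IP was given; 2 parts defaults to 0.0.0.0
    return len(parts) == 3 and parts[0] == "127.0.0.1"

assert is_loopback_bound("127.0.0.1:8083:8083")
assert not is_loopback_bound("8083:8083")  # implicitly binds 0.0.0.0
```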

### 4. Infrastructure availability (docker-compose.yml)
- Added restart: unless-stopped to every infrastructure service
- Kafka Connect: OFFSET_FLUSH_INTERVAL_MS=10000 (10s; default is 60s)

### 5. Debezium support tables (040_create_debezium_support.sql)
- debezium_heartbeat: heartbeat table, singleton constraint, INSERT...ON CONFLICT DO UPDATE
- debezium_signal: signal table for incremental snapshots and other ops tasks
- debezium_outbox_publication: publication covering the outbox + heartbeat + signal tables
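The heartbeat table's singleton-upsert pattern can be demonstrated without a running PostgreSQL instance. A sketch using SQLite as a stand-in (both engines accept `CHECK (id = 1)` and `INSERT ... ON CONFLICT (id) DO UPDATE`; the real DDL lives in the migration file):

```python
# Sketch: the heartbeat singleton pattern on SQLite instead of PostgreSQL.
# Repeated heartbeats bump ts via upsert; the table never grows past one row.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("""
    CREATE TABLE debezium_heartbeat (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        ts TEXT NOT NULL
    )
""")
# Seed the row once (the migration does this with ON CONFLICT DO NOTHING)
con.execute("INSERT INTO debezium_heartbeat (id, ts) VALUES (1, '2026-02-15T00:00:00Z')")
# Each heartbeat is an upsert, mirroring heartbeat.action.query in the connector
for ts in ("2026-02-15T00:00:10Z", "2026-02-15T00:00:20Z"):
    con.execute(
        "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, ?) "
        "ON CONFLICT (id) DO UPDATE SET ts = excluded.ts",
        (ts,),
    )
count, ts = con.execute("SELECT COUNT(*), MAX(ts) FROM debezium_heartbeat").fetchone()
print(count, ts)
```

Every upsert produces a WAL change on the real database, which is exactly what keeps confirmed_flush_lsn moving during quiet periods.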

### 6. Connector configuration (scripts/debezium/outbox-connector.json)
- heartbeat via a TABLE (not pg_logical_emit_message)
- publication.autocreate.mode=disabled (uses the pre-created publication)
- signal.enabled.channels=source,kafka (enables incremental-snapshot replay)
- ExtractNewRecordState + RegexRouter transforms
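The RegexRouter transform in this config uses regex `.*` with replacement `cdc.genex.outbox`, which funnels every matching change-event topic into one outbox topic. A rough Python model of that routing behavior (an approximation of the Java transform, not its actual implementation):

```python
# Sketch: what transforms.route does to topic names in this connector.
# RegexRouter rewrites a topic only when the regex matches the whole name.
import re

ROUTE_REGEX = r".*"                    # from transforms.route.regex
ROUTE_REPLACEMENT = "cdc.genex.outbox"  # from transforms.route.replacement

def route(topic: str) -> str:
    if re.fullmatch(ROUTE_REGEX, topic):
        return re.sub(ROUTE_REGEX, ROUTE_REPLACEMENT, topic, count=1)
    return topic  # non-matching topics pass through unchanged

for t in ("cdc.genex.public.outbox", "cdc.genex.public.debezium_heartbeat"):
    print(t, "->", route(t))
```

One consequence worth noting: with `.*`, heartbeat and signal table events would also land on the outbox topic unless consumers filter them or the regex is narrowed.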

### 7. Deployment script (scripts/debezium/register-connectors.sh)
- Waits for Kafka Connect to become ready
- Supports both creating and updating the connector
- Verifies connector status
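The script's create-or-update branching can be summarized as a pure function: probe `GET /connectors/<name>`, then PUT the inner config on 200 or POST the full document otherwise. A sketch of that decision (no real Kafka Connect call is made; the sample document mirrors the connector JSON's shape):

```python
# Sketch: the idempotent register-or-update decision from the deploy script.
import json

def plan_request(status_code: int, connector_json: str):
    """Map an existence-probe status code to the REST call to issue."""
    doc = json.loads(connector_json)
    if status_code == 200:
        # Connector exists: PUT only the inner "config" to .../<name>/config
        return ("PUT", f"/connectors/{doc['name']}/config", doc["config"])
    # Not found: POST the full {name, config} document to /connectors
    return ("POST", "/connectors", doc)

doc = json.dumps({
    "name": "genex-outbox-connector",
    "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"},
})
print(plan_request(200, doc)[0], plan_request(404, doc)[0])
```

PUT to `/connectors/<name>/config` is what makes re-running the script safe: it updates in place instead of failing on a duplicate name.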

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
hailin 2026-02-15 06:42:03 -08:00
parent a1293e8445
commit 127f2cdd81
4 changed files with 179 additions and 7 deletions

docker-compose.yml

@@ -13,7 +13,8 @@ services:
POSTGRES_PASSWORD: genex_dev_password
POSTGRES_DB: genex
ports:
- "5432:5432"
# Security hardening: bind to 127.0.0.1 only; no direct public access to the database
- "127.0.0.1:5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
- ./migrations:/docker-entrypoint-initdb.d
@@ -25,11 +26,17 @@ services:
- "max_replication_slots=10" # CDC connector slots
- "-c"
- "max_wal_senders=10" # WAL sender processes
- "-c"
# WAL safety valve: cap the WAL a single replication slot may retain at 10GB
# Past this limit PostgreSQL invalidates the slot, so the disk cannot fill up
# (rwadurian incident: without this limit, one stuck slot accumulated 305GB of WAL)
- "max_slot_wal_keep_size=10GB"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U genex"]
interval: 5s
timeout: 5s
retries: 5
restart: unless-stopped
networks:
- genex-network
@@ -37,7 +44,8 @@ services:
image: redis:7-alpine
container_name: genex-redis
ports:
- "6379:6379"
# Security hardening: bind to 127.0.0.1 only; Redis has no password and is trivially exploited if exposed publicly
- "127.0.0.1:6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
@@ -46,6 +54,7 @@ services:
interval: 5s
timeout: 5s
retries: 5
restart: unless-stopped
networks:
- genex-network
@@ -70,7 +79,8 @@ services:
CLUSTER_ID: "genex-kafka-cluster-001"
ports:
- "9092:9092"
- "29092:29092"
# Security hardening: external access port bound to 127.0.0.1 only
- "127.0.0.1:29092:29092"
volumes:
- kafka_data:/var/lib/kafka/data
healthcheck:
@@ -78,6 +88,7 @@ services:
interval: 10s
timeout: 10s
retries: 5
restart: unless-stopped
networks:
- genex-network
@@ -90,7 +101,8 @@ services:
MINIO_ROOT_PASSWORD: genex-minio-secret
ports:
- "9000:9000" # S3 API
- "9001:9001" # Console UI
# Security hardening: MinIO Console bound to 127.0.0.1 only; exposing it publicly with default credentials is extremely dangerous
- "127.0.0.1:9001:9001" # Console UI
volumes:
- minio_data:/data
command: server /data --console-address ":9001"
@@ -99,6 +111,7 @@ services:
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
networks:
- genex-network
@@ -127,8 +140,9 @@ services:
- genex-network
# Debezium Kafka Connect (CDC - Change Data Capture)
# Version note: 2.5.1+ is required (fixes DBZ-7316: searchWalPosition fails to advance confirmed_flush_lsn, causing unbounded WAL buildup)
kafka-connect:
image: debezium/connect:2.5
image: debezium/connect:2.5.4.Final
container_name: genex-kafka-connect
environment:
BOOTSTRAP_SERVERS: kafka:9092
@@ -139,13 +153,19 @@ services:
CONFIG_STORAGE_REPLICATION_FACTOR: 1
OFFSET_STORAGE_REPLICATION_FACTOR: 1
STATUS_STORAGE_REPLICATION_FACTOR: 1
# Offset commit frequency: default 60s; shortened to 10s to narrow the reprocessing window after a restart
OFFSET_FLUSH_INTERVAL_MS: 10000
OFFSET_FLUSH_TIMEOUT_MS: 5000
ports:
- "8083:8083" # Kafka Connect REST API
# Security hardening: bind to 127.0.0.1 only; never expose the Kafka Connect REST API publicly
# Public exposure enables SSRF attacks (malicious connectors injected to read /etc/passwd, etc.)
- "127.0.0.1:8083:8083"
depends_on:
kafka:
condition: service_healthy
postgres:
condition: service_healthy
restart: unless-stopped
networks:
- genex-network
@@ -164,7 +184,8 @@ services:
KONG_PROXY_LISTEN: 0.0.0.0:8080
ports:
- "8080:8080" # Proxy (frontend connects here)
- "8001:8001" # Admin API
# Security hardening: Kong Admin API bound to 127.0.0.1 only; public exposure allows route tampering
- "127.0.0.1:8001:8001" # Admin API
volumes:
- ./kong/kong.yml:/etc/kong/kong.yml:ro
healthcheck:
@@ -172,6 +193,7 @@ services:
interval: 10s
timeout: 10s
retries: 5
restart: unless-stopped
networks:
- genex-network

040_create_debezium_support.sql

@@ -0,0 +1,39 @@
-- 040: Debezium CDC support tables — heartbeat + signal + publication
--
-- Lessons learned (from the rwadurian incident):
-- 1. Debezium ≤2.4 carries the DBZ-7316 bug: when the outbox table sees no writes for a
--    long time, confirmed_flush_lsn stops advancing and PostgreSQL WAL piles up without
--    bound (this once reached 306GB of WAL)
-- 2. A heartbeat TABLE is more reliable than pg_logical_emit_message():
--    - pg_logical_emit_message writes to WAL but bypasses the publication, so Debezium never sees it
--    - heartbeat TABLE: Debezium periodically INSERTs/UPDATEs → WAL changes are produced →
--      they flow through the publication → Debezium consumes them → confirmed_flush_lsn advances
-- 3. The signal table supports Debezium incremental snapshots:
--    - a Kafka signal channel can trigger data replay with zero database modification
--    - outbox events can be safely redelivered when something goes wrong
-- 4. The publication must include all three tables: outbox + heartbeat + signal

-- Debezium heartbeat table: prevents WAL buildup
CREATE TABLE IF NOT EXISTS debezium_heartbeat (
id INTEGER PRIMARY KEY DEFAULT 1,
ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT debezium_heartbeat_singleton CHECK (id = 1)
);
-- Initialize the heartbeat row (Debezium then updates it via ON CONFLICT DO UPDATE)
INSERT INTO debezium_heartbeat (id, ts) VALUES (1, NOW()) ON CONFLICT DO NOTHING;
-- Debezium signal table: used for incremental snapshots and other ops tasks
CREATE TABLE IF NOT EXISTS debezium_signal (
id VARCHAR(64) PRIMARY KEY,
type VARCHAR(32) NOT NULL,
data TEXT
);
-- Create the publication covering the outbox + heartbeat + signal tables
-- The Debezium connector uses publication.autocreate.mode=disabled and references this publication directly
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = 'debezium_outbox_publication') THEN
CREATE PUBLICATION debezium_outbox_publication FOR TABLE outbox, debezium_heartbeat, debezium_signal;
END IF;
END $$;

scripts/debezium/outbox-connector.json

@@ -0,0 +1,50 @@
{
"name": "genex-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "${DEBEZIUM_DB_HOST:-postgres}",
"database.port": "5432",
"database.user": "${POSTGRES_USER:-genex}",
"database.password": "${POSTGRES_PASSWORD:-genex_dev_password}",
"database.dbname": "genex",
"topic.prefix": "cdc.genex",
"table.include.list": "public.outbox,public.debezium_heartbeat,public.debezium_signal",
"plugin.name": "pgoutput",
"publication.name": "debezium_outbox_publication",
"publication.autocreate.mode": "disabled",
"slot.name": "debezium_genex_outbox_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": ".*",
"transforms.route.replacement": "cdc.genex.outbox",
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()",
"signal.enabled.channels": "source,kafka",
"signal.data.collection": "public.debezium_signal",
"signal.kafka.topic": "debezium-signals",
"signal.kafka.bootstrap.servers": "${KAFKA_BROKERS:-kafka:9092}",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}

scripts/debezium/register-connectors.sh

@@ -0,0 +1,61 @@
#!/usr/bin/env bash
# =============================================================================
# Debezium connector registration script
# Usage: ./scripts/debezium/register-connectors.sh [CONNECT_URL]
#
# Lessons learned (rwadurian incident):
# - Use debezium/connect:2.5.4.Final (fixes the DBZ-7316 WAL buildup bug)
# - Heartbeat uses the TABLE approach, not pg_logical_emit_message
# - Never expose the Kafka Connect REST API publicly (SSRF injection risk)
# - Configure signal channels so data can be replayed via incremental snapshots when needed
# =============================================================================
set -euo pipefail
CONNECT_URL="${1:-http://localhost:8083}"
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
echo "Waiting for Kafka Connect to be ready..."
until curl -sf "${CONNECT_URL}/" > /dev/null 2>&1; do
echo " Kafka Connect not ready, retrying in 5s..."
sleep 5
done
echo "Kafka Connect is ready at ${CONNECT_URL}"
# Register the outbox connector
CONNECTOR_NAME="genex-outbox-connector"
CONFIG_FILE="${SCRIPT_DIR}/outbox-connector.json"
if [ ! -f "$CONFIG_FILE" ]; then
echo "ERROR: Config file not found: $CONFIG_FILE"
exit 1
fi
# Check whether the connector already exists
STATUS_CODE=$(curl -s -o /dev/null -w "%{http_code}" "${CONNECT_URL}/connectors/${CONNECTOR_NAME}")
if [ "$STATUS_CODE" = "200" ]; then
echo "Connector '${CONNECTOR_NAME}' already exists. Updating config..."
# Extract the "config" section for a PUT update
CONFIG_ONLY=$(python3 -c "import json,sys; d=json.load(open(sys.argv[1])); print(json.dumps(d['config']))" "$CONFIG_FILE" 2>/dev/null \
|| python -c "import json,sys; d=json.load(open(sys.argv[1])); print(json.dumps(d['config']))" "$CONFIG_FILE")
curl -sf -X PUT \
-H "Content-Type: application/json" \
-d "$CONFIG_ONLY" \
"${CONNECT_URL}/connectors/${CONNECTOR_NAME}/config" | python3 -m json.tool 2>/dev/null || true
echo "Connector '${CONNECTOR_NAME}' updated."
else
echo "Creating connector '${CONNECTOR_NAME}'..."
curl -sf -X POST \
-H "Content-Type: application/json" \
-d @"$CONFIG_FILE" \
"${CONNECT_URL}/connectors" | python3 -m json.tool 2>/dev/null || true
echo "Connector '${CONNECTOR_NAME}' created."
fi
# Verify status
echo ""
echo "=== Connector Status ==="
curl -sf "${CONNECT_URL}/connectors/${CONNECTOR_NAME}/status" | python3 -m json.tool 2>/dev/null || \
curl -sf "${CONNECT_URL}/connectors/${CONNECTOR_NAME}/status"
echo ""
echo "Done."