fix: comprehensive Debezium CDC security hardening (1.0 + 2.0)

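Background:
  - 1.0 production had 6 orphaned replication slots retaining 8.6GB of WAL (already cleaned up)
  - 1.0 ran Debezium 2.4, which is affected by the DBZ-7316 bug (unbounded WAL backlog)
  - Neither 1.0 nor 2.0 had max_slot_wal_keep_size as a safety valve (now set online to 10GB)
  - The 2.0 outbox connectors used pg_logical_emit_message for heartbeats, which bypasses the publication
  - The 2.0 outbox connectors used RegexRouter regex=".*", so heartbeat records polluted consumers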

Fixes:

[docker-compose.yml - 1.0 infrastructure]
  - Debezium: 2.4 → 2.5.4.Final (fixes DBZ-7316)
  - PostgreSQL: add max_slot_wal_keep_size=10GB
  - Debezium REST API: bind the port to 127.0.0.1 (prevents SSRF injection)
  - PostgreSQL: bind the port to 127.0.0.1 (prevents direct connections from the public internet)
  - Kafka Connect: add OFFSET_FLUSH_INTERVAL_MS=10000 (10s)

[docker-compose.2.0.yml - 2.0 infrastructure]
  - Debezium: 2.5 → 2.5.4.Final (pin the exact version)
  - PostgreSQL: add max_slot_wal_keep_size=10GB
  - Kafka Connect: add OFFSET_FLUSH_INTERVAL_MS=10000 (10s)

[1.0 connector config - identity/authorization]
  - Add heartbeat.action.query (table-based: INSERT INTO debezium_heartbeat)
  - Previously only heartbeat.interval.ms was set, with no action.query, so the heartbeat had no effect

[2.0 outbox connector config - all 5 updated]
  - heartbeat: pg_logical_emit_message → table-based INSERT INTO debezium_heartbeat
    (a table write goes through the publication → Debezium consumes it → confirmed_flush_lsn advances)
  - RegexRouter: regex ".*" → ".*outbox_events" (route only outbox events; heartbeats go to their default topic)
  - table.include.list: add debezium_heartbeat (so heartbeat changes produce Kafka messages)
  - publication.autocreate.mode: filtered → disabled (use the pre-created publication)
  - auth/contribution: add signal channel config (supports replaying data via incremental snapshots)

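Lessons learned:
  1. pg_logical_emit_message writes to the WAL but bypasses the publication, so it cannot advance confirmed_flush_lsn
  2. RegexRouter regex=".*" routes every change (heartbeats included) to the outbox topic, polluting consumers
  3. Deleting a Kafka Connect connector does not automatically drop its PostgreSQL replication slot
  4. max_slot_wal_keep_size is a sighup-level parameter, so it can be set online with ALTER SYSTEM + pg_reload_conf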

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
hailin 2026-02-15 07:32:13 -08:00
parent 54eb472faa
commit 2a725af83e
9 changed files with 54 additions and 24 deletions

View File

@@ -49,6 +49,7 @@ services:
  -c max_replication_slots=20
  -c max_wal_senders=20
  -c max_connections=200
+ -c max_slot_wal_keep_size=10GB
  healthcheck:
  test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-rwa_user}"]
  interval: 5s
@@ -81,7 +82,8 @@ services:
  - rwa-2-network
  debezium-2:
- image: debezium/connect:2.4
+ # Must pin the exact version: 2.5.4.Final fixes DBZ-7316 (WAL backlog bug)
+ image: debezium/connect:2.5.4.Final
  container_name: rwa-debezium-2
  profiles: ["standalone"]
  depends_on:
@@ -102,6 +104,9 @@ services:
  CONFIG_STORAGE_REPLICATION_FACTOR: 1
  OFFSET_STORAGE_REPLICATION_FACTOR: 1
  STATUS_STORAGE_REPLICATION_FACTOR: 1
+ # Offset commit interval: default 60s, shortened to 10s to shrink the reprocessing window after a restart
+ OFFSET_FLUSH_INTERVAL_MS: 10000
+ OFFSET_FLUSH_TIMEOUT_MS: 5000
  KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
  VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
  KEY_CONVERTER_SCHEMAS_ENABLE: "false"
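
The slot hygiene described in the commit message (orphaned slots, WAL retained per slot, a 10GB cap) can be checked with a small script. The following is only a sketch under stated assumptions: `SlotRow`, `slots_to_review`, and the 80% warning ratio are hypothetical names and values chosen for illustration, and the rows are assumed to come from a query like `SLOT_QUERY` run by whatever database client is in use.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

# Query one might run to collect the inputs (both columns are of type pg_lsn).
SLOT_QUERY = """
SELECT slot_name, active, restart_lsn, pg_current_wal_lsn() AS current_lsn
FROM pg_replication_slots
"""

GIB = 1024 ** 3  # PostgreSQL size units are base-1024, so 10GB = 10 * GIB bytes


def lsn_to_int(lsn: str) -> int:
    """Convert a pg_lsn such as '16/B374D848' to an absolute byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


@dataclass
class SlotRow:
    """One row of SLOT_QUERY (hypothetical container for this sketch)."""
    slot_name: str
    active: bool
    restart_lsn: Optional[str]
    current_lsn: str


def retained_bytes(row: SlotRow) -> int:
    """WAL bytes the slot forces PostgreSQL to keep."""
    if row.restart_lsn is None:
        return 0
    return lsn_to_int(row.current_lsn) - lsn_to_int(row.restart_lsn)


def slots_to_review(
    rows: List[SlotRow], cap_bytes: int = 10 * GIB, warn_ratio: float = 0.8
) -> List[Tuple[str, str]]:
    """Return (slot_name, reason) for inactive slots and slots close to the cap."""
    findings = []
    for row in rows:
        kept = retained_bytes(row)
        if not row.active:
            findings.append((row.slot_name, f"inactive, retaining {kept} bytes"))
        elif kept >= cap_bytes * warn_ratio:
            findings.append((row.slot_name, f"active but retaining {kept} bytes"))
    return findings
```

An inactive slot is only a candidate for review here; whether it is truly orphaned (its connector was deleted) still has to be confirmed before dropping it.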

View File

@@ -22,7 +22,8 @@ services:
  POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-rwa_secure_password}
  POSTGRES_MULTIPLE_DATABASES: rwa_identity,rwa_wallet,rwa_mpc,rwa_backup,rwa_planting,rwa_referral,rwa_reward,rwa_leaderboard,rwa_reporting,rwa_authorization,rwa_admin,rwa_presence,rwa_blockchain
  ports:
- - "5432:5432"
+ # Security hardening: bind to 127.0.0.1 only; no direct database connections from the public internet
+ - "127.0.0.1:5432:5432"
  volumes:
  - postgres_data:/var/lib/postgresql/data
  - ./scripts/init-databases.sh:/docker-entrypoint-initdb.d/init-databases.sh:ro
@@ -38,6 +39,11 @@ services:
  - "max_wal_senders=20"
  - "-c"
  - "max_connections=300"
+ - "-c"
+ # WAL safety valve: cap the WAL a single replication slot may retain at 10GB
+ # Beyond this, PostgreSQL invalidates the slot so the disk cannot fill up
+ # (incident lesson: without this limit, a single stuck slot accumulated 305GB of WAL)
+ - "max_slot_wal_keep_size=10GB"
  healthcheck:
  test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-rwa_user}"]
  interval: 5s
@@ -130,7 +136,9 @@ services:
  # Debezium Kafka Connect - CDC (Change Data Capture)
  # ---------------------------------------------------------------------------
  debezium-connect:
- image: debezium/connect:2.4
+ # Must use 2.5.4.Final or later: fixes DBZ-7316 (WAL backlog bug)
+ # In 2.4, the searchWalPosition loop does not advance confirmed_flush_lsn
+ image: debezium/connect:2.5.4.Final
  container_name: rwa-debezium-connect
  depends_on:
  kafka:
@@ -138,7 +146,9 @@ services:
  postgres:
  condition: service_healthy
  ports:
- - "8084:8083"
+ # Security hardening: bind to 127.0.0.1 only; no public access to the Kafka Connect REST API
+ # Public exposure enables SSRF attacks (malicious connector injection)
+ - "127.0.0.1:8084:8083"
  environment:
  TZ: Asia/Shanghai
  GROUP_ID: debezium-connect
@@ -149,6 +159,9 @@ services:
  CONFIG_STORAGE_REPLICATION_FACTOR: 1
  OFFSET_STORAGE_REPLICATION_FACTOR: 1
  STATUS_STORAGE_REPLICATION_FACTOR: 1
+ # Offset commit interval: default 60s, shortened to 10s to shrink the reprocessing window after a restart
+ OFFSET_FLUSH_INTERVAL_MS: 10000
+ OFFSET_FLUSH_TIMEOUT_MS: 5000
  # Connector settings
  KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
  VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
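
The port changes in this file rely on the Compose short syntax, where a mapping without a host IP (such as "5432:5432") publishes on all interfaces. A minimal sketch of a check that could guard against regressions follows; `is_loopback_only` is a hypothetical helper, and it only handles the short string form of a port mapping, not the long (mapping) syntax or port ranges with unusual layouts.

```python
import ipaddress


def is_loopback_only(mapping: str) -> bool:
    """Return True if a short-syntax port mapping binds to a loopback address.

    Accepted shapes: "CONTAINER", "HOST:CONTAINER", "IP:HOST:CONTAINER",
    each optionally followed by "/tcp" or "/udp".
    """
    spec = mapping.split("/", 1)[0]   # drop an optional protocol suffix
    parts = spec.rsplit(":", 2)       # split from the right so IPv6 hosts survive
    if len(parts) < 3:
        return False                  # no host IP given: published on all interfaces
    host = parts[0].strip("[]")       # IPv6 hosts are written as [::1]
    return ipaddress.ip_address(host).is_loopback


if __name__ == "__main__":
    for mapping in ["5432:5432", "127.0.0.1:5432:5432", "127.0.0.1:8084:8083"]:
        print(mapping, "->", "loopback only" if is_loopback_only(mapping) else "EXPOSED")
```

Binding to loopback limits who can reach the port but does not add authentication, so services on the same host can still call the Kafka Connect REST API.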

View File

@@ -12,11 +12,11 @@
  "topic.prefix": "cdc.auth",
- "table.include.list": "public.outbox_events",
+ "table.include.list": "public.outbox_events,public.debezium_heartbeat,public.debezium_signal",
  "plugin.name": "pgoutput",
  "publication.name": "debezium_auth_outbox_publication",
- "publication.autocreate.mode": "filtered",
+ "publication.autocreate.mode": "disabled",
  "slot.name": "debezium_auth_outbox_slot",
@@ -31,11 +31,16 @@
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
- "transforms.route.regex": ".*",
+ "transforms.route.regex": ".*outbox_events",
  "transforms.route.replacement": "cdc.auth.outbox",
  "heartbeat.interval.ms": "10000",
- "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)",
+ "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()",
+ "signal.enabled.channels": "source,kafka",
+ "signal.data.collection": "public.debezium_signal",
+ "signal.kafka.topic": "debezium-signals",
+ "signal.kafka.bootstrap.servers": "${KAFKA_BROKERS:-kafka:29092}",
  "snapshot.mode": "initial",

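The effect of narrowing the RegexRouter pattern can be illustrated with a small emulation. Kafka Connect's RegexRouter rewrites a topic only when the regex matches the entire topic name; the sketch below mimics that with `re.fullmatch`. It is a simplification, not the connector's code: `regex_router` is a hypothetical helper, the replacement is treated as a literal (no `$1` group references), and the topic names assume Debezium's default `<topic.prefix>.<schema>.<table>` naming plus the default `__debezium-heartbeat` prefix.

```python
import re


def regex_router(topic: str, regex: str, replacement: str) -> str:
    """Return the rewritten topic, or the original topic if the regex does not match it fully."""
    pattern = re.compile(regex)
    if pattern.fullmatch(topic) is None:
        return topic
    # Literal replacement of the first match; the lambda avoids escape processing.
    return pattern.sub(lambda _match: replacement, topic, count=1)


if __name__ == "__main__":
    topics = [
        "cdc.auth.public.outbox_events",
        "cdc.auth.public.debezium_heartbeat",
        "__debezium-heartbeat.cdc.auth",
    ]
    for regex in (".*", ".*outbox_events"):
        print(f"regex={regex!r}")
        for topic in topics:
            print(f"  {topic} -> {regex_router(topic, regex, 'cdc.auth.outbox')}")
```

With ".*" all three topics collapse into cdc.auth.outbox, which is the consumer pollution the commit describes; with ".*outbox_events" only the outbox table's topic is rewritten.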
View File

@@ -32,6 +32,7 @@
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",
  "heartbeat.interval.ms": "10000",
+ "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()",
  "snapshot.mode": "initial",

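The heartbeat rules this commit applies to the connectors can be expressed as a small lint over a connector config. This is a sketch only: `heartbeat_findings` is a hypothetical helper, and the three rules encode the conclusions stated in the commit message rather than any validation Debezium performs itself.

```python
from typing import Dict, List


def heartbeat_findings(config: Dict[str, str]) -> List[str]:
    """Return human-readable problems with a connector's heartbeat settings."""
    findings = []
    interval_ms = int(config.get("heartbeat.interval.ms", "0"))
    query = config.get("heartbeat.action.query", "")
    tables = [t.strip() for t in config.get("table.include.list", "").split(",") if t.strip()]

    # Rule 1: an interval without an action query writes nothing to the database.
    if interval_ms > 0 and not query:
        findings.append("heartbeat.interval.ms is set but heartbeat.action.query is missing")

    # Rule 2: the commit replaces pg_logical_emit_message with a table write.
    if "pg_logical_emit_message" in query:
        findings.append("heartbeat uses pg_logical_emit_message instead of a table write")

    # Rule 3: if tables are filtered, the heartbeat table has to be captured too.
    if "debezium_heartbeat" in query and tables:
        if not any(t.split(".")[-1] == "debezium_heartbeat" for t in tables):
            findings.append("debezium_heartbeat is written but missing from table.include.list")

    return findings
```

Running this over every connector JSON before registering it would flag the two pre-commit states seen here: the 1.0 connectors with no action query, and the 2.0 connectors using pg_logical_emit_message.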
View File

@@ -12,11 +12,11 @@
  "topic.prefix": "cdc.contribution",
- "table.include.list": "public.outbox_events",
+ "table.include.list": "public.outbox_events,public.debezium_heartbeat,public.debezium_signal",
  "plugin.name": "pgoutput",
  "publication.name": "debezium_contribution_outbox_publication",
- "publication.autocreate.mode": "filtered",
+ "publication.autocreate.mode": "disabled",
  "slot.name": "debezium_contribution_outbox_slot",
@@ -31,11 +31,16 @@
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
- "transforms.route.regex": ".*",
+ "transforms.route.regex": ".*outbox_events",
  "transforms.route.replacement": "cdc.contribution.outbox",
  "heartbeat.interval.ms": "10000",
- "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)",
+ "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()",
+ "signal.enabled.channels": "source,kafka",
+ "signal.data.collection": "public.debezium_signal",
+ "signal.kafka.topic": "debezium-signals",
+ "signal.kafka.bootstrap.servers": "${KAFKA_BROKERS:-kafka:29092}",
  "snapshot.mode": "initial",

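The signal channels enabled for the auth and contribution connectors exist to trigger incremental snapshots. As a rough sketch of what such a request might look like, the helpers below build the payload for each channel; the function names and the signal id are hypothetical, and the payload shape follows Debezium's documented execute-snapshot signal as I understand it, so it should be checked against the documentation for the deployed version before use.

```python
import json
from typing import Iterable, Tuple


def kafka_snapshot_signal(topic_prefix: str, tables: Iterable[str]) -> Tuple[str, str]:
    """Build (key, value) for a message on the signal.kafka.topic.

    The key is assumed to have to equal the connector's topic.prefix so that
    only the intended connector reacts to the signal.
    """
    value = {
        "type": "execute-snapshot",
        "data": {"data-collections": list(tables), "type": "INCREMENTAL"},
    }
    return topic_prefix, json.dumps(value)


def source_snapshot_signal(signal_id: str, tables: Iterable[str]) -> Tuple[str, tuple]:
    """Build a parameterized INSERT for the signal.data.collection table."""
    sql = "INSERT INTO public.debezium_signal (id, type, data) VALUES (%s, %s, %s)"
    data = json.dumps({"data-collections": list(tables), "type": "incremental"})
    return sql, (signal_id, "execute-snapshot", data)


if __name__ == "__main__":
    key, value = kafka_snapshot_signal("cdc.auth", ["public.outbox_events"])
    print(key, value)
    print(source_snapshot_signal("replay-001", ["public.outbox_events"]))
```

The Kafka variant would be produced to the debezium-signals topic configured above, and the source variant executed against the connector's database; neither helper sends anything itself.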
View File

@@ -32,6 +32,7 @@
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",
  "heartbeat.interval.ms": "10000",
+ "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()",
  "snapshot.mode": "initial",

View File

@@ -12,11 +12,11 @@
  "topic.prefix": "cdc.mining",
- "table.include.list": "public.outbox_events",
+ "table.include.list": "public.outbox_events,public.debezium_heartbeat",
  "plugin.name": "pgoutput",
  "publication.name": "debezium_mining_outbox_publication",
- "publication.autocreate.mode": "filtered",
+ "publication.autocreate.mode": "disabled",
  "slot.name": "debezium_mining_outbox_slot",
@@ -31,11 +31,11 @@
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
- "transforms.route.regex": ".*",
+ "transforms.route.regex": ".*outbox_events",
  "transforms.route.replacement": "cdc.mining.outbox",
  "heartbeat.interval.ms": "10000",
- "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)",
+ "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()",
  "snapshot.mode": "initial",

View File

@@ -12,11 +12,11 @@
  "topic.prefix": "cdc.mining-wallet",
- "table.include.list": "public.outbox_events",
+ "table.include.list": "public.outbox_events,public.debezium_heartbeat",
  "plugin.name": "pgoutput",
  "publication.name": "debezium_mining_wallet_outbox_publication",
- "publication.autocreate.mode": "filtered",
+ "publication.autocreate.mode": "disabled",
  "slot.name": "debezium_mining_wallet_outbox_slot",
@@ -31,11 +31,11 @@
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
- "transforms.route.regex": ".*",
+ "transforms.route.regex": ".*outbox_events",
  "transforms.route.replacement": "cdc.mining-wallet.outbox",
  "heartbeat.interval.ms": "10000",
- "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)",
+ "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()",
  "snapshot.mode": "initial",

View File

@@ -12,11 +12,11 @@
  "topic.prefix": "cdc.trading",
- "table.include.list": "public.outbox_events",
+ "table.include.list": "public.outbox_events,public.debezium_heartbeat",
  "plugin.name": "pgoutput",
  "publication.name": "debezium_trading_outbox_publication",
- "publication.autocreate.mode": "filtered",
+ "publication.autocreate.mode": "disabled",
  "slot.name": "debezium_trading_outbox_slot",
@@ -31,11 +31,11 @@
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
- "transforms.route.regex": ".*",
+ "transforms.route.regex": ".*outbox_events",
  "transforms.route.replacement": "cdc.trading.outbox",
  "heartbeat.interval.ms": "10000",
- "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)",
+ "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()",
  "snapshot.mode": "initial",