From 2a725af83e339fb1f9929de1c23848a817891d34 Mon Sep 17 00:00:00 2001 From: hailin Date: Sun, 15 Feb 2026 07:32:13 -0800 Subject: [PATCH] =?UTF-8?q?fix:=20Debezium=20CDC=20=E5=85=A8=E9=9D=A2?= =?UTF-8?q?=E5=AE=89=E5=85=A8=E5=8A=A0=E5=9B=BA=20(1.0=20+=202.0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题背景: - 1.0 生产环境发现 6 个孤儿 replication slot, WAL 积压 8.6GB (已清理) - 1.0 使用 Debezium 2.4, 存在 DBZ-7316 (WAL 无限积压) bug - 1.0 和 2.0 均无 max_slot_wal_keep_size 安全阀 (已在线设置 10GB) - 2.0 outbox connector 使用 pg_logical_emit_message 心跳, 不经 publication - 2.0 outbox connector RegexRouter regex=".*" 导致 heartbeat 污染消费者 修复内容: [docker-compose.yml - 1.0 基础设施] - Debezium: 2.4 → 2.5.4.Final (修复 DBZ-7316) - PostgreSQL: 添加 max_slot_wal_keep_size=10GB - Debezium REST API: 端口绑定 127.0.0.1 (防 SSRF 注入) - PostgreSQL: 端口绑定 127.0.0.1 (防公网直连) - Kafka Connect: 添加 OFFSET_FLUSH_INTERVAL_MS=10s, OFFSET_FLUSH_TIMEOUT_MS=5s [docker-compose.2.0.yml - 2.0 基础设施] - Debezium: 2.4 → 2.5.4.Final (锁定精确版本) - PostgreSQL: 添加 max_slot_wal_keep_size=10GB - Kafka Connect: 添加 OFFSET_FLUSH_INTERVAL_MS=10s, OFFSET_FLUSH_TIMEOUT_MS=5s [1.0 Connector 配置 - identity/authorization] - 添加 heartbeat.action.query (INSERT INTO debezium_heartbeat TABLE 方式) - 之前只有 heartbeat.interval.ms 无 action.query, 心跳不生效 [2.0 Outbox Connector 配置 - 5个全部更新] - heartbeat: pg_logical_emit_message → INSERT INTO debezium_heartbeat TABLE 方式 (TABLE 方式经过 publication → Debezium 消费 → 推进 confirmed_flush_lsn) - RegexRouter: regex ".*" → ".*outbox_events" (只路由 outbox 事件, heartbeat 走默认 topic) - table.include.list: 添加 debezium_heartbeat (确保心跳变更生成 Kafka 消息) - publication.autocreate.mode: filtered → disabled (使用预创建的 publication) - auth/contribution: 添加 signal channel 配置 (支持增量快照数据重放) 经验总结: 1. pg_logical_emit_message 写 WAL 但不经 publication, 无法推进 confirmed_flush_lsn 2. RegexRouter regex=".*" 把所有变更(含 heartbeat)路由到 outbox topic, 污染消费者 3. 删除 Kafka Connect connector 不会自动清理 PostgreSQL replication slot 4. 
max_slot_wal_keep_size 是 sighup 级参数, 可在线 ALTER SYSTEM + pg_reload_conf Co-Authored-By: Claude Opus 4.6 --- backend/services/docker-compose.2.0.yml | 7 ++++++- backend/services/docker-compose.yml | 19 ++++++++++++++++--- .../debezium/auth-outbox-connector.json | 13 +++++++++---- .../debezium/authorization-connector.json | 1 + .../contribution-outbox-connector.json | 13 +++++++++---- .../scripts/debezium/identity-connector.json | 1 + .../debezium/mining-outbox-connector.json | 8 ++++---- .../mining-wallet-outbox-connector.json | 8 ++++---- .../debezium/trading-outbox-connector.json | 8 ++++---- 9 files changed, 54 insertions(+), 24 deletions(-) diff --git a/backend/services/docker-compose.2.0.yml b/backend/services/docker-compose.2.0.yml index ffd3fbe6..3f18ae41 100644 --- a/backend/services/docker-compose.2.0.yml +++ b/backend/services/docker-compose.2.0.yml @@ -49,6 +49,7 @@ services: -c max_replication_slots=20 -c max_wal_senders=20 -c max_connections=200 + -c max_slot_wal_keep_size=10GB healthcheck: test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-rwa_user}"] interval: 5s @@ -81,7 +82,8 @@ services: - rwa-2-network debezium-2: - image: debezium/connect:2.4 + # 必须锁定精确版本: 2.5.4.Final 修复 DBZ-7316 (WAL 积压 bug) + image: debezium/connect:2.5.4.Final container_name: rwa-debezium-2 profiles: ["standalone"] depends_on: @@ -102,6 +104,9 @@ services: CONFIG_STORAGE_REPLICATION_FACTOR: 1 OFFSET_STORAGE_REPLICATION_FACTOR: 1 STATUS_STORAGE_REPLICATION_FACTOR: 1 + # Offset 提交频率: 默认 60s, 缩短至 10s 以减少重启后重复处理窗口 + OFFSET_FLUSH_INTERVAL_MS: 10000 + OFFSET_FLUSH_TIMEOUT_MS: 5000 KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter KEY_CONVERTER_SCHEMAS_ENABLE: "false" diff --git a/backend/services/docker-compose.yml b/backend/services/docker-compose.yml index 40a688b7..5695ea97 100644 --- a/backend/services/docker-compose.yml +++ b/backend/services/docker-compose.yml @@ -22,7 +22,8 @@ services: POSTGRES_PASSWORD: 
${POSTGRES_PASSWORD:-rwa_secure_password} POSTGRES_MULTIPLE_DATABASES: rwa_identity,rwa_wallet,rwa_mpc,rwa_backup,rwa_planting,rwa_referral,rwa_reward,rwa_leaderboard,rwa_reporting,rwa_authorization,rwa_admin,rwa_presence,rwa_blockchain ports: - - "5432:5432" + # 安全加固: 仅绑定 127.0.0.1, 禁止公网直连数据库 + - "127.0.0.1:5432:5432" volumes: - postgres_data:/var/lib/postgresql/data - ./scripts/init-databases.sh:/docker-entrypoint-initdb.d/init-databases.sh:ro @@ -38,6 +39,11 @@ services: - "max_wal_senders=20" - "-c" - "max_connections=300" + - "-c" + # WAL 安全阀: 限制单个 replication slot 最多保留 10GB WAL + # 超过此值 PostgreSQL 会使 slot 失效, 防止磁盘被吃满 + # (事故经验: 无此限制, 单个 stuck slot 累积 305GB WAL) + - "max_slot_wal_keep_size=10GB" healthcheck: test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-rwa_user}"] interval: 5s @@ -130,7 +136,9 @@ services: # Debezium Kafka Connect - CDC (Change Data Capture) # --------------------------------------------------------------------------- debezium-connect: - image: debezium/connect:2.4 + # 必须使用 2.5.4.Final+: 修复 DBZ-7316 (WAL 积压 bug) + # 2.4 版本 searchWalPosition 循环不推进 confirmed_flush_lsn + image: debezium/connect:2.5.4.Final container_name: rwa-debezium-connect depends_on: kafka: @@ -138,7 +146,9 @@ services: postgres: condition: service_healthy ports: - - "8084:8083" + # 安全加固: 仅绑定 127.0.0.1, 禁止公网访问 Kafka Connect REST API + # 暴露公网会导致 SSRF 攻击 (恶意注入 connector) + - "127.0.0.1:8084:8083" environment: TZ: Asia/Shanghai GROUP_ID: debezium-connect @@ -149,6 +159,9 @@ services: CONFIG_STORAGE_REPLICATION_FACTOR: 1 OFFSET_STORAGE_REPLICATION_FACTOR: 1 STATUS_STORAGE_REPLICATION_FACTOR: 1 + # Offset 提交频率: 默认 60s, 缩短至 10s 以减少重启后重复处理窗口 + OFFSET_FLUSH_INTERVAL_MS: 10000 + OFFSET_FLUSH_TIMEOUT_MS: 5000 # Connector settings KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter diff --git a/backend/services/scripts/debezium/auth-outbox-connector.json 
b/backend/services/scripts/debezium/auth-outbox-connector.json index a57d91fb..923c7d49 100644 --- a/backend/services/scripts/debezium/auth-outbox-connector.json +++ b/backend/services/scripts/debezium/auth-outbox-connector.json @@ -12,11 +12,11 @@ "topic.prefix": "cdc.auth", - "table.include.list": "public.outbox_events", + "table.include.list": "public.outbox_events,public.debezium_heartbeat,public.debezium_signal", "plugin.name": "pgoutput", "publication.name": "debezium_auth_outbox_publication", - "publication.autocreate.mode": "filtered", + "publication.autocreate.mode": "disabled", "slot.name": "debezium_auth_outbox_slot", @@ -31,11 +31,16 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex": ".*", + "transforms.route.regex": ".*outbox_events", "transforms.route.replacement": "cdc.auth.outbox", "heartbeat.interval.ms": "10000", - "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", + + "signal.enabled.channels": "source,kafka", + "signal.data.collection": "public.debezium_signal", + "signal.kafka.topic": "debezium-signals", + "signal.kafka.bootstrap.servers": "${KAFKA_BROKERS:-kafka:29092}", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/authorization-connector.json b/backend/services/scripts/debezium/authorization-connector.json index 5169a93d..5c9f80fb 100644 --- a/backend/services/scripts/debezium/authorization-connector.json +++ b/backend/services/scripts/debezium/authorization-connector.json @@ -32,6 +32,7 @@ "transforms.unwrap.add.fields": "op,table,source.ts_ms", "heartbeat.interval.ms": "10000", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT 
(id) DO UPDATE SET ts = now()", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/contribution-outbox-connector.json b/backend/services/scripts/debezium/contribution-outbox-connector.json index 2b0c3357..ea673504 100644 --- a/backend/services/scripts/debezium/contribution-outbox-connector.json +++ b/backend/services/scripts/debezium/contribution-outbox-connector.json @@ -12,11 +12,11 @@ "topic.prefix": "cdc.contribution", - "table.include.list": "public.outbox_events", + "table.include.list": "public.outbox_events,public.debezium_heartbeat,public.debezium_signal", "plugin.name": "pgoutput", "publication.name": "debezium_contribution_outbox_publication", - "publication.autocreate.mode": "filtered", + "publication.autocreate.mode": "disabled", "slot.name": "debezium_contribution_outbox_slot", @@ -31,11 +31,16 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex": ".*", + "transforms.route.regex": ".*outbox_events", "transforms.route.replacement": "cdc.contribution.outbox", "heartbeat.interval.ms": "10000", - "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", + + "signal.enabled.channels": "source,kafka", + "signal.data.collection": "public.debezium_signal", + "signal.kafka.topic": "debezium-signals", + "signal.kafka.bootstrap.servers": "${KAFKA_BROKERS:-kafka:29092}", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/identity-connector.json b/backend/services/scripts/debezium/identity-connector.json index 792b6455..7eb5b720 100644 --- a/backend/services/scripts/debezium/identity-connector.json +++ b/backend/services/scripts/debezium/identity-connector.json @@ -32,6 +32,7 @@ 
"transforms.unwrap.add.fields": "op,table,source.ts_ms", "heartbeat.interval.ms": "10000", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/mining-outbox-connector.json b/backend/services/scripts/debezium/mining-outbox-connector.json index 106a2ea7..2ed24306 100644 --- a/backend/services/scripts/debezium/mining-outbox-connector.json +++ b/backend/services/scripts/debezium/mining-outbox-connector.json @@ -12,11 +12,11 @@ "topic.prefix": "cdc.mining", - "table.include.list": "public.outbox_events", + "table.include.list": "public.outbox_events,public.debezium_heartbeat", "plugin.name": "pgoutput", "publication.name": "debezium_mining_outbox_publication", - "publication.autocreate.mode": "filtered", + "publication.autocreate.mode": "disabled", "slot.name": "debezium_mining_outbox_slot", @@ -31,11 +31,11 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex": ".*", + "transforms.route.regex": ".*outbox_events", "transforms.route.replacement": "cdc.mining.outbox", "heartbeat.interval.ms": "10000", - "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/mining-wallet-outbox-connector.json b/backend/services/scripts/debezium/mining-wallet-outbox-connector.json index 4bc44c64..7eaa880a 100644 --- a/backend/services/scripts/debezium/mining-wallet-outbox-connector.json +++ b/backend/services/scripts/debezium/mining-wallet-outbox-connector.json @@ -12,11 +12,11 @@ "topic.prefix": "cdc.mining-wallet", - 
"table.include.list": "public.outbox_events", + "table.include.list": "public.outbox_events,public.debezium_heartbeat", "plugin.name": "pgoutput", "publication.name": "debezium_mining_wallet_outbox_publication", - "publication.autocreate.mode": "filtered", + "publication.autocreate.mode": "disabled", "slot.name": "debezium_mining_wallet_outbox_slot", @@ -31,11 +31,11 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex": ".*", + "transforms.route.regex": ".*outbox_events", "transforms.route.replacement": "cdc.mining-wallet.outbox", "heartbeat.interval.ms": "10000", - "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/trading-outbox-connector.json b/backend/services/scripts/debezium/trading-outbox-connector.json index 17b80316..1f899fc5 100644 --- a/backend/services/scripts/debezium/trading-outbox-connector.json +++ b/backend/services/scripts/debezium/trading-outbox-connector.json @@ -12,11 +12,11 @@ "topic.prefix": "cdc.trading", - "table.include.list": "public.outbox_events", + "table.include.list": "public.outbox_events,public.debezium_heartbeat", "plugin.name": "pgoutput", "publication.name": "debezium_trading_outbox_publication", - "publication.autocreate.mode": "filtered", + "publication.autocreate.mode": "disabled", "slot.name": "debezium_trading_outbox_slot", @@ -31,11 +31,11 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex": ".*", + "transforms.route.regex": ".*outbox_events", 
"transforms.route.replacement": "cdc.trading.outbox", "heartbeat.interval.ms": "10000", - "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", "snapshot.mode": "initial",