diff --git a/backend/services/docker-compose.2.0.yml b/backend/services/docker-compose.2.0.yml index ffd3fbe6..3f18ae41 100644 --- a/backend/services/docker-compose.2.0.yml +++ b/backend/services/docker-compose.2.0.yml @@ -49,6 +49,7 @@ services: -c max_replication_slots=20 -c max_wal_senders=20 -c max_connections=200 + -c max_slot_wal_keep_size=10GB healthcheck: test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-rwa_user}"] interval: 5s @@ -81,7 +82,8 @@ services: - rwa-2-network debezium-2: - image: debezium/connect:2.4 + # 必须锁定精确版本: 2.5.4.Final 修复 DBZ-7316 (WAL 积压 bug) + image: debezium/connect:2.5.4.Final container_name: rwa-debezium-2 profiles: ["standalone"] depends_on: @@ -102,6 +104,9 @@ services: CONFIG_STORAGE_REPLICATION_FACTOR: 1 OFFSET_STORAGE_REPLICATION_FACTOR: 1 STATUS_STORAGE_REPLICATION_FACTOR: 1 + # Offset 提交频率: 默认 60s, 缩短至 10s 以减少重启后重复处理窗口 + OFFSET_FLUSH_INTERVAL_MS: 10000 + OFFSET_FLUSH_TIMEOUT_MS: 5000 KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter KEY_CONVERTER_SCHEMAS_ENABLE: "false" diff --git a/backend/services/docker-compose.yml b/backend/services/docker-compose.yml index 40a688b7..5695ea97 100644 --- a/backend/services/docker-compose.yml +++ b/backend/services/docker-compose.yml @@ -22,7 +22,8 @@ services: POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-rwa_secure_password} POSTGRES_MULTIPLE_DATABASES: rwa_identity,rwa_wallet,rwa_mpc,rwa_backup,rwa_planting,rwa_referral,rwa_reward,rwa_leaderboard,rwa_reporting,rwa_authorization,rwa_admin,rwa_presence,rwa_blockchain ports: - - "5432:5432" + # 安全加固: 仅绑定 127.0.0.1, 禁止公网直连数据库 + - "127.0.0.1:5432:5432" volumes: - postgres_data:/var/lib/postgresql/data - ./scripts/init-databases.sh:/docker-entrypoint-initdb.d/init-databases.sh:ro @@ -38,6 +39,11 @@ services: - "max_wal_senders=20" - "-c" - "max_connections=300" + - "-c" + # WAL 安全阀: 限制单个 replication slot 最多保留 10GB WAL + # 超过此值 PostgreSQL 会使 slot 失效, 防止磁盘被吃满 + # (事故经验: 无此限制, 单个 stuck slot 累积 305GB WAL) + - "max_slot_wal_keep_size=10GB" healthcheck: test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-rwa_user}"] interval: 5s @@ -130,7 +136,9 @@ services: # Debezium Kafka Connect - CDC (Change Data Capture) # --------------------------------------------------------------------------- debezium-connect: - image: debezium/connect:2.4 + # 必须使用 2.5.4.Final+: 修复 DBZ-7316 (WAL 积压 bug) + # 2.4 版本 searchWalPosition 循环不推进 confirmed_flush_lsn + image: debezium/connect:2.5.4.Final container_name: rwa-debezium-connect depends_on: kafka: @@ -138,7 +146,9 @@ services: postgres: condition: service_healthy ports: - - "8084:8083" + # 安全加固: 仅绑定 127.0.0.1, 禁止公网访问 Kafka Connect REST API + # 暴露公网会导致 SSRF 攻击 (恶意注入 connector) + - "127.0.0.1:8084:8083" environment: TZ: Asia/Shanghai GROUP_ID: debezium-connect @@ -149,6 +159,9 @@ services: CONFIG_STORAGE_REPLICATION_FACTOR: 1 OFFSET_STORAGE_REPLICATION_FACTOR: 1 STATUS_STORAGE_REPLICATION_FACTOR: 1 + # Offset 提交频率: 默认 60s, 缩短至 10s 以减少重启后重复处理窗口 + OFFSET_FLUSH_INTERVAL_MS: 10000 + OFFSET_FLUSH_TIMEOUT_MS: 5000 # Connector settings KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter diff --git a/backend/services/scripts/debezium/auth-outbox-connector.json b/backend/services/scripts/debezium/auth-outbox-connector.json index a57d91fb..923c7d49 100644 --- a/backend/services/scripts/debezium/auth-outbox-connector.json +++ b/backend/services/scripts/debezium/auth-outbox-connector.json @@ -12,11 +12,11 @@ "topic.prefix": "cdc.auth", - "table.include.list": "public.outbox_events", + "table.include.list": "public.outbox_events,public.debezium_heartbeat,public.debezium_signal", "plugin.name": "pgoutput", "publication.name": "debezium_auth_outbox_publication", - "publication.autocreate.mode": "filtered", + "publication.autocreate.mode": "disabled", "slot.name": "debezium_auth_outbox_slot", @@ -31,11 +31,16 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex": ".*", + "transforms.route.regex": ".*outbox_events", "transforms.route.replacement": "cdc.auth.outbox", "heartbeat.interval.ms": "10000", - "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", + + "signal.enabled.channels": "source,kafka", + "signal.data.collection": "public.debezium_signal", + "signal.kafka.topic": "debezium-signals", + "signal.kafka.bootstrap.servers": "${KAFKA_BROKERS:-kafka:29092}", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/authorization-connector.json b/backend/services/scripts/debezium/authorization-connector.json index 5169a93d..5c9f80fb 100644 --- a/backend/services/scripts/debezium/authorization-connector.json +++ b/backend/services/scripts/debezium/authorization-connector.json @@ -32,6 +32,7 @@ "transforms.unwrap.add.fields": "op,table,source.ts_ms", "heartbeat.interval.ms": "10000", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/contribution-outbox-connector.json b/backend/services/scripts/debezium/contribution-outbox-connector.json index 2b0c3357..ea673504 100644 --- a/backend/services/scripts/debezium/contribution-outbox-connector.json +++ b/backend/services/scripts/debezium/contribution-outbox-connector.json @@ -12,11 +12,11 @@ "topic.prefix": "cdc.contribution", - "table.include.list": "public.outbox_events", + "table.include.list": "public.outbox_events,public.debezium_heartbeat,public.debezium_signal", "plugin.name": "pgoutput", "publication.name": "debezium_contribution_outbox_publication", - "publication.autocreate.mode": "filtered", + "publication.autocreate.mode": "disabled", "slot.name": "debezium_contribution_outbox_slot", @@ -31,11 +31,16 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex": ".*", + "transforms.route.regex": ".*outbox_events", "transforms.route.replacement": "cdc.contribution.outbox", "heartbeat.interval.ms": "10000", - "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", + + "signal.enabled.channels": "source,kafka", + "signal.data.collection": "public.debezium_signal", + "signal.kafka.topic": "debezium-signals", + "signal.kafka.bootstrap.servers": "${KAFKA_BROKERS:-kafka:29092}", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/identity-connector.json b/backend/services/scripts/debezium/identity-connector.json index 792b6455..7eb5b720 100644 --- a/backend/services/scripts/debezium/identity-connector.json +++ b/backend/services/scripts/debezium/identity-connector.json @@ -32,6 +32,7 @@ "transforms.unwrap.add.fields": "op,table,source.ts_ms", "heartbeat.interval.ms": "10000", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/mining-outbox-connector.json b/backend/services/scripts/debezium/mining-outbox-connector.json index 106a2ea7..2ed24306 100644 --- a/backend/services/scripts/debezium/mining-outbox-connector.json +++ b/backend/services/scripts/debezium/mining-outbox-connector.json @@ -12,11 +12,11 @@ "topic.prefix": "cdc.mining", - "table.include.list": "public.outbox_events", + "table.include.list": "public.outbox_events,public.debezium_heartbeat", "plugin.name": "pgoutput", "publication.name": "debezium_mining_outbox_publication", - "publication.autocreate.mode": "filtered", + "publication.autocreate.mode": "disabled", "slot.name": "debezium_mining_outbox_slot", @@ -31,11 +31,11 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex": ".*", + "transforms.route.regex": ".*outbox_events", "transforms.route.replacement": "cdc.mining.outbox", "heartbeat.interval.ms": "10000", - "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/mining-wallet-outbox-connector.json b/backend/services/scripts/debezium/mining-wallet-outbox-connector.json index 4bc44c64..7eaa880a 100644 --- a/backend/services/scripts/debezium/mining-wallet-outbox-connector.json +++ b/backend/services/scripts/debezium/mining-wallet-outbox-connector.json @@ -12,11 +12,11 @@ "topic.prefix": "cdc.mining-wallet", - "table.include.list": "public.outbox_events", + "table.include.list": "public.outbox_events,public.debezium_heartbeat", "plugin.name": "pgoutput", "publication.name": "debezium_mining_wallet_outbox_publication", - "publication.autocreate.mode": "filtered", + "publication.autocreate.mode": "disabled", "slot.name": "debezium_mining_wallet_outbox_slot", @@ -31,11 +31,11 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex": ".*", + "transforms.route.regex": ".*outbox_events", "transforms.route.replacement": "cdc.mining-wallet.outbox", "heartbeat.interval.ms": "10000", - "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/trading-outbox-connector.json b/backend/services/scripts/debezium/trading-outbox-connector.json index 17b80316..1f899fc5 100644 --- a/backend/services/scripts/debezium/trading-outbox-connector.json +++ b/backend/services/scripts/debezium/trading-outbox-connector.json @@ -12,11 +12,11 @@ "topic.prefix": "cdc.trading", - "table.include.list": "public.outbox_events", + "table.include.list": "public.outbox_events,public.debezium_heartbeat", "plugin.name": "pgoutput", "publication.name": "debezium_trading_outbox_publication", - "publication.autocreate.mode": "filtered", + "publication.autocreate.mode": "disabled", "slot.name": "debezium_trading_outbox_slot", @@ -31,11 +31,11 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex": ".*", + "transforms.route.regex": ".*outbox_events", "transforms.route.replacement": "cdc.trading.outbox", "heartbeat.interval.ms": "10000", - "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)", + "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()", "snapshot.mode": "initial",