diff --git a/backend/docker-compose.yml b/backend/docker-compose.yml
index 82881f7..084e822 100644
--- a/backend/docker-compose.yml
+++ b/backend/docker-compose.yml
@@ -13,7 +13,8 @@ services:
       POSTGRES_PASSWORD: genex_dev_password
       POSTGRES_DB: genex
     ports:
-      - "5432:5432"
+      # Security hardening: bind to 127.0.0.1 only; no direct public access to the database
+      - "127.0.0.1:5432:5432"
     volumes:
       - postgres_data:/var/lib/postgresql/data
       - ./migrations:/docker-entrypoint-initdb.d
@@ -25,11 +26,17 @@ services:
       - "max_replication_slots=10"  # CDC connector slots
       - "-c"
       - "max_wal_senders=10"  # WAL sender processes
+      - "-c"
+      # WAL safety valve: cap the WAL retained by a single replication slot at 10GB.
+      # Beyond this PostgreSQL invalidates the slot, preventing the disk from filling up
+      # (rwadurian incident: without this limit one stuck slot accumulated 305GB of WAL)
+      - "max_slot_wal_keep_size=10GB"
     healthcheck:
       test: ["CMD-SHELL", "pg_isready -U genex"]
       interval: 5s
       timeout: 5s
       retries: 5
+    restart: unless-stopped
     networks:
       - genex-network
 
@@ -37,7 +44,8 @@ services:
     image: redis:7-alpine
     container_name: genex-redis
     ports:
-      - "6379:6379"
+      # Security hardening: bind to 127.0.0.1 only; Redis has no password and is trivially exploitable if public
+      - "127.0.0.1:6379:6379"
     volumes:
       - redis_data:/data
     command: redis-server --appendonly yes
@@ -46,6 +54,7 @@ services:
       interval: 5s
       timeout: 5s
       retries: 5
+    restart: unless-stopped
     networks:
       - genex-network
 
@@ -70,7 +79,8 @@ services:
       CLUSTER_ID: "genex-kafka-cluster-001"
     ports:
       - "9092:9092"
-      - "29092:29092"
+      # Security hardening: external access port bound to 127.0.0.1 only
+      - "127.0.0.1:29092:29092"
     volumes:
       - kafka_data:/var/lib/kafka/data
     healthcheck:
@@ -78,6 +88,7 @@ services:
       interval: 10s
       timeout: 10s
       retries: 5
+    restart: unless-stopped
     networks:
       - genex-network
 
@@ -90,7 +101,8 @@ services:
       MINIO_ROOT_PASSWORD: genex-minio-secret
     ports:
       - "9000:9000"  # S3 API
-      - "9001:9001"  # Console UI
+      # Security hardening: MinIO Console bound to 127.0.0.1 only; public exposure with default credentials is extremely dangerous
+      - "127.0.0.1:9001:9001"  # Console UI
     volumes:
       - minio_data:/data
     command: server /data --console-address ":9001"
@@ -99,6 +111,7 @@ services:
       interval: 10s
       timeout: 5s
       retries: 5
+    restart: unless-stopped
     networks:
       - genex-network
 
@@ -127,8 +140,9 @@ services:
       - genex-network
 
   # Debezium Kafka Connect (CDC - Change Data Capture)
+  # Version note: must use 2.5.1+ (fixes DBZ-7316: searchWalPosition never advances confirmed_flush_lsn, causing unbounded WAL buildup)
   kafka-connect:
-    image: debezium/connect:2.5
+    image: debezium/connect:2.5.4.Final
     container_name: genex-kafka-connect
     environment:
       BOOTSTRAP_SERVERS: kafka:9092
@@ -139,13 +153,19 @@ services:
       CONFIG_STORAGE_REPLICATION_FACTOR: 1
       OFFSET_STORAGE_REPLICATION_FACTOR: 1
       STATUS_STORAGE_REPLICATION_FACTOR: 1
+      # Offset commit frequency: default is 60s; shortened to 10s to narrow the reprocessing window after a restart
+      OFFSET_FLUSH_INTERVAL_MS: 10000
+      OFFSET_FLUSH_TIMEOUT_MS: 5000
     ports:
-      - "8083:8083"  # Kafka Connect REST API
+      # Security hardening: bind to 127.0.0.1 only; never expose the Kafka Connect REST API publicly.
+      # Public exposure enables SSRF-style attacks (malicious connector configs reading /etc/passwd etc.)
+      - "127.0.0.1:8083:8083"
     depends_on:
       kafka:
         condition: service_healthy
       postgres:
         condition: service_healthy
+    restart: unless-stopped
     networks:
       - genex-network
 
@@ -164,7 +184,8 @@ services:
       KONG_PROXY_LISTEN: 0.0.0.0:8080
     ports:
       - "8080:8080"  # Proxy (frontend connects here)
-      - "8001:8001"  # Admin API
+      # Security hardening: Kong Admin API bound to 127.0.0.1 only; public exposure allows tampering with routing rules
+      - "127.0.0.1:8001:8001"  # Admin API
     volumes:
       - ./kong/kong.yml:/etc/kong/kong.yml:ro
     healthcheck:
@@ -172,6 +193,7 @@ services:
       interval: 10s
       timeout: 10s
       retries: 5
+    restart: unless-stopped
     networks:
       - genex-network
diff --git a/backend/migrations/040_create_debezium_support.sql b/backend/migrations/040_create_debezium_support.sql
new file mode 100644
index 0000000..7f8e29d
--- /dev/null
+++ b/backend/migrations/040_create_debezium_support.sql
@@ -0,0 +1,39 @@
+-- 040: Debezium CDC support tables — heartbeat + signal + publication
+--
+-- Lessons learned (from the rwadurian incident):
+-- 1. Debezium <=2.4 has bug DBZ-7316: when the outbox table sees no writes for long,
+--    confirmed_flush_lsn stops advancing and WAL piles up without bound (once hit 306GB)
+-- 2. A heartbeat TABLE is more reliable than pg_logical_emit_message():
+--    - pg_logical_emit_message writes WAL but bypasses the publication, invisible to Debezium
+--    - heartbeat TABLE: Debezium periodically INSERT/UPDATEs -> WAL change ->
+--      flows through the publication -> Debezium consumes it -> confirmed_flush_lsn advances
+-- 3. The signal table supports Debezium incremental snapshots:
+--    - data replay can be triggered via the Kafka signal channel, zero DB modification
+--    - lets us safely re-deliver outbox events when something goes wrong
+-- 4. The publication must include all three tables: outbox + heartbeat + signal
+
+-- Debezium heartbeat table: prevents WAL buildup
+CREATE TABLE IF NOT EXISTS debezium_heartbeat (
+    id INTEGER PRIMARY KEY DEFAULT 1,
+    ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    CONSTRAINT debezium_heartbeat_singleton CHECK (id = 1)
+);
+
+-- Seed the heartbeat row (Debezium later updates it via ON CONFLICT DO UPDATE)
+INSERT INTO debezium_heartbeat (id, ts) VALUES (1, NOW()) ON CONFLICT DO NOTHING;
+
+-- Debezium signal table: used for incremental snapshots and other ops tasks
+CREATE TABLE IF NOT EXISTS debezium_signal (
+    id VARCHAR(64) PRIMARY KEY,
+    type VARCHAR(32) NOT NULL,
+    data TEXT
+);
+
+-- Create the publication covering all three tables: outbox + heartbeat + signal
+-- The Debezium connector uses publication.autocreate.mode=disabled and references it directly
+DO $$
+BEGIN
+    IF NOT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = 'debezium_outbox_publication') THEN
+        CREATE PUBLICATION debezium_outbox_publication FOR TABLE outbox, debezium_heartbeat, debezium_signal;
+    END IF;
+END $$;
diff --git a/backend/scripts/debezium/outbox-connector.json b/backend/scripts/debezium/outbox-connector.json
new file mode 100644
index 0000000..d1dd407
--- /dev/null
+++ b/backend/scripts/debezium/outbox-connector.json
@@ -0,0 +1,50 @@
+{
+  "name": "genex-outbox-connector",
+  "config": {
+    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
+    "tasks.max": "1",
+
+    "database.hostname": "postgres",
+    "database.port": "5432",
+    "database.user": "genex",
+    "database.password": "genex_dev_password",
+    "database.dbname": "genex",
+
+    "topic.prefix": "cdc.genex",
+
+    "table.include.list": "public.outbox,public.debezium_heartbeat,public.debezium_signal",
+
+    "plugin.name": "pgoutput",
+    "publication.name": "debezium_outbox_publication",
+    "publication.autocreate.mode": "disabled",
+
+    "slot.name": "debezium_genex_outbox_slot",
+
+    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "key.converter.schemas.enable": "false",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+
+    "transforms": "unwrap,route",
+    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+    "transforms.unwrap.drop.tombstones": "true",
+    "transforms.unwrap.delete.handling.mode": "rewrite",
+    "transforms.unwrap.add.fields": "op,table,source.ts_ms",
+    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
+    "transforms.route.regex": "cdc\\.genex\\.public\\.outbox",
+    "transforms.route.replacement": "cdc.genex.outbox",
+
+    "heartbeat.interval.ms": "10000",
+    "heartbeat.action.query": "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()",
+
+    "signal.enabled.channels": "source,kafka",
+    "signal.data.collection": "public.debezium_signal",
+    "signal.kafka.topic": "debezium-signals",
+    "signal.kafka.bootstrap.servers": "kafka:9092",
+
+    "snapshot.mode": "initial",
+
+    "decimal.handling.mode": "string",
+    "time.precision.mode": "connect"
+  }
+}
diff --git a/backend/scripts/debezium/register-connectors.sh b/backend/scripts/debezium/register-connectors.sh
new file mode 100644
index 0000000..89e21ef
--- /dev/null
+++ b/backend/scripts/debezium/register-connectors.sh
@@ -0,0 +1,61 @@
+#!/usr/bin/env bash
+# =============================================================================
+# Debezium connector registration script
+# Usage: ./scripts/debezium/register-connectors.sh [CONNECT_URL]
+#
+# Lessons learned (rwadurian incident):
+# - must use debezium/connect:2.5.4.Final (fixes DBZ-7316 WAL buildup bug)
+# - heartbeat uses the TABLE approach, not pg_logical_emit_message
+# - the Kafka Connect REST API must never be exposed publicly (SSRF injection risk)
+# - configure signal channels so incremental snapshots can replay data when needed
+# =============================================================================
+set -euo pipefail
+
+CONNECT_URL="${1:-http://localhost:8083}"
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+
+echo "Waiting for Kafka Connect to be ready..."
+until curl -sf "${CONNECT_URL}/" > /dev/null 2>&1; do
+  echo "  Kafka Connect not ready, retrying in 5s..."
+  sleep 5
+done
+echo "Kafka Connect is ready at ${CONNECT_URL}"
+
+# Register the outbox connector
+CONNECTOR_NAME="genex-outbox-connector"
+CONFIG_FILE="${SCRIPT_DIR}/outbox-connector.json"
+
+if [ ! -f "$CONFIG_FILE" ]; then
+  echo "ERROR: Config file not found: $CONFIG_FILE"
+  exit 1
+fi
+
+# Check whether the connector already exists
+STATUS_CODE=$(curl -s -o /dev/null -w "%{http_code}" "${CONNECT_URL}/connectors/${CONNECTOR_NAME}")
+
+if [ "$STATUS_CODE" = "200" ]; then
+  echo "Connector '${CONNECTOR_NAME}' already exists. Updating config..."
+  # Extract the config section for a PUT update
+  CONFIG_ONLY=$(python3 -c "import json,sys; d=json.load(open(sys.argv[1])); print(json.dumps(d['config']))" "$CONFIG_FILE" 2>/dev/null \
+    || python -c "import json,sys; d=json.load(open(sys.argv[1])); print(json.dumps(d['config']))" "$CONFIG_FILE")
+  curl -sf -X PUT \
+    -H "Content-Type: application/json" \
+    -d "$CONFIG_ONLY" \
+    "${CONNECT_URL}/connectors/${CONNECTOR_NAME}/config" | python3 -m json.tool 2>/dev/null || true
+  echo "Connector '${CONNECTOR_NAME}' updated."
+else
+  echo "Creating connector '${CONNECTOR_NAME}'..."
+  curl -sf -X POST \
+    -H "Content-Type: application/json" \
+    -d @"$CONFIG_FILE" \
+    "${CONNECT_URL}/connectors" | python3 -m json.tool 2>/dev/null || true
+  echo "Connector '${CONNECTOR_NAME}' created."
+fi
+
+# Verify status
+echo ""
+echo "=== Connector Status ==="
+curl -sf "${CONNECT_URL}/connectors/${CONNECTOR_NAME}/status" | python3 -m json.tool 2>/dev/null || \
+  curl -sf "${CONNECT_URL}/connectors/${CONNECTOR_NAME}/status"
+echo ""
+echo "Done."