From f270b7cc273bc3d5370103f501dc2a9b2df7398a Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 26 Feb 2026 18:56:42 -0800 Subject: [PATCH] =?UTF-8?q?fix(cdc):=20=E6=B7=BB=E5=8A=A03171=E9=A2=84?= =?UTF-8?q?=E7=A7=8D=E8=AE=A1=E5=88=92Debezium=20CDC=20connector=E9=85=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 预种CDC消费端(contribution-service)代码已就绪,但缺少Debezium connector配置,导致pre_planting_orders和pre_planting_positions 表变更无法捕获到Kafka,算力无法同步。 新增: - pre-planting-connector.json: 监听rwa_planting库的pre_planting_*表 独立slot/publication/topic前缀(cdc.pre-planting) - register-connectors.sh: 注册pre-planting-postgres-connector - deploy.sh: infra-status显示所有1.0 connector状态 Co-Authored-By: Claude Opus 4.6 --- backend/services/deploy.sh | 19 +++++---- .../debezium/pre-planting-connector.json | 42 +++++++++++++++++++ .../scripts/debezium/register-connectors.sh | 2 + 3 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 backend/services/scripts/debezium/pre-planting-connector.json diff --git a/backend/services/deploy.sh b/backend/services/deploy.sh index 281f93a2..43c8dcd0 100755 --- a/backend/services/deploy.sh +++ b/backend/services/deploy.sh @@ -523,14 +523,17 @@ infra_status() { if curl -s http://localhost:8084/ > /dev/null 2>&1; then echo -e " ${GREEN}[OK]${NC} Debezium Connect (port 8084)" # Check connector status - CONNECTOR_STATUS=$(curl -s http://localhost:8084/connectors/identity-postgres-connector/status 2>/dev/null | grep -o '"state":"[^"]*"' | head -1 || echo "") - if echo "$CONNECTOR_STATUS" | grep -q "RUNNING"; then - echo -e " └─ ${GREEN}[RUNNING]${NC} identity-postgres-connector" - elif [ -n "$CONNECTOR_STATUS" ]; then - echo -e " └─ ${YELLOW}[$CONNECTOR_STATUS]${NC} identity-postgres-connector" - else - echo -e " └─ ${YELLOW}[NOT REGISTERED]${NC} identity-postgres-connector" - fi + # Check all 1.0 connectors + for conn_name in identity-postgres-connector planting-postgres-connector pre-planting-postgres-connector referral-postgres-connector wallet-postgres-connector authorization-postgres-connector; do + CONNECTOR_STATUS=$(curl -s http://localhost:8084/connectors/$conn_name/status 2>/dev/null | grep -o '"state":"[^"]*"' | head -1 || echo "") + if echo "$CONNECTOR_STATUS" | grep -q "RUNNING"; then + echo -e " └─ ${GREEN}[RUNNING]${NC} $conn_name" + elif [ -n "$CONNECTOR_STATUS" ]; then + echo -e " └─ ${YELLOW}[$CONNECTOR_STATUS]${NC} $conn_name" + else + echo -e " └─ ${YELLOW}[NOT REGISTERED]${NC} $conn_name" + fi + done else echo -e " ${RED}[FAIL]${NC} Debezium Connect (port 8084)" fi diff --git a/backend/services/scripts/debezium/pre-planting-connector.json b/backend/services/scripts/debezium/pre-planting-connector.json new file mode 100644 index 00000000..4e3e3084 --- /dev/null +++ b/backend/services/scripts/debezium/pre-planting-connector.json @@ -0,0 +1,42 @@ +{ + "name": "pre-planting-postgres-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + + "database.hostname": "rwa-postgres", + "database.port": "5432", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", + "database.dbname": "rwa_planting", + + "topic.prefix": "cdc.pre-planting", + + "table.include.list": "public.pre_planting_orders,public.pre_planting_positions", + + "plugin.name": "pgoutput", + "publication.name": "debezium_pre_planting_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_pre_planting_slot", + + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + + "heartbeat.interval.ms": "60000", + "heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW() WHERE id = 1", + + "snapshot.mode": "initial", + + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +} diff --git a/backend/services/scripts/debezium/register-connectors.sh b/backend/services/scripts/debezium/register-connectors.sh index cdecc684..30fe67a4 100644 --- a/backend/services/scripts/debezium/register-connectors.sh +++ b/backend/services/scripts/debezium/register-connectors.sh @@ -149,6 +149,8 @@ if [ "$REGISTER_1_0" = true ]; then register_connector "$SCRIPT_DIR/identity-connector.json" "identity-postgres-connector" register_connector "$SCRIPT_DIR/planting-connector.json" "planting-postgres-connector" + # [2026-02-27] 新增:3171预种计划 CDC connector(同 rwa_planting 库,独立 slot/publication/topic) + register_connector "$SCRIPT_DIR/pre-planting-connector.json" "pre-planting-postgres-connector" register_connector "$SCRIPT_DIR/referral-connector.json" "referral-postgres-connector" register_connector "$SCRIPT_DIR/wallet-connector.json" "wallet-postgres-connector" register_connector "$SCRIPT_DIR/authorization-connector.json" "authorization-postgres-connector"