From 8326f8c35c95a132edfd814a013545a44bf62abc Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 19 Jan 2026 17:42:41 -0800 Subject: [PATCH] =?UTF-8?q?fix(cdc):=20=E6=B7=BB=E5=8A=A0=20Debezium=20hea?= =?UTF-8?q?rtbeat=20=E6=9C=BA=E5=88=B6=E9=98=B2=E6=AD=A2=20WAL=20=E5=A0=86?= =?UTF-8?q?=E7=A7=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题背景: - PostgreSQL pg_wal 目录从 80MB 膨胀到 60.4GB,导致磁盘使用率达到 96% - 根因: wallet/planting/referral 三个数据库的业务表长期无写入 - 虽然 Debezium 有 heartbeat 配置,但未配置 heartbeat.action.query - 导致 replication slot 的 restart_lsn 无法推进,WAL 文件无法被清理 解决方案: 1. 在 wallet/planting/referral 三个服务中添加 debezium_heartbeat 表 2. 配置 Debezium connector 的 heartbeat.action.query 3. 每 60 秒自动执行 UPDATE 语句推进 restart_lsn 修改内容: - wallet-service/prisma/schema.prisma: 添加 DebeziumHeartbeat model - planting-service/prisma/schema.prisma: 添加 DebeziumHeartbeat model - referral-service/prisma/schema.prisma: 添加 DebeziumHeartbeat model - scripts/debezium/wallet-connector.json: 添加 heartbeat.action.query 配置 - scripts/debezium/planting-connector.json: 添加 heartbeat.action.query 配置 - scripts/debezium/referral-connector.json: 添加 heartbeat.action.query 配置 - 新增三个服务的 Prisma migration 文件 效果: - pg_wal 从 60.4GB 降至 80.2MB - 磁盘使用率从 96% 降至 40% - replication slot lag 从 51-60GB 降至 KB 级别 Co-Authored-By: Claude Opus 4.5 --- .../migration.sql | 14 ++++++++ .../planting-service/prisma/schema.prisma | 13 +++++++ .../migration.sql | 14 ++++++++ .../referral-service/prisma/schema.prisma | 13 +++++++ .../scripts/debezium/planting-connector.json | 34 +++++++++++++------ .../scripts/debezium/referral-connector.json | 5 +-- .../scripts/debezium/wallet-connector.json | 34 +++++++++++++------ .../migration.sql | 14 ++++++++ .../wallet-service/prisma/schema.prisma | 13 +++++++ 9 files changed, 132 insertions(+), 22 deletions(-) create mode 100644 backend/services/planting-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql create mode 100644 backend/services/referral-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql create mode 100644 backend/services/wallet-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql diff --git a/backend/services/planting-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql b/backend/services/planting-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql new file mode 100644 index 00000000..7dbfbdcb --- /dev/null +++ b/backend/services/planting-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql @@ -0,0 +1,14 @@ +-- CreateTable: Debezium Heartbeat +-- 用于 CDC replication slot 的 WAL 位置推进 +-- 由 Debezium heartbeat.action.query 自动更新 +-- 防止因业务表长期无写入导致 WAL 堆积 + +CREATE TABLE "debezium_heartbeat" ( + "id" INTEGER NOT NULL DEFAULT 1, + "ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "debezium_heartbeat_pkey" PRIMARY KEY ("id") +); + +-- 插入初始记录 +INSERT INTO "debezium_heartbeat" ("id", "ts") VALUES (1, NOW()); diff --git a/backend/services/planting-service/prisma/schema.prisma b/backend/services/planting-service/prisma/schema.prisma index d31014bc..f69fb85b 100644 --- a/backend/services/planting-service/prisma/schema.prisma +++ b/backend/services/planting-service/prisma/schema.prisma @@ -377,3 +377,16 @@ model ContractSigningTask { @@index([status, expiresAt]) @@map("contract_signing_tasks") } + +// ============================================ +// Debezium 心跳表 +// 用于 CDC replication slot 的 WAL 位置推进 +// 由 Debezium heartbeat.action.query 自动更新 +// 防止因业务表长期无写入导致 WAL 堆积 +// ============================================ +model DebeziumHeartbeat { + id Int @id @default(1) + ts DateTime @default(now()) @updatedAt @map("ts") + + @@map("debezium_heartbeat") +} diff --git a/backend/services/referral-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql b/backend/services/referral-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql new file mode 100644 index 00000000..7dbfbdcb --- /dev/null +++ b/backend/services/referral-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql @@ -0,0 +1,14 @@ +-- CreateTable: Debezium Heartbeat +-- 用于 CDC replication slot 的 WAL 位置推进 +-- 由 Debezium heartbeat.action.query 自动更新 +-- 防止因业务表长期无写入导致 WAL 堆积 + +CREATE TABLE "debezium_heartbeat" ( + "id" INTEGER NOT NULL DEFAULT 1, + "ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "debezium_heartbeat_pkey" PRIMARY KEY ("id") +); + +-- 插入初始记录 +INSERT INTO "debezium_heartbeat" ("id", "ts") VALUES (1, NOW()); diff --git a/backend/services/referral-service/prisma/schema.prisma b/backend/services/referral-service/prisma/schema.prisma index 47f1dfc5..d06df42c 100644 --- a/backend/services/referral-service/prisma/schema.prisma +++ b/backend/services/referral-service/prisma/schema.prisma @@ -199,3 +199,16 @@ model ReferralEvent { @@index([userId], name: "idx_event_user") @@index([occurredAt], name: "idx_event_occurred") } + +// ============================================ +// Debezium 心跳表 +// 用于 CDC replication slot 的 WAL 位置推进 +// 由 Debezium heartbeat.action.query 自动更新 +// 防止因业务表长期无写入导致 WAL 堆积 +// ============================================ +model DebeziumHeartbeat { + id Int @id @default(1) + ts DateTime @default(now()) @updatedAt @map("ts") + + @@map("debezium_heartbeat") +} diff --git a/backend/services/scripts/debezium/planting-connector.json b/backend/services/scripts/debezium/planting-connector.json index f6240b45..6b1eda01 100644 --- a/backend/services/scripts/debezium/planting-connector.json +++ b/backend/services/scripts/debezium/planting-connector.json @@ -2,26 +2,40 @@ "name": "planting-postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", "database.port": "5432", - "database.user": "debezium", - "database.password": "debezium_password", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", "database.dbname": "rwa_planting", - "database.server.name": "planting", + "topic.prefix": "cdc.planting", + + "table.include.list": "public.planting_orders,public.planting_positions,public.contract_signing_tasks,public.fund_allocations,public.debezium_heartbeat", + "plugin.name": "pgoutput", - "publication.name": "planting_cdc_publication", - "slot.name": "planting_cdc_slot", - "table.include.list": "public.planting_orders,public.planting_positions,public.contract_signing_tasks,public.fund_allocations", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "transforms.unwrap.delete.handling.mode": "rewrite", + "publication.name": "debezium_planting_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_planting_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", + + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + + "heartbeat.interval.ms": "60000", + "heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW() WHERE id = 1", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", "time.precision.mode": "connect" } diff --git a/backend/services/scripts/debezium/referral-connector.json b/backend/services/scripts/debezium/referral-connector.json index 8a3af356..d8bd7d9d 100644 --- a/backend/services/scripts/debezium/referral-connector.json +++ b/backend/services/scripts/debezium/referral-connector.json @@ -12,7 +12,7 @@ "topic.prefix": "cdc.referral", - "table.include.list": "public.referral_relationships", + "table.include.list": "public.referral_relationships,public.debezium_heartbeat", "plugin.name": "pgoutput", "publication.name": "debezium_referral_publication", @@ -31,7 +31,8 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "heartbeat.interval.ms": "10000", + "heartbeat.interval.ms": "60000", + "heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW() WHERE id = 1", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/wallet-connector.json b/backend/services/scripts/debezium/wallet-connector.json index b3f7a7d1..785074dc 100644 --- a/backend/services/scripts/debezium/wallet-connector.json +++ b/backend/services/scripts/debezium/wallet-connector.json @@ -2,26 +2,40 @@ "name": "wallet-postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", "database.port": "5432", - "database.user": "debezium", - "database.password": "debezium_password", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", "database.dbname": "rwa_wallet", - "database.server.name": "wallet", + "topic.prefix": "cdc.wallet", + + "table.include.list": "public.wallet_accounts,public.withdrawal_orders,public.fiat_withdrawal_orders,public.wallet_ledger_entries,public.debezium_heartbeat", + "plugin.name": "pgoutput", - "publication.name": "wallet_cdc_publication", - "slot.name": "wallet_cdc_slot", - "table.include.list": "public.wallet_accounts,public.withdrawal_orders,public.fiat_withdrawal_orders,public.wallet_ledger_entries", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "transforms.unwrap.delete.handling.mode": "rewrite", + "publication.name": "debezium_wallet_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_wallet_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", + + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + + "heartbeat.interval.ms": "60000", + "heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW() WHERE id = 1", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", "time.precision.mode": "connect" } diff --git a/backend/services/wallet-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql b/backend/services/wallet-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql new file mode 100644 index 00000000..7dbfbdcb --- /dev/null +++ b/backend/services/wallet-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql @@ -0,0 +1,14 @@ +-- CreateTable: Debezium Heartbeat +-- 用于 CDC replication slot 的 WAL 位置推进 +-- 由 Debezium heartbeat.action.query 自动更新 +-- 防止因业务表长期无写入导致 WAL 堆积 + +CREATE TABLE "debezium_heartbeat" ( + "id" INTEGER NOT NULL DEFAULT 1, + "ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "debezium_heartbeat_pkey" PRIMARY KEY ("id") +); + +-- 插入初始记录 +INSERT INTO "debezium_heartbeat" ("id", "ts") VALUES (1, NOW()); diff --git a/backend/services/wallet-service/prisma/schema.prisma b/backend/services/wallet-service/prisma/schema.prisma index 2e9d11bd..c0ad9444 100644 --- a/backend/services/wallet-service/prisma/schema.prisma +++ b/backend/services/wallet-service/prisma/schema.prisma @@ -498,3 +498,16 @@ model OutboxEvent { @@index([topic]) @@map("outbox_events") } + +// ============================================ +// Debezium 心跳表 +// 用于 CDC replication slot 的 WAL 位置推进 +// 由 Debezium heartbeat.action.query 自动更新 +// 防止因业务表长期无写入导致 WAL 堆积 +// ============================================ +model DebeziumHeartbeat { + id Int @id @default(1) + ts DateTime @default(now()) @updatedAt @map("ts") + + @@map("debezium_heartbeat") +}