fix(cdc): 添加 Debezium heartbeat 机制防止 WAL 堆积

问题背景:
- PostgreSQL pg_wal 目录从 80MB 膨胀到 60.4GB,导致磁盘使用率达到 96%
- 根因: wallet/planting/referral 三个数据库的业务表长期无写入
- 虽然 Debezium 有 heartbeat 配置,但未配置 heartbeat.action.query
- 导致 replication slot 的 restart_lsn 无法推进,WAL 文件无法被清理

解决方案:
1. 在 wallet/planting/referral 三个服务中添加 debezium_heartbeat 表
2. 配置 Debezium connector 的 heartbeat.action.query
3. 每 60 秒自动执行 UPDATE 语句推进 restart_lsn

修改内容:
- wallet-service/prisma/schema.prisma: 添加 DebeziumHeartbeat model
- planting-service/prisma/schema.prisma: 添加 DebeziumHeartbeat model
- referral-service/prisma/schema.prisma: 添加 DebeziumHeartbeat model
- scripts/debezium/wallet-connector.json: 添加 heartbeat.action.query 配置
- scripts/debezium/planting-connector.json: 添加 heartbeat.action.query 配置
- scripts/debezium/referral-connector.json: 添加 heartbeat.action.query 配置
- 新增三个服务的 Prisma migration 文件

效果:
- pg_wal 从 60.4GB 降至 80.2MB
- 磁盘使用率从 96% 降至 40%
- replication slot lag 从 51-60GB 降至 KB 级别

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-19 17:42:41 -08:00
parent 964b06b370
commit 8326f8c35c
9 changed files with 132 additions and 22 deletions

View File

@ -0,0 +1,14 @@
-- CreateTable: Debezium Heartbeat
-- 用于 CDC replication slot 的 WAL 位置推进
-- 由 Debezium heartbeat.action.query 自动更新
-- 防止因业务表长期无写入导致 WAL 堆积
CREATE TABLE "debezium_heartbeat" (
"id" INTEGER NOT NULL DEFAULT 1,
"ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "debezium_heartbeat_pkey" PRIMARY KEY ("id")
);
-- 插入初始记录
INSERT INTO "debezium_heartbeat" ("id", "ts") VALUES (1, NOW());

View File

@ -377,3 +377,16 @@ model ContractSigningTask {
@@index([status, expiresAt])
@@map("contract_signing_tasks")
}
// ============================================
// Debezium 心跳表
// 用于 CDC replication slot 的 WAL 位置推进
// 由 Debezium heartbeat.action.query 自动更新
// 防止因业务表长期无写入导致 WAL 堆积
// ============================================
model DebeziumHeartbeat {
id Int @id @default(1)
ts DateTime @default(now()) @updatedAt @map("ts")
@@map("debezium_heartbeat")
}

View File

@ -0,0 +1,14 @@
-- CreateTable: Debezium Heartbeat
-- 用于 CDC replication slot 的 WAL 位置推进
-- 由 Debezium heartbeat.action.query 自动更新
-- 防止因业务表长期无写入导致 WAL 堆积
CREATE TABLE "debezium_heartbeat" (
"id" INTEGER NOT NULL DEFAULT 1,
"ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "debezium_heartbeat_pkey" PRIMARY KEY ("id")
);
-- 插入初始记录
INSERT INTO "debezium_heartbeat" ("id", "ts") VALUES (1, NOW());

View File

@ -199,3 +199,16 @@ model ReferralEvent {
@@index([userId], name: "idx_event_user")
@@index([occurredAt], name: "idx_event_occurred")
}
// ============================================
// Debezium 心跳表
// 用于 CDC replication slot 的 WAL 位置推进
// 由 Debezium heartbeat.action.query 自动更新
// 防止因业务表长期无写入导致 WAL 堆积
// ============================================
model DebeziumHeartbeat {
id Int @id @default(1)
ts DateTime @default(now()) @updatedAt @map("ts")
@@map("debezium_heartbeat")
}

View File

@ -2,26 +2,40 @@
"name": "planting-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "debezium_password",
"database.user": "${POSTGRES_USER:-rwa_user}",
"database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}",
"database.dbname": "rwa_planting",
"database.server.name": "planting",
"topic.prefix": "cdc.planting",
"table.include.list": "public.planting_orders,public.planting_positions,public.contract_signing_tasks,public.fund_allocations,public.debezium_heartbeat",
"plugin.name": "pgoutput",
"publication.name": "planting_cdc_publication",
"slot.name": "planting_cdc_slot",
"table.include.list": "public.planting_orders,public.planting_positions,public.contract_signing_tasks,public.fund_allocations",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"publication.name": "debezium_planting_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_planting_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"heartbeat.interval.ms": "60000",
"heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW() WHERE id = 1",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}

View File

@ -12,7 +12,7 @@
"topic.prefix": "cdc.referral",
"table.include.list": "public.referral_relationships",
"table.include.list": "public.referral_relationships,public.debezium_heartbeat",
"plugin.name": "pgoutput",
"publication.name": "debezium_referral_publication",
@ -31,7 +31,8 @@
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"heartbeat.interval.ms": "10000",
"heartbeat.interval.ms": "60000",
"heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW() WHERE id = 1",
"snapshot.mode": "initial",

View File

@ -2,26 +2,40 @@
"name": "wallet-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "debezium_password",
"database.user": "${POSTGRES_USER:-rwa_user}",
"database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}",
"database.dbname": "rwa_wallet",
"database.server.name": "wallet",
"topic.prefix": "cdc.wallet",
"table.include.list": "public.wallet_accounts,public.withdrawal_orders,public.fiat_withdrawal_orders,public.wallet_ledger_entries,public.debezium_heartbeat",
"plugin.name": "pgoutput",
"publication.name": "wallet_cdc_publication",
"slot.name": "wallet_cdc_slot",
"table.include.list": "public.wallet_accounts,public.withdrawal_orders,public.fiat_withdrawal_orders,public.wallet_ledger_entries",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"publication.name": "debezium_wallet_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_wallet_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"heartbeat.interval.ms": "60000",
"heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW() WHERE id = 1",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}

View File

@ -0,0 +1,14 @@
-- CreateTable: Debezium Heartbeat
-- 用于 CDC replication slot 的 WAL 位置推进
-- 由 Debezium heartbeat.action.query 自动更新
-- 防止因业务表长期无写入导致 WAL 堆积
CREATE TABLE "debezium_heartbeat" (
"id" INTEGER NOT NULL DEFAULT 1,
"ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "debezium_heartbeat_pkey" PRIMARY KEY ("id")
);
-- 插入初始记录
INSERT INTO "debezium_heartbeat" ("id", "ts") VALUES (1, NOW());

View File

@ -498,3 +498,16 @@ model OutboxEvent {
@@index([topic])
@@map("outbox_events")
}
// ============================================
// Debezium 心跳表
// 用于 CDC replication slot 的 WAL 位置推进
// 由 Debezium heartbeat.action.query 自动更新
// 防止因业务表长期无写入导致 WAL 堆积
// ============================================
model DebeziumHeartbeat {
id Int @id @default(1)
ts DateTime @default(now()) @updatedAt @map("ts")
@@map("debezium_heartbeat")
}