diff --git a/backend/services/planting-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql b/backend/services/planting-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql new file mode 100644 index 00000000..7dbfbdcb --- /dev/null +++ b/backend/services/planting-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql @@ -0,0 +1,14 @@ +-- CreateTable: Debezium Heartbeat +-- 用于 CDC replication slot 的 WAL 位置推进 +-- 由 Debezium heartbeat.action.query 自动更新 +-- 防止因业务表长期无写入导致 WAL 堆积 + +CREATE TABLE "debezium_heartbeat" ( + "id" INTEGER NOT NULL DEFAULT 1, + "ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "debezium_heartbeat_pkey" PRIMARY KEY ("id") +); + +-- 插入初始记录 +INSERT INTO "debezium_heartbeat" ("id", "ts") VALUES (1, NOW()); diff --git a/backend/services/planting-service/prisma/schema.prisma b/backend/services/planting-service/prisma/schema.prisma index d31014bc..f69fb85b 100644 --- a/backend/services/planting-service/prisma/schema.prisma +++ b/backend/services/planting-service/prisma/schema.prisma @@ -377,3 +377,16 @@ model ContractSigningTask { @@index([status, expiresAt]) @@map("contract_signing_tasks") } + +// ============================================ +// Debezium 心跳表 +// 用于 CDC replication slot 的 WAL 位置推进 +// 由 Debezium heartbeat.action.query 自动更新 +// 防止因业务表长期无写入导致 WAL 堆积 +// ============================================ +model DebeziumHeartbeat { + id Int @id @default(1) + ts DateTime @default(now()) @updatedAt @map("ts") + + @@map("debezium_heartbeat") +} diff --git a/backend/services/referral-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql b/backend/services/referral-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql new file mode 100644 index 00000000..7dbfbdcb --- /dev/null +++ b/backend/services/referral-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql @@ -0,0 +1,14 @@ +-- CreateTable: Debezium Heartbeat +-- 用于 CDC replication slot 的 WAL 位置推进 +-- 由 Debezium heartbeat.action.query 自动更新 +-- 防止因业务表长期无写入导致 WAL 堆积 + +CREATE TABLE "debezium_heartbeat" ( + "id" INTEGER NOT NULL DEFAULT 1, + "ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "debezium_heartbeat_pkey" PRIMARY KEY ("id") +); + +-- 插入初始记录 +INSERT INTO "debezium_heartbeat" ("id", "ts") VALUES (1, NOW()); diff --git a/backend/services/referral-service/prisma/schema.prisma b/backend/services/referral-service/prisma/schema.prisma index 47f1dfc5..d06df42c 100644 --- a/backend/services/referral-service/prisma/schema.prisma +++ b/backend/services/referral-service/prisma/schema.prisma @@ -199,3 +199,16 @@ model ReferralEvent { @@index([userId], name: "idx_event_user") @@index([occurredAt], name: "idx_event_occurred") } + +// ============================================ +// Debezium 心跳表 +// 用于 CDC replication slot 的 WAL 位置推进 +// 由 Debezium heartbeat.action.query 自动更新 +// 防止因业务表长期无写入导致 WAL 堆积 +// ============================================ +model DebeziumHeartbeat { + id Int @id @default(1) + ts DateTime @default(now()) @updatedAt @map("ts") + + @@map("debezium_heartbeat") +} diff --git a/backend/services/scripts/debezium/planting-connector.json b/backend/services/scripts/debezium/planting-connector.json index f6240b45..6b1eda01 100644 --- a/backend/services/scripts/debezium/planting-connector.json +++ b/backend/services/scripts/debezium/planting-connector.json @@ -2,26 +2,40 @@ "name": "planting-postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", "database.port": "5432", - "database.user": "debezium", - "database.password": "debezium_password", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", "database.dbname": "rwa_planting", - "database.server.name": "planting", + "topic.prefix": "cdc.planting", + + "table.include.list": "public.planting_orders,public.planting_positions,public.contract_signing_tasks,public.fund_allocations,public.debezium_heartbeat", + "plugin.name": "pgoutput", - "publication.name": "planting_cdc_publication", - "slot.name": "planting_cdc_slot", - "table.include.list": "public.planting_orders,public.planting_positions,public.contract_signing_tasks,public.fund_allocations", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "transforms.unwrap.delete.handling.mode": "rewrite", + "publication.name": "debezium_planting_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_planting_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", + + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + + "heartbeat.interval.ms": "60000", + "heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW() WHERE id = 1", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", "time.precision.mode": "connect" } diff --git a/backend/services/scripts/debezium/referral-connector.json b/backend/services/scripts/debezium/referral-connector.json index 8a3af356..d8bd7d9d 100644 --- a/backend/services/scripts/debezium/referral-connector.json +++ b/backend/services/scripts/debezium/referral-connector.json @@ -12,7 +12,7 @@ "topic.prefix": "cdc.referral", - "table.include.list": "public.referral_relationships", + "table.include.list": "public.referral_relationships,public.debezium_heartbeat", "plugin.name": "pgoutput", "publication.name": "debezium_referral_publication", @@ -31,7 +31,8 @@ "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "heartbeat.interval.ms": "10000", + "heartbeat.interval.ms": "60000", + "heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW() WHERE id = 1", "snapshot.mode": "initial", diff --git a/backend/services/scripts/debezium/wallet-connector.json b/backend/services/scripts/debezium/wallet-connector.json index b3f7a7d1..785074dc 100644 --- a/backend/services/scripts/debezium/wallet-connector.json +++ b/backend/services/scripts/debezium/wallet-connector.json @@ -2,26 +2,40 @@ "name": "wallet-postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", "database.port": "5432", - "database.user": "debezium", - "database.password": "debezium_password", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", "database.dbname": "rwa_wallet", - "database.server.name": "wallet", + "topic.prefix": "cdc.wallet", + + "table.include.list": "public.wallet_accounts,public.withdrawal_orders,public.fiat_withdrawal_orders,public.wallet_ledger_entries,public.debezium_heartbeat", + "plugin.name": "pgoutput", - "publication.name": "wallet_cdc_publication", - "slot.name": "wallet_cdc_slot", - "table.include.list": "public.wallet_accounts,public.withdrawal_orders,public.fiat_withdrawal_orders,public.wallet_ledger_entries", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "transforms.unwrap.delete.handling.mode": "rewrite", + "publication.name": "debezium_wallet_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_wallet_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", + + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + + "heartbeat.interval.ms": "60000", + "heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW() WHERE id = 1", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", "time.precision.mode": "connect" } diff --git a/backend/services/wallet-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql b/backend/services/wallet-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql new file mode 100644 index 00000000..7dbfbdcb --- /dev/null +++ b/backend/services/wallet-service/prisma/migrations/20260120000000_add_debezium_heartbeat/migration.sql @@ -0,0 +1,14 @@ +-- CreateTable: Debezium Heartbeat +-- 用于 CDC replication slot 的 WAL 位置推进 +-- 由 Debezium heartbeat.action.query 自动更新 +-- 防止因业务表长期无写入导致 WAL 堆积 + +CREATE TABLE "debezium_heartbeat" ( + "id" INTEGER NOT NULL DEFAULT 1, + "ts" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "debezium_heartbeat_pkey" PRIMARY KEY ("id") +); + +-- 插入初始记录 +INSERT INTO "debezium_heartbeat" ("id", "ts") VALUES (1, NOW()); diff --git a/backend/services/wallet-service/prisma/schema.prisma b/backend/services/wallet-service/prisma/schema.prisma index 2e9d11bd..c0ad9444 100644 --- a/backend/services/wallet-service/prisma/schema.prisma +++ b/backend/services/wallet-service/prisma/schema.prisma @@ -498,3 +498,16 @@ model OutboxEvent { @@index([topic]) @@map("outbox_events") } + +// ============================================ +// Debezium 心跳表 +// 用于 CDC replication slot 的 WAL 位置推进 +// 由 Debezium heartbeat.action.query 自动更新 +// 防止因业务表长期无写入导致 WAL 堆积 +// ============================================ +model DebeziumHeartbeat { + id Int @id @default(1) + ts DateTime @default(now()) @updatedAt @map("ts") + + @@map("debezium_heartbeat") +}