gcx/backend/migrations/040_create_debezium_support...

40 lines
1.8 KiB
SQL

-- 040: Debezium CDC support tables — heartbeat + signal + publication
--
-- 经验教训 (来自 rwadurian 事故):
-- 1. Debezium ≤2.4 存在 DBZ-7316 bug: 当 outbox 表长时间无写入时,
-- confirmed_flush_lsn 停止推进, PostgreSQL WAL 无限积压 (曾导致 306GB WAL)
-- 2. heartbeat TABLE 方式比 pg_logical_emit_message() 更可靠:
-- - pg_logical_emit_message 写入 WAL 但不经过 publication, Debezium 无法感知
-- - heartbeat TABLE 方式: Debezium 定期 INSERT/UPDATE → 产生 WAL 变更 →
-- 经过 publication → Debezium 消费 → 推进 confirmed_flush_lsn
-- 3. signal 表用于 Debezium 增量快照 (incremental snapshot):
-- - 支持 Kafka 信号通道触发数据重放, 零数据库修改
-- - 出问题时可安全地重新投递 outbox 事件
-- 4. publication 必须包含 outbox + heartbeat + signal 三张表
-- Debezium 心跳表: 防止 WAL 积压
CREATE TABLE IF NOT EXISTS debezium_heartbeat (
id INTEGER PRIMARY KEY DEFAULT 1,
ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT debezium_heartbeat_singleton CHECK (id = 1)
);
-- 初始化心跳行 (Debezium 后续通过 ON CONFLICT DO UPDATE 更新)
INSERT INTO debezium_heartbeat (id, ts) VALUES (1, NOW()) ON CONFLICT DO NOTHING;
-- Debezium 信号表: 用于增量快照等运维操作
CREATE TABLE IF NOT EXISTS debezium_signal (
id VARCHAR(64) PRIMARY KEY,
type VARCHAR(32) NOT NULL,
data TEXT
);
-- 创建 publication: 包含 outbox + heartbeat + signal 三张表
-- Debezium connector 使用 publication.autocreate.mode=disabled, 直接引用此 publication
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = 'debezium_outbox_publication') THEN
CREATE PUBLICATION debezium_outbox_publication FOR TABLE outbox, debezium_heartbeat, debezium_signal;
END IF;
END $$;