docs(migrations): add detailed comments for idempotency tables

- Add comments explaining unique key composition:
  - CDC events: (source_topic, offset) = Kafka topic + message offset
  - Outbox events: (source_service, event_id) = service name + outbox ID
- Fix contribution-service migration:
  - Extend source_service column from VARCHAR(50) to VARCHAR(100)
  - Set source_service as NOT NULL to match schema
  - Use snake_case for index name consistency
- Clarify that offset/event_id are NOT database auto-increment IDs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 19:44:46 -08:00
parent 41f142124b
commit e00c81153b
3 changed files with 43 additions and 15 deletions

View File

@ -4,18 +4,21 @@
-- ============================================================================ -- ============================================================================
-- 创建 processed_cdc_events 表(用于 CDC 事件幂等) -- 创建 processed_cdc_events 表(用于 CDC 事件幂等)
-- 唯一键: (source_topic, offset) - Kafka topic 名称 + 消息偏移量
-- 用于保证每个 CDC 事件只处理一次(exactly-once 语义)
CREATE TABLE IF NOT EXISTS "processed_cdc_events" ( CREATE TABLE IF NOT EXISTS "processed_cdc_events" (
"id" BIGSERIAL NOT NULL, "id" BIGSERIAL NOT NULL,
"source_topic" VARCHAR(200) NOT NULL, "source_topic" VARCHAR(200) NOT NULL, -- Kafka topic 名称(如 cdc.identity.public.user_accounts)
"offset" BIGINT NOT NULL, "offset" BIGINT NOT NULL, -- Kafka 消息偏移量(在 partition 内唯一)
"table_name" VARCHAR(100) NOT NULL, "table_name" VARCHAR(100) NOT NULL, -- 源表名
"operation" VARCHAR(10) NOT NULL, "operation" VARCHAR(10) NOT NULL, -- CDC 操作类型: c(create), u(update), d(delete), r(snapshot read)
"processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, "processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "processed_cdc_events_pkey" PRIMARY KEY ("id") CONSTRAINT "processed_cdc_events_pkey" PRIMARY KEY ("id")
); );
-- 复合唯一索引:(source_topic, offset) 保证幂等性 -- 复合唯一索引:(source_topic, offset) 保证幂等性
-- 注意:这不是数据库自增 ID,而是 Kafka 消息的唯一标识
CREATE UNIQUE INDEX "processed_cdc_events_source_topic_offset_key" ON "processed_cdc_events"("source_topic", "offset"); CREATE UNIQUE INDEX "processed_cdc_events_source_topic_offset_key" ON "processed_cdc_events"("source_topic", "offset");
-- 时间索引用于清理旧数据 -- 时间索引用于清理旧数据

View File

@ -4,26 +4,42 @@
-- ============================================================================ -- ============================================================================
-- 1. 创建 processed_cdc_events 表(用于 CDC 事件幂等) -- 1. 创建 processed_cdc_events 表(用于 CDC 事件幂等)
-- 唯一键: (source_topic, offset) - Kafka topic 名称 + 消息偏移量
-- 用于保证每个 CDC 事件只处理一次(exactly-once 语义)
CREATE TABLE IF NOT EXISTS "processed_cdc_events" ( CREATE TABLE IF NOT EXISTS "processed_cdc_events" (
"id" BIGSERIAL NOT NULL, "id" BIGSERIAL NOT NULL,
"source_topic" VARCHAR(200) NOT NULL, "source_topic" VARCHAR(200) NOT NULL, -- Kafka topic 名称(如 cdc.identity.public.user_accounts)
"offset" BIGINT NOT NULL, "offset" BIGINT NOT NULL, -- Kafka 消息偏移量(在 partition 内唯一)
"table_name" VARCHAR(100) NOT NULL, "table_name" VARCHAR(100) NOT NULL, -- 源表名
"operation" VARCHAR(10) NOT NULL, "operation" VARCHAR(10) NOT NULL, -- CDC 操作类型: c(create), u(update), d(delete), r(snapshot read)
"processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, "processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "processed_cdc_events_pkey" PRIMARY KEY ("id") CONSTRAINT "processed_cdc_events_pkey" PRIMARY KEY ("id")
); );
-- 复合唯一索引:(source_topic, offset) 保证幂等性 -- 复合唯一索引:(source_topic, offset) 保证幂等性
-- 注意:这不是数据库自增 ID,而是 Kafka 消息的唯一标识
CREATE UNIQUE INDEX "processed_cdc_events_source_topic_offset_key" ON "processed_cdc_events"("source_topic", "offset"); CREATE UNIQUE INDEX "processed_cdc_events_source_topic_offset_key" ON "processed_cdc_events"("source_topic", "offset");
-- 时间索引用于清理旧数据 -- 时间索引用于清理旧数据
CREATE INDEX "processed_cdc_events_processed_at_idx" ON "processed_cdc_events"("processed_at"); CREATE INDEX "processed_cdc_events_processed_at_idx" ON "processed_cdc_events"("processed_at");
-- 2. 修复 processed_events 表的唯一约束 -- 2. 修复 processed_events 表(用于 2.0 服务间 Outbox 事件幂等)
-- 删除旧的单字段唯一索引 -- 唯一键: (source_service, event_id) - 服务名 + outbox 表的 ID
-- 不同服务的 outbox ID 可能相同,所以需要组合服务名作为复合唯一键
-- 2.1 修改 source_service 列:扩展长度 50->100,且设为 NOT NULL
-- 先为已有 NULL 值设置默认值
UPDATE "processed_events" SET "source_service" = 'unknown' WHERE "source_service" IS NULL;
-- 修改列类型和约束
ALTER TABLE "processed_events"
ALTER COLUMN "source_service" SET NOT NULL,
ALTER COLUMN "source_service" TYPE VARCHAR(100);
-- 2.2 删除旧的单字段唯一索引
DROP INDEX IF EXISTS "processed_events_event_id_key"; DROP INDEX IF EXISTS "processed_events_event_id_key";
-- 创建新的复合唯一索引 -- 2.3 创建新的复合唯一索引
CREATE UNIQUE INDEX IF NOT EXISTS "processed_events_sourceService_eventId_key" ON "processed_events"("source_service", "event_id"); -- 索引名使用蛇形命名以与列名保持一致
CREATE UNIQUE INDEX IF NOT EXISTS "processed_events_source_service_event_id_key" ON "processed_events"("source_service", "event_id");

View File

@ -1,17 +1,26 @@
-- ============================================================================ -- ============================================================================
-- 修复 processed_events 表的幂等键 -- 修复 processed_events 表的幂等键
-- 用于 2.0 服务间 Outbox 事件的 100% exactly-once 语义
-- ============================================================================
--
-- 问题: 原来使用 eventId 作为唯一键,但不同服务的 outbox ID 可能相同 -- 问题: 原来使用 eventId 作为唯一键,但不同服务的 outbox ID 可能相同
-- 解决: 使用 (sourceService, eventId) 作为复合唯一键 -- 解决: 使用 (sourceService, eventId) 作为复合唯一键
--
-- 唯一键说明:
-- - sourceService: 发送事件的服务名(如 "auth-service", "contribution-service")
-- - eventId: 发送方 outbox 表的自增 ID(非 UUID,而是数据库自增主键)
-- - 组合后在全局唯一,可用于精确追踪事件来源
-- ============================================================================ -- ============================================================================
-- 先清空已有数据(因为之前的数据可能有冲突) -- 先清空已有数据(因为之前的数据可能有冲突)
TRUNCATE TABLE "processed_events"; TRUNCATE TABLE "processed_events";
-- 删除旧的唯一索引 -- 删除旧的唯一索引(仅 eventId)
DROP INDEX IF EXISTS "processed_events_eventId_key"; DROP INDEX IF EXISTS "processed_events_eventId_key";
-- 删除旧的 sourceService 索引 -- 删除旧的 sourceService 普通索引
DROP INDEX IF EXISTS "processed_events_sourceService_idx"; DROP INDEX IF EXISTS "processed_events_sourceService_idx";
-- 创建新的复合唯一索引 -- 创建新的复合唯一索引:(sourceService, eventId)
-- 这个组合保证跨服务的唯一性
CREATE UNIQUE INDEX "processed_events_sourceService_eventId_key" ON "processed_events"("sourceService", "eventId"); CREATE UNIQUE INDEX "processed_events_sourceService_eventId_key" ON "processed_events"("sourceService", "eventId");