diff --git a/backend/services/auth-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql b/backend/services/auth-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql index 206f5fe5..8004933a 100644 --- a/backend/services/auth-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql +++ b/backend/services/auth-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql @@ -4,18 +4,21 @@ -- ============================================================================ -- 创建 processed_cdc_events 表(用于 CDC 事件幂等) +-- 唯一键: (source_topic, offset) - Kafka topic 名称 + 消息偏移量 +-- 用于保证每个 CDC 事件只处理一次(exactly-once 语义) CREATE TABLE IF NOT EXISTS "processed_cdc_events" ( "id" BIGSERIAL NOT NULL, - "source_topic" VARCHAR(200) NOT NULL, - "offset" BIGINT NOT NULL, - "table_name" VARCHAR(100) NOT NULL, - "operation" VARCHAR(10) NOT NULL, + "source_topic" VARCHAR(200) NOT NULL, -- Kafka topic 名称(如 cdc.identity.public.user_accounts) + "offset" BIGINT NOT NULL, -- Kafka 消息偏移量(在 partition 内唯一) + "table_name" VARCHAR(100) NOT NULL, -- 源表名 + "operation" VARCHAR(10) NOT NULL, -- CDC 操作类型: c(create), u(update), d(delete), r(snapshot read) "processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, CONSTRAINT "processed_cdc_events_pkey" PRIMARY KEY ("id") ); -- 复合唯一索引:(source_topic, offset) 保证幂等性 +-- 注意:这不是数据库自增 ID,而是 Kafka 消息的唯一标识 CREATE UNIQUE INDEX "processed_cdc_events_source_topic_offset_key" ON "processed_cdc_events"("source_topic", "offset"); -- 时间索引用于清理旧数据 diff --git a/backend/services/contribution-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql b/backend/services/contribution-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql index a01ee459..529847de 100644 --- a/backend/services/contribution-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql +++ b/backend/services/contribution-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql @@ -4,26 +4,42 @@ -- ============================================================================ -- 1. 创建 processed_cdc_events 表(用于 CDC 事件幂等) +-- 唯一键: (source_topic, offset) - Kafka topic 名称 + 消息偏移量 +-- 用于保证每个 CDC 事件只处理一次(exactly-once 语义) CREATE TABLE IF NOT EXISTS "processed_cdc_events" ( "id" BIGSERIAL NOT NULL, - "source_topic" VARCHAR(200) NOT NULL, - "offset" BIGINT NOT NULL, - "table_name" VARCHAR(100) NOT NULL, - "operation" VARCHAR(10) NOT NULL, + "source_topic" VARCHAR(200) NOT NULL, -- Kafka topic 名称(如 cdc.identity.public.user_accounts) + "offset" BIGINT NOT NULL, -- Kafka 消息偏移量(在 partition 内唯一) + "table_name" VARCHAR(100) NOT NULL, -- 源表名 + "operation" VARCHAR(10) NOT NULL, -- CDC 操作类型: c(create), u(update), d(delete), r(snapshot read) "processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, CONSTRAINT "processed_cdc_events_pkey" PRIMARY KEY ("id") ); -- 复合唯一索引:(source_topic, offset) 保证幂等性 +-- 注意:这不是数据库自增 ID,而是 Kafka 消息的唯一标识 CREATE UNIQUE INDEX "processed_cdc_events_source_topic_offset_key" ON "processed_cdc_events"("source_topic", "offset"); -- 时间索引用于清理旧数据 CREATE INDEX "processed_cdc_events_processed_at_idx" ON "processed_cdc_events"("processed_at"); --- 2. 修复 processed_events 表的唯一约束 --- 删除旧的单字段唯一索引 +-- 2. 修复 processed_events 表(用于 2.0 服务间 Outbox 事件幂等) +-- 唯一键: (source_service, event_id) - 服务名 + outbox 表的 ID +-- 不同服务的 outbox ID 可能相同,所以需要组合服务名作为复合唯一键 + +-- 2.1 修改 source_service 列:扩展长度 50->100,且设为 NOT NULL +-- 先为已有 NULL 值设置默认值 +UPDATE "processed_events" SET "source_service" = 'unknown' WHERE "source_service" IS NULL; + +-- 修改列类型和约束 +ALTER TABLE "processed_events" + ALTER COLUMN "source_service" SET NOT NULL, + ALTER COLUMN "source_service" TYPE VARCHAR(100); + +-- 2.2 删除旧的单字段唯一索引 DROP INDEX IF EXISTS "processed_events_event_id_key"; --- 创建新的复合唯一索引 -CREATE UNIQUE INDEX IF NOT EXISTS "processed_events_sourceService_eventId_key" ON "processed_events"("source_service", "event_id"); +-- 2.3 创建新的复合唯一索引 +-- 索引名使用蛇形命名以与列名保持一致 +CREATE UNIQUE INDEX IF NOT EXISTS "processed_events_source_service_event_id_key" ON "processed_events"("source_service", "event_id"); diff --git a/backend/services/mining-admin-service/prisma/migrations/0002_fix_processed_event_composite_key/migration.sql b/backend/services/mining-admin-service/prisma/migrations/0002_fix_processed_event_composite_key/migration.sql index 8dfdcc03..d67e3d1a 100644 --- a/backend/services/mining-admin-service/prisma/migrations/0002_fix_processed_event_composite_key/migration.sql +++ b/backend/services/mining-admin-service/prisma/migrations/0002_fix_processed_event_composite_key/migration.sql @@ -1,17 +1,26 @@ -- ============================================================================ -- 修复 processed_events 表的幂等键 +-- 用于 2.0 服务间 Outbox 事件的 100% exactly-once 语义 +-- ============================================================================ +-- -- 问题: 原来使用 eventId 作为唯一键,但不同服务的 outbox ID 可能相同 -- 解决: 使用 (sourceService, eventId) 作为复合唯一键 +-- +-- 唯一键说明: +-- - sourceService: 发送事件的服务名(如 "auth-service", "contribution-service") +-- - eventId: 发送方 outbox 表的自增 ID(非 UUID,而是数据库自增主键) +-- - 组合后在全局唯一,可用于精确追踪事件来源 -- ============================================================================ -- 先清空已有数据(因为之前的数据可能有冲突) TRUNCATE TABLE "processed_events"; --- 删除旧的唯一索引 +-- 删除旧的唯一索引(仅 eventId) DROP INDEX IF EXISTS "processed_events_eventId_key"; --- 删除旧的 sourceService 索引 +-- 删除旧的 sourceService 普通索引 DROP INDEX IF EXISTS "processed_events_sourceService_idx"; --- 创建新的复合唯一索引 +-- 创建新的复合唯一索引:(sourceService, eventId) +-- 这个组合保证跨服务的唯一性 CREATE UNIQUE INDEX "processed_events_sourceService_eventId_key" ON "processed_events"("sourceService", "eventId");