From 946978f624d6bb6ee6d0c5697fe025cee4da9052 Mon Sep 17 00:00:00 2001 From: hailin Date: Wed, 21 Jan 2026 03:29:24 -0800 Subject: [PATCH] =?UTF-8?q?fix(mining-admin):=20=E4=BF=AE=E5=A4=8D=20Postg?= =?UTF-8?q?reSQL=20NULL=20=E5=94=AF=E4=B8=80=E7=BA=A6=E6=9D=9F=E5=AF=BC?= =?UTF-8?q?=E8=87=B4=E7=B3=BB=E7=BB=9F=E8=B4=A6=E6=88=B7=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=87=8D=E5=A4=8D=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 修改 synced_system_contributions 唯一索引使用 COALESCE 处理 NULL 值 - 修改 handleSystemAccountSynced 和 handleSystemContributionUpdated 方法 使用 findMany 替代 findFirst,自动清理重复记录 Co-Authored-By: Claude Opus 4.5 --- .../prisma/migrations/0001_init/migration.sql | 5 ++- .../infrastructure/kafka/cdc-sync.service.ts | 45 +++++++++++++++---- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/backend/services/mining-admin-service/prisma/migrations/0001_init/migration.sql b/backend/services/mining-admin-service/prisma/migrations/0001_init/migration.sql index 2831bd91..bd6a5060 100644 --- a/backend/services/mining-admin-service/prisma/migrations/0001_init/migration.sql +++ b/backend/services/mining-admin-service/prisma/migrations/0001_init/migration.sql @@ -689,8 +689,9 @@ CREATE UNIQUE INDEX "synced_daily_mining_stats_statDate_key" ON "synced_daily_mi CREATE UNIQUE INDEX "synced_day_klines_klineDate_key" ON "synced_day_klines"("klineDate"); -- CreateIndex: synced_system_contributions --- 使用 accountType + region_code 复合唯一键 -CREATE UNIQUE INDEX "synced_system_contributions_accountType_region_code_key" ON "synced_system_contributions"("accountType", "region_code"); +-- 使用 accountType + COALESCE(region_code, '__NULL__') 复合唯一键 +-- 注意:PostgreSQL 中 NULL != NULL,所以直接用 region_code 做唯一索引无法阻止重复的 (OPERATION, NULL) +CREATE UNIQUE INDEX "synced_system_contributions_accountType_region_code_key" ON "synced_system_contributions"("accountType", COALESCE(region_code, '__NULL__')); CREATE INDEX "synced_system_contributions_accountType_idx" ON "synced_system_contributions"("accountType"); CREATE INDEX "synced_system_contributions_region_code_idx" ON "synced_system_contributions"("region_code"); diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index 97115630..a1a56a09 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -543,23 +543,36 @@ export class CdcSyncService implements OnModuleInit { const accountType = payload.accountType; const regionCode = payload.regionCode || null; - // 使用 findFirst + create/update 替代 upsert,因为 regionCode 可以为 null - const existing = await tx.syncedSystemContribution.findFirst({ + // 查找所有匹配的记录(处理可能存在的重复记录情况) + // 注意:由于 PostgreSQL 中 NULL != NULL,唯一约束在 regionCode 为 NULL 时不生效 + const existingRecords = await tx.syncedSystemContribution.findMany({ where: { accountType, regionCode: regionCode === null ? { equals: null } : regionCode, }, + orderBy: { syncedAt: 'asc' }, }); - if (existing) { + if (existingRecords.length > 0) { await tx.syncedSystemContribution.update({ - where: { id: existing.id }, + where: { id: existingRecords[0].id }, data: { name: payload.name, contributionBalance: payload.contributionBalance, contributionNeverExpires: payload.contributionNeverExpires, }, }); + + // 删除重复记录 + if (existingRecords.length > 1) { + const duplicateIds = existingRecords.slice(1).map(r => r.id); + await tx.syncedSystemContribution.deleteMany({ + where: { id: { in: duplicateIds } }, + }); + this.logger.warn( + `Deleted ${duplicateIds.length} duplicate system contribution records for ${accountType}:${regionCode}`, + ); + } } else { await tx.syncedSystemContribution.create({ data: { @@ -578,28 +591,44 @@ export class CdcSyncService implements OnModuleInit { * 来自 contribution-service 的系统账户(运营、省、市、总部)算力同步 * accountType: OPERATION / PROVINCE / CITY / HEADQUARTERS * regionCode: 省/市代码,如 440000, 440100 + * + * 注意:由于 PostgreSQL 中 NULL != NULL,@@unique([accountType, regionCode]) 约束 + * 在 regionCode 为 NULL 时不会阻止重复插入。因此需要在代码层面确保唯一性。 */ private async handleSystemAccountSynced(event: ServiceEvent, tx: TransactionClient): Promise { const { payload } = event; const accountType = payload.accountType; // OPERATION / PROVINCE / CITY / HEADQUARTERS const regionCode = payload.regionCode || null; - // 使用 findFirst + create/update 替代 upsert,因为 regionCode 可以为 null - const existing = await tx.syncedSystemContribution.findFirst({ + // 查找所有匹配的记录(处理可能存在的重复记录情况) + const existingRecords = await tx.syncedSystemContribution.findMany({ where: { accountType, regionCode: regionCode === null ? { equals: null } : regionCode, }, + orderBy: { syncedAt: 'asc' }, // 保留最早创建的记录 }); - if (existing) { + if (existingRecords.length > 0) { + // 更新第一条记录 await tx.syncedSystemContribution.update({ - where: { id: existing.id }, + where: { id: existingRecords[0].id }, data: { name: payload.name, contributionBalance: payload.contributionBalance, }, }); + + // 如果存在重复记录,删除多余的(只保留第一条) + if (existingRecords.length > 1) { + const duplicateIds = existingRecords.slice(1).map(r => r.id); + await tx.syncedSystemContribution.deleteMany({ + where: { id: { in: duplicateIds } }, + }); + this.logger.warn( + `Deleted ${duplicateIds.length} duplicate system contribution records for ${accountType}:${regionCode}`, + ); + } } else { await tx.syncedSystemContribution.create({ data: {