diff --git a/backend/services/mining-admin-service/prisma/migrations/0001_init/migration.sql b/backend/services/mining-admin-service/prisma/migrations/0001_init/migration.sql index 2831bd91..bd6a5060 100644 --- a/backend/services/mining-admin-service/prisma/migrations/0001_init/migration.sql +++ b/backend/services/mining-admin-service/prisma/migrations/0001_init/migration.sql @@ -689,8 +689,9 @@ CREATE UNIQUE INDEX "synced_daily_mining_stats_statDate_key" ON "synced_daily_mi CREATE UNIQUE INDEX "synced_day_klines_klineDate_key" ON "synced_day_klines"("klineDate"); -- CreateIndex: synced_system_contributions --- 使用 accountType + region_code 复合唯一键 -CREATE UNIQUE INDEX "synced_system_contributions_accountType_region_code_key" ON "synced_system_contributions"("accountType", "region_code"); +-- 使用 accountType + COALESCE(region_code, '__NULL__') 复合唯一键 +-- 注意:PostgreSQL 中 NULL != NULL,所以直接用 region_code 做唯一索引无法阻止重复的 (OPERATION, NULL) +CREATE UNIQUE INDEX "synced_system_contributions_accountType_region_code_key" ON "synced_system_contributions"("accountType", COALESCE(region_code, '__NULL__')); CREATE INDEX "synced_system_contributions_accountType_idx" ON "synced_system_contributions"("accountType"); CREATE INDEX "synced_system_contributions_region_code_idx" ON "synced_system_contributions"("region_code"); diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index 97115630..a1a56a09 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -543,23 +543,36 @@ export class CdcSyncService implements OnModuleInit { const accountType = payload.accountType; const regionCode = payload.regionCode || null; - // 使用 findFirst + create/update 替代 upsert,因为 regionCode 可以为 null - const existing = await tx.syncedSystemContribution.findFirst({ + // 查找所有匹配的记录(处理可能存在的重复记录情况) + // 注意:由于 PostgreSQL 中 NULL != NULL,唯一约束在 regionCode 为 NULL 时不生效 + const existingRecords = await tx.syncedSystemContribution.findMany({ where: { accountType, regionCode: regionCode === null ? { equals: null } : regionCode, }, + orderBy: { syncedAt: 'asc' }, }); - if (existing) { + if (existingRecords.length > 0) { await tx.syncedSystemContribution.update({ - where: { id: existing.id }, + where: { id: existingRecords[0].id }, data: { name: payload.name, contributionBalance: payload.contributionBalance, contributionNeverExpires: payload.contributionNeverExpires, }, }); + + // 删除重复记录 + if (existingRecords.length > 1) { + const duplicateIds = existingRecords.slice(1).map(r => r.id); + await tx.syncedSystemContribution.deleteMany({ + where: { id: { in: duplicateIds } }, + }); + this.logger.warn( + `Deleted ${duplicateIds.length} duplicate system contribution records for ${accountType}:${regionCode}`, + ); + } } else { await tx.syncedSystemContribution.create({ data: { @@ -578,28 +591,44 @@ export class CdcSyncService implements OnModuleInit { * 来自 contribution-service 的系统账户(运营、省、市、总部)算力同步 * accountType: OPERATION / PROVINCE / CITY / HEADQUARTERS * regionCode: 省/市代码,如 440000, 440100 + * + * 注意:由于 PostgreSQL 中 NULL != NULL,@@unique([accountType, regionCode]) 约束 + * 在 regionCode 为 NULL 时不会阻止重复插入。因此需要在代码层面确保唯一性。 */ private async handleSystemAccountSynced(event: ServiceEvent, tx: TransactionClient): Promise { const { payload } = event; const accountType = payload.accountType; // OPERATION / PROVINCE / CITY / HEADQUARTERS const regionCode = payload.regionCode || null; - // 使用 findFirst + create/update 替代 upsert,因为 regionCode 可以为 null - const existing = await tx.syncedSystemContribution.findFirst({ + // 查找所有匹配的记录(处理可能存在的重复记录情况) + const existingRecords = await tx.syncedSystemContribution.findMany({ where: { accountType, regionCode: regionCode === null ? { equals: null } : regionCode, }, + orderBy: { syncedAt: 'asc' }, // 保留最早创建的记录 }); - if (existing) { + if (existingRecords.length > 0) { + // 更新第一条记录 await tx.syncedSystemContribution.update({ - where: { id: existing.id }, + where: { id: existingRecords[0].id }, data: { name: payload.name, contributionBalance: payload.contributionBalance, }, }); + + // 如果存在重复记录,删除多余的(只保留第一条) + if (existingRecords.length > 1) { + const duplicateIds = existingRecords.slice(1).map(r => r.id); + await tx.syncedSystemContribution.deleteMany({ + where: { id: { in: duplicateIds } }, + }); + this.logger.warn( + `Deleted ${duplicateIds.length} duplicate system contribution records for ${accountType}:${regionCode}`, + ); + } } else { await tx.syncedSystemContribution.create({ data: {