fix(mining-admin): 修复 PostgreSQL NULL 唯一约束导致系统账户数据重复问题
- 修改 synced_system_contributions 唯一索引使用 COALESCE 处理 NULL 值 - 修改 handleSystemAccountSynced 和 handleSystemContributionUpdated 方法 使用 findMany 替代 findFirst,自动清理重复记录 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
eeaa43e044
commit
946978f624
|
|
@ -689,8 +689,9 @@ CREATE UNIQUE INDEX "synced_daily_mining_stats_statDate_key" ON "synced_daily_mi
|
||||||
CREATE UNIQUE INDEX "synced_day_klines_klineDate_key" ON "synced_day_klines"("klineDate");
|
CREATE UNIQUE INDEX "synced_day_klines_klineDate_key" ON "synced_day_klines"("klineDate");
|
||||||
|
|
||||||
-- CreateIndex: synced_system_contributions
|
-- CreateIndex: synced_system_contributions
|
||||||
-- 使用 accountType + region_code 复合唯一键
|
-- 使用 accountType + COALESCE(region_code, '__NULL__') 复合唯一键
|
||||||
CREATE UNIQUE INDEX "synced_system_contributions_accountType_region_code_key" ON "synced_system_contributions"("accountType", "region_code");
|
-- 注意:PostgreSQL 中 NULL != NULL,所以直接用 region_code 做唯一索引无法阻止重复的 (OPERATION, NULL)
|
||||||
|
CREATE UNIQUE INDEX "synced_system_contributions_accountType_region_code_key" ON "synced_system_contributions"("accountType", COALESCE(region_code, '__NULL__'));
|
||||||
CREATE INDEX "synced_system_contributions_accountType_idx" ON "synced_system_contributions"("accountType");
|
CREATE INDEX "synced_system_contributions_accountType_idx" ON "synced_system_contributions"("accountType");
|
||||||
CREATE INDEX "synced_system_contributions_region_code_idx" ON "synced_system_contributions"("region_code");
|
CREATE INDEX "synced_system_contributions_region_code_idx" ON "synced_system_contributions"("region_code");
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -543,23 +543,36 @@ export class CdcSyncService implements OnModuleInit {
|
||||||
const accountType = payload.accountType;
|
const accountType = payload.accountType;
|
||||||
const regionCode = payload.regionCode || null;
|
const regionCode = payload.regionCode || null;
|
||||||
|
|
||||||
// 使用 findFirst + create/update 替代 upsert,因为 regionCode 可以为 null
|
// 查找所有匹配的记录(处理可能存在的重复记录情况)
|
||||||
const existing = await tx.syncedSystemContribution.findFirst({
|
// 注意:由于 PostgreSQL 中 NULL != NULL,唯一约束在 regionCode 为 NULL 时不生效
|
||||||
|
const existingRecords = await tx.syncedSystemContribution.findMany({
|
||||||
where: {
|
where: {
|
||||||
accountType,
|
accountType,
|
||||||
regionCode: regionCode === null ? { equals: null } : regionCode,
|
regionCode: regionCode === null ? { equals: null } : regionCode,
|
||||||
},
|
},
|
||||||
|
orderBy: { syncedAt: 'asc' },
|
||||||
});
|
});
|
||||||
|
|
||||||
if (existing) {
|
if (existingRecords.length > 0) {
|
||||||
await tx.syncedSystemContribution.update({
|
await tx.syncedSystemContribution.update({
|
||||||
where: { id: existing.id },
|
where: { id: existingRecords[0].id },
|
||||||
data: {
|
data: {
|
||||||
name: payload.name,
|
name: payload.name,
|
||||||
contributionBalance: payload.contributionBalance,
|
contributionBalance: payload.contributionBalance,
|
||||||
contributionNeverExpires: payload.contributionNeverExpires,
|
contributionNeverExpires: payload.contributionNeverExpires,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// 删除重复记录
|
||||||
|
if (existingRecords.length > 1) {
|
||||||
|
const duplicateIds = existingRecords.slice(1).map(r => r.id);
|
||||||
|
await tx.syncedSystemContribution.deleteMany({
|
||||||
|
where: { id: { in: duplicateIds } },
|
||||||
|
});
|
||||||
|
this.logger.warn(
|
||||||
|
`Deleted ${duplicateIds.length} duplicate system contribution records for ${accountType}:${regionCode}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
await tx.syncedSystemContribution.create({
|
await tx.syncedSystemContribution.create({
|
||||||
data: {
|
data: {
|
||||||
|
|
@ -578,28 +591,44 @@ export class CdcSyncService implements OnModuleInit {
|
||||||
* 来自 contribution-service 的系统账户(运营、省、市、总部)算力同步
|
* 来自 contribution-service 的系统账户(运营、省、市、总部)算力同步
|
||||||
* accountType: OPERATION / PROVINCE / CITY / HEADQUARTERS
|
* accountType: OPERATION / PROVINCE / CITY / HEADQUARTERS
|
||||||
* regionCode: 省/市代码,如 440000, 440100
|
* regionCode: 省/市代码,如 440000, 440100
|
||||||
|
*
|
||||||
|
* 注意:由于 PostgreSQL 中 NULL != NULL,@@unique([accountType, regionCode]) 约束
|
||||||
|
* 在 regionCode 为 NULL 时不会阻止重复插入。因此需要在代码层面确保唯一性。
|
||||||
*/
|
*/
|
||||||
private async handleSystemAccountSynced(event: ServiceEvent, tx: TransactionClient): Promise<void> {
|
private async handleSystemAccountSynced(event: ServiceEvent, tx: TransactionClient): Promise<void> {
|
||||||
const { payload } = event;
|
const { payload } = event;
|
||||||
const accountType = payload.accountType; // OPERATION / PROVINCE / CITY / HEADQUARTERS
|
const accountType = payload.accountType; // OPERATION / PROVINCE / CITY / HEADQUARTERS
|
||||||
const regionCode = payload.regionCode || null;
|
const regionCode = payload.regionCode || null;
|
||||||
|
|
||||||
// 使用 findFirst + create/update 替代 upsert,因为 regionCode 可以为 null
|
// 查找所有匹配的记录(处理可能存在的重复记录情况)
|
||||||
const existing = await tx.syncedSystemContribution.findFirst({
|
const existingRecords = await tx.syncedSystemContribution.findMany({
|
||||||
where: {
|
where: {
|
||||||
accountType,
|
accountType,
|
||||||
regionCode: regionCode === null ? { equals: null } : regionCode,
|
regionCode: regionCode === null ? { equals: null } : regionCode,
|
||||||
},
|
},
|
||||||
|
orderBy: { syncedAt: 'asc' }, // 保留最早创建的记录
|
||||||
});
|
});
|
||||||
|
|
||||||
if (existing) {
|
if (existingRecords.length > 0) {
|
||||||
|
// 更新第一条记录
|
||||||
await tx.syncedSystemContribution.update({
|
await tx.syncedSystemContribution.update({
|
||||||
where: { id: existing.id },
|
where: { id: existingRecords[0].id },
|
||||||
data: {
|
data: {
|
||||||
name: payload.name,
|
name: payload.name,
|
||||||
contributionBalance: payload.contributionBalance,
|
contributionBalance: payload.contributionBalance,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// 如果存在重复记录,删除多余的(只保留第一条)
|
||||||
|
if (existingRecords.length > 1) {
|
||||||
|
const duplicateIds = existingRecords.slice(1).map(r => r.id);
|
||||||
|
await tx.syncedSystemContribution.deleteMany({
|
||||||
|
where: { id: { in: duplicateIds } },
|
||||||
|
});
|
||||||
|
this.logger.warn(
|
||||||
|
`Deleted ${duplicateIds.length} duplicate system contribution records for ${accountType}:${regionCode}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
await tx.syncedSystemContribution.create({
|
await tx.syncedSystemContribution.create({
|
||||||
data: {
|
data: {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue