From 390e5ccb1936a5f9add11a1e506cac0ca7e357a2 Mon Sep 17 00:00:00 2001 From: hailin Date: Sat, 28 Feb 2026 02:31:04 -0800 Subject: [PATCH] =?UTF-8?q?fix(pre-planting):=20=E7=94=A8=20orderNo=20?= =?UTF-8?q?=E6=9B=BF=E4=BB=A3=20BigInt=20=E8=87=AA=E5=A2=9E=20ID=20?= =?UTF-8?q?=E4=BD=9C=E4=B8=BA=20CDC=20=E5=85=B3=E8=81=94=E9=94=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题:Debezium CDC 事件中 Prisma @map("order_id") 字段以 DB 列名 order_id 发送,而代码访问 data.id 导致 undefined → BigInt 转换失败。 修复方案(遵循"用 orderNo 业务键关联"原则): - pre-planting-order-synced.handler.ts: * PrePlantingOrderSyncResult 改为 { orderNo: string } * handleCreateOrSnapshot/handleUpdate 均用 order_no 字段 * syncToTrackingTable upsert where 改为 { orderNo } * ensureAdoptionMarker 入参从 orderId bigint 改为 orderNo string - markerAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + hash(orderNo) * isAlreadyDistributed 改为 findUnique({ where: { orderNo } }) * calculateAfterCommit 传 result.orderNo - pre-planting-contribution.service.ts: * calculateForPrePlantingOrder 入参从 bigint 改为 string(orderNo) * findUnique({ where: { orderNo } }) 查询,用存储的 originalOrderId 计算偏移 * 所有日志/update 中 originalOrderId 替换为 orderNo * processUndistributedOrders 改为传 order.orderNo,orderBy 改为 createdAt - schema.prisma:orderNo 字段新增 @unique 约束 - migration SQL:CREATE UNIQUE INDEX on order_no 列 Co-Authored-By: Claude Sonnet 4.6 --- .../migration.sql | 7 ++ .../prisma/pre-planting/schema.prisma | 2 +- .../pre-planting-order-synced.handler.ts | 66 ++++++++++--------- .../pre-planting-contribution.service.ts | 44 ++++++------- 4 files changed, 65 insertions(+), 54 deletions(-) create mode 100644 backend/services/contribution-service/prisma/pre-planting/migrations/20260228000000_add_unique_order_no_to_synced_orders/migration.sql diff --git a/backend/services/contribution-service/prisma/pre-planting/migrations/20260228000000_add_unique_order_no_to_synced_orders/migration.sql b/backend/services/contribution-service/prisma/pre-planting/migrations/20260228000000_add_unique_order_no_to_synced_orders/migration.sql new file mode 100644 index 00000000..bdec06c4 --- /dev/null +++ b/backend/services/contribution-service/prisma/pre-planting/migrations/20260228000000_add_unique_order_no_to_synced_orders/migration.sql @@ -0,0 +1,7 @@ +-- Migration: add unique constraint to order_no in pre_planting_synced_orders +-- 用 orderNo(业务单号)作为追踪表的唯一键,避免依赖数据库自增 BigInt ID +-- 调整背景:Debezium CDC 事件中 @map("order_id") 列名与 Prisma 字段名 id 不一致, +-- 改用 orderNo 作为 upsert 和查询的唯一标识,更稳定、更语义化。 + +CREATE UNIQUE INDEX IF NOT EXISTS "pre_planting_synced_orders_order_no_key" + ON "pre_planting_synced_orders"("order_no"); diff --git a/backend/services/contribution-service/prisma/pre-planting/schema.prisma b/backend/services/contribution-service/prisma/pre-planting/schema.prisma index b00dcb11..86feba9d 100644 --- a/backend/services/contribution-service/prisma/pre-planting/schema.prisma +++ b/backend/services/contribution-service/prisma/pre-planting/schema.prisma @@ -30,7 +30,7 @@ datasource db { model PrePlantingSyncedOrder { id BigInt @id @default(autoincrement()) originalOrderId BigInt @unique @map("original_order_id") - orderNo String @map("order_no") @db.VarChar(50) + orderNo String @unique @map("order_no") @db.VarChar(50) userId BigInt @map("user_id") accountSequence String @map("account_sequence") @db.VarChar(20) portionCount Int @map("portion_count") diff --git a/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-order-synced.handler.ts b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-order-synced.handler.ts index 85882737..3ce16c62 100644 --- a/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-order-synced.handler.ts +++ b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-order-synced.handler.ts @@ -9,9 +9,10 @@ import { PRE_PLANTING_SOURCE_ID_OFFSET, PRE_PLANTING_PORTION_DIVISOR } from '../ /** * 预种订单同步结果(用于事务提交后的算力计算) + * 使用 orderNo(业务单号)作为关联键,避免依赖数据库自增 BigInt ID */ export interface PrePlantingOrderSyncResult { - originalOrderId: bigint; + orderNo: string; needsCalculation: boolean; } @@ -62,7 +63,7 @@ export class PrePlantingOrderSyncedHandler { } this.logger.log( - `[PRE-PLANTING-ORDER] Event: op=${op}, id=${data.id}, status=${data.status}`, + `[PRE-PLANTING-ORDER] Event: op=${op}, id=${data.order_id ?? data.id}, status=${data.status}`, ); switch (op) { @@ -86,14 +87,14 @@ export class PrePlantingOrderSyncedHandler { async calculateAfterCommit(result: PrePlantingOrderSyncResult): Promise { if (!result?.needsCalculation) return; - this.logger.log(`[PRE-PLANTING-ORDER] Triggering contribution calculation: orderId=${result.originalOrderId}`); + this.logger.log(`[PRE-PLANTING-ORDER] Triggering contribution calculation: orderNo=${result.orderNo}`); try { - await this.contributionService.calculateForPrePlantingOrder(result.originalOrderId); - this.logger.log(`[PRE-PLANTING-ORDER] Contribution calculated: orderId=${result.originalOrderId}`); + await this.contributionService.calculateForPrePlantingOrder(result.orderNo); + this.logger.log(`[PRE-PLANTING-ORDER] Contribution calculated: orderNo=${result.orderNo}`); } catch (error) { // 算力计算失败不影响数据同步,后续调度器会重试 this.logger.error( - `[PRE-PLANTING-ORDER] Contribution calculation failed: orderId=${result.originalOrderId}`, + `[PRE-PLANTING-ORDER] Contribution calculation failed: orderNo=${result.orderNo}`, error, ); } @@ -104,12 +105,12 @@ export class PrePlantingOrderSyncedHandler { event: PrePlantingCdcEvent, tx: Prisma.TransactionClient, ): Promise { - const orderId = BigInt(data.id); + const orderNo = data.order_no || data.orderNo; const accountSequence = data.account_sequence || data.accountSequence; const status = data.status; - if (!accountSequence) { - this.logger.warn(`[PRE-PLANTING-ORDER] Missing accountSequence for orderId=${orderId}`); + if (!orderNo || !accountSequence) { + this.logger.warn(`[PRE-PLANTING-ORDER] Missing orderNo or accountSequence, data=${JSON.stringify(data)}`); return null; } @@ -125,19 +126,16 @@ export class PrePlantingOrderSyncedHandler { } const contributionPerPortion = contributionPerTree.div(PRE_PLANTING_PORTION_DIVISOR); - // 同步到预种追踪表(事务外,最终一致性) + // 同步到预种追踪表(以 orderNo 为唯一键) await this.syncToTrackingTable(data, event, contributionPerPortion); // 当状态为 PAID 时,在 synced_adoptions 中插入 marker(用于 unlock 计数) const needsCalculation = status === 'PAID'; if (needsCalculation) { - await this.ensureAdoptionMarker(accountSequence, orderId, paidAt, tx); + await this.ensureAdoptionMarker(accountSequence, orderNo, paidAt, tx); } - return { - originalOrderId: orderId, - needsCalculation, - }; + return { orderNo, needsCalculation }; } private async handleUpdate( @@ -146,13 +144,13 @@ export class PrePlantingOrderSyncedHandler { event: PrePlantingCdcEvent, tx: Prisma.TransactionClient, ): Promise { - const orderId = BigInt(after.id); + const orderNo = after.order_no || after.orderNo; const accountSequence = after.account_sequence || after.accountSequence; const newStatus = after.status; const oldStatus = before?.status; - if (!accountSequence) { - this.logger.warn(`[PRE-PLANTING-ORDER] Missing accountSequence for orderId=${orderId}`); + if (!orderNo || !accountSequence) { + this.logger.warn(`[PRE-PLANTING-ORDER] Missing orderNo or accountSequence in update`); return null; } @@ -174,21 +172,21 @@ export class PrePlantingOrderSyncedHandler { // 只在状态变为 PAID(且之前不是 PAID)时触发算力计算 const statusChangedToPaid = newStatus === 'PAID' && oldStatus !== 'PAID'; if (statusChangedToPaid) { - await this.ensureAdoptionMarker(accountSequence, orderId, paidAt, tx); + await this.ensureAdoptionMarker(accountSequence, orderNo, paidAt, tx); } // 检查是否已分配 - const alreadyDistributed = await this.isAlreadyDistributed(orderId); + const alreadyDistributed = await this.isAlreadyDistributed(orderNo); return { - originalOrderId: orderId, + orderNo, needsCalculation: statusChangedToPaid && !alreadyDistributed, }; } /** * 同步到预种追踪表(pre_planting_synced_orders) - * 使用 PrePlantingPrismaService(独立 schema) + * 使用 orderNo(业务单号)作为 upsert 唯一键,不依赖 BigInt 自增 ID */ private async syncToTrackingTable( data: any, @@ -196,12 +194,13 @@ export class PrePlantingOrderSyncedHandler { contributionPerPortion: Decimal, ): Promise { try { - const orderId = BigInt(data.id); + const orderNo = data.order_no || data.orderNo || ''; + const originalOrderId = data.order_id ? BigInt(data.order_id) : BigInt(0); await this.prePlantingPrisma.prePlantingSyncedOrder.upsert({ - where: { originalOrderId: orderId }, + where: { orderNo }, create: { - originalOrderId: orderId, - orderNo: data.order_no || data.orderNo || '', + originalOrderId, + orderNo, userId: BigInt(data.user_id || data.userId || 0), accountSequence: data.account_sequence || data.accountSequence, portionCount: data.portion_count || data.portionCount || 1, @@ -248,7 +247,7 @@ export class PrePlantingOrderSyncedHandler { */ private async ensureAdoptionMarker( accountSequence: string, - orderId: bigint, + orderNo: string, paidAt: string | null, tx: Prisma.TransactionClient, ): Promise { @@ -283,7 +282,12 @@ export class PrePlantingOrderSyncedHandler { return; } - const markerAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + orderId; + // 用 orderNo 的字符编码生成一个稳定的、不与正常认种冲突的唯一 BigInt + // 格式:PRE_PLANTING_SOURCE_ID_OFFSET + hash(orderNo) % 1_000_000_000n + const orderNoHash = BigInt( + orderNo.split('').reduce((acc, c) => (acc * 31 + c.charCodeAt(0)) & 0x7fffffff, 0), + ); + const markerAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + orderNoHash; const adoptionDate = paidAt ? new Date(paidAt) : new Date(); await tx.syncedAdoption.create({ @@ -308,12 +312,12 @@ export class PrePlantingOrderSyncedHandler { } /** - * 检查预种订单是否已经分配过算力 + * 检查预种订单是否已经分配过算力(通过 orderNo 查询) */ - private async isAlreadyDistributed(orderId: bigint): Promise { + private async isAlreadyDistributed(orderNo: string): Promise { try { const order = await this.prePlantingPrisma.prePlantingSyncedOrder.findUnique({ - where: { originalOrderId: orderId }, + where: { orderNo }, select: { contributionDistributed: true }, }); return order?.contributionDistributed ?? false; diff --git a/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts b/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts index 228adc6c..85a7e699 100644 --- a/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts +++ b/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts @@ -58,30 +58,30 @@ export class PrePlantingContributionService { /** * 为预种订单计算并分配算力 * - * @param originalOrderId 预种订单原始 ID(planting-service 中的 pre_planting_orders.id) + * @param orderNo 预种订单业务单号(如 PPLMM...),通过 accountSequence 关联,不依赖 BigInt 自增 ID */ - async calculateForPrePlantingOrder(originalOrderId: bigint): Promise { - // 生成偏移后的 sourceAdoptionId - const sourceAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + originalOrderId; - - // 检查是否已经处理过(使用偏移后的 ID) - const exists = await this.contributionRecordRepository.existsBySourceAdoptionId(sourceAdoptionId); - if (exists) { - this.logger.debug(`Pre-planting order ${originalOrderId} already processed, skipping`); - return; - } - - // 从预种追踪表获取订单数据 + async calculateForPrePlantingOrder(orderNo: string): Promise { + // 从预种追踪表获取订单数据(以 orderNo 为唯一键) const order = await this.prePlantingPrisma.prePlantingSyncedOrder.findUnique({ - where: { originalOrderId }, + where: { orderNo }, }); if (!order) { - throw new Error(`Pre-planting order not found: ${originalOrderId}`); + throw new Error(`Pre-planting order not found: ${orderNo}`); } if (order.status !== 'PAID') { - this.logger.debug(`Pre-planting order ${originalOrderId} status=${order.status}, skipping`); + this.logger.debug(`Pre-planting order ${orderNo} status=${order.status}, skipping`); + return; + } + + // 生成偏移后的 sourceAdoptionId(用存储的 originalOrderId 保证唯一性,仅内部使用) + const sourceAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + order.originalOrderId; + + // 检查是否已经处理过(避免重复计算) + const exists = await this.contributionRecordRepository.existsBySourceAdoptionId(sourceAdoptionId); + if (exists) { + this.logger.debug(`Pre-planting order ${orderNo} already processed, skipping`); return; } @@ -120,7 +120,7 @@ export class PrePlantingContributionService { if (!userReferral) { this.logger.warn( - `[PRE-PLANTING] Deferring order ${originalOrderId}: ` + + `[PRE-PLANTING] Deferring order ${orderNo}: ` + `referral for ${order.accountSequence} not yet synced`, ); return; @@ -176,7 +176,7 @@ export class PrePlantingContributionService { // 标记预种追踪表为已分配 try { await this.prePlantingPrisma.prePlantingSyncedOrder.update({ - where: { originalOrderId }, + where: { orderNo }, data: { contributionDistributed: true, contributionDistributedAt: new Date(), @@ -187,7 +187,7 @@ export class PrePlantingContributionService { } this.logger.log( - `Pre-planting contribution calculated: orderId=${originalOrderId}, ` + + `Pre-planting contribution calculated: orderNo=${orderNo}, ` + `sourceId=${sourceAdoptionId}, personal=${result.personalRecord.amount.value}, ` + `teamLevel=${result.teamLevelRecords.length}, teamBonus=${result.teamBonusRecords.length}`, ); @@ -206,17 +206,17 @@ export class PrePlantingContributionService { contributionDistributed: false, }, take: batchSize, - orderBy: { originalOrderId: 'asc' }, + orderBy: { createdAt: 'asc' }, }); let count = 0; for (const order of orders) { try { - await this.calculateForPrePlantingOrder(order.originalOrderId); + await this.calculateForPrePlantingOrder(order.orderNo); count++; } catch (error) { this.logger.error( - `Failed to process pre-planting order ${order.originalOrderId}`, + `Failed to process pre-planting order ${order.orderNo}`, error, ); }