fix(pre-planting): 用 orderNo 替代 BigInt 自增 ID 作为 CDC 关联键

问题:Debezium CDC 事件中 Prisma @map("order_id") 字段以 DB 列名
order_id 发送,而代码访问 data.id 导致 undefined → BigInt 转换失败。

修复方案(遵循"用 orderNo 业务键关联"原则):
- pre-planting-order-synced.handler.ts:
  * PrePlantingOrderSyncResult 改为 { orderNo: string }
  * handleCreateOrSnapshot/handleUpdate 均用 order_no 字段
  * syncToTrackingTable upsert where 改为 { orderNo }
  * ensureAdoptionMarker 入参从 orderId bigint 改为 orderNo string
    - markerAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + hash(orderNo)
  * isAlreadyDistributed 改为 findUnique({ where: { orderNo } })
  * calculateAfterCommit 传 result.orderNo
- pre-planting-contribution.service.ts:
  * calculateForPrePlantingOrder 入参从 bigint 改为 string(orderNo)
  * findUnique({ where: { orderNo } }) 查询,用存储的 originalOrderId 计算偏移
  * 所有日志/update 中 originalOrderId 替换为 orderNo
  * processUndistributedOrders 改为传 order.orderNo,orderBy 改为 createdAt
- schema.prisma:orderNo 字段新增 @unique 约束
- migration SQL:CREATE UNIQUE INDEX on order_no 列

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-28 02:31:04 -08:00
parent 560674f2e9
commit 390e5ccb19
4 changed files with 65 additions and 54 deletions

View File

@ -0,0 +1,7 @@
-- Migration: add unique constraint to order_no in pre_planting_synced_orders
-- 用 orderNo业务单号作为追踪表的唯一键避免依赖数据库自增 BigInt ID
-- 调整背景Debezium CDC 事件中 @map("order_id") 列名与 Prisma 字段名 id 不一致,
-- 改用 orderNo 作为 upsert 和查询的唯一标识,更稳定、更语义化。
CREATE UNIQUE INDEX IF NOT EXISTS "pre_planting_synced_orders_order_no_key"
ON "pre_planting_synced_orders"("order_no");

View File

@ -30,7 +30,7 @@ datasource db {
model PrePlantingSyncedOrder {
id BigInt @id @default(autoincrement())
originalOrderId BigInt @unique @map("original_order_id")
orderNo String @map("order_no") @db.VarChar(50)
orderNo String @unique @map("order_no") @db.VarChar(50)
userId BigInt @map("user_id")
accountSequence String @map("account_sequence") @db.VarChar(20)
portionCount Int @map("portion_count")

View File

@ -9,9 +9,10 @@ import { PRE_PLANTING_SOURCE_ID_OFFSET, PRE_PLANTING_PORTION_DIVISOR } from '../
/**
*
* 使 orderNo BigInt ID
*/
export interface PrePlantingOrderSyncResult {
originalOrderId: bigint;
orderNo: string;
needsCalculation: boolean;
}
@ -62,7 +63,7 @@ export class PrePlantingOrderSyncedHandler {
}
this.logger.log(
`[PRE-PLANTING-ORDER] Event: op=${op}, id=${data.id}, status=${data.status}`,
`[PRE-PLANTING-ORDER] Event: op=${op}, id=${data.order_id ?? data.id}, status=${data.status}`,
);
switch (op) {
@ -86,14 +87,14 @@ export class PrePlantingOrderSyncedHandler {
async calculateAfterCommit(result: PrePlantingOrderSyncResult): Promise<void> {
if (!result?.needsCalculation) return;
this.logger.log(`[PRE-PLANTING-ORDER] Triggering contribution calculation: orderId=${result.originalOrderId}`);
this.logger.log(`[PRE-PLANTING-ORDER] Triggering contribution calculation: orderNo=${result.orderNo}`);
try {
await this.contributionService.calculateForPrePlantingOrder(result.originalOrderId);
this.logger.log(`[PRE-PLANTING-ORDER] Contribution calculated: orderId=${result.originalOrderId}`);
await this.contributionService.calculateForPrePlantingOrder(result.orderNo);
this.logger.log(`[PRE-PLANTING-ORDER] Contribution calculated: orderNo=${result.orderNo}`);
} catch (error) {
// 算力计算失败不影响数据同步,后续调度器会重试
this.logger.error(
`[PRE-PLANTING-ORDER] Contribution calculation failed: orderId=${result.originalOrderId}`,
`[PRE-PLANTING-ORDER] Contribution calculation failed: orderNo=${result.orderNo}`,
error,
);
}
@ -104,12 +105,12 @@ export class PrePlantingOrderSyncedHandler {
event: PrePlantingCdcEvent,
tx: Prisma.TransactionClient,
): Promise<PrePlantingOrderSyncResult | null> {
const orderId = BigInt(data.id);
const orderNo = data.order_no || data.orderNo;
const accountSequence = data.account_sequence || data.accountSequence;
const status = data.status;
if (!accountSequence) {
this.logger.warn(`[PRE-PLANTING-ORDER] Missing accountSequence for orderId=${orderId}`);
if (!orderNo || !accountSequence) {
this.logger.warn(`[PRE-PLANTING-ORDER] Missing orderNo or accountSequence, data=${JSON.stringify(data)}`);
return null;
}
@ -125,19 +126,16 @@ export class PrePlantingOrderSyncedHandler {
}
const contributionPerPortion = contributionPerTree.div(PRE_PLANTING_PORTION_DIVISOR);
// 同步到预种追踪表(事务外,最终一致性
// 同步到预种追踪表(以 orderNo 为唯一键
await this.syncToTrackingTable(data, event, contributionPerPortion);
// 当状态为 PAID 时,在 synced_adoptions 中插入 marker用于 unlock 计数)
const needsCalculation = status === 'PAID';
if (needsCalculation) {
await this.ensureAdoptionMarker(accountSequence, orderId, paidAt, tx);
await this.ensureAdoptionMarker(accountSequence, orderNo, paidAt, tx);
}
return {
originalOrderId: orderId,
needsCalculation,
};
return { orderNo, needsCalculation };
}
private async handleUpdate(
@ -146,13 +144,13 @@ export class PrePlantingOrderSyncedHandler {
event: PrePlantingCdcEvent,
tx: Prisma.TransactionClient,
): Promise<PrePlantingOrderSyncResult | null> {
const orderId = BigInt(after.id);
const orderNo = after.order_no || after.orderNo;
const accountSequence = after.account_sequence || after.accountSequence;
const newStatus = after.status;
const oldStatus = before?.status;
if (!accountSequence) {
this.logger.warn(`[PRE-PLANTING-ORDER] Missing accountSequence for orderId=${orderId}`);
if (!orderNo || !accountSequence) {
this.logger.warn(`[PRE-PLANTING-ORDER] Missing orderNo or accountSequence in update`);
return null;
}
@ -174,21 +172,21 @@ export class PrePlantingOrderSyncedHandler {
// 只在状态变为 PAID且之前不是 PAID时触发算力计算
const statusChangedToPaid = newStatus === 'PAID' && oldStatus !== 'PAID';
if (statusChangedToPaid) {
await this.ensureAdoptionMarker(accountSequence, orderId, paidAt, tx);
await this.ensureAdoptionMarker(accountSequence, orderNo, paidAt, tx);
}
// 检查是否已分配
const alreadyDistributed = await this.isAlreadyDistributed(orderId);
const alreadyDistributed = await this.isAlreadyDistributed(orderNo);
return {
originalOrderId: orderId,
orderNo,
needsCalculation: statusChangedToPaid && !alreadyDistributed,
};
}
/**
* pre_planting_synced_orders
* 使 PrePlantingPrismaService schema
* 使 orderNo upsert BigInt ID
*/
private async syncToTrackingTable(
data: any,
@ -196,12 +194,13 @@ export class PrePlantingOrderSyncedHandler {
contributionPerPortion: Decimal,
): Promise<void> {
try {
const orderId = BigInt(data.id);
const orderNo = data.order_no || data.orderNo || '';
const originalOrderId = data.order_id ? BigInt(data.order_id) : BigInt(0);
await this.prePlantingPrisma.prePlantingSyncedOrder.upsert({
where: { originalOrderId: orderId },
where: { orderNo },
create: {
originalOrderId: orderId,
orderNo: data.order_no || data.orderNo || '',
originalOrderId,
orderNo,
userId: BigInt(data.user_id || data.userId || 0),
accountSequence: data.account_sequence || data.accountSequence,
portionCount: data.portion_count || data.portionCount || 1,
@ -248,7 +247,7 @@ export class PrePlantingOrderSyncedHandler {
*/
private async ensureAdoptionMarker(
accountSequence: string,
orderId: bigint,
orderNo: string,
paidAt: string | null,
tx: Prisma.TransactionClient,
): Promise<void> {
@ -283,7 +282,12 @@ export class PrePlantingOrderSyncedHandler {
return;
}
const markerAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + orderId;
// 用 orderNo 的字符编码生成一个稳定的、不与正常认种冲突的唯一 BigInt
// 格式PRE_PLANTING_SOURCE_ID_OFFSET + hash(orderNo) % 1_000_000_000n
const orderNoHash = BigInt(
orderNo.split('').reduce((acc, c) => (acc * 31 + c.charCodeAt(0)) & 0x7fffffff, 0),
);
const markerAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + orderNoHash;
const adoptionDate = paidAt ? new Date(paidAt) : new Date();
await tx.syncedAdoption.create({
@ -308,12 +312,12 @@ export class PrePlantingOrderSyncedHandler {
}
/**
*
* orderNo
*/
private async isAlreadyDistributed(orderId: bigint): Promise<boolean> {
private async isAlreadyDistributed(orderNo: string): Promise<boolean> {
try {
const order = await this.prePlantingPrisma.prePlantingSyncedOrder.findUnique({
where: { originalOrderId: orderId },
where: { orderNo },
select: { contributionDistributed: true },
});
return order?.contributionDistributed ?? false;

View File

@ -58,30 +58,30 @@ export class PrePlantingContributionService {
/**
*
*
* @param originalOrderId IDplanting-service pre_planting_orders.id
* @param orderNo PPLMM... accountSequence BigInt ID
*/
async calculateForPrePlantingOrder(originalOrderId: bigint): Promise<void> {
// 生成偏移后的 sourceAdoptionId
const sourceAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + originalOrderId;
// 检查是否已经处理过(使用偏移后的 ID
const exists = await this.contributionRecordRepository.existsBySourceAdoptionId(sourceAdoptionId);
if (exists) {
this.logger.debug(`Pre-planting order ${originalOrderId} already processed, skipping`);
return;
}
// 从预种追踪表获取订单数据
async calculateForPrePlantingOrder(orderNo: string): Promise<void> {
// 从预种追踪表获取订单数据(以 orderNo 为唯一键)
const order = await this.prePlantingPrisma.prePlantingSyncedOrder.findUnique({
where: { originalOrderId },
where: { orderNo },
});
if (!order) {
throw new Error(`Pre-planting order not found: ${originalOrderId}`);
throw new Error(`Pre-planting order not found: ${orderNo}`);
}
if (order.status !== 'PAID') {
this.logger.debug(`Pre-planting order ${originalOrderId} status=${order.status}, skipping`);
this.logger.debug(`Pre-planting order ${orderNo} status=${order.status}, skipping`);
return;
}
// 生成偏移后的 sourceAdoptionId用存储的 originalOrderId 保证唯一性,仅内部使用)
const sourceAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + order.originalOrderId;
// 检查是否已经处理过(避免重复计算)
const exists = await this.contributionRecordRepository.existsBySourceAdoptionId(sourceAdoptionId);
if (exists) {
this.logger.debug(`Pre-planting order ${orderNo} already processed, skipping`);
return;
}
@ -120,7 +120,7 @@ export class PrePlantingContributionService {
if (!userReferral) {
this.logger.warn(
`[PRE-PLANTING] Deferring order ${originalOrderId}: ` +
`[PRE-PLANTING] Deferring order ${orderNo}: ` +
`referral for ${order.accountSequence} not yet synced`,
);
return;
@ -176,7 +176,7 @@ export class PrePlantingContributionService {
// 标记预种追踪表为已分配
try {
await this.prePlantingPrisma.prePlantingSyncedOrder.update({
where: { originalOrderId },
where: { orderNo },
data: {
contributionDistributed: true,
contributionDistributedAt: new Date(),
@ -187,7 +187,7 @@ export class PrePlantingContributionService {
}
this.logger.log(
`Pre-planting contribution calculated: orderId=${originalOrderId}, ` +
`Pre-planting contribution calculated: orderNo=${orderNo}, ` +
`sourceId=${sourceAdoptionId}, personal=${result.personalRecord.amount.value}, ` +
`teamLevel=${result.teamLevelRecords.length}, teamBonus=${result.teamBonusRecords.length}`,
);
@ -206,17 +206,17 @@ export class PrePlantingContributionService {
contributionDistributed: false,
},
take: batchSize,
orderBy: { originalOrderId: 'asc' },
orderBy: { createdAt: 'asc' },
});
let count = 0;
for (const order of orders) {
try {
await this.calculateForPrePlantingOrder(order.originalOrderId);
await this.calculateForPrePlantingOrder(order.orderNo);
count++;
} catch (error) {
this.logger.error(
`Failed to process pre-planting order ${order.originalOrderId}`,
`Failed to process pre-planting order ${order.orderNo}`,
error,
);
}