feat(pre-planting): 合成树后算力切换(预种 5 份合同签署触发)

当用户购买满5份预种后合成1棵树并签署合同时,自动执行算力切换:
1. 作废5份份额的算力记录(is_expired=true,remark 标注合成原因,已挖积分不受影响)
2. 从认种人账户扣减旧个人算力(保持账户余额准确)
3. 以1棵完整树的算力单价创建新算力记录(remark 标注来源订单)
4. 写入 pre_planting_synced_merges 幂等标记

== 实现方式 ==
- 触发节点:Debezium CDC on pre_planting_merges.mining_enabled_at(null → 非null)
- 新增 Debezium table:public.pre_planting_merges
- 新增 Kafka topic 订阅:cdc.pre-planting.public.pre_planting_merges
- 新增 handler:PrePlantingMergeSyncedHandler(解析 CDC 事件)
- 新增 service 方法:swapContributionForMerge(核心算力切换逻辑)
- 新增常量:PRE_PLANTING_MERGE_SOURCE_ID_OFFSET = 20B(区别于份额的 10B 偏移)
- 新增 DB 表:pre_planting_synced_merges(幂等标记,migration 已包含)

== 幂等保证 ==
- CDC 层:processedCdcEvent 表(sourceTopic + offset 唯一)
- 业务层:contribution_records WHERE sourceAdoptionId=20B+mergeId 存在性检查
- 标记层:pre_planting_synced_merges(best-effort,事务提交后写入)

== 对现有系统的影响 ==
- 零修改现有 contribution 调度器 / freeze scheduler
- 团队分润账户净效果≈0(旧5份=1棵树,切换后金额一致)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-28 07:22:09 -08:00
parent eea38b2b86
commit 4c6fd424b5
9 changed files with 434 additions and 5 deletions

View File

@ -0,0 +1,17 @@
-- Migration: add pre_planting_synced_merges table
-- 预种合成树算力切换幂等标记表
-- 每当5份预种合成1棵树并签署合同后contribution-service 完成算力切换后写入本表
CREATE TABLE "pre_planting_synced_merges" (
"id" BIGSERIAL PRIMARY KEY,
"merge_no" VARCHAR(50) NOT NULL,
"account_sequence" VARCHAR(20) NOT NULL,
"source_order_nos" JSONB NOT NULL,
"new_source_adoption_id" BIGINT NOT NULL,
"swapped_at" TIMESTAMPTZ NOT NULL DEFAULT now(),
"source_topic" VARCHAR(200) NOT NULL,
"source_offset" BIGINT NOT NULL,
CONSTRAINT "pre_planting_synced_merges_merge_no_key" UNIQUE ("merge_no")
);
CREATE INDEX "pre_planting_synced_merges_account_sequence_idx"
ON "pre_planting_synced_merges"("account_sequence");

View File

@ -116,6 +116,36 @@ model PrePlantingFreezeState {
// 预种 CDC 幂等性追踪表
// ============================================
// ============================================
// 预种合成树追踪表
// ============================================
/// 预种合成树算力切换记录(幂等标记)
///
/// 每当5份预种合成1棵树并签署合同后mining_enabled_at 写入),
/// contribution-service 完成算力切换后写入本表作为幂等标记。
/// 记录旧份额的 sourceOrderNos 和新树的 newSourceAdoptionId
/// 便于审计和追溯。
model PrePlantingSyncedMerge {
id BigInt @id @default(autoincrement())
mergeNo String @unique @map("merge_no") @db.VarChar(50)
accountSequence String @map("account_sequence") @db.VarChar(20)
sourceOrderNos Json @map("source_order_nos") // 原5份订单号JSON 数组)
newSourceAdoptionId BigInt @map("new_source_adoption_id") // 新树算力的 sourceAdoptionId20B + mergeId
swappedAt DateTime @default(now()) @map("swapped_at")
// CDC 同步元数据
sourceTopic String @map("source_topic") @db.VarChar(200)
sourceOffset BigInt @map("source_offset")
@@index([accountSequence])
@@map("pre_planting_synced_merges")
}
// ============================================
// 预种 CDC 幂等性追踪表
// ============================================
/// 已处理的预种 CDC 事件(幂等性保证)
/// 使用 (sourceTopic, offset) 作为复合唯一键
model PrePlantingProcessedCdcEvent {

View File

@ -2,6 +2,8 @@ import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { PrePlantingCdcConsumerService } from '../../infrastructure/kafka/pre-planting-cdc-consumer.service';
import { PrePlantingOrderSyncedHandler, PrePlantingOrderSyncResult } from './pre-planting-order-synced.handler';
import { PrePlantingPositionSyncedHandler } from './pre-planting-position-synced.handler';
import { PrePlantingMergeSyncedHandler } from './pre-planting-merge-synced.handler';
import { PrePlantingMergeSyncResult } from '../services/pre-planting-contribution.service';
/**
* CDC
@ -25,6 +27,7 @@ export class PrePlantingCdcDispatcher implements OnModuleInit {
private readonly cdcConsumer: PrePlantingCdcConsumerService,
private readonly orderHandler: PrePlantingOrderSyncedHandler,
private readonly positionHandler: PrePlantingPositionSyncedHandler,
private readonly mergeHandler: PrePlantingMergeSyncedHandler,
) {}
async onModuleInit() {
@ -41,10 +44,20 @@ export class PrePlantingCdcDispatcher implements OnModuleInit {
this.positionHandler.handle.bind(this.positionHandler),
);
// 注册预种合成树 handler带后置回调事务提交后执行算力切换
this.cdcConsumer.registerHandler<PrePlantingMergeSyncResult | null>(
'pre_planting_merges',
this.mergeHandler.handle.bind(this.mergeHandler),
this.mergeHandler.swapAfterCommit.bind(this.mergeHandler),
);
// 非阻塞启动 CDC 消费者
this.cdcConsumer.start()
.then(() => {
this.logger.log('[PRE-PLANTING-CDC] Dispatcher started with handlers: pre_planting_orders, pre_planting_positions');
this.logger.log(
'[PRE-PLANTING-CDC] Dispatcher started with handlers: ' +
'pre_planting_orders, pre_planting_positions, pre_planting_merges',
);
})
.catch((error) => {
this.logger.error('[PRE-PLANTING-CDC] Failed to start dispatcher', error);

View File

@ -0,0 +1,142 @@
import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client';
import { PrePlantingCdcEvent } from '../../infrastructure/kafka/pre-planting-cdc-consumer.service';
import { PrePlantingContributionService, PrePlantingMergeSyncResult } from '../services/pre-planting-contribution.service';
/**
* CDC
*
* [2026-02-28] cdc.pre-planting.public.pre_planting_merges CDC
*
* === ===
* planting-service.signContract(mergeNo) pre_planting_merges.mining_enabled_at null
* nullDebezium UPDATE handler
*
* === ===
* 1. mining_enabled_at UPDATE
* 2. mergeNoaccountSequencesourceOrderNosminingEnabledAt
* 3. PrePlantingMergeSyncResult dispatcher
*
* === ===
* - CDC processedCdcEvent sourceTopic + offset Prisma
* - contribution_records sourceAdoptionId(20B + mergeId)
* - pre_planting_synced_merges best-effort
*/
@Injectable()
export class PrePlantingMergeSyncedHandler {
private readonly logger = new Logger(PrePlantingMergeSyncedHandler.name);
constructor(
private readonly contributionService: PrePlantingContributionService,
) {}
/**
* CDC PrismaService
*
* swapAfterCommit
*
* @param event CDC
* @param tx Prisma CDC consumer
* @returns null
*/
async handle(
event: PrePlantingCdcEvent,
tx: Prisma.TransactionClient,
): Promise<PrePlantingMergeSyncResult | null> {
const { op, after } = event.payload;
// 只处理 INSERTc/r和 UPDATEuDELETE 忽略
if (!after || op === 'd') {
return null;
}
// 只处理 mining_enabled_at 非空的事件(合同签署)
const miningEnabledAtRaw = after.mining_enabled_at ?? after.miningEnabledAt;
if (!miningEnabledAtRaw) {
this.logger.debug(
`[PRE-PLANTING-MERGE] Skipping: mining_enabled_at is null, ` +
`mergeNo=${after.merge_no ?? after.mergeNo ?? 'unknown'}`,
);
return null;
}
const mergeNo = after.merge_no || after.mergeNo;
if (!mergeNo) {
this.logger.warn(`[PRE-PLANTING-MERGE] Missing merge_no in CDC event`);
return null;
}
const mergeId = after.merge_id || after.id;
if (!mergeId) {
this.logger.warn(`[PRE-PLANTING-MERGE] Missing merge_id in CDC event for mergeNo=${mergeNo}`);
return null;
}
const accountSequence = after.account_sequence || after.accountSequence;
if (!accountSequence) {
this.logger.warn(`[PRE-PLANTING-MERGE] Missing account_sequence for mergeNo=${mergeNo}`);
return null;
}
// 解析 source_order_nosJSONB 列Debezium 以字符串或对象传输)
const rawSourceOrderNos = after.source_order_nos ?? after.sourceOrderNos;
let sourceOrderNos: string[];
try {
if (typeof rawSourceOrderNos === 'string') {
sourceOrderNos = JSON.parse(rawSourceOrderNos);
} else if (Array.isArray(rawSourceOrderNos)) {
sourceOrderNos = rawSourceOrderNos;
} else {
this.logger.warn(
`[PRE-PLANTING-MERGE] Invalid source_order_nos for mergeNo=${mergeNo}: ` +
`${JSON.stringify(rawSourceOrderNos)}`,
);
return null;
}
} catch (e) {
this.logger.warn(`[PRE-PLANTING-MERGE] Failed to parse source_order_nos for mergeNo=${mergeNo}`, e);
return null;
}
// 解析 mining_enabled_at可能是 ISO 字符串或 epoch ms 整数)
const miningEnabledAt = new Date(
typeof miningEnabledAtRaw === 'number' ? miningEnabledAtRaw : miningEnabledAtRaw,
);
this.logger.log(
`[PRE-PLANTING-MERGE] Detected contract signed: mergeNo=${mergeNo}, ` +
`accountSequence=${accountSequence}, miningEnabledAt=${miningEnabledAt.toISOString()}, ` +
`sourceOrders=${sourceOrderNos.join(',')}`,
);
return {
mergeNo,
mergeId: BigInt(mergeId),
accountSequence,
sourceOrderNos,
miningEnabledAt,
sourceTopic: event.topic,
sourceOffset: event.offset,
};
}
/**
*
*
* PrePlantingCdcDispatcher Prisma
* CDC offset
*/
async swapAfterCommit(result: PrePlantingMergeSyncResult): Promise<void> {
this.logger.log(`[PRE-PLANTING-MERGE] Triggering contribution swap: mergeNo=${result.mergeNo}`);
try {
await this.contributionService.swapContributionForMerge(result);
this.logger.log(`[PRE-PLANTING-MERGE] Contribution swap completed: mergeNo=${result.mergeNo}`);
} catch (error) {
// 算力切换失败不阻断 CDC 消费,下次 Kafka 重试时幂等保护会防止重复执行
this.logger.error(
`[PRE-PLANTING-MERGE] Contribution swap failed: mergeNo=${result.mergeNo}`,
error,
);
}
}
}

View File

@ -17,7 +17,25 @@ import { ContributionDistributionPublisherService } from '@/application/services
import { BonusClaimService } from '@/application/services/bonus-claim.service';
import { ContributionRecordSyncedEvent, ContributionAccountUpdatedEvent, SystemAccountSyncedEvent, SystemContributionRecordCreatedEvent, UnallocatedContributionSyncedEvent } from '@/domain/events';
import { PrePlantingPrismaService } from '../../infrastructure/prisma/pre-planting-prisma.service';
import { PRE_PLANTING_SOURCE_ID_OFFSET, PRE_PLANTING_PORTION_DIVISOR } from '../../domain/constants';
import {
PRE_PLANTING_SOURCE_ID_OFFSET,
PRE_PLANTING_PORTION_DIVISOR,
PRE_PLANTING_MERGE_SOURCE_ID_OFFSET,
} from '../../domain/constants';
/**
*
* PrePlantingMergeSyncedHandler CDC
*/
export interface PrePlantingMergeSyncResult {
mergeNo: string;
mergeId: bigint;
accountSequence: string;
sourceOrderNos: string[];
miningEnabledAt: Date;
sourceTopic: string;
sourceOffset: bigint;
}
/**
*
@ -536,6 +554,198 @@ export class PrePlantingContributionService {
}
}
/**
*
*
* [2026-02-28] 51mining_enabled_at
*
* === ===
* 1. 5is_expired=trueremark
* 2.
* 3. 1remark
* 4. saveDistributionResult 0=
* 5. pre_planting_synced_merges best-effort
*
* === ===
* contribution_records WHERE sourceAdoptionId = 20B + mergeId
* prePlantingSyncedMerge
*/
async swapContributionForMerge(result: PrePlantingMergeSyncResult): Promise<void> {
const { mergeNo, mergeId, accountSequence, sourceOrderNos, miningEnabledAt } = result;
const mergeSourceAdoptionId = PRE_PLANTING_MERGE_SOURCE_ID_OFFSET + mergeId;
// Step 1: 查找5份源订单的追踪记录
const sourceOrders = await this.prePlantingPrisma.prePlantingSyncedOrder.findMany({
where: { orderNo: { in: sourceOrderNos } },
});
if (sourceOrders.length === 0) {
this.logger.warn(
`[PRE-PLANTING-MERGE] No source orders found in tracking table for merge: ${mergeNo}. ` +
`Expected orders: ${sourceOrderNos.join(',')}`,
);
return;
}
// Step 2: 计算份额 sourceAdoptionId 列表10B 偏移,与 calculateForPrePlantingOrder 一致)
const portionSourceAdoptionIds = sourceOrders.map(
(o) => PRE_PLANTING_SOURCE_ID_OFFSET + o.originalOrderId,
);
// Step 3: 幂等检查 —— 若新树算力记录已存在,直接跳过
const alreadyProcessed = await this.contributionRecordRepository.existsBySourceAdoptionId(
mergeSourceAdoptionId,
);
if (alreadyProcessed) {
this.logger.debug(`[PRE-PLANTING-MERGE] Already swapped (idempotent skip): ${mergeNo}`);
return;
}
// Step 4: 获取合成时的全树算力单价(以 mining_enabled_at 为基准日不除以5
let contributionPerTree = new Decimal('22617');
try {
contributionPerTree = await this.contributionRateService.getContributionPerTree(miningEnabledAt);
} catch (error) {
this.logger.warn(`[PRE-PLANTING-MERGE] Failed to get contribution rate, using default`, error);
}
// Step 5: 构建虚拟 SyncedAdoption1棵完整树不÷5
const firstOrder = sourceOrders[0];
const virtualAdoption: SyncedAdoption = {
id: BigInt(0),
originalAdoptionId: mergeSourceAdoptionId,
accountSequence,
treeCount: 1,
adoptionDate: miningEnabledAt,
status: 'MINING_ENABLED',
selectedProvince: firstOrder.provinceCode || '',
selectedCity: firstOrder.cityCode || '',
contributionPerTree,
sourceSequenceNum: BigInt(0),
syncedAt: new Date(),
contributionDistributed: false,
contributionDistributedAt: null,
createdAt: miningEnabledAt,
};
// Step 6: 获取推荐关系链
const userReferral = await this.syncedDataRepository.findSyncedReferralByAccountSequence(accountSequence);
if (!userReferral) {
throw new Error(
`[PRE-PLANTING-MERGE] Referral not synced for ${accountSequence}, ` +
`cannot swap contribution for merge ${mergeNo}`,
);
}
let ancestorChain: SyncedReferral[] = [];
if (userReferral.referrerAccountSequence) {
ancestorChain = await this.syncedDataRepository.findAncestorChain(
userReferral.referrerAccountSequence,
15,
);
}
// Step 7: 获取算力账户
const adopterAccount = await this.contributionAccountRepository.findByAccountSequence(accountSequence);
const ancestorAccounts = await this.contributionAccountRepository.findByAccountSequences(
ancestorChain.map((a) => a.accountSequence),
);
// Step 8: 使用领域计算器计算新树分配(复用现有逻辑)
const newTreeResult = this.domainCalculator.calculateAdoptionContribution(
virtualAdoption,
adopterAccount,
ancestorChain,
ancestorAccounts,
);
// Step 9: 在事务中执行算力切换(原子操作)
const expireRemark =
`预种合成本份额已合并为1棵树${mergeNo}),算力转为树级计算,已挖积分不受影响`;
const mergeRemark =
`预种合成树由5份份额合并${mergeNo})算力升级为整棵树,` +
`源订单:${sourceOrderNos.join('、')}`;
await this.unitOfWork.executeInTransaction(async () => {
const tx = this.unitOfWork.getClient();
// 9a: 查询即将作废的个人算力总量(用于账户扣减)
const personalSumResult = await tx.contributionRecord.aggregate({
where: {
sourceAdoptionId: { in: portionSourceAdoptionIds },
sourceType: 'PERSONAL',
isExpired: false,
},
_sum: { amount: true },
});
const expiredPersonalStr = personalSumResult._sum.amount?.toString() ?? '0';
// 9b: 作废旧份额算力记录全部类型PERSONAL + TEAM_LEVEL + TEAM_BONUS
const expiredCount = await tx.contributionRecord.updateMany({
where: {
sourceAdoptionId: { in: portionSourceAdoptionIds },
isExpired: false,
},
data: {
isExpired: true,
expiredAt: miningEnabledAt,
remark: expireRemark,
},
});
this.logger.log(
`[PRE-PLANTING-MERGE] Expired ${expiredCount.count} portion records for merge ${mergeNo}`,
);
// 9c: 从认种人账户扣减旧个人算力personal_contribution - 和 effective_contribution -
if (parseFloat(expiredPersonalStr) > 0) {
await tx.contributionAccount.updateMany({
where: { accountSequence },
data: {
personalContribution: { decrement: expiredPersonalStr },
effectiveContribution: { decrement: expiredPersonalStr },
},
});
}
// 9d: 创建新树算力分配记录personal 70% + 团队15级 7.5% + 加成奖励 7.5%
// 内部调用复用 saveDistributionResult各 repository 自动使用事务 client
await this.saveDistributionResult(newTreeResult, mergeSourceAdoptionId, accountSequence);
// 9e: 为新树算力记录补充 remark标注合成来源
await tx.contributionRecord.updateMany({
where: { sourceAdoptionId: mergeSourceAdoptionId },
data: { remark: mergeRemark },
});
});
// Step 10: 插入幂等标记(最终一致性,事务提交后 best-effort 写入)
try {
await this.prePlantingPrisma.prePlantingSyncedMerge.create({
data: {
mergeNo,
accountSequence,
sourceOrderNos,
newSourceAdoptionId: mergeSourceAdoptionId,
sourceTopic: result.sourceTopic,
sourceOffset: result.sourceOffset,
},
});
} catch (error: any) {
if (error.code === 'P2002') {
this.logger.debug(`[PRE-PLANTING-MERGE] Idempotency marker already exists: ${mergeNo}`);
} else {
this.logger.error(`[PRE-PLANTING-MERGE] Failed to insert idempotency marker: ${mergeNo}`, error);
}
}
this.logger.log(
`[PRE-PLANTING-MERGE] Swap completed: mergeNo=${mergeNo}, ` +
`accountSequence=${accountSequence}, ` +
`expiredPortions=${portionSourceAdoptionIds.length}, ` +
`newSourceAdoptionId=${mergeSourceAdoptionId}`,
);
}
private async publishContributionAccountUpdatedEvent(
account: ContributionAccountAggregate,
): Promise<void> {

View File

@ -50,3 +50,14 @@ export const PRE_PLANTING_CDC_TOPIC_PREFIX = 'cdc.pre-planting';
* contribution-service-cdc-group
*/
export const PRE_PLANTING_CDC_GROUP_ID = 'contribution-pre-planting-cdc';
/**
* sourceAdoptionId
*
* contribution_records 10B
* sourceAdoptionId = PRE_PLANTING_MERGE_SOURCE_ID_OFFSET + mergeId
*
* 20,000,000,000 (200 亿)
* 10B
*/
export const PRE_PLANTING_MERGE_SOURCE_ID_OFFSET = 20_000_000_000n;

View File

@ -176,9 +176,13 @@ export class PrePlantingCdcConsumerService implements OnModuleInit, OnModuleDest
'PRE_PLANTING_CDC_TOPIC_POSITIONS',
'cdc.pre-planting.public.pre_planting_positions',
);
const topicMerges = this.configService.get<string>(
'PRE_PLANTING_CDC_TOPIC_MERGES',
'cdc.pre-planting.public.pre_planting_merges',
);
await this.consumer.subscribe({
topics: [topicOrders, topicPositions],
topics: [topicOrders, topicPositions, topicMerges],
fromBeginning: true,
});
@ -191,7 +195,7 @@ export class PrePlantingCdcConsumerService implements OnModuleInit, OnModuleDest
});
this.logger.log(
`[PRE-PLANTING-CDC] Consumer started, topics: [${topicOrders}, ${topicPositions}]`,
`[PRE-PLANTING-CDC] Consumer started, topics: [${topicOrders}, ${topicPositions}, ${topicMerges}]`,
);
} catch (error) {
this.logger.error('[PRE-PLANTING-CDC] Failed to start consumer', error);

View File

@ -11,6 +11,7 @@ import { PrePlantingCdcConsumerService } from './infrastructure/kafka/pre-planti
// CDC Event Handlers
import { PrePlantingOrderSyncedHandler } from './application/handlers/pre-planting-order-synced.handler';
import { PrePlantingPositionSyncedHandler } from './application/handlers/pre-planting-position-synced.handler';
import { PrePlantingMergeSyncedHandler } from './application/handlers/pre-planting-merge-synced.handler';
import { PrePlantingCdcDispatcher } from './application/handlers/pre-planting-cdc-dispatcher';
// Application Services
@ -71,6 +72,7 @@ import { BonusClaimService } from '../application/services/bonus-claim.service';
// CDC Event Handlers
PrePlantingOrderSyncedHandler,
PrePlantingPositionSyncedHandler,
PrePlantingMergeSyncedHandler,
PrePlantingCdcDispatcher,
// Application Services (预种)

View File

@ -12,7 +12,7 @@
"topic.prefix": "cdc.pre-planting",
"table.include.list": "public.pre_planting_orders,public.pre_planting_positions",
"table.include.list": "public.pre_planting_orders,public.pre_planting_positions,public.pre_planting_merges",
"plugin.name": "pgoutput",
"publication.name": "debezium_pre_planting_publication",