From 4c6fd424b562a53116e99a13c5e070d57fc07951 Mon Sep 17 00:00:00 2001 From: hailin Date: Sat, 28 Feb 2026 07:22:09 -0800 Subject: [PATCH] =?UTF-8?q?feat(pre-planting):=20=E5=90=88=E6=88=90?= =?UTF-8?q?=E6=A0=91=E5=90=8E=E7=AE=97=E5=8A=9B=E5=88=87=E6=8D=A2=EF=BC=88?= =?UTF-8?q?=E9=A2=84=E7=A7=8D=205=20=E4=BB=BD=E5=90=88=E5=90=8C=E7=AD=BE?= =?UTF-8?q?=E7=BD=B2=E8=A7=A6=E5=8F=91=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 当用户购买满5份预种后合成1棵树并签署合同时,自动执行算力切换: 1. 作废5份份额的算力记录(is_expired=true,remark 标注合成原因,已挖积分不受影响) 2. 从认种人账户扣减旧个人算力(保持账户余额准确) 3. 以1棵完整树的算力单价创建新算力记录(remark 标注来源订单) 4. 写入 pre_planting_synced_merges 幂等标记 == 实现方式 == - 触发节点:Debezium CDC on pre_planting_merges.mining_enabled_at(null → 非null) - 新增 Debezium table:public.pre_planting_merges - 新增 Kafka topic 订阅:cdc.pre-planting.public.pre_planting_merges - 新增 handler:PrePlantingMergeSyncedHandler(解析 CDC 事件) - 新增 service 方法:swapContributionForMerge(核心算力切换逻辑) - 新增常量:PRE_PLANTING_MERGE_SOURCE_ID_OFFSET = 20B(区别于份额的 10B 偏移) - 新增 DB 表:pre_planting_synced_merges(幂等标记,migration 已包含) == 幂等保证 == - CDC 层:processedCdcEvent 表(sourceTopic + offset 唯一) - 业务层:contribution_records WHERE sourceAdoptionId=20B+mergeId 存在性检查 - 标记层:pre_planting_synced_merges(best-effort,事务提交后写入) == 对现有系统的影响 == - 零修改现有 contribution 调度器 / freeze scheduler - 团队分润账户净效果≈0(旧5份=1棵树,切换后金额一致) Co-Authored-By: Claude Sonnet 4.6 --- .../migration.sql | 17 ++ .../prisma/pre-planting/schema.prisma | 30 +++ .../handlers/pre-planting-cdc-dispatcher.ts | 15 +- .../pre-planting-merge-synced.handler.ts | 142 ++++++++++++ .../pre-planting-contribution.service.ts | 212 +++++++++++++++++- .../src/pre-planting/domain/constants.ts | 11 + .../pre-planting-cdc-consumer.service.ts | 8 +- .../pre-planting/pre-planting-cdc.module.ts | 2 + .../debezium/pre-planting-connector.json | 2 +- 9 files changed, 434 insertions(+), 5 deletions(-) create mode 100644 backend/services/contribution-service/prisma/pre-planting/migrations/20260228000001_add_synced_merges/migration.sql create mode 100644 backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-merge-synced.handler.ts diff --git a/backend/services/contribution-service/prisma/pre-planting/migrations/20260228000001_add_synced_merges/migration.sql b/backend/services/contribution-service/prisma/pre-planting/migrations/20260228000001_add_synced_merges/migration.sql new file mode 100644 index 00000000..4b1bec4d --- /dev/null +++ b/backend/services/contribution-service/prisma/pre-planting/migrations/20260228000001_add_synced_merges/migration.sql @@ -0,0 +1,17 @@ +-- Migration: add pre_planting_synced_merges table +-- 预种合成树算力切换幂等标记表 +-- 每当5份预种合成1棵树并签署合同后,contribution-service 完成算力切换后写入本表 +CREATE TABLE "pre_planting_synced_merges" ( + "id" BIGSERIAL PRIMARY KEY, + "merge_no" VARCHAR(50) NOT NULL, + "account_sequence" VARCHAR(20) NOT NULL, + "source_order_nos" JSONB NOT NULL, + "new_source_adoption_id" BIGINT NOT NULL, + "swapped_at" TIMESTAMPTZ NOT NULL DEFAULT now(), + "source_topic" VARCHAR(200) NOT NULL, + "source_offset" BIGINT NOT NULL, + CONSTRAINT "pre_planting_synced_merges_merge_no_key" UNIQUE ("merge_no") +); + +CREATE INDEX "pre_planting_synced_merges_account_sequence_idx" + ON "pre_planting_synced_merges"("account_sequence"); diff --git a/backend/services/contribution-service/prisma/pre-planting/schema.prisma b/backend/services/contribution-service/prisma/pre-planting/schema.prisma index 86feba9d..18f508db 100644 --- a/backend/services/contribution-service/prisma/pre-planting/schema.prisma +++ b/backend/services/contribution-service/prisma/pre-planting/schema.prisma @@ -116,6 +116,36 @@ model PrePlantingFreezeState { // 预种 CDC 幂等性追踪表 // ============================================ +// ============================================ +// 预种合成树追踪表 +// ============================================ + +/// 预种合成树算力切换记录(幂等标记) +/// +/// 每当5份预种合成1棵树并签署合同后(mining_enabled_at 写入), +/// contribution-service 完成算力切换后写入本表作为幂等标记。 +/// 记录旧份额的 sourceOrderNos 和新树的 newSourceAdoptionId, +/// 便于审计和追溯。 +model PrePlantingSyncedMerge { + id BigInt @id @default(autoincrement()) + mergeNo String @unique @map("merge_no") @db.VarChar(50) + accountSequence String @map("account_sequence") @db.VarChar(20) + sourceOrderNos Json @map("source_order_nos") // 原5份订单号(JSON 数组) + newSourceAdoptionId BigInt @map("new_source_adoption_id") // 新树算力的 sourceAdoptionId(20B + mergeId) + swappedAt DateTime @default(now()) @map("swapped_at") + + // CDC 同步元数据 + sourceTopic String @map("source_topic") @db.VarChar(200) + sourceOffset BigInt @map("source_offset") + + @@index([accountSequence]) + @@map("pre_planting_synced_merges") +} + +// ============================================ +// 预种 CDC 幂等性追踪表 +// ============================================ + /// 已处理的预种 CDC 事件(幂等性保证) /// 使用 (sourceTopic, offset) 作为复合唯一键 model PrePlantingProcessedCdcEvent { diff --git a/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-cdc-dispatcher.ts b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-cdc-dispatcher.ts index 55df8333..5e0acacf 100644 --- a/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-cdc-dispatcher.ts +++ b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-cdc-dispatcher.ts @@ -2,6 +2,8 @@ import { Injectable, OnModuleInit, Logger } from '@nestjs/common'; import { PrePlantingCdcConsumerService } from '../../infrastructure/kafka/pre-planting-cdc-consumer.service'; import { PrePlantingOrderSyncedHandler, PrePlantingOrderSyncResult } from './pre-planting-order-synced.handler'; import { PrePlantingPositionSyncedHandler } from './pre-planting-position-synced.handler'; +import { PrePlantingMergeSyncedHandler } from './pre-planting-merge-synced.handler'; +import { PrePlantingMergeSyncResult } from '../services/pre-planting-contribution.service'; /** * 预种 CDC 事件分发器 @@ -25,6 +27,7 @@ export class PrePlantingCdcDispatcher implements OnModuleInit { private readonly cdcConsumer: PrePlantingCdcConsumerService, private readonly orderHandler: PrePlantingOrderSyncedHandler, private readonly positionHandler: PrePlantingPositionSyncedHandler, + private readonly mergeHandler: PrePlantingMergeSyncedHandler, ) {} async onModuleInit() { @@ -41,10 +44,20 @@ export class PrePlantingCdcDispatcher implements OnModuleInit { this.positionHandler.handle.bind(this.positionHandler), ); + // 注册预种合成树 handler(带后置回调:事务提交后执行算力切换) + this.cdcConsumer.registerHandler( + 'pre_planting_merges', + this.mergeHandler.handle.bind(this.mergeHandler), + this.mergeHandler.swapAfterCommit.bind(this.mergeHandler), + ); + // 非阻塞启动 CDC 消费者 this.cdcConsumer.start() .then(() => { - this.logger.log('[PRE-PLANTING-CDC] Dispatcher started with handlers: pre_planting_orders, pre_planting_positions'); + this.logger.log( + '[PRE-PLANTING-CDC] Dispatcher started with handlers: ' + + 'pre_planting_orders, pre_planting_positions, pre_planting_merges', + ); }) .catch((error) => { this.logger.error('[PRE-PLANTING-CDC] Failed to start dispatcher', error); diff --git a/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-merge-synced.handler.ts b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-merge-synced.handler.ts new file mode 100644 index 00000000..b328e608 --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-merge-synced.handler.ts @@ -0,0 +1,142 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Prisma } from '@prisma/client'; +import { PrePlantingCdcEvent } from '../../infrastructure/kafka/pre-planting-cdc-consumer.service'; +import { PrePlantingContributionService, PrePlantingMergeSyncResult } from '../services/pre-planting-contribution.service'; + +/** + * 预种合成树 CDC 事件处理器 + * + * [2026-02-28] 新增:处理 cdc.pre-planting.public.pre_planting_merges 的 CDC 事件 + * + * === 触发节点 === + * planting-service.signContract(mergeNo) 调用后,pre_planting_merges.mining_enabled_at 从 null + * 变为非 null,Debezium 捕获此 UPDATE 事件,本 handler 处理。 + * + * === 处理逻辑 === + * 1. 过滤:只处理 mining_enabled_at 非空的 UPDATE 事件(合同签署触发) + * 2. 解析:提取 mergeNo、accountSequence、sourceOrderNos、miningEnabledAt + * 3. 返回:PrePlantingMergeSyncResult(由 dispatcher 在事务提交后触发算力切换) + * + * === 幂等性 === + * - CDC 层面:processedCdcEvent 表(sourceTopic + offset 唯一约束,在主 Prisma 事务内) + * - 业务层面:contribution_records 中 sourceAdoptionId(20B + mergeId) 存在性检查 + * - 最终标记:pre_planting_synced_merges 表(事务提交后 best-effort 写入) + */ +@Injectable() +export class PrePlantingMergeSyncedHandler { + private readonly logger = new Logger(PrePlantingMergeSyncedHandler.name); + + constructor( + private readonly contributionService: PrePlantingContributionService, + ) {} + + /** + * 处理预种合成树 CDC 事件(在主 PrismaService 事务内执行) + * + * 只解析事件,不执行业务逻辑(业务逻辑在 swapAfterCommit 中执行)。 + * + * @param event CDC 事件 + * @param tx 主 Prisma 事务客户端(由 CDC consumer 提供) + * @returns 合成结果(null 表示不需要处理) + */ + async handle( + event: PrePlantingCdcEvent, + tx: Prisma.TransactionClient, + ): Promise { + const { op, after } = event.payload; + + // 只处理 INSERT(c/r)和 UPDATE(u);DELETE 忽略 + if (!after || op === 'd') { + return null; + } + + // 只处理 mining_enabled_at 非空的事件(合同签署) + const miningEnabledAtRaw = after.mining_enabled_at ?? after.miningEnabledAt; + if (!miningEnabledAtRaw) { + this.logger.debug( + `[PRE-PLANTING-MERGE] Skipping: mining_enabled_at is null, ` + + `mergeNo=${after.merge_no ?? after.mergeNo ?? 'unknown'}`, + ); + return null; + } + + const mergeNo = after.merge_no || after.mergeNo; + if (!mergeNo) { + this.logger.warn(`[PRE-PLANTING-MERGE] Missing merge_no in CDC event`); + return null; + } + + const mergeId = after.merge_id || after.id; + if (!mergeId) { + this.logger.warn(`[PRE-PLANTING-MERGE] Missing merge_id in CDC event for mergeNo=${mergeNo}`); + return null; + } + + const accountSequence = after.account_sequence || after.accountSequence; + if (!accountSequence) { + this.logger.warn(`[PRE-PLANTING-MERGE] Missing account_sequence for mergeNo=${mergeNo}`); + return null; + } + + // 解析 source_order_nos(JSONB 列,Debezium 以字符串或对象传输) + const rawSourceOrderNos = after.source_order_nos ?? after.sourceOrderNos; + let sourceOrderNos: string[]; + try { + if (typeof rawSourceOrderNos === 'string') { + sourceOrderNos = JSON.parse(rawSourceOrderNos); + } else if (Array.isArray(rawSourceOrderNos)) { + sourceOrderNos = rawSourceOrderNos; + } else { + this.logger.warn( + `[PRE-PLANTING-MERGE] Invalid source_order_nos for mergeNo=${mergeNo}: ` + + `${JSON.stringify(rawSourceOrderNos)}`, + ); + return null; + } + } catch (e) { + this.logger.warn(`[PRE-PLANTING-MERGE] Failed to parse source_order_nos for mergeNo=${mergeNo}`, e); + return null; + } + + // 解析 mining_enabled_at(可能是 ISO 字符串或 epoch ms 整数) + const miningEnabledAt = new Date( + typeof miningEnabledAtRaw === 'number' ? miningEnabledAtRaw : miningEnabledAtRaw, + ); + + this.logger.log( + `[PRE-PLANTING-MERGE] Detected contract signed: mergeNo=${mergeNo}, ` + + `accountSequence=${accountSequence}, miningEnabledAt=${miningEnabledAt.toISOString()}, ` + + `sourceOrders=${sourceOrderNos.join(',')}`, + ); + + return { + mergeNo, + mergeId: BigInt(mergeId), + accountSequence, + sourceOrderNos, + miningEnabledAt, + sourceTopic: event.topic, + sourceOffset: event.offset, + }; + } + + /** + * 事务提交后的算力切换回调 + * + * 由 PrePlantingCdcDispatcher 在主 Prisma 事务成功提交后调用。 + * 失败时记录错误但不影响 CDC offset 提交(下次重试时幂等保护)。 + */ + async swapAfterCommit(result: PrePlantingMergeSyncResult): Promise { + this.logger.log(`[PRE-PLANTING-MERGE] Triggering contribution swap: mergeNo=${result.mergeNo}`); + try { + await this.contributionService.swapContributionForMerge(result); + this.logger.log(`[PRE-PLANTING-MERGE] Contribution swap completed: mergeNo=${result.mergeNo}`); + } catch (error) { + // 算力切换失败不阻断 CDC 消费,下次 Kafka 重试时幂等保护会防止重复执行 + this.logger.error( + `[PRE-PLANTING-MERGE] Contribution swap failed: mergeNo=${result.mergeNo}`, + error, + ); + } + } +} diff --git a/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts b/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts index 85a7e699..ed9b5daa 100644 --- a/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts +++ b/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts @@ -17,7 +17,25 @@ import { ContributionDistributionPublisherService } from '@/application/services import { BonusClaimService } from '@/application/services/bonus-claim.service'; import { ContributionRecordSyncedEvent, ContributionAccountUpdatedEvent, SystemAccountSyncedEvent, SystemContributionRecordCreatedEvent, UnallocatedContributionSyncedEvent } from '@/domain/events'; import { PrePlantingPrismaService } from '../../infrastructure/prisma/pre-planting-prisma.service'; -import { PRE_PLANTING_SOURCE_ID_OFFSET, PRE_PLANTING_PORTION_DIVISOR } from '../../domain/constants'; +import { + PRE_PLANTING_SOURCE_ID_OFFSET, + PRE_PLANTING_PORTION_DIVISOR, + PRE_PLANTING_MERGE_SOURCE_ID_OFFSET, +} from '../../domain/constants'; + +/** + * 预种合成树算力切换的输入参数 + * 由 PrePlantingMergeSyncedHandler 解析 CDC 事件后传入 + */ +export interface PrePlantingMergeSyncResult { + mergeNo: string; + mergeId: bigint; + accountSequence: string; + sourceOrderNos: string[]; + miningEnabledAt: Date; + sourceTopic: string; + sourceOffset: bigint; +} /** * 预种算力计算服务 @@ -536,6 +554,198 @@ export class PrePlantingContributionService { } } + /** + * 预种合成树后算力切换 + * + * [2026-02-28] 新增:5份预种合成1棵树,合同签署(mining_enabled_at 写入)后触发 + * + * === 操作流程 === + * 1. 作废5份份额的算力记录(is_expired=true,remark 说明原因,已挖积分不追回) + * 2. 从认种人账户扣减旧个人算力(保持账户余额准确) + * 3. 以1棵完整树的算力创建新记录(remark 标注合成来源) + * 4. 团队分润账户通过 saveDistributionResult 自然更新(净效果≈0,旧=新) + * 5. 写入 pre_planting_synced_merges 幂等标记(best-effort) + * + * === 幂等性 === + * 以 contribution_records WHERE sourceAdoptionId = 20B + mergeId 的存在性为主要幂等检查。 + * 若检查通过但 prePlantingSyncedMerge 未写入,下次重试时仍会被幂等检查拦截。 + */ + async swapContributionForMerge(result: PrePlantingMergeSyncResult): Promise { + const { mergeNo, mergeId, accountSequence, sourceOrderNos, miningEnabledAt } = result; + const mergeSourceAdoptionId = PRE_PLANTING_MERGE_SOURCE_ID_OFFSET + mergeId; + + // Step 1: 查找5份源订单的追踪记录 + const sourceOrders = await this.prePlantingPrisma.prePlantingSyncedOrder.findMany({ + where: { orderNo: { in: sourceOrderNos } }, + }); + + if (sourceOrders.length === 0) { + this.logger.warn( + `[PRE-PLANTING-MERGE] No source orders found in tracking table for merge: ${mergeNo}. ` + + `Expected orders: ${sourceOrderNos.join(',')}`, + ); + return; + } + + // Step 2: 计算份额 sourceAdoptionId 列表(10B 偏移,与 calculateForPrePlantingOrder 一致) + const portionSourceAdoptionIds = sourceOrders.map( + (o) => PRE_PLANTING_SOURCE_ID_OFFSET + o.originalOrderId, + ); + + // Step 3: 幂等检查 —— 若新树算力记录已存在,直接跳过 + const alreadyProcessed = await this.contributionRecordRepository.existsBySourceAdoptionId( + mergeSourceAdoptionId, + ); + if (alreadyProcessed) { + this.logger.debug(`[PRE-PLANTING-MERGE] Already swapped (idempotent skip): ${mergeNo}`); + return; + } + + // Step 4: 获取合成时的全树算力单价(以 mining_enabled_at 为基准日,不除以5) + let contributionPerTree = new Decimal('22617'); + try { + contributionPerTree = await this.contributionRateService.getContributionPerTree(miningEnabledAt); + } catch (error) { + this.logger.warn(`[PRE-PLANTING-MERGE] Failed to get contribution rate, using default`, error); + } + + // Step 5: 构建虚拟 SyncedAdoption(1棵完整树,不÷5) + const firstOrder = sourceOrders[0]; + const virtualAdoption: SyncedAdoption = { + id: BigInt(0), + originalAdoptionId: mergeSourceAdoptionId, + accountSequence, + treeCount: 1, + adoptionDate: miningEnabledAt, + status: 'MINING_ENABLED', + selectedProvince: firstOrder.provinceCode || '', + selectedCity: firstOrder.cityCode || '', + contributionPerTree, + sourceSequenceNum: BigInt(0), + syncedAt: new Date(), + contributionDistributed: false, + contributionDistributedAt: null, + createdAt: miningEnabledAt, + }; + + // Step 6: 获取推荐关系链 + const userReferral = await this.syncedDataRepository.findSyncedReferralByAccountSequence(accountSequence); + if (!userReferral) { + throw new Error( + `[PRE-PLANTING-MERGE] Referral not synced for ${accountSequence}, ` + + `cannot swap contribution for merge ${mergeNo}`, + ); + } + let ancestorChain: SyncedReferral[] = []; + if (userReferral.referrerAccountSequence) { + ancestorChain = await this.syncedDataRepository.findAncestorChain( + userReferral.referrerAccountSequence, + 15, + ); + } + + // Step 7: 获取算力账户 + const adopterAccount = await this.contributionAccountRepository.findByAccountSequence(accountSequence); + const ancestorAccounts = await this.contributionAccountRepository.findByAccountSequences( + ancestorChain.map((a) => a.accountSequence), + ); + + // Step 8: 使用领域计算器计算新树分配(复用现有逻辑) + const newTreeResult = this.domainCalculator.calculateAdoptionContribution( + virtualAdoption, + adopterAccount, + ancestorChain, + ancestorAccounts, + ); + + // Step 9: 在事务中执行算力切换(原子操作) + const expireRemark = + `预种合成:本份额已合并为1棵树(${mergeNo}),算力转为树级计算,已挖积分不受影响`; + const mergeRemark = + `预种合成树:由5份份额合并(${mergeNo})算力升级为整棵树,` + + `源订单:${sourceOrderNos.join('、')}`; + + await this.unitOfWork.executeInTransaction(async () => { + const tx = this.unitOfWork.getClient(); + + // 9a: 查询即将作废的个人算力总量(用于账户扣减) + const personalSumResult = await tx.contributionRecord.aggregate({ + where: { + sourceAdoptionId: { in: portionSourceAdoptionIds }, + sourceType: 'PERSONAL', + isExpired: false, + }, + _sum: { amount: true }, + }); + const expiredPersonalStr = personalSumResult._sum.amount?.toString() ?? '0'; + + // 9b: 作废旧份额算力记录(全部类型:PERSONAL + TEAM_LEVEL + TEAM_BONUS) + const expiredCount = await tx.contributionRecord.updateMany({ + where: { + sourceAdoptionId: { in: portionSourceAdoptionIds }, + isExpired: false, + }, + data: { + isExpired: true, + expiredAt: miningEnabledAt, + remark: expireRemark, + }, + }); + + this.logger.log( + `[PRE-PLANTING-MERGE] Expired ${expiredCount.count} portion records for merge ${mergeNo}`, + ); + + // 9c: 从认种人账户扣减旧个人算力(personal_contribution - 和 effective_contribution -) + if (parseFloat(expiredPersonalStr) > 0) { + await tx.contributionAccount.updateMany({ + where: { accountSequence }, + data: { + personalContribution: { decrement: expiredPersonalStr }, + effectiveContribution: { decrement: expiredPersonalStr }, + }, + }); + } + + // 9d: 创建新树算力分配记录(personal 70% + 团队15级 7.5% + 加成奖励 7.5%) + // 内部调用复用 saveDistributionResult(各 repository 自动使用事务 client) + await this.saveDistributionResult(newTreeResult, mergeSourceAdoptionId, accountSequence); + + // 9e: 为新树算力记录补充 remark(标注合成来源) + await tx.contributionRecord.updateMany({ + where: { sourceAdoptionId: mergeSourceAdoptionId }, + data: { remark: mergeRemark }, + }); + }); + + // Step 10: 插入幂等标记(最终一致性,事务提交后 best-effort 写入) + try { + await this.prePlantingPrisma.prePlantingSyncedMerge.create({ + data: { + mergeNo, + accountSequence, + sourceOrderNos, + newSourceAdoptionId: mergeSourceAdoptionId, + sourceTopic: result.sourceTopic, + sourceOffset: result.sourceOffset, + }, + }); + } catch (error: any) { + if (error.code === 'P2002') { + this.logger.debug(`[PRE-PLANTING-MERGE] Idempotency marker already exists: ${mergeNo}`); + } else { + this.logger.error(`[PRE-PLANTING-MERGE] Failed to insert idempotency marker: ${mergeNo}`, error); + } + } + + this.logger.log( + `[PRE-PLANTING-MERGE] Swap completed: mergeNo=${mergeNo}, ` + + `accountSequence=${accountSequence}, ` + + `expiredPortions=${portionSourceAdoptionIds.length}, ` + + `newSourceAdoptionId=${mergeSourceAdoptionId}`, + ); + } + private async publishContributionAccountUpdatedEvent( account: ContributionAccountAggregate, ): Promise { diff --git a/backend/services/contribution-service/src/pre-planting/domain/constants.ts b/backend/services/contribution-service/src/pre-planting/domain/constants.ts index 48751467..7b221721 100644 --- a/backend/services/contribution-service/src/pre-planting/domain/constants.ts +++ b/backend/services/contribution-service/src/pre-planting/domain/constants.ts @@ -50,3 +50,14 @@ export const PRE_PLANTING_CDC_TOPIC_PREFIX = 'cdc.pre-planting'; * 与现有 contribution-service-cdc-group 完全独立 */ export const PRE_PLANTING_CDC_GROUP_ID = 'contribution-pre-planting-cdc'; + +/** + * 合成树算力 sourceAdoptionId 偏移量 + * + * 用于在 contribution_records 中区分合成树算力和份额算力(10B 偏移)。 + * 合成树的 sourceAdoptionId = PRE_PLANTING_MERGE_SOURCE_ID_OFFSET + mergeId + * + * 选择 20,000,000,000 (200 亿) 作为偏移量,确保永远不会与正常认种(无偏移) + * 或预种份额(10B 偏移)冲突。 + */ +export const PRE_PLANTING_MERGE_SOURCE_ID_OFFSET = 20_000_000_000n; diff --git a/backend/services/contribution-service/src/pre-planting/infrastructure/kafka/pre-planting-cdc-consumer.service.ts b/backend/services/contribution-service/src/pre-planting/infrastructure/kafka/pre-planting-cdc-consumer.service.ts index a89bc738..36bd6dbc 100644 --- a/backend/services/contribution-service/src/pre-planting/infrastructure/kafka/pre-planting-cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/pre-planting/infrastructure/kafka/pre-planting-cdc-consumer.service.ts @@ -176,9 +176,13 @@ export class PrePlantingCdcConsumerService implements OnModuleInit, OnModuleDest 'PRE_PLANTING_CDC_TOPIC_POSITIONS', 'cdc.pre-planting.public.pre_planting_positions', ); + const topicMerges = this.configService.get( + 'PRE_PLANTING_CDC_TOPIC_MERGES', + 'cdc.pre-planting.public.pre_planting_merges', + ); await this.consumer.subscribe({ - topics: [topicOrders, topicPositions], + topics: [topicOrders, topicPositions, topicMerges], fromBeginning: true, }); @@ -191,7 +195,7 @@ export class PrePlantingCdcConsumerService implements OnModuleInit, OnModuleDest }); this.logger.log( - `[PRE-PLANTING-CDC] Consumer started, topics: [${topicOrders}, ${topicPositions}]`, + `[PRE-PLANTING-CDC] Consumer started, topics: [${topicOrders}, ${topicPositions}, ${topicMerges}]`, ); } catch (error) { this.logger.error('[PRE-PLANTING-CDC] Failed to start consumer', error); diff --git a/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts b/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts index 8506dd2a..65bd4a21 100644 --- a/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts +++ b/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts @@ -11,6 +11,7 @@ import { PrePlantingCdcConsumerService } from './infrastructure/kafka/pre-planti // CDC Event Handlers import { PrePlantingOrderSyncedHandler } from './application/handlers/pre-planting-order-synced.handler'; import { PrePlantingPositionSyncedHandler } from './application/handlers/pre-planting-position-synced.handler'; +import { PrePlantingMergeSyncedHandler } from './application/handlers/pre-planting-merge-synced.handler'; import { PrePlantingCdcDispatcher } from './application/handlers/pre-planting-cdc-dispatcher'; // Application Services @@ -71,6 +72,7 @@ import { BonusClaimService } from '../application/services/bonus-claim.service'; // CDC Event Handlers PrePlantingOrderSyncedHandler, PrePlantingPositionSyncedHandler, + PrePlantingMergeSyncedHandler, PrePlantingCdcDispatcher, // Application Services (预种) diff --git a/backend/services/scripts/debezium/pre-planting-connector.json b/backend/services/scripts/debezium/pre-planting-connector.json index 4e3e3084..d263fab8 100644 --- a/backend/services/scripts/debezium/pre-planting-connector.json +++ b/backend/services/scripts/debezium/pre-planting-connector.json @@ -12,7 +12,7 @@ "topic.prefix": "cdc.pre-planting", - "table.include.list": "public.pre_planting_orders,public.pre_planting_positions", + "table.include.list": "public.pre_planting_orders,public.pre_planting_positions,public.pre_planting_merges", "plugin.name": "pgoutput", "publication.name": "debezium_pre_planting_publication",