fix(pre-planting): 修复 5 个预种模块 Bug + 补全 3 服务 migration

Bug #1 (HIGH): allocateFunds 从 Prisma 事务内移到事务外
  - pre-planting-reward.service.ts: distributeRewards 拆为
    prepareAndPersistRewards(事务内持久化)+ executeAllocations(事务后转账)
  - pre-planting-application.service.ts: 事务后调用 executeAllocations

Bug #2 (HIGH): signContract 后触发 hasPlanted
  - 签约事务成功后发布 PlantingOrderPaid 到 planting-events topic
  - wallet-service 消费后执行 markUserAsPlanted + settleUserPendingRewards
  - event-publisher.service.ts: 新增 publishRawToPlantingEvents 方法

Bug #3 (MEDIUM): PENDING 推荐奖励改为全部 SETTLED
  - 与现有认种行为对齐,推荐奖励立即发放

Bug #4 (HIGH): 补全 3 个服务的数据库迁移文件
  - planting-service: 4 张预种表(orders/positions/merges/reward_entries)
  - admin-service: 1 张配置表(pre_planting_configs)+ 默认数据
  - contribution-service: 4 张 CDC 追踪表(synced_orders/positions/freeze_states/processed_cdc_events)

Bug #5 (LOW): 合并循环 if→while,支持一次购买多份触发多次合并

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-23 18:09:14 -08:00
parent b14ad94e85
commit a11e4d0261
7 changed files with 335 additions and 51 deletions

View File

@ -0,0 +1,14 @@
-- CreateTable: 预种计划开关配置
CREATE TABLE "pre_planting_configs" (
"id" TEXT NOT NULL,
"is_active" BOOLEAN NOT NULL DEFAULT false,
"activated_at" TIMESTAMP(3),
"updated_at" TIMESTAMP(3) NOT NULL,
"updated_by" VARCHAR(50),
CONSTRAINT "pre_planting_configs_pkey" PRIMARY KEY ("id")
);
-- 插入默认配置(关闭状态)
INSERT INTO "pre_planting_configs" ("id", "is_active", "updated_at")
VALUES (gen_random_uuid(), false, NOW());

View File

@ -0,0 +1,89 @@
-- CreateTable: 预种 CDC 同步追踪 — 订单
CREATE TABLE "pre_planting_synced_orders" (
"id" BIGSERIAL NOT NULL,
"original_order_id" BIGINT NOT NULL,
"order_no" VARCHAR(50) NOT NULL,
"user_id" BIGINT NOT NULL,
"account_sequence" VARCHAR(20) NOT NULL,
"portion_count" INTEGER NOT NULL,
"price_per_portion" DECIMAL(20,8) NOT NULL,
"total_amount" DECIMAL(20,8) NOT NULL,
"province_code" VARCHAR(10) NOT NULL,
"city_code" VARCHAR(10) NOT NULL,
"status" VARCHAR(20) NOT NULL,
"merged_to_merge_id" BIGINT,
"paid_at" TIMESTAMP(3),
"created_at" TIMESTAMP(3) NOT NULL,
"contribution_per_portion" DECIMAL(20,10) NOT NULL,
"contribution_distributed" BOOLEAN NOT NULL DEFAULT false,
"contribution_distributed_at" TIMESTAMP(3),
"source_topic" VARCHAR(200) NOT NULL,
"source_offset" BIGINT NOT NULL,
"synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "pre_planting_synced_orders_pkey" PRIMARY KEY ("id")
);
-- CreateTable: 预种 CDC 同步追踪 — 持仓
CREATE TABLE "pre_planting_synced_positions" (
"id" BIGSERIAL NOT NULL,
"user_id" BIGINT NOT NULL,
"account_sequence" VARCHAR(20) NOT NULL,
"total_portions" INTEGER NOT NULL DEFAULT 0,
"merged_portions" INTEGER NOT NULL DEFAULT 0,
"total_trees_merged" INTEGER NOT NULL DEFAULT 0,
"first_purchase_at" TIMESTAMP(3),
"source_topic" VARCHAR(200) NOT NULL,
"source_offset" BIGINT NOT NULL,
"synced_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "pre_planting_synced_positions_pkey" PRIMARY KEY ("id")
);
-- CreateTable: 预种算力冻结状态
CREATE TABLE "pre_planting_freeze_states" (
"id" BIGSERIAL NOT NULL,
"account_sequence" VARCHAR(20) NOT NULL,
"total_portions" INTEGER NOT NULL DEFAULT 0,
"total_trees_merged" INTEGER NOT NULL DEFAULT 0,
"first_purchase_at" TIMESTAMP(3),
"is_frozen" BOOLEAN NOT NULL DEFAULT false,
"frozen_at" TIMESTAMP(3),
"unfrozen_at" TIMESTAMP(3),
"post_unfreeze_expire_date" TIMESTAMP(3),
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "pre_planting_freeze_states_pkey" PRIMARY KEY ("id")
);
-- CreateTable: 预种 CDC 幂等性追踪
CREATE TABLE "pre_planting_processed_cdc_events" (
"id" BIGSERIAL NOT NULL,
"source_topic" VARCHAR(200) NOT NULL,
"offset" BIGINT NOT NULL,
"table_name" VARCHAR(100) NOT NULL,
"operation" VARCHAR(10) NOT NULL,
"processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "pre_planting_processed_cdc_events_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "pre_planting_synced_orders_original_order_id_key" ON "pre_planting_synced_orders"("original_order_id");
CREATE INDEX "pre_planting_synced_orders_account_sequence_idx" ON "pre_planting_synced_orders"("account_sequence");
CREATE INDEX "pre_planting_synced_orders_status_idx" ON "pre_planting_synced_orders"("status");
CREATE INDEX "pre_planting_synced_orders_contribution_distributed_idx" ON "pre_planting_synced_orders"("contribution_distributed");
-- CreateIndex
CREATE UNIQUE INDEX "pre_planting_synced_positions_user_id_key" ON "pre_planting_synced_positions"("user_id");
CREATE UNIQUE INDEX "pre_planting_synced_positions_account_sequence_key" ON "pre_planting_synced_positions"("account_sequence");
-- CreateIndex
CREATE UNIQUE INDEX "pre_planting_freeze_states_account_sequence_key" ON "pre_planting_freeze_states"("account_sequence");
CREATE INDEX "pre_planting_freeze_states_is_frozen_idx" ON "pre_planting_freeze_states"("is_frozen");
CREATE INDEX "pre_planting_freeze_states_first_purchase_at_idx" ON "pre_planting_freeze_states"("first_purchase_at");
-- CreateIndex
CREATE UNIQUE INDEX "pre_planting_processed_cdc_events_source_topic_offset_key" ON "pre_planting_processed_cdc_events"("source_topic", "offset");
CREATE INDEX "pre_planting_processed_cdc_events_processed_at_idx" ON "pre_planting_processed_cdc_events"("processed_at");

View File

@ -0,0 +1,3 @@
# Please do not edit this file manually
# It should be added in your version-control system (i.e. Git)
provider = "postgresql"

View File

@ -0,0 +1,103 @@
-- CreateTable: 预种订单表
CREATE TABLE "pre_planting_orders" (
"order_id" BIGSERIAL NOT NULL,
"order_no" VARCHAR(50) NOT NULL,
"user_id" BIGINT NOT NULL,
"account_sequence" VARCHAR(20) NOT NULL,
"portion_count" INTEGER NOT NULL DEFAULT 1,
"price_per_portion" DECIMAL(20,8) NOT NULL DEFAULT 3171,
"total_amount" DECIMAL(20,8) NOT NULL,
"province_code" VARCHAR(10) NOT NULL,
"city_code" VARCHAR(10) NOT NULL,
"status" VARCHAR(20) NOT NULL DEFAULT 'CREATED',
"merged_to_merge_id" BIGINT,
"merged_at" TIMESTAMP(3),
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"paid_at" TIMESTAMP(3),
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "pre_planting_orders_pkey" PRIMARY KEY ("order_id")
);
-- CreateTable: 预种持仓表(每用户一条)
CREATE TABLE "pre_planting_positions" (
"position_id" BIGSERIAL NOT NULL,
"user_id" BIGINT NOT NULL,
"account_sequence" VARCHAR(20) NOT NULL,
"total_portions" INTEGER NOT NULL DEFAULT 0,
"available_portions" INTEGER NOT NULL DEFAULT 0,
"merged_portions" INTEGER NOT NULL DEFAULT 0,
"total_trees_merged" INTEGER NOT NULL DEFAULT 0,
"province_code" VARCHAR(10),
"city_code" VARCHAR(10),
"first_purchase_at" TIMESTAMP(3),
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "pre_planting_positions_pkey" PRIMARY KEY ("position_id")
);
-- CreateTable: 预种合并记录表
CREATE TABLE "pre_planting_merges" (
"merge_id" BIGSERIAL NOT NULL,
"merge_no" VARCHAR(50) NOT NULL,
"user_id" BIGINT NOT NULL,
"account_sequence" VARCHAR(20) NOT NULL,
"source_order_nos" JSONB NOT NULL,
"tree_count" INTEGER NOT NULL DEFAULT 1,
"province_code" VARCHAR(10),
"city_code" VARCHAR(10),
"contract_status" VARCHAR(20) NOT NULL DEFAULT 'PENDING',
"contract_signed_at" TIMESTAMP(3),
"mining_enabled_at" TIMESTAMP(3),
"merged_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "pre_planting_merges_pkey" PRIMARY KEY ("merge_id")
);
-- CreateTable: 预种分配记录表
CREATE TABLE "pre_planting_reward_entries" (
"entry_id" BIGSERIAL NOT NULL,
"source_order_no" VARCHAR(50) NOT NULL,
"source_account_sequence" VARCHAR(20) NOT NULL,
"recipient_account_sequence" VARCHAR(20) NOT NULL,
"right_type" VARCHAR(50) NOT NULL,
"usdt_amount" DECIMAL(20,8) NOT NULL,
"reward_status" VARCHAR(20) NOT NULL DEFAULT 'SETTLED',
"memo" TEXT,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "pre_planting_reward_entries_pkey" PRIMARY KEY ("entry_id")
);
-- CreateIndex
CREATE UNIQUE INDEX "pre_planting_orders_order_no_key" ON "pre_planting_orders"("order_no");
CREATE INDEX "pre_planting_orders_user_id_idx" ON "pre_planting_orders"("user_id");
CREATE INDEX "pre_planting_orders_account_sequence_idx" ON "pre_planting_orders"("account_sequence");
CREATE INDEX "pre_planting_orders_order_no_idx" ON "pre_planting_orders"("order_no");
CREATE INDEX "pre_planting_orders_status_idx" ON "pre_planting_orders"("status");
CREATE INDEX "pre_planting_orders_merged_to_merge_id_idx" ON "pre_planting_orders"("merged_to_merge_id");
CREATE INDEX "pre_planting_orders_created_at_idx" ON "pre_planting_orders"("created_at");
-- CreateIndex
CREATE UNIQUE INDEX "pre_planting_positions_user_id_key" ON "pre_planting_positions"("user_id");
CREATE UNIQUE INDEX "pre_planting_positions_account_sequence_key" ON "pre_planting_positions"("account_sequence");
CREATE INDEX "pre_planting_positions_user_id_idx" ON "pre_planting_positions"("user_id");
CREATE INDEX "pre_planting_positions_account_sequence_idx" ON "pre_planting_positions"("account_sequence");
CREATE INDEX "pre_planting_positions_total_trees_merged_idx" ON "pre_planting_positions"("total_trees_merged");
-- CreateIndex
CREATE UNIQUE INDEX "pre_planting_merges_merge_no_key" ON "pre_planting_merges"("merge_no");
CREATE INDEX "pre_planting_merges_user_id_idx" ON "pre_planting_merges"("user_id");
CREATE INDEX "pre_planting_merges_account_sequence_idx" ON "pre_planting_merges"("account_sequence");
CREATE INDEX "pre_planting_merges_merge_no_idx" ON "pre_planting_merges"("merge_no");
CREATE INDEX "pre_planting_merges_contract_status_idx" ON "pre_planting_merges"("contract_status");
-- CreateIndex
CREATE INDEX "pre_planting_reward_entries_source_order_no_idx" ON "pre_planting_reward_entries"("source_order_no");
CREATE INDEX "pre_planting_reward_entries_source_account_sequence_idx" ON "pre_planting_reward_entries"("source_account_sequence");
CREATE INDEX "pre_planting_reward_entries_recipient_account_sequence_idx" ON "pre_planting_reward_entries"("recipient_account_sequence");
CREATE INDEX "pre_planting_reward_entries_right_type_idx" ON "pre_planting_reward_entries"("right_type");
CREATE INDEX "pre_planting_reward_entries_reward_status_idx" ON "pre_planting_reward_entries"("reward_status");
CREATE INDEX "pre_planting_reward_entries_created_at_idx" ON "pre_planting_reward_entries"("created_at");

View File

@ -145,6 +145,42 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
}
}
/**
* planting-events topic
*
* publishToPlantingEvents eventType
* 使 eventType wallet-service
* markUserAsPlanted + settleUserPendingRewards
*
* wallet-service 消费者匹配: eventType === 'PlantingOrderPaid'
*/
async publishRawToPlantingEvents(
key: string,
eventType: string,
payload: Record<string, unknown>,
): Promise<void> {
const message = {
key,
value: JSON.stringify({
eventType,
payload,
occurredAt: new Date().toISOString(),
}),
};
if (!this.isConnected) {
this.logger.warn(`[PUBLISH] Kafka not connected, skipping raw planting-events for ${eventType}`);
return;
}
try {
this.kafkaClient.emit('planting-events', message);
this.logger.log(`[PUBLISH] ✓ Raw event ${eventType} published to topic planting-events`);
} catch (error) {
this.logger.error(`[PUBLISH] ✗ Failed to publish raw to planting-events:`, error);
}
}
/**
* (reward-service )
*/

View File

@ -14,6 +14,7 @@ import { PrePlantingPositionRepository } from '../../infrastructure/repositories
import { PrePlantingMergeRepository } from '../../infrastructure/repositories/pre-planting-merge.repository';
import { PrePlantingRewardService } from './pre-planting-reward.service';
import { PrePlantingAdminClient } from '../../infrastructure/external/pre-planting-admin.client';
import { EventPublisherService } from '../../../infrastructure/kafka/event-publisher.service';
@Injectable()
export class PrePlantingApplicationService {
@ -28,6 +29,7 @@ export class PrePlantingApplicationService {
private readonly mergeRepo: PrePlantingMergeRepository,
private readonly rewardService: PrePlantingRewardService,
private readonly adminClient: PrePlantingAdminClient,
private readonly eventPublisher: EventPublisherService,
) {}
/**
@ -63,9 +65,11 @@ export class PrePlantingApplicationService {
let merged = false;
let mergeNo: string | undefined;
let rewardAllocations: import('./pre-planting-reward.service').RewardAllocation[] = [];
try {
// Step 3-4: 事务内处理(创建订单 + 更新持仓 + 分配记录 + outbox
// 注意:事务内只做 DB 写入,不做 HTTP 调用
await this.prisma.$transaction(async (tx) => {
// 创建预种订单
const order = PrePlantingOrder.create(
@ -95,8 +99,8 @@ export class PrePlantingApplicationService {
await this.orderRepo.save(tx, order);
await this.positionRepo.save(tx, position);
// 分配 10 类权益(在事务内记录,事务外执行转账)
await this.rewardService.distributeRewards(
// 分配 10 类权益 — 事务内只持久化记录,返回 allocations 供事务后转账
rewardAllocations = await this.rewardService.prepareAndPersistRewards(
tx,
orderNo,
accountSequence,
@ -118,11 +122,11 @@ export class PrePlantingApplicationService {
aggregateType: event.aggregateType,
}));
// Step 6: 检查是否触发合并
if (position.canMerge()) {
// Step 6: 检查是否触发合并(循环处理,支持一次购买多份触发多次合并)
const mergeResults: { mergeNo: string; domainEvents: typeof order.domainEvents }[] = [];
while (position.canMerge()) {
const mergeResult = await this.performMerge(tx, userId, accountSequence, position);
merged = true;
mergeNo = mergeResult.mergeNo;
mergeResults.push(mergeResult);
// 合并事件也写入 Outbox
for (const event of mergeResult.domainEvents) {
@ -140,15 +144,22 @@ export class PrePlantingApplicationService {
}
}
if (mergeResults.length > 0) {
merged = true;
mergeNo = mergeResults[mergeResults.length - 1].mergeNo;
}
await this.outboxRepo.saveInTransaction(tx, outboxEvents);
});
// Step 5: 确认扣款(事务成功后)
// Step 5: 事务成功后执行资金转账HTTP 调用,不在事务内
await this.walletClient.confirmPlantingDeduction({
userId: userId.toString(),
accountSequence,
orderId: orderNo,
});
await this.rewardService.executeAllocations(orderNo, rewardAllocations);
} catch (error) {
// 事务失败,解冻余额
this.logger.error(
@ -184,7 +195,8 @@ export class PrePlantingApplicationService {
): Promise<void> {
this.logger.log(`[PRE-PLANTING] Sign contract: userId=${userId}, mergeNo=${mergeNo}`);
await this.prisma.$transaction(async (tx) => {
// 事务内签约,返回合并记录数据供后续事件发布
const mergeData = await this.prisma.$transaction(async (tx) => {
const merge = await this.mergeRepo.findByMergeNo(tx, mergeNo);
if (!merge) {
throw new NotFoundException(`合并记录 ${mergeNo} 不存在`);
@ -209,10 +221,42 @@ export class PrePlantingApplicationService {
aggregateType: event.aggregateType,
}));
await this.outboxRepo.saveInTransaction(tx, outboxEvents);
return {
mergeNo: merge.mergeNo,
accountSequence: merge.accountSequence,
userId: merge.userId.toString(),
treeCount: merge.treeCount,
};
});
// 事务成功后,设置 hasPlanted=true调用 wallet-service
// TODO: 调用 wallet-service 设置 hasPlanted
// 事务成功后,发布 PlantingOrderPaid 到 planting-events topic
// wallet-service 消费此事件触发:
// 1. markUserAsPlanted(accountSequence) — 设置 hasPlanted=true
// 2. settleUserPendingRewards(accountSequence) — 结算所有 PENDING 奖励
try {
await this.eventPublisher.publishRawToPlantingEvents(
mergeData.mergeNo,
'PlantingOrderPaid',
{
orderNo: mergeData.mergeNo,
accountSequence: mergeData.accountSequence,
userId: mergeData.userId,
treeCount: mergeData.treeCount,
},
);
this.logger.log(
`[PRE-PLANTING] PlantingOrderPaid event published for merge ${mergeNo}, ` +
`accountSequence=${mergeData.accountSequence}`,
);
} catch (error) {
// 发布失败不阻塞签约流程,后续可通过补偿机制重试
this.logger.error(
`[PRE-PLANTING] Failed to publish PlantingOrderPaid for merge ${mergeNo}`,
error,
);
}
this.logger.log(`[PRE-PLANTING] Contract signed: mergeNo=${mergeNo}`);
}

View File

@ -34,23 +34,20 @@ export class PrePlantingRewardService {
) {}
/**
* 10
* Step 3-4: 计算分配对象 +
*
* Step 3-5 in the purchase flow:
* 3. 10
* 4.
* 5.
* allocations HTTP
*/
async distributeRewards(
async prepareAndPersistRewards(
tx: Prisma.TransactionClient,
orderNo: string,
accountSequence: string,
provinceCode: string,
cityCode: string,
portionCount: number,
): Promise<void> {
): Promise<RewardAllocation[]> {
this.logger.log(
`[PRE-PLANTING] Distributing rewards for order ${orderNo}, ` +
`[PRE-PLANTING] Preparing rewards for order ${orderNo}, ` +
`${portionCount} portion(s), province=${provinceCode}, city=${cityCode}`,
);
@ -75,11 +72,36 @@ export class PrePlantingRewardService {
await this.rewardEntryRepo.saveMany(tx, entries);
// Step 5: 执行资金转账(调用 wallet-service 已有 API
await this.executeAllocations(orderNo, allocations);
this.logger.log(
`[PRE-PLANTING] Rewards persisted: ${allocations.length} allocations for order ${orderNo}`,
);
return allocations;
}
/**
* Step 5: 事务提交后执行资金转账HTTP wallet-service
*/
async executeAllocations(
orderNo: string,
allocations: RewardAllocation[],
): Promise<void> {
// 只转 SETTLED 状态的分配
const settledAllocations = allocations.filter(
(a) => a.rewardStatus === PrePlantingRewardStatus.SETTLED,
);
await this.walletClient.allocateFunds({
orderId: orderNo,
allocations: settledAllocations.map((a) => ({
targetType: a.rightType as unknown as import('../../../domain/value-objects/fund-allocation-target-type.enum').FundAllocationTargetType,
amount: a.amount,
targetAccountId: a.recipientAccountSequence,
})),
});
this.logger.log(
`[PRE-PLANTING] Rewards distributed: ${allocations.length} allocations for order ${orderNo}`,
`[PRE-PLANTING] Funds allocated: ${settledAllocations.length} transfers for order ${orderNo}`,
);
}
@ -147,18 +169,15 @@ export class PrePlantingRewardService {
]);
// 推荐奖励 (SHARE_RIGHT)
// 与现有认种一致:推荐奖励立即发放到推荐人钱包,无论推荐人是否已认种
const referrer = referralInfo.directReferrer;
if (referrer) {
allocations.push({
recipientAccountSequence: referrer.accountSequence,
rightType: PrePlantingRightType.SHARE_RIGHT,
amount: PRE_PLANTING_RIGHT_AMOUNTS.SHARE_RIGHT * multiplier,
memo: referrer.hasPlanted
? '预种推荐奖励(立即到账)'
: '预种推荐奖励(待推荐人认种后生效)',
rewardStatus: referrer.hasPlanted
? PrePlantingRewardStatus.SETTLED
: PrePlantingRewardStatus.PENDING,
memo: '预种推荐奖励',
rewardStatus: PrePlantingRewardStatus.SETTLED,
});
} else {
allocations.push({
@ -218,28 +237,4 @@ export class PrePlantingRewardService {
return allocations;
}
/**
* wallet-service API
*/
private async executeAllocations(
orderNo: string,
allocations: RewardAllocation[],
): Promise<void> {
// 只转 SETTLED 状态的分配
const settledAllocations = allocations.filter(
(a) => a.rewardStatus === PrePlantingRewardStatus.SETTLED,
);
// wallet-service 的 allocateFunds API 接受通用分配数据
// 预种的 rightType 不属于现有 FundAllocationTargetType 枚举,
// 但 wallet-service 内部实际只使用 targetAccountId 做转账
await this.walletClient.allocateFunds({
orderId: orderNo,
allocations: settledAllocations.map((a) => ({
targetType: a.rightType as unknown as import('../../../domain/value-objects/fund-allocation-target-type.enum').FundAllocationTargetType,
amount: a.amount,
targetAccountId: a.recipientAccountSequence,
})),
});
}
}