feat(pre-planting): 3171 预种计划 2.0 算力集成(contribution-service CDC 模块)

Phase 5:将预种数据集成到 contribution-service 2.0 算力体系。

=== 新增文件(11 个) ===
- prisma/pre-planting/schema.prisma:独立 Prisma schema(4 张追踪表)
- PrePlantingPrismaService + Module:独立 PrismaClient
- PrePlantingCdcConsumerService:独立 CDC 消费者(consumer group: contribution-pre-planting-cdc)
- PrePlantingOrderSyncedHandler:订单 CDC handler + synced_adoptions marker 插入
- PrePlantingPositionSyncedHandler:持仓 CDC handler
- PrePlantingCdcDispatcher:CDC 事件分发器
- PrePlantingContributionService:1/5 算力计算(复用领域计算器)
- PrePlantingFreezeScheduler:每日冻结/解冻调度(凌晨 5 点)
- PrePlantingCdcModule:模块注册
- constants.ts:10B 偏移量、冻结期限等常量

=== 隔离保证 ===
- 独立 Kafka consumer group(contribution-pre-planting-cdc)
- 独立 CDC topics(cdc.pre-planting.public.*)
- 独立 Prisma schema + generated client
- sourceAdoptionId 使用 10,000,000,000 偏移避免 ID 冲突
- synced_adoptions marker: contributionDistributed=true + treeCount=0
- 不更新 NetworkAdoptionProgress(预种不推高全网算力系数)
- 现有代码文件零修改(仅 app.module.ts 加 1 行 import)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-18 05:25:14 -08:00
parent 010b0392fd
commit e1cd8ed7f2
13 changed files with 1879 additions and 1 deletions

View File

@ -25,7 +25,12 @@
"prisma:generate": "prisma generate", "prisma:generate": "prisma generate",
"prisma:migrate": "prisma migrate dev", "prisma:migrate": "prisma migrate dev",
"prisma:migrate:prod": "prisma migrate deploy", "prisma:migrate:prod": "prisma migrate deploy",
"prisma:studio": "prisma studio" "prisma:studio": "prisma studio",
"prisma:pre-planting:generate": "prisma generate --schema=prisma/pre-planting/schema.prisma",
"prisma:pre-planting:migrate": "prisma migrate dev --schema=prisma/pre-planting/schema.prisma",
"prisma:pre-planting:migrate:prod": "prisma migrate deploy --schema=prisma/pre-planting/schema.prisma",
"prisma:pre-planting:studio": "prisma studio --schema=prisma/pre-planting/schema.prisma",
"prisma:all:generate": "npm run prisma:generate && npm run prisma:pre-planting:generate"
}, },
"dependencies": { "dependencies": {
"@nestjs/common": "^10.0.0", "@nestjs/common": "^10.0.0",

View File

@ -0,0 +1,132 @@
// ============================================
// [2026-02-17] 预种计划独立 Prisma Schema
// ============================================
//
// 本 schema 仅包含预种计划在 contribution-service 中的追踪表。
// 与主 schema (prisma/schema.prisma) 完全隔离,拥有独立的:
// - Prisma Client生成到 src/pre-planting/infrastructure/prisma/generated/
// - Migration 目录prisma/pre-planting/migrations/
//
// 预种的算力分配结果仍然写入主 schema 的 contribution_accounts、
// contribution_records 等表(通过现有 Repository以便挖矿系统读取。
// 本 schema 仅负责预种 CDC 同步追踪、冻结状态等预种专属数据。
generator client {
provider = "prisma-client-js"
output = "../../src/pre-planting/infrastructure/prisma/generated"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
// ============================================
// 预种 CDC 同步追踪表
// ============================================
/// 预种订单同步记录(从 planting-service CDC 同步)
/// 用于追踪每笔预种订单的算力分配状态
model PrePlantingSyncedOrder {
id BigInt @id @default(autoincrement())
originalOrderId BigInt @unique @map("original_order_id")
orderNo String @map("order_no") @db.VarChar(50)
userId BigInt @map("user_id")
accountSequence String @map("account_sequence") @db.VarChar(20)
portionCount Int @map("portion_count")
pricePerPortion Decimal @map("price_per_portion") @db.Decimal(20, 8)
totalAmount Decimal @map("total_amount") @db.Decimal(20, 8)
provinceCode String @map("province_code") @db.VarChar(10)
cityCode String @map("city_code") @db.VarChar(10)
status String @map("status") @db.VarChar(20) // CREATED, PAID, MERGED
mergedToMergeId BigInt? @map("merged_to_merge_id")
paidAt DateTime? @map("paid_at")
createdAt DateTime @map("created_at")
// 算力追踪
contributionPerPortion Decimal @map("contribution_per_portion") @db.Decimal(20, 10)
contributionDistributed Boolean @default(false) @map("contribution_distributed")
contributionDistributedAt DateTime? @map("contribution_distributed_at")
// CDC 同步元数据
sourceTopic String @map("source_topic") @db.VarChar(200)
sourceOffset BigInt @map("source_offset")
syncedAt DateTime @default(now()) @map("synced_at")
@@index([accountSequence])
@@index([status])
@@index([contributionDistributed])
@@map("pre_planting_synced_orders")
}
/// 预种持仓同步记录(从 planting-service CDC 同步)
/// 用于追踪用户预种总量,判断冻结条件
model PrePlantingSyncedPosition {
id BigInt @id @default(autoincrement())
userId BigInt @unique @map("user_id")
accountSequence String @unique @map("account_sequence") @db.VarChar(20)
totalPortions Int @default(0) @map("total_portions")
mergedPortions Int @default(0) @map("merged_portions")
totalTreesMerged Int @default(0) @map("total_trees_merged")
firstPurchaseAt DateTime? @map("first_purchase_at")
// CDC 同步元数据
sourceTopic String @map("source_topic") @db.VarChar(200)
sourceOffset BigInt @map("source_offset")
syncedAt DateTime @default(now()) @map("synced_at")
@@map("pre_planting_synced_positions")
}
// ============================================
// 预种冻结状态表
// ============================================
/// 预种算力冻结状态(每用户一条)
///
/// 冻结规则:
/// - firstPurchaseAt + 1 年后仍未满 5 份 → 所有预种算力冻结(暂停分配)
/// - 后续累积满 5 份 → 解冻,恢复分配
/// - 解冻后的失效期 = 解冻日起算 + 2 年
/// - 未被冻结过的正常到期 = 首次产生挖矿收益日 + 2 年
model PrePlantingFreezeState {
id BigInt @id @default(autoincrement())
accountSequence String @unique @map("account_sequence") @db.VarChar(20)
totalPortions Int @default(0) @map("total_portions")
totalTreesMerged Int @default(0) @map("total_trees_merged")
firstPurchaseAt DateTime? @map("first_purchase_at")
// 冻结状态
isFrozen Boolean @default(false) @map("is_frozen")
frozenAt DateTime? @map("frozen_at")
unfrozenAt DateTime? @map("unfrozen_at")
// 解冻后的过期日期(解冻日 + 2 年)
postUnfreezeExpireDate DateTime? @map("post_unfreeze_expire_date")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@index([isFrozen])
@@index([firstPurchaseAt])
@@map("pre_planting_freeze_states")
}
// ============================================
// 预种 CDC 幂等性追踪表
// ============================================
/// 已处理的预种 CDC 事件(幂等性保证)
/// 使用 (sourceTopic, offset) 作为复合唯一键
model PrePlantingProcessedCdcEvent {
id BigInt @id @default(autoincrement())
sourceTopic String @map("source_topic") @db.VarChar(200)
offset BigInt @map("offset")
tableName String @map("table_name") @db.VarChar(100)
operation String @map("operation") @db.VarChar(10)
processedAt DateTime @default(now()) @map("processed_at")
@@unique([sourceTopic, offset])
@@index([processedAt])
@@map("pre_planting_processed_cdc_events")
}

View File

@ -8,6 +8,8 @@ import { DomainExceptionFilter } from './shared/filters/domain-exception.filter'
import { TransformInterceptor } from './shared/interceptors/transform.interceptor'; import { TransformInterceptor } from './shared/interceptors/transform.interceptor';
import { LoggingInterceptor } from './shared/interceptors/logging.interceptor'; import { LoggingInterceptor } from './shared/interceptors/logging.interceptor';
import { JwtAuthGuard } from './shared/guards/jwt-auth.guard'; import { JwtAuthGuard } from './shared/guards/jwt-auth.guard';
// [2026-02-17] 新增:预种 CDC 集成模块(纯新增,与现有 CDC 消费零耦合)
import { PrePlantingCdcModule } from './pre-planting/pre-planting-cdc.module';
@Module({ @Module({
imports: [ imports: [
@ -23,6 +25,7 @@ import { JwtAuthGuard } from './shared/guards/jwt-auth.guard';
InfrastructureModule, InfrastructureModule,
ApplicationModule, ApplicationModule,
ApiModule, ApiModule,
PrePlantingCdcModule, // 预种计划:独立 CDC consumer、独立 Prisma schema、1/5 算力、冻结调度
], ],
providers: [ providers: [
{ {

View File

@ -0,0 +1,54 @@
import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { PrePlantingCdcConsumerService } from '../../infrastructure/kafka/pre-planting-cdc-consumer.service';
import { PrePlantingOrderSyncedHandler, PrePlantingOrderSyncResult } from './pre-planting-order-synced.handler';
import { PrePlantingPositionSyncedHandler } from './pre-planting-position-synced.handler';
/**
* CDC
*
* [2026-02-17] CDC
*
* === CDCEventDispatcher ===
* - CDCEventDispatcher
* - 使 PrePlantingCdcConsumerService consumer group topics
* - handler + user_accounts / referral_relationships
*
* === ===
* - CDCEventDispatcher
* - dispatcher
*/
@Injectable()
export class PrePlantingCdcDispatcher implements OnModuleInit {
private readonly logger = new Logger(PrePlantingCdcDispatcher.name);
constructor(
private readonly cdcConsumer: PrePlantingCdcConsumerService,
private readonly orderHandler: PrePlantingOrderSyncedHandler,
private readonly positionHandler: PrePlantingPositionSyncedHandler,
) {}
async onModuleInit() {
// 注册预种订单表 handler带后置回调事务提交后计算算力
this.cdcConsumer.registerHandler<PrePlantingOrderSyncResult | null>(
'pre_planting_orders',
this.orderHandler.handle.bind(this.orderHandler),
this.orderHandler.calculateAfterCommit.bind(this.orderHandler),
);
// 注册预种持仓表 handler无后置回调纯同步不触发算力计算
this.cdcConsumer.registerHandler(
'pre_planting_positions',
this.positionHandler.handle.bind(this.positionHandler),
);
// 非阻塞启动 CDC 消费者
this.cdcConsumer.start()
.then(() => {
this.logger.log('[PRE-PLANTING-CDC] Dispatcher started with handlers: pre_planting_orders, pre_planting_positions');
})
.catch((error) => {
this.logger.error('[PRE-PLANTING-CDC] Failed to start dispatcher', error);
// 不抛出错误,允许服务在没有预种 CDC 的情况下启动
});
}
}

View File

@ -0,0 +1,324 @@
import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client';
import Decimal from 'decimal.js';
import { PrePlantingCdcEvent } from '../../infrastructure/kafka/pre-planting-cdc-consumer.service';
import { PrePlantingPrismaService } from '../../infrastructure/prisma/pre-planting-prisma.service';
import { PrePlantingContributionService } from '../services/pre-planting-contribution.service';
import { ContributionRateService } from '@/application/services/contribution-rate.service';
import { PRE_PLANTING_SOURCE_ID_OFFSET, PRE_PLANTING_PORTION_DIVISOR } from '../../domain/constants';
/**
*
*/
export interface PrePlantingOrderSyncResult {
originalOrderId: bigint;
needsCalculation: boolean;
}
/**
* CDC
*
* [2026-02-17] cdc.pre-planting.public.pre_planting_orders CDC
*
* === ===
* 1. pre_planting_synced_orders
* 2. PAID
* 3. synced_adoptions marker getDirectReferralAdoptedCount
* 4. 1/5 PostCommitCallback
*
* === ===
* -
* - synced_adoptions marker contributionDistributed=true
* processUndistributedAdoptions
* - marker treeCount=0使
*/
@Injectable()
export class PrePlantingOrderSyncedHandler {
private readonly logger = new Logger(PrePlantingOrderSyncedHandler.name);
constructor(
private readonly prePlantingPrisma: PrePlantingPrismaService,
private readonly contributionService: PrePlantingContributionService,
private readonly contributionRateService: ContributionRateService,
) {}
/**
* CDC PrismaService
*
* @param event CDC
* @param tx Prisma
* @returns
*/
async handle(
event: PrePlantingCdcEvent,
tx: Prisma.TransactionClient,
): Promise<PrePlantingOrderSyncResult | null> {
const { op, before, after } = event.payload;
const data = after || before;
if (!data) {
this.logger.warn(`[PRE-PLANTING-ORDER] Empty data, op=${op}`);
return null;
}
this.logger.log(
`[PRE-PLANTING-ORDER] Event: op=${op}, id=${data.id}, status=${data.status}`,
);
switch (op) {
case 'c': // create
case 'r': // read (snapshot)
return await this.handleCreateOrSnapshot(data, event, tx);
case 'u': // update
return await this.handleUpdate(data, before, event, tx);
case 'd': // delete
this.logger.warn(`[PRE-PLANTING-ORDER] Delete event: id=${data.id}`);
return null;
default:
this.logger.warn(`[PRE-PLANTING-ORDER] Unknown op: ${op}`);
return null;
}
}
/**
*
*/
async calculateAfterCommit(result: PrePlantingOrderSyncResult): Promise<void> {
if (!result?.needsCalculation) return;
this.logger.log(`[PRE-PLANTING-ORDER] Triggering contribution calculation: orderId=${result.originalOrderId}`);
try {
await this.contributionService.calculateForPrePlantingOrder(result.originalOrderId);
this.logger.log(`[PRE-PLANTING-ORDER] Contribution calculated: orderId=${result.originalOrderId}`);
} catch (error) {
// 算力计算失败不影响数据同步,后续调度器会重试
this.logger.error(
`[PRE-PLANTING-ORDER] Contribution calculation failed: orderId=${result.originalOrderId}`,
error,
);
}
}
private async handleCreateOrSnapshot(
data: any,
event: PrePlantingCdcEvent,
tx: Prisma.TransactionClient,
): Promise<PrePlantingOrderSyncResult | null> {
const orderId = BigInt(data.id);
const accountSequence = data.account_sequence || data.accountSequence;
const status = data.status;
if (!accountSequence) {
this.logger.warn(`[PRE-PLANTING-ORDER] Missing accountSequence for orderId=${orderId}`);
return null;
}
// 获取当日贡献值
const paidAt = data.paid_at || data.paidAt || data.created_at || data.createdAt;
let contributionPerTree = new Decimal('22617');
if (paidAt) {
try {
contributionPerTree = await this.contributionRateService.getContributionPerTree(new Date(paidAt));
} catch (error) {
this.logger.warn(`[PRE-PLANTING-ORDER] Failed to get rate, using default`, error);
}
}
const contributionPerPortion = contributionPerTree.div(PRE_PLANTING_PORTION_DIVISOR);
// 同步到预种追踪表(事务外,最终一致性)
await this.syncToTrackingTable(data, event, contributionPerPortion);
// 当状态为 PAID 时,在 synced_adoptions 中插入 marker用于 unlock 计数)
const needsCalculation = status === 'PAID';
if (needsCalculation) {
await this.ensureAdoptionMarker(accountSequence, orderId, paidAt, tx);
}
return {
originalOrderId: orderId,
needsCalculation,
};
}
private async handleUpdate(
after: any,
before: any,
event: PrePlantingCdcEvent,
tx: Prisma.TransactionClient,
): Promise<PrePlantingOrderSyncResult | null> {
const orderId = BigInt(after.id);
const accountSequence = after.account_sequence || after.accountSequence;
const newStatus = after.status;
const oldStatus = before?.status;
if (!accountSequence) {
this.logger.warn(`[PRE-PLANTING-ORDER] Missing accountSequence for orderId=${orderId}`);
return null;
}
// 获取当日贡献值
const paidAt = after.paid_at || after.paidAt || after.created_at || after.createdAt;
let contributionPerTree = new Decimal('22617');
if (paidAt) {
try {
contributionPerTree = await this.contributionRateService.getContributionPerTree(new Date(paidAt));
} catch (error) {
this.logger.warn(`[PRE-PLANTING-ORDER] Failed to get rate, using default`, error);
}
}
const contributionPerPortion = contributionPerTree.div(PRE_PLANTING_PORTION_DIVISOR);
// 同步到预种追踪表
await this.syncToTrackingTable(after, event, contributionPerPortion);
// 只在状态变为 PAID且之前不是 PAID时触发算力计算
const statusChangedToPaid = newStatus === 'PAID' && oldStatus !== 'PAID';
if (statusChangedToPaid) {
await this.ensureAdoptionMarker(accountSequence, orderId, paidAt, tx);
}
// 检查是否已分配
const alreadyDistributed = await this.isAlreadyDistributed(orderId);
return {
originalOrderId: orderId,
needsCalculation: statusChangedToPaid && !alreadyDistributed,
};
}
/**
* pre_planting_synced_orders
* 使 PrePlantingPrismaService schema
*/
private async syncToTrackingTable(
data: any,
event: PrePlantingCdcEvent,
contributionPerPortion: Decimal,
): Promise<void> {
try {
const orderId = BigInt(data.id);
await this.prePlantingPrisma.prePlantingSyncedOrder.upsert({
where: { originalOrderId: orderId },
create: {
originalOrderId: orderId,
orderNo: data.order_no || data.orderNo || '',
userId: BigInt(data.user_id || data.userId || 0),
accountSequence: data.account_sequence || data.accountSequence,
portionCount: data.portion_count || data.portionCount || 1,
pricePerPortion: data.price_per_portion || data.pricePerPortion || 3171,
totalAmount: data.total_amount || data.totalAmount || 3171,
provinceCode: data.province_code || data.provinceCode || '',
cityCode: data.city_code || data.cityCode || '',
status: data.status || 'CREATED',
mergedToMergeId: data.merged_to_merge_id ? BigInt(data.merged_to_merge_id) : null,
paidAt: data.paid_at ? new Date(data.paid_at) : null,
createdAt: new Date(data.created_at || data.createdAt || new Date()),
contributionPerPortion,
sourceTopic: event.topic,
sourceOffset: event.offset,
},
update: {
orderNo: data.order_no || data.orderNo || '',
status: data.status || 'CREATED',
mergedToMergeId: data.merged_to_merge_id ? BigInt(data.merged_to_merge_id) : null,
paidAt: data.paid_at ? new Date(data.paid_at) : null,
contributionPerPortion,
sourceTopic: event.topic,
sourceOffset: event.offset,
syncedAt: new Date(),
},
});
} catch (error) {
this.logger.error(`[PRE-PLANTING-ORDER] Failed to sync tracking table`, error);
// 追踪表同步失败不影响主流程(最终一致性)
}
}
/**
* synced_adoptions marker
*
* getDirectReferralAdoptedCount
*
* marker
* - originalAdoptionId = 10,000,000,000 + prePlantingOrderId
* - treeCount = 0 PrePlantingContributionService
* - status = 'MINING_ENABLED' getDirectReferralAdoptedCount
* - contributionDistributed = true
* - contributionPerTree = 0
*/
private async ensureAdoptionMarker(
accountSequence: string,
orderId: bigint,
paidAt: string | null,
tx: Prisma.TransactionClient,
): Promise<void> {
// 检查是否已有 marker通过检查 accountSequence 在大 ID 范围)
const existingMarker = await tx.syncedAdoption.findFirst({
where: {
accountSequence,
originalAdoptionId: { gte: PRE_PLANTING_SOURCE_ID_OFFSET },
},
});
if (existingMarker) {
this.logger.debug(
`[PRE-PLANTING-ORDER] Marker already exists for ${accountSequence}, skipping`,
);
return;
}
// 同时检查是否已有正常认种记录(已经是正式认种用户)
const existingAdoption = await tx.syncedAdoption.findFirst({
where: {
accountSequence,
status: 'MINING_ENABLED',
originalAdoptionId: { lt: PRE_PLANTING_SOURCE_ID_OFFSET },
},
});
if (existingAdoption) {
this.logger.debug(
`[PRE-PLANTING-ORDER] User ${accountSequence} already has regular adoption, no marker needed`,
);
return;
}
const markerAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + orderId;
const adoptionDate = paidAt ? new Date(paidAt) : new Date();
await tx.syncedAdoption.create({
data: {
originalAdoptionId: markerAdoptionId,
accountSequence,
treeCount: 0,
adoptionDate,
status: 'MINING_ENABLED',
contributionPerTree: 0,
contributionDistributed: true,
contributionDistributedAt: new Date(),
distributionSummary: 'PRE_PLANTING_MARKER',
sourceSequenceNum: BigInt(0),
syncedAt: new Date(),
},
});
this.logger.log(
`[PRE-PLANTING-ORDER] Inserted adoption marker for ${accountSequence}: id=${markerAdoptionId}`,
);
}
/**
*
*/
private async isAlreadyDistributed(orderId: bigint): Promise<boolean> {
try {
const order = await this.prePlantingPrisma.prePlantingSyncedOrder.findUnique({
where: { originalOrderId: orderId },
select: { contributionDistributed: true },
});
return order?.contributionDistributed ?? false;
} catch {
return false;
}
}
}

View File

@ -0,0 +1,116 @@
import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client';
import { PrePlantingCdcEvent } from '../../infrastructure/kafka/pre-planting-cdc-consumer.service';
import { PrePlantingPrismaService } from '../../infrastructure/prisma/pre-planting-prisma.service';
/**
* CDC
*
* [2026-02-17] cdc.pre-planting.public.pre_planting_positions CDC
*
* === ===
* pre_planting_synced_positions pre_planting_freeze_states
* (PrePlantingFreezeScheduler) /
*
* === ===
* handler
*/
@Injectable()
export class PrePlantingPositionSyncedHandler {
private readonly logger = new Logger(PrePlantingPositionSyncedHandler.name);
constructor(
private readonly prePlantingPrisma: PrePlantingPrismaService,
) {}
/**
* CDC
*
* handler PrismaService
* PrePlantingPrismaService
*/
async handle(
event: PrePlantingCdcEvent,
_tx: Prisma.TransactionClient,
): Promise<void> {
const { op, before, after } = event.payload;
const data = after || before;
if (!data) {
this.logger.warn(`[PRE-PLANTING-POSITION] Empty data, op=${op}`);
return;
}
if (op === 'd') {
this.logger.warn(`[PRE-PLANTING-POSITION] Delete event: userId=${data.user_id}`);
return;
}
const userId = BigInt(data.user_id || data.userId || 0);
const accountSequence = data.account_sequence || data.accountSequence;
if (!accountSequence) {
this.logger.warn(`[PRE-PLANTING-POSITION] Missing accountSequence for userId=${userId}`);
return;
}
const totalPortions = data.total_portions || data.totalPortions || 0;
const mergedPortions = data.merged_portions || data.mergedPortions || 0;
const totalTreesMerged = data.total_trees_merged || data.totalTreesMerged || 0;
const firstPurchaseAt = data.first_purchase_at || data.firstPurchaseAt;
this.logger.log(
`[PRE-PLANTING-POSITION] Sync: account=${accountSequence}, ` +
`portions=${totalPortions}, merged=${totalTreesMerged}`,
);
// 同步到 PrePlantingSyncedPosition
try {
await this.prePlantingPrisma.prePlantingSyncedPosition.upsert({
where: { userId },
create: {
userId,
accountSequence,
totalPortions,
mergedPortions,
totalTreesMerged,
firstPurchaseAt: firstPurchaseAt ? new Date(firstPurchaseAt) : null,
sourceTopic: event.topic,
sourceOffset: event.offset,
},
update: {
accountSequence,
totalPortions,
mergedPortions,
totalTreesMerged,
firstPurchaseAt: firstPurchaseAt ? new Date(firstPurchaseAt) : null,
sourceTopic: event.topic,
sourceOffset: event.offset,
syncedAt: new Date(),
},
});
} catch (error) {
this.logger.error(`[PRE-PLANTING-POSITION] Failed to sync position`, error);
}
// 同步到 PrePlantingFreezeState用于冻结调度器
try {
await this.prePlantingPrisma.prePlantingFreezeState.upsert({
where: { accountSequence },
create: {
accountSequence,
totalPortions,
totalTreesMerged,
firstPurchaseAt: firstPurchaseAt ? new Date(firstPurchaseAt) : null,
},
update: {
totalPortions,
totalTreesMerged,
firstPurchaseAt: firstPurchaseAt ? new Date(firstPurchaseAt) : null,
},
});
} catch (error) {
this.logger.error(`[PRE-PLANTING-POSITION] Failed to sync freeze state`, error);
}
}
}

View File

@ -0,0 +1,226 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { PrePlantingPrismaService } from '../../infrastructure/prisma/pre-planting-prisma.service';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { RedisService } from '@/infrastructure/redis/redis.service';
import {
PRE_PLANTING_SOURCE_ID_OFFSET,
PRE_PLANTING_FREEZE_PERIOD_YEARS,
PRE_PLANTING_POST_UNFREEZE_VALIDITY_YEARS,
} from '../../domain/constants';
/**
*
*
* [2026-02-17] /
*
* === ===
* - firstPurchaseAt + 1 5 totalPortions < 5
* - expiredis_expired=true
*
* === ===
* - 5 totalPortions >= 5
* - = + 2
*
* === ===
* - + 2 processExpiredRecords
*
* === ===
* - /使 PrismaService contribution_records
* - sourceAdoptionId >= 10,000,000,000
* - ContributionRecordRepository
*/
@Injectable()
export class PrePlantingFreezeScheduler {
private readonly logger = new Logger(PrePlantingFreezeScheduler.name);
private readonly LOCK_KEY = 'pre-planting:freeze:lock';
constructor(
private readonly prePlantingPrisma: PrePlantingPrismaService,
private readonly prisma: PrismaService, // 主 PrismaService直接查询 contribution_records
private readonly redis: RedisService,
) {}
/**
* 5
*
* 5
* - 1-4 snapshot, expire, full-sync
* -
*/
@Cron('0 5 * * *')
async checkFreezeConditions(): Promise<void> {
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:check`, 300);
if (!lockValue) return;
try {
const now = new Date();
let frozenCount = 0;
let unfrozenCount = 0;
// 1. 检查需要冻结的用户
frozenCount = await this.processFreezes(now);
// 2. 检查需要解冻的用户
unfrozenCount = await this.processUnfreezes(now);
if (frozenCount > 0 || unfrozenCount > 0) {
this.logger.log(
`[PRE-PLANTING-FREEZE] Check complete: frozen=${frozenCount}, unfrozen=${unfrozenCount}`,
);
}
} catch (error) {
this.logger.error('[PRE-PLANTING-FREEZE] Check failed', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}:check`, lockValue);
}
}
/**
* firstPurchaseAt + 1 5
*/
private async processFreezes(now: Date): Promise<number> {
const freezeDeadline = new Date(now);
freezeDeadline.setFullYear(freezeDeadline.getFullYear() - PRE_PLANTING_FREEZE_PERIOD_YEARS);
const candidates = await this.prePlantingPrisma.prePlantingFreezeState.findMany({
where: {
firstPurchaseAt: { lte: freezeDeadline },
totalPortions: { lt: 5 },
isFrozen: false,
},
take: 100,
});
let count = 0;
for (const candidate of candidates) {
try {
await this.freezeContributions(candidate.accountSequence);
await this.prePlantingPrisma.prePlantingFreezeState.update({
where: { accountSequence: candidate.accountSequence },
data: {
isFrozen: true,
frozenAt: now,
},
});
count++;
this.logger.log(
`[PRE-PLANTING-FREEZE] Frozen: ${candidate.accountSequence}, ` +
`portions=${candidate.totalPortions}, firstPurchase=${candidate.firstPurchaseAt?.toISOString()}`,
);
} catch (error) {
this.logger.error(
`[PRE-PLANTING-FREEZE] Failed to freeze ${candidate.accountSequence}`,
error,
);
}
}
return count;
}
/**
* 5
*/
private async processUnfreezes(now: Date): Promise<number> {
const candidates = await this.prePlantingPrisma.prePlantingFreezeState.findMany({
where: {
isFrozen: true,
totalPortions: { gte: 5 },
},
take: 100,
});
let count = 0;
for (const candidate of candidates) {
try {
const postUnfreezeExpireDate = new Date(now);
postUnfreezeExpireDate.setFullYear(
postUnfreezeExpireDate.getFullYear() + PRE_PLANTING_POST_UNFREEZE_VALIDITY_YEARS,
);
await this.unfreezeContributions(candidate.accountSequence, postUnfreezeExpireDate);
await this.prePlantingPrisma.prePlantingFreezeState.update({
where: { accountSequence: candidate.accountSequence },
data: {
isFrozen: false,
unfrozenAt: now,
postUnfreezeExpireDate,
},
});
count++;
this.logger.log(
`[PRE-PLANTING-FREEZE] Unfrozen: ${candidate.accountSequence}, ` +
`portions=${candidate.totalPortions}, newExpire=${postUnfreezeExpireDate.toISOString()}`,
);
} catch (error) {
this.logger.error(
`[PRE-PLANTING-FREEZE] Failed to unfreeze ${candidate.accountSequence}`,
error,
);
}
}
return count;
}
/**
*
*
* 使 PrismaService contribution_records
* sourceAdoptionId >= 10B
*/
private async freezeContributions(accountSequence: string): Promise<void> {
const result = await this.prisma.contributionRecord.updateMany({
where: {
accountSequence,
sourceAdoptionId: { gte: PRE_PLANTING_SOURCE_ID_OFFSET },
isExpired: false,
},
data: {
isExpired: true,
expiredAt: new Date(),
},
});
if (result.count > 0) {
this.logger.log(
`[PRE-PLANTING-FREEZE] Froze ${result.count} records for ${accountSequence}`,
);
}
}
/**
*
*
* expired + 2
*/
private async unfreezeContributions(
accountSequence: string,
newExpireDate: Date,
): Promise<void> {
const result = await this.prisma.contributionRecord.updateMany({
where: {
accountSequence,
sourceAdoptionId: { gte: PRE_PLANTING_SOURCE_ID_OFFSET },
isExpired: true,
},
data: {
isExpired: false,
expiredAt: null,
expireDate: newExpireDate,
},
});
if (result.count > 0) {
this.logger.log(
`[PRE-PLANTING-FREEZE] Unfroze ${result.count} records for ${accountSequence}, ` +
`newExpire=${newExpireDate.toISOString()}`,
);
}
}
}

View File

@ -0,0 +1,567 @@
import { Injectable, Logger } from '@nestjs/common';
import Decimal from 'decimal.js';
import { ContributionCalculatorService, ContributionDistributionResult } from '@/domain/services/contribution-calculator.service';
import { ContributionAccountRepository } from '@/infrastructure/persistence/repositories/contribution-account.repository';
import { ContributionRecordRepository } from '@/infrastructure/persistence/repositories/contribution-record.repository';
import { SyncedDataRepository } from '@/infrastructure/persistence/repositories/synced-data.repository';
import { UnallocatedContributionRepository } from '@/infrastructure/persistence/repositories/unallocated-contribution.repository';
import { SystemAccountRepository } from '@/infrastructure/persistence/repositories/system-account.repository';
import { OutboxRepository } from '@/infrastructure/persistence/repositories/outbox.repository';
import { UnitOfWork } from '@/infrastructure/persistence/unit-of-work/unit-of-work';
import { ContributionAccountAggregate, ContributionSourceType } from '@/domain/aggregates/contribution-account.aggregate';
import { ContributionRecordAggregate } from '@/domain/aggregates/contribution-record.aggregate';
import { ContributionAmount } from '@/domain/value-objects/contribution-amount.vo';
import { SyncedAdoption, SyncedReferral } from '@/domain/repositories/synced-data.repository.interface';
import { ContributionRateService } from '@/application/services/contribution-rate.service';
import { ContributionDistributionPublisherService } from '@/application/services/contribution-distribution-publisher.service';
import { BonusClaimService } from '@/application/services/bonus-claim.service';
import { ContributionRecordSyncedEvent, ContributionAccountUpdatedEvent, SystemAccountSyncedEvent, SystemContributionRecordCreatedEvent, UnallocatedContributionSyncedEvent } from '@/domain/events';
import { PrePlantingPrismaService } from '../../infrastructure/prisma/pre-planting-prisma.service';
import { PRE_PLANTING_SOURCE_ID_OFFSET, PRE_PLANTING_PORTION_DIVISOR } from '../../domain/constants';
/**
*
*
* [2026-02-17] 1/5
*
* === ===
* - SyncedAdoption treeCount=portionCount, contributionPerTree=rate/5
* - ContributionCalculatorService
* - contribution_accounts/contribution_records
* - 使 PRE_PLANTING_SOURCE_ID_OFFSET (10B) ID
*
* === ContributionCalculationService ===
* - NetworkAdoptionProgress
* - sourceAdoptionId 使 10B
* - 使 PrePlantingPrismaService
* - unlock synced_adoptions marker
*/
@Injectable()
export class PrePlantingContributionService {
private readonly logger = new Logger(PrePlantingContributionService.name);
private readonly domainCalculator = new ContributionCalculatorService();
constructor(
private readonly contributionAccountRepository: ContributionAccountRepository,
private readonly contributionRecordRepository: ContributionRecordRepository,
private readonly syncedDataRepository: SyncedDataRepository,
private readonly unallocatedContributionRepository: UnallocatedContributionRepository,
private readonly systemAccountRepository: SystemAccountRepository,
private readonly outboxRepository: OutboxRepository,
private readonly unitOfWork: UnitOfWork,
private readonly distributionPublisher: ContributionDistributionPublisherService,
private readonly contributionRateService: ContributionRateService,
private readonly bonusClaimService: BonusClaimService,
private readonly prePlantingPrisma: PrePlantingPrismaService,
) {}
/**
*
*
* @param originalOrderId IDplanting-service pre_planting_orders.id
*/
async calculateForPrePlantingOrder(originalOrderId: bigint): Promise<void> {
// 生成偏移后的 sourceAdoptionId
const sourceAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + originalOrderId;
// 检查是否已经处理过(使用偏移后的 ID
const exists = await this.contributionRecordRepository.existsBySourceAdoptionId(sourceAdoptionId);
if (exists) {
this.logger.debug(`Pre-planting order ${originalOrderId} already processed, skipping`);
return;
}
// 从预种追踪表获取订单数据
const order = await this.prePlantingPrisma.prePlantingSyncedOrder.findUnique({
where: { originalOrderId },
});
if (!order) {
throw new Error(`Pre-planting order not found: ${originalOrderId}`);
}
if (order.status !== 'PAID') {
this.logger.debug(`Pre-planting order ${originalOrderId} status=${order.status}, skipping`);
return;
}
// 获取当日每棵树贡献值(预种份额 = 1/5
const adoptionDate = order.paidAt || order.createdAt;
let contributionPerTree = new Decimal('22617');
try {
contributionPerTree = await this.contributionRateService.getContributionPerTree(adoptionDate);
} catch (error) {
this.logger.warn(`Failed to get contribution rate, using default`, error);
}
const contributionPerPortion = contributionPerTree.div(PRE_PLANTING_PORTION_DIVISOR);
// 构建虚拟 SyncedAdoption 对象
const virtualAdoption: SyncedAdoption = {
id: BigInt(0),
originalAdoptionId: sourceAdoptionId,
accountSequence: order.accountSequence,
treeCount: Number(order.portionCount), // 每份 = 1 个虚拟 "树"
adoptionDate,
status: 'MINING_ENABLED',
selectedProvince: order.provinceCode,
selectedCity: order.cityCode,
contributionPerTree: contributionPerPortion, // 1/5 贡献值
sourceSequenceNum: BigInt(0),
syncedAt: new Date(),
contributionDistributed: false,
contributionDistributedAt: null,
createdAt: order.createdAt,
};
// 获取推荐关系链
const userReferral = await this.syncedDataRepository.findSyncedReferralByAccountSequence(
order.accountSequence,
);
if (!userReferral) {
this.logger.warn(
`[PRE-PLANTING] Deferring order ${originalOrderId}: ` +
`referral for ${order.accountSequence} not yet synced`,
);
return;
}
// 获取上线链条最多15级
let ancestorChain: SyncedReferral[] = [];
if (userReferral.referrerAccountSequence) {
ancestorChain = await this.syncedDataRepository.findAncestorChain(
userReferral.referrerAccountSequence,
15,
);
}
// 获取算力账户
const adopterAccount = await this.contributionAccountRepository.findByAccountSequence(
order.accountSequence,
);
const ancestorAccountSequences = ancestorChain.map((a) => a.accountSequence);
const ancestorAccounts = await this.contributionAccountRepository.findByAccountSequences(
ancestorAccountSequences,
);
// 使用领域计算器计算分配
const result = this.domainCalculator.calculateAdoptionContribution(
virtualAdoption,
adopterAccount,
ancestorChain,
ancestorAccounts,
);
// 在事务中保存分配结果
await this.unitOfWork.executeInTransaction(async () => {
await this.saveDistributionResult(result, sourceAdoptionId, order.accountSequence);
// 更新认种人解锁状态
await this.updateAdopterUnlockStatus(order.accountSequence);
// 更新直接上线解锁状态
if (userReferral.referrerAccountSequence) {
await this.updateReferrerUnlockStatus(userReferral.referrerAccountSequence);
}
// 发布分配结果到 Kafka
await this.distributionPublisher.publishDistributionResult(
virtualAdoption,
result,
order.provinceCode || 'DEFAULT',
order.cityCode || 'DEFAULT',
);
});
// 标记预种追踪表为已分配
try {
await this.prePlantingPrisma.prePlantingSyncedOrder.update({
where: { originalOrderId },
data: {
contributionDistributed: true,
contributionDistributedAt: new Date(),
},
});
} catch (error) {
this.logger.error(`Failed to mark pre-planting order as distributed`, error);
}
this.logger.log(
`Pre-planting contribution calculated: orderId=${originalOrderId}, ` +
`sourceId=${sourceAdoptionId}, personal=${result.personalRecord.amount.value}, ` +
`teamLevel=${result.teamLevelRecords.length}, teamBonus=${result.teamBonusRecords.length}`,
);
// 注意:不调用 contributionRateService.updateNetworkProgress()
// 预种份额不推高全网算力系数
}
/**
*
*/
async processUndistributedOrders(batchSize: number = 50): Promise<number> {
const orders = await this.prePlantingPrisma.prePlantingSyncedOrder.findMany({
where: {
status: 'PAID',
contributionDistributed: false,
},
take: batchSize,
orderBy: { originalOrderId: 'asc' },
});
let count = 0;
for (const order of orders) {
try {
await this.calculateForPrePlantingOrder(order.originalOrderId);
count++;
} catch (error) {
this.logger.error(
`Failed to process pre-planting order ${order.originalOrderId}`,
error,
);
}
}
return count;
}
/**
* ContributionCalculationService.saveDistributionResult
*
* private
* [PRE-PLANTING]
*/
private async saveDistributionResult(
result: ContributionDistributionResult,
sourceAdoptionId: bigint,
sourceAccountSequence: string,
): Promise<void> {
const savedRecords: ContributionRecordAggregate[] = [];
const updatedAccountSequences = new Set<string>();
// 1. 个人算力
const savedPersonalRecord = await this.contributionRecordRepository.save(result.personalRecord);
savedRecords.push(savedPersonalRecord);
let account = await this.contributionAccountRepository.findByAccountSequence(
result.personalRecord.accountSequence,
);
if (!account) {
account = ContributionAccountAggregate.create(result.personalRecord.accountSequence);
}
account.addPersonalContribution(result.personalRecord.amount);
await this.contributionAccountRepository.save(account);
updatedAccountSequences.add(result.personalRecord.accountSequence);
// 2. 团队层级算力
if (result.teamLevelRecords.length > 0) {
const savedLevelRecords = await this.contributionRecordRepository.saveMany(result.teamLevelRecords);
savedRecords.push(...savedLevelRecords);
for (const record of result.teamLevelRecords) {
await this.contributionAccountRepository.updateContribution(
record.accountSequence,
ContributionSourceType.TEAM_LEVEL,
record.amount,
record.levelDepth,
null,
);
updatedAccountSequences.add(record.accountSequence);
}
}
// 3. 团队加成算力
if (result.teamBonusRecords.length > 0) {
const savedBonusRecords = await this.contributionRecordRepository.saveMany(result.teamBonusRecords);
savedRecords.push(...savedBonusRecords);
for (const record of result.teamBonusRecords) {
await this.contributionAccountRepository.updateContribution(
record.accountSequence,
ContributionSourceType.TEAM_BONUS,
record.amount,
null,
record.bonusTier,
);
updatedAccountSequences.add(record.accountSequence);
}
}
const effectiveDate = result.personalRecord.effectiveDate;
const expireDate = result.personalRecord.expireDate;
// 4. 未分配算力
if (result.unallocatedContributions.length > 0) {
await this.unallocatedContributionRepository.saveMany(
result.unallocatedContributions.map((u) => ({
...u,
sourceAdoptionId,
sourceAccountSequence,
effectiveDate,
expireDate,
})),
);
const totalUnallocatedAmount = result.unallocatedContributions.reduce(
(sum, u) => sum.add(u.amount),
new ContributionAmount(0),
);
await this.systemAccountRepository.addContribution('HEADQUARTERS', null, totalUnallocatedAmount);
for (const unallocated of result.unallocatedContributions) {
const sourceType = unallocated.type as string;
const levelDepth = unallocated.levelDepth;
const savedRecord = await this.systemAccountRepository.saveContributionRecord({
accountType: 'HEADQUARTERS',
regionCode: null,
sourceAdoptionId,
sourceAccountSequence,
sourceType,
levelDepth,
distributionRate: 0,
amount: unallocated.amount,
effectiveDate,
expireDate: null,
});
const recordEvent = new SystemContributionRecordCreatedEvent(
savedRecord.id,
'HEADQUARTERS',
null,
sourceAdoptionId,
sourceAccountSequence,
sourceType as any,
levelDepth,
0,
unallocated.amount.value.toString(),
effectiveDate,
null,
savedRecord.createdAt,
);
await this.outboxRepository.save({
aggregateType: SystemContributionRecordCreatedEvent.AGGREGATE_TYPE,
aggregateId: savedRecord.id.toString(),
eventType: SystemContributionRecordCreatedEvent.EVENT_TYPE,
payload: recordEvent.toPayload(),
});
}
const headquartersAccount = await this.systemAccountRepository.findByTypeAndRegion('HEADQUARTERS', null);
if (headquartersAccount) {
const hqEvent = new SystemAccountSyncedEvent(
'HEADQUARTERS',
null,
headquartersAccount.name,
headquartersAccount.contributionBalance.value.toString(),
headquartersAccount.createdAt,
);
await this.outboxRepository.save({
aggregateType: SystemAccountSyncedEvent.AGGREGATE_TYPE,
aggregateId: 'HEADQUARTERS',
eventType: SystemAccountSyncedEvent.EVENT_TYPE,
payload: hqEvent.toPayload(),
});
}
for (const unallocated of result.unallocatedContributions) {
const event = new UnallocatedContributionSyncedEvent(
sourceAdoptionId,
sourceAccountSequence,
unallocated.wouldBeAccountSequence,
unallocated.type,
unallocated.amount.value.toString(),
unallocated.reason,
effectiveDate,
expireDate,
);
await this.outboxRepository.save({
aggregateType: UnallocatedContributionSyncedEvent.AGGREGATE_TYPE,
aggregateId: `${sourceAdoptionId}-${unallocated.type}`,
eventType: UnallocatedContributionSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
});
}
}
// 5. 系统账户算力
if (result.systemContributions.length > 0) {
await this.systemAccountRepository.ensureSystemAccountsExist();
for (const sys of result.systemContributions) {
await this.systemAccountRepository.addContribution(sys.accountType, sys.regionCode, sys.amount);
const savedRecord = await this.systemAccountRepository.saveContributionRecord({
accountType: sys.accountType,
regionCode: sys.regionCode,
sourceAdoptionId,
sourceAccountSequence,
sourceType: 'FIXED_RATE',
levelDepth: null,
distributionRate: sys.rate.value.toNumber(),
amount: sys.amount,
effectiveDate,
expireDate: null,
});
const systemAccount = await this.systemAccountRepository.findByTypeAndRegion(sys.accountType, sys.regionCode);
if (systemAccount) {
const event = new SystemAccountSyncedEvent(
sys.accountType,
sys.regionCode,
systemAccount.name,
systemAccount.contributionBalance.value.toString(),
systemAccount.createdAt,
);
await this.outboxRepository.save({
aggregateType: SystemAccountSyncedEvent.AGGREGATE_TYPE,
aggregateId: `${sys.accountType}:${sys.regionCode || 'null'}`,
eventType: SystemAccountSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
});
const recordEvent = new SystemContributionRecordCreatedEvent(
savedRecord.id,
sys.accountType,
sys.regionCode,
sourceAdoptionId,
sourceAccountSequence,
'FIXED_RATE',
null,
sys.rate.value.toNumber(),
sys.amount.value.toString(),
effectiveDate,
null,
savedRecord.createdAt,
);
await this.outboxRepository.save({
aggregateType: SystemContributionRecordCreatedEvent.AGGREGATE_TYPE,
aggregateId: savedRecord.id.toString(),
eventType: SystemContributionRecordCreatedEvent.EVENT_TYPE,
payload: recordEvent.toPayload(),
});
}
}
}
// 6. 发布算力记录事件
await this.publishContributionRecordEvents(savedRecords);
// 7. 发布账户更新事件
await this.publishUpdatedAccountEvents(updatedAccountSequences);
}
private async updateAdopterUnlockStatus(accountSequence: string): Promise<void> {
const account = await this.contributionAccountRepository.findByAccountSequence(accountSequence);
if (!account) return;
if (!account.hasAdopted) {
account.markAsAdopted();
await this.contributionAccountRepository.save(account);
await this.publishContributionAccountUpdatedEvent(account);
}
}
private async updateReferrerUnlockStatus(referrerAccountSequence: string): Promise<void> {
const account = await this.contributionAccountRepository.findByAccountSequence(referrerAccountSequence);
if (!account) return;
// 重新计算直推已认种人数
// 由于 synced_adoptions 中已有预种 marker 记录,此方法会自然计入预种用户
const directReferralAdoptedCount = await this.syncedDataRepository.getDirectReferralAdoptedCount(
referrerAccountSequence,
);
const previousCount = account.directReferralAdoptedCount;
if (directReferralAdoptedCount > previousCount) {
for (let i = previousCount; i < directReferralAdoptedCount; i++) {
account.incrementDirectReferralAdoptedCount();
}
await this.contributionAccountRepository.save(account);
await this.publishContributionAccountUpdatedEvent(account);
this.logger.debug(
`[PRE-PLANTING] Updated referrer ${referrerAccountSequence}: ` +
`level=${account.unlockedLevelDepth}, bonus=${account.unlockedBonusTiers}`,
);
await this.bonusClaimService.checkAndClaimBonus(
referrerAccountSequence,
previousCount,
directReferralAdoptedCount,
);
}
}
private async publishContributionRecordEvents(
savedRecords: ContributionRecordAggregate[],
): Promise<void> {
if (savedRecords.length === 0) return;
const events = savedRecords.map((record) => {
const event = new ContributionRecordSyncedEvent(
record.id!,
record.accountSequence,
record.sourceType,
record.sourceAdoptionId,
record.sourceAccountSequence,
record.treeCount,
record.baseContribution.value.toString(),
record.distributionRate.value.toString(),
record.levelDepth,
record.bonusTier,
record.amount.value.toString(),
record.effectiveDate,
record.expireDate,
record.isExpired,
record.createdAt,
);
return {
aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE,
aggregateId: record.id!.toString(),
eventType: ContributionRecordSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
}
private async publishUpdatedAccountEvents(accountSequences: Set<string>): Promise<void> {
for (const accountSequence of accountSequences) {
const account = await this.contributionAccountRepository.findByAccountSequence(accountSequence);
if (account) {
await this.publishContributionAccountUpdatedEvent(account);
}
}
}
private async publishContributionAccountUpdatedEvent(
account: ContributionAccountAggregate,
): Promise<void> {
const totalContribution = account.personalContribution.value
.plus(account.totalLevelPending.value)
.plus(account.totalBonusPending.value);
const event = new ContributionAccountUpdatedEvent(
account.accountSequence,
account.personalContribution.value.toString(),
account.totalLevelPending.value.toString(),
account.totalBonusPending.value.toString(),
totalContribution.toString(),
account.effectiveContribution.value.toString(),
account.hasAdopted,
account.directReferralAdoptedCount,
account.unlockedLevelDepth,
account.unlockedBonusTiers,
account.createdAt,
);
await this.outboxRepository.save({
aggregateType: ContributionAccountUpdatedEvent.AGGREGATE_TYPE,
aggregateId: account.accountSequence,
eventType: ContributionAccountUpdatedEvent.EVENT_TYPE,
payload: event.toPayload(),
});
}
}

View File

@ -0,0 +1,52 @@
/**
*
*
* [2026-02-17] CDC contribution-service 2.0
*/
/**
* sourceAdoptionId
*
* contribution_records
* sourceAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + prePlantingOrderId
*
* 10,000,000,000 (100 亿) ID
*/
export const PRE_PLANTING_SOURCE_ID_OFFSET = 10_000_000_000n;
/**
*
*
* = 1 1/5
* contributionPerPortion = contributionPerTree / PRE_PLANTING_PORTION_DIVISOR
*/
export const PRE_PLANTING_PORTION_DIVISOR = 5;
/**
*
*
* + 1 5
*/
export const PRE_PLANTING_FREEZE_PERIOD_YEARS = 1;
/**
*
*
* 5 2
*/
export const PRE_PLANTING_POST_UNFREEZE_VALIDITY_YEARS = 2;
/**
* CDC topic
*
* Debezium connector: cdc.pre-planting
* Topic 格式: cdc.pre-planting.public.<table_name>
*/
export const PRE_PLANTING_CDC_TOPIC_PREFIX = 'cdc.pre-planting';
/**
* CDC Kafka consumer group ID
*
* contribution-service-cdc-group
*/
export const PRE_PLANTING_CDC_GROUP_ID = 'contribution-pre-planting-cdc';

View File

@ -0,0 +1,269 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { Prisma } from '@prisma/client';
import { PRE_PLANTING_CDC_GROUP_ID } from '../../domain/constants';
/**
* CDC Debezium ExtractNewRecordState
*/
export interface PrePlantingCdcEvent {
payload: {
op: 'c' | 'u' | 'd' | 'r';
before: any | null;
after: any | null;
table: string;
source_ts_ms: number;
deleted: boolean;
};
topic: string;
offset: bigint;
}
/**
* handler
*/
export type PrePlantingCdcHandler<T = void> = (
event: PrePlantingCdcEvent,
tx: Prisma.TransactionClient,
) => Promise<T>;
export type PostCommitCallback<T> = (result: T) => Promise<void>;
interface RegisteredHandler {
tableName: string;
handler: (event: PrePlantingCdcEvent) => Promise<void>;
}
/**
* CDC
*
* [2026-02-17] CDCConsumerService
*
* === ===
* - Kafka consumer groupcontribution-pre-planting-cdc
* - CDC topicscdc.pre-planting.public.*
* - Debezium connector/replication slot/publication
* - 使 PrismaService ProcessedCdcEvent DB
*
* === CDCConsumerService ===
* - user/referral CDC
* - --
* - +
*/
@Injectable()
export class PrePlantingCdcConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(PrePlantingCdcConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private handlers: Map<string, (event: PrePlantingCdcEvent) => Promise<void>> = new Map();
private isRunning = false;
constructor(
private readonly configService: ConfigService,
private readonly prisma: PrismaService, // 主 PrismaService用于幂等性 + 算力写入
) {
const brokers = this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(',');
this.kafka = new Kafka({
clientId: 'contribution-service-pre-planting-cdc',
brokers,
});
this.consumer = this.kafka.consumer({
groupId: PRE_PLANTING_CDC_GROUP_ID,
});
}
async onModuleInit() {
// 等待 handler 注册后再启动(由 PrePlantingCdcDispatcher 调用 start()
}
async onModuleDestroy() {
await this.stop();
}
/**
* handler +
*
* PrismaService ProcessedCdcEvent
* Serializable
*/
registerHandler<T>(
tableName: string,
handler: PrePlantingCdcHandler<T>,
postCommitCallback?: PostCommitCallback<T>,
): void {
const wrappedHandler = async (event: PrePlantingCdcEvent) => {
const idempotencyKey = `${event.topic}:${event.offset}`;
let result: T | null = null;
let shouldCallback = false;
try {
await this.prisma.$transaction(async (tx) => {
// 1. 幂等检查:插入 ProcessedCdcEvent唯一约束防重复
try {
await tx.processedCdcEvent.create({
data: {
sourceTopic: event.topic,
offset: event.offset,
tableName: event.payload.table,
operation: event.payload.op,
},
});
} catch (error: any) {
if (error.code === 'P2002') {
this.logger.debug(`[PRE-PLANTING-CDC] Skip duplicate: ${idempotencyKey}`);
return;
}
throw error;
}
// 2. 执行业务逻辑
result = await handler(event, tx);
shouldCallback = true;
this.logger.debug(`[PRE-PLANTING-CDC] Processed: ${idempotencyKey}`);
}, {
isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
timeout: 60000,
});
// 3. 事务提交后执行后置回调
if (shouldCallback && postCommitCallback && result !== null) {
try {
await postCommitCallback(result);
} catch (callbackError) {
this.logger.error(
`[PRE-PLANTING-CDC] Post-commit callback failed: ${idempotencyKey}`,
callbackError,
);
}
}
} catch (error: any) {
if (error.code === 'P2002') {
this.logger.debug(`[PRE-PLANTING-CDC] Skip duplicate (concurrent): ${idempotencyKey}`);
return;
}
this.logger.error(`[PRE-PLANTING-CDC] Failed: ${idempotencyKey}`, error);
throw error;
}
};
this.handlers.set(tableName, wrappedHandler);
this.logger.log(`[PRE-PLANTING-CDC] Registered handler for table: ${tableName}`);
}
/**
*
*/
async start(): Promise<void> {
if (this.isRunning) {
this.logger.warn('[PRE-PLANTING-CDC] Consumer already running');
return;
}
try {
await this.consumer.connect();
// 订阅预种 CDC topics
const topicOrders = this.configService.get<string>(
'PRE_PLANTING_CDC_TOPIC_ORDERS',
'cdc.pre-planting.public.pre_planting_orders',
);
const topicPositions = this.configService.get<string>(
'PRE_PLANTING_CDC_TOPIC_POSITIONS',
'cdc.pre-planting.public.pre_planting_positions',
);
await this.consumer.subscribe({
topics: [topicOrders, topicPositions],
fromBeginning: true,
});
this.isRunning = true;
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
},
});
this.logger.log(
`[PRE-PLANTING-CDC] Consumer started, topics: [${topicOrders}, ${topicPositions}]`,
);
} catch (error) {
this.logger.error('[PRE-PLANTING-CDC] Failed to start consumer', error);
// 不抛出错误,允许服务在没有 Kafka/预种 CDC 的情况下启动
}
}
async stop(): Promise<void> {
if (!this.isRunning) return;
try {
await this.consumer.disconnect();
this.isRunning = false;
this.logger.log('[PRE-PLANTING-CDC] Consumer stopped');
} catch (error) {
this.logger.error('[PRE-PLANTING-CDC] Failed to stop consumer', error);
}
}
private async handleMessage(payload: EachMessagePayload): Promise<void> {
const { topic, partition, message } = payload;
try {
if (!message.value) {
this.logger.warn(`[PRE-PLANTING-CDC] Empty message: topic=${topic}, offset=${message.offset}`);
return;
}
const rawData = JSON.parse(message.value.toString());
// Debezium ExtractNewRecordState 扁平格式
const op = rawData.__op || rawData.op;
const table = rawData.__table;
const sourceTsMs = rawData.__source_ts_ms || 0;
const deleted = rawData.__deleted === 'true' || rawData.__deleted === true;
const { __op, __table, __source_ts_ms, __deleted, ...businessData } = rawData;
const event: PrePlantingCdcEvent = {
payload: {
op: op as 'c' | 'u' | 'd' | 'r',
before: op === 'd' ? businessData : null,
after: op !== 'd' ? businessData : null,
table,
source_ts_ms: sourceTsMs,
deleted,
},
topic,
offset: BigInt(message.offset),
};
// 从 topic 提取表名作为备选
const parts = topic.split('.');
const tableName = table || parts[parts.length - 1];
const handler = this.handlers.get(tableName);
if (handler) {
await handler(event);
this.logger.debug(
`[PRE-PLANTING-CDC] Processed: table=${tableName}, op=${op}, offset=${message.offset}`,
);
} else {
this.logger.warn(
`[PRE-PLANTING-CDC] No handler for table: ${tableName}. ` +
`Available: ${Array.from(this.handlers.keys()).join(', ')}`,
);
}
} catch (error) {
this.logger.error(
`[PRE-PLANTING-CDC] Error: topic=${topic}, partition=${partition}, offset=${message.offset}`,
error,
);
}
}
}

View File

@ -0,0 +1,14 @@
import { Module, Global } from '@nestjs/common';
import { PrePlantingPrismaService } from './pre-planting-prisma.service';
/**
* Prisma
*
* PrePlantingPrismaService访
* @Global 便 PrePlantingCdcModule provider
*/
@Module({
providers: [PrePlantingPrismaService],
exports: [PrePlantingPrismaService],
})
export class PrePlantingPrismaModule {}

View File

@ -0,0 +1,28 @@
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { PrismaClient } from './generated';
/**
* Prisma
*
* PrismaService 使 Prisma Client
* pre_planting_synced_orders
*
* PrismaService contribution_accounts
*/
@Injectable()
export class PrePlantingPrismaService
extends PrismaClient
implements OnModuleInit, OnModuleDestroy
{
private readonly logger = new Logger(PrePlantingPrismaService.name);
async onModuleInit() {
await this.$connect();
this.logger.log('Pre-planting Prisma client connected');
}
async onModuleDestroy() {
await this.$disconnect();
this.logger.log('Pre-planting Prisma client disconnected');
}
}

View File

@ -0,0 +1,88 @@
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { InfrastructureModule } from '../infrastructure/infrastructure.module';
// Pre-planting Prisma (独立 schema)
import { PrePlantingPrismaModule } from './infrastructure/prisma/pre-planting-prisma.module';
// CDC Consumer (独立 Kafka consumer group)
import { PrePlantingCdcConsumerService } from './infrastructure/kafka/pre-planting-cdc-consumer.service';
// CDC Event Handlers
import { PrePlantingOrderSyncedHandler } from './application/handlers/pre-planting-order-synced.handler';
import { PrePlantingPositionSyncedHandler } from './application/handlers/pre-planting-position-synced.handler';
import { PrePlantingCdcDispatcher } from './application/handlers/pre-planting-cdc-dispatcher';
// Application Services
import { PrePlantingContributionService } from './application/services/pre-planting-contribution.service';
// Schedulers
import { PrePlantingFreezeScheduler } from './application/schedulers/pre-planting-freeze.scheduler';
// 现有 Application Services直接提供不 import ApplicationModule 避免引入现有 CDCEventDispatcher
// 这些服务是无状态的,仅依赖 InfrastructureModule 的 providers重复实例化无副作用。
import { ContributionRateService } from '../application/services/contribution-rate.service';
import { ContributionDistributionPublisherService } from '../application/services/contribution-distribution-publisher.service';
import { BonusClaimService } from '../application/services/bonus-claim.service';
/**
* CDC
*
* [2026-02-17] contribution-service 2.0
*
* === ===
* 1. CDC consumer cdc.pre-planting.public.*
* 2. /PrePlantingPrismaService
* 3. 1/5
* 4. synced_adoptions marker unlock
* 5. /1 5 5 + 2
*
* === ===
* - Kafka consumer groupcontribution-pre-planting-cdc contribution-service-cdc-group
* - CDC topicscdc.pre-planting.public.* cdc.planting.public.* topics
* - Debezium connector / replication slot / publication
* - Prisma schemaprisma/pre-planting/schema.prisma
* - PrismaService ProcessedCdcEvent DB
*
* === ===
* -
* - contribution_accounts / contribution_records
* - sourceAdoptionId 使 10,000,000,000 ID
* - synced_adoptions marker contributionDistributed=true + treeCount=0
* 使
* - NetworkAdoptionProgress
*
* === ===
* - InfrastructureModule PrismaServiceRepositoriesUnitOfWorkRedis
* - PrePlantingPrismaModule PrePlantingPrismaService
* - ScheduleModule @Cron
*/
@Module({
imports: [
ScheduleModule.forRoot(),
InfrastructureModule,
PrePlantingPrismaModule,
],
providers: [
// CDC Consumer (独立 consumer group)
PrePlantingCdcConsumerService,
// CDC Event Handlers
PrePlantingOrderSyncedHandler,
PrePlantingPositionSyncedHandler,
PrePlantingCdcDispatcher,
// Application Services (预种)
PrePlantingContributionService,
// Application Services (现有,直接提供以避免 import ApplicationModule)
// ApplicationModule 内含 CDCEventDispatcherimport 会导致现有 CDC 被二次注册
ContributionRateService,
ContributionDistributionPublisherService,
BonusClaimService,
// Schedulers
PrePlantingFreezeScheduler,
],
})
export class PrePlantingCdcModule {}