From e1cd8ed7f2086ccf818611c4ecdfc486acc2d5ba Mon Sep 17 00:00:00 2001 From: hailin Date: Wed, 18 Feb 2026 05:25:14 -0800 Subject: [PATCH] =?UTF-8?q?feat(pre-planting):=203171=20=E9=A2=84=E7=A7=8D?= =?UTF-8?q?=E8=AE=A1=E5=88=92=202.0=20=E7=AE=97=E5=8A=9B=E9=9B=86=E6=88=90?= =?UTF-8?q?=EF=BC=88contribution-service=20CDC=20=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 5:将预种数据集成到 contribution-service 2.0 算力体系。 === 新增文件(11 个) === - prisma/pre-planting/schema.prisma:独立 Prisma schema(4 张追踪表) - PrePlantingPrismaService + Module:独立 PrismaClient - PrePlantingCdcConsumerService:独立 CDC 消费者(consumer group: contribution-pre-planting-cdc) - PrePlantingOrderSyncedHandler:订单 CDC handler + synced_adoptions marker 插入 - PrePlantingPositionSyncedHandler:持仓 CDC handler - PrePlantingCdcDispatcher:CDC 事件分发器 - PrePlantingContributionService:1/5 算力计算(复用领域计算器) - PrePlantingFreezeScheduler:每日冻结/解冻调度(凌晨 5 点) - PrePlantingCdcModule:模块注册 - constants.ts:10B 偏移量、冻结期限等常量 === 隔离保证 === - 独立 Kafka consumer group(contribution-pre-planting-cdc) - 独立 CDC topics(cdc.pre-planting.public.*) - 独立 Prisma schema + generated client - sourceAdoptionId 使用 10,000,000,000 偏移避免 ID 冲突 - synced_adoptions marker: contributionDistributed=true + treeCount=0 - 不更新 NetworkAdoptionProgress(预种不推高全网算力系数) - 现有代码文件零修改(仅 app.module.ts 加 1 行 import) Co-Authored-By: Claude Opus 4.6 --- .../contribution-service/package.json | 7 +- .../prisma/pre-planting/schema.prisma | 132 ++++ .../contribution-service/src/app.module.ts | 3 + .../handlers/pre-planting-cdc-dispatcher.ts | 54 ++ .../pre-planting-order-synced.handler.ts | 324 ++++++++++ .../pre-planting-position-synced.handler.ts | 116 ++++ .../pre-planting-freeze.scheduler.ts | 226 +++++++ .../pre-planting-contribution.service.ts | 567 ++++++++++++++++++ .../src/pre-planting/domain/constants.ts | 52 ++ .../pre-planting-cdc-consumer.service.ts | 269 +++++++++ .../prisma/pre-planting-prisma.module.ts | 14 + .../prisma/pre-planting-prisma.service.ts | 28 + .../pre-planting/pre-planting-cdc.module.ts | 88 +++ 13 files changed, 1879 insertions(+), 1 deletion(-) create mode 100644 backend/services/contribution-service/prisma/pre-planting/schema.prisma create mode 100644 backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-cdc-dispatcher.ts create mode 100644 backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-order-synced.handler.ts create mode 100644 backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-position-synced.handler.ts create mode 100644 backend/services/contribution-service/src/pre-planting/application/schedulers/pre-planting-freeze.scheduler.ts create mode 100644 backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts create mode 100644 backend/services/contribution-service/src/pre-planting/domain/constants.ts create mode 100644 backend/services/contribution-service/src/pre-planting/infrastructure/kafka/pre-planting-cdc-consumer.service.ts create mode 100644 backend/services/contribution-service/src/pre-planting/infrastructure/prisma/pre-planting-prisma.module.ts create mode 100644 backend/services/contribution-service/src/pre-planting/infrastructure/prisma/pre-planting-prisma.service.ts create mode 100644 backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts diff --git a/backend/services/contribution-service/package.json b/backend/services/contribution-service/package.json index dd385da9..0179d0a6 100644 --- a/backend/services/contribution-service/package.json +++ b/backend/services/contribution-service/package.json @@ -25,7 +25,12 @@ "prisma:generate": "prisma generate", "prisma:migrate": "prisma migrate dev", "prisma:migrate:prod": "prisma migrate deploy", - "prisma:studio": "prisma studio" + "prisma:studio": "prisma studio", + "prisma:pre-planting:generate": "prisma generate --schema=prisma/pre-planting/schema.prisma", + "prisma:pre-planting:migrate": "prisma migrate dev --schema=prisma/pre-planting/schema.prisma", + "prisma:pre-planting:migrate:prod": "prisma migrate deploy --schema=prisma/pre-planting/schema.prisma", + "prisma:pre-planting:studio": "prisma studio --schema=prisma/pre-planting/schema.prisma", + "prisma:all:generate": "npm run prisma:generate && npm run prisma:pre-planting:generate" }, "dependencies": { "@nestjs/common": "^10.0.0", diff --git a/backend/services/contribution-service/prisma/pre-planting/schema.prisma b/backend/services/contribution-service/prisma/pre-planting/schema.prisma new file mode 100644 index 00000000..b00dcb11 --- /dev/null +++ b/backend/services/contribution-service/prisma/pre-planting/schema.prisma @@ -0,0 +1,132 @@ +// ============================================ +// [2026-02-17] 预种计划独立 Prisma Schema +// ============================================ +// +// 本 schema 仅包含预种计划在 contribution-service 中的追踪表。 +// 与主 schema (prisma/schema.prisma) 完全隔离,拥有独立的: +// - Prisma Client(生成到 src/pre-planting/infrastructure/prisma/generated/) +// - Migration 目录(prisma/pre-planting/migrations/) +// +// 预种的算力分配结果仍然写入主 schema 的 contribution_accounts、 +// contribution_records 等表(通过现有 Repository),以便挖矿系统读取。 +// 本 schema 仅负责预种 CDC 同步追踪、冻结状态等预种专属数据。 + +generator client { + provider = "prisma-client-js" + output = "../../src/pre-planting/infrastructure/prisma/generated" +} + +datasource db { + provider = "postgresql" + url = env("DATABASE_URL") +} + +// ============================================ +// 预种 CDC 同步追踪表 +// ============================================ + +/// 预种订单同步记录(从 planting-service CDC 同步) +/// 用于追踪每笔预种订单的算力分配状态 +model PrePlantingSyncedOrder { + id BigInt @id @default(autoincrement()) + originalOrderId BigInt @unique @map("original_order_id") + orderNo String @map("order_no") @db.VarChar(50) + userId BigInt @map("user_id") + accountSequence String @map("account_sequence") @db.VarChar(20) + portionCount Int @map("portion_count") + pricePerPortion Decimal @map("price_per_portion") @db.Decimal(20, 8) + totalAmount Decimal @map("total_amount") @db.Decimal(20, 8) + provinceCode String @map("province_code") @db.VarChar(10) + cityCode String @map("city_code") @db.VarChar(10) + status String @map("status") @db.VarChar(20) // CREATED, PAID, MERGED + mergedToMergeId BigInt? @map("merged_to_merge_id") + paidAt DateTime? @map("paid_at") + createdAt DateTime @map("created_at") + + // 算力追踪 + contributionPerPortion Decimal @map("contribution_per_portion") @db.Decimal(20, 10) + contributionDistributed Boolean @default(false) @map("contribution_distributed") + contributionDistributedAt DateTime? @map("contribution_distributed_at") + + // CDC 同步元数据 + sourceTopic String @map("source_topic") @db.VarChar(200) + sourceOffset BigInt @map("source_offset") + syncedAt DateTime @default(now()) @map("synced_at") + + @@index([accountSequence]) + @@index([status]) + @@index([contributionDistributed]) + @@map("pre_planting_synced_orders") +} + +/// 预种持仓同步记录(从 planting-service CDC 同步) +/// 用于追踪用户预种总量,判断冻结条件 +model PrePlantingSyncedPosition { + id BigInt @id @default(autoincrement()) + userId BigInt @unique @map("user_id") + accountSequence String @unique @map("account_sequence") @db.VarChar(20) + totalPortions Int @default(0) @map("total_portions") + mergedPortions Int @default(0) @map("merged_portions") + totalTreesMerged Int @default(0) @map("total_trees_merged") + firstPurchaseAt DateTime? @map("first_purchase_at") + + // CDC 同步元数据 + sourceTopic String @map("source_topic") @db.VarChar(200) + sourceOffset BigInt @map("source_offset") + syncedAt DateTime @default(now()) @map("synced_at") + + @@map("pre_planting_synced_positions") +} + +// ============================================ +// 预种冻结状态表 +// ============================================ + +/// 预种算力冻结状态(每用户一条) +/// +/// 冻结规则: +/// - firstPurchaseAt + 1 年后仍未满 5 份 → 所有预种算力冻结(暂停分配) +/// - 后续累积满 5 份 → 解冻,恢复分配 +/// - 解冻后的失效期 = 解冻日起算 + 2 年 +/// - 未被冻结过的正常到期 = 首次产生挖矿收益日 + 2 年 +model PrePlantingFreezeState { + id BigInt @id @default(autoincrement()) + accountSequence String @unique @map("account_sequence") @db.VarChar(20) + totalPortions Int @default(0) @map("total_portions") + totalTreesMerged Int @default(0) @map("total_trees_merged") + firstPurchaseAt DateTime? @map("first_purchase_at") + + // 冻结状态 + isFrozen Boolean @default(false) @map("is_frozen") + frozenAt DateTime? @map("frozen_at") + unfrozenAt DateTime? @map("unfrozen_at") + + // 解冻后的过期日期(解冻日 + 2 年) + postUnfreezeExpireDate DateTime? @map("post_unfreeze_expire_date") + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + @@index([isFrozen]) + @@index([firstPurchaseAt]) + @@map("pre_planting_freeze_states") +} + +// ============================================ +// 预种 CDC 幂等性追踪表 +// ============================================ + +/// 已处理的预种 CDC 事件(幂等性保证) +/// 使用 (sourceTopic, offset) 作为复合唯一键 +model PrePlantingProcessedCdcEvent { + id BigInt @id @default(autoincrement()) + sourceTopic String @map("source_topic") @db.VarChar(200) + offset BigInt @map("offset") + tableName String @map("table_name") @db.VarChar(100) + operation String @map("operation") @db.VarChar(10) + processedAt DateTime @default(now()) @map("processed_at") + + @@unique([sourceTopic, offset]) + @@index([processedAt]) + @@map("pre_planting_processed_cdc_events") +} diff --git a/backend/services/contribution-service/src/app.module.ts b/backend/services/contribution-service/src/app.module.ts index 1b60ccde..8cd466e7 100644 --- a/backend/services/contribution-service/src/app.module.ts +++ b/backend/services/contribution-service/src/app.module.ts @@ -8,6 +8,8 @@ import { DomainExceptionFilter } from './shared/filters/domain-exception.filter' import { TransformInterceptor } from './shared/interceptors/transform.interceptor'; import { LoggingInterceptor } from './shared/interceptors/logging.interceptor'; import { JwtAuthGuard } from './shared/guards/jwt-auth.guard'; +// [2026-02-17] 新增:预种 CDC 集成模块(纯新增,与现有 CDC 消费零耦合) +import { PrePlantingCdcModule } from './pre-planting/pre-planting-cdc.module'; @Module({ imports: [ @@ -23,6 +25,7 @@ import { JwtAuthGuard } from './shared/guards/jwt-auth.guard'; InfrastructureModule, ApplicationModule, ApiModule, + PrePlantingCdcModule, // 预种计划:独立 CDC consumer、独立 Prisma schema、1/5 算力、冻结调度 ], providers: [ { diff --git a/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-cdc-dispatcher.ts b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-cdc-dispatcher.ts new file mode 100644 index 00000000..55df8333 --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-cdc-dispatcher.ts @@ -0,0 +1,54 @@ +import { Injectable, OnModuleInit, Logger } from '@nestjs/common'; +import { PrePlantingCdcConsumerService } from '../../infrastructure/kafka/pre-planting-cdc-consumer.service'; +import { PrePlantingOrderSyncedHandler, PrePlantingOrderSyncResult } from './pre-planting-order-synced.handler'; +import { PrePlantingPositionSyncedHandler } from './pre-planting-position-synced.handler'; + +/** + * 预种 CDC 事件分发器 + * + * [2026-02-17] 新增:将预种 CDC 事件路由到对应的处理器 + * + * === 与现有 CDCEventDispatcher 的关系 === + * - 完全独立的实例,不与现有 CDCEventDispatcher 共享任何状态 + * - 使用独立的 PrePlantingCdcConsumerService(独立 consumer group、独立 topics) + * - 注册不同的 handler:预种订单 + 预种持仓(不处理 user_accounts / referral_relationships) + * + * === 对现有系统的影响 === + * - 零影响。现有 CDCEventDispatcher 照常运行。 + * - 两个 dispatcher 各自独立启动、独立消费、互不干扰。 + */ +@Injectable() +export class PrePlantingCdcDispatcher implements OnModuleInit { + private readonly logger = new Logger(PrePlantingCdcDispatcher.name); + + constructor( + private readonly cdcConsumer: PrePlantingCdcConsumerService, + private readonly orderHandler: PrePlantingOrderSyncedHandler, + private readonly positionHandler: PrePlantingPositionSyncedHandler, + ) {} + + async onModuleInit() { + // 注册预种订单表 handler(带后置回调:事务提交后计算算力) + this.cdcConsumer.registerHandler( + 'pre_planting_orders', + this.orderHandler.handle.bind(this.orderHandler), + this.orderHandler.calculateAfterCommit.bind(this.orderHandler), + ); + + // 注册预种持仓表 handler(无后置回调:纯同步,不触发算力计算) + this.cdcConsumer.registerHandler( + 'pre_planting_positions', + this.positionHandler.handle.bind(this.positionHandler), + ); + + // 非阻塞启动 CDC 消费者 + this.cdcConsumer.start() + .then(() => { + this.logger.log('[PRE-PLANTING-CDC] Dispatcher started with handlers: pre_planting_orders, pre_planting_positions'); + }) + .catch((error) => { + this.logger.error('[PRE-PLANTING-CDC] Failed to start dispatcher', error); + // 不抛出错误,允许服务在没有预种 CDC 的情况下启动 + }); + } +} diff --git a/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-order-synced.handler.ts b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-order-synced.handler.ts new file mode 100644 index 00000000..226ff990 --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-order-synced.handler.ts @@ -0,0 +1,324 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Prisma } from '@prisma/client'; +import Decimal from 'decimal.js'; +import { PrePlantingCdcEvent } from '../../infrastructure/kafka/pre-planting-cdc-consumer.service'; +import { PrePlantingPrismaService } from '../../infrastructure/prisma/pre-planting-prisma.service'; +import { PrePlantingContributionService } from '../services/pre-planting-contribution.service'; +import { ContributionRateService } from '@/application/services/contribution-rate.service'; +import { PRE_PLANTING_SOURCE_ID_OFFSET, PRE_PLANTING_PORTION_DIVISOR } from '../../domain/constants'; + +/** + * 预种订单同步结果(用于事务提交后的算力计算) + */ +export interface PrePlantingOrderSyncResult { + originalOrderId: bigint; + needsCalculation: boolean; +} + +/** + * 预种订单 CDC 事件处理器 + * + * [2026-02-17] 新增:处理 cdc.pre-planting.public.pre_planting_orders 的 CDC 事件 + * + * === 处理逻辑 === + * 1. 同步预种订单到 pre_planting_synced_orders(追踪表) + * 2. 当订单状态为 PAID 时,标记需要算力计算 + * 3. 在 synced_adoptions 中插入 marker 记录(用于 getDirectReferralAdoptedCount 正确计数) + * 4. 事务提交后,触发 1/5 算力计算(PostCommitCallback) + * + * === 对现有系统的影响 === + * - 零修改现有代码 + * - synced_adoptions 中的 marker 记录设置 contributionDistributed=true, + * 现有 processUndistributedAdoptions 调度器不会处理它 + * - marker 的 treeCount=0,即使被误处理也不会产生算力 + */ +@Injectable() +export class PrePlantingOrderSyncedHandler { + private readonly logger = new Logger(PrePlantingOrderSyncedHandler.name); + + constructor( + private readonly prePlantingPrisma: PrePlantingPrismaService, + private readonly contributionService: PrePlantingContributionService, + private readonly contributionRateService: ContributionRateService, + ) {} + + /** + * 处理预种订单 CDC 事件(在主 PrismaService 事务内执行) + * + * @param event CDC 事件 + * @param tx 主 Prisma 事务客户端 + * @returns 同步结果(用于后置回调判断是否需要算力计算) + */ + async handle( + event: PrePlantingCdcEvent, + tx: Prisma.TransactionClient, + ): Promise { + const { op, before, after } = event.payload; + const data = after || before; + + if (!data) { + this.logger.warn(`[PRE-PLANTING-ORDER] Empty data, op=${op}`); + return null; + } + + this.logger.log( + `[PRE-PLANTING-ORDER] Event: op=${op}, id=${data.id}, status=${data.status}`, + ); + + switch (op) { + case 'c': // create + case 'r': // read (snapshot) + return await this.handleCreateOrSnapshot(data, event, tx); + case 'u': // update + return await this.handleUpdate(data, before, event, tx); + case 'd': // delete + this.logger.warn(`[PRE-PLANTING-ORDER] Delete event: id=${data.id}`); + return null; + default: + this.logger.warn(`[PRE-PLANTING-ORDER] Unknown op: ${op}`); + return null; + } + } + + /** + * 事务提交后的算力计算回调 + */ + async calculateAfterCommit(result: PrePlantingOrderSyncResult): Promise { + if (!result?.needsCalculation) return; + + this.logger.log(`[PRE-PLANTING-ORDER] Triggering contribution calculation: orderId=${result.originalOrderId}`); + try { + await this.contributionService.calculateForPrePlantingOrder(result.originalOrderId); + this.logger.log(`[PRE-PLANTING-ORDER] Contribution calculated: orderId=${result.originalOrderId}`); + } catch (error) { + // 算力计算失败不影响数据同步,后续调度器会重试 + this.logger.error( + `[PRE-PLANTING-ORDER] Contribution calculation failed: orderId=${result.originalOrderId}`, + error, + ); + } + } + + private async handleCreateOrSnapshot( + data: any, + event: PrePlantingCdcEvent, + tx: Prisma.TransactionClient, + ): Promise { + const orderId = BigInt(data.id); + const accountSequence = data.account_sequence || data.accountSequence; + const status = data.status; + + if (!accountSequence) { + this.logger.warn(`[PRE-PLANTING-ORDER] Missing accountSequence for orderId=${orderId}`); + return null; + } + + // 获取当日贡献值 + const paidAt = data.paid_at || data.paidAt || data.created_at || data.createdAt; + let contributionPerTree = new Decimal('22617'); + if (paidAt) { + try { + contributionPerTree = await this.contributionRateService.getContributionPerTree(new Date(paidAt)); + } catch (error) { + this.logger.warn(`[PRE-PLANTING-ORDER] Failed to get rate, using default`, error); + } + } + const contributionPerPortion = contributionPerTree.div(PRE_PLANTING_PORTION_DIVISOR); + + // 同步到预种追踪表(事务外,最终一致性) + await this.syncToTrackingTable(data, event, contributionPerPortion); + + // 当状态为 PAID 时,在 synced_adoptions 中插入 marker(用于 unlock 计数) + const needsCalculation = status === 'PAID'; + if (needsCalculation) { + await this.ensureAdoptionMarker(accountSequence, orderId, paidAt, tx); + } + + return { + originalOrderId: orderId, + needsCalculation, + }; + } + + private async handleUpdate( + after: any, + before: any, + event: PrePlantingCdcEvent, + tx: Prisma.TransactionClient, + ): Promise { + const orderId = BigInt(after.id); + const accountSequence = after.account_sequence || after.accountSequence; + const newStatus = after.status; + const oldStatus = before?.status; + + if (!accountSequence) { + this.logger.warn(`[PRE-PLANTING-ORDER] Missing accountSequence for orderId=${orderId}`); + return null; + } + + // 获取当日贡献值 + const paidAt = after.paid_at || after.paidAt || after.created_at || after.createdAt; + let contributionPerTree = new Decimal('22617'); + if (paidAt) { + try { + contributionPerTree = await this.contributionRateService.getContributionPerTree(new Date(paidAt)); + } catch (error) { + this.logger.warn(`[PRE-PLANTING-ORDER] Failed to get rate, using default`, error); + } + } + const contributionPerPortion = contributionPerTree.div(PRE_PLANTING_PORTION_DIVISOR); + + // 同步到预种追踪表 + await this.syncToTrackingTable(after, event, contributionPerPortion); + + // 只在状态变为 PAID(且之前不是 PAID)时触发算力计算 + const statusChangedToPaid = newStatus === 'PAID' && oldStatus !== 'PAID'; + if (statusChangedToPaid) { + await this.ensureAdoptionMarker(accountSequence, orderId, paidAt, tx); + } + + // 检查是否已分配 + const alreadyDistributed = await this.isAlreadyDistributed(orderId); + + return { + originalOrderId: orderId, + needsCalculation: statusChangedToPaid && !alreadyDistributed, + }; + } + + /** + * 同步到预种追踪表(pre_planting_synced_orders) + * 使用 PrePlantingPrismaService(独立 schema) + */ + private async syncToTrackingTable( + data: any, + event: PrePlantingCdcEvent, + contributionPerPortion: Decimal, + ): Promise { + try { + const orderId = BigInt(data.id); + await this.prePlantingPrisma.prePlantingSyncedOrder.upsert({ + where: { originalOrderId: orderId }, + create: { + originalOrderId: orderId, + orderNo: data.order_no || data.orderNo || '', + userId: BigInt(data.user_id || data.userId || 0), + accountSequence: data.account_sequence || data.accountSequence, + portionCount: data.portion_count || data.portionCount || 1, + pricePerPortion: data.price_per_portion || data.pricePerPortion || 3171, + totalAmount: data.total_amount || data.totalAmount || 3171, + provinceCode: data.province_code || data.provinceCode || '', + cityCode: data.city_code || data.cityCode || '', + status: data.status || 'CREATED', + mergedToMergeId: data.merged_to_merge_id ? BigInt(data.merged_to_merge_id) : null, + paidAt: data.paid_at ? new Date(data.paid_at) : null, + createdAt: new Date(data.created_at || data.createdAt || new Date()), + contributionPerPortion, + sourceTopic: event.topic, + sourceOffset: event.offset, + }, + update: { + orderNo: data.order_no || data.orderNo || '', + status: data.status || 'CREATED', + mergedToMergeId: data.merged_to_merge_id ? BigInt(data.merged_to_merge_id) : null, + paidAt: data.paid_at ? new Date(data.paid_at) : null, + contributionPerPortion, + sourceTopic: event.topic, + sourceOffset: event.offset, + syncedAt: new Date(), + }, + }); + } catch (error) { + this.logger.error(`[PRE-PLANTING-ORDER] Failed to sync tracking table`, error); + // 追踪表同步失败不影响主流程(最终一致性) + } + } + + /** + * 在 synced_adoptions 中插入 marker 记录(每用户仅一条) + * + * 目的:让现有的 getDirectReferralAdoptedCount 能正确计入预种用户 + * + * marker 特征: + * - originalAdoptionId = 10,000,000,000 + prePlantingOrderId(避免与正常认种冲突) + * - treeCount = 0(不产生算力,算力由 PrePlantingContributionService 处理) + * - status = 'MINING_ENABLED'(让 getDirectReferralAdoptedCount 计入此用户) + * - contributionDistributed = true(防止现有调度器尝试处理) + * - contributionPerTree = 0(额外安全保障) + */ + private async ensureAdoptionMarker( + accountSequence: string, + orderId: bigint, + paidAt: string | null, + tx: Prisma.TransactionClient, + ): Promise { + // 检查是否已有 marker(通过检查 accountSequence 在大 ID 范围) + const existingMarker = await tx.syncedAdoption.findFirst({ + where: { + accountSequence, + originalAdoptionId: { gte: PRE_PLANTING_SOURCE_ID_OFFSET }, + }, + }); + + if (existingMarker) { + this.logger.debug( + `[PRE-PLANTING-ORDER] Marker already exists for ${accountSequence}, skipping`, + ); + return; + } + + // 同时检查是否已有正常认种记录(已经是正式认种用户) + const existingAdoption = await tx.syncedAdoption.findFirst({ + where: { + accountSequence, + status: 'MINING_ENABLED', + originalAdoptionId: { lt: PRE_PLANTING_SOURCE_ID_OFFSET }, + }, + }); + + if (existingAdoption) { + this.logger.debug( + `[PRE-PLANTING-ORDER] User ${accountSequence} already has regular adoption, no marker needed`, + ); + return; + } + + const markerAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + orderId; + const adoptionDate = paidAt ? new Date(paidAt) : new Date(); + + await tx.syncedAdoption.create({ + data: { + originalAdoptionId: markerAdoptionId, + accountSequence, + treeCount: 0, + adoptionDate, + status: 'MINING_ENABLED', + contributionPerTree: 0, + contributionDistributed: true, + contributionDistributedAt: new Date(), + distributionSummary: 'PRE_PLANTING_MARKER', + sourceSequenceNum: BigInt(0), + syncedAt: new Date(), + }, + }); + + this.logger.log( + `[PRE-PLANTING-ORDER] Inserted adoption marker for ${accountSequence}: id=${markerAdoptionId}`, + ); + } + + /** + * 检查预种订单是否已经分配过算力 + */ + private async isAlreadyDistributed(orderId: bigint): Promise { + try { + const order = await this.prePlantingPrisma.prePlantingSyncedOrder.findUnique({ + where: { originalOrderId: orderId }, + select: { contributionDistributed: true }, + }); + return order?.contributionDistributed ?? false; + } catch { + return false; + } + } +} diff --git a/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-position-synced.handler.ts b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-position-synced.handler.ts new file mode 100644 index 00000000..05548096 --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/application/handlers/pre-planting-position-synced.handler.ts @@ -0,0 +1,116 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Prisma } from '@prisma/client'; +import { PrePlantingCdcEvent } from '../../infrastructure/kafka/pre-planting-cdc-consumer.service'; +import { PrePlantingPrismaService } from '../../infrastructure/prisma/pre-planting-prisma.service'; + +/** + * 预种持仓 CDC 事件处理器 + * + * [2026-02-17] 新增:处理 cdc.pre-planting.public.pre_planting_positions 的 CDC 事件 + * + * === 职责 === + * 同步预种持仓数据到 pre_planting_synced_positions 和 pre_planting_freeze_states 表。 + * 冻结调度器 (PrePlantingFreezeScheduler) 基于这些数据判断是否需要冻结/解冻。 + * + * === 对现有系统的影响 === + * 零影响。本 handler 只写入预种独立表,不触碰任何现有表。 + */ +@Injectable() +export class PrePlantingPositionSyncedHandler { + private readonly logger = new Logger(PrePlantingPositionSyncedHandler.name); + + constructor( + private readonly prePlantingPrisma: PrePlantingPrismaService, + ) {} + + /** + * 处理预种持仓 CDC 事件 + * + * 注意:此 handler 在主 PrismaService 事务内执行(用于幂等性), + * 但持仓同步写入的是 PrePlantingPrismaService 的表(事务外,最终一致性)。 + */ + async handle( + event: PrePlantingCdcEvent, + _tx: Prisma.TransactionClient, + ): Promise { + const { op, before, after } = event.payload; + const data = after || before; + + if (!data) { + this.logger.warn(`[PRE-PLANTING-POSITION] Empty data, op=${op}`); + return; + } + + if (op === 'd') { + this.logger.warn(`[PRE-PLANTING-POSITION] Delete event: userId=${data.user_id}`); + return; + } + + const userId = BigInt(data.user_id || data.userId || 0); + const accountSequence = data.account_sequence || data.accountSequence; + + if (!accountSequence) { + this.logger.warn(`[PRE-PLANTING-POSITION] Missing accountSequence for userId=${userId}`); + return; + } + + const totalPortions = data.total_portions || data.totalPortions || 0; + const mergedPortions = data.merged_portions || data.mergedPortions || 0; + const totalTreesMerged = data.total_trees_merged || data.totalTreesMerged || 0; + const firstPurchaseAt = data.first_purchase_at || data.firstPurchaseAt; + + this.logger.log( + `[PRE-PLANTING-POSITION] Sync: account=${accountSequence}, ` + + `portions=${totalPortions}, merged=${totalTreesMerged}`, + ); + + // 同步到 PrePlantingSyncedPosition + try { + await this.prePlantingPrisma.prePlantingSyncedPosition.upsert({ + where: { userId }, + create: { + userId, + accountSequence, + totalPortions, + mergedPortions, + totalTreesMerged, + firstPurchaseAt: firstPurchaseAt ? new Date(firstPurchaseAt) : null, + sourceTopic: event.topic, + sourceOffset: event.offset, + }, + update: { + accountSequence, + totalPortions, + mergedPortions, + totalTreesMerged, + firstPurchaseAt: firstPurchaseAt ? new Date(firstPurchaseAt) : null, + sourceTopic: event.topic, + sourceOffset: event.offset, + syncedAt: new Date(), + }, + }); + } catch (error) { + this.logger.error(`[PRE-PLANTING-POSITION] Failed to sync position`, error); + } + + // 同步到 PrePlantingFreezeState(用于冻结调度器) + try { + await this.prePlantingPrisma.prePlantingFreezeState.upsert({ + where: { accountSequence }, + create: { + accountSequence, + totalPortions, + totalTreesMerged, + firstPurchaseAt: firstPurchaseAt ? new Date(firstPurchaseAt) : null, + }, + update: { + totalPortions, + totalTreesMerged, + firstPurchaseAt: firstPurchaseAt ? new Date(firstPurchaseAt) : null, + }, + }); + } catch (error) { + this.logger.error(`[PRE-PLANTING-POSITION] Failed to sync freeze state`, error); + } + } +} diff --git a/backend/services/contribution-service/src/pre-planting/application/schedulers/pre-planting-freeze.scheduler.ts b/backend/services/contribution-service/src/pre-planting/application/schedulers/pre-planting-freeze.scheduler.ts new file mode 100644 index 00000000..25f31ef1 --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/application/schedulers/pre-planting-freeze.scheduler.ts @@ -0,0 +1,226 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { PrePlantingPrismaService } from '../../infrastructure/prisma/pre-planting-prisma.service'; +import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; +import { RedisService } from '@/infrastructure/redis/redis.service'; +import { + PRE_PLANTING_SOURCE_ID_OFFSET, + PRE_PLANTING_FREEZE_PERIOD_YEARS, + PRE_PLANTING_POST_UNFREEZE_VALIDITY_YEARS, +} from '../../domain/constants'; + +/** + * 预种算力冻结调度器 + * + * [2026-02-17] 新增:每日检查预种用户的冻结/解冻条件 + * + * === 冻结规则 === + * - firstPurchaseAt + 1 年后,仍未满 5 份(totalPortions < 5)→ 冻结所有预种算力 + * - 冻结方式:将预种算力记录标记为 expired(is_expired=true) + * + * === 解冻规则 === + * - 冻结后累积满 5 份(totalPortions >= 5)→ 解冻 + * - 解冻后的失效期 = 解冻日 + 2 年 + * + * === 正常到期(未被冻结过)=== + * - 首次产生挖矿收益日 + 2 年后失效(由现有 processExpiredRecords 调度器处理) + * + * === 对现有系统的影响 === + * - 零修改现有代码。冻结/解冻直接使用 PrismaService 查询 contribution_records 表。 + * - 只操作 sourceAdoptionId >= 10,000,000,000 的预种记录。 + * - 不调用现有 ContributionRecordRepository 的任何新方法,避免修改现有文件。 + */ +@Injectable() +export class PrePlantingFreezeScheduler { + private readonly logger = new Logger(PrePlantingFreezeScheduler.name); + private readonly LOCK_KEY = 'pre-planting:freeze:lock'; + + constructor( + private readonly prePlantingPrisma: PrePlantingPrismaService, + private readonly prisma: PrismaService, // 主 PrismaService,直接查询 contribution_records + private readonly redis: RedisService, + ) {} + + /** + * 每日凌晨 5 点检查预种冻结条件 + * + * 选择 5 点是因为: + * - 现有调度器在 1-4 点运行(snapshot, expire, full-sync) + * - 错开时间避免资源竞争 + */ + @Cron('0 5 * * *') + async checkFreezeConditions(): Promise { + const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}:check`, 300); + if (!lockValue) return; + + try { + const now = new Date(); + let frozenCount = 0; + let unfrozenCount = 0; + + // 1. 检查需要冻结的用户 + frozenCount = await this.processFreezes(now); + + // 2. 检查需要解冻的用户 + unfrozenCount = await this.processUnfreezes(now); + + if (frozenCount > 0 || unfrozenCount > 0) { + this.logger.log( + `[PRE-PLANTING-FREEZE] Check complete: frozen=${frozenCount}, unfrozen=${unfrozenCount}`, + ); + } + } catch (error) { + this.logger.error('[PRE-PLANTING-FREEZE] Check failed', error); + } finally { + await this.redis.releaseLock(`${this.LOCK_KEY}:check`, lockValue); + } + } + + /** + * 处理冻结:查找 firstPurchaseAt + 1 年已过且未满 5 份的用户 + */ + private async processFreezes(now: Date): Promise { + const freezeDeadline = new Date(now); + freezeDeadline.setFullYear(freezeDeadline.getFullYear() - PRE_PLANTING_FREEZE_PERIOD_YEARS); + + const candidates = await this.prePlantingPrisma.prePlantingFreezeState.findMany({ + where: { + firstPurchaseAt: { lte: freezeDeadline }, + totalPortions: { lt: 5 }, + isFrozen: false, + }, + take: 100, + }); + + let count = 0; + for (const candidate of candidates) { + try { + await this.freezeContributions(candidate.accountSequence); + + await this.prePlantingPrisma.prePlantingFreezeState.update({ + where: { accountSequence: candidate.accountSequence }, + data: { + isFrozen: true, + frozenAt: now, + }, + }); + + count++; + this.logger.log( + `[PRE-PLANTING-FREEZE] Frozen: ${candidate.accountSequence}, ` + + `portions=${candidate.totalPortions}, firstPurchase=${candidate.firstPurchaseAt?.toISOString()}`, + ); + } catch (error) { + this.logger.error( + `[PRE-PLANTING-FREEZE] Failed to freeze ${candidate.accountSequence}`, + error, + ); + } + } + + return count; + } + + /** + * 处理解冻:查找已冻结但现在满 5 份的用户 + */ + private async processUnfreezes(now: Date): Promise { + const candidates = await this.prePlantingPrisma.prePlantingFreezeState.findMany({ + where: { + isFrozen: true, + totalPortions: { gte: 5 }, + }, + take: 100, + }); + + let count = 0; + for (const candidate of candidates) { + try { + const postUnfreezeExpireDate = new Date(now); + postUnfreezeExpireDate.setFullYear( + postUnfreezeExpireDate.getFullYear() + PRE_PLANTING_POST_UNFREEZE_VALIDITY_YEARS, + ); + + await this.unfreezeContributions(candidate.accountSequence, postUnfreezeExpireDate); + + await this.prePlantingPrisma.prePlantingFreezeState.update({ + where: { accountSequence: candidate.accountSequence }, + data: { + isFrozen: false, + unfrozenAt: now, + postUnfreezeExpireDate, + }, + }); + + count++; + this.logger.log( + `[PRE-PLANTING-FREEZE] Unfrozen: ${candidate.accountSequence}, ` + + `portions=${candidate.totalPortions}, newExpire=${postUnfreezeExpireDate.toISOString()}`, + ); + } catch (error) { + this.logger.error( + `[PRE-PLANTING-FREEZE] Failed to unfreeze ${candidate.accountSequence}`, + error, + ); + } + } + + return count; + } + + /** + * 冻结用户的预种算力记录 + * + * 直接使用 PrismaService 操作 contribution_records 表, + * 通过 sourceAdoptionId >= 10B 精确定位预种记录,不影响正常认种记录。 + */ + private async freezeContributions(accountSequence: string): Promise { + const result = await this.prisma.contributionRecord.updateMany({ + where: { + accountSequence, + sourceAdoptionId: { gte: PRE_PLANTING_SOURCE_ID_OFFSET }, + isExpired: false, + }, + data: { + isExpired: true, + expiredAt: new Date(), + }, + }); + + if (result.count > 0) { + this.logger.log( + `[PRE-PLANTING-FREEZE] Froze ${result.count} records for ${accountSequence}`, + ); + } + } + + /** + * 解冻用户的预种算力记录 + * + * 恢复 expired 标记并更新过期日期为 解冻日 + 2 年 + */ + private async unfreezeContributions( + accountSequence: string, + newExpireDate: Date, + ): Promise { + const result = await this.prisma.contributionRecord.updateMany({ + where: { + accountSequence, + sourceAdoptionId: { gte: PRE_PLANTING_SOURCE_ID_OFFSET }, + isExpired: true, + }, + data: { + isExpired: false, + expiredAt: null, + expireDate: newExpireDate, + }, + }); + + if (result.count > 0) { + this.logger.log( + `[PRE-PLANTING-FREEZE] Unfroze ${result.count} records for ${accountSequence}, ` + + `newExpire=${newExpireDate.toISOString()}`, + ); + } + } +} diff --git a/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts b/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts new file mode 100644 index 00000000..228adc6c --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/application/services/pre-planting-contribution.service.ts @@ -0,0 +1,567 @@ +import { Injectable, Logger } from '@nestjs/common'; +import Decimal from 'decimal.js'; +import { ContributionCalculatorService, ContributionDistributionResult } from '@/domain/services/contribution-calculator.service'; +import { ContributionAccountRepository } from '@/infrastructure/persistence/repositories/contribution-account.repository'; +import { ContributionRecordRepository } from '@/infrastructure/persistence/repositories/contribution-record.repository'; +import { SyncedDataRepository } from '@/infrastructure/persistence/repositories/synced-data.repository'; +import { UnallocatedContributionRepository } from '@/infrastructure/persistence/repositories/unallocated-contribution.repository'; +import { SystemAccountRepository } from '@/infrastructure/persistence/repositories/system-account.repository'; +import { OutboxRepository } from '@/infrastructure/persistence/repositories/outbox.repository'; +import { UnitOfWork } from '@/infrastructure/persistence/unit-of-work/unit-of-work'; +import { ContributionAccountAggregate, ContributionSourceType } from '@/domain/aggregates/contribution-account.aggregate'; +import { ContributionRecordAggregate } from '@/domain/aggregates/contribution-record.aggregate'; +import { ContributionAmount } from '@/domain/value-objects/contribution-amount.vo'; +import { SyncedAdoption, SyncedReferral } from '@/domain/repositories/synced-data.repository.interface'; +import { ContributionRateService } from '@/application/services/contribution-rate.service'; +import { ContributionDistributionPublisherService } from '@/application/services/contribution-distribution-publisher.service'; +import { BonusClaimService } from '@/application/services/bonus-claim.service'; +import { ContributionRecordSyncedEvent, ContributionAccountUpdatedEvent, SystemAccountSyncedEvent, SystemContributionRecordCreatedEvent, UnallocatedContributionSyncedEvent } from '@/domain/events'; +import { PrePlantingPrismaService } from '../../infrastructure/prisma/pre-planting-prisma.service'; +import { PRE_PLANTING_SOURCE_ID_OFFSET, PRE_PLANTING_PORTION_DIVISOR } from '../../domain/constants'; + +/** + * 预种算力计算服务 + * + * [2026-02-17] 新增:复用现有领域计算器,以 1/5 贡献值处理预种份额 + * + * === 核心设计 === + * - 构建虚拟 SyncedAdoption 对象(treeCount=portionCount, contributionPerTree=rate/5) + * - 调用现有 ContributionCalculatorService(领域服务)计算分配 + * - 结果写入现有 contribution_accounts/contribution_records 表(挖矿系统可见) + * - 使用 PRE_PLANTING_SOURCE_ID_OFFSET (10B) 偏移,避免与正常认种 ID 冲突 + * + * === 与现有 ContributionCalculationService 的区别 === + * - 不更新 NetworkAdoptionProgress(预种不推高全网算力系数) + * - sourceAdoptionId 使用 10B 偏移 + * - 追踪表使用独立的 PrePlantingPrismaService + * - unlock 逻辑通过 synced_adoptions marker 实现(不直接修改现有方法) + */ +@Injectable() +export class PrePlantingContributionService { + private readonly logger = new Logger(PrePlantingContributionService.name); + private readonly domainCalculator = new ContributionCalculatorService(); + + constructor( + private readonly contributionAccountRepository: ContributionAccountRepository, + private readonly contributionRecordRepository: ContributionRecordRepository, + private readonly syncedDataRepository: SyncedDataRepository, + private readonly unallocatedContributionRepository: UnallocatedContributionRepository, + private readonly systemAccountRepository: SystemAccountRepository, + private readonly outboxRepository: OutboxRepository, + private readonly unitOfWork: UnitOfWork, + private readonly distributionPublisher: ContributionDistributionPublisherService, + private readonly contributionRateService: ContributionRateService, + private readonly bonusClaimService: BonusClaimService, + private readonly prePlantingPrisma: PrePlantingPrismaService, + ) {} + + /** + * 为预种订单计算并分配算力 + * + * @param originalOrderId 预种订单原始 ID(planting-service 中的 pre_planting_orders.id) + */ + async calculateForPrePlantingOrder(originalOrderId: bigint): Promise { + // 生成偏移后的 sourceAdoptionId + const sourceAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + originalOrderId; + + // 检查是否已经处理过(使用偏移后的 ID) + const exists = await this.contributionRecordRepository.existsBySourceAdoptionId(sourceAdoptionId); + if (exists) { + this.logger.debug(`Pre-planting order ${originalOrderId} already processed, skipping`); + return; + } + + // 从预种追踪表获取订单数据 + const order = await this.prePlantingPrisma.prePlantingSyncedOrder.findUnique({ + where: { originalOrderId }, + }); + + if (!order) { + throw new Error(`Pre-planting order not found: ${originalOrderId}`); + } + + if (order.status !== 'PAID') { + this.logger.debug(`Pre-planting order ${originalOrderId} status=${order.status}, skipping`); + return; + } + + // 获取当日每棵树贡献值(预种份额 = 1/5) + const adoptionDate = order.paidAt || order.createdAt; + let contributionPerTree = new Decimal('22617'); + try { + contributionPerTree = await this.contributionRateService.getContributionPerTree(adoptionDate); + } catch (error) { + this.logger.warn(`Failed to get contribution rate, using default`, error); + } + const contributionPerPortion = contributionPerTree.div(PRE_PLANTING_PORTION_DIVISOR); + + // 构建虚拟 SyncedAdoption 对象 + const virtualAdoption: SyncedAdoption = { + id: BigInt(0), + originalAdoptionId: sourceAdoptionId, + accountSequence: order.accountSequence, + treeCount: Number(order.portionCount), // 每份 = 1 个虚拟 "树" + adoptionDate, + status: 'MINING_ENABLED', + selectedProvince: order.provinceCode, + selectedCity: order.cityCode, + contributionPerTree: contributionPerPortion, // 1/5 贡献值 + sourceSequenceNum: BigInt(0), + syncedAt: new Date(), + contributionDistributed: false, + contributionDistributedAt: null, + createdAt: order.createdAt, + }; + + // 获取推荐关系链 + const userReferral = await this.syncedDataRepository.findSyncedReferralByAccountSequence( + order.accountSequence, + ); + + if (!userReferral) { + this.logger.warn( + `[PRE-PLANTING] Deferring order ${originalOrderId}: ` + + `referral for ${order.accountSequence} not yet synced`, + ); + return; + } + + // 获取上线链条(最多15级) + let ancestorChain: SyncedReferral[] = []; + if (userReferral.referrerAccountSequence) { + ancestorChain = await this.syncedDataRepository.findAncestorChain( + userReferral.referrerAccountSequence, + 15, + ); + } + + // 获取算力账户 + const adopterAccount = await this.contributionAccountRepository.findByAccountSequence( + order.accountSequence, + ); + const ancestorAccountSequences = ancestorChain.map((a) => a.accountSequence); + const ancestorAccounts = await this.contributionAccountRepository.findByAccountSequences( + ancestorAccountSequences, + ); + + // 使用领域计算器计算分配 + const result = this.domainCalculator.calculateAdoptionContribution( + virtualAdoption, + adopterAccount, + ancestorChain, + ancestorAccounts, + ); + + // 在事务中保存分配结果 + await this.unitOfWork.executeInTransaction(async () => { + await this.saveDistributionResult(result, sourceAdoptionId, order.accountSequence); + + // 更新认种人解锁状态 + await this.updateAdopterUnlockStatus(order.accountSequence); + + // 更新直接上线解锁状态 + if (userReferral.referrerAccountSequence) { + await this.updateReferrerUnlockStatus(userReferral.referrerAccountSequence); + } + + // 发布分配结果到 Kafka + await this.distributionPublisher.publishDistributionResult( + virtualAdoption, + result, + order.provinceCode || 'DEFAULT', + order.cityCode || 'DEFAULT', + ); + }); + + // 标记预种追踪表为已分配 + try { + await this.prePlantingPrisma.prePlantingSyncedOrder.update({ + where: { originalOrderId }, + data: { + contributionDistributed: true, + contributionDistributedAt: new Date(), + }, + }); + } catch (error) { + this.logger.error(`Failed to mark pre-planting order as distributed`, error); + } + + this.logger.log( + `Pre-planting contribution calculated: orderId=${originalOrderId}, ` + + `sourceId=${sourceAdoptionId}, personal=${result.personalRecord.amount.value}, ` + + `teamLevel=${result.teamLevelRecords.length}, teamBonus=${result.teamBonusRecords.length}`, + ); + + // 注意:不调用 contributionRateService.updateNetworkProgress() + // 预种份额不推高全网算力系数 + } + + /** + * 批量处理未分配的预种订单(由调度器调用) + */ + async processUndistributedOrders(batchSize: number = 50): Promise { + const orders = await this.prePlantingPrisma.prePlantingSyncedOrder.findMany({ + where: { + status: 'PAID', + contributionDistributed: false, + }, + take: batchSize, + orderBy: { originalOrderId: 'asc' }, + }); + + let count = 0; + for (const order of orders) { + try { + await this.calculateForPrePlantingOrder(order.originalOrderId); + count++; + } catch (error) { + this.logger.error( + `Failed to process pre-planting order ${order.originalOrderId}`, + error, + ); + } + } + return count; + } + + /** + * 保存分配结果(复制自 ContributionCalculationService.saveDistributionResult) + * + * 不修改原方法(private 且参数不同),在此完整复制保存逻辑。 + * 与原版唯一的区别是日志前缀带 [PRE-PLANTING] 标识。 + */ + private async saveDistributionResult( + result: ContributionDistributionResult, + sourceAdoptionId: bigint, + sourceAccountSequence: string, + ): Promise { + const savedRecords: ContributionRecordAggregate[] = []; + const updatedAccountSequences = new Set(); + + // 1. 个人算力 + const savedPersonalRecord = await this.contributionRecordRepository.save(result.personalRecord); + savedRecords.push(savedPersonalRecord); + + let account = await this.contributionAccountRepository.findByAccountSequence( + result.personalRecord.accountSequence, + ); + if (!account) { + account = ContributionAccountAggregate.create(result.personalRecord.accountSequence); + } + account.addPersonalContribution(result.personalRecord.amount); + await this.contributionAccountRepository.save(account); + updatedAccountSequences.add(result.personalRecord.accountSequence); + + // 2. 团队层级算力 + if (result.teamLevelRecords.length > 0) { + const savedLevelRecords = await this.contributionRecordRepository.saveMany(result.teamLevelRecords); + savedRecords.push(...savedLevelRecords); + + for (const record of result.teamLevelRecords) { + await this.contributionAccountRepository.updateContribution( + record.accountSequence, + ContributionSourceType.TEAM_LEVEL, + record.amount, + record.levelDepth, + null, + ); + updatedAccountSequences.add(record.accountSequence); + } + } + + // 3. 团队加成算力 + if (result.teamBonusRecords.length > 0) { + const savedBonusRecords = await this.contributionRecordRepository.saveMany(result.teamBonusRecords); + savedRecords.push(...savedBonusRecords); + + for (const record of result.teamBonusRecords) { + await this.contributionAccountRepository.updateContribution( + record.accountSequence, + ContributionSourceType.TEAM_BONUS, + record.amount, + null, + record.bonusTier, + ); + updatedAccountSequences.add(record.accountSequence); + } + } + + const effectiveDate = result.personalRecord.effectiveDate; + const expireDate = result.personalRecord.expireDate; + + // 4. 未分配算力 + if (result.unallocatedContributions.length > 0) { + await this.unallocatedContributionRepository.saveMany( + result.unallocatedContributions.map((u) => ({ + ...u, + sourceAdoptionId, + sourceAccountSequence, + effectiveDate, + expireDate, + })), + ); + + const totalUnallocatedAmount = result.unallocatedContributions.reduce( + (sum, u) => sum.add(u.amount), + new ContributionAmount(0), + ); + await this.systemAccountRepository.addContribution('HEADQUARTERS', null, totalUnallocatedAmount); + + for (const unallocated of result.unallocatedContributions) { + const sourceType = unallocated.type as string; + const levelDepth = unallocated.levelDepth; + + const savedRecord = await this.systemAccountRepository.saveContributionRecord({ + accountType: 'HEADQUARTERS', + regionCode: null, + sourceAdoptionId, + sourceAccountSequence, + sourceType, + levelDepth, + distributionRate: 0, + amount: unallocated.amount, + effectiveDate, + expireDate: null, + }); + + const recordEvent = new SystemContributionRecordCreatedEvent( + savedRecord.id, + 'HEADQUARTERS', + null, + sourceAdoptionId, + sourceAccountSequence, + sourceType as any, + levelDepth, + 0, + unallocated.amount.value.toString(), + effectiveDate, + null, + savedRecord.createdAt, + ); + await this.outboxRepository.save({ + aggregateType: SystemContributionRecordCreatedEvent.AGGREGATE_TYPE, + aggregateId: savedRecord.id.toString(), + eventType: SystemContributionRecordCreatedEvent.EVENT_TYPE, + payload: recordEvent.toPayload(), + }); + } + + const headquartersAccount = await this.systemAccountRepository.findByTypeAndRegion('HEADQUARTERS', null); + if (headquartersAccount) { + const hqEvent = new SystemAccountSyncedEvent( + 'HEADQUARTERS', + null, + headquartersAccount.name, + headquartersAccount.contributionBalance.value.toString(), + headquartersAccount.createdAt, + ); + await this.outboxRepository.save({ + aggregateType: SystemAccountSyncedEvent.AGGREGATE_TYPE, + aggregateId: 'HEADQUARTERS', + eventType: SystemAccountSyncedEvent.EVENT_TYPE, + payload: hqEvent.toPayload(), + }); + } + + for (const unallocated of result.unallocatedContributions) { + const event = new UnallocatedContributionSyncedEvent( + sourceAdoptionId, + sourceAccountSequence, + unallocated.wouldBeAccountSequence, + unallocated.type, + unallocated.amount.value.toString(), + unallocated.reason, + effectiveDate, + expireDate, + ); + await this.outboxRepository.save({ + aggregateType: UnallocatedContributionSyncedEvent.AGGREGATE_TYPE, + aggregateId: `${sourceAdoptionId}-${unallocated.type}`, + eventType: UnallocatedContributionSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }); + } + } + + // 5. 系统账户算力 + if (result.systemContributions.length > 0) { + await this.systemAccountRepository.ensureSystemAccountsExist(); + + for (const sys of result.systemContributions) { + await this.systemAccountRepository.addContribution(sys.accountType, sys.regionCode, sys.amount); + + const savedRecord = await this.systemAccountRepository.saveContributionRecord({ + accountType: sys.accountType, + regionCode: sys.regionCode, + sourceAdoptionId, + sourceAccountSequence, + sourceType: 'FIXED_RATE', + levelDepth: null, + distributionRate: sys.rate.value.toNumber(), + amount: sys.amount, + effectiveDate, + expireDate: null, + }); + + const systemAccount = await this.systemAccountRepository.findByTypeAndRegion(sys.accountType, sys.regionCode); + if (systemAccount) { + const event = new SystemAccountSyncedEvent( + sys.accountType, + sys.regionCode, + systemAccount.name, + systemAccount.contributionBalance.value.toString(), + systemAccount.createdAt, + ); + await this.outboxRepository.save({ + aggregateType: SystemAccountSyncedEvent.AGGREGATE_TYPE, + aggregateId: `${sys.accountType}:${sys.regionCode || 'null'}`, + eventType: SystemAccountSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }); + + const recordEvent = new SystemContributionRecordCreatedEvent( + savedRecord.id, + sys.accountType, + sys.regionCode, + sourceAdoptionId, + sourceAccountSequence, + 'FIXED_RATE', + null, + sys.rate.value.toNumber(), + sys.amount.value.toString(), + effectiveDate, + null, + savedRecord.createdAt, + ); + await this.outboxRepository.save({ + aggregateType: SystemContributionRecordCreatedEvent.AGGREGATE_TYPE, + aggregateId: savedRecord.id.toString(), + eventType: SystemContributionRecordCreatedEvent.EVENT_TYPE, + payload: recordEvent.toPayload(), + }); + } + } + } + + // 6. 发布算力记录事件 + await this.publishContributionRecordEvents(savedRecords); + + // 7. 发布账户更新事件 + await this.publishUpdatedAccountEvents(updatedAccountSequences); + } + + private async updateAdopterUnlockStatus(accountSequence: string): Promise { + const account = await this.contributionAccountRepository.findByAccountSequence(accountSequence); + if (!account) return; + + if (!account.hasAdopted) { + account.markAsAdopted(); + await this.contributionAccountRepository.save(account); + await this.publishContributionAccountUpdatedEvent(account); + } + } + + private async updateReferrerUnlockStatus(referrerAccountSequence: string): Promise { + const account = await this.contributionAccountRepository.findByAccountSequence(referrerAccountSequence); + if (!account) return; + + // 重新计算直推已认种人数 + // 由于 synced_adoptions 中已有预种 marker 记录,此方法会自然计入预种用户 + const directReferralAdoptedCount = await this.syncedDataRepository.getDirectReferralAdoptedCount( + referrerAccountSequence, + ); + + const previousCount = account.directReferralAdoptedCount; + if (directReferralAdoptedCount > previousCount) { + for (let i = previousCount; i < directReferralAdoptedCount; i++) { + account.incrementDirectReferralAdoptedCount(); + } + await this.contributionAccountRepository.save(account); + await this.publishContributionAccountUpdatedEvent(account); + + this.logger.debug( + `[PRE-PLANTING] Updated referrer ${referrerAccountSequence}: ` + + `level=${account.unlockedLevelDepth}, bonus=${account.unlockedBonusTiers}`, + ); + + await this.bonusClaimService.checkAndClaimBonus( + referrerAccountSequence, + previousCount, + directReferralAdoptedCount, + ); + } + } + + private async publishContributionRecordEvents( + savedRecords: ContributionRecordAggregate[], + ): Promise { + if (savedRecords.length === 0) return; + + const events = savedRecords.map((record) => { + const event = new ContributionRecordSyncedEvent( + record.id!, + record.accountSequence, + record.sourceType, + record.sourceAdoptionId, + record.sourceAccountSequence, + record.treeCount, + record.baseContribution.value.toString(), + record.distributionRate.value.toString(), + record.levelDepth, + record.bonusTier, + record.amount.value.toString(), + record.effectiveDate, + record.expireDate, + record.isExpired, + record.createdAt, + ); + + return { + aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE, + aggregateId: record.id!.toString(), + eventType: ContributionRecordSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }; + }); + + await this.outboxRepository.saveMany(events); + } + + private async publishUpdatedAccountEvents(accountSequences: Set): Promise { + for (const accountSequence of accountSequences) { + const account = await this.contributionAccountRepository.findByAccountSequence(accountSequence); + if (account) { + await this.publishContributionAccountUpdatedEvent(account); + } + } + } + + private async publishContributionAccountUpdatedEvent( + account: ContributionAccountAggregate, + ): Promise { + const totalContribution = account.personalContribution.value + .plus(account.totalLevelPending.value) + .plus(account.totalBonusPending.value); + + const event = new ContributionAccountUpdatedEvent( + account.accountSequence, + account.personalContribution.value.toString(), + account.totalLevelPending.value.toString(), + account.totalBonusPending.value.toString(), + totalContribution.toString(), + account.effectiveContribution.value.toString(), + account.hasAdopted, + account.directReferralAdoptedCount, + account.unlockedLevelDepth, + account.unlockedBonusTiers, + account.createdAt, + ); + + await this.outboxRepository.save({ + aggregateType: ContributionAccountUpdatedEvent.AGGREGATE_TYPE, + aggregateId: account.accountSequence, + eventType: ContributionAccountUpdatedEvent.EVENT_TYPE, + payload: event.toPayload(), + }); + } +} diff --git a/backend/services/contribution-service/src/pre-planting/domain/constants.ts b/backend/services/contribution-service/src/pre-planting/domain/constants.ts new file mode 100644 index 00000000..48751467 --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/domain/constants.ts @@ -0,0 +1,52 @@ +/** + * 预种计划算力常量 + * + * [2026-02-17] 新增:预种 CDC 集成到 contribution-service 2.0 + */ + +/** + * 预种订单 sourceAdoptionId 偏移量 + * + * 用于在 contribution_records 中区分预种订单和正常认种订单。 + * 预种订单的 sourceAdoptionId = PRE_PLANTING_SOURCE_ID_OFFSET + prePlantingOrderId + * + * 选择 10,000,000,000 (100 亿) 作为偏移量,确保永远不会与正常认种订单 ID 冲突。 + */ +export const PRE_PLANTING_SOURCE_ID_OFFSET = 10_000_000_000n; + +/** + * 预种每份的贡献值因子 + * + * 每份预种 = 1棵树贡献值的 1/5 + * 用法:contributionPerPortion = contributionPerTree / PRE_PLANTING_PORTION_DIVISOR + */ +export const PRE_PLANTING_PORTION_DIVISOR = 5; + +/** + * 预种冻结期限(从首次购买日起算,单位:年) + * + * 首次购买 + 1 年后仍未满 5 份 → 所有预种算力冻结 + */ +export const PRE_PLANTING_FREEZE_PERIOD_YEARS = 1; + +/** + * 解冻后的算力有效期(从解冻日起算,单位:年) + * + * 冻结后满 5 份解冻 → 算力恢复,从解冻日起算 2 年后失效 + */ +export const PRE_PLANTING_POST_UNFREEZE_VALIDITY_YEARS = 2; + +/** + * 预种 CDC topic 前缀 + * + * Debezium connector: cdc.pre-planting + * Topic 格式: cdc.pre-planting.public. + */ +export const PRE_PLANTING_CDC_TOPIC_PREFIX = 'cdc.pre-planting'; + +/** + * 预种 CDC Kafka consumer group ID + * + * 与现有 contribution-service-cdc-group 完全独立 + */ +export const PRE_PLANTING_CDC_GROUP_ID = 'contribution-pre-planting-cdc'; diff --git a/backend/services/contribution-service/src/pre-planting/infrastructure/kafka/pre-planting-cdc-consumer.service.ts b/backend/services/contribution-service/src/pre-planting/infrastructure/kafka/pre-planting-cdc-consumer.service.ts new file mode 100644 index 00000000..a89bc738 --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/infrastructure/kafka/pre-planting-cdc-consumer.service.ts @@ -0,0 +1,269 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; +import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; +import { Prisma } from '@prisma/client'; +import { PRE_PLANTING_CDC_GROUP_ID } from '../../domain/constants'; + +/** + * 预种 CDC 事件(Debezium ExtractNewRecordState 扁平格式) + */ +export interface PrePlantingCdcEvent { + payload: { + op: 'c' | 'u' | 'd' | 'r'; + before: any | null; + after: any | null; + table: string; + source_ts_ms: number; + deleted: boolean; + }; + topic: string; + offset: bigint; +} + +/** + * 事务性 handler(带后置回调支持) + */ +export type PrePlantingCdcHandler = ( + event: PrePlantingCdcEvent, + tx: Prisma.TransactionClient, +) => Promise; + +export type PostCommitCallback = (result: T) => Promise; + +interface RegisteredHandler { + tableName: string; + handler: (event: PrePlantingCdcEvent) => Promise; +} + +/** + * 预种 CDC 消费者服务 + * + * [2026-02-17] 新增:完全独立于现有 CDCConsumerService + * + * === 隔离保证 === + * - 独立 Kafka consumer group:contribution-pre-planting-cdc + * - 独立 CDC topics:cdc.pre-planting.public.* + * - 独立 Debezium connector/replication slot/publication + * - 幂等性:使用主 PrismaService 的 ProcessedCdcEvent 表(同 DB 事务保证一致性) + * + * === 与现有 CDCConsumerService 的区别 === + * - 无三阶段顺序消费(预种订单不依赖 user/referral 同步顺序,它们已由主 CDC 同步完成) + * - 无收集-排序-处理(预种订单间无严格顺序依赖) + * - 更简单的消费模式:直接消费 + 幂等处理 + */ +@Injectable() +export class PrePlantingCdcConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(PrePlantingCdcConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private handlers: Map Promise> = new Map(); + private isRunning = false; + + constructor( + private readonly configService: ConfigService, + private readonly prisma: PrismaService, // 主 PrismaService,用于幂等性 + 算力写入 + ) { + const brokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092').split(','); + + this.kafka = new Kafka({ + clientId: 'contribution-service-pre-planting-cdc', + brokers, + }); + + this.consumer = this.kafka.consumer({ + groupId: PRE_PLANTING_CDC_GROUP_ID, + }); + } + + async onModuleInit() { + // 等待 handler 注册后再启动(由 PrePlantingCdcDispatcher 调用 start()) + } + + async onModuleDestroy() { + await this.stop(); + } + + /** + * 注册事务性 handler(带幂等保证 + 后置回调) + * + * 幂等性通过主 PrismaService 的 ProcessedCdcEvent 表实现, + * 与业务逻辑在同一个 Serializable 事务中执行。 + */ + registerHandler( + tableName: string, + handler: PrePlantingCdcHandler, + postCommitCallback?: PostCommitCallback, + ): void { + const wrappedHandler = async (event: PrePlantingCdcEvent) => { + const idempotencyKey = `${event.topic}:${event.offset}`; + let result: T | null = null; + let shouldCallback = false; + + try { + await this.prisma.$transaction(async (tx) => { + // 1. 幂等检查:插入 ProcessedCdcEvent(唯一约束防重复) + try { + await tx.processedCdcEvent.create({ + data: { + sourceTopic: event.topic, + offset: event.offset, + tableName: event.payload.table, + operation: event.payload.op, + }, + }); + } catch (error: any) { + if (error.code === 'P2002') { + this.logger.debug(`[PRE-PLANTING-CDC] Skip duplicate: ${idempotencyKey}`); + return; + } + throw error; + } + + // 2. 执行业务逻辑 + result = await handler(event, tx); + shouldCallback = true; + + this.logger.debug(`[PRE-PLANTING-CDC] Processed: ${idempotencyKey}`); + }, { + isolationLevel: Prisma.TransactionIsolationLevel.Serializable, + timeout: 60000, + }); + + // 3. 事务提交后执行后置回调 + if (shouldCallback && postCommitCallback && result !== null) { + try { + await postCommitCallback(result); + } catch (callbackError) { + this.logger.error( + `[PRE-PLANTING-CDC] Post-commit callback failed: ${idempotencyKey}`, + callbackError, + ); + } + } + } catch (error: any) { + if (error.code === 'P2002') { + this.logger.debug(`[PRE-PLANTING-CDC] Skip duplicate (concurrent): ${idempotencyKey}`); + return; + } + this.logger.error(`[PRE-PLANTING-CDC] Failed: ${idempotencyKey}`, error); + throw error; + } + }; + + this.handlers.set(tableName, wrappedHandler); + this.logger.log(`[PRE-PLANTING-CDC] Registered handler for table: ${tableName}`); + } + + /** + * 启动消费者 + */ + async start(): Promise { + if (this.isRunning) { + this.logger.warn('[PRE-PLANTING-CDC] Consumer already running'); + return; + } + + try { + await this.consumer.connect(); + + // 订阅预种 CDC topics + const topicOrders = this.configService.get( + 'PRE_PLANTING_CDC_TOPIC_ORDERS', + 'cdc.pre-planting.public.pre_planting_orders', + ); + const topicPositions = this.configService.get( + 'PRE_PLANTING_CDC_TOPIC_POSITIONS', + 'cdc.pre-planting.public.pre_planting_positions', + ); + + await this.consumer.subscribe({ + topics: [topicOrders, topicPositions], + fromBeginning: true, + }); + + this.isRunning = true; + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + + this.logger.log( + `[PRE-PLANTING-CDC] Consumer started, topics: [${topicOrders}, ${topicPositions}]`, + ); + } catch (error) { + this.logger.error('[PRE-PLANTING-CDC] Failed to start consumer', error); + // 不抛出错误,允许服务在没有 Kafka/预种 CDC 的情况下启动 + } + } + + async stop(): Promise { + if (!this.isRunning) return; + + try { + await this.consumer.disconnect(); + this.isRunning = false; + this.logger.log('[PRE-PLANTING-CDC] Consumer stopped'); + } catch (error) { + this.logger.error('[PRE-PLANTING-CDC] Failed to stop consumer', error); + } + } + + private async handleMessage(payload: EachMessagePayload): Promise { + const { topic, partition, message } = payload; + + try { + if (!message.value) { + this.logger.warn(`[PRE-PLANTING-CDC] Empty message: topic=${topic}, offset=${message.offset}`); + return; + } + + const rawData = JSON.parse(message.value.toString()); + + // Debezium ExtractNewRecordState 扁平格式 + const op = rawData.__op || rawData.op; + const table = rawData.__table; + const sourceTsMs = rawData.__source_ts_ms || 0; + const deleted = rawData.__deleted === 'true' || rawData.__deleted === true; + + const { __op, __table, __source_ts_ms, __deleted, ...businessData } = rawData; + + const event: PrePlantingCdcEvent = { + payload: { + op: op as 'c' | 'u' | 'd' | 'r', + before: op === 'd' ? businessData : null, + after: op !== 'd' ? businessData : null, + table, + source_ts_ms: sourceTsMs, + deleted, + }, + topic, + offset: BigInt(message.offset), + }; + + // 从 topic 提取表名作为备选 + const parts = topic.split('.'); + const tableName = table || parts[parts.length - 1]; + + const handler = this.handlers.get(tableName); + if (handler) { + await handler(event); + this.logger.debug( + `[PRE-PLANTING-CDC] Processed: table=${tableName}, op=${op}, offset=${message.offset}`, + ); + } else { + this.logger.warn( + `[PRE-PLANTING-CDC] No handler for table: ${tableName}. ` + + `Available: ${Array.from(this.handlers.keys()).join(', ')}`, + ); + } + } catch (error) { + this.logger.error( + `[PRE-PLANTING-CDC] Error: topic=${topic}, partition=${partition}, offset=${message.offset}`, + error, + ); + } + } +} diff --git a/backend/services/contribution-service/src/pre-planting/infrastructure/prisma/pre-planting-prisma.module.ts b/backend/services/contribution-service/src/pre-planting/infrastructure/prisma/pre-planting-prisma.module.ts new file mode 100644 index 00000000..30ca9e3b --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/infrastructure/prisma/pre-planting-prisma.module.ts @@ -0,0 +1,14 @@ +import { Module, Global } from '@nestjs/common'; +import { PrePlantingPrismaService } from './pre-planting-prisma.service'; + +/** + * 预种计划 Prisma 模块 + * + * 提供独立的 PrePlantingPrismaService,用于访问预种追踪表。 + * 标记为 @Global 以便在 PrePlantingCdcModule 内的所有 provider 中直接注入。 + */ +@Module({ + providers: [PrePlantingPrismaService], + exports: [PrePlantingPrismaService], +}) +export class PrePlantingPrismaModule {} diff --git a/backend/services/contribution-service/src/pre-planting/infrastructure/prisma/pre-planting-prisma.service.ts b/backend/services/contribution-service/src/pre-planting/infrastructure/prisma/pre-planting-prisma.service.ts new file mode 100644 index 00000000..7a95b707 --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/infrastructure/prisma/pre-planting-prisma.service.ts @@ -0,0 +1,28 @@ +import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common'; +import { PrismaClient } from './generated'; + +/** + * 预种计划独立 Prisma 服务 + * + * 连接与主 PrismaService 相同的数据库,但使用独立的 Prisma Client, + * 仅包含预种追踪表的类型定义(pre_planting_synced_orders 等)。 + * + * 预种的算力分配结果仍通过主 PrismaService 写入 contribution_accounts 等表。 + */ +@Injectable() +export class PrePlantingPrismaService + extends PrismaClient + implements OnModuleInit, OnModuleDestroy +{ + private readonly logger = new Logger(PrePlantingPrismaService.name); + + async onModuleInit() { + await this.$connect(); + this.logger.log('Pre-planting Prisma client connected'); + } + + async onModuleDestroy() { + await this.$disconnect(); + this.logger.log('Pre-planting Prisma client disconnected'); + } +} diff --git a/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts b/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts new file mode 100644 index 00000000..eed86485 --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts @@ -0,0 +1,88 @@ +import { Module } from '@nestjs/common'; +import { ScheduleModule } from '@nestjs/schedule'; +import { InfrastructureModule } from '../infrastructure/infrastructure.module'; + +// Pre-planting Prisma (独立 schema) +import { PrePlantingPrismaModule } from './infrastructure/prisma/pre-planting-prisma.module'; + +// CDC Consumer (独立 Kafka consumer group) +import { PrePlantingCdcConsumerService } from './infrastructure/kafka/pre-planting-cdc-consumer.service'; + +// CDC Event Handlers +import { PrePlantingOrderSyncedHandler } from './application/handlers/pre-planting-order-synced.handler'; +import { PrePlantingPositionSyncedHandler } from './application/handlers/pre-planting-position-synced.handler'; +import { PrePlantingCdcDispatcher } from './application/handlers/pre-planting-cdc-dispatcher'; + +// Application Services +import { PrePlantingContributionService } from './application/services/pre-planting-contribution.service'; + +// Schedulers +import { PrePlantingFreezeScheduler } from './application/schedulers/pre-planting-freeze.scheduler'; + +// 现有 Application Services(直接提供,不 import ApplicationModule 避免引入现有 CDCEventDispatcher) +// 这些服务是无状态的,仅依赖 InfrastructureModule 的 providers,重复实例化无副作用。 +import { ContributionRateService } from '../application/services/contribution-rate.service'; +import { ContributionDistributionPublisherService } from '../application/services/contribution-distribution-publisher.service'; +import { BonusClaimService } from '../application/services/bonus-claim.service'; + +/** + * 预种 CDC 集成模块 + * + * [2026-02-17] 新增:将预种数据集成到 contribution-service 2.0 算力体系 + * + * === 功能概述 === + * 1. 通过独立 CDC consumer 消费预种表变更(cdc.pre-planting.public.*) + * 2. 同步预种订单/持仓到独立追踪表(PrePlantingPrismaService) + * 3. 为每份预种订单计算 1/5 算力(复用现有领域计算器) + * 4. 在 synced_adoptions 中插入 marker 记录(让现有 unlock 逻辑自然计入预种用户) + * 5. 每日检查冻结/解冻条件(1 年未满 5 份 → 冻结;满 5 份 → 解冻 + 2 年有效期) + * + * === 隔离保证 === + * - 独立 Kafka consumer group:contribution-pre-planting-cdc(不影响现有 contribution-service-cdc-group) + * - 独立 CDC topics:cdc.pre-planting.public.*(不影响现有 cdc.planting.public.* topics) + * - 独立 Debezium connector / replication slot / publication + * - 独立 Prisma schema:prisma/pre-planting/schema.prisma(追踪表与现有表完全分离) + * - 幂等性共用主 PrismaService 的 ProcessedCdcEvent 表(同 DB 事务保证一致性) + * + * === 对现有系统的影响 === + * - 零修改现有代码文件。本模块是纯新增。 + * - 算力写入现有 contribution_accounts / contribution_records 表(挖矿系统可见) + * - sourceAdoptionId 使用 10,000,000,000 偏移,永远不会与正常认种 ID 冲突 + * - synced_adoptions 中的 marker 记录设置 contributionDistributed=true + treeCount=0, + * 现有调度器不会处理,即使误处理也不会产生算力 + * - 不更新 NetworkAdoptionProgress(预种份额不推高全网算力系数) + * + * === 依赖关系 === + * - InfrastructureModule:提供 PrismaService、Repositories、UnitOfWork、Redis 等 + * - PrePlantingPrismaModule:提供独立的 PrePlantingPrismaService + * - ScheduleModule:提供 @Cron 装饰器支持(冻结调度器) + */ +@Module({ + imports: [ + ScheduleModule.forRoot(), + InfrastructureModule, + PrePlantingPrismaModule, + ], + providers: [ + // CDC Consumer (独立 consumer group) + PrePlantingCdcConsumerService, + + // CDC Event Handlers + PrePlantingOrderSyncedHandler, + PrePlantingPositionSyncedHandler, + PrePlantingCdcDispatcher, + + // Application Services (预种) + PrePlantingContributionService, + + // Application Services (现有,直接提供以避免 import ApplicationModule) + // ApplicationModule 内含 CDCEventDispatcher,import 会导致现有 CDC 被二次注册 + ContributionRateService, + ContributionDistributionPublisherService, + BonusClaimService, + + // Schedulers + PrePlantingFreezeScheduler, + ], +}) +export class PrePlantingCdcModule {}