From dbe9ab223fc2e5e00dd7989dca9c765de231cf40 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 12 Jan 2026 07:26:32 -0800 Subject: [PATCH] feat(contribution): fix pending fields update and add network progress tracking - Fix updateContribution to properly update levelXPending and bonusTierXPending fields - Add NetworkAdoptionProgress and DailyContributionRate tables for tracking contribution coefficient - Create ContributionRateService for dynamic rate calculation (base 22617, +0.3% per 100 trees after 1000) - Add ContributionRecordSynced and NetworkProgressUpdated events for CDC sync - Add admin endpoints for network progress query and contribution records publishing - Update mining-admin-service to sync contribution records and network progress Co-Authored-By: Claude Opus 4.5 --- .../contribution-service/prisma/schema.prisma | 67 ++++ .../src/api/controllers/admin.controller.ts | 146 +++++++ .../src/application/application.module.ts | 3 + .../contribution-calculation.service.ts | 55 +++ .../services/contribution-rate.service.ts | 374 ++++++++++++++++++ .../contribution-record-synced.event.ts | 47 +++ .../src/domain/events/index.ts | 2 + .../events/network-progress-updated.event.ts | 31 ++ .../contribution-account.repository.ts | 31 +- .../mining-admin-service/prisma/schema.prisma | 60 +++ .../src/application/services/users.service.ts | 95 ++++- .../infrastructure/kafka/cdc-sync.service.ts | 106 +++++ 12 files changed, 996 insertions(+), 21 deletions(-) create mode 100644 backend/services/contribution-service/src/application/services/contribution-rate.service.ts create mode 100644 backend/services/contribution-service/src/domain/events/contribution-record-synced.event.ts create mode 100644 backend/services/contribution-service/src/domain/events/network-progress-updated.event.ts diff --git a/backend/services/contribution-service/prisma/schema.prisma b/backend/services/contribution-service/prisma/schema.prisma index 24fdc700..91c2fce1 100644 --- a/backend/services/contribution-service/prisma/schema.prisma +++ b/backend/services/contribution-service/prisma/schema.prisma @@ -458,6 +458,73 @@ model DistributionRateConfig { @@map("distribution_rate_configs") } +// 全网认种进度表(单行记录,实时更新) +// 记录当前全网累计认种数量和对应的算力系数 +model NetworkAdoptionProgress { + id BigInt @id @default(autoincrement()) + + // ========== 全网累计认种统计 ========== + totalTreeCount Int @default(0) @map("total_tree_count") // 全网累计认种棵数 + totalAdoptionOrders Int @default(0) @map("total_adoption_orders") // 全网累计认种订单数 + totalAdoptedUsers Int @default(0) @map("total_adopted_users") // 全网累计认种用户数 + + // ========== 当前算力系数 ========== + // 基础值: 22617 (从第1棵到第999棵) + // 从第1000棵开始,每100棵为1个单位,每个单位递增 0.3% + // currentMultiplier = 1 + (currentUnit * 0.003) + // currentContributionPerTree = baseContribution * currentMultiplier + currentUnit Int @default(0) @map("current_unit") // 当前单位数 (0表示还没到1000棵) + currentMultiplier Decimal @default(1.0) @map("current_multiplier") @db.Decimal(10, 6) // 当前系数 (1.000, 1.003, 1.006...) + currentContributionPerTree Decimal @default(22617) @map("current_contribution_per_tree") @db.Decimal(20, 10) // 当前每棵树贡献值 + + // ========== 下一个单位的触发点 ========== + nextUnitTreeCount Int @default(1000) @map("next_unit_tree_count") // 下一个单位触发的棵数 + + // ========== 最后更新信息 ========== + lastAdoptionId BigInt? @map("last_adoption_id") // 最后处理的认种ID + lastAdoptionDate DateTime? @map("last_adoption_date") @db.Date // 最后认种日期 + + updatedAt DateTime @updatedAt @map("updated_at") + createdAt DateTime @default(now()) @map("created_at") + + @@map("network_adoption_progress") +} + +// 每日算力系数快照表 +// 记录每天的算力系数,确保同一天认种的用户使用相同系数 +model DailyContributionRate { + id BigInt @id @default(autoincrement()) + + // ========== 日期 ========== + effectiveDate DateTime @unique @map("effective_date") @db.Date // 生效日期 + + // ========== 当日起始状态 ========== + startTreeCount Int @default(0) @map("start_tree_count") // 当日开始时的全网棵数 + startUnit Int @default(0) @map("start_unit") // 当日开始时的单位数 + startMultiplier Decimal @default(1.0) @map("start_multiplier") @db.Decimal(10, 6) // 当日开始时的系数 + contributionPerTree Decimal @map("contribution_per_tree") @db.Decimal(20, 10) // 当日每棵树贡献值(同一天内不变) + + // ========== 当日结束状态 ========== + endTreeCount Int? @map("end_tree_count") // 当日结束时的全网棵数 + endUnit Int? @map("end_unit") // 当日结束时的单位数 + endMultiplier Decimal? @map("end_multiplier") @db.Decimal(10, 6) // 当日结束时的系数 + + // ========== 当日统计 ========== + dailyTreeCount Int @default(0) @map("daily_tree_count") // 当日新增认种棵数 + dailyAdoptionOrders Int @default(0) @map("daily_adoption_orders") // 当日新增认种订单数 + dailyAdoptedUsers Int @default(0) @map("daily_adopted_users") // 当日新增认种用户数 + + // ========== 状态 ========== + isClosed Boolean @default(false) @map("is_closed") // 是否已结算(日终处理后置为true) + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + @@index([effectiveDate]) + @@index([isClosed]) + @@map("daily_contribution_rates") +} + // ============================================ // Outbox 事件表(可靠事件发布) // ============================================ diff --git a/backend/services/contribution-service/src/api/controllers/admin.controller.ts b/backend/services/contribution-service/src/api/controllers/admin.controller.ts index 31bcba30..edbd2811 100644 --- a/backend/services/contribution-service/src/api/controllers/admin.controller.ts +++ b/backend/services/contribution-service/src/api/controllers/admin.controller.ts @@ -3,10 +3,13 @@ import { ApiTags, ApiOperation } from '@nestjs/swagger'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; +import { ContributionRateService } from '../../application/services/contribution-rate.service'; import { ContributionAccountSyncedEvent, ReferralSyncedEvent, AdoptionSyncedEvent, + ContributionRecordSyncedEvent, + NetworkProgressUpdatedEvent, } from '../../domain/events'; import { Public } from '../../shared/guards/jwt-auth.guard'; @@ -19,6 +22,7 @@ export class AdminController { private readonly prisma: PrismaService, private readonly outboxRepository: OutboxRepository, private readonly unitOfWork: UnitOfWork, + private readonly contributionRateService: ContributionRateService, ) {} @Get('accounts/sync') @@ -274,4 +278,146 @@ export class AdminController { message: `Published ${publishedCount} events, ${failedCount} failed out of ${adoptions.length} total`, }; } + + @Get('network-progress') + @Public() + @ApiOperation({ summary: '获取全网认种进度和算力系数' }) + async getNetworkProgress() { + const progress = await this.contributionRateService.getNetworkProgress(); + + return { + totalTreeCount: progress.totalTreeCount, + totalAdoptionOrders: progress.totalAdoptionOrders, + totalAdoptedUsers: progress.totalAdoptedUsers, + currentUnit: progress.currentUnit, + currentMultiplier: progress.currentMultiplier.toString(), + currentContributionPerTree: progress.currentContributionPerTree.toString(), + nextUnitTreeCount: progress.nextUnitTreeCount, + // 计算下一个单位还需要多少棵 + treesToNextUnit: progress.nextUnitTreeCount - progress.totalTreeCount, + }; + } + + @Post('contribution-records/publish-all') + @Public() + @ApiOperation({ summary: '发布所有算力记录事件到 outbox,用于初始同步到 mining-admin-service' }) + async publishAllContributionRecords(): Promise<{ + success: boolean; + publishedCount: number; + failedCount: number; + message: string; + }> { + const records = await this.prisma.contributionRecord.findMany({ + select: { + id: true, + accountSequence: true, + sourceType: true, + sourceAdoptionId: true, + sourceAccountSequence: true, + treeCount: true, + baseContribution: true, + distributionRate: true, + levelDepth: true, + bonusTier: true, + amount: true, + effectiveDate: true, + expireDate: true, + isExpired: true, + createdAt: true, + }, + }); + + let publishedCount = 0; + let failedCount = 0; + + const batchSize = 100; + for (let i = 0; i < records.length; i += batchSize) { + const batch = records.slice(i, i + batchSize); + + try { + await this.unitOfWork.executeInTransaction(async () => { + const events = batch.map((record) => { + const event = new ContributionRecordSyncedEvent( + record.id, + record.accountSequence, + record.sourceType, + record.sourceAdoptionId, + record.sourceAccountSequence, + record.treeCount, + record.baseContribution.toString(), + record.distributionRate.toString(), + record.levelDepth, + record.bonusTier, + record.amount.toString(), + record.effectiveDate, + record.expireDate, + record.isExpired, + record.createdAt, + ); + + return { + aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE, + aggregateId: record.id.toString(), + eventType: ContributionRecordSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }; + }); + + await this.outboxRepository.saveMany(events); + }); + + publishedCount += batch.length; + this.logger.debug(`Published contribution record batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`); + } catch (error) { + failedCount += batch.length; + this.logger.error(`Failed to publish contribution record batch ${Math.floor(i / batchSize) + 1}`, error); + } + } + + this.logger.log(`Published ${publishedCount} contribution record events, ${failedCount} failed`); + + return { + success: failedCount === 0, + publishedCount, + failedCount, + message: `Published ${publishedCount} events, ${failedCount} failed out of ${records.length} total`, + }; + } + + @Post('network-progress/publish') + @Public() + @ApiOperation({ summary: '发布当前全网进度事件' }) + async publishNetworkProgress(): Promise<{ success: boolean; message: string }> { + try { + const progress = await this.contributionRateService.getNetworkProgress(); + + const event = new NetworkProgressUpdatedEvent( + progress.totalTreeCount, + progress.totalAdoptionOrders, + progress.totalAdoptedUsers, + progress.currentUnit, + progress.currentMultiplier.toString(), + progress.currentContributionPerTree.toString(), + progress.nextUnitTreeCount, + ); + + await this.outboxRepository.save({ + aggregateType: NetworkProgressUpdatedEvent.AGGREGATE_TYPE, + aggregateId: 'network', + eventType: NetworkProgressUpdatedEvent.EVENT_TYPE, + payload: event.toPayload(), + }); + + return { + success: true, + message: `Published network progress: trees=${progress.totalTreeCount}, unit=${progress.currentUnit}, multiplier=${progress.currentMultiplier.toString()}`, + }; + } catch (error) { + this.logger.error('Failed to publish network progress', error); + return { + success: false, + message: `Failed: ${error.message}`, + }; + } + } } diff --git a/backend/services/contribution-service/src/application/application.module.ts b/backend/services/contribution-service/src/application/application.module.ts index 6df64743..2e1d44ed 100644 --- a/backend/services/contribution-service/src/application/application.module.ts +++ b/backend/services/contribution-service/src/application/application.module.ts @@ -11,6 +11,7 @@ import { CDCEventDispatcher } from './event-handlers/cdc-event-dispatcher'; // Services import { ContributionCalculationService } from './services/contribution-calculation.service'; import { ContributionDistributionPublisherService } from './services/contribution-distribution-publisher.service'; +import { ContributionRateService } from './services/contribution-rate.service'; import { SnapshotService } from './services/snapshot.service'; // Queries @@ -36,6 +37,7 @@ import { ContributionScheduler } from './schedulers/contribution.scheduler'; // Services ContributionCalculationService, ContributionDistributionPublisherService, + ContributionRateService, SnapshotService, // Queries @@ -48,6 +50,7 @@ import { ContributionScheduler } from './schedulers/contribution.scheduler'; ], exports: [ ContributionCalculationService, + ContributionRateService, SnapshotService, GetContributionAccountQuery, GetContributionStatsQuery, diff --git a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts index 691a99df..9885e6a4 100644 --- a/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts +++ b/backend/services/contribution-service/src/application/services/contribution-calculation.service.ts @@ -8,8 +8,11 @@ import { SystemAccountRepository } from '../../infrastructure/persistence/reposi import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; import { ContributionAccountAggregate, ContributionSourceType } from '../../domain/aggregates/contribution-account.aggregate'; +import { ContributionRecordAggregate } from '../../domain/aggregates/contribution-record.aggregate'; import { SyncedReferral } from '../../domain/repositories/synced-data.repository.interface'; import { ContributionDistributionPublisherService } from './contribution-distribution-publisher.service'; +import { ContributionRateService } from './contribution-rate.service'; +import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent } from '../../domain/events'; /** * 算力计算应用服务 @@ -29,6 +32,7 @@ export class ContributionCalculationService { private readonly outboxRepository: OutboxRepository, private readonly unitOfWork: UnitOfWork, private readonly distributionPublisher: ContributionDistributionPublisherService, + private readonly contributionRateService: ContributionRateService, ) {} /** @@ -172,6 +176,8 @@ export class ContributionCalculationService { record.accountSequence, ContributionSourceType.TEAM_LEVEL, record.amount, + record.levelDepth, // 传递层级深度 + null, ); } } @@ -186,6 +192,8 @@ export class ContributionCalculationService { record.accountSequence, ContributionSourceType.TEAM_BONUS, record.amount, + null, + record.bonusTier, // 传递加成档位 ); } } @@ -224,6 +232,53 @@ export class ContributionCalculationService { }); } } + + // 6. 发布算力记录同步事件(用于 mining-admin-service) + await this.publishContributionRecordEvents(result); + } + + /** + * 发布算力记录同步事件 + */ + private async publishContributionRecordEvents( + result: ContributionDistributionResult, + ): Promise { + const allRecords: ContributionRecordAggregate[] = [ + result.personalRecord, + ...result.teamLevelRecords, + ...result.teamBonusRecords, + ]; + + const events = allRecords.map((record) => { + const event = new ContributionRecordSyncedEvent( + record.id!, + record.accountSequence, + record.sourceType, + record.sourceAdoptionId, + record.sourceAccountSequence, + record.treeCount, + record.baseContribution.value.toString(), + record.distributionRate.value.toString(), + record.levelDepth, + record.bonusTier, + record.amount.value.toString(), + record.effectiveDate, + record.expireDate, + record.isExpired, + record.createdAt, + ); + + return { + aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE, + aggregateId: record.id!.toString(), + eventType: ContributionRecordSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }; + }); + + if (events.length > 0) { + await this.outboxRepository.saveMany(events); + } } /** diff --git a/backend/services/contribution-service/src/application/services/contribution-rate.service.ts b/backend/services/contribution-service/src/application/services/contribution-rate.service.ts new file mode 100644 index 00000000..0de5ad0a --- /dev/null +++ b/backend/services/contribution-service/src/application/services/contribution-rate.service.ts @@ -0,0 +1,374 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Decimal } from 'decimal.js'; +import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; + +/** + * 算力系数配置 + */ +interface ContributionRateConfig { + baseContribution: Decimal; // 基础贡献值 (22617) + incrementPercentage: Decimal; // 每单位递增百分比 (0.003 = 0.3%) + unitSize: number; // 每单位棵数 (100) + startTreeNumber: number; // 开始递增的棵数 (1000) +} + +/** + * 当日算力系数信息 + */ +export interface DailyContributionRateInfo { + effectiveDate: Date; + contributionPerTree: Decimal; + multiplier: Decimal; + unit: number; + startTreeCount: number; +} + +/** + * 算力系数计算服务 + * + * 业务规则: + * - 基础贡献值: 22617 (从第1棵到第999棵) + * - 从第1000棵开始,每100棵为1个单位 + * - 每个单位递增 0.3% 的贡献值 + * - 同一天认种的用户使用相同的系数(以当日首次认种时的系数为准) + * + * 计算公式: + * - currentUnit = floor((totalTreeCount - startTreeNumber) / unitSize) + * - currentMultiplier = 1 + (currentUnit * incrementPercentage) + * - currentContributionPerTree = baseContribution * currentMultiplier + */ +@Injectable() +export class ContributionRateService { + private readonly logger = new Logger(ContributionRateService.name); + + // 默认配置(可从数据库加载) + private config: ContributionRateConfig = { + baseContribution: new Decimal(22617), + incrementPercentage: new Decimal(0.003), // 0.3% + unitSize: 100, + startTreeNumber: 1000, + }; + + constructor(private readonly prisma: PrismaService) {} + + /** + * 获取或创建当日算力系数 + * + * @param adoptionDate 认种日期 + * @returns 当日算力系数信息 + */ + async getOrCreateDailyRate(adoptionDate: Date): Promise { + // 标准化日期(去掉时间部分) + const effectiveDate = this.normalizeDate(adoptionDate); + + // 尝试获取已存在的当日记录 + let dailyRate = await this.prisma.dailyContributionRate.findUnique({ + where: { effectiveDate }, + }); + + if (dailyRate) { + return { + effectiveDate: dailyRate.effectiveDate, + contributionPerTree: new Decimal(dailyRate.contributionPerTree.toString()), + multiplier: new Decimal(dailyRate.startMultiplier.toString()), + unit: dailyRate.startUnit, + startTreeCount: dailyRate.startTreeCount, + }; + } + + // 获取当前全网进度 + const progress = await this.getOrCreateNetworkProgress(); + + // 计算当前系数 + const { unit, multiplier, contributionPerTree } = this.calculateRate(progress.totalTreeCount); + + // 创建当日记录 + dailyRate = await this.prisma.dailyContributionRate.create({ + data: { + effectiveDate, + startTreeCount: progress.totalTreeCount, + startUnit: unit, + startMultiplier: multiplier.toNumber(), + contributionPerTree: contributionPerTree.toNumber(), + dailyTreeCount: 0, + dailyAdoptionOrders: 0, + dailyAdoptedUsers: 0, + isClosed: false, + }, + }); + + this.logger.log( + `Created daily contribution rate for ${effectiveDate.toISOString().split('T')[0]}: ` + + `unit=${unit}, multiplier=${multiplier.toString()}, contributionPerTree=${contributionPerTree.toString()}` + ); + + return { + effectiveDate: dailyRate.effectiveDate, + contributionPerTree, + multiplier, + unit, + startTreeCount: progress.totalTreeCount, + }; + } + + /** + * 获取当日每棵树的贡献值 + * + * @param adoptionDate 认种日期 + * @returns 每棵树的贡献值 + */ + async getContributionPerTree(adoptionDate: Date): Promise { + const rateInfo = await this.getOrCreateDailyRate(adoptionDate); + return rateInfo.contributionPerTree; + } + + /** + * 更新全网认种进度 + * + * @param treeCount 新增认种棵数 + * @param adoptionDate 认种日期 + * @param adoptionId 认种ID + * @param isNewUser 是否为新认种用户 + */ + async updateNetworkProgress( + treeCount: number, + adoptionDate: Date, + adoptionId: bigint, + isNewUser: boolean, + ): Promise { + const effectiveDate = this.normalizeDate(adoptionDate); + + await this.prisma.$transaction(async (tx) => { + // 更新全网进度 + const progress = await tx.networkAdoptionProgress.findFirst(); + + if (!progress) { + // 首次创建 + const newTotalTreeCount = treeCount; + const { unit, multiplier, contributionPerTree, nextUnitTreeCount } = + this.calculateRate(newTotalTreeCount); + + await tx.networkAdoptionProgress.create({ + data: { + totalTreeCount: newTotalTreeCount, + totalAdoptionOrders: 1, + totalAdoptedUsers: isNewUser ? 1 : 0, + currentUnit: unit, + currentMultiplier: multiplier.toNumber(), + currentContributionPerTree: contributionPerTree.toNumber(), + nextUnitTreeCount, + lastAdoptionId: adoptionId, + lastAdoptionDate: effectiveDate, + }, + }); + } else { + // 更新现有记录 + const newTotalTreeCount = progress.totalTreeCount + treeCount; + const { unit, multiplier, contributionPerTree, nextUnitTreeCount } = + this.calculateRate(newTotalTreeCount); + + await tx.networkAdoptionProgress.update({ + where: { id: progress.id }, + data: { + totalTreeCount: newTotalTreeCount, + totalAdoptionOrders: { increment: 1 }, + totalAdoptedUsers: isNewUser ? { increment: 1 } : undefined, + currentUnit: unit, + currentMultiplier: multiplier.toNumber(), + currentContributionPerTree: contributionPerTree.toNumber(), + nextUnitTreeCount, + lastAdoptionId: adoptionId, + lastAdoptionDate: effectiveDate, + }, + }); + } + + // 更新当日统计 + await tx.dailyContributionRate.upsert({ + where: { effectiveDate }, + create: { + effectiveDate, + startTreeCount: 0, + startUnit: 0, + startMultiplier: 1.0, + contributionPerTree: this.config.baseContribution.toNumber(), + dailyTreeCount: treeCount, + dailyAdoptionOrders: 1, + dailyAdoptedUsers: isNewUser ? 1 : 0, + isClosed: false, + }, + update: { + dailyTreeCount: { increment: treeCount }, + dailyAdoptionOrders: { increment: 1 }, + dailyAdoptedUsers: isNewUser ? { increment: 1 } : undefined, + }, + }); + }); + + this.logger.debug( + `Updated network progress: +${treeCount} trees, adoptionId=${adoptionId}` + ); + } + + /** + * 日终结算 + * 关闭当日记录,记录结束状态 + * + * @param date 要结算的日期 + */ + async closeDailyRate(date: Date): Promise { + const effectiveDate = this.normalizeDate(date); + + const dailyRate = await this.prisma.dailyContributionRate.findUnique({ + where: { effectiveDate }, + }); + + if (!dailyRate) { + this.logger.warn(`No daily rate found for ${effectiveDate.toISOString().split('T')[0]}`); + return; + } + + if (dailyRate.isClosed) { + this.logger.warn(`Daily rate for ${effectiveDate.toISOString().split('T')[0]} is already closed`); + return; + } + + // 获取当前全网进度 + const progress = await this.getOrCreateNetworkProgress(); + const { unit, multiplier } = this.calculateRate(progress.totalTreeCount); + + await this.prisma.dailyContributionRate.update({ + where: { effectiveDate }, + data: { + endTreeCount: progress.totalTreeCount, + endUnit: unit, + endMultiplier: multiplier.toNumber(), + isClosed: true, + }, + }); + + this.logger.log( + `Closed daily rate for ${effectiveDate.toISOString().split('T')[0]}: ` + + `endTreeCount=${progress.totalTreeCount}, endUnit=${unit}, endMultiplier=${multiplier.toString()}` + ); + } + + /** + * 获取当前全网进度 + */ + async getNetworkProgress(): Promise<{ + totalTreeCount: number; + totalAdoptionOrders: number; + totalAdoptedUsers: number; + currentUnit: number; + currentMultiplier: Decimal; + currentContributionPerTree: Decimal; + nextUnitTreeCount: number; + }> { + const progress = await this.getOrCreateNetworkProgress(); + return { + totalTreeCount: progress.totalTreeCount, + totalAdoptionOrders: progress.totalAdoptionOrders, + totalAdoptedUsers: progress.totalAdoptedUsers, + currentUnit: progress.currentUnit, + currentMultiplier: new Decimal(progress.currentMultiplier.toString()), + currentContributionPerTree: new Decimal(progress.currentContributionPerTree.toString()), + nextUnitTreeCount: progress.nextUnitTreeCount, + }; + } + + /** + * 获取或创建全网进度记录 + */ + private async getOrCreateNetworkProgress() { + let progress = await this.prisma.networkAdoptionProgress.findFirst(); + + if (!progress) { + progress = await this.prisma.networkAdoptionProgress.create({ + data: { + totalTreeCount: 0, + totalAdoptionOrders: 0, + totalAdoptedUsers: 0, + currentUnit: 0, + currentMultiplier: 1.0, + currentContributionPerTree: this.config.baseContribution.toNumber(), + nextUnitTreeCount: this.config.startTreeNumber, + }, + }); + } + + return progress; + } + + /** + * 计算算力系数 + * + * @param totalTreeCount 全网累计认种棵数 + * @returns 单位数、系数、每棵树贡献值、下一单位触发棵数 + */ + private calculateRate(totalTreeCount: number): { + unit: number; + multiplier: Decimal; + contributionPerTree: Decimal; + nextUnitTreeCount: number; + } { + let unit = 0; + let nextUnitTreeCount = this.config.startTreeNumber; + + // 如果总棵数已经超过起始阈值 + if (totalTreeCount >= this.config.startTreeNumber) { + // 计算当前单位数 + unit = Math.floor((totalTreeCount - this.config.startTreeNumber) / this.config.unitSize); + // 计算下一个单位的触发点 + nextUnitTreeCount = this.config.startTreeNumber + ((unit + 1) * this.config.unitSize); + } + + // 计算系数: 1 + (unit * 0.003) + const multiplier = new Decimal(1).plus( + new Decimal(unit).times(this.config.incrementPercentage) + ); + + // 计算每棵树贡献值 + const contributionPerTree = this.config.baseContribution.times(multiplier); + + return { + unit, + multiplier, + contributionPerTree, + nextUnitTreeCount, + }; + } + + /** + * 标准化日期(去掉时间部分) + */ + private normalizeDate(date: Date): Date { + const normalized = new Date(date); + normalized.setHours(0, 0, 0, 0); + return normalized; + } + + /** + * 加载配置(可选,从数据库加载) + */ + async loadConfig(): Promise { + const dbConfig = await this.prisma.contributionConfig.findFirst({ + where: { isActive: true }, + }); + + if (dbConfig) { + this.config = { + baseContribution: new Decimal(dbConfig.baseContribution.toString()), + incrementPercentage: new Decimal(dbConfig.incrementPercentage.toString()), + unitSize: dbConfig.unitSize, + startTreeNumber: dbConfig.startTreeNumber, + }; + + this.logger.log( + `Loaded contribution rate config: base=${this.config.baseContribution.toString()}, ` + + `increment=${this.config.incrementPercentage.toString()}, ` + + `unitSize=${this.config.unitSize}, startTree=${this.config.startTreeNumber}` + ); + } + } +} diff --git a/backend/services/contribution-service/src/domain/events/contribution-record-synced.event.ts b/backend/services/contribution-service/src/domain/events/contribution-record-synced.event.ts new file mode 100644 index 00000000..942029d2 --- /dev/null +++ b/backend/services/contribution-service/src/domain/events/contribution-record-synced.event.ts @@ -0,0 +1,47 @@ +/** + * 算力记录同步事件 + * 用于同步算力明细记录到 mining-admin-service + */ +export class ContributionRecordSyncedEvent { + static readonly EVENT_TYPE = 'ContributionRecordSynced'; + static readonly AGGREGATE_TYPE = 'ContributionRecord'; + + constructor( + public readonly originalRecordId: bigint, + public readonly accountSequence: string, + public readonly sourceType: string, + public readonly sourceAdoptionId: bigint, + public readonly sourceAccountSequence: string, + public readonly treeCount: number, + public readonly baseContribution: string, + public readonly distributionRate: string, + public readonly levelDepth: number | null, + public readonly bonusTier: number | null, + public readonly amount: string, + public readonly effectiveDate: Date, + public readonly expireDate: Date, + public readonly isExpired: boolean, + public readonly createdAt: Date, + ) {} + + toPayload(): Record { + return { + eventType: ContributionRecordSyncedEvent.EVENT_TYPE, + originalRecordId: this.originalRecordId.toString(), + accountSequence: this.accountSequence, + sourceType: this.sourceType, + sourceAdoptionId: this.sourceAdoptionId.toString(), + sourceAccountSequence: this.sourceAccountSequence, + treeCount: this.treeCount, + baseContribution: this.baseContribution, + distributionRate: this.distributionRate, + levelDepth: this.levelDepth, + bonusTier: this.bonusTier, + amount: this.amount, + effectiveDate: this.effectiveDate.toISOString(), + expireDate: this.expireDate.toISOString(), + isExpired: this.isExpired, + createdAt: this.createdAt.toISOString(), + }; + } +} diff --git a/backend/services/contribution-service/src/domain/events/index.ts b/backend/services/contribution-service/src/domain/events/index.ts index 828d8ff6..f5d1f555 100644 --- a/backend/services/contribution-service/src/domain/events/index.ts +++ b/backend/services/contribution-service/src/domain/events/index.ts @@ -3,3 +3,5 @@ export * from './daily-snapshot-created.event'; export * from './contribution-account-synced.event'; export * from './referral-synced.event'; export * from './adoption-synced.event'; +export * from './contribution-record-synced.event'; +export * from './network-progress-updated.event'; diff --git a/backend/services/contribution-service/src/domain/events/network-progress-updated.event.ts b/backend/services/contribution-service/src/domain/events/network-progress-updated.event.ts new file mode 100644 index 00000000..f5d4955d --- /dev/null +++ b/backend/services/contribution-service/src/domain/events/network-progress-updated.event.ts @@ -0,0 +1,31 @@ +/** + * 全网认种进度更新事件 + * 用于同步全网算力系数到 mining-admin-service + */ +export class NetworkProgressUpdatedEvent { + static readonly EVENT_TYPE = 'NetworkProgressUpdated'; + static readonly AGGREGATE_TYPE = 'NetworkProgress'; + + constructor( + public readonly totalTreeCount: number, + public readonly totalAdoptionOrders: number, + public readonly totalAdoptedUsers: number, + public readonly currentUnit: number, + public readonly currentMultiplier: string, + public readonly currentContributionPerTree: string, + public readonly nextUnitTreeCount: number, + ) {} + + toPayload(): Record { + return { + eventType: NetworkProgressUpdatedEvent.EVENT_TYPE, + totalTreeCount: this.totalTreeCount, + totalAdoptionOrders: this.totalAdoptionOrders, + totalAdoptedUsers: this.totalAdoptedUsers, + currentUnit: this.currentUnit, + currentMultiplier: this.currentMultiplier, + currentContributionPerTree: this.currentContributionPerTree, + nextUnitTreeCount: this.nextUnitTreeCount, + }; + } +} diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts index 39fc027a..1b66c02b 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/contribution-account.repository.ts @@ -79,9 +79,10 @@ export class ContributionAccountRepository implements IContributionAccountReposi accountSequence: string, sourceType: ContributionSourceType, amount: ContributionAmount, + levelDepth?: number | null, + bonusTier?: number | null, ): Promise { // 个人算力直接增加到 personalContribution 和 effectiveContribution - // 层级/加成算力需要根据解锁状态分配到对应的 pending 字段 if (sourceType === ContributionSourceType.PERSONAL) { await this.client.contributionAccount.update({ where: { accountSequence }, @@ -91,8 +92,34 @@ export class ContributionAccountRepository implements IContributionAccountReposi updatedAt: new Date(), }, }); + } else if (sourceType === ContributionSourceType.TEAM_LEVEL && levelDepth) { + // 层级算力:更新对应层级的 pending 字段和汇总字段 + const levelPendingField = `level${levelDepth}Pending` as const; + await this.client.contributionAccount.update({ + where: { accountSequence }, + data: { + [levelPendingField]: { increment: amount.value }, + totalLevelPending: { increment: amount.value }, + totalPending: { increment: amount.value }, + effectiveContribution: { increment: amount.value }, + updatedAt: new Date(), + }, + }); + } else if (sourceType === ContributionSourceType.TEAM_BONUS && bonusTier) { + // 加成算力:更新对应档位的 pending 字段和汇总字段 + const bonusTierField = `bonusTier${bonusTier}Pending` as const; + await this.client.contributionAccount.update({ + where: { accountSequence }, + data: { + [bonusTierField]: { increment: amount.value }, + totalBonusPending: { increment: amount.value }, + totalPending: { increment: amount.value }, + effectiveContribution: { increment: amount.value }, + updatedAt: new Date(), + }, + }); } else { - // 层级和加成算力暂时累加到 effectiveContribution(待后续细化分配逻辑) + // 兜底:只更新 effectiveContribution await this.client.contributionAccount.update({ where: { accountSequence }, data: { diff --git a/backend/services/mining-admin-service/prisma/schema.prisma b/backend/services/mining-admin-service/prisma/schema.prisma index f257b636..563ed3de 100644 --- a/backend/services/mining-admin-service/prisma/schema.prisma +++ b/backend/services/mining-admin-service/prisma/schema.prisma @@ -242,6 +242,66 @@ model SyncedAdoption { @@map("synced_adoptions") } +// ============================================================================= +// CDC 同步表 - 算力记录明细 (from contribution-service) +// ============================================================================= + +model SyncedContributionRecord { + id String @id @default(uuid()) + originalRecordId BigInt @unique // contribution-service 中的原始 ID + accountSequence String // 获得算力的账户 + + // 来源信息 + sourceType String // PERSONAL / TEAM_LEVEL / TEAM_BONUS + sourceAdoptionId BigInt // 来源认种ID + sourceAccountSequence String // 认种人账号 + + // 计算参数 + treeCount Int // 认种棵数 + baseContribution Decimal @db.Decimal(20, 10) // 基础贡献值/棵 + distributionRate Decimal @db.Decimal(10, 6) // 分配比例 + levelDepth Int? // 层级深度 (1-15) + bonusTier Int? // 加成档位 (1-3) + + // 金额 + amount Decimal @db.Decimal(30, 10) // 算力金额 + + // 有效期 + effectiveDate DateTime @db.Date // 生效日期 + expireDate DateTime @db.Date // 过期日期 + isExpired Boolean @default(false) + + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + createdAt DateTime // 原始记录创建时间 + + @@index([accountSequence]) + @@index([sourceAccountSequence]) + @@index([sourceAdoptionId]) + @@index([sourceType]) + @@index([createdAt(sort: Desc)]) + @@map("synced_contribution_records") +} + +// ============================================================================= +// CDC 同步表 - 全网算力进度 (from contribution-service) +// ============================================================================= + +model SyncedNetworkProgress { + id String @id @default(uuid()) + totalTreeCount Int @default(0) // 全网累计认种棵数 + totalAdoptionOrders Int @default(0) // 全网累计认种订单数 + totalAdoptedUsers Int @default(0) // 全网累计认种用户数 + currentUnit Int @default(0) // 当前单位数 + currentMultiplier Decimal @db.Decimal(10, 6) @default(1.0) // 当前系数 + currentContributionPerTree Decimal @db.Decimal(20, 10) @default(22617) // 当前每棵树贡献值 + nextUnitTreeCount Int @default(1000) // 下一个单位触发的棵数 + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@map("synced_network_progress") +} + // ============================================================================= // CDC 同步表 - 挖矿账户 (from mining-service) // ============================================================================= diff --git a/backend/services/mining-admin-service/src/application/services/users.service.ts b/backend/services/mining-admin-service/src/application/services/users.service.ts index 257e77ee..fb2fb8e7 100644 --- a/backend/services/mining-admin-service/src/application/services/users.service.ts +++ b/backend/services/mining-admin-service/src/application/services/users.service.ts @@ -296,8 +296,7 @@ export class UsersService { } /** - * 获取用户算力记录(从同步表获取概要) - * 注:详细流水需要调用 contribution-service + * 获取用户算力记录(包含明细流水) */ async getUserContributions( accountSequence: string, @@ -315,22 +314,77 @@ export class UsersService { // 返回算力账户概要 const contribution = user.contributionAccount; + const emptySummary = { + accountSequence, + personalContribution: '0', + teamLevelContribution: '0', + teamBonusContribution: '0', + totalContribution: '0', + effectiveContribution: '0', + hasAdopted: false, + directReferralCount: 0, + unlockedLevelDepth: 0, + unlockedBonusTiers: 0, + }; + + // 获取算力明细记录 + const [records, total] = await Promise.all([ + this.prisma.syncedContributionRecord.findMany({ + where: { accountSequence }, + orderBy: { createdAt: 'desc' }, + skip: (page - 1) * pageSize, + take: pageSize, + }), + this.prisma.syncedContributionRecord.count({ + where: { accountSequence }, + }), + ]); + + // 格式化记录 + const formattedRecords = await Promise.all( + records.map(async (record) => { + // 获取来源用户信息 + let sourceUserInfo = null; + if (record.sourceAccountSequence !== accountSequence) { + const sourceUser = await this.prisma.syncedUser.findUnique({ + where: { accountSequence: record.sourceAccountSequence }, + select: { nickname: true, phone: true }, + }); + sourceUserInfo = sourceUser ? { + nickname: sourceUser.nickname, + phone: this.maskPhone(sourceUser.phone), + } : null; + } + + return { + id: record.originalRecordId.toString(), + sourceType: record.sourceType, + sourceAccountSequence: record.sourceAccountSequence, + sourceUserInfo, + treeCount: record.treeCount, + baseContribution: record.baseContribution.toString(), + distributionRate: record.distributionRate.toString(), + levelDepth: record.levelDepth, + bonusTier: record.bonusTier, + amount: record.amount.toString(), + effectiveDate: record.effectiveDate, + expireDate: record.expireDate, + isExpired: record.isExpired, + createdAt: record.createdAt, + }; + }) + ); + if (!contribution) { return { - summary: { - accountSequence, - personalContribution: '0', - teamLevelContribution: '0', - teamBonusContribution: '0', - totalContribution: '0', - effectiveContribution: '0', - hasAdopted: false, - directReferralCount: 0, - unlockedLevelDepth: 0, - unlockedBonusTiers: 0, + summary: emptySummary, + records: formattedRecords, + pagination: { + page, + pageSize, + total, + totalPages: Math.ceil(total / pageSize), }, - records: [], - pagination: { page, pageSize, total: 0, totalPages: 0 }, }; } @@ -347,10 +401,13 @@ export class UsersService { unlockedLevelDepth: contribution.unlockedLevelDepth, unlockedBonusTiers: contribution.unlockedBonusTiers, }, - // 详细流水需要从 contribution-service 获取 - records: [], - pagination: { page, pageSize, total: 0, totalPages: 0 }, - note: '详细算力流水请查看 contribution-service', + records: formattedRecords, + pagination: { + page, + pageSize, + total, + totalPages: Math.ceil(total / pageSize), + }, }; } diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index aaa113a5..7d830514 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -101,6 +101,16 @@ export class CdcSyncService implements OnModuleInit { 'AdoptionSynced', this.handleAdoptionSynced.bind(this), ); + // ContributionRecordSynced 事件 - 同步算力明细记录 + this.cdcConsumer.registerServiceHandler( + 'ContributionRecordSynced', + this.handleContributionRecordSynced.bind(this), + ); + // NetworkProgressUpdated 事件 - 同步全网算力进度 + this.cdcConsumer.registerServiceHandler( + 'NetworkProgressUpdated', + this.handleNetworkProgressUpdated.bind(this), + ); // =========================================================================== // 从 mining-service 同步挖矿数据 (通过 Debezium CDC 监听 outbox_events 表) @@ -620,6 +630,102 @@ export class CdcSyncService implements OnModuleInit { } } + /** + * 处理 ContributionRecordSynced 事件 - 同步算力明细记录 + */ + private async handleContributionRecordSynced(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedContributionRecord.upsert({ + where: { originalRecordId: BigInt(payload.originalRecordId) }, + create: { + originalRecordId: BigInt(payload.originalRecordId), + accountSequence: payload.accountSequence, + sourceType: payload.sourceType, + sourceAdoptionId: BigInt(payload.sourceAdoptionId), + sourceAccountSequence: payload.sourceAccountSequence, + treeCount: payload.treeCount, + baseContribution: payload.baseContribution, + distributionRate: payload.distributionRate, + levelDepth: payload.levelDepth, + bonusTier: payload.bonusTier, + amount: payload.amount, + effectiveDate: new Date(payload.effectiveDate), + expireDate: new Date(payload.expireDate), + isExpired: payload.isExpired || false, + createdAt: new Date(payload.createdAt), + }, + update: { + accountSequence: payload.accountSequence, + sourceType: payload.sourceType, + sourceAdoptionId: BigInt(payload.sourceAdoptionId), + sourceAccountSequence: payload.sourceAccountSequence, + treeCount: payload.treeCount, + baseContribution: payload.baseContribution, + distributionRate: payload.distributionRate, + levelDepth: payload.levelDepth, + bonusTier: payload.bonusTier, + amount: payload.amount, + effectiveDate: new Date(payload.effectiveDate), + expireDate: new Date(payload.expireDate), + isExpired: payload.isExpired || false, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug(`Synced contribution record: ${payload.originalRecordId}`); + } catch (error) { + this.logger.error(`Failed to sync contribution record: ${payload.originalRecordId}`, error); + } + } + + /** + * 处理 NetworkProgressUpdated 事件 - 同步全网算力进度 + */ + private async handleNetworkProgressUpdated(event: ServiceEvent): Promise { + const { payload } = event; + + try { + // 全网进度只保留一条记录 + const existing = await this.prisma.syncedNetworkProgress.findFirst(); + + if (existing) { + await this.prisma.syncedNetworkProgress.update({ + where: { id: existing.id }, + data: { + totalTreeCount: payload.totalTreeCount, + totalAdoptionOrders: payload.totalAdoptionOrders, + totalAdoptedUsers: payload.totalAdoptedUsers, + currentUnit: payload.currentUnit, + currentMultiplier: payload.currentMultiplier, + currentContributionPerTree: payload.currentContributionPerTree, + nextUnitTreeCount: payload.nextUnitTreeCount, + }, + }); + } else { + await this.prisma.syncedNetworkProgress.create({ + data: { + totalTreeCount: payload.totalTreeCount, + totalAdoptionOrders: payload.totalAdoptionOrders, + totalAdoptedUsers: payload.totalAdoptedUsers, + currentUnit: payload.currentUnit, + currentMultiplier: payload.currentMultiplier, + currentContributionPerTree: payload.currentContributionPerTree, + nextUnitTreeCount: payload.nextUnitTreeCount, + }, + }); + } + + await this.recordProcessedEvent(event); + this.logger.debug( + `Synced network progress: trees=${payload.totalTreeCount}, unit=${payload.currentUnit}, multiplier=${payload.currentMultiplier}` + ); + } catch (error) { + this.logger.error('Failed to sync network progress', error); + } + } + // =========================================================================== // 挖矿账户事件处理 // ===========================================================================