feat(contribution): fix pending fields update and add network progress tracking

- Fix updateContribution to properly update levelXPending and bonusTierXPending fields
- Add NetworkAdoptionProgress and DailyContributionRate tables for tracking contribution coefficient
- Create ContributionRateService for dynamic rate calculation (base 22617, +0.3% per 100 trees after 1000)
- Add ContributionRecordSynced and NetworkProgressUpdated events for CDC sync
- Add admin endpoints for network progress query and contribution records publishing
- Update mining-admin-service to sync contribution records and network progress

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 07:26:32 -08:00
parent c0d0088b8e
commit dbe9ab223f
12 changed files with 996 additions and 21 deletions

View File

@ -458,6 +458,73 @@ model DistributionRateConfig {
@@map("distribution_rate_configs")
}
// 全网认种进度表(单行记录,实时更新)
// 记录当前全网累计认种数量和对应的算力系数
model NetworkAdoptionProgress {
id BigInt @id @default(autoincrement())
// ========== 全网累计认种统计 ==========
totalTreeCount Int @default(0) @map("total_tree_count") // 全网累计认种棵数
totalAdoptionOrders Int @default(0) @map("total_adoption_orders") // 全网累计认种订单数
totalAdoptedUsers Int @default(0) @map("total_adopted_users") // 全网累计认种用户数
// ========== 当前算力系数 ==========
// 基础值: 22617 (从第1棵到第999棵)
// 从第1000棵开始每100棵为1个单位每个单位递增 0.3%
// currentMultiplier = 1 + (currentUnit * 0.003)
// currentContributionPerTree = baseContribution * currentMultiplier
currentUnit Int @default(0) @map("current_unit") // 当前单位数 (0表示还没到1000棵)
currentMultiplier Decimal @default(1.0) @map("current_multiplier") @db.Decimal(10, 6) // 当前系数 (1.000, 1.003, 1.006...)
currentContributionPerTree Decimal @default(22617) @map("current_contribution_per_tree") @db.Decimal(20, 10) // 当前每棵树贡献值
// ========== 下一个单位的触发点 ==========
nextUnitTreeCount Int @default(1000) @map("next_unit_tree_count") // 下一个单位触发的棵数
// ========== 最后更新信息 ==========
lastAdoptionId BigInt? @map("last_adoption_id") // 最后处理的认种ID
lastAdoptionDate DateTime? @map("last_adoption_date") @db.Date // 最后认种日期
updatedAt DateTime @updatedAt @map("updated_at")
createdAt DateTime @default(now()) @map("created_at")
@@map("network_adoption_progress")
}
// 每日算力系数快照表
// 记录每天的算力系数,确保同一天认种的用户使用相同系数
model DailyContributionRate {
id BigInt @id @default(autoincrement())
// ========== 日期 ==========
effectiveDate DateTime @unique @map("effective_date") @db.Date // 生效日期
// ========== 当日起始状态 ==========
startTreeCount Int @default(0) @map("start_tree_count") // 当日开始时的全网棵数
startUnit Int @default(0) @map("start_unit") // 当日开始时的单位数
startMultiplier Decimal @default(1.0) @map("start_multiplier") @db.Decimal(10, 6) // 当日开始时的系数
contributionPerTree Decimal @map("contribution_per_tree") @db.Decimal(20, 10) // 当日每棵树贡献值(同一天内不变)
// ========== 当日结束状态 ==========
endTreeCount Int? @map("end_tree_count") // 当日结束时的全网棵数
endUnit Int? @map("end_unit") // 当日结束时的单位数
endMultiplier Decimal? @map("end_multiplier") @db.Decimal(10, 6) // 当日结束时的系数
// ========== 当日统计 ==========
dailyTreeCount Int @default(0) @map("daily_tree_count") // 当日新增认种棵数
dailyAdoptionOrders Int @default(0) @map("daily_adoption_orders") // 当日新增认种订单数
dailyAdoptedUsers Int @default(0) @map("daily_adopted_users") // 当日新增认种用户数
// ========== 状态 ==========
isClosed Boolean @default(false) @map("is_closed") // 是否已结算日终处理后置为true
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@index([effectiveDate])
@@index([isClosed])
@@map("daily_contribution_rates")
}
// ============================================
// Outbox 事件表(可靠事件发布)
// ============================================

View File

@ -3,10 +3,13 @@ import { ApiTags, ApiOperation } from '@nestjs/swagger';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
import { ContributionRateService } from '../../application/services/contribution-rate.service';
import {
ContributionAccountSyncedEvent,
ReferralSyncedEvent,
AdoptionSyncedEvent,
ContributionRecordSyncedEvent,
NetworkProgressUpdatedEvent,
} from '../../domain/events';
import { Public } from '../../shared/guards/jwt-auth.guard';
@ -19,6 +22,7 @@ export class AdminController {
private readonly prisma: PrismaService,
private readonly outboxRepository: OutboxRepository,
private readonly unitOfWork: UnitOfWork,
private readonly contributionRateService: ContributionRateService,
) {}
@Get('accounts/sync')
@ -274,4 +278,146 @@ export class AdminController {
message: `Published ${publishedCount} events, ${failedCount} failed out of ${adoptions.length} total`,
};
}
@Get('network-progress')
@Public()
@ApiOperation({ summary: '获取全网认种进度和算力系数' })
async getNetworkProgress() {
const progress = await this.contributionRateService.getNetworkProgress();
return {
totalTreeCount: progress.totalTreeCount,
totalAdoptionOrders: progress.totalAdoptionOrders,
totalAdoptedUsers: progress.totalAdoptedUsers,
currentUnit: progress.currentUnit,
currentMultiplier: progress.currentMultiplier.toString(),
currentContributionPerTree: progress.currentContributionPerTree.toString(),
nextUnitTreeCount: progress.nextUnitTreeCount,
// 计算下一个单位还需要多少棵
treesToNextUnit: progress.nextUnitTreeCount - progress.totalTreeCount,
};
}
@Post('contribution-records/publish-all')
@Public()
@ApiOperation({ summary: '发布所有算力记录事件到 outbox用于初始同步到 mining-admin-service' })
async publishAllContributionRecords(): Promise<{
success: boolean;
publishedCount: number;
failedCount: number;
message: string;
}> {
const records = await this.prisma.contributionRecord.findMany({
select: {
id: true,
accountSequence: true,
sourceType: true,
sourceAdoptionId: true,
sourceAccountSequence: true,
treeCount: true,
baseContribution: true,
distributionRate: true,
levelDepth: true,
bonusTier: true,
amount: true,
effectiveDate: true,
expireDate: true,
isExpired: true,
createdAt: true,
},
});
let publishedCount = 0;
let failedCount = 0;
const batchSize = 100;
for (let i = 0; i < records.length; i += batchSize) {
const batch = records.slice(i, i + batchSize);
try {
await this.unitOfWork.executeInTransaction(async () => {
const events = batch.map((record) => {
const event = new ContributionRecordSyncedEvent(
record.id,
record.accountSequence,
record.sourceType,
record.sourceAdoptionId,
record.sourceAccountSequence,
record.treeCount,
record.baseContribution.toString(),
record.distributionRate.toString(),
record.levelDepth,
record.bonusTier,
record.amount.toString(),
record.effectiveDate,
record.expireDate,
record.isExpired,
record.createdAt,
);
return {
aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE,
aggregateId: record.id.toString(),
eventType: ContributionRecordSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
});
publishedCount += batch.length;
this.logger.debug(`Published contribution record batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`);
} catch (error) {
failedCount += batch.length;
this.logger.error(`Failed to publish contribution record batch ${Math.floor(i / batchSize) + 1}`, error);
}
}
this.logger.log(`Published ${publishedCount} contribution record events, ${failedCount} failed`);
return {
success: failedCount === 0,
publishedCount,
failedCount,
message: `Published ${publishedCount} events, ${failedCount} failed out of ${records.length} total`,
};
}
@Post('network-progress/publish')
@Public()
@ApiOperation({ summary: '发布当前全网进度事件' })
async publishNetworkProgress(): Promise<{ success: boolean; message: string }> {
try {
const progress = await this.contributionRateService.getNetworkProgress();
const event = new NetworkProgressUpdatedEvent(
progress.totalTreeCount,
progress.totalAdoptionOrders,
progress.totalAdoptedUsers,
progress.currentUnit,
progress.currentMultiplier.toString(),
progress.currentContributionPerTree.toString(),
progress.nextUnitTreeCount,
);
await this.outboxRepository.save({
aggregateType: NetworkProgressUpdatedEvent.AGGREGATE_TYPE,
aggregateId: 'network',
eventType: NetworkProgressUpdatedEvent.EVENT_TYPE,
payload: event.toPayload(),
});
return {
success: true,
message: `Published network progress: trees=${progress.totalTreeCount}, unit=${progress.currentUnit}, multiplier=${progress.currentMultiplier.toString()}`,
};
} catch (error) {
this.logger.error('Failed to publish network progress', error);
return {
success: false,
message: `Failed: ${error.message}`,
};
}
}
}

View File

@ -11,6 +11,7 @@ import { CDCEventDispatcher } from './event-handlers/cdc-event-dispatcher';
// Services
import { ContributionCalculationService } from './services/contribution-calculation.service';
import { ContributionDistributionPublisherService } from './services/contribution-distribution-publisher.service';
import { ContributionRateService } from './services/contribution-rate.service';
import { SnapshotService } from './services/snapshot.service';
// Queries
@ -36,6 +37,7 @@ import { ContributionScheduler } from './schedulers/contribution.scheduler';
// Services
ContributionCalculationService,
ContributionDistributionPublisherService,
ContributionRateService,
SnapshotService,
// Queries
@ -48,6 +50,7 @@ import { ContributionScheduler } from './schedulers/contribution.scheduler';
],
exports: [
ContributionCalculationService,
ContributionRateService,
SnapshotService,
GetContributionAccountQuery,
GetContributionStatsQuery,

View File

@ -8,8 +8,11 @@ import { SystemAccountRepository } from '../../infrastructure/persistence/reposi
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
import { ContributionAccountAggregate, ContributionSourceType } from '../../domain/aggregates/contribution-account.aggregate';
import { ContributionRecordAggregate } from '../../domain/aggregates/contribution-record.aggregate';
import { SyncedReferral } from '../../domain/repositories/synced-data.repository.interface';
import { ContributionDistributionPublisherService } from './contribution-distribution-publisher.service';
import { ContributionRateService } from './contribution-rate.service';
import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent } from '../../domain/events';
/**
*
@ -29,6 +32,7 @@ export class ContributionCalculationService {
private readonly outboxRepository: OutboxRepository,
private readonly unitOfWork: UnitOfWork,
private readonly distributionPublisher: ContributionDistributionPublisherService,
private readonly contributionRateService: ContributionRateService,
) {}
/**
@ -172,6 +176,8 @@ export class ContributionCalculationService {
record.accountSequence,
ContributionSourceType.TEAM_LEVEL,
record.amount,
record.levelDepth, // 传递层级深度
null,
);
}
}
@ -186,6 +192,8 @@ export class ContributionCalculationService {
record.accountSequence,
ContributionSourceType.TEAM_BONUS,
record.amount,
null,
record.bonusTier, // 传递加成档位
);
}
}
@ -224,6 +232,53 @@ export class ContributionCalculationService {
});
}
}
// 6. 发布算力记录同步事件(用于 mining-admin-service
await this.publishContributionRecordEvents(result);
}
/**
*
*/
private async publishContributionRecordEvents(
result: ContributionDistributionResult,
): Promise<void> {
const allRecords: ContributionRecordAggregate[] = [
result.personalRecord,
...result.teamLevelRecords,
...result.teamBonusRecords,
];
const events = allRecords.map((record) => {
const event = new ContributionRecordSyncedEvent(
record.id!,
record.accountSequence,
record.sourceType,
record.sourceAdoptionId,
record.sourceAccountSequence,
record.treeCount,
record.baseContribution.value.toString(),
record.distributionRate.value.toString(),
record.levelDepth,
record.bonusTier,
record.amount.value.toString(),
record.effectiveDate,
record.expireDate,
record.isExpired,
record.createdAt,
);
return {
aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE,
aggregateId: record.id!.toString(),
eventType: ContributionRecordSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
if (events.length > 0) {
await this.outboxRepository.saveMany(events);
}
}
/**

View File

@ -0,0 +1,374 @@
import { Injectable, Logger } from '@nestjs/common';
import { Decimal } from 'decimal.js';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
/**
*
*/
interface ContributionRateConfig {
baseContribution: Decimal; // 基础贡献值 (22617)
incrementPercentage: Decimal; // 每单位递增百分比 (0.003 = 0.3%)
unitSize: number; // 每单位棵数 (100)
startTreeNumber: number; // 开始递增的棵数 (1000)
}
/**
*
*/
export interface DailyContributionRateInfo {
effectiveDate: Date;
contributionPerTree: Decimal;
multiplier: Decimal;
unit: number;
startTreeCount: number;
}
/**
*
*
*
* - 基础贡献值: 22617 (1999)
* - 10001001
* - 0.3%
* - 使
*
*
* - currentUnit = floor((totalTreeCount - startTreeNumber) / unitSize)
* - currentMultiplier = 1 + (currentUnit * incrementPercentage)
* - currentContributionPerTree = baseContribution * currentMultiplier
*/
@Injectable()
export class ContributionRateService {
private readonly logger = new Logger(ContributionRateService.name);
// 默认配置(可从数据库加载)
private config: ContributionRateConfig = {
baseContribution: new Decimal(22617),
incrementPercentage: new Decimal(0.003), // 0.3%
unitSize: 100,
startTreeNumber: 1000,
};
constructor(private readonly prisma: PrismaService) {}
/**
*
*
* @param adoptionDate
* @returns
*/
async getOrCreateDailyRate(adoptionDate: Date): Promise<DailyContributionRateInfo> {
// 标准化日期(去掉时间部分)
const effectiveDate = this.normalizeDate(adoptionDate);
// 尝试获取已存在的当日记录
let dailyRate = await this.prisma.dailyContributionRate.findUnique({
where: { effectiveDate },
});
if (dailyRate) {
return {
effectiveDate: dailyRate.effectiveDate,
contributionPerTree: new Decimal(dailyRate.contributionPerTree.toString()),
multiplier: new Decimal(dailyRate.startMultiplier.toString()),
unit: dailyRate.startUnit,
startTreeCount: dailyRate.startTreeCount,
};
}
// 获取当前全网进度
const progress = await this.getOrCreateNetworkProgress();
// 计算当前系数
const { unit, multiplier, contributionPerTree } = this.calculateRate(progress.totalTreeCount);
// 创建当日记录
dailyRate = await this.prisma.dailyContributionRate.create({
data: {
effectiveDate,
startTreeCount: progress.totalTreeCount,
startUnit: unit,
startMultiplier: multiplier.toNumber(),
contributionPerTree: contributionPerTree.toNumber(),
dailyTreeCount: 0,
dailyAdoptionOrders: 0,
dailyAdoptedUsers: 0,
isClosed: false,
},
});
this.logger.log(
`Created daily contribution rate for ${effectiveDate.toISOString().split('T')[0]}: ` +
`unit=${unit}, multiplier=${multiplier.toString()}, contributionPerTree=${contributionPerTree.toString()}`
);
return {
effectiveDate: dailyRate.effectiveDate,
contributionPerTree,
multiplier,
unit,
startTreeCount: progress.totalTreeCount,
};
}
/**
*
*
* @param adoptionDate
* @returns
*/
async getContributionPerTree(adoptionDate: Date): Promise<Decimal> {
const rateInfo = await this.getOrCreateDailyRate(adoptionDate);
return rateInfo.contributionPerTree;
}
/**
*
*
* @param treeCount
* @param adoptionDate
* @param adoptionId ID
* @param isNewUser
*/
async updateNetworkProgress(
treeCount: number,
adoptionDate: Date,
adoptionId: bigint,
isNewUser: boolean,
): Promise<void> {
const effectiveDate = this.normalizeDate(adoptionDate);
await this.prisma.$transaction(async (tx) => {
// 更新全网进度
const progress = await tx.networkAdoptionProgress.findFirst();
if (!progress) {
// 首次创建
const newTotalTreeCount = treeCount;
const { unit, multiplier, contributionPerTree, nextUnitTreeCount } =
this.calculateRate(newTotalTreeCount);
await tx.networkAdoptionProgress.create({
data: {
totalTreeCount: newTotalTreeCount,
totalAdoptionOrders: 1,
totalAdoptedUsers: isNewUser ? 1 : 0,
currentUnit: unit,
currentMultiplier: multiplier.toNumber(),
currentContributionPerTree: contributionPerTree.toNumber(),
nextUnitTreeCount,
lastAdoptionId: adoptionId,
lastAdoptionDate: effectiveDate,
},
});
} else {
// 更新现有记录
const newTotalTreeCount = progress.totalTreeCount + treeCount;
const { unit, multiplier, contributionPerTree, nextUnitTreeCount } =
this.calculateRate(newTotalTreeCount);
await tx.networkAdoptionProgress.update({
where: { id: progress.id },
data: {
totalTreeCount: newTotalTreeCount,
totalAdoptionOrders: { increment: 1 },
totalAdoptedUsers: isNewUser ? { increment: 1 } : undefined,
currentUnit: unit,
currentMultiplier: multiplier.toNumber(),
currentContributionPerTree: contributionPerTree.toNumber(),
nextUnitTreeCount,
lastAdoptionId: adoptionId,
lastAdoptionDate: effectiveDate,
},
});
}
// 更新当日统计
await tx.dailyContributionRate.upsert({
where: { effectiveDate },
create: {
effectiveDate,
startTreeCount: 0,
startUnit: 0,
startMultiplier: 1.0,
contributionPerTree: this.config.baseContribution.toNumber(),
dailyTreeCount: treeCount,
dailyAdoptionOrders: 1,
dailyAdoptedUsers: isNewUser ? 1 : 0,
isClosed: false,
},
update: {
dailyTreeCount: { increment: treeCount },
dailyAdoptionOrders: { increment: 1 },
dailyAdoptedUsers: isNewUser ? { increment: 1 } : undefined,
},
});
});
this.logger.debug(
`Updated network progress: +${treeCount} trees, adoptionId=${adoptionId}`
);
}
/**
*
*
*
* @param date
*/
async closeDailyRate(date: Date): Promise<void> {
const effectiveDate = this.normalizeDate(date);
const dailyRate = await this.prisma.dailyContributionRate.findUnique({
where: { effectiveDate },
});
if (!dailyRate) {
this.logger.warn(`No daily rate found for ${effectiveDate.toISOString().split('T')[0]}`);
return;
}
if (dailyRate.isClosed) {
this.logger.warn(`Daily rate for ${effectiveDate.toISOString().split('T')[0]} is already closed`);
return;
}
// 获取当前全网进度
const progress = await this.getOrCreateNetworkProgress();
const { unit, multiplier } = this.calculateRate(progress.totalTreeCount);
await this.prisma.dailyContributionRate.update({
where: { effectiveDate },
data: {
endTreeCount: progress.totalTreeCount,
endUnit: unit,
endMultiplier: multiplier.toNumber(),
isClosed: true,
},
});
this.logger.log(
`Closed daily rate for ${effectiveDate.toISOString().split('T')[0]}: ` +
`endTreeCount=${progress.totalTreeCount}, endUnit=${unit}, endMultiplier=${multiplier.toString()}`
);
}
/**
*
*/
async getNetworkProgress(): Promise<{
totalTreeCount: number;
totalAdoptionOrders: number;
totalAdoptedUsers: number;
currentUnit: number;
currentMultiplier: Decimal;
currentContributionPerTree: Decimal;
nextUnitTreeCount: number;
}> {
const progress = await this.getOrCreateNetworkProgress();
return {
totalTreeCount: progress.totalTreeCount,
totalAdoptionOrders: progress.totalAdoptionOrders,
totalAdoptedUsers: progress.totalAdoptedUsers,
currentUnit: progress.currentUnit,
currentMultiplier: new Decimal(progress.currentMultiplier.toString()),
currentContributionPerTree: new Decimal(progress.currentContributionPerTree.toString()),
nextUnitTreeCount: progress.nextUnitTreeCount,
};
}
/**
*
*/
private async getOrCreateNetworkProgress() {
let progress = await this.prisma.networkAdoptionProgress.findFirst();
if (!progress) {
progress = await this.prisma.networkAdoptionProgress.create({
data: {
totalTreeCount: 0,
totalAdoptionOrders: 0,
totalAdoptedUsers: 0,
currentUnit: 0,
currentMultiplier: 1.0,
currentContributionPerTree: this.config.baseContribution.toNumber(),
nextUnitTreeCount: this.config.startTreeNumber,
},
});
}
return progress;
}
/**
*
*
* @param totalTreeCount
* @returns
*/
private calculateRate(totalTreeCount: number): {
unit: number;
multiplier: Decimal;
contributionPerTree: Decimal;
nextUnitTreeCount: number;
} {
let unit = 0;
let nextUnitTreeCount = this.config.startTreeNumber;
// 如果总棵数已经超过起始阈值
if (totalTreeCount >= this.config.startTreeNumber) {
// 计算当前单位数
unit = Math.floor((totalTreeCount - this.config.startTreeNumber) / this.config.unitSize);
// 计算下一个单位的触发点
nextUnitTreeCount = this.config.startTreeNumber + ((unit + 1) * this.config.unitSize);
}
// 计算系数: 1 + (unit * 0.003)
const multiplier = new Decimal(1).plus(
new Decimal(unit).times(this.config.incrementPercentage)
);
// 计算每棵树贡献值
const contributionPerTree = this.config.baseContribution.times(multiplier);
return {
unit,
multiplier,
contributionPerTree,
nextUnitTreeCount,
};
}
/**
*
*/
private normalizeDate(date: Date): Date {
const normalized = new Date(date);
normalized.setHours(0, 0, 0, 0);
return normalized;
}
/**
*
*/
async loadConfig(): Promise<void> {
const dbConfig = await this.prisma.contributionConfig.findFirst({
where: { isActive: true },
});
if (dbConfig) {
this.config = {
baseContribution: new Decimal(dbConfig.baseContribution.toString()),
incrementPercentage: new Decimal(dbConfig.incrementPercentage.toString()),
unitSize: dbConfig.unitSize,
startTreeNumber: dbConfig.startTreeNumber,
};
this.logger.log(
`Loaded contribution rate config: base=${this.config.baseContribution.toString()}, ` +
`increment=${this.config.incrementPercentage.toString()}, ` +
`unitSize=${this.config.unitSize}, startTree=${this.config.startTreeNumber}`
);
}
}
}

View File

@ -0,0 +1,47 @@
/**
*
* mining-admin-service
*/
export class ContributionRecordSyncedEvent {
static readonly EVENT_TYPE = 'ContributionRecordSynced';
static readonly AGGREGATE_TYPE = 'ContributionRecord';
constructor(
public readonly originalRecordId: bigint,
public readonly accountSequence: string,
public readonly sourceType: string,
public readonly sourceAdoptionId: bigint,
public readonly sourceAccountSequence: string,
public readonly treeCount: number,
public readonly baseContribution: string,
public readonly distributionRate: string,
public readonly levelDepth: number | null,
public readonly bonusTier: number | null,
public readonly amount: string,
public readonly effectiveDate: Date,
public readonly expireDate: Date,
public readonly isExpired: boolean,
public readonly createdAt: Date,
) {}
toPayload(): Record<string, any> {
return {
eventType: ContributionRecordSyncedEvent.EVENT_TYPE,
originalRecordId: this.originalRecordId.toString(),
accountSequence: this.accountSequence,
sourceType: this.sourceType,
sourceAdoptionId: this.sourceAdoptionId.toString(),
sourceAccountSequence: this.sourceAccountSequence,
treeCount: this.treeCount,
baseContribution: this.baseContribution,
distributionRate: this.distributionRate,
levelDepth: this.levelDepth,
bonusTier: this.bonusTier,
amount: this.amount,
effectiveDate: this.effectiveDate.toISOString(),
expireDate: this.expireDate.toISOString(),
isExpired: this.isExpired,
createdAt: this.createdAt.toISOString(),
};
}
}

View File

@ -3,3 +3,5 @@ export * from './daily-snapshot-created.event';
export * from './contribution-account-synced.event';
export * from './referral-synced.event';
export * from './adoption-synced.event';
export * from './contribution-record-synced.event';
export * from './network-progress-updated.event';

View File

@ -0,0 +1,31 @@
/**
*
* mining-admin-service
*/
export class NetworkProgressUpdatedEvent {
static readonly EVENT_TYPE = 'NetworkProgressUpdated';
static readonly AGGREGATE_TYPE = 'NetworkProgress';
constructor(
public readonly totalTreeCount: number,
public readonly totalAdoptionOrders: number,
public readonly totalAdoptedUsers: number,
public readonly currentUnit: number,
public readonly currentMultiplier: string,
public readonly currentContributionPerTree: string,
public readonly nextUnitTreeCount: number,
) {}
toPayload(): Record<string, any> {
return {
eventType: NetworkProgressUpdatedEvent.EVENT_TYPE,
totalTreeCount: this.totalTreeCount,
totalAdoptionOrders: this.totalAdoptionOrders,
totalAdoptedUsers: this.totalAdoptedUsers,
currentUnit: this.currentUnit,
currentMultiplier: this.currentMultiplier,
currentContributionPerTree: this.currentContributionPerTree,
nextUnitTreeCount: this.nextUnitTreeCount,
};
}
}

View File

@ -79,9 +79,10 @@ export class ContributionAccountRepository implements IContributionAccountReposi
accountSequence: string,
sourceType: ContributionSourceType,
amount: ContributionAmount,
levelDepth?: number | null,
bonusTier?: number | null,
): Promise<void> {
// 个人算力直接增加到 personalContribution 和 effectiveContribution
// 层级/加成算力需要根据解锁状态分配到对应的 pending 字段
if (sourceType === ContributionSourceType.PERSONAL) {
await this.client.contributionAccount.update({
where: { accountSequence },
@ -91,8 +92,34 @@ export class ContributionAccountRepository implements IContributionAccountReposi
updatedAt: new Date(),
},
});
} else if (sourceType === ContributionSourceType.TEAM_LEVEL && levelDepth) {
// 层级算力:更新对应层级的 pending 字段和汇总字段
const levelPendingField = `level${levelDepth}Pending` as const;
await this.client.contributionAccount.update({
where: { accountSequence },
data: {
[levelPendingField]: { increment: amount.value },
totalLevelPending: { increment: amount.value },
totalPending: { increment: amount.value },
effectiveContribution: { increment: amount.value },
updatedAt: new Date(),
},
});
} else if (sourceType === ContributionSourceType.TEAM_BONUS && bonusTier) {
// 加成算力:更新对应档位的 pending 字段和汇总字段
const bonusTierField = `bonusTier${bonusTier}Pending` as const;
await this.client.contributionAccount.update({
where: { accountSequence },
data: {
[bonusTierField]: { increment: amount.value },
totalBonusPending: { increment: amount.value },
totalPending: { increment: amount.value },
effectiveContribution: { increment: amount.value },
updatedAt: new Date(),
},
});
} else {
// 层级和加成算力暂时累加到 effectiveContribution待后续细化分配逻辑
// 兜底:只更新 effectiveContribution
await this.client.contributionAccount.update({
where: { accountSequence },
data: {

View File

@ -242,6 +242,66 @@ model SyncedAdoption {
@@map("synced_adoptions")
}
// =============================================================================
// CDC 同步表 - 算力记录明细 (from contribution-service)
// =============================================================================
model SyncedContributionRecord {
id String @id @default(uuid())
originalRecordId BigInt @unique // contribution-service 中的原始 ID
accountSequence String // 获得算力的账户
// 来源信息
sourceType String // PERSONAL / TEAM_LEVEL / TEAM_BONUS
sourceAdoptionId BigInt // 来源认种ID
sourceAccountSequence String // 认种人账号
// 计算参数
treeCount Int // 认种棵数
baseContribution Decimal @db.Decimal(20, 10) // 基础贡献值/棵
distributionRate Decimal @db.Decimal(10, 6) // 分配比例
levelDepth Int? // 层级深度 (1-15)
bonusTier Int? // 加成档位 (1-3)
// 金额
amount Decimal @db.Decimal(30, 10) // 算力金额
// 有效期
effectiveDate DateTime @db.Date // 生效日期
expireDate DateTime @db.Date // 过期日期
isExpired Boolean @default(false)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
createdAt DateTime // 原始记录创建时间
@@index([accountSequence])
@@index([sourceAccountSequence])
@@index([sourceAdoptionId])
@@index([sourceType])
@@index([createdAt(sort: Desc)])
@@map("synced_contribution_records")
}
// =============================================================================
// CDC 同步表 - 全网算力进度 (from contribution-service)
// =============================================================================
model SyncedNetworkProgress {
id String @id @default(uuid())
totalTreeCount Int @default(0) // 全网累计认种棵数
totalAdoptionOrders Int @default(0) // 全网累计认种订单数
totalAdoptedUsers Int @default(0) // 全网累计认种用户数
currentUnit Int @default(0) // 当前单位数
currentMultiplier Decimal @db.Decimal(10, 6) @default(1.0) // 当前系数
currentContributionPerTree Decimal @db.Decimal(20, 10) @default(22617) // 当前每棵树贡献值
nextUnitTreeCount Int @default(1000) // 下一个单位触发的棵数
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@map("synced_network_progress")
}
// =============================================================================
// CDC 同步表 - 挖矿账户 (from mining-service)
// =============================================================================

View File

@ -296,8 +296,7 @@ export class UsersService {
}
/**
*
* contribution-service
*
*/
async getUserContributions(
accountSequence: string,
@ -315,22 +314,77 @@ export class UsersService {
// 返回算力账户概要
const contribution = user.contributionAccount;
const emptySummary = {
accountSequence,
personalContribution: '0',
teamLevelContribution: '0',
teamBonusContribution: '0',
totalContribution: '0',
effectiveContribution: '0',
hasAdopted: false,
directReferralCount: 0,
unlockedLevelDepth: 0,
unlockedBonusTiers: 0,
};
// 获取算力明细记录
const [records, total] = await Promise.all([
this.prisma.syncedContributionRecord.findMany({
where: { accountSequence },
orderBy: { createdAt: 'desc' },
skip: (page - 1) * pageSize,
take: pageSize,
}),
this.prisma.syncedContributionRecord.count({
where: { accountSequence },
}),
]);
// 格式化记录
const formattedRecords = await Promise.all(
records.map(async (record) => {
// 获取来源用户信息
let sourceUserInfo = null;
if (record.sourceAccountSequence !== accountSequence) {
const sourceUser = await this.prisma.syncedUser.findUnique({
where: { accountSequence: record.sourceAccountSequence },
select: { nickname: true, phone: true },
});
sourceUserInfo = sourceUser ? {
nickname: sourceUser.nickname,
phone: this.maskPhone(sourceUser.phone),
} : null;
}
return {
id: record.originalRecordId.toString(),
sourceType: record.sourceType,
sourceAccountSequence: record.sourceAccountSequence,
sourceUserInfo,
treeCount: record.treeCount,
baseContribution: record.baseContribution.toString(),
distributionRate: record.distributionRate.toString(),
levelDepth: record.levelDepth,
bonusTier: record.bonusTier,
amount: record.amount.toString(),
effectiveDate: record.effectiveDate,
expireDate: record.expireDate,
isExpired: record.isExpired,
createdAt: record.createdAt,
};
})
);
if (!contribution) {
return {
summary: {
accountSequence,
personalContribution: '0',
teamLevelContribution: '0',
teamBonusContribution: '0',
totalContribution: '0',
effectiveContribution: '0',
hasAdopted: false,
directReferralCount: 0,
unlockedLevelDepth: 0,
unlockedBonusTiers: 0,
summary: emptySummary,
records: formattedRecords,
pagination: {
page,
pageSize,
total,
totalPages: Math.ceil(total / pageSize),
},
records: [],
pagination: { page, pageSize, total: 0, totalPages: 0 },
};
}
@ -347,10 +401,13 @@ export class UsersService {
unlockedLevelDepth: contribution.unlockedLevelDepth,
unlockedBonusTiers: contribution.unlockedBonusTiers,
},
// 详细流水需要从 contribution-service 获取
records: [],
pagination: { page, pageSize, total: 0, totalPages: 0 },
note: '详细算力流水请查看 contribution-service',
records: formattedRecords,
pagination: {
page,
pageSize,
total,
totalPages: Math.ceil(total / pageSize),
},
};
}

View File

@ -101,6 +101,16 @@ export class CdcSyncService implements OnModuleInit {
'AdoptionSynced',
this.handleAdoptionSynced.bind(this),
);
// ContributionRecordSynced 事件 - 同步算力明细记录
this.cdcConsumer.registerServiceHandler(
'ContributionRecordSynced',
this.handleContributionRecordSynced.bind(this),
);
// NetworkProgressUpdated 事件 - 同步全网算力进度
this.cdcConsumer.registerServiceHandler(
'NetworkProgressUpdated',
this.handleNetworkProgressUpdated.bind(this),
);
// ===========================================================================
// 从 mining-service 同步挖矿数据 (通过 Debezium CDC 监听 outbox_events 表)
@ -620,6 +630,102 @@ export class CdcSyncService implements OnModuleInit {
}
}
/**
* ContributionRecordSynced -
*/
private async handleContributionRecordSynced(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedContributionRecord.upsert({
where: { originalRecordId: BigInt(payload.originalRecordId) },
create: {
originalRecordId: BigInt(payload.originalRecordId),
accountSequence: payload.accountSequence,
sourceType: payload.sourceType,
sourceAdoptionId: BigInt(payload.sourceAdoptionId),
sourceAccountSequence: payload.sourceAccountSequence,
treeCount: payload.treeCount,
baseContribution: payload.baseContribution,
distributionRate: payload.distributionRate,
levelDepth: payload.levelDepth,
bonusTier: payload.bonusTier,
amount: payload.amount,
effectiveDate: new Date(payload.effectiveDate),
expireDate: new Date(payload.expireDate),
isExpired: payload.isExpired || false,
createdAt: new Date(payload.createdAt),
},
update: {
accountSequence: payload.accountSequence,
sourceType: payload.sourceType,
sourceAdoptionId: BigInt(payload.sourceAdoptionId),
sourceAccountSequence: payload.sourceAccountSequence,
treeCount: payload.treeCount,
baseContribution: payload.baseContribution,
distributionRate: payload.distributionRate,
levelDepth: payload.levelDepth,
bonusTier: payload.bonusTier,
amount: payload.amount,
effectiveDate: new Date(payload.effectiveDate),
expireDate: new Date(payload.expireDate),
isExpired: payload.isExpired || false,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Synced contribution record: ${payload.originalRecordId}`);
} catch (error) {
this.logger.error(`Failed to sync contribution record: ${payload.originalRecordId}`, error);
}
}
/**
* NetworkProgressUpdated -
*/
private async handleNetworkProgressUpdated(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
// 全网进度只保留一条记录
const existing = await this.prisma.syncedNetworkProgress.findFirst();
if (existing) {
await this.prisma.syncedNetworkProgress.update({
where: { id: existing.id },
data: {
totalTreeCount: payload.totalTreeCount,
totalAdoptionOrders: payload.totalAdoptionOrders,
totalAdoptedUsers: payload.totalAdoptedUsers,
currentUnit: payload.currentUnit,
currentMultiplier: payload.currentMultiplier,
currentContributionPerTree: payload.currentContributionPerTree,
nextUnitTreeCount: payload.nextUnitTreeCount,
},
});
} else {
await this.prisma.syncedNetworkProgress.create({
data: {
totalTreeCount: payload.totalTreeCount,
totalAdoptionOrders: payload.totalAdoptionOrders,
totalAdoptedUsers: payload.totalAdoptedUsers,
currentUnit: payload.currentUnit,
currentMultiplier: payload.currentMultiplier,
currentContributionPerTree: payload.currentContributionPerTree,
nextUnitTreeCount: payload.nextUnitTreeCount,
},
});
}
await this.recordProcessedEvent(event);
this.logger.debug(
`Synced network progress: trees=${payload.totalTreeCount}, unit=${payload.currentUnit}, multiplier=${payload.currentMultiplier}`
);
} catch (error) {
this.logger.error('Failed to sync network progress', error);
}
}
// ===========================================================================
// 挖矿账户事件处理
// ===========================================================================