Compare commits

...

3 Commits

Author SHA1 Message Date
hailin 52c573d507 fix(contribution-service): auto-publish contribution records on calculation
- Change saveMany to return saved records with IDs
- Update saveDistributionResult to use saved records for event publishing
- Contribution records are now automatically synced to mining-admin-service

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 07:32:50 -08:00
hailin 04fd7b946a feat(mining-admin-web): update contribution records display to match backend API
- Update ContributionRecord type to match backend response fields (sourceType, baseContribution, distributionRate, etc.)
- Update contribution-records-list component with improved UI showing source type badges, user info, tree count, and expiry status

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 07:30:00 -08:00
hailin dbe9ab223f feat(contribution): fix pending fields update and add network progress tracking
- Fix updateContribution to properly update levelXPending and bonusTierXPending fields
- Add NetworkAdoptionProgress and DailyContributionRate tables for tracking contribution coefficient
- Create ContributionRateService for dynamic rate calculation (base 22617, +0.3% per 100 trees after 1000)
- Add ContributionRecordSynced and NetworkProgressUpdated events for CDC sync
- Add admin endpoints for network progress query and contribution records publishing
- Update mining-admin-service to sync contribution records and network progress

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 07:26:32 -08:00
16 changed files with 1077 additions and 62 deletions

View File

@ -458,6 +458,73 @@ model DistributionRateConfig {
@@map("distribution_rate_configs")
}
// 全网认种进度表(单行记录,实时更新)
// 记录当前全网累计认种数量和对应的算力系数
model NetworkAdoptionProgress {
id BigInt @id @default(autoincrement())
// ========== 全网累计认种统计 ==========
totalTreeCount Int @default(0) @map("total_tree_count") // 全网累计认种棵数
totalAdoptionOrders Int @default(0) @map("total_adoption_orders") // 全网累计认种订单数
totalAdoptedUsers Int @default(0) @map("total_adopted_users") // 全网累计认种用户数
// ========== 当前算力系数 ==========
// 基础值: 22617 (从第1棵到第999棵)
// 从第1000棵开始每100棵为1个单位每个单位递增 0.3%
// currentMultiplier = 1 + (currentUnit * 0.003)
// currentContributionPerTree = baseContribution * currentMultiplier
currentUnit Int @default(0) @map("current_unit") // 当前单位数 (0表示还没到1000棵)
currentMultiplier Decimal @default(1.0) @map("current_multiplier") @db.Decimal(10, 6) // 当前系数 (1.000, 1.003, 1.006...)
currentContributionPerTree Decimal @default(22617) @map("current_contribution_per_tree") @db.Decimal(20, 10) // 当前每棵树贡献值
// ========== 下一个单位的触发点 ==========
nextUnitTreeCount Int @default(1000) @map("next_unit_tree_count") // 下一个单位触发的棵数
// ========== 最后更新信息 ==========
lastAdoptionId BigInt? @map("last_adoption_id") // 最后处理的认种ID
lastAdoptionDate DateTime? @map("last_adoption_date") @db.Date // 最后认种日期
updatedAt DateTime @updatedAt @map("updated_at")
createdAt DateTime @default(now()) @map("created_at")
@@map("network_adoption_progress")
}
// 每日算力系数快照表
// 记录每天的算力系数,确保同一天认种的用户使用相同系数
model DailyContributionRate {
id BigInt @id @default(autoincrement())
// ========== 日期 ==========
effectiveDate DateTime @unique @map("effective_date") @db.Date // 生效日期
// ========== 当日起始状态 ==========
startTreeCount Int @default(0) @map("start_tree_count") // 当日开始时的全网棵数
startUnit Int @default(0) @map("start_unit") // 当日开始时的单位数
startMultiplier Decimal @default(1.0) @map("start_multiplier") @db.Decimal(10, 6) // 当日开始时的系数
contributionPerTree Decimal @map("contribution_per_tree") @db.Decimal(20, 10) // 当日每棵树贡献值(同一天内不变)
// ========== 当日结束状态 ==========
endTreeCount Int? @map("end_tree_count") // 当日结束时的全网棵数
endUnit Int? @map("end_unit") // 当日结束时的单位数
endMultiplier Decimal? @map("end_multiplier") @db.Decimal(10, 6) // 当日结束时的系数
// ========== 当日统计 ==========
dailyTreeCount Int @default(0) @map("daily_tree_count") // 当日新增认种棵数
dailyAdoptionOrders Int @default(0) @map("daily_adoption_orders") // 当日新增认种订单数
dailyAdoptedUsers Int @default(0) @map("daily_adopted_users") // 当日新增认种用户数
// ========== 状态 ==========
isClosed Boolean @default(false) @map("is_closed") // 是否已结算日终处理后置为true
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@index([effectiveDate])
@@index([isClosed])
@@map("daily_contribution_rates")
}
// ============================================
// Outbox 事件表(可靠事件发布)
// ============================================

View File

@ -3,10 +3,13 @@ import { ApiTags, ApiOperation } from '@nestjs/swagger';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
import { ContributionRateService } from '../../application/services/contribution-rate.service';
import {
ContributionAccountSyncedEvent,
ReferralSyncedEvent,
AdoptionSyncedEvent,
ContributionRecordSyncedEvent,
NetworkProgressUpdatedEvent,
} from '../../domain/events';
import { Public } from '../../shared/guards/jwt-auth.guard';
@ -19,6 +22,7 @@ export class AdminController {
private readonly prisma: PrismaService,
private readonly outboxRepository: OutboxRepository,
private readonly unitOfWork: UnitOfWork,
private readonly contributionRateService: ContributionRateService,
) {}
@Get('accounts/sync')
@ -274,4 +278,146 @@ export class AdminController {
message: `Published ${publishedCount} events, ${failedCount} failed out of ${adoptions.length} total`,
};
}
@Get('network-progress')
@Public()
@ApiOperation({ summary: '获取全网认种进度和算力系数' })
async getNetworkProgress() {
const progress = await this.contributionRateService.getNetworkProgress();
return {
totalTreeCount: progress.totalTreeCount,
totalAdoptionOrders: progress.totalAdoptionOrders,
totalAdoptedUsers: progress.totalAdoptedUsers,
currentUnit: progress.currentUnit,
currentMultiplier: progress.currentMultiplier.toString(),
currentContributionPerTree: progress.currentContributionPerTree.toString(),
nextUnitTreeCount: progress.nextUnitTreeCount,
// 计算下一个单位还需要多少棵
treesToNextUnit: progress.nextUnitTreeCount - progress.totalTreeCount,
};
}
@Post('contribution-records/publish-all')
@Public()
@ApiOperation({ summary: '发布所有算力记录事件到 outbox用于初始同步到 mining-admin-service' })
async publishAllContributionRecords(): Promise<{
success: boolean;
publishedCount: number;
failedCount: number;
message: string;
}> {
const records = await this.prisma.contributionRecord.findMany({
select: {
id: true,
accountSequence: true,
sourceType: true,
sourceAdoptionId: true,
sourceAccountSequence: true,
treeCount: true,
baseContribution: true,
distributionRate: true,
levelDepth: true,
bonusTier: true,
amount: true,
effectiveDate: true,
expireDate: true,
isExpired: true,
createdAt: true,
},
});
let publishedCount = 0;
let failedCount = 0;
const batchSize = 100;
for (let i = 0; i < records.length; i += batchSize) {
const batch = records.slice(i, i + batchSize);
try {
await this.unitOfWork.executeInTransaction(async () => {
const events = batch.map((record) => {
const event = new ContributionRecordSyncedEvent(
record.id,
record.accountSequence,
record.sourceType,
record.sourceAdoptionId,
record.sourceAccountSequence,
record.treeCount,
record.baseContribution.toString(),
record.distributionRate.toString(),
record.levelDepth,
record.bonusTier,
record.amount.toString(),
record.effectiveDate,
record.expireDate,
record.isExpired,
record.createdAt,
);
return {
aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE,
aggregateId: record.id.toString(),
eventType: ContributionRecordSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
});
publishedCount += batch.length;
this.logger.debug(`Published contribution record batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`);
} catch (error) {
failedCount += batch.length;
this.logger.error(`Failed to publish contribution record batch ${Math.floor(i / batchSize) + 1}`, error);
}
}
this.logger.log(`Published ${publishedCount} contribution record events, ${failedCount} failed`);
return {
success: failedCount === 0,
publishedCount,
failedCount,
message: `Published ${publishedCount} events, ${failedCount} failed out of ${records.length} total`,
};
}
@Post('network-progress/publish')
@Public()
@ApiOperation({ summary: '发布当前全网进度事件' })
async publishNetworkProgress(): Promise<{ success: boolean; message: string }> {
try {
const progress = await this.contributionRateService.getNetworkProgress();
const event = new NetworkProgressUpdatedEvent(
progress.totalTreeCount,
progress.totalAdoptionOrders,
progress.totalAdoptedUsers,
progress.currentUnit,
progress.currentMultiplier.toString(),
progress.currentContributionPerTree.toString(),
progress.nextUnitTreeCount,
);
await this.outboxRepository.save({
aggregateType: NetworkProgressUpdatedEvent.AGGREGATE_TYPE,
aggregateId: 'network',
eventType: NetworkProgressUpdatedEvent.EVENT_TYPE,
payload: event.toPayload(),
});
return {
success: true,
message: `Published network progress: trees=${progress.totalTreeCount}, unit=${progress.currentUnit}, multiplier=${progress.currentMultiplier.toString()}`,
};
} catch (error) {
this.logger.error('Failed to publish network progress', error);
return {
success: false,
message: `Failed: ${error.message}`,
};
}
}
}

View File

@ -11,6 +11,7 @@ import { CDCEventDispatcher } from './event-handlers/cdc-event-dispatcher';
// Services
import { ContributionCalculationService } from './services/contribution-calculation.service';
import { ContributionDistributionPublisherService } from './services/contribution-distribution-publisher.service';
import { ContributionRateService } from './services/contribution-rate.service';
import { SnapshotService } from './services/snapshot.service';
// Queries
@ -36,6 +37,7 @@ import { ContributionScheduler } from './schedulers/contribution.scheduler';
// Services
ContributionCalculationService,
ContributionDistributionPublisherService,
ContributionRateService,
SnapshotService,
// Queries
@ -48,6 +50,7 @@ import { ContributionScheduler } from './schedulers/contribution.scheduler';
],
exports: [
ContributionCalculationService,
ContributionRateService,
SnapshotService,
GetContributionAccountQuery,
GetContributionStatsQuery,

View File

@ -8,8 +8,11 @@ import { SystemAccountRepository } from '../../infrastructure/persistence/reposi
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
import { ContributionAccountAggregate, ContributionSourceType } from '../../domain/aggregates/contribution-account.aggregate';
import { ContributionRecordAggregate } from '../../domain/aggregates/contribution-record.aggregate';
import { SyncedReferral } from '../../domain/repositories/synced-data.repository.interface';
import { ContributionDistributionPublisherService } from './contribution-distribution-publisher.service';
import { ContributionRateService } from './contribution-rate.service';
import { ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent } from '../../domain/events';
/**
*
@ -29,6 +32,7 @@ export class ContributionCalculationService {
private readonly outboxRepository: OutboxRepository,
private readonly unitOfWork: UnitOfWork,
private readonly distributionPublisher: ContributionDistributionPublisherService,
private readonly contributionRateService: ContributionRateService,
) {}
/**
@ -149,8 +153,12 @@ export class ContributionCalculationService {
sourceAdoptionId: bigint,
sourceAccountSequence: string,
): Promise<void> {
// 收集所有保存后的记录带ID用于发布事件
const savedRecords: ContributionRecordAggregate[] = [];
// 1. 保存个人算力记录
await this.contributionRecordRepository.save(result.personalRecord);
const savedPersonalRecord = await this.contributionRecordRepository.save(result.personalRecord);
savedRecords.push(savedPersonalRecord);
// 更新个人算力账户
let account = await this.contributionAccountRepository.findByAccountSequence(
@ -164,7 +172,8 @@ export class ContributionCalculationService {
// 2. 保存团队层级算力记录
if (result.teamLevelRecords.length > 0) {
await this.contributionRecordRepository.saveMany(result.teamLevelRecords);
const savedLevelRecords = await this.contributionRecordRepository.saveMany(result.teamLevelRecords);
savedRecords.push(...savedLevelRecords);
// 更新各上线的算力账户
for (const record of result.teamLevelRecords) {
@ -172,13 +181,16 @@ export class ContributionCalculationService {
record.accountSequence,
ContributionSourceType.TEAM_LEVEL,
record.amount,
record.levelDepth, // 传递层级深度
null,
);
}
}
// 3. 保存团队奖励算力记录
if (result.teamBonusRecords.length > 0) {
await this.contributionRecordRepository.saveMany(result.teamBonusRecords);
const savedBonusRecords = await this.contributionRecordRepository.saveMany(result.teamBonusRecords);
savedRecords.push(...savedBonusRecords);
// 更新直接上线的算力账户
for (const record of result.teamBonusRecords) {
@ -186,6 +198,8 @@ export class ContributionCalculationService {
record.accountSequence,
ContributionSourceType.TEAM_BONUS,
record.amount,
null,
record.bonusTier, // 传递加成档位
);
}
}
@ -224,6 +238,47 @@ export class ContributionCalculationService {
});
}
}
// 6. 发布算力记录同步事件(用于 mining-admin-service- 使用保存后带 ID 的记录
await this.publishContributionRecordEvents(savedRecords);
}
/**
*
*/
private async publishContributionRecordEvents(
savedRecords: ContributionRecordAggregate[],
): Promise<void> {
if (savedRecords.length === 0) return;
const events = savedRecords.map((record) => {
const event = new ContributionRecordSyncedEvent(
record.id!,
record.accountSequence,
record.sourceType,
record.sourceAdoptionId,
record.sourceAccountSequence,
record.treeCount,
record.baseContribution.value.toString(),
record.distributionRate.value.toString(),
record.levelDepth,
record.bonusTier,
record.amount.value.toString(),
record.effectiveDate,
record.expireDate,
record.isExpired,
record.createdAt,
);
return {
aggregateType: ContributionRecordSyncedEvent.AGGREGATE_TYPE,
aggregateId: record.id!.toString(),
eventType: ContributionRecordSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
}
/**

View File

@ -0,0 +1,374 @@
import { Injectable, Logger } from '@nestjs/common';
import { Decimal } from 'decimal.js';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
/**
*
*/
interface ContributionRateConfig {
baseContribution: Decimal; // 基础贡献值 (22617)
incrementPercentage: Decimal; // 每单位递增百分比 (0.003 = 0.3%)
unitSize: number; // 每单位棵数 (100)
startTreeNumber: number; // 开始递增的棵数 (1000)
}
/**
*
*/
export interface DailyContributionRateInfo {
effectiveDate: Date;
contributionPerTree: Decimal;
multiplier: Decimal;
unit: number;
startTreeCount: number;
}
/**
*
*
*
* - 基础贡献值: 22617 (1999)
* - 10001001
* - 0.3%
* - 使
*
*
* - currentUnit = floor((totalTreeCount - startTreeNumber) / unitSize)
* - currentMultiplier = 1 + (currentUnit * incrementPercentage)
* - currentContributionPerTree = baseContribution * currentMultiplier
*/
@Injectable()
export class ContributionRateService {
private readonly logger = new Logger(ContributionRateService.name);
// 默认配置(可从数据库加载)
private config: ContributionRateConfig = {
baseContribution: new Decimal(22617),
incrementPercentage: new Decimal(0.003), // 0.3%
unitSize: 100,
startTreeNumber: 1000,
};
constructor(private readonly prisma: PrismaService) {}
/**
*
*
* @param adoptionDate
* @returns
*/
async getOrCreateDailyRate(adoptionDate: Date): Promise<DailyContributionRateInfo> {
// 标准化日期(去掉时间部分)
const effectiveDate = this.normalizeDate(adoptionDate);
// 尝试获取已存在的当日记录
let dailyRate = await this.prisma.dailyContributionRate.findUnique({
where: { effectiveDate },
});
if (dailyRate) {
return {
effectiveDate: dailyRate.effectiveDate,
contributionPerTree: new Decimal(dailyRate.contributionPerTree.toString()),
multiplier: new Decimal(dailyRate.startMultiplier.toString()),
unit: dailyRate.startUnit,
startTreeCount: dailyRate.startTreeCount,
};
}
// 获取当前全网进度
const progress = await this.getOrCreateNetworkProgress();
// 计算当前系数
const { unit, multiplier, contributionPerTree } = this.calculateRate(progress.totalTreeCount);
// 创建当日记录
dailyRate = await this.prisma.dailyContributionRate.create({
data: {
effectiveDate,
startTreeCount: progress.totalTreeCount,
startUnit: unit,
startMultiplier: multiplier.toNumber(),
contributionPerTree: contributionPerTree.toNumber(),
dailyTreeCount: 0,
dailyAdoptionOrders: 0,
dailyAdoptedUsers: 0,
isClosed: false,
},
});
this.logger.log(
`Created daily contribution rate for ${effectiveDate.toISOString().split('T')[0]}: ` +
`unit=${unit}, multiplier=${multiplier.toString()}, contributionPerTree=${contributionPerTree.toString()}`
);
return {
effectiveDate: dailyRate.effectiveDate,
contributionPerTree,
multiplier,
unit,
startTreeCount: progress.totalTreeCount,
};
}
/**
*
*
* @param adoptionDate
* @returns
*/
async getContributionPerTree(adoptionDate: Date): Promise<Decimal> {
const rateInfo = await this.getOrCreateDailyRate(adoptionDate);
return rateInfo.contributionPerTree;
}
/**
*
*
* @param treeCount
* @param adoptionDate
* @param adoptionId ID
* @param isNewUser
*/
async updateNetworkProgress(
treeCount: number,
adoptionDate: Date,
adoptionId: bigint,
isNewUser: boolean,
): Promise<void> {
const effectiveDate = this.normalizeDate(adoptionDate);
await this.prisma.$transaction(async (tx) => {
// 更新全网进度
const progress = await tx.networkAdoptionProgress.findFirst();
if (!progress) {
// 首次创建
const newTotalTreeCount = treeCount;
const { unit, multiplier, contributionPerTree, nextUnitTreeCount } =
this.calculateRate(newTotalTreeCount);
await tx.networkAdoptionProgress.create({
data: {
totalTreeCount: newTotalTreeCount,
totalAdoptionOrders: 1,
totalAdoptedUsers: isNewUser ? 1 : 0,
currentUnit: unit,
currentMultiplier: multiplier.toNumber(),
currentContributionPerTree: contributionPerTree.toNumber(),
nextUnitTreeCount,
lastAdoptionId: adoptionId,
lastAdoptionDate: effectiveDate,
},
});
} else {
// 更新现有记录
const newTotalTreeCount = progress.totalTreeCount + treeCount;
const { unit, multiplier, contributionPerTree, nextUnitTreeCount } =
this.calculateRate(newTotalTreeCount);
await tx.networkAdoptionProgress.update({
where: { id: progress.id },
data: {
totalTreeCount: newTotalTreeCount,
totalAdoptionOrders: { increment: 1 },
totalAdoptedUsers: isNewUser ? { increment: 1 } : undefined,
currentUnit: unit,
currentMultiplier: multiplier.toNumber(),
currentContributionPerTree: contributionPerTree.toNumber(),
nextUnitTreeCount,
lastAdoptionId: adoptionId,
lastAdoptionDate: effectiveDate,
},
});
}
// 更新当日统计
await tx.dailyContributionRate.upsert({
where: { effectiveDate },
create: {
effectiveDate,
startTreeCount: 0,
startUnit: 0,
startMultiplier: 1.0,
contributionPerTree: this.config.baseContribution.toNumber(),
dailyTreeCount: treeCount,
dailyAdoptionOrders: 1,
dailyAdoptedUsers: isNewUser ? 1 : 0,
isClosed: false,
},
update: {
dailyTreeCount: { increment: treeCount },
dailyAdoptionOrders: { increment: 1 },
dailyAdoptedUsers: isNewUser ? { increment: 1 } : undefined,
},
});
});
this.logger.debug(
`Updated network progress: +${treeCount} trees, adoptionId=${adoptionId}`
);
}
/**
*
*
*
* @param date
*/
async closeDailyRate(date: Date): Promise<void> {
const effectiveDate = this.normalizeDate(date);
const dailyRate = await this.prisma.dailyContributionRate.findUnique({
where: { effectiveDate },
});
if (!dailyRate) {
this.logger.warn(`No daily rate found for ${effectiveDate.toISOString().split('T')[0]}`);
return;
}
if (dailyRate.isClosed) {
this.logger.warn(`Daily rate for ${effectiveDate.toISOString().split('T')[0]} is already closed`);
return;
}
// 获取当前全网进度
const progress = await this.getOrCreateNetworkProgress();
const { unit, multiplier } = this.calculateRate(progress.totalTreeCount);
await this.prisma.dailyContributionRate.update({
where: { effectiveDate },
data: {
endTreeCount: progress.totalTreeCount,
endUnit: unit,
endMultiplier: multiplier.toNumber(),
isClosed: true,
},
});
this.logger.log(
`Closed daily rate for ${effectiveDate.toISOString().split('T')[0]}: ` +
`endTreeCount=${progress.totalTreeCount}, endUnit=${unit}, endMultiplier=${multiplier.toString()}`
);
}
/**
*
*/
async getNetworkProgress(): Promise<{
totalTreeCount: number;
totalAdoptionOrders: number;
totalAdoptedUsers: number;
currentUnit: number;
currentMultiplier: Decimal;
currentContributionPerTree: Decimal;
nextUnitTreeCount: number;
}> {
const progress = await this.getOrCreateNetworkProgress();
return {
totalTreeCount: progress.totalTreeCount,
totalAdoptionOrders: progress.totalAdoptionOrders,
totalAdoptedUsers: progress.totalAdoptedUsers,
currentUnit: progress.currentUnit,
currentMultiplier: new Decimal(progress.currentMultiplier.toString()),
currentContributionPerTree: new Decimal(progress.currentContributionPerTree.toString()),
nextUnitTreeCount: progress.nextUnitTreeCount,
};
}
/**
*
*/
private async getOrCreateNetworkProgress() {
let progress = await this.prisma.networkAdoptionProgress.findFirst();
if (!progress) {
progress = await this.prisma.networkAdoptionProgress.create({
data: {
totalTreeCount: 0,
totalAdoptionOrders: 0,
totalAdoptedUsers: 0,
currentUnit: 0,
currentMultiplier: 1.0,
currentContributionPerTree: this.config.baseContribution.toNumber(),
nextUnitTreeCount: this.config.startTreeNumber,
},
});
}
return progress;
}
/**
*
*
* @param totalTreeCount
* @returns
*/
private calculateRate(totalTreeCount: number): {
unit: number;
multiplier: Decimal;
contributionPerTree: Decimal;
nextUnitTreeCount: number;
} {
let unit = 0;
let nextUnitTreeCount = this.config.startTreeNumber;
// 如果总棵数已经超过起始阈值
if (totalTreeCount >= this.config.startTreeNumber) {
// 计算当前单位数
unit = Math.floor((totalTreeCount - this.config.startTreeNumber) / this.config.unitSize);
// 计算下一个单位的触发点
nextUnitTreeCount = this.config.startTreeNumber + ((unit + 1) * this.config.unitSize);
}
// 计算系数: 1 + (unit * 0.003)
const multiplier = new Decimal(1).plus(
new Decimal(unit).times(this.config.incrementPercentage)
);
// 计算每棵树贡献值
const contributionPerTree = this.config.baseContribution.times(multiplier);
return {
unit,
multiplier,
contributionPerTree,
nextUnitTreeCount,
};
}
/**
*
*/
private normalizeDate(date: Date): Date {
const normalized = new Date(date);
normalized.setHours(0, 0, 0, 0);
return normalized;
}
/**
*
*/
async loadConfig(): Promise<void> {
const dbConfig = await this.prisma.contributionConfig.findFirst({
where: { isActive: true },
});
if (dbConfig) {
this.config = {
baseContribution: new Decimal(dbConfig.baseContribution.toString()),
incrementPercentage: new Decimal(dbConfig.incrementPercentage.toString()),
unitSize: dbConfig.unitSize,
startTreeNumber: dbConfig.startTreeNumber,
};
this.logger.log(
`Loaded contribution rate config: base=${this.config.baseContribution.toString()}, ` +
`increment=${this.config.incrementPercentage.toString()}, ` +
`unitSize=${this.config.unitSize}, startTree=${this.config.startTreeNumber}`
);
}
}
}

View File

@ -0,0 +1,47 @@
/**
*
* mining-admin-service
*/
export class ContributionRecordSyncedEvent {
static readonly EVENT_TYPE = 'ContributionRecordSynced';
static readonly AGGREGATE_TYPE = 'ContributionRecord';
constructor(
public readonly originalRecordId: bigint,
public readonly accountSequence: string,
public readonly sourceType: string,
public readonly sourceAdoptionId: bigint,
public readonly sourceAccountSequence: string,
public readonly treeCount: number,
public readonly baseContribution: string,
public readonly distributionRate: string,
public readonly levelDepth: number | null,
public readonly bonusTier: number | null,
public readonly amount: string,
public readonly effectiveDate: Date,
public readonly expireDate: Date,
public readonly isExpired: boolean,
public readonly createdAt: Date,
) {}
toPayload(): Record<string, any> {
return {
eventType: ContributionRecordSyncedEvent.EVENT_TYPE,
originalRecordId: this.originalRecordId.toString(),
accountSequence: this.accountSequence,
sourceType: this.sourceType,
sourceAdoptionId: this.sourceAdoptionId.toString(),
sourceAccountSequence: this.sourceAccountSequence,
treeCount: this.treeCount,
baseContribution: this.baseContribution,
distributionRate: this.distributionRate,
levelDepth: this.levelDepth,
bonusTier: this.bonusTier,
amount: this.amount,
effectiveDate: this.effectiveDate.toISOString(),
expireDate: this.expireDate.toISOString(),
isExpired: this.isExpired,
createdAt: this.createdAt.toISOString(),
};
}
}

View File

@ -3,3 +3,5 @@ export * from './daily-snapshot-created.event';
export * from './contribution-account-synced.event';
export * from './referral-synced.event';
export * from './adoption-synced.event';
export * from './contribution-record-synced.event';
export * from './network-progress-updated.event';

View File

@ -0,0 +1,31 @@
/**
*
* mining-admin-service
*/
export class NetworkProgressUpdatedEvent {
static readonly EVENT_TYPE = 'NetworkProgressUpdated';
static readonly AGGREGATE_TYPE = 'NetworkProgress';
constructor(
public readonly totalTreeCount: number,
public readonly totalAdoptionOrders: number,
public readonly totalAdoptedUsers: number,
public readonly currentUnit: number,
public readonly currentMultiplier: string,
public readonly currentContributionPerTree: string,
public readonly nextUnitTreeCount: number,
) {}
toPayload(): Record<string, any> {
return {
eventType: NetworkProgressUpdatedEvent.EVENT_TYPE,
totalTreeCount: this.totalTreeCount,
totalAdoptionOrders: this.totalAdoptionOrders,
totalAdoptedUsers: this.totalAdoptedUsers,
currentUnit: this.currentUnit,
currentMultiplier: this.currentMultiplier,
currentContributionPerTree: this.currentContributionPerTree,
nextUnitTreeCount: this.nextUnitTreeCount,
};
}
}

View File

@ -13,7 +13,7 @@ export interface IContributionRecordRepository {
/**
*
*/
saveMany(records: ContributionRecordAggregate[], tx?: any): Promise<void>;
saveMany(records: ContributionRecordAggregate[], tx?: any): Promise<ContributionRecordAggregate[]>;
/**
*

View File

@ -79,9 +79,10 @@ export class ContributionAccountRepository implements IContributionAccountReposi
accountSequence: string,
sourceType: ContributionSourceType,
amount: ContributionAmount,
levelDepth?: number | null,
bonusTier?: number | null,
): Promise<void> {
// 个人算力直接增加到 personalContribution 和 effectiveContribution
// 层级/加成算力需要根据解锁状态分配到对应的 pending 字段
if (sourceType === ContributionSourceType.PERSONAL) {
await this.client.contributionAccount.update({
where: { accountSequence },
@ -91,8 +92,34 @@ export class ContributionAccountRepository implements IContributionAccountReposi
updatedAt: new Date(),
},
});
} else if (sourceType === ContributionSourceType.TEAM_LEVEL && levelDepth) {
// 层级算力:更新对应层级的 pending 字段和汇总字段
const levelPendingField = `level${levelDepth}Pending` as const;
await this.client.contributionAccount.update({
where: { accountSequence },
data: {
[levelPendingField]: { increment: amount.value },
totalLevelPending: { increment: amount.value },
totalPending: { increment: amount.value },
effectiveContribution: { increment: amount.value },
updatedAt: new Date(),
},
});
} else if (sourceType === ContributionSourceType.TEAM_BONUS && bonusTier) {
// 加成算力:更新对应档位的 pending 字段和汇总字段
const bonusTierField = `bonusTier${bonusTier}Pending` as const;
await this.client.contributionAccount.update({
where: { accountSequence },
data: {
[bonusTierField]: { increment: amount.value },
totalBonusPending: { increment: amount.value },
totalPending: { increment: amount.value },
effectiveContribution: { increment: amount.value },
updatedAt: new Date(),
},
});
} else {
// 层级和加成算力暂时累加到 effectiveContribution待后续细化分配逻辑
// 兜底:只更新 effectiveContribution
await this.client.contributionAccount.update({
where: { accountSequence },
data: {

View File

@ -99,17 +99,22 @@ export class ContributionRecordRepository implements IContributionRecordReposito
return this.toDomain(result);
}
async saveMany(aggregates: ContributionRecordAggregate[], tx?: any): Promise<void> {
if (aggregates.length === 0) return;
async saveMany(aggregates: ContributionRecordAggregate[], tx?: any): Promise<ContributionRecordAggregate[]> {
if (aggregates.length === 0) return [];
const client = tx ?? this.client;
// 使用事务批量插入
const createData = aggregates.map((a) => a.toPersistence());
await client.contributionRecord.createMany({
data: createData,
skipDuplicates: true,
});
// 逐个创建以获取返回的 ID
const results: ContributionRecordAggregate[] = [];
for (const aggregate of aggregates) {
const data = aggregate.toPersistence();
const result = await client.contributionRecord.create({
data,
});
results.push(this.toDomain(result));
}
return results;
}
async findExpiring(beforeDate: Date, limit?: number): Promise<ContributionRecordAggregate[]> {

View File

@ -242,6 +242,66 @@ model SyncedAdoption {
@@map("synced_adoptions")
}
// =============================================================================
// CDC 同步表 - 算力记录明细 (from contribution-service)
// =============================================================================
model SyncedContributionRecord {
id String @id @default(uuid())
originalRecordId BigInt @unique // contribution-service 中的原始 ID
accountSequence String // 获得算力的账户
// 来源信息
sourceType String // PERSONAL / TEAM_LEVEL / TEAM_BONUS
sourceAdoptionId BigInt // 来源认种ID
sourceAccountSequence String // 认种人账号
// 计算参数
treeCount Int // 认种棵数
baseContribution Decimal @db.Decimal(20, 10) // 基础贡献值/棵
distributionRate Decimal @db.Decimal(10, 6) // 分配比例
levelDepth Int? // 层级深度 (1-15)
bonusTier Int? // 加成档位 (1-3)
// 金额
amount Decimal @db.Decimal(30, 10) // 算力金额
// 有效期
effectiveDate DateTime @db.Date // 生效日期
expireDate DateTime @db.Date // 过期日期
isExpired Boolean @default(false)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
createdAt DateTime // 原始记录创建时间
@@index([accountSequence])
@@index([sourceAccountSequence])
@@index([sourceAdoptionId])
@@index([sourceType])
@@index([createdAt(sort: Desc)])
@@map("synced_contribution_records")
}
// =============================================================================
// CDC 同步表 - 全网算力进度 (from contribution-service)
// =============================================================================
model SyncedNetworkProgress {
id String @id @default(uuid())
totalTreeCount Int @default(0) // 全网累计认种棵数
totalAdoptionOrders Int @default(0) // 全网累计认种订单数
totalAdoptedUsers Int @default(0) // 全网累计认种用户数
currentUnit Int @default(0) // 当前单位数
currentMultiplier Decimal @db.Decimal(10, 6) @default(1.0) // 当前系数
currentContributionPerTree Decimal @db.Decimal(20, 10) @default(22617) // 当前每棵树贡献值
nextUnitTreeCount Int @default(1000) // 下一个单位触发的棵数
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@map("synced_network_progress")
}
// =============================================================================
// CDC 同步表 - 挖矿账户 (from mining-service)
// =============================================================================

View File

@ -296,8 +296,7 @@ export class UsersService {
}
/**
*
* contribution-service
*
*/
async getUserContributions(
accountSequence: string,
@ -315,22 +314,77 @@ export class UsersService {
// 返回算力账户概要
const contribution = user.contributionAccount;
const emptySummary = {
accountSequence,
personalContribution: '0',
teamLevelContribution: '0',
teamBonusContribution: '0',
totalContribution: '0',
effectiveContribution: '0',
hasAdopted: false,
directReferralCount: 0,
unlockedLevelDepth: 0,
unlockedBonusTiers: 0,
};
// 获取算力明细记录
const [records, total] = await Promise.all([
this.prisma.syncedContributionRecord.findMany({
where: { accountSequence },
orderBy: { createdAt: 'desc' },
skip: (page - 1) * pageSize,
take: pageSize,
}),
this.prisma.syncedContributionRecord.count({
where: { accountSequence },
}),
]);
// 格式化记录
const formattedRecords = await Promise.all(
records.map(async (record) => {
// 获取来源用户信息
let sourceUserInfo = null;
if (record.sourceAccountSequence !== accountSequence) {
const sourceUser = await this.prisma.syncedUser.findUnique({
where: { accountSequence: record.sourceAccountSequence },
select: { nickname: true, phone: true },
});
sourceUserInfo = sourceUser ? {
nickname: sourceUser.nickname,
phone: this.maskPhone(sourceUser.phone),
} : null;
}
return {
id: record.originalRecordId.toString(),
sourceType: record.sourceType,
sourceAccountSequence: record.sourceAccountSequence,
sourceUserInfo,
treeCount: record.treeCount,
baseContribution: record.baseContribution.toString(),
distributionRate: record.distributionRate.toString(),
levelDepth: record.levelDepth,
bonusTier: record.bonusTier,
amount: record.amount.toString(),
effectiveDate: record.effectiveDate,
expireDate: record.expireDate,
isExpired: record.isExpired,
createdAt: record.createdAt,
};
})
);
if (!contribution) {
return {
summary: {
accountSequence,
personalContribution: '0',
teamLevelContribution: '0',
teamBonusContribution: '0',
totalContribution: '0',
effectiveContribution: '0',
hasAdopted: false,
directReferralCount: 0,
unlockedLevelDepth: 0,
unlockedBonusTiers: 0,
summary: emptySummary,
records: formattedRecords,
pagination: {
page,
pageSize,
total,
totalPages: Math.ceil(total / pageSize),
},
records: [],
pagination: { page, pageSize, total: 0, totalPages: 0 },
};
}
@ -347,10 +401,13 @@ export class UsersService {
unlockedLevelDepth: contribution.unlockedLevelDepth,
unlockedBonusTiers: contribution.unlockedBonusTiers,
},
// 详细流水需要从 contribution-service 获取
records: [],
pagination: { page, pageSize, total: 0, totalPages: 0 },
note: '详细算力流水请查看 contribution-service',
records: formattedRecords,
pagination: {
page,
pageSize,
total,
totalPages: Math.ceil(total / pageSize),
},
};
}

View File

@ -101,6 +101,16 @@ export class CdcSyncService implements OnModuleInit {
'AdoptionSynced',
this.handleAdoptionSynced.bind(this),
);
// ContributionRecordSynced 事件 - 同步算力明细记录
this.cdcConsumer.registerServiceHandler(
'ContributionRecordSynced',
this.handleContributionRecordSynced.bind(this),
);
// NetworkProgressUpdated 事件 - 同步全网算力进度
this.cdcConsumer.registerServiceHandler(
'NetworkProgressUpdated',
this.handleNetworkProgressUpdated.bind(this),
);
// ===========================================================================
// 从 mining-service 同步挖矿数据 (通过 Debezium CDC 监听 outbox_events 表)
@ -620,6 +630,102 @@ export class CdcSyncService implements OnModuleInit {
}
}
/**
* ContributionRecordSynced -
*/
private async handleContributionRecordSynced(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedContributionRecord.upsert({
where: { originalRecordId: BigInt(payload.originalRecordId) },
create: {
originalRecordId: BigInt(payload.originalRecordId),
accountSequence: payload.accountSequence,
sourceType: payload.sourceType,
sourceAdoptionId: BigInt(payload.sourceAdoptionId),
sourceAccountSequence: payload.sourceAccountSequence,
treeCount: payload.treeCount,
baseContribution: payload.baseContribution,
distributionRate: payload.distributionRate,
levelDepth: payload.levelDepth,
bonusTier: payload.bonusTier,
amount: payload.amount,
effectiveDate: new Date(payload.effectiveDate),
expireDate: new Date(payload.expireDate),
isExpired: payload.isExpired || false,
createdAt: new Date(payload.createdAt),
},
update: {
accountSequence: payload.accountSequence,
sourceType: payload.sourceType,
sourceAdoptionId: BigInt(payload.sourceAdoptionId),
sourceAccountSequence: payload.sourceAccountSequence,
treeCount: payload.treeCount,
baseContribution: payload.baseContribution,
distributionRate: payload.distributionRate,
levelDepth: payload.levelDepth,
bonusTier: payload.bonusTier,
amount: payload.amount,
effectiveDate: new Date(payload.effectiveDate),
expireDate: new Date(payload.expireDate),
isExpired: payload.isExpired || false,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Synced contribution record: ${payload.originalRecordId}`);
} catch (error) {
this.logger.error(`Failed to sync contribution record: ${payload.originalRecordId}`, error);
}
}
/**
* NetworkProgressUpdated -
*/
private async handleNetworkProgressUpdated(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
// 全网进度只保留一条记录
const existing = await this.prisma.syncedNetworkProgress.findFirst();
if (existing) {
await this.prisma.syncedNetworkProgress.update({
where: { id: existing.id },
data: {
totalTreeCount: payload.totalTreeCount,
totalAdoptionOrders: payload.totalAdoptionOrders,
totalAdoptedUsers: payload.totalAdoptedUsers,
currentUnit: payload.currentUnit,
currentMultiplier: payload.currentMultiplier,
currentContributionPerTree: payload.currentContributionPerTree,
nextUnitTreeCount: payload.nextUnitTreeCount,
},
});
} else {
await this.prisma.syncedNetworkProgress.create({
data: {
totalTreeCount: payload.totalTreeCount,
totalAdoptionOrders: payload.totalAdoptionOrders,
totalAdoptedUsers: payload.totalAdoptedUsers,
currentUnit: payload.currentUnit,
currentMultiplier: payload.currentMultiplier,
currentContributionPerTree: payload.currentContributionPerTree,
nextUnitTreeCount: payload.nextUnitTreeCount,
},
});
}
await this.recordProcessedEvent(event);
this.logger.debug(
`Synced network progress: trees=${payload.totalTreeCount}, unit=${payload.currentUnit}, multiplier=${payload.currentMultiplier}`
);
} catch (error) {
this.logger.error('Failed to sync network progress', error);
}
}
// ===========================================================================
// 挖矿账户事件处理
// ===========================================================================

View File

@ -3,22 +3,26 @@
import { useState } from 'react';
import { useContributionRecords } from '../hooks/use-users';
import { formatDecimal, formatPercent } from '@/lib/utils/format';
import { formatDateTime } from '@/lib/utils/date';
import { formatDateTime, formatDate } from '@/lib/utils/date';
import { Card, CardContent } from '@/components/ui/card';
import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from '@/components/ui/table';
import { Button } from '@/components/ui/button';
import { Skeleton } from '@/components/ui/skeleton';
import { Badge } from '@/components/ui/badge';
import { ChevronLeft, ChevronRight } from 'lucide-react';
const contributionTypeLabels: Record<string, string> = {
PERSONAL: '个人',
SYSTEM_OPERATION: '系统运营',
SYSTEM_PROVINCE: '系统省级',
SYSTEM_CITY: '系统市级',
const sourceTypeLabels: Record<string, string> = {
PERSONAL: '个人认种',
TEAM_LEVEL: '团队层级',
TEAM_BONUS: '团队奖励',
};
const sourceTypeBadgeVariant: Record<string, 'default' | 'secondary' | 'outline'> = {
PERSONAL: 'default',
TEAM_LEVEL: 'secondary',
TEAM_BONUS: 'outline',
};
interface ContributionRecordsListProps {
accountSequence: string;
}
@ -35,20 +39,22 @@ export function ContributionRecordsList({ accountSequence }: ContributionRecords
<Table>
<TableHeader>
<TableRow>
<TableHead></TableHead>
<TableHead></TableHead>
<TableHead></TableHead>
<TableHead className="text-right"></TableHead>
<TableHead className="text-right"></TableHead>
<TableHead></TableHead>
<TableHead></TableHead>
<TableHead className="text-right"></TableHead>
<TableHead className="text-right"></TableHead>
<TableHead className="text-right"></TableHead>
<TableHead className="text-right"></TableHead>
<TableHead>/</TableHead>
<TableHead></TableHead>
<TableHead></TableHead>
<TableHead></TableHead>
</TableRow>
</TableHeader>
<TableBody>
{isLoading ? (
[...Array(5)].map((_, i) => (
<TableRow key={i}>
{[...Array(7)].map((_, j) => (
{[...Array(9)].map((_, j) => (
<TableCell key={j}>
<Skeleton className="h-4 w-full" />
</TableCell>
@ -57,18 +63,34 @@ export function ContributionRecordsList({ accountSequence }: ContributionRecords
))
) : data?.items.length === 0 ? (
<TableRow>
<TableCell colSpan={7} className="text-center py-8 text-muted-foreground">
<TableCell colSpan={9} className="text-center py-8 text-muted-foreground">
</TableCell>
</TableRow>
) : (
data?.items.map((record) => (
<TableRow key={record.id}>
<TableCell className="font-mono">{record.sourceAccountSequence}</TableCell>
<TableCell>{contributionTypeLabels[record.contributionType] || record.contributionType}</TableCell>
<TableCell>{record.distributionType === 'UPSTREAM' ? '上游' : '下游'}</TableCell>
<TableCell className="text-right">{formatPercent(record.rate)}</TableCell>
<TableCell className="text-right font-mono">{formatDecimal(record.amount, 4)}</TableCell>
<TableRow key={record.id} className={record.isExpired ? 'opacity-50' : ''}>
<TableCell>
<Badge variant={sourceTypeBadgeVariant[record.sourceType] || 'default'}>
{sourceTypeLabels[record.sourceType] || record.sourceType}
</Badge>
</TableCell>
<TableCell>
{record.sourceType === 'PERSONAL' ? (
<span className="text-muted-foreground"></span>
) : record.sourceUserInfo ? (
<div className="flex flex-col">
<span className="font-medium">{record.sourceUserInfo.nickname || record.sourceUserInfo.phone}</span>
<span className="text-xs text-muted-foreground font-mono">{record.sourceAccountSequence}</span>
</div>
) : (
<span className="font-mono text-sm">{record.sourceAccountSequence}</span>
)}
</TableCell>
<TableCell className="text-right">{record.treeCount}</TableCell>
<TableCell className="text-right font-mono">{formatDecimal(record.baseContribution, 2)}</TableCell>
<TableCell className="text-right">{formatPercent(record.distributionRate)}</TableCell>
<TableCell className="text-right font-mono font-medium">{formatDecimal(record.amount, 4)}</TableCell>
<TableCell>
{record.levelDepth !== null
? `L${record.levelDepth}`
@ -76,7 +98,14 @@ export function ContributionRecordsList({ accountSequence }: ContributionRecords
? `T${record.bonusTier}`
: '-'}
</TableCell>
<TableCell className="text-sm">{formatDateTime(record.createdAt)}</TableCell>
<TableCell className="text-sm">{formatDate(record.effectiveDate)}</TableCell>
<TableCell>
{record.isExpired ? (
<Badge variant="destructive"></Badge>
) : (
<Badge variant="secondary"></Badge>
)}
</TableCell>
</TableRow>
))
)}

View File

@ -65,15 +65,21 @@ export interface ContributionBreakdown {
export interface ContributionRecord {
id: string;
accountSequence: number;
sourceAccountSequence: number;
adoptionId: string;
contributionType: string;
distributionType: string;
amount: string;
rate: string;
sourceType: string; // PERSONAL, TEAM_LEVEL, TEAM_BONUS
sourceAccountSequence: string;
sourceUserInfo?: {
nickname: string | null;
phone: string;
} | null;
treeCount: number;
baseContribution: string;
distributionRate: string;
levelDepth: number | null;
bonusTier: number | null;
amount: string;
effectiveDate: string;
expireDate: string;
isExpired: boolean;
createdAt: string;
}