feat(mining): 实现系统账户和待解锁算力参与挖矿

重大变更:
- 挖矿分母从用户有效算力改为全网理论算力(networkTotalContribution)
- 系统账户(运营12%/省1%/市2%)参与挖矿,有独立的挖矿记录
- 待解锁算力参与挖矿,收益归总部账户,记录包含完整来源信息

新增功能:
- mining-service: 系统挖矿账户表、待解锁算力表及相关挖矿记录表
- mining-service: NetworkSyncService 同步全网数据
- mining-service: /admin/sync-network 和 /admin/system-accounts 端点
- contribution-service: /admin/system-accounts 和发布系统账户事件端点
- mining-admin-service: 状态检查返回全网理论算力信息

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-16 03:39:56 -08:00
parent b5fca7bb04
commit de5416aee6
14 changed files with 1379 additions and 26 deletions

View File

@ -10,6 +10,7 @@ import {
AdoptionSyncedEvent,
ContributionRecordSyncedEvent,
NetworkProgressUpdatedEvent,
SystemAccountSyncedEvent,
} from '../../domain/events';
import { Public } from '../../shared/guards/jwt-auth.guard';
@ -420,4 +421,70 @@ export class AdminController {
};
}
}
@Post('system-accounts/publish-all')
@Public()
@ApiOperation({ summary: '发布所有系统账户算力事件到 outbox用于同步到 mining-service' })
async publishAllSystemAccounts(): Promise<{
success: boolean;
publishedCount: number;
message: string;
}> {
try {
const systemAccounts = await this.prisma.systemAccount.findMany();
await this.unitOfWork.executeInTransaction(async () => {
const events = systemAccounts.map((account) => {
const event = new SystemAccountSyncedEvent(
account.accountType,
account.name,
account.contributionBalance.toString(),
account.createdAt,
);
return {
aggregateType: SystemAccountSyncedEvent.AGGREGATE_TYPE,
aggregateId: account.accountType,
eventType: SystemAccountSyncedEvent.EVENT_TYPE,
payload: event.toPayload(),
};
});
await this.outboxRepository.saveMany(events);
});
this.logger.log(`Published ${systemAccounts.length} system account events`);
return {
success: true,
publishedCount: systemAccounts.length,
message: `Published ${systemAccounts.length} system account events`,
};
} catch (error) {
this.logger.error('Failed to publish system accounts', error);
return {
success: false,
publishedCount: 0,
message: `Failed: ${error.message}`,
};
}
}
@Get('system-accounts')
@Public()
@ApiOperation({ summary: '获取所有系统账户算力' })
async getSystemAccounts() {
const systemAccounts = await this.prisma.systemAccount.findMany();
return {
accounts: systemAccounts.map((a) => ({
accountType: a.accountType,
name: a.name,
contributionBalance: a.contributionBalance.toString(),
createdAt: a.createdAt,
updatedAt: a.updatedAt,
})),
total: systemAccounts.length,
};
}
}

View File

@ -6,3 +6,4 @@ export * from './referral-synced.event';
export * from './adoption-synced.event';
export * from './contribution-record-synced.event';
export * from './network-progress-updated.event';
export * from './system-account-synced.event';

View File

@ -0,0 +1,25 @@
/**
*
* mining-service
*/
export class SystemAccountSyncedEvent {
static readonly EVENT_TYPE = 'SystemAccountSynced';
static readonly AGGREGATE_TYPE = 'SystemAccount';
constructor(
public readonly accountType: string, // OPERATION / PROVINCE / CITY / HEADQUARTERS
public readonly name: string,
public readonly contributionBalance: string,
public readonly createdAt: Date,
) {}
toPayload(): Record<string, any> {
return {
eventType: SystemAccountSyncedEvent.EVENT_TYPE,
accountType: this.accountType,
name: this.name,
contributionBalance: this.contributionBalance,
createdAt: this.createdAt.toISOString(),
};
}
}

View File

@ -45,7 +45,7 @@ export class ConfigController {
this.logger.log(`Fetching mining status from ${miningServiceUrl}/api/v2/admin/status`);
try {
// 并行获取 mining-service 状态和 contribution-service 总算力
// 并行获取 mining-service 状态和 contribution-service 统计数据
const [miningResponse, contributionResponse] = await Promise.all([
fetch(`${miningServiceUrl}/api/v2/admin/status`),
fetch(`${contributionServiceUrl}/api/v2/contribution/stats`).catch(() => null),
@ -57,28 +57,59 @@ export class ConfigController {
const miningResult = await miningResponse.json();
this.logger.log(`Mining service response: ${JSON.stringify(miningResult)}`);
// 获取 contribution-service 的总有效算力
let contributionTotal: string | null = null;
const miningData = miningResult.data || miningResult;
// 获取 contribution-service 的全网理论算力
let networkTotalContribution: string | null = null;
let userEffectiveContribution: string | null = null;
let systemAccountsContribution: string | null = null;
if (contributionResponse && contributionResponse.ok) {
const contributionResult = await contributionResponse.json();
// contribution-service 返回的是 data.totalContribution
contributionTotal = contributionResult.data?.totalContribution || contributionResult.totalContribution || null;
const data = contributionResult.data || contributionResult;
// 全网理论算力 = 总认种树 × 每棵树算力
networkTotalContribution = data.networkTotalContribution || null;
// 用户有效算力
userEffectiveContribution = data.totalContribution || null;
// 系统账户算力
const systemAccounts = data.systemAccounts || [];
const systemTotal = systemAccounts
.filter((a: any) => a.accountType !== 'HEADQUARTERS')
.reduce((sum: number, a: any) => sum + parseFloat(a.totalContribution || '0'), 0);
systemAccountsContribution = systemTotal.toString();
}
const miningData = miningResult.data || miningResult;
const miningTotal = miningData.totalContribution || '0';
// mining-service 中的全网理论算力
const miningNetworkTotal = miningData.networkTotalContribution || '0';
// mining-service 中的用户有效算力
const miningUserTotal = miningData.totalContribution || '0';
// 判断算力是否同步完成:两边总算力相等
const isSynced = contributionTotal !== null &&
parseFloat(contributionTotal) > 0 &&
Math.abs(parseFloat(miningTotal) - parseFloat(contributionTotal)) < 0.01;
// 判断算力是否同步完成
// 条件1全网理论算力已同步mining-service 中的 networkTotalContribution > 0
// 条件2用户有效算力已同步mining-service 中的 totalContribution 与 contribution-service 相近)
const hasNetworkContribution = parseFloat(miningNetworkTotal) > 0;
const userSynced = userEffectiveContribution !== null &&
parseFloat(userEffectiveContribution) > 0 &&
Math.abs(parseFloat(miningUserTotal) - parseFloat(userEffectiveContribution)) < 0.01;
const isSynced = hasNetworkContribution && userSynced;
return {
...miningData,
contributionSyncStatus: {
isSynced,
miningTotal,
contributionTotal: contributionTotal || '0',
// 全网理论算力(应作为挖矿分母)
networkTotalContribution: networkTotalContribution || '0',
miningNetworkTotal,
// 用户有效算力
userEffectiveContribution: userEffectiveContribution || '0',
miningUserTotal,
// 系统账户算力
systemAccountsContribution: systemAccountsContribution || '0',
// 兼容旧字段
miningTotal: miningUserTotal,
contributionTotal: userEffectiveContribution || '0',
},
};
} catch (error) {
@ -89,6 +120,11 @@ export class ConfigController {
error: `Unable to connect to mining service: ${error.message}`,
contributionSyncStatus: {
isSynced: false,
networkTotalContribution: '0',
miningNetworkTotal: '0',
userEffectiveContribution: '0',
miningUserTotal: '0',
systemAccountsContribution: '0',
miningTotal: '0',
contributionTotal: '0',
},

View File

@ -0,0 +1,238 @@
-- ============================================================================
-- 添加系统账户和待解锁算力挖矿功能
-- 1. MiningConfig 添加全网理论算力字段
-- 2. 系统账户(运营/省/市/总部)挖矿
-- 3. 待解锁算力挖矿记录
-- 4. 挖矿收益分配明细
-- ============================================================================
-- ==================== MiningConfig 添加全网理论算力字段 ====================
ALTER TABLE "mining_configs" ADD COLUMN IF NOT EXISTS "network_total_contribution" DECIMAL(30, 8) NOT NULL DEFAULT 0;
ALTER TABLE "mining_configs" ADD COLUMN IF NOT EXISTS "total_tree_count" INTEGER NOT NULL DEFAULT 0;
ALTER TABLE "mining_configs" ADD COLUMN IF NOT EXISTS "contribution_per_tree" DECIMAL(20, 10) NOT NULL DEFAULT 22617;
ALTER TABLE "mining_configs" ADD COLUMN IF NOT EXISTS "network_last_synced_at" TIMESTAMP(3);
-- ==================== 系统账户类型枚举 ====================
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'SystemAccountType') THEN
CREATE TYPE "SystemAccountType" AS ENUM ('OPERATION', 'PROVINCE', 'CITY', 'HEADQUARTERS');
END IF;
END$$;
-- ==================== 系统挖矿账户 ====================
CREATE TABLE IF NOT EXISTS "system_mining_accounts" (
"id" TEXT NOT NULL,
"account_type" "SystemAccountType" NOT NULL,
"name" VARCHAR(100) NOT NULL,
"totalMined" DECIMAL(30, 8) NOT NULL DEFAULT 0,
"availableBalance" DECIMAL(30, 8) NOT NULL DEFAULT 0,
"totalContribution" DECIMAL(30, 8) NOT NULL DEFAULT 0,
"last_synced_at" TIMESTAMP(3),
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "system_mining_accounts_pkey" PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX IF NOT EXISTS "system_mining_accounts_account_type_key" ON "system_mining_accounts"("account_type");
CREATE INDEX IF NOT EXISTS "system_mining_accounts_totalContribution_idx" ON "system_mining_accounts"("totalContribution" DESC);
-- ==================== 系统账户挖矿记录 ====================
CREATE TABLE IF NOT EXISTS "system_mining_records" (
"id" TEXT NOT NULL,
"account_type" "SystemAccountType" NOT NULL,
"mining_minute" TIMESTAMP(3) NOT NULL,
"contribution_ratio" DECIMAL(30, 18) NOT NULL,
"total_contribution" DECIMAL(30, 8) NOT NULL,
"second_distribution" DECIMAL(30, 18) NOT NULL,
"mined_amount" DECIMAL(30, 18) NOT NULL,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "system_mining_records_pkey" PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX IF NOT EXISTS "system_mining_records_account_type_mining_minute_key" ON "system_mining_records"("account_type", "mining_minute");
CREATE INDEX IF NOT EXISTS "system_mining_records_mining_minute_idx" ON "system_mining_records"("mining_minute");
ALTER TABLE "system_mining_records" DROP CONSTRAINT IF EXISTS "system_mining_records_account_type_fkey";
ALTER TABLE "system_mining_records" ADD CONSTRAINT "system_mining_records_account_type_fkey"
FOREIGN KEY ("account_type") REFERENCES "system_mining_accounts"("account_type") ON DELETE RESTRICT ON UPDATE CASCADE;
-- ==================== 系统账户交易流水 ====================
CREATE TABLE IF NOT EXISTS "system_mining_transactions" (
"id" TEXT NOT NULL,
"account_type" "SystemAccountType" NOT NULL,
"type" TEXT NOT NULL,
"amount" DECIMAL(30, 8) NOT NULL,
"balance_before" DECIMAL(30, 8) NOT NULL,
"balance_after" DECIMAL(30, 8) NOT NULL,
"reference_id" TEXT,
"reference_type" TEXT,
"memo" TEXT,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "system_mining_transactions_pkey" PRIMARY KEY ("id")
);
CREATE INDEX IF NOT EXISTS "system_mining_transactions_account_type_created_at_idx" ON "system_mining_transactions"("account_type", "created_at" DESC);
ALTER TABLE "system_mining_transactions" DROP CONSTRAINT IF EXISTS "system_mining_transactions_account_type_fkey";
ALTER TABLE "system_mining_transactions" ADD CONSTRAINT "system_mining_transactions_account_type_fkey"
FOREIGN KEY ("account_type") REFERENCES "system_mining_accounts"("account_type") ON DELETE RESTRICT ON UPDATE CASCADE;
-- ==================== 待解锁算力挖矿 ====================
CREATE TABLE IF NOT EXISTS "pending_contribution_mining" (
"id" BIGSERIAL NOT NULL,
"source_adoption_id" BIGINT NOT NULL,
"source_account_sequence" VARCHAR(20) NOT NULL,
"would_be_account_sequence" VARCHAR(20),
"contribution_type" VARCHAR(30) NOT NULL,
"amount" DECIMAL(30, 10) NOT NULL,
"reason" VARCHAR(200),
"effective_date" DATE NOT NULL,
"expire_date" DATE NOT NULL,
"is_expired" BOOLEAN NOT NULL DEFAULT false,
"last_synced_at" TIMESTAMP(3),
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "pending_contribution_mining_pkey" PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX IF NOT EXISTS "pending_contribution_mining_source_adoption_id_would_be_acco_key"
ON "pending_contribution_mining"("source_adoption_id", "would_be_account_sequence", "contribution_type");
CREATE INDEX IF NOT EXISTS "pending_contribution_mining_would_be_account_sequence_idx" ON "pending_contribution_mining"("would_be_account_sequence");
CREATE INDEX IF NOT EXISTS "pending_contribution_mining_contribution_type_idx" ON "pending_contribution_mining"("contribution_type");
CREATE INDEX IF NOT EXISTS "pending_contribution_mining_is_expired_idx" ON "pending_contribution_mining"("is_expired");
-- ==================== 待解锁算力挖矿记录 ====================
CREATE TABLE IF NOT EXISTS "pending_mining_records" (
"id" BIGSERIAL NOT NULL,
"pending_contribution_id" BIGINT NOT NULL,
"mining_minute" TIMESTAMP(3) NOT NULL,
"source_adoption_id" BIGINT NOT NULL,
"source_account_sequence" VARCHAR(20) NOT NULL,
"would_be_account_sequence" VARCHAR(20),
"contribution_type" VARCHAR(30) NOT NULL,
"contribution_amount" DECIMAL(30, 10) NOT NULL,
"network_total_contribution" DECIMAL(30, 10) NOT NULL,
"contribution_ratio" DECIMAL(30, 18) NOT NULL,
"second_distribution" DECIMAL(30, 18) NOT NULL,
"mined_amount" DECIMAL(30, 18) NOT NULL,
"allocated_to" VARCHAR(20) NOT NULL DEFAULT 'HEADQUARTERS',
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "pending_mining_records_pkey" PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX IF NOT EXISTS "pending_mining_records_pending_contribution_id_mining_minute_key"
ON "pending_mining_records"("pending_contribution_id", "mining_minute");
CREATE INDEX IF NOT EXISTS "pending_mining_records_mining_minute_idx" ON "pending_mining_records"("mining_minute");
CREATE INDEX IF NOT EXISTS "pending_mining_records_source_account_sequence_idx" ON "pending_mining_records"("source_account_sequence");
CREATE INDEX IF NOT EXISTS "pending_mining_records_would_be_account_sequence_idx" ON "pending_mining_records"("would_be_account_sequence");
ALTER TABLE "pending_mining_records" DROP CONSTRAINT IF EXISTS "pending_mining_records_pending_contribution_id_fkey";
ALTER TABLE "pending_mining_records" ADD CONSTRAINT "pending_mining_records_pending_contribution_id_fkey"
FOREIGN KEY ("pending_contribution_id") REFERENCES "pending_contribution_mining"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
-- ==================== 挖矿收益分配明细 ====================
CREATE TABLE IF NOT EXISTS "mining_reward_allocations" (
"id" BIGSERIAL NOT NULL,
"mining_date" DATE NOT NULL,
"contribution_record_id" BIGINT NOT NULL,
"source_adoption_id" BIGINT NOT NULL,
"source_account_sequence" VARCHAR(20) NOT NULL,
"owner_account_sequence" VARCHAR(20) NOT NULL,
"contribution_type" VARCHAR(30) NOT NULL,
"contribution_amount" DECIMAL(30, 10) NOT NULL,
"network_total_contribution" DECIMAL(30, 10) NOT NULL,
"contribution_ratio" DECIMAL(30, 18) NOT NULL,
"daily_mining_pool" DECIMAL(30, 10) NOT NULL,
"reward_amount" DECIMAL(30, 10) NOT NULL,
"allocation_status" VARCHAR(20) NOT NULL,
"is_unlocked" BOOLEAN NOT NULL,
"allocated_to_account_sequence" VARCHAR(20),
"allocated_to_system_account" VARCHAR(20),
"unlocked_reason" VARCHAR(200),
"owner_has_adopted" BOOLEAN NOT NULL,
"owner_direct_referral_count" INTEGER NOT NULL,
"owner_unlocked_level_depth" INTEGER NOT NULL,
"owner_unlocked_bonus_tiers" INTEGER NOT NULL,
"remark" VARCHAR(500),
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "mining_reward_allocations_pkey" PRIMARY KEY ("id")
);
CREATE INDEX IF NOT EXISTS "mining_reward_allocations_mining_date_idx" ON "mining_reward_allocations"("mining_date");
CREATE INDEX IF NOT EXISTS "mining_reward_allocations_owner_account_sequence_mining_date_idx" ON "mining_reward_allocations"("owner_account_sequence", "mining_date");
CREATE INDEX IF NOT EXISTS "mining_reward_allocations_source_account_sequence_idx" ON "mining_reward_allocations"("source_account_sequence");
CREATE INDEX IF NOT EXISTS "mining_reward_allocations_source_adoption_id_idx" ON "mining_reward_allocations"("source_adoption_id");
CREATE INDEX IF NOT EXISTS "mining_reward_allocations_allocation_status_idx" ON "mining_reward_allocations"("allocation_status");
CREATE INDEX IF NOT EXISTS "mining_reward_allocations_contribution_record_id_idx" ON "mining_reward_allocations"("contribution_record_id");
-- ==================== 每日挖矿收益汇总 ====================
CREATE TABLE IF NOT EXISTS "daily_mining_reward_summaries" (
"id" BIGSERIAL NOT NULL,
"mining_date" DATE NOT NULL,
"account_sequence" VARCHAR(20) NOT NULL,
"unlocked_reward" DECIMAL(30, 10) NOT NULL DEFAULT 0,
"pending_reward_to_hq" DECIMAL(30, 10) NOT NULL DEFAULT 0,
"personal_reward" DECIMAL(30, 10) NOT NULL DEFAULT 0,
"level_reward" DECIMAL(30, 10) NOT NULL DEFAULT 0,
"bonus_reward" DECIMAL(30, 10) NOT NULL DEFAULT 0,
"pending_level_to_hq" DECIMAL(30, 10) NOT NULL DEFAULT 0,
"pending_bonus_to_hq" DECIMAL(30, 10) NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "daily_mining_reward_summaries_pkey" PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX IF NOT EXISTS "daily_mining_reward_summaries_mining_date_account_sequence_key"
ON "daily_mining_reward_summaries"("mining_date", "account_sequence");
CREATE INDEX IF NOT EXISTS "daily_mining_reward_summaries_mining_date_idx" ON "daily_mining_reward_summaries"("mining_date");
CREATE INDEX IF NOT EXISTS "daily_mining_reward_summaries_account_sequence_idx" ON "daily_mining_reward_summaries"("account_sequence");
-- ==================== 总部待解锁收益明细 ====================
CREATE TABLE IF NOT EXISTS "headquarters_pending_rewards" (
"id" BIGSERIAL NOT NULL,
"mining_date" DATE NOT NULL,
"would_be_account_sequence" VARCHAR(20) NOT NULL,
"source_adoption_id" BIGINT NOT NULL,
"source_account_sequence" VARCHAR(20) NOT NULL,
"contribution_record_id" BIGINT NOT NULL,
"contribution_type" VARCHAR(30) NOT NULL,
"contribution_amount" DECIMAL(30, 10) NOT NULL,
"reward_amount" DECIMAL(30, 10) NOT NULL,
"reason" VARCHAR(200) NOT NULL,
"owner_has_adopted" BOOLEAN NOT NULL,
"owner_direct_referral_count" INTEGER NOT NULL,
"required_condition" VARCHAR(100) NOT NULL,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "headquarters_pending_rewards_pkey" PRIMARY KEY ("id")
);
CREATE INDEX IF NOT EXISTS "headquarters_pending_rewards_mining_date_idx" ON "headquarters_pending_rewards"("mining_date");
CREATE INDEX IF NOT EXISTS "headquarters_pending_rewards_would_be_account_sequence_idx" ON "headquarters_pending_rewards"("would_be_account_sequence");
CREATE INDEX IF NOT EXISTS "headquarters_pending_rewards_source_adoption_id_idx" ON "headquarters_pending_rewards"("source_adoption_id");
-- ==================== 初始化系统账户 ====================
INSERT INTO "system_mining_accounts" ("id", "account_type", "name", "totalMined", "availableBalance", "totalContribution", "updated_at")
VALUES
(gen_random_uuid(), 'OPERATION', '运营账户', 0, 0, 0, NOW()),
(gen_random_uuid(), 'PROVINCE', '省公司账户', 0, 0, 0, NOW()),
(gen_random_uuid(), 'CITY', '市公司账户', 0, 0, 0, NOW()),
(gen_random_uuid(), 'HEADQUARTERS', '总部账户', 0, 0, 0, NOW())
ON CONFLICT ("account_type") DO NOTHING;

View File

@ -21,6 +21,14 @@ model MiningConfig {
secondDistribution Decimal @db.Decimal(30, 18) // 每秒分配量
isActive Boolean @default(false) // 是否已激活挖矿
activatedAt DateTime? // 激活时间
// 全网理论算力(从 contribution-service 同步)
// 理论算力 = 总认种树 × 基础算力/棵 × 系数
networkTotalContribution Decimal @default(0) @db.Decimal(30, 8) @map("network_total_contribution")
totalTreeCount Int @default(0) @map("total_tree_count") // 全网总认种树
contributionPerTree Decimal @default(22617) @db.Decimal(20, 10) @map("contribution_per_tree") // 当前每棵树算力
networkLastSyncedAt DateTime? @map("network_last_synced_at") // 最后同步时间
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@ -42,6 +50,134 @@ model MiningEra {
@@map("mining_eras")
}
// ==================== 系统账户(运营/省/市/总部)====================
// 系统账户类型枚举
enum SystemAccountType {
OPERATION // 运营账户 12%
PROVINCE // 省公司账户 1%
CITY // 市公司账户 2%
HEADQUARTERS // 总部账户(收取未解锁的收益)
}
// 系统挖矿账户
model SystemMiningAccount {
id String @id @default(uuid())
accountType SystemAccountType @unique @map("account_type")
name String @db.VarChar(100)
totalMined Decimal @default(0) @db.Decimal(30, 8) // 总挖到的积分股
availableBalance Decimal @default(0) @db.Decimal(30, 8) // 可用余额
totalContribution Decimal @default(0) @db.Decimal(30, 8) // 当前算力(从 contribution-service 同步)
lastSyncedAt DateTime? @map("last_synced_at")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
records SystemMiningRecord[]
transactions SystemMiningTransaction[]
@@index([totalContribution(sort: Desc)])
@@map("system_mining_accounts")
}
// 系统账户挖矿记录(分钟级别汇总)
model SystemMiningRecord {
id String @id @default(uuid())
accountType SystemAccountType @map("account_type")
miningMinute DateTime @map("mining_minute")
contributionRatio Decimal @db.Decimal(30, 18) @map("contribution_ratio")
totalContribution Decimal @db.Decimal(30, 8) @map("total_contribution")
secondDistribution Decimal @db.Decimal(30, 18) @map("second_distribution")
minedAmount Decimal @db.Decimal(30, 18) @map("mined_amount")
createdAt DateTime @default(now()) @map("created_at")
account SystemMiningAccount @relation(fields: [accountType], references: [accountType])
@@unique([accountType, miningMinute])
@@index([miningMinute])
@@map("system_mining_records")
}
// 系统账户交易流水
model SystemMiningTransaction {
id String @id @default(uuid())
accountType SystemAccountType @map("account_type")
type String // MINE, TRANSFER_OUT, ADJUSTMENT
amount Decimal @db.Decimal(30, 8)
balanceBefore Decimal @db.Decimal(30, 8) @map("balance_before")
balanceAfter Decimal @db.Decimal(30, 8) @map("balance_after")
referenceId String? @map("reference_id")
referenceType String? @map("reference_type")
memo String? @db.Text
createdAt DateTime @default(now()) @map("created_at")
account SystemMiningAccount @relation(fields: [accountType], references: [accountType])
@@index([accountType, createdAt(sort: Desc)])
@@map("system_mining_transactions")
}
// ==================== 未解锁算力挖矿(归总部)====================
// 未解锁算力参与挖矿,收益归总部
// 从 contribution-service 同步 unallocated_contributions 数据
model PendingContributionMining {
id BigInt @id @default(autoincrement())
sourceAdoptionId BigInt @map("source_adoption_id") // 来源认种ID
sourceAccountSequence String @map("source_account_sequence") @db.VarChar(20) // 认种人账号
wouldBeAccountSequence String? @map("would_be_account_sequence") @db.VarChar(20) // 应该归属的账户
contributionType String @map("contribution_type") @db.VarChar(30) // LEVEL_6~15 / BONUS_TIER_2/3
amount Decimal @db.Decimal(30, 10) // 待解锁算力金额
reason String? @db.VarChar(200) // 未解锁原因
effectiveDate DateTime @map("effective_date") @db.Date
expireDate DateTime @map("expire_date") @db.Date
isExpired Boolean @default(false) @map("is_expired")
lastSyncedAt DateTime? @map("last_synced_at")
createdAt DateTime @default(now()) @map("created_at")
records PendingMiningRecord[]
@@unique([sourceAdoptionId, wouldBeAccountSequence, contributionType])
@@index([wouldBeAccountSequence])
@@index([contributionType])
@@index([isExpired])
@@map("pending_contribution_mining")
}
// 未解锁算力挖矿记录(分钟级别汇总)
// 记录每笔未解锁算力每分钟产生的挖矿收益明细
// 收益归总部账户HEADQUARTERS
model PendingMiningRecord {
id BigInt @id @default(autoincrement())
pendingContributionId BigInt @map("pending_contribution_id") // 关联的待解锁算力记录
miningMinute DateTime @map("mining_minute") // 挖矿时间(精确到分钟)
// ========== 算力来源信息(冗余存储便于查询)==========
sourceAdoptionId BigInt @map("source_adoption_id") // 来源认种ID
sourceAccountSequence String @map("source_account_sequence") @db.VarChar(20) // 认种人账号
wouldBeAccountSequence String? @map("would_be_account_sequence") @db.VarChar(20) // 应该归属的账户(如果解锁了)
contributionType String @map("contribution_type") @db.VarChar(30) // LEVEL_6~15 / BONUS_TIER_2/3
// ========== 挖矿计算参数 ==========
contributionAmount Decimal @map("contribution_amount") @db.Decimal(30, 10) // 参与挖矿的算力
networkTotalContribution Decimal @map("network_total_contribution") @db.Decimal(30, 10) // 全网理论算力
contributionRatio Decimal @map("contribution_ratio") @db.Decimal(30, 18) // 算力占比
secondDistribution Decimal @map("second_distribution") @db.Decimal(30, 18) // 每秒分配量
// ========== 挖矿收益 ==========
minedAmount Decimal @map("mined_amount") @db.Decimal(30, 18) // 该分钟挖到的数量
allocatedTo String @default("HEADQUARTERS") @map("allocated_to") @db.VarChar(20) // 收益归属HEADQUARTERS
createdAt DateTime @default(now()) @map("created_at")
pendingContribution PendingContributionMining @relation(fields: [pendingContributionId], references: [id])
@@unique([pendingContributionId, miningMinute])
@@index([miningMinute])
@@index([sourceAccountSequence])
@@index([wouldBeAccountSequence])
@@map("pending_mining_records")
}
// ==================== 用户挖矿账户 ====================
// 用户挖矿账户

View File

@ -1,12 +1,18 @@
import { Controller, Get, Post, HttpException, HttpStatus } from '@nestjs/common';
import { ApiTags, ApiOperation } from '@nestjs/swagger';
import { ConfigService } from '@nestjs/config';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { NetworkSyncService } from '../../application/services/network-sync.service';
import { Public } from '../../shared/guards/jwt-auth.guard';
@ApiTags('Admin')
@Controller('admin')
export class AdminController {
constructor(private readonly prisma: PrismaService) {}
constructor(
private readonly prisma: PrismaService,
private readonly networkSyncService: NetworkSyncService,
private readonly configService: ConfigService,
) {}
@Get('accounts/sync')
@Public()
@ -45,10 +51,23 @@ export class AdminController {
const config = await this.prisma.miningConfig.findFirst();
const blackHole = await this.prisma.blackHole.findFirst();
const accountCount = await this.prisma.miningAccount.count();
const totalContribution = await this.prisma.miningAccount.aggregate({
// 用户有效算力
const userContribution = await this.prisma.miningAccount.aggregate({
_sum: { totalContribution: true },
});
// 系统账户算力
const systemContribution = await this.prisma.systemMiningAccount.aggregate({
_sum: { totalContribution: true },
});
// 待解锁算力
const pendingContribution = await this.prisma.pendingContributionMining.aggregate({
_sum: { amount: true },
where: { isExpired: false },
});
return {
initialized: !!config,
isActive: config?.isActive || false,
@ -64,7 +83,17 @@ export class AdminController {
}
: null,
accountCount,
totalContribution: totalContribution._sum.totalContribution?.toString() || '0',
// 用户有效算力
totalContribution: userContribution._sum.totalContribution?.toString() || '0',
// 全网理论算力(从 contribution-service 同步)
networkTotalContribution: config?.networkTotalContribution?.toString() || '0',
totalTreeCount: config?.totalTreeCount || 0,
contributionPerTree: config?.contributionPerTree?.toString() || '22617',
networkLastSyncedAt: config?.networkLastSyncedAt,
// 系统账户算力
systemContribution: systemContribution._sum.totalContribution?.toString() || '0',
// 待解锁算力
pendingContribution: pendingContribution._sum.amount?.toString() || '0',
};
}
@ -116,4 +145,37 @@ export class AdminController {
return { success: true, message: '挖矿系统已停用' };
}
@Post('sync-network')
@Public()
@ApiOperation({ summary: '从 contribution-service 同步全网数据(系统账户、全网进度)' })
async syncNetwork() {
const contributionServiceUrl = this.configService.get<string>(
'CONTRIBUTION_SERVICE_URL',
'http://localhost:3020',
);
return this.networkSyncService.syncFromContributionService(contributionServiceUrl);
}
@Get('system-accounts')
@Public()
@ApiOperation({ summary: '获取系统账户挖矿状态' })
async getSystemAccounts() {
const accounts = await this.prisma.systemMiningAccount.findMany({
orderBy: { accountType: 'asc' },
});
return {
accounts: accounts.map((acc) => ({
accountType: acc.accountType,
name: acc.name,
totalMined: acc.totalMined.toString(),
availableBalance: acc.availableBalance.toString(),
totalContribution: acc.totalContribution.toString(),
lastSyncedAt: acc.lastSyncedAt,
})),
total: accounts.length,
};
}
}

View File

@ -5,6 +5,7 @@ import { InfrastructureModule } from '../infrastructure/infrastructure.module';
// Services
import { MiningDistributionService } from './services/mining-distribution.service';
import { ContributionSyncService } from './services/contribution-sync.service';
import { NetworkSyncService } from './services/network-sync.service';
// Queries
import { GetMiningAccountQuery } from './queries/get-mining-account.query';
@ -24,6 +25,7 @@ import { OutboxScheduler } from './schedulers/outbox.scheduler';
// Services
MiningDistributionService,
ContributionSyncService,
NetworkSyncService,
// Queries
GetMiningAccountQuery,
@ -40,6 +42,7 @@ import { OutboxScheduler } from './schedulers/outbox.scheduler';
exports: [
MiningDistributionService,
ContributionSyncService,
NetworkSyncService,
GetMiningAccountQuery,
GetMiningStatsQuery,
GetPriceQuery,

View File

@ -1,6 +1,7 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ContributionSyncService } from '../services/contribution-sync.service';
import { NetworkSyncService } from '../services/network-sync.service';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
@Injectable()
@ -10,6 +11,7 @@ export class ContributionEventHandler implements OnModuleInit {
constructor(
private readonly syncService: ContributionSyncService,
private readonly networkSyncService: NetworkSyncService,
private readonly configService: ConfigService,
) {}
@ -72,6 +74,26 @@ export class ContributionEventHandler implements OnModuleInit {
totalContribution: eventPayload.totalContribution,
activeAccounts: eventPayload.activeAccounts,
});
} else if (eventType === 'SystemAccountSynced') {
this.logger.log(`Received SystemAccountSynced for ${eventPayload.accountType}`);
await this.networkSyncService.handleSystemAccountSynced({
accountType: eventPayload.accountType,
name: eventPayload.name,
contributionBalance: eventPayload.contributionBalance,
});
} else if (eventType === 'NetworkProgressUpdated') {
this.logger.log(`Received NetworkProgressUpdated: trees=${eventPayload.totalTreeCount}`);
await this.networkSyncService.handleNetworkProgressUpdated({
totalTreeCount: eventPayload.totalTreeCount,
totalAdoptionOrders: eventPayload.totalAdoptionOrders,
totalAdoptedUsers: eventPayload.totalAdoptedUsers,
currentUnit: eventPayload.currentUnit,
currentMultiplier: eventPayload.currentMultiplier,
currentContributionPerTree: eventPayload.currentContributionPerTree,
nextUnitTreeCount: eventPayload.nextUnitTreeCount,
});
}
} catch (error) {
this.logger.error('Failed to handle contribution event', error);

View File

@ -5,10 +5,12 @@ import { MiningConfigRepository } from '../../infrastructure/persistence/reposit
import { BlackHoleRepository } from '../../infrastructure/persistence/repositories/black-hole.repository';
import { PriceSnapshotRepository } from '../../infrastructure/persistence/repositories/price-snapshot.repository';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { SystemMiningAccountRepository } from '../../infrastructure/persistence/repositories/system-mining-account.repository';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { RedisService } from '../../infrastructure/redis/redis.service';
import { MiningCalculatorService } from '../../domain/services/mining-calculator.service';
import { ShareAmount } from '../../domain/value-objects/share-amount.vo';
import { SystemAccountType } from '@prisma/client';
import Decimal from 'decimal.js';
/**
@ -32,6 +34,7 @@ export class MiningDistributionService {
private readonly blackHoleRepository: BlackHoleRepository,
private readonly priceSnapshotRepository: PriceSnapshotRepository,
private readonly outboxRepository: OutboxRepository,
private readonly systemMiningAccountRepository: SystemMiningAccountRepository,
private readonly prisma: PrismaService,
private readonly redis: RedisService,
private readonly configService: ConfigService,
@ -41,6 +44,11 @@ export class MiningDistributionService {
*
* -
* - MiningRecord
*
*
* - 使 + +
* - //
* -
*/
async executeSecondDistribution(): Promise<void> {
// 获取分布式锁锁定时间900ms
@ -72,17 +80,29 @@ export class MiningDistributionService {
return;
}
// 获取有算力的账户
const totalContribution = await this.miningAccountRepository.getTotalContribution();
if (totalContribution.isZero()) {
// 使用全网理论算力作为分母
// 理论算力 = 用户有效算力 + 系统账户算力 + 未解锁算力
const networkTotalContribution = config.networkTotalContribution;
if (networkTotalContribution.isZero()) {
// 如果全网理论算力未同步,则回退到用户有效算力
this.logger.warn('Network total contribution is zero, falling back to user contribution');
const userContribution = await this.miningAccountRepository.getTotalContribution();
if (userContribution.isZero()) {
return;
}
// 使用用户有效算力(旧逻辑)
await this.distributeToUsersOnly(secondDistribution, userContribution, currentSecond, currentMinute, isMinuteEnd, config);
return;
}
// 分批处理账户
let totalDistributed = ShareAmount.zero();
let userParticipantCount = 0;
let systemParticipantCount = 0;
let pendingParticipantCount = 0;
// 1. 分配给用户账户
let page = 1;
const pageSize = 1000;
let totalDistributed = ShareAmount.zero();
let participantCount = 0;
while (true) {
const { data: accounts, total } = await this.miningAccountRepository.findAllWithContribution(page, pageSize);
@ -91,7 +111,7 @@ export class MiningDistributionService {
for (const account of accounts) {
const reward = this.calculator.calculateUserMiningReward(
account.totalContribution,
totalContribution,
networkTotalContribution,
secondDistribution,
);
@ -106,12 +126,12 @@ export class MiningDistributionService {
currentMinute,
reward,
account.totalContribution,
totalContribution,
networkTotalContribution,
secondDistribution,
);
totalDistributed = totalDistributed.add(reward);
participantCount++;
userParticipantCount++;
}
}
@ -119,9 +139,61 @@ export class MiningDistributionService {
page++;
}
// 2. 分配给系统账户(运营/省/市)
const systemAccounts = await this.systemMiningAccountRepository.findAll();
for (const systemAccount of systemAccounts) {
// 总部账户不直接参与挖矿,它只接收未解锁算力的收益
if (systemAccount.accountType === SystemAccountType.HEADQUARTERS) {
continue;
}
if (systemAccount.totalContribution.isZero()) {
continue;
}
const reward = this.calculator.calculateUserMiningReward(
systemAccount.totalContribution,
networkTotalContribution,
secondDistribution,
);
if (!reward.isZero()) {
await this.systemMiningAccountRepository.mine(
systemAccount.accountType,
reward,
`秒挖矿 ${currentSecond.getTime()}`,
);
// 累积系统账户每分钟的挖矿数据到Redis
await this.accumulateSystemMinuteData(
systemAccount.accountType,
currentMinute,
reward,
systemAccount.totalContribution,
networkTotalContribution,
secondDistribution,
);
totalDistributed = totalDistributed.add(reward);
systemParticipantCount++;
}
}
// 3. 分配未解锁算力的收益给总部账户
const pendingMiningResult = await this.distributePendingContributions(
currentSecond,
currentMinute,
networkTotalContribution,
secondDistribution,
);
totalDistributed = totalDistributed.add(pendingMiningResult.totalDistributed);
pendingParticipantCount = pendingMiningResult.participantCount;
// 每分钟结束时写入汇总的MiningRecord
if (isMinuteEnd) {
await this.writeMinuteRecords(currentMinute);
await this.writeSystemMinuteRecords(currentMinute);
await this.writePendingMinuteRecords(currentMinute);
}
// 执行销毁
@ -137,7 +209,7 @@ export class MiningDistributionService {
// 每分钟记录一次日志
if (isMinuteEnd) {
this.logger.log(
`Minute distribution: distributed=${totalDistributed.toFixed(8)}, participants=${participantCount}`,
`Minute distribution: total=${totalDistributed.toFixed(8)}, users=${userParticipantCount}, system=${systemParticipantCount}, pending=${pendingParticipantCount}`,
);
}
} catch (error) {
@ -147,6 +219,127 @@ export class MiningDistributionService {
}
}
/**
* 使
*/
private async distributeToUsersOnly(
secondDistribution: ShareAmount,
totalContribution: ShareAmount,
currentSecond: Date,
currentMinute: Date,
isMinuteEnd: boolean,
config: any,
): Promise<void> {
let page = 1;
const pageSize = 1000;
let totalDistributed = ShareAmount.zero();
let participantCount = 0;
while (true) {
const { data: accounts, total } = await this.miningAccountRepository.findAllWithContribution(page, pageSize);
if (accounts.length === 0) break;
for (const account of accounts) {
const reward = this.calculator.calculateUserMiningReward(
account.totalContribution,
totalContribution,
secondDistribution,
);
if (!reward.isZero()) {
account.mine(reward, `秒挖矿 ${currentSecond.getTime()}`);
await this.miningAccountRepository.save(account);
await this.accumulateMinuteData(
account.accountSequence,
currentMinute,
reward,
account.totalContribution,
totalContribution,
secondDistribution,
);
totalDistributed = totalDistributed.add(reward);
participantCount++;
}
}
if (page * pageSize >= total) break;
page++;
}
if (isMinuteEnd) {
await this.writeMinuteRecords(currentMinute);
}
await this.executeBurn(currentSecond);
const newRemaining = config.remainingDistribution.subtract(totalDistributed);
await this.miningConfigRepository.updateRemainingDistribution(newRemaining);
const processedKey = `mining:processed:${currentSecond.getTime()}`;
await this.redis.set(processedKey, '1', 2);
if (isMinuteEnd) {
this.logger.log(
`Minute distribution (fallback): distributed=${totalDistributed.toFixed(8)}, participants=${participantCount}`,
);
}
}
/**
*
*/
private async distributePendingContributions(
currentSecond: Date,
currentMinute: Date,
networkTotalContribution: ShareAmount,
secondDistribution: ShareAmount,
): Promise<{ totalDistributed: ShareAmount; participantCount: number }> {
let totalDistributed = ShareAmount.zero();
let participantCount = 0;
// 获取所有未过期的待解锁算力
const pendingContributions = await this.prisma.pendingContributionMining.findMany({
where: { isExpired: false },
});
for (const pending of pendingContributions) {
const contribution = new ShareAmount(pending.amount);
if (contribution.isZero()) continue;
const reward = this.calculator.calculateUserMiningReward(
contribution,
networkTotalContribution,
secondDistribution,
);
if (!reward.isZero()) {
// 收益归总部账户
await this.systemMiningAccountRepository.mine(
SystemAccountType.HEADQUARTERS,
reward,
`未解锁算力挖矿 - 来源:${pending.sourceAccountSequence} 类型:${pending.contributionType} 应归:${pending.wouldBeAccountSequence || '无'}`,
);
// 累积待解锁算力每分钟的挖矿数据到Redis
await this.accumulatePendingMinuteData(
pending.id,
currentMinute,
reward,
pending,
networkTotalContribution,
secondDistribution,
);
totalDistributed = totalDistributed.add(reward);
participantCount++;
}
}
return { totalDistributed, participantCount };
}
/**
* Redis
*/
@ -191,6 +384,176 @@ export class MiningDistributionService {
await this.redis.set(key, JSON.stringify(accumulated), 120);
}
/**
* Redis
*/
private async accumulateSystemMinuteData(
accountType: SystemAccountType,
minuteTime: Date,
reward: ShareAmount,
accountContribution: ShareAmount,
totalContribution: ShareAmount,
secondDistribution: ShareAmount,
): Promise<void> {
const key = `mining:system:minute:${minuteTime.getTime()}:${accountType}`;
const existing = await this.redis.get(key);
let accumulated: {
minedAmount: string;
contributionRatio: string;
totalContribution: string;
secondDistribution: string;
accountContribution: string;
secondCount: number;
};
if (existing) {
accumulated = JSON.parse(existing);
accumulated.minedAmount = new Decimal(accumulated.minedAmount).plus(reward.value).toString();
accumulated.secondCount += 1;
accumulated.contributionRatio = accountContribution.value.dividedBy(totalContribution.value).toString();
accumulated.totalContribution = totalContribution.value.toString();
accumulated.secondDistribution = secondDistribution.value.toString();
accumulated.accountContribution = accountContribution.value.toString();
} else {
accumulated = {
minedAmount: reward.value.toString(),
contributionRatio: accountContribution.value.dividedBy(totalContribution.value).toString(),
totalContribution: totalContribution.value.toString(),
secondDistribution: secondDistribution.value.toString(),
accountContribution: accountContribution.value.toString(),
secondCount: 1,
};
}
await this.redis.set(key, JSON.stringify(accumulated), 120);
}
/**
* Redis
*/
private async accumulatePendingMinuteData(
pendingId: bigint,
minuteTime: Date,
reward: ShareAmount,
pending: any,
networkTotalContribution: ShareAmount,
secondDistribution: ShareAmount,
): Promise<void> {
const key = `mining:pending:minute:${minuteTime.getTime()}:${pendingId.toString()}`;
const existing = await this.redis.get(key);
const contribution = new Decimal(pending.amount);
let accumulated: {
minedAmount: string;
contributionRatio: string;
networkTotalContribution: string;
secondDistribution: string;
contributionAmount: string;
sourceAdoptionId: string;
sourceAccountSequence: string;
wouldBeAccountSequence: string | null;
contributionType: string;
secondCount: number;
};
if (existing) {
accumulated = JSON.parse(existing);
accumulated.minedAmount = new Decimal(accumulated.minedAmount).plus(reward.value).toString();
accumulated.secondCount += 1;
accumulated.contributionRatio = contribution.dividedBy(networkTotalContribution.value).toString();
accumulated.networkTotalContribution = networkTotalContribution.value.toString();
accumulated.secondDistribution = secondDistribution.value.toString();
} else {
accumulated = {
minedAmount: reward.value.toString(),
contributionRatio: contribution.dividedBy(networkTotalContribution.value).toString(),
networkTotalContribution: networkTotalContribution.value.toString(),
secondDistribution: secondDistribution.value.toString(),
contributionAmount: contribution.toString(),
sourceAdoptionId: pending.sourceAdoptionId.toString(),
sourceAccountSequence: pending.sourceAccountSequence,
wouldBeAccountSequence: pending.wouldBeAccountSequence,
contributionType: pending.contributionType,
secondCount: 1,
};
}
await this.redis.set(key, JSON.stringify(accumulated), 120);
}
/**
*
*/
private async writeSystemMinuteRecords(minuteTime: Date): Promise<void> {
try {
const pattern = `mining:system:minute:${minuteTime.getTime()}:*`;
const keys = await this.redis.keys(pattern);
for (const key of keys) {
const data = await this.redis.get(key);
if (!data) continue;
const accumulated = JSON.parse(data);
const accountType = key.split(':').pop() as SystemAccountType;
await this.prisma.systemMiningRecord.create({
data: {
accountType,
miningMinute: minuteTime,
contributionRatio: new Decimal(accumulated.contributionRatio),
totalContribution: new Decimal(accumulated.totalContribution),
secondDistribution: new Decimal(accumulated.secondDistribution),
minedAmount: new Decimal(accumulated.minedAmount),
},
});
await this.redis.del(key);
}
} catch (error) {
this.logger.error('Failed to write system minute records', error);
}
}
/**
*
*/
private async writePendingMinuteRecords(minuteTime: Date): Promise<void> {
try {
const pattern = `mining:pending:minute:${minuteTime.getTime()}:*`;
const keys = await this.redis.keys(pattern);
for (const key of keys) {
const data = await this.redis.get(key);
if (!data) continue;
const accumulated = JSON.parse(data);
const pendingId = BigInt(key.split(':').pop()!);
await this.prisma.pendingMiningRecord.create({
data: {
pendingContributionId: pendingId,
miningMinute: minuteTime,
sourceAdoptionId: BigInt(accumulated.sourceAdoptionId),
sourceAccountSequence: accumulated.sourceAccountSequence,
wouldBeAccountSequence: accumulated.wouldBeAccountSequence,
contributionType: accumulated.contributionType,
contributionAmount: new Decimal(accumulated.contributionAmount),
networkTotalContribution: new Decimal(accumulated.networkTotalContribution),
contributionRatio: new Decimal(accumulated.contributionRatio),
secondDistribution: new Decimal(accumulated.secondDistribution),
minedAmount: new Decimal(accumulated.minedAmount),
allocatedTo: 'HEADQUARTERS',
},
});
await this.redis.del(key);
}
} catch (error) {
this.logger.error('Failed to write pending minute records', error);
}
}
/**
* MiningRecord Kafka mining-wallet-service
*/

View File

@ -0,0 +1,198 @@
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { SystemMiningAccountRepository } from '../../infrastructure/persistence/repositories/system-mining-account.repository';
import { ShareAmount } from '../../domain/value-objects/share-amount.vo';
import { SystemAccountType } from '@prisma/client';
import Decimal from 'decimal.js';
interface SystemAccountSyncedData {
accountType: string;
name: string;
contributionBalance: string;
}
interface NetworkProgressUpdatedData {
totalTreeCount: number;
totalAdoptionOrders: number;
totalAdoptedUsers: number;
currentUnit: number;
currentMultiplier: string;
currentContributionPerTree: string;
nextUnitTreeCount: number;
}
/**
*
*
*/
@Injectable()
export class NetworkSyncService {
private readonly logger = new Logger(NetworkSyncService.name);
constructor(
private readonly prisma: PrismaService,
private readonly systemAccountRepository: SystemMiningAccountRepository,
) {}
/**
*
*/
async handleSystemAccountSynced(data: SystemAccountSyncedData): Promise<void> {
try {
const accountType = this.mapAccountType(data.accountType);
if (!accountType) {
this.logger.warn(`Unknown system account type: ${data.accountType}`);
return;
}
const contribution = new ShareAmount(data.contributionBalance);
await this.systemAccountRepository.updateContribution(accountType, contribution);
this.logger.log(
`Synced system account ${data.accountType}: contribution=${data.contributionBalance}`,
);
} catch (error) {
this.logger.error(`Failed to sync system account ${data.accountType}`, error);
throw error;
}
}
/**
*
*/
async handleNetworkProgressUpdated(data: NetworkProgressUpdatedData): Promise<void> {
try {
// 计算全网理论算力
const networkTotalContribution = new Decimal(data.totalTreeCount).mul(
data.currentContributionPerTree,
);
// 更新 MiningConfig 中的全网理论算力
const config = await this.prisma.miningConfig.findFirst();
if (!config) {
this.logger.warn('MiningConfig not found, skipping network progress update');
return;
}
await this.prisma.miningConfig.update({
where: { id: config.id },
data: {
networkTotalContribution: networkTotalContribution,
totalTreeCount: data.totalTreeCount,
contributionPerTree: new Decimal(data.currentContributionPerTree),
networkLastSyncedAt: new Date(),
},
});
this.logger.log(
`Updated network progress: trees=${data.totalTreeCount}, contribution=${networkTotalContribution.toString()}`,
);
} catch (error) {
this.logger.error('Failed to update network progress', error);
throw error;
}
}
/**
* contribution-service
* HTTP API
*/
async syncFromContributionService(contributionServiceUrl: string): Promise<{
success: boolean;
message: string;
data?: {
systemAccounts: number;
networkTotalContribution: string;
};
}> {
try {
// 1. 同步系统账户
const systemAccountsResponse = await fetch(
`${contributionServiceUrl}/api/v2/admin/system-accounts`,
);
if (!systemAccountsResponse.ok) {
throw new Error(`Failed to fetch system accounts: ${systemAccountsResponse.status}`);
}
const systemAccountsResult = await systemAccountsResponse.json();
const systemAccounts = systemAccountsResult.accounts || [];
for (const account of systemAccounts) {
await this.handleSystemAccountSynced({
accountType: account.accountType,
name: account.name,
contributionBalance: account.contributionBalance,
});
}
// 2. 同步全网进度
const progressResponse = await fetch(
`${contributionServiceUrl}/api/v2/admin/network-progress`,
);
if (!progressResponse.ok) {
throw new Error(`Failed to fetch network progress: ${progressResponse.status}`);
}
const progressResult = await progressResponse.json();
await this.handleNetworkProgressUpdated({
totalTreeCount: progressResult.totalTreeCount,
totalAdoptionOrders: progressResult.totalAdoptionOrders,
totalAdoptedUsers: progressResult.totalAdoptedUsers,
currentUnit: progressResult.currentUnit,
currentMultiplier: progressResult.currentMultiplier,
currentContributionPerTree: progressResult.currentContributionPerTree,
nextUnitTreeCount: progressResult.nextUnitTreeCount,
});
// 3. 获取最新的 MiningConfig 来返回结果
const config = await this.prisma.miningConfig.findFirst();
return {
success: true,
message: `Synced ${systemAccounts.length} system accounts and network progress`,
data: {
systemAccounts: systemAccounts.length,
networkTotalContribution: config?.networkTotalContribution?.toString() || '0',
},
};
} catch (error) {
this.logger.error('Failed to sync from contribution service', error);
return {
success: false,
message: `Sync failed: ${error.message}`,
};
}
}
/**
*
*/
async getNetworkTotalContribution(): Promise<ShareAmount> {
const config = await this.prisma.miningConfig.findFirst();
if (!config || !config.networkTotalContribution) {
return ShareAmount.zero();
}
return new ShareAmount(config.networkTotalContribution);
}
/**
*
*/
async getSystemAccountsTotalContribution(): Promise<ShareAmount> {
return this.systemAccountRepository.getTotalContribution();
}
private mapAccountType(type: string): SystemAccountType | null {
switch (type.toUpperCase()) {
case 'OPERATION':
return SystemAccountType.OPERATION;
case 'PROVINCE':
return SystemAccountType.PROVINCE;
case 'CITY':
return SystemAccountType.CITY;
case 'HEADQUARTERS':
return SystemAccountType.HEADQUARTERS;
default:
return null;
}
}
}

View File

@ -7,6 +7,7 @@ import { MiningConfigRepository } from './persistence/repositories/mining-config
import { BlackHoleRepository } from './persistence/repositories/black-hole.repository';
import { PriceSnapshotRepository } from './persistence/repositories/price-snapshot.repository';
import { OutboxRepository } from './persistence/repositories/outbox.repository';
import { SystemMiningAccountRepository } from './persistence/repositories/system-mining-account.repository';
import { RedisService } from './redis/redis.service';
import { KafkaProducerService } from './kafka/kafka-producer.service';
@ -42,6 +43,7 @@ import { KafkaProducerService } from './kafka/kafka-producer.service';
BlackHoleRepository,
PriceSnapshotRepository,
OutboxRepository,
SystemMiningAccountRepository,
KafkaProducerService,
{
provide: 'REDIS_OPTIONS',
@ -61,6 +63,7 @@ import { KafkaProducerService } from './kafka/kafka-producer.service';
BlackHoleRepository,
PriceSnapshotRepository,
OutboxRepository,
SystemMiningAccountRepository,
KafkaProducerService,
RedisService,
ClientsModule,

View File

@ -13,6 +13,11 @@ export interface MiningConfigEntity {
secondDistribution: ShareAmount;
isActive: boolean;
activatedAt: Date | null;
// 全网理论算力
networkTotalContribution: ShareAmount;
totalTreeCount: number;
contributionPerTree: ShareAmount;
networkLastSyncedAt: Date | null;
}
@Injectable()
@ -90,6 +95,14 @@ export class MiningConfigRepository {
});
}
async getNetworkTotalContribution(): Promise<ShareAmount> {
const config = await this.prisma.miningConfig.findFirst();
if (!config || !config.networkTotalContribution) {
return ShareAmount.zero();
}
return new ShareAmount(config.networkTotalContribution);
}
private toDomain(record: any): MiningConfigEntity {
return {
id: record.id,
@ -102,6 +115,10 @@ export class MiningConfigRepository {
secondDistribution: new ShareAmount(record.secondDistribution),
isActive: record.isActive,
activatedAt: record.activatedAt,
networkTotalContribution: new ShareAmount(record.networkTotalContribution || 0),
totalTreeCount: record.totalTreeCount || 0,
contributionPerTree: new ShareAmount(record.contributionPerTree || 22617),
networkLastSyncedAt: record.networkLastSyncedAt,
};
}
}

View File

@ -0,0 +1,182 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { ShareAmount } from '../../../domain/value-objects/share-amount.vo';
import { SystemAccountType } from '@prisma/client';
export interface SystemMiningAccountSnapshot {
accountType: SystemAccountType;
name: string;
totalMined: ShareAmount;
availableBalance: ShareAmount;
totalContribution: ShareAmount;
lastSyncedAt: Date | null;
}
@Injectable()
export class SystemMiningAccountRepository {
constructor(private readonly prisma: PrismaService) {}
async findByType(accountType: SystemAccountType): Promise<SystemMiningAccountSnapshot | null> {
const record = await this.prisma.systemMiningAccount.findUnique({
where: { accountType },
});
if (!record) {
return null;
}
return this.toSnapshot(record);
}
async findAll(): Promise<SystemMiningAccountSnapshot[]> {
const records = await this.prisma.systemMiningAccount.findMany({
orderBy: { accountType: 'asc' },
});
return records.map((r) => this.toSnapshot(r));
}
async ensureSystemAccountsExist(): Promise<void> {
const accounts = [
{ accountType: SystemAccountType.OPERATION, name: '运营账户' },
{ accountType: SystemAccountType.PROVINCE, name: '省公司账户' },
{ accountType: SystemAccountType.CITY, name: '市公司账户' },
{ accountType: SystemAccountType.HEADQUARTERS, name: '总部账户' },
];
for (const account of accounts) {
await this.prisma.systemMiningAccount.upsert({
where: { accountType: account.accountType },
create: {
accountType: account.accountType,
name: account.name,
totalMined: 0,
availableBalance: 0,
totalContribution: 0,
},
update: {},
});
}
}
async updateContribution(
accountType: SystemAccountType,
contribution: ShareAmount,
): Promise<void> {
await this.prisma.systemMiningAccount.upsert({
where: { accountType },
create: {
accountType,
name: this.getAccountName(accountType),
totalContribution: contribution.value,
lastSyncedAt: new Date(),
},
update: {
totalContribution: contribution.value,
lastSyncedAt: new Date(),
},
});
}
async getTotalContribution(): Promise<ShareAmount> {
const result = await this.prisma.systemMiningAccount.aggregate({
_sum: { totalContribution: true },
});
return new ShareAmount(result._sum.totalContribution || 0);
}
async mine(
accountType: SystemAccountType,
amount: ShareAmount,
memo: string,
): Promise<void> {
await this.prisma.$transaction(async (tx) => {
const account = await tx.systemMiningAccount.findUnique({
where: { accountType },
});
if (!account) {
throw new Error(`System account ${accountType} not found`);
}
const balanceBefore = account.availableBalance;
const balanceAfter = balanceBefore.plus(amount.value);
const totalMined = account.totalMined.plus(amount.value);
await tx.systemMiningAccount.update({
where: { accountType },
data: {
totalMined,
availableBalance: balanceAfter,
},
});
await tx.systemMiningTransaction.create({
data: {
accountType,
type: 'MINE',
amount: amount.value,
balanceBefore,
balanceAfter,
memo,
},
});
});
}
async saveMinuteRecord(
accountType: SystemAccountType,
miningMinute: Date,
contributionRatio: ShareAmount,
totalContribution: ShareAmount,
secondDistribution: ShareAmount,
minedAmount: ShareAmount,
): Promise<void> {
await this.prisma.systemMiningRecord.upsert({
where: {
accountType_miningMinute: {
accountType,
miningMinute,
},
},
create: {
accountType,
miningMinute,
contributionRatio: contributionRatio.value,
totalContribution: totalContribution.value,
secondDistribution: secondDistribution.value,
minedAmount: minedAmount.value,
},
update: {
minedAmount: { increment: minedAmount.value },
},
});
}
private getAccountName(accountType: SystemAccountType): string {
switch (accountType) {
case SystemAccountType.OPERATION:
return '运营账户';
case SystemAccountType.PROVINCE:
return '省公司账户';
case SystemAccountType.CITY:
return '市公司账户';
case SystemAccountType.HEADQUARTERS:
return '总部账户';
default:
return accountType;
}
}
private toSnapshot(record: any): SystemMiningAccountSnapshot {
return {
accountType: record.accountType,
name: record.name,
totalMined: new ShareAmount(record.totalMined),
availableBalance: new ShareAmount(record.availableBalance),
totalContribution: new ShareAmount(record.totalContribution),
lastSyncedAt: record.lastSyncedAt,
};
}
}