From de5416aee69553842c1112ae53066e6e7ef6ff4e Mon Sep 17 00:00:00 2001 From: hailin Date: Fri, 16 Jan 2026 03:39:56 -0800 Subject: [PATCH] =?UTF-8?q?feat(mining):=20=E5=AE=9E=E7=8E=B0=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E8=B4=A6=E6=88=B7=E5=92=8C=E5=BE=85=E8=A7=A3=E9=94=81?= =?UTF-8?q?=E7=AE=97=E5=8A=9B=E5=8F=82=E4=B8=8E=E6=8C=96=E7=9F=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重大变更: - 挖矿分母从用户有效算力改为全网理论算力(networkTotalContribution) - 系统账户(运营12%/省1%/市2%)参与挖矿,有独立的挖矿记录 - 待解锁算力参与挖矿,收益归总部账户,记录包含完整来源信息 新增功能: - mining-service: 系统挖矿账户表、待解锁算力表及相关挖矿记录表 - mining-service: NetworkSyncService 同步全网数据 - mining-service: /admin/sync-network 和 /admin/system-accounts 端点 - contribution-service: /admin/system-accounts 和发布系统账户事件端点 - mining-admin-service: 状态检查返回全网理论算力信息 Co-Authored-By: Claude Opus 4.5 --- .../src/api/controllers/admin.controller.ts | 67 +++ .../src/domain/events/index.ts | 1 + .../events/system-account-synced.event.ts | 25 ++ .../src/api/controllers/config.controller.ts | 62 ++- .../migration.sql | 238 +++++++++++ .../mining-service/prisma/schema.prisma | 136 +++++++ .../src/api/controllers/admin.controller.ts | 68 +++- .../src/application/application.module.ts | 3 + .../contribution-event.handler.ts | 22 + .../services/mining-distribution.service.ts | 383 +++++++++++++++++- .../services/network-sync.service.ts | 198 +++++++++ .../infrastructure/infrastructure.module.ts | 3 + .../repositories/mining-config.repository.ts | 17 + .../system-mining-account.repository.ts | 182 +++++++++ 14 files changed, 1379 insertions(+), 26 deletions(-) create mode 100644 backend/services/contribution-service/src/domain/events/system-account-synced.event.ts create mode 100644 backend/services/mining-service/prisma/migrations/0003_add_system_accounts_and_pending_mining/migration.sql create mode 100644 backend/services/mining-service/src/application/services/network-sync.service.ts create mode 100644 backend/services/mining-service/src/infrastructure/persistence/repositories/system-mining-account.repository.ts diff --git a/backend/services/contribution-service/src/api/controllers/admin.controller.ts b/backend/services/contribution-service/src/api/controllers/admin.controller.ts index edbd2811..ea1ccc99 100644 --- a/backend/services/contribution-service/src/api/controllers/admin.controller.ts +++ b/backend/services/contribution-service/src/api/controllers/admin.controller.ts @@ -10,6 +10,7 @@ import { AdoptionSyncedEvent, ContributionRecordSyncedEvent, NetworkProgressUpdatedEvent, + SystemAccountSyncedEvent, } from '../../domain/events'; import { Public } from '../../shared/guards/jwt-auth.guard'; @@ -420,4 +421,70 @@ export class AdminController { }; } } + + @Post('system-accounts/publish-all') + @Public() + @ApiOperation({ summary: '发布所有系统账户算力事件到 outbox,用于同步到 mining-service' }) + async publishAllSystemAccounts(): Promise<{ + success: boolean; + publishedCount: number; + message: string; + }> { + try { + const systemAccounts = await this.prisma.systemAccount.findMany(); + + await this.unitOfWork.executeInTransaction(async () => { + const events = systemAccounts.map((account) => { + const event = new SystemAccountSyncedEvent( + account.accountType, + account.name, + account.contributionBalance.toString(), + account.createdAt, + ); + + return { + aggregateType: SystemAccountSyncedEvent.AGGREGATE_TYPE, + aggregateId: account.accountType, + eventType: SystemAccountSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }; + }); + + await this.outboxRepository.saveMany(events); + }); + + this.logger.log(`Published ${systemAccounts.length} system account events`); + + return { + success: true, + publishedCount: systemAccounts.length, + message: `Published ${systemAccounts.length} system account events`, + }; + } catch (error) { + this.logger.error('Failed to publish system accounts', error); + return { + success: false, + publishedCount: 0, + message: `Failed: ${error.message}`, + }; + } + } + + @Get('system-accounts') + @Public() + @ApiOperation({ summary: '获取所有系统账户算力' }) + async getSystemAccounts() { + const systemAccounts = await this.prisma.systemAccount.findMany(); + + return { + accounts: systemAccounts.map((a) => ({ + accountType: a.accountType, + name: a.name, + contributionBalance: a.contributionBalance.toString(), + createdAt: a.createdAt, + updatedAt: a.updatedAt, + })), + total: systemAccounts.length, + }; + } } diff --git a/backend/services/contribution-service/src/domain/events/index.ts b/backend/services/contribution-service/src/domain/events/index.ts index debea840..028f7e60 100644 --- a/backend/services/contribution-service/src/domain/events/index.ts +++ b/backend/services/contribution-service/src/domain/events/index.ts @@ -6,3 +6,4 @@ export * from './referral-synced.event'; export * from './adoption-synced.event'; export * from './contribution-record-synced.event'; export * from './network-progress-updated.event'; +export * from './system-account-synced.event'; diff --git a/backend/services/contribution-service/src/domain/events/system-account-synced.event.ts b/backend/services/contribution-service/src/domain/events/system-account-synced.event.ts new file mode 100644 index 00000000..89c13e13 --- /dev/null +++ b/backend/services/contribution-service/src/domain/events/system-account-synced.event.ts @@ -0,0 +1,25 @@ +/** + * 系统账户算力同步事件 + * 用于将系统账户(运营、省、市、总部)的算力同步到 mining-service + */ +export class SystemAccountSyncedEvent { + static readonly EVENT_TYPE = 'SystemAccountSynced'; + static readonly AGGREGATE_TYPE = 'SystemAccount'; + + constructor( + public readonly accountType: string, // OPERATION / PROVINCE / CITY / HEADQUARTERS + public readonly name: string, + public readonly contributionBalance: string, + public readonly createdAt: Date, + ) {} + + toPayload(): Record { + return { + eventType: SystemAccountSyncedEvent.EVENT_TYPE, + accountType: this.accountType, + name: this.name, + contributionBalance: this.contributionBalance, + createdAt: this.createdAt.toISOString(), + }; + } +} diff --git a/backend/services/mining-admin-service/src/api/controllers/config.controller.ts b/backend/services/mining-admin-service/src/api/controllers/config.controller.ts index 22c55825..bed25de2 100644 --- a/backend/services/mining-admin-service/src/api/controllers/config.controller.ts +++ b/backend/services/mining-admin-service/src/api/controllers/config.controller.ts @@ -45,7 +45,7 @@ export class ConfigController { this.logger.log(`Fetching mining status from ${miningServiceUrl}/api/v2/admin/status`); try { - // 并行获取 mining-service 状态和 contribution-service 总算力 + // 并行获取 mining-service 状态和 contribution-service 统计数据 const [miningResponse, contributionResponse] = await Promise.all([ fetch(`${miningServiceUrl}/api/v2/admin/status`), fetch(`${contributionServiceUrl}/api/v2/contribution/stats`).catch(() => null), @@ -57,28 +57,59 @@ export class ConfigController { const miningResult = await miningResponse.json(); this.logger.log(`Mining service response: ${JSON.stringify(miningResult)}`); - // 获取 contribution-service 的总有效算力 - let contributionTotal: string | null = null; + const miningData = miningResult.data || miningResult; + + // 获取 contribution-service 的全网理论算力 + let networkTotalContribution: string | null = null; + let userEffectiveContribution: string | null = null; + let systemAccountsContribution: string | null = null; + if (contributionResponse && contributionResponse.ok) { const contributionResult = await contributionResponse.json(); - // contribution-service 返回的是 data.totalContribution - contributionTotal = contributionResult.data?.totalContribution || contributionResult.totalContribution || null; + const data = contributionResult.data || contributionResult; + + // 全网理论算力 = 总认种树 × 每棵树算力 + networkTotalContribution = data.networkTotalContribution || null; + // 用户有效算力 + userEffectiveContribution = data.totalContribution || null; + // 系统账户算力 + const systemAccounts = data.systemAccounts || []; + const systemTotal = systemAccounts + .filter((a: any) => a.accountType !== 'HEADQUARTERS') + .reduce((sum: number, a: any) => sum + parseFloat(a.totalContribution || '0'), 0); + systemAccountsContribution = systemTotal.toString(); } - const miningData = miningResult.data || miningResult; - const miningTotal = miningData.totalContribution || '0'; + // mining-service 中的全网理论算力 + const miningNetworkTotal = miningData.networkTotalContribution || '0'; + // mining-service 中的用户有效算力 + const miningUserTotal = miningData.totalContribution || '0'; - // 判断算力是否同步完成:两边总算力相等 - const isSynced = contributionTotal !== null && - parseFloat(contributionTotal) > 0 && - Math.abs(parseFloat(miningTotal) - parseFloat(contributionTotal)) < 0.01; + // 判断算力是否同步完成 + // 条件1:全网理论算力已同步(mining-service 中的 networkTotalContribution > 0) + // 条件2:用户有效算力已同步(mining-service 中的 totalContribution 与 contribution-service 相近) + const hasNetworkContribution = parseFloat(miningNetworkTotal) > 0; + const userSynced = userEffectiveContribution !== null && + parseFloat(userEffectiveContribution) > 0 && + Math.abs(parseFloat(miningUserTotal) - parseFloat(userEffectiveContribution)) < 0.01; + + const isSynced = hasNetworkContribution && userSynced; return { ...miningData, contributionSyncStatus: { isSynced, - miningTotal, - contributionTotal: contributionTotal || '0', + // 全网理论算力(应作为挖矿分母) + networkTotalContribution: networkTotalContribution || '0', + miningNetworkTotal, + // 用户有效算力 + userEffectiveContribution: userEffectiveContribution || '0', + miningUserTotal, + // 系统账户算力 + systemAccountsContribution: systemAccountsContribution || '0', + // 兼容旧字段 + miningTotal: miningUserTotal, + contributionTotal: userEffectiveContribution || '0', }, }; } catch (error) { @@ -89,6 +120,11 @@ export class ConfigController { error: `Unable to connect to mining service: ${error.message}`, contributionSyncStatus: { isSynced: false, + networkTotalContribution: '0', + miningNetworkTotal: '0', + userEffectiveContribution: '0', + miningUserTotal: '0', + systemAccountsContribution: '0', miningTotal: '0', contributionTotal: '0', }, diff --git a/backend/services/mining-service/prisma/migrations/0003_add_system_accounts_and_pending_mining/migration.sql b/backend/services/mining-service/prisma/migrations/0003_add_system_accounts_and_pending_mining/migration.sql new file mode 100644 index 00000000..6ebd1564 --- /dev/null +++ b/backend/services/mining-service/prisma/migrations/0003_add_system_accounts_and_pending_mining/migration.sql @@ -0,0 +1,238 @@ +-- ============================================================================ +-- 添加系统账户和待解锁算力挖矿功能 +-- 1. MiningConfig 添加全网理论算力字段 +-- 2. 系统账户(运营/省/市/总部)挖矿 +-- 3. 待解锁算力挖矿记录 +-- 4. 挖矿收益分配明细 +-- ============================================================================ + +-- ==================== MiningConfig 添加全网理论算力字段 ==================== + +ALTER TABLE "mining_configs" ADD COLUMN IF NOT EXISTS "network_total_contribution" DECIMAL(30, 8) NOT NULL DEFAULT 0; +ALTER TABLE "mining_configs" ADD COLUMN IF NOT EXISTS "total_tree_count" INTEGER NOT NULL DEFAULT 0; +ALTER TABLE "mining_configs" ADD COLUMN IF NOT EXISTS "contribution_per_tree" DECIMAL(20, 10) NOT NULL DEFAULT 22617; +ALTER TABLE "mining_configs" ADD COLUMN IF NOT EXISTS "network_last_synced_at" TIMESTAMP(3); + +-- ==================== 系统账户类型枚举 ==================== + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'SystemAccountType') THEN + CREATE TYPE "SystemAccountType" AS ENUM ('OPERATION', 'PROVINCE', 'CITY', 'HEADQUARTERS'); + END IF; +END$$; + +-- ==================== 系统挖矿账户 ==================== + +CREATE TABLE IF NOT EXISTS "system_mining_accounts" ( + "id" TEXT NOT NULL, + "account_type" "SystemAccountType" NOT NULL, + "name" VARCHAR(100) NOT NULL, + "totalMined" DECIMAL(30, 8) NOT NULL DEFAULT 0, + "availableBalance" DECIMAL(30, 8) NOT NULL DEFAULT 0, + "totalContribution" DECIMAL(30, 8) NOT NULL DEFAULT 0, + "last_synced_at" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "system_mining_accounts_pkey" PRIMARY KEY ("id") +); + +CREATE UNIQUE INDEX IF NOT EXISTS "system_mining_accounts_account_type_key" ON "system_mining_accounts"("account_type"); +CREATE INDEX IF NOT EXISTS "system_mining_accounts_totalContribution_idx" ON "system_mining_accounts"("totalContribution" DESC); + +-- ==================== 系统账户挖矿记录 ==================== + +CREATE TABLE IF NOT EXISTS "system_mining_records" ( + "id" TEXT NOT NULL, + "account_type" "SystemAccountType" NOT NULL, + "mining_minute" TIMESTAMP(3) NOT NULL, + "contribution_ratio" DECIMAL(30, 18) NOT NULL, + "total_contribution" DECIMAL(30, 8) NOT NULL, + "second_distribution" DECIMAL(30, 18) NOT NULL, + "mined_amount" DECIMAL(30, 18) NOT NULL, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "system_mining_records_pkey" PRIMARY KEY ("id") +); + +CREATE UNIQUE INDEX IF NOT EXISTS "system_mining_records_account_type_mining_minute_key" ON "system_mining_records"("account_type", "mining_minute"); +CREATE INDEX IF NOT EXISTS "system_mining_records_mining_minute_idx" ON "system_mining_records"("mining_minute"); + +ALTER TABLE "system_mining_records" DROP CONSTRAINT IF EXISTS "system_mining_records_account_type_fkey"; +ALTER TABLE "system_mining_records" ADD CONSTRAINT "system_mining_records_account_type_fkey" + FOREIGN KEY ("account_type") REFERENCES "system_mining_accounts"("account_type") ON DELETE RESTRICT ON UPDATE CASCADE; + +-- ==================== 系统账户交易流水 ==================== + +CREATE TABLE IF NOT EXISTS "system_mining_transactions" ( + "id" TEXT NOT NULL, + "account_type" "SystemAccountType" NOT NULL, + "type" TEXT NOT NULL, + "amount" DECIMAL(30, 8) NOT NULL, + "balance_before" DECIMAL(30, 8) NOT NULL, + "balance_after" DECIMAL(30, 8) NOT NULL, + "reference_id" TEXT, + "reference_type" TEXT, + "memo" TEXT, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "system_mining_transactions_pkey" PRIMARY KEY ("id") +); + +CREATE INDEX IF NOT EXISTS "system_mining_transactions_account_type_created_at_idx" ON "system_mining_transactions"("account_type", "created_at" DESC); + +ALTER TABLE "system_mining_transactions" DROP CONSTRAINT IF EXISTS "system_mining_transactions_account_type_fkey"; +ALTER TABLE "system_mining_transactions" ADD CONSTRAINT "system_mining_transactions_account_type_fkey" + FOREIGN KEY ("account_type") REFERENCES "system_mining_accounts"("account_type") ON DELETE RESTRICT ON UPDATE CASCADE; + +-- ==================== 待解锁算力挖矿 ==================== + +CREATE TABLE IF NOT EXISTS "pending_contribution_mining" ( + "id" BIGSERIAL NOT NULL, + "source_adoption_id" BIGINT NOT NULL, + "source_account_sequence" VARCHAR(20) NOT NULL, + "would_be_account_sequence" VARCHAR(20), + "contribution_type" VARCHAR(30) NOT NULL, + "amount" DECIMAL(30, 10) NOT NULL, + "reason" VARCHAR(200), + "effective_date" DATE NOT NULL, + "expire_date" DATE NOT NULL, + "is_expired" BOOLEAN NOT NULL DEFAULT false, + "last_synced_at" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "pending_contribution_mining_pkey" PRIMARY KEY ("id") +); + +CREATE UNIQUE INDEX IF NOT EXISTS "pending_contribution_mining_source_adoption_id_would_be_acco_key" + ON "pending_contribution_mining"("source_adoption_id", "would_be_account_sequence", "contribution_type"); +CREATE INDEX IF NOT EXISTS "pending_contribution_mining_would_be_account_sequence_idx" ON "pending_contribution_mining"("would_be_account_sequence"); +CREATE INDEX IF NOT EXISTS "pending_contribution_mining_contribution_type_idx" ON "pending_contribution_mining"("contribution_type"); +CREATE INDEX IF NOT EXISTS "pending_contribution_mining_is_expired_idx" ON "pending_contribution_mining"("is_expired"); + +-- ==================== 待解锁算力挖矿记录 ==================== + +CREATE TABLE IF NOT EXISTS "pending_mining_records" ( + "id" BIGSERIAL NOT NULL, + "pending_contribution_id" BIGINT NOT NULL, + "mining_minute" TIMESTAMP(3) NOT NULL, + "source_adoption_id" BIGINT NOT NULL, + "source_account_sequence" VARCHAR(20) NOT NULL, + "would_be_account_sequence" VARCHAR(20), + "contribution_type" VARCHAR(30) NOT NULL, + "contribution_amount" DECIMAL(30, 10) NOT NULL, + "network_total_contribution" DECIMAL(30, 10) NOT NULL, + "contribution_ratio" DECIMAL(30, 18) NOT NULL, + "second_distribution" DECIMAL(30, 18) NOT NULL, + "mined_amount" DECIMAL(30, 18) NOT NULL, + "allocated_to" VARCHAR(20) NOT NULL DEFAULT 'HEADQUARTERS', + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "pending_mining_records_pkey" PRIMARY KEY ("id") +); + +CREATE UNIQUE INDEX IF NOT EXISTS "pending_mining_records_pending_contribution_id_mining_minute_key" + ON "pending_mining_records"("pending_contribution_id", "mining_minute"); +CREATE INDEX IF NOT EXISTS "pending_mining_records_mining_minute_idx" ON "pending_mining_records"("mining_minute"); +CREATE INDEX IF NOT EXISTS "pending_mining_records_source_account_sequence_idx" ON "pending_mining_records"("source_account_sequence"); +CREATE INDEX IF NOT EXISTS "pending_mining_records_would_be_account_sequence_idx" ON "pending_mining_records"("would_be_account_sequence"); + +ALTER TABLE "pending_mining_records" DROP CONSTRAINT IF EXISTS "pending_mining_records_pending_contribution_id_fkey"; +ALTER TABLE "pending_mining_records" ADD CONSTRAINT "pending_mining_records_pending_contribution_id_fkey" + FOREIGN KEY ("pending_contribution_id") REFERENCES "pending_contribution_mining"("id") ON DELETE RESTRICT ON UPDATE CASCADE; + +-- ==================== 挖矿收益分配明细 ==================== + +CREATE TABLE IF NOT EXISTS "mining_reward_allocations" ( + "id" BIGSERIAL NOT NULL, + "mining_date" DATE NOT NULL, + "contribution_record_id" BIGINT NOT NULL, + "source_adoption_id" BIGINT NOT NULL, + "source_account_sequence" VARCHAR(20) NOT NULL, + "owner_account_sequence" VARCHAR(20) NOT NULL, + "contribution_type" VARCHAR(30) NOT NULL, + "contribution_amount" DECIMAL(30, 10) NOT NULL, + "network_total_contribution" DECIMAL(30, 10) NOT NULL, + "contribution_ratio" DECIMAL(30, 18) NOT NULL, + "daily_mining_pool" DECIMAL(30, 10) NOT NULL, + "reward_amount" DECIMAL(30, 10) NOT NULL, + "allocation_status" VARCHAR(20) NOT NULL, + "is_unlocked" BOOLEAN NOT NULL, + "allocated_to_account_sequence" VARCHAR(20), + "allocated_to_system_account" VARCHAR(20), + "unlocked_reason" VARCHAR(200), + "owner_has_adopted" BOOLEAN NOT NULL, + "owner_direct_referral_count" INTEGER NOT NULL, + "owner_unlocked_level_depth" INTEGER NOT NULL, + "owner_unlocked_bonus_tiers" INTEGER NOT NULL, + "remark" VARCHAR(500), + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "mining_reward_allocations_pkey" PRIMARY KEY ("id") +); + +CREATE INDEX IF NOT EXISTS "mining_reward_allocations_mining_date_idx" ON "mining_reward_allocations"("mining_date"); +CREATE INDEX IF NOT EXISTS "mining_reward_allocations_owner_account_sequence_mining_date_idx" ON "mining_reward_allocations"("owner_account_sequence", "mining_date"); +CREATE INDEX IF NOT EXISTS "mining_reward_allocations_source_account_sequence_idx" ON "mining_reward_allocations"("source_account_sequence"); +CREATE INDEX IF NOT EXISTS "mining_reward_allocations_source_adoption_id_idx" ON "mining_reward_allocations"("source_adoption_id"); +CREATE INDEX IF NOT EXISTS "mining_reward_allocations_allocation_status_idx" ON "mining_reward_allocations"("allocation_status"); +CREATE INDEX IF NOT EXISTS "mining_reward_allocations_contribution_record_id_idx" ON "mining_reward_allocations"("contribution_record_id"); + +-- ==================== 每日挖矿收益汇总 ==================== + +CREATE TABLE IF NOT EXISTS "daily_mining_reward_summaries" ( + "id" BIGSERIAL NOT NULL, + "mining_date" DATE NOT NULL, + "account_sequence" VARCHAR(20) NOT NULL, + "unlocked_reward" DECIMAL(30, 10) NOT NULL DEFAULT 0, + "pending_reward_to_hq" DECIMAL(30, 10) NOT NULL DEFAULT 0, + "personal_reward" DECIMAL(30, 10) NOT NULL DEFAULT 0, + "level_reward" DECIMAL(30, 10) NOT NULL DEFAULT 0, + "bonus_reward" DECIMAL(30, 10) NOT NULL DEFAULT 0, + "pending_level_to_hq" DECIMAL(30, 10) NOT NULL DEFAULT 0, + "pending_bonus_to_hq" DECIMAL(30, 10) NOT NULL DEFAULT 0, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "daily_mining_reward_summaries_pkey" PRIMARY KEY ("id") +); + +CREATE UNIQUE INDEX IF NOT EXISTS "daily_mining_reward_summaries_mining_date_account_sequence_key" + ON "daily_mining_reward_summaries"("mining_date", "account_sequence"); +CREATE INDEX IF NOT EXISTS "daily_mining_reward_summaries_mining_date_idx" ON "daily_mining_reward_summaries"("mining_date"); +CREATE INDEX IF NOT EXISTS "daily_mining_reward_summaries_account_sequence_idx" ON "daily_mining_reward_summaries"("account_sequence"); + +-- ==================== 总部待解锁收益明细 ==================== + +CREATE TABLE IF NOT EXISTS "headquarters_pending_rewards" ( + "id" BIGSERIAL NOT NULL, + "mining_date" DATE NOT NULL, + "would_be_account_sequence" VARCHAR(20) NOT NULL, + "source_adoption_id" BIGINT NOT NULL, + "source_account_sequence" VARCHAR(20) NOT NULL, + "contribution_record_id" BIGINT NOT NULL, + "contribution_type" VARCHAR(30) NOT NULL, + "contribution_amount" DECIMAL(30, 10) NOT NULL, + "reward_amount" DECIMAL(30, 10) NOT NULL, + "reason" VARCHAR(200) NOT NULL, + "owner_has_adopted" BOOLEAN NOT NULL, + "owner_direct_referral_count" INTEGER NOT NULL, + "required_condition" VARCHAR(100) NOT NULL, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "headquarters_pending_rewards_pkey" PRIMARY KEY ("id") +); + +CREATE INDEX IF NOT EXISTS "headquarters_pending_rewards_mining_date_idx" ON "headquarters_pending_rewards"("mining_date"); +CREATE INDEX IF NOT EXISTS "headquarters_pending_rewards_would_be_account_sequence_idx" ON "headquarters_pending_rewards"("would_be_account_sequence"); +CREATE INDEX IF NOT EXISTS "headquarters_pending_rewards_source_adoption_id_idx" ON "headquarters_pending_rewards"("source_adoption_id"); + +-- ==================== 初始化系统账户 ==================== + +INSERT INTO "system_mining_accounts" ("id", "account_type", "name", "totalMined", "availableBalance", "totalContribution", "updated_at") +VALUES + (gen_random_uuid(), 'OPERATION', '运营账户', 0, 0, 0, NOW()), + (gen_random_uuid(), 'PROVINCE', '省公司账户', 0, 0, 0, NOW()), + (gen_random_uuid(), 'CITY', '市公司账户', 0, 0, 0, NOW()), + (gen_random_uuid(), 'HEADQUARTERS', '总部账户', 0, 0, 0, NOW()) +ON CONFLICT ("account_type") DO NOTHING; diff --git a/backend/services/mining-service/prisma/schema.prisma b/backend/services/mining-service/prisma/schema.prisma index b3d4117b..d15846d9 100644 --- a/backend/services/mining-service/prisma/schema.prisma +++ b/backend/services/mining-service/prisma/schema.prisma @@ -21,6 +21,14 @@ model MiningConfig { secondDistribution Decimal @db.Decimal(30, 18) // 每秒分配量 isActive Boolean @default(false) // 是否已激活挖矿 activatedAt DateTime? // 激活时间 + + // 全网理论算力(从 contribution-service 同步) + // 理论算力 = 总认种树 × 基础算力/棵 × 系数 + networkTotalContribution Decimal @default(0) @db.Decimal(30, 8) @map("network_total_contribution") + totalTreeCount Int @default(0) @map("total_tree_count") // 全网总认种树 + contributionPerTree Decimal @default(22617) @db.Decimal(20, 10) @map("contribution_per_tree") // 当前每棵树算力 + networkLastSyncedAt DateTime? @map("network_last_synced_at") // 最后同步时间 + createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -42,6 +50,134 @@ model MiningEra { @@map("mining_eras") } +// ==================== 系统账户(运营/省/市/总部)==================== + +// 系统账户类型枚举 +enum SystemAccountType { + OPERATION // 运营账户 12% + PROVINCE // 省公司账户 1% + CITY // 市公司账户 2% + HEADQUARTERS // 总部账户(收取未解锁的收益) +} + +// 系统挖矿账户 +model SystemMiningAccount { + id String @id @default(uuid()) + accountType SystemAccountType @unique @map("account_type") + name String @db.VarChar(100) + totalMined Decimal @default(0) @db.Decimal(30, 8) // 总挖到的积分股 + availableBalance Decimal @default(0) @db.Decimal(30, 8) // 可用余额 + totalContribution Decimal @default(0) @db.Decimal(30, 8) // 当前算力(从 contribution-service 同步) + lastSyncedAt DateTime? @map("last_synced_at") + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + records SystemMiningRecord[] + transactions SystemMiningTransaction[] + + @@index([totalContribution(sort: Desc)]) + @@map("system_mining_accounts") +} + +// 系统账户挖矿记录(分钟级别汇总) +model SystemMiningRecord { + id String @id @default(uuid()) + accountType SystemAccountType @map("account_type") + miningMinute DateTime @map("mining_minute") + contributionRatio Decimal @db.Decimal(30, 18) @map("contribution_ratio") + totalContribution Decimal @db.Decimal(30, 8) @map("total_contribution") + secondDistribution Decimal @db.Decimal(30, 18) @map("second_distribution") + minedAmount Decimal @db.Decimal(30, 18) @map("mined_amount") + createdAt DateTime @default(now()) @map("created_at") + + account SystemMiningAccount @relation(fields: [accountType], references: [accountType]) + + @@unique([accountType, miningMinute]) + @@index([miningMinute]) + @@map("system_mining_records") +} + +// 系统账户交易流水 +model SystemMiningTransaction { + id String @id @default(uuid()) + accountType SystemAccountType @map("account_type") + type String // MINE, TRANSFER_OUT, ADJUSTMENT + amount Decimal @db.Decimal(30, 8) + balanceBefore Decimal @db.Decimal(30, 8) @map("balance_before") + balanceAfter Decimal @db.Decimal(30, 8) @map("balance_after") + referenceId String? @map("reference_id") + referenceType String? @map("reference_type") + memo String? @db.Text + createdAt DateTime @default(now()) @map("created_at") + + account SystemMiningAccount @relation(fields: [accountType], references: [accountType]) + + @@index([accountType, createdAt(sort: Desc)]) + @@map("system_mining_transactions") +} + +// ==================== 未解锁算力挖矿(归总部)==================== + +// 未解锁算力参与挖矿,收益归总部 +// 从 contribution-service 同步 unallocated_contributions 数据 +model PendingContributionMining { + id BigInt @id @default(autoincrement()) + sourceAdoptionId BigInt @map("source_adoption_id") // 来源认种ID + sourceAccountSequence String @map("source_account_sequence") @db.VarChar(20) // 认种人账号 + wouldBeAccountSequence String? @map("would_be_account_sequence") @db.VarChar(20) // 应该归属的账户 + contributionType String @map("contribution_type") @db.VarChar(30) // LEVEL_6~15 / BONUS_TIER_2/3 + amount Decimal @db.Decimal(30, 10) // 待解锁算力金额 + reason String? @db.VarChar(200) // 未解锁原因 + effectiveDate DateTime @map("effective_date") @db.Date + expireDate DateTime @map("expire_date") @db.Date + isExpired Boolean @default(false) @map("is_expired") + lastSyncedAt DateTime? @map("last_synced_at") + createdAt DateTime @default(now()) @map("created_at") + + records PendingMiningRecord[] + + @@unique([sourceAdoptionId, wouldBeAccountSequence, contributionType]) + @@index([wouldBeAccountSequence]) + @@index([contributionType]) + @@index([isExpired]) + @@map("pending_contribution_mining") +} + +// 未解锁算力挖矿记录(分钟级别汇总) +// 记录每笔未解锁算力每分钟产生的挖矿收益明细 +// 收益归总部账户(HEADQUARTERS) +model PendingMiningRecord { + id BigInt @id @default(autoincrement()) + pendingContributionId BigInt @map("pending_contribution_id") // 关联的待解锁算力记录 + miningMinute DateTime @map("mining_minute") // 挖矿时间(精确到分钟) + + // ========== 算力来源信息(冗余存储便于查询)========== + sourceAdoptionId BigInt @map("source_adoption_id") // 来源认种ID + sourceAccountSequence String @map("source_account_sequence") @db.VarChar(20) // 认种人账号 + wouldBeAccountSequence String? @map("would_be_account_sequence") @db.VarChar(20) // 应该归属的账户(如果解锁了) + contributionType String @map("contribution_type") @db.VarChar(30) // LEVEL_6~15 / BONUS_TIER_2/3 + + // ========== 挖矿计算参数 ========== + contributionAmount Decimal @map("contribution_amount") @db.Decimal(30, 10) // 参与挖矿的算力 + networkTotalContribution Decimal @map("network_total_contribution") @db.Decimal(30, 10) // 全网理论算力 + contributionRatio Decimal @map("contribution_ratio") @db.Decimal(30, 18) // 算力占比 + secondDistribution Decimal @map("second_distribution") @db.Decimal(30, 18) // 每秒分配量 + + // ========== 挖矿收益 ========== + minedAmount Decimal @map("mined_amount") @db.Decimal(30, 18) // 该分钟挖到的数量 + allocatedTo String @default("HEADQUARTERS") @map("allocated_to") @db.VarChar(20) // 收益归属:HEADQUARTERS + + createdAt DateTime @default(now()) @map("created_at") + + pendingContribution PendingContributionMining @relation(fields: [pendingContributionId], references: [id]) + + @@unique([pendingContributionId, miningMinute]) + @@index([miningMinute]) + @@index([sourceAccountSequence]) + @@index([wouldBeAccountSequence]) + @@map("pending_mining_records") +} + // ==================== 用户挖矿账户 ==================== // 用户挖矿账户 diff --git a/backend/services/mining-service/src/api/controllers/admin.controller.ts b/backend/services/mining-service/src/api/controllers/admin.controller.ts index 7fe0e1e2..ddf313dd 100644 --- a/backend/services/mining-service/src/api/controllers/admin.controller.ts +++ b/backend/services/mining-service/src/api/controllers/admin.controller.ts @@ -1,12 +1,18 @@ import { Controller, Get, Post, HttpException, HttpStatus } from '@nestjs/common'; import { ApiTags, ApiOperation } from '@nestjs/swagger'; +import { ConfigService } from '@nestjs/config'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; +import { NetworkSyncService } from '../../application/services/network-sync.service'; import { Public } from '../../shared/guards/jwt-auth.guard'; @ApiTags('Admin') @Controller('admin') export class AdminController { - constructor(private readonly prisma: PrismaService) {} + constructor( + private readonly prisma: PrismaService, + private readonly networkSyncService: NetworkSyncService, + private readonly configService: ConfigService, + ) {} @Get('accounts/sync') @Public() @@ -45,10 +51,23 @@ export class AdminController { const config = await this.prisma.miningConfig.findFirst(); const blackHole = await this.prisma.blackHole.findFirst(); const accountCount = await this.prisma.miningAccount.count(); - const totalContribution = await this.prisma.miningAccount.aggregate({ + + // 用户有效算力 + const userContribution = await this.prisma.miningAccount.aggregate({ _sum: { totalContribution: true }, }); + // 系统账户算力 + const systemContribution = await this.prisma.systemMiningAccount.aggregate({ + _sum: { totalContribution: true }, + }); + + // 待解锁算力 + const pendingContribution = await this.prisma.pendingContributionMining.aggregate({ + _sum: { amount: true }, + where: { isExpired: false }, + }); + return { initialized: !!config, isActive: config?.isActive || false, @@ -64,7 +83,17 @@ export class AdminController { } : null, accountCount, - totalContribution: totalContribution._sum.totalContribution?.toString() || '0', + // 用户有效算力 + totalContribution: userContribution._sum.totalContribution?.toString() || '0', + // 全网理论算力(从 contribution-service 同步) + networkTotalContribution: config?.networkTotalContribution?.toString() || '0', + totalTreeCount: config?.totalTreeCount || 0, + contributionPerTree: config?.contributionPerTree?.toString() || '22617', + networkLastSyncedAt: config?.networkLastSyncedAt, + // 系统账户算力 + systemContribution: systemContribution._sum.totalContribution?.toString() || '0', + // 待解锁算力 + pendingContribution: pendingContribution._sum.amount?.toString() || '0', }; } @@ -116,4 +145,37 @@ export class AdminController { return { success: true, message: '挖矿系统已停用' }; } + + @Post('sync-network') + @Public() + @ApiOperation({ summary: '从 contribution-service 同步全网数据(系统账户、全网进度)' }) + async syncNetwork() { + const contributionServiceUrl = this.configService.get( + 'CONTRIBUTION_SERVICE_URL', + 'http://localhost:3020', + ); + + return this.networkSyncService.syncFromContributionService(contributionServiceUrl); + } + + @Get('system-accounts') + @Public() + @ApiOperation({ summary: '获取系统账户挖矿状态' }) + async getSystemAccounts() { + const accounts = await this.prisma.systemMiningAccount.findMany({ + orderBy: { accountType: 'asc' }, + }); + + return { + accounts: accounts.map((acc) => ({ + accountType: acc.accountType, + name: acc.name, + totalMined: acc.totalMined.toString(), + availableBalance: acc.availableBalance.toString(), + totalContribution: acc.totalContribution.toString(), + lastSyncedAt: acc.lastSyncedAt, + })), + total: accounts.length, + }; + } } diff --git a/backend/services/mining-service/src/application/application.module.ts b/backend/services/mining-service/src/application/application.module.ts index f185fb78..aac97260 100644 --- a/backend/services/mining-service/src/application/application.module.ts +++ b/backend/services/mining-service/src/application/application.module.ts @@ -5,6 +5,7 @@ import { InfrastructureModule } from '../infrastructure/infrastructure.module'; // Services import { MiningDistributionService } from './services/mining-distribution.service'; import { ContributionSyncService } from './services/contribution-sync.service'; +import { NetworkSyncService } from './services/network-sync.service'; // Queries import { GetMiningAccountQuery } from './queries/get-mining-account.query'; @@ -24,6 +25,7 @@ import { OutboxScheduler } from './schedulers/outbox.scheduler'; // Services MiningDistributionService, ContributionSyncService, + NetworkSyncService, // Queries GetMiningAccountQuery, @@ -40,6 +42,7 @@ import { OutboxScheduler } from './schedulers/outbox.scheduler'; exports: [ MiningDistributionService, ContributionSyncService, + NetworkSyncService, GetMiningAccountQuery, GetMiningStatsQuery, GetPriceQuery, diff --git a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts index e5e4529c..891e27fc 100644 --- a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts +++ b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts @@ -1,6 +1,7 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { ContributionSyncService } from '../services/contribution-sync.service'; +import { NetworkSyncService } from '../services/network-sync.service'; import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; @Injectable() @@ -10,6 +11,7 @@ export class ContributionEventHandler implements OnModuleInit { constructor( private readonly syncService: ContributionSyncService, + private readonly networkSyncService: NetworkSyncService, private readonly configService: ConfigService, ) {} @@ -72,6 +74,26 @@ export class ContributionEventHandler implements OnModuleInit { totalContribution: eventPayload.totalContribution, activeAccounts: eventPayload.activeAccounts, }); + } else if (eventType === 'SystemAccountSynced') { + this.logger.log(`Received SystemAccountSynced for ${eventPayload.accountType}`); + + await this.networkSyncService.handleSystemAccountSynced({ + accountType: eventPayload.accountType, + name: eventPayload.name, + contributionBalance: eventPayload.contributionBalance, + }); + } else if (eventType === 'NetworkProgressUpdated') { + this.logger.log(`Received NetworkProgressUpdated: trees=${eventPayload.totalTreeCount}`); + + await this.networkSyncService.handleNetworkProgressUpdated({ + totalTreeCount: eventPayload.totalTreeCount, + totalAdoptionOrders: eventPayload.totalAdoptionOrders, + totalAdoptedUsers: eventPayload.totalAdoptedUsers, + currentUnit: eventPayload.currentUnit, + currentMultiplier: eventPayload.currentMultiplier, + currentContributionPerTree: eventPayload.currentContributionPerTree, + nextUnitTreeCount: eventPayload.nextUnitTreeCount, + }); } } catch (error) { this.logger.error('Failed to handle contribution event', error); diff --git a/backend/services/mining-service/src/application/services/mining-distribution.service.ts b/backend/services/mining-service/src/application/services/mining-distribution.service.ts index aa12e2bc..4dbaca3f 100644 --- a/backend/services/mining-service/src/application/services/mining-distribution.service.ts +++ b/backend/services/mining-service/src/application/services/mining-distribution.service.ts @@ -5,10 +5,12 @@ import { MiningConfigRepository } from '../../infrastructure/persistence/reposit import { BlackHoleRepository } from '../../infrastructure/persistence/repositories/black-hole.repository'; import { PriceSnapshotRepository } from '../../infrastructure/persistence/repositories/price-snapshot.repository'; import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; +import { SystemMiningAccountRepository } from '../../infrastructure/persistence/repositories/system-mining-account.repository'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; import { RedisService } from '../../infrastructure/redis/redis.service'; import { MiningCalculatorService } from '../../domain/services/mining-calculator.service'; import { ShareAmount } from '../../domain/value-objects/share-amount.vo'; +import { SystemAccountType } from '@prisma/client'; import Decimal from 'decimal.js'; /** @@ -32,6 +34,7 @@ export class MiningDistributionService { private readonly blackHoleRepository: BlackHoleRepository, private readonly priceSnapshotRepository: PriceSnapshotRepository, private readonly outboxRepository: OutboxRepository, + private readonly systemMiningAccountRepository: SystemMiningAccountRepository, private readonly prisma: PrismaService, private readonly redis: RedisService, private readonly configService: ConfigService, @@ -41,6 +44,11 @@ export class MiningDistributionService { * 执行每秒挖矿分配 * - 每秒更新账户余额 * - 每分钟写入汇总MiningRecord + * + * 重要变更: + * - 使用全网理论算力作为分母(包含用户有效算力 + 系统账户算力 + 未解锁算力) + * - 系统账户(运营/省/市)参与挖矿,有独立的挖矿记录 + * - 未解锁算力参与挖矿,收益归总部账户,有明细记录 */ async executeSecondDistribution(): Promise { // 获取分布式锁(锁定时间900ms) @@ -72,17 +80,29 @@ export class MiningDistributionService { return; } - // 获取有算力的账户 - const totalContribution = await this.miningAccountRepository.getTotalContribution(); - if (totalContribution.isZero()) { + // 使用全网理论算力作为分母 + // 理论算力 = 用户有效算力 + 系统账户算力 + 未解锁算力 + const networkTotalContribution = config.networkTotalContribution; + if (networkTotalContribution.isZero()) { + // 如果全网理论算力未同步,则回退到用户有效算力 + this.logger.warn('Network total contribution is zero, falling back to user contribution'); + const userContribution = await this.miningAccountRepository.getTotalContribution(); + if (userContribution.isZero()) { + return; + } + // 使用用户有效算力(旧逻辑) + await this.distributeToUsersOnly(secondDistribution, userContribution, currentSecond, currentMinute, isMinuteEnd, config); return; } - // 分批处理账户 + let totalDistributed = ShareAmount.zero(); + let userParticipantCount = 0; + let systemParticipantCount = 0; + let pendingParticipantCount = 0; + + // 1. 分配给用户账户 let page = 1; const pageSize = 1000; - let totalDistributed = ShareAmount.zero(); - let participantCount = 0; while (true) { const { data: accounts, total } = await this.miningAccountRepository.findAllWithContribution(page, pageSize); @@ -91,7 +111,7 @@ export class MiningDistributionService { for (const account of accounts) { const reward = this.calculator.calculateUserMiningReward( account.totalContribution, - totalContribution, + networkTotalContribution, secondDistribution, ); @@ -106,12 +126,12 @@ export class MiningDistributionService { currentMinute, reward, account.totalContribution, - totalContribution, + networkTotalContribution, secondDistribution, ); totalDistributed = totalDistributed.add(reward); - participantCount++; + userParticipantCount++; } } @@ -119,9 +139,61 @@ export class MiningDistributionService { page++; } + // 2. 分配给系统账户(运营/省/市) + const systemAccounts = await this.systemMiningAccountRepository.findAll(); + for (const systemAccount of systemAccounts) { + // 总部账户不直接参与挖矿,它只接收未解锁算力的收益 + if (systemAccount.accountType === SystemAccountType.HEADQUARTERS) { + continue; + } + + if (systemAccount.totalContribution.isZero()) { + continue; + } + + const reward = this.calculator.calculateUserMiningReward( + systemAccount.totalContribution, + networkTotalContribution, + secondDistribution, + ); + + if (!reward.isZero()) { + await this.systemMiningAccountRepository.mine( + systemAccount.accountType, + reward, + `秒挖矿 ${currentSecond.getTime()}`, + ); + + // 累积系统账户每分钟的挖矿数据到Redis + await this.accumulateSystemMinuteData( + systemAccount.accountType, + currentMinute, + reward, + systemAccount.totalContribution, + networkTotalContribution, + secondDistribution, + ); + + totalDistributed = totalDistributed.add(reward); + systemParticipantCount++; + } + } + + // 3. 分配未解锁算力的收益给总部账户 + const pendingMiningResult = await this.distributePendingContributions( + currentSecond, + currentMinute, + networkTotalContribution, + secondDistribution, + ); + totalDistributed = totalDistributed.add(pendingMiningResult.totalDistributed); + pendingParticipantCount = pendingMiningResult.participantCount; + // 每分钟结束时,写入汇总的MiningRecord if (isMinuteEnd) { await this.writeMinuteRecords(currentMinute); + await this.writeSystemMinuteRecords(currentMinute); + await this.writePendingMinuteRecords(currentMinute); } // 执行销毁 @@ -137,7 +209,7 @@ export class MiningDistributionService { // 每分钟记录一次日志 if (isMinuteEnd) { this.logger.log( - `Minute distribution: distributed=${totalDistributed.toFixed(8)}, participants=${participantCount}`, + `Minute distribution: total=${totalDistributed.toFixed(8)}, users=${userParticipantCount}, system=${systemParticipantCount}, pending=${pendingParticipantCount}`, ); } } catch (error) { @@ -147,6 +219,127 @@ export class MiningDistributionService { } } + /** + * 仅分配给用户(旧逻辑,当全网理论算力未同步时使用) + */ + private async distributeToUsersOnly( + secondDistribution: ShareAmount, + totalContribution: ShareAmount, + currentSecond: Date, + currentMinute: Date, + isMinuteEnd: boolean, + config: any, + ): Promise { + let page = 1; + const pageSize = 1000; + let totalDistributed = ShareAmount.zero(); + let participantCount = 0; + + while (true) { + const { data: accounts, total } = await this.miningAccountRepository.findAllWithContribution(page, pageSize); + if (accounts.length === 0) break; + + for (const account of accounts) { + const reward = this.calculator.calculateUserMiningReward( + account.totalContribution, + totalContribution, + secondDistribution, + ); + + if (!reward.isZero()) { + account.mine(reward, `秒挖矿 ${currentSecond.getTime()}`); + await this.miningAccountRepository.save(account); + + await this.accumulateMinuteData( + account.accountSequence, + currentMinute, + reward, + account.totalContribution, + totalContribution, + secondDistribution, + ); + + totalDistributed = totalDistributed.add(reward); + participantCount++; + } + } + + if (page * pageSize >= total) break; + page++; + } + + if (isMinuteEnd) { + await this.writeMinuteRecords(currentMinute); + } + + await this.executeBurn(currentSecond); + + const newRemaining = config.remainingDistribution.subtract(totalDistributed); + await this.miningConfigRepository.updateRemainingDistribution(newRemaining); + + const processedKey = `mining:processed:${currentSecond.getTime()}`; + await this.redis.set(processedKey, '1', 2); + + if (isMinuteEnd) { + this.logger.log( + `Minute distribution (fallback): distributed=${totalDistributed.toFixed(8)}, participants=${participantCount}`, + ); + } + } + + /** + * 分配未解锁算力的挖矿收益给总部账户 + */ + private async distributePendingContributions( + currentSecond: Date, + currentMinute: Date, + networkTotalContribution: ShareAmount, + secondDistribution: ShareAmount, + ): Promise<{ totalDistributed: ShareAmount; participantCount: number }> { + let totalDistributed = ShareAmount.zero(); + let participantCount = 0; + + // 获取所有未过期的待解锁算力 + const pendingContributions = await this.prisma.pendingContributionMining.findMany({ + where: { isExpired: false }, + }); + + for (const pending of pendingContributions) { + const contribution = new ShareAmount(pending.amount); + if (contribution.isZero()) continue; + + const reward = this.calculator.calculateUserMiningReward( + contribution, + networkTotalContribution, + secondDistribution, + ); + + if (!reward.isZero()) { + // 收益归总部账户 + await this.systemMiningAccountRepository.mine( + SystemAccountType.HEADQUARTERS, + reward, + `未解锁算力挖矿 - 来源:${pending.sourceAccountSequence} 类型:${pending.contributionType} 应归:${pending.wouldBeAccountSequence || '无'}`, + ); + + // 累积待解锁算力每分钟的挖矿数据到Redis + await this.accumulatePendingMinuteData( + pending.id, + currentMinute, + reward, + pending, + networkTotalContribution, + secondDistribution, + ); + + totalDistributed = totalDistributed.add(reward); + participantCount++; + } + } + + return { totalDistributed, participantCount }; + } + /** * 累积每分钟的挖矿数据到Redis */ @@ -191,6 +384,176 @@ export class MiningDistributionService { await this.redis.set(key, JSON.stringify(accumulated), 120); } + /** + * 累积系统账户每分钟的挖矿数据到Redis + */ + private async accumulateSystemMinuteData( + accountType: SystemAccountType, + minuteTime: Date, + reward: ShareAmount, + accountContribution: ShareAmount, + totalContribution: ShareAmount, + secondDistribution: ShareAmount, + ): Promise { + const key = `mining:system:minute:${minuteTime.getTime()}:${accountType}`; + const existing = await this.redis.get(key); + + let accumulated: { + minedAmount: string; + contributionRatio: string; + totalContribution: string; + secondDistribution: string; + accountContribution: string; + secondCount: number; + }; + + if (existing) { + accumulated = JSON.parse(existing); + accumulated.minedAmount = new Decimal(accumulated.minedAmount).plus(reward.value).toString(); + accumulated.secondCount += 1; + accumulated.contributionRatio = accountContribution.value.dividedBy(totalContribution.value).toString(); + accumulated.totalContribution = totalContribution.value.toString(); + accumulated.secondDistribution = secondDistribution.value.toString(); + accumulated.accountContribution = accountContribution.value.toString(); + } else { + accumulated = { + minedAmount: reward.value.toString(), + contributionRatio: accountContribution.value.dividedBy(totalContribution.value).toString(), + totalContribution: totalContribution.value.toString(), + secondDistribution: secondDistribution.value.toString(), + accountContribution: accountContribution.value.toString(), + secondCount: 1, + }; + } + + await this.redis.set(key, JSON.stringify(accumulated), 120); + } + + /** + * 累积待解锁算力每分钟的挖矿数据到Redis + */ + private async accumulatePendingMinuteData( + pendingId: bigint, + minuteTime: Date, + reward: ShareAmount, + pending: any, + networkTotalContribution: ShareAmount, + secondDistribution: ShareAmount, + ): Promise { + const key = `mining:pending:minute:${minuteTime.getTime()}:${pendingId.toString()}`; + const existing = await this.redis.get(key); + + const contribution = new Decimal(pending.amount); + let accumulated: { + minedAmount: string; + contributionRatio: string; + networkTotalContribution: string; + secondDistribution: string; + contributionAmount: string; + sourceAdoptionId: string; + sourceAccountSequence: string; + wouldBeAccountSequence: string | null; + contributionType: string; + secondCount: number; + }; + + if (existing) { + accumulated = JSON.parse(existing); + accumulated.minedAmount = new Decimal(accumulated.minedAmount).plus(reward.value).toString(); + accumulated.secondCount += 1; + accumulated.contributionRatio = contribution.dividedBy(networkTotalContribution.value).toString(); + accumulated.networkTotalContribution = networkTotalContribution.value.toString(); + accumulated.secondDistribution = secondDistribution.value.toString(); + } else { + accumulated = { + minedAmount: reward.value.toString(), + contributionRatio: contribution.dividedBy(networkTotalContribution.value).toString(), + networkTotalContribution: networkTotalContribution.value.toString(), + secondDistribution: secondDistribution.value.toString(), + contributionAmount: contribution.toString(), + sourceAdoptionId: pending.sourceAdoptionId.toString(), + sourceAccountSequence: pending.sourceAccountSequence, + wouldBeAccountSequence: pending.wouldBeAccountSequence, + contributionType: pending.contributionType, + secondCount: 1, + }; + } + + await this.redis.set(key, JSON.stringify(accumulated), 120); + } + + /** + * 写入系统账户每分钟汇总的挖矿记录 + */ + private async writeSystemMinuteRecords(minuteTime: Date): Promise { + try { + const pattern = `mining:system:minute:${minuteTime.getTime()}:*`; + const keys = await this.redis.keys(pattern); + + for (const key of keys) { + const data = await this.redis.get(key); + if (!data) continue; + + const accumulated = JSON.parse(data); + const accountType = key.split(':').pop() as SystemAccountType; + + await this.prisma.systemMiningRecord.create({ + data: { + accountType, + miningMinute: minuteTime, + contributionRatio: new Decimal(accumulated.contributionRatio), + totalContribution: new Decimal(accumulated.totalContribution), + secondDistribution: new Decimal(accumulated.secondDistribution), + minedAmount: new Decimal(accumulated.minedAmount), + }, + }); + + await this.redis.del(key); + } + } catch (error) { + this.logger.error('Failed to write system minute records', error); + } + } + + /** + * 写入待解锁算力每分钟汇总的挖矿记录 + */ + private async writePendingMinuteRecords(minuteTime: Date): Promise { + try { + const pattern = `mining:pending:minute:${minuteTime.getTime()}:*`; + const keys = await this.redis.keys(pattern); + + for (const key of keys) { + const data = await this.redis.get(key); + if (!data) continue; + + const accumulated = JSON.parse(data); + const pendingId = BigInt(key.split(':').pop()!); + + await this.prisma.pendingMiningRecord.create({ + data: { + pendingContributionId: pendingId, + miningMinute: minuteTime, + sourceAdoptionId: BigInt(accumulated.sourceAdoptionId), + sourceAccountSequence: accumulated.sourceAccountSequence, + wouldBeAccountSequence: accumulated.wouldBeAccountSequence, + contributionType: accumulated.contributionType, + contributionAmount: new Decimal(accumulated.contributionAmount), + networkTotalContribution: new Decimal(accumulated.networkTotalContribution), + contributionRatio: new Decimal(accumulated.contributionRatio), + secondDistribution: new Decimal(accumulated.secondDistribution), + minedAmount: new Decimal(accumulated.minedAmount), + allocatedTo: 'HEADQUARTERS', + }, + }); + + await this.redis.del(key); + } + } catch (error) { + this.logger.error('Failed to write pending minute records', error); + } + } + /** * 写入每分钟汇总的MiningRecord,并发布事件到 Kafka 通知 mining-wallet-service 扣减池余额 */ diff --git a/backend/services/mining-service/src/application/services/network-sync.service.ts b/backend/services/mining-service/src/application/services/network-sync.service.ts new file mode 100644 index 00000000..b9a738c3 --- /dev/null +++ b/backend/services/mining-service/src/application/services/network-sync.service.ts @@ -0,0 +1,198 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; +import { SystemMiningAccountRepository } from '../../infrastructure/persistence/repositories/system-mining-account.repository'; +import { ShareAmount } from '../../domain/value-objects/share-amount.vo'; +import { SystemAccountType } from '@prisma/client'; +import Decimal from 'decimal.js'; + +interface SystemAccountSyncedData { + accountType: string; + name: string; + contributionBalance: string; +} + +interface NetworkProgressUpdatedData { + totalTreeCount: number; + totalAdoptionOrders: number; + totalAdoptedUsers: number; + currentUnit: number; + currentMultiplier: string; + currentContributionPerTree: string; + nextUnitTreeCount: number; +} + +/** + * 全网同步服务 + * 处理系统账户算力同步和全网理论算力更新 + */ +@Injectable() +export class NetworkSyncService { + private readonly logger = new Logger(NetworkSyncService.name); + + constructor( + private readonly prisma: PrismaService, + private readonly systemAccountRepository: SystemMiningAccountRepository, + ) {} + + /** + * 处理系统账户同步事件 + */ + async handleSystemAccountSynced(data: SystemAccountSyncedData): Promise { + try { + const accountType = this.mapAccountType(data.accountType); + if (!accountType) { + this.logger.warn(`Unknown system account type: ${data.accountType}`); + return; + } + + const contribution = new ShareAmount(data.contributionBalance); + await this.systemAccountRepository.updateContribution(accountType, contribution); + + this.logger.log( + `Synced system account ${data.accountType}: contribution=${data.contributionBalance}`, + ); + } catch (error) { + this.logger.error(`Failed to sync system account ${data.accountType}`, error); + throw error; + } + } + + /** + * 处理全网进度更新事件 + */ + async handleNetworkProgressUpdated(data: NetworkProgressUpdatedData): Promise { + try { + // 计算全网理论算力 + const networkTotalContribution = new Decimal(data.totalTreeCount).mul( + data.currentContributionPerTree, + ); + + // 更新 MiningConfig 中的全网理论算力 + const config = await this.prisma.miningConfig.findFirst(); + if (!config) { + this.logger.warn('MiningConfig not found, skipping network progress update'); + return; + } + + await this.prisma.miningConfig.update({ + where: { id: config.id }, + data: { + networkTotalContribution: networkTotalContribution, + totalTreeCount: data.totalTreeCount, + contributionPerTree: new Decimal(data.currentContributionPerTree), + networkLastSyncedAt: new Date(), + }, + }); + + this.logger.log( + `Updated network progress: trees=${data.totalTreeCount}, contribution=${networkTotalContribution.toString()}`, + ); + } catch (error) { + this.logger.error('Failed to update network progress', error); + throw error; + } + } + + /** + * 从 contribution-service 同步全网数据 + * 可通过 HTTP API 手动触发 + */ + async syncFromContributionService(contributionServiceUrl: string): Promise<{ + success: boolean; + message: string; + data?: { + systemAccounts: number; + networkTotalContribution: string; + }; + }> { + try { + // 1. 同步系统账户 + const systemAccountsResponse = await fetch( + `${contributionServiceUrl}/api/v2/admin/system-accounts`, + ); + if (!systemAccountsResponse.ok) { + throw new Error(`Failed to fetch system accounts: ${systemAccountsResponse.status}`); + } + const systemAccountsResult = await systemAccountsResponse.json(); + const systemAccounts = systemAccountsResult.accounts || []; + + for (const account of systemAccounts) { + await this.handleSystemAccountSynced({ + accountType: account.accountType, + name: account.name, + contributionBalance: account.contributionBalance, + }); + } + + // 2. 同步全网进度 + const progressResponse = await fetch( + `${contributionServiceUrl}/api/v2/admin/network-progress`, + ); + if (!progressResponse.ok) { + throw new Error(`Failed to fetch network progress: ${progressResponse.status}`); + } + const progressResult = await progressResponse.json(); + + await this.handleNetworkProgressUpdated({ + totalTreeCount: progressResult.totalTreeCount, + totalAdoptionOrders: progressResult.totalAdoptionOrders, + totalAdoptedUsers: progressResult.totalAdoptedUsers, + currentUnit: progressResult.currentUnit, + currentMultiplier: progressResult.currentMultiplier, + currentContributionPerTree: progressResult.currentContributionPerTree, + nextUnitTreeCount: progressResult.nextUnitTreeCount, + }); + + // 3. 获取最新的 MiningConfig 来返回结果 + const config = await this.prisma.miningConfig.findFirst(); + + return { + success: true, + message: `Synced ${systemAccounts.length} system accounts and network progress`, + data: { + systemAccounts: systemAccounts.length, + networkTotalContribution: config?.networkTotalContribution?.toString() || '0', + }, + }; + } catch (error) { + this.logger.error('Failed to sync from contribution service', error); + return { + success: false, + message: `Sync failed: ${error.message}`, + }; + } + } + + /** + * 获取全网理论算力 + */ + async getNetworkTotalContribution(): Promise { + const config = await this.prisma.miningConfig.findFirst(); + if (!config || !config.networkTotalContribution) { + return ShareAmount.zero(); + } + return new ShareAmount(config.networkTotalContribution); + } + + /** + * 获取系统账户总算力 + */ + async getSystemAccountsTotalContribution(): Promise { + return this.systemAccountRepository.getTotalContribution(); + } + + private mapAccountType(type: string): SystemAccountType | null { + switch (type.toUpperCase()) { + case 'OPERATION': + return SystemAccountType.OPERATION; + case 'PROVINCE': + return SystemAccountType.PROVINCE; + case 'CITY': + return SystemAccountType.CITY; + case 'HEADQUARTERS': + return SystemAccountType.HEADQUARTERS; + default: + return null; + } + } +} diff --git a/backend/services/mining-service/src/infrastructure/infrastructure.module.ts b/backend/services/mining-service/src/infrastructure/infrastructure.module.ts index 27ea546e..383c37e8 100644 --- a/backend/services/mining-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/mining-service/src/infrastructure/infrastructure.module.ts @@ -7,6 +7,7 @@ import { MiningConfigRepository } from './persistence/repositories/mining-config import { BlackHoleRepository } from './persistence/repositories/black-hole.repository'; import { PriceSnapshotRepository } from './persistence/repositories/price-snapshot.repository'; import { OutboxRepository } from './persistence/repositories/outbox.repository'; +import { SystemMiningAccountRepository } from './persistence/repositories/system-mining-account.repository'; import { RedisService } from './redis/redis.service'; import { KafkaProducerService } from './kafka/kafka-producer.service'; @@ -42,6 +43,7 @@ import { KafkaProducerService } from './kafka/kafka-producer.service'; BlackHoleRepository, PriceSnapshotRepository, OutboxRepository, + SystemMiningAccountRepository, KafkaProducerService, { provide: 'REDIS_OPTIONS', @@ -61,6 +63,7 @@ import { KafkaProducerService } from './kafka/kafka-producer.service'; BlackHoleRepository, PriceSnapshotRepository, OutboxRepository, + SystemMiningAccountRepository, KafkaProducerService, RedisService, ClientsModule, diff --git a/backend/services/mining-service/src/infrastructure/persistence/repositories/mining-config.repository.ts b/backend/services/mining-service/src/infrastructure/persistence/repositories/mining-config.repository.ts index 3552bf70..97fee2dd 100644 --- a/backend/services/mining-service/src/infrastructure/persistence/repositories/mining-config.repository.ts +++ b/backend/services/mining-service/src/infrastructure/persistence/repositories/mining-config.repository.ts @@ -13,6 +13,11 @@ export interface MiningConfigEntity { secondDistribution: ShareAmount; isActive: boolean; activatedAt: Date | null; + // 全网理论算力 + networkTotalContribution: ShareAmount; + totalTreeCount: number; + contributionPerTree: ShareAmount; + networkLastSyncedAt: Date | null; } @Injectable() @@ -90,6 +95,14 @@ export class MiningConfigRepository { }); } + async getNetworkTotalContribution(): Promise { + const config = await this.prisma.miningConfig.findFirst(); + if (!config || !config.networkTotalContribution) { + return ShareAmount.zero(); + } + return new ShareAmount(config.networkTotalContribution); + } + private toDomain(record: any): MiningConfigEntity { return { id: record.id, @@ -102,6 +115,10 @@ export class MiningConfigRepository { secondDistribution: new ShareAmount(record.secondDistribution), isActive: record.isActive, activatedAt: record.activatedAt, + networkTotalContribution: new ShareAmount(record.networkTotalContribution || 0), + totalTreeCount: record.totalTreeCount || 0, + contributionPerTree: new ShareAmount(record.contributionPerTree || 22617), + networkLastSyncedAt: record.networkLastSyncedAt, }; } } diff --git a/backend/services/mining-service/src/infrastructure/persistence/repositories/system-mining-account.repository.ts b/backend/services/mining-service/src/infrastructure/persistence/repositories/system-mining-account.repository.ts new file mode 100644 index 00000000..0ff88234 --- /dev/null +++ b/backend/services/mining-service/src/infrastructure/persistence/repositories/system-mining-account.repository.ts @@ -0,0 +1,182 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { ShareAmount } from '../../../domain/value-objects/share-amount.vo'; +import { SystemAccountType } from '@prisma/client'; + +export interface SystemMiningAccountSnapshot { + accountType: SystemAccountType; + name: string; + totalMined: ShareAmount; + availableBalance: ShareAmount; + totalContribution: ShareAmount; + lastSyncedAt: Date | null; +} + +@Injectable() +export class SystemMiningAccountRepository { + constructor(private readonly prisma: PrismaService) {} + + async findByType(accountType: SystemAccountType): Promise { + const record = await this.prisma.systemMiningAccount.findUnique({ + where: { accountType }, + }); + + if (!record) { + return null; + } + + return this.toSnapshot(record); + } + + async findAll(): Promise { + const records = await this.prisma.systemMiningAccount.findMany({ + orderBy: { accountType: 'asc' }, + }); + + return records.map((r) => this.toSnapshot(r)); + } + + async ensureSystemAccountsExist(): Promise { + const accounts = [ + { accountType: SystemAccountType.OPERATION, name: '运营账户' }, + { accountType: SystemAccountType.PROVINCE, name: '省公司账户' }, + { accountType: SystemAccountType.CITY, name: '市公司账户' }, + { accountType: SystemAccountType.HEADQUARTERS, name: '总部账户' }, + ]; + + for (const account of accounts) { + await this.prisma.systemMiningAccount.upsert({ + where: { accountType: account.accountType }, + create: { + accountType: account.accountType, + name: account.name, + totalMined: 0, + availableBalance: 0, + totalContribution: 0, + }, + update: {}, + }); + } + } + + async updateContribution( + accountType: SystemAccountType, + contribution: ShareAmount, + ): Promise { + await this.prisma.systemMiningAccount.upsert({ + where: { accountType }, + create: { + accountType, + name: this.getAccountName(accountType), + totalContribution: contribution.value, + lastSyncedAt: new Date(), + }, + update: { + totalContribution: contribution.value, + lastSyncedAt: new Date(), + }, + }); + } + + async getTotalContribution(): Promise { + const result = await this.prisma.systemMiningAccount.aggregate({ + _sum: { totalContribution: true }, + }); + + return new ShareAmount(result._sum.totalContribution || 0); + } + + async mine( + accountType: SystemAccountType, + amount: ShareAmount, + memo: string, + ): Promise { + await this.prisma.$transaction(async (tx) => { + const account = await tx.systemMiningAccount.findUnique({ + where: { accountType }, + }); + + if (!account) { + throw new Error(`System account ${accountType} not found`); + } + + const balanceBefore = account.availableBalance; + const balanceAfter = balanceBefore.plus(amount.value); + const totalMined = account.totalMined.plus(amount.value); + + await tx.systemMiningAccount.update({ + where: { accountType }, + data: { + totalMined, + availableBalance: balanceAfter, + }, + }); + + await tx.systemMiningTransaction.create({ + data: { + accountType, + type: 'MINE', + amount: amount.value, + balanceBefore, + balanceAfter, + memo, + }, + }); + }); + } + + async saveMinuteRecord( + accountType: SystemAccountType, + miningMinute: Date, + contributionRatio: ShareAmount, + totalContribution: ShareAmount, + secondDistribution: ShareAmount, + minedAmount: ShareAmount, + ): Promise { + await this.prisma.systemMiningRecord.upsert({ + where: { + accountType_miningMinute: { + accountType, + miningMinute, + }, + }, + create: { + accountType, + miningMinute, + contributionRatio: contributionRatio.value, + totalContribution: totalContribution.value, + secondDistribution: secondDistribution.value, + minedAmount: minedAmount.value, + }, + update: { + minedAmount: { increment: minedAmount.value }, + }, + }); + } + + private getAccountName(accountType: SystemAccountType): string { + switch (accountType) { + case SystemAccountType.OPERATION: + return '运营账户'; + case SystemAccountType.PROVINCE: + return '省公司账户'; + case SystemAccountType.CITY: + return '市公司账户'; + case SystemAccountType.HEADQUARTERS: + return '总部账户'; + default: + return accountType; + } + } + + private toSnapshot(record: any): SystemMiningAccountSnapshot { + return { + accountType: record.accountType, + name: record.name, + totalMined: new ShareAmount(record.totalMined), + availableBalance: new ShareAmount(record.availableBalance), + totalContribution: new ShareAmount(record.totalContribution), + lastSyncedAt: record.lastSyncedAt, + }; + } +}