From 4817d92507ef712cb9a3087651f379d26dff3b90 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 3 Feb 2026 05:25:56 -0800 Subject: [PATCH] =?UTF-8?q?feat(adoption-injection):=20=E8=AE=A4=E7=A7=8D?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=20fUSDT=20=E6=B3=A8=E5=85=A5=E5=81=9A?= =?UTF-8?q?=E5=B8=82=E5=95=86=E9=92=B1=E5=8C=85=20+=20CDC=20outbox=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## contribution-service 改动 ### 1. CDC Outbox 实时发布修复 - adoption-synced.handler: handleCreate/handleUpdate 在同步数据后立即写入 AdoptionSynced outbox 事件,确保 mining-admin-service 实时接收认种同步 - referral-synced.handler: 同理,写入 ReferralSynced outbox 事件 - 之前只有手动调用 /admin/adoptions/publish-all 才会创建 outbox 事件 ### 2. 认种 fUSDT 注入事件 - 新增 AdoptionFusdtInjectionRequestedEvent 事件类 - 当认种状态变为 MINING_ENABLED 时触发 - 写入 outbox,topic: contribution.adoptionfusdtinjectionrequested - payload 含: adoptionId, accountSequence, treeCount, adoptionDate, amount - 转账金额 = treeCount × 5760 fUSDT ## mining-blockchain-service 改动 ### 3. fUSDT 注入钱包 MPC 签名支持 - mpc-signing.client: 新增 FUSDT_INJECTION_WALLET_USERNAME/ADDRESS 配置 isFusdtInjectionConfigured(), signMessageAsFusdtInjection() 等方法 - erc20-transfer.service: IMpcSigningClient 接口增加注入钱包方法 新增 transferFusdtAsInjectionWallet() 转账方法(含余额检查、MPC签名、广播) ### 4. 认种注入 Kafka 消费者 - adoption-injection-consumer.service: 订阅 cdc.contribution.outbox 过滤 AdoptionFusdtInjectionRequested 事件,解析 Debezium 扁平化消息格式 - adoption-injection.handler: 处理注入事件 - 幂等性检查(已确认的跳过) - 写入 adoption_injection_records 分类账(PROCESSING 状态) - 从注入钱包转 fUSDT 到做市商钱包 - 成功: markConfirmed (txHash, blockNumber, gasUsed) - 失败: markFailed (errorMessage),支持重试 - 发布 confirmed/failed Kafka 事件 ### 5. 分类账持久化 - Prisma schema: 新增 AdoptionInjectionRecord 模型 (adoption_injection_records 表) 字段: adoption_id, account_sequence, tree_count, adoption_date, from/to_address, amount, chain_type, tx_hash, block_number, gas_used, memo, status, error_message 索引: uk_adoption_id (唯一), idx_injection_account, idx_injection_status 等 - migration: 20250203000000_add_adoption_injection_records - Repository 接口 + 实现: save, findByAdoptionId, markConfirmed, markFailed ### 6. 启动余额检查 - main.ts: 启动时异步查询注入钱包 fUSDT 余额,余额为 0 时输出警告日志 新增注入钱包配置验证 ## 部署前需添加环境变量 (.env) FUSDT_INJECTION_WALLET_USERNAME=wallet-bff20b69 FUSDT_INJECTION_WALLET_ADDRESS=0x7BDB89dA47F16869c90446C41e70A00dDc432DBB Co-Authored-By: Claude Opus 4.5 --- .../event-handlers/adoption-synced.handler.ts | 90 ++++++- .../event-handlers/referral-synced.handler.ts | 44 ++++ ...doption-fusdt-injection-requested.event.ts | 36 +++ .../src/domain/events/index.ts | 1 + .../migration.sql | 37 +++ .../prisma/schema.prisma | 42 ++++ .../src/application/application.module.ts | 3 + .../adoption-injection.handler.ts | 192 +++++++++++++++ ...n-injection-record.repository.interface.ts | 51 ++++ .../src/domain/repositories/index.ts | 1 + .../domain/services/erc20-transfer.service.ts | 221 ++++++++++++++++++ .../infrastructure/infrastructure.module.ts | 11 +- .../adoption-injection-consumer.service.ts | 169 ++++++++++++++ .../src/infrastructure/kafka/index.ts | 1 + .../infrastructure/mpc/mpc-signing.client.ts | 31 +++ ...option-injection-record.repository.impl.ts | 104 +++++++++ .../persistence/repositories/index.ts | 1 + .../mining-blockchain-service/src/main.ts | 50 ++++ 18 files changed, 1082 insertions(+), 3 deletions(-) create mode 100644 backend/services/contribution-service/src/domain/events/adoption-fusdt-injection-requested.event.ts create mode 100644 backend/services/mining-blockchain-service/prisma/migrations/20250203000000_add_adoption_injection_records/migration.sql create mode 100644 backend/services/mining-blockchain-service/src/application/event-handlers/adoption-injection.handler.ts create mode 100644 backend/services/mining-blockchain-service/src/domain/repositories/adoption-injection-record.repository.interface.ts create mode 100644 backend/services/mining-blockchain-service/src/infrastructure/kafka/adoption-injection-consumer.service.ts create mode 100644 backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/adoption-injection-record.repository.impl.ts diff --git a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts index 6db467f2..60b9251a 100644 --- a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts @@ -1,6 +1,8 @@ import { Injectable, Logger } from '@nestjs/common'; import Decimal from 'decimal.js'; import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service'; +import { AdoptionSyncedEvent } from '../../domain/events/adoption-synced.event'; +import { AdoptionFusdtInjectionRequestedEvent } from '../../domain/events/adoption-fusdt-injection-requested.event'; import { ContributionCalculationService } from '../services/contribution-calculation.service'; import { ContributionRateService } from '../services/contribution-rate.service'; @@ -148,8 +150,15 @@ export class AdoptionSyncedHandler { this.logger.log(`[CDC] Adoption synced: orderId=${orderId}, status=${status}`); - // 只有 MINING_ENABLED 状态才触发算力计算 + // 发布 AdoptionSynced outbox 事件,实时同步到 mining-admin-service + await this.publishAdoptionOutboxEvent(tx, originalAdoptionId, accountSequence, treeCount, new Date(createdAt), status, contributionPerTree); + + // 只有 MINING_ENABLED 状态才触发算力计算和 fUSDT 注入 const needsCalculation = status === 'MINING_ENABLED'; + if (needsCalculation) { + await this.publishFusdtInjectionEvent(tx, originalAdoptionId, accountSequence, treeCount, new Date(createdAt)); + } + return { originalAdoptionId, needsCalculation, @@ -209,9 +218,15 @@ export class AdoptionSyncedHandler { this.logger.log(`[CDC] Adoption synced: orderId=${orderId}, status=${newStatus}`); - // 只有当 status 变为 MINING_ENABLED 且尚未计算过算力时,才触发算力计算 + // 发布 AdoptionSynced outbox 事件,实时同步到 mining-admin-service + await this.publishAdoptionOutboxEvent(tx, originalAdoptionId, accountSequence, treeCount, new Date(createdAt), newStatus, contributionPerTree); + + // 只有当 status 变为 MINING_ENABLED 且尚未计算过算力时,才触发算力计算和 fUSDT 注入 const statusChangedToMiningEnabled = newStatus === 'MINING_ENABLED' && oldStatus !== 'MINING_ENABLED'; const needsCalculation = statusChangedToMiningEnabled && !existingAdoption?.contributionDistributed; + if (needsCalculation) { + await this.publishFusdtInjectionEvent(tx, originalAdoptionId, accountSequence, treeCount, new Date(createdAt)); + } return { originalAdoptionId, @@ -229,4 +244,75 @@ export class AdoptionSyncedHandler { // 但通常不会发生删除操作 this.logger.warn(`[CDC] Adoption delete event received: ${orderId}. This may require contribution rollback.`); } + + /** + * 在事务内发布 AdoptionSynced outbox 事件 + * 确保 mining-admin-service 能实时收到认种数据变更 + */ + private async publishAdoptionOutboxEvent( + tx: TransactionClient, + originalAdoptionId: bigint, + accountSequence: string, + treeCount: number, + adoptionDate: Date, + status: string | null, + contributionPerTree: Decimal, + ): Promise { + const event = new AdoptionSyncedEvent( + originalAdoptionId, + accountSequence, + treeCount, + adoptionDate, + status, + contributionPerTree.toString(), + ); + + await tx.outboxEvent.create({ + data: { + aggregateType: AdoptionSyncedEvent.AGGREGATE_TYPE, + aggregateId: originalAdoptionId.toString(), + eventType: AdoptionSyncedEvent.EVENT_TYPE, + topic: 'contribution.adoptionsynced', + key: originalAdoptionId.toString(), + payload: event.toPayload(), + status: 'PENDING', + }, + }); + + this.logger.debug(`[CDC] Published AdoptionSynced outbox event: orderId=${originalAdoptionId}`); + } + + /** + * 在事务内发布 fUSDT 注入请求到 outbox + * 当认种状态变为 MINING_ENABLED 时触发 + * mining-blockchain-service 会消费此事件并执行区块链转账 + */ + private async publishFusdtInjectionEvent( + tx: TransactionClient, + originalAdoptionId: bigint, + accountSequence: string, + treeCount: number, + adoptionDate: Date, + ): Promise { + const event = new AdoptionFusdtInjectionRequestedEvent( + originalAdoptionId, + accountSequence, + treeCount, + adoptionDate, + ); + + await tx.outboxEvent.create({ + data: { + aggregateType: AdoptionFusdtInjectionRequestedEvent.AGGREGATE_TYPE, + aggregateId: originalAdoptionId.toString(), + eventType: AdoptionFusdtInjectionRequestedEvent.EVENT_TYPE, + topic: 'contribution.adoptionfusdtinjectionrequested', + key: originalAdoptionId.toString(), + payload: event.toPayload(), + status: 'PENDING', + }, + }); + + this.logger.log(`[CDC] Published fUSDT injection request: orderId=${originalAdoptionId}, amount=${event.amount} (${treeCount} trees × ${AdoptionFusdtInjectionRequestedEvent.FUSDT_PER_TREE})`); + } } diff --git a/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts index 834243e4..d10eb2b2 100644 --- a/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts @@ -1,5 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service'; +import { ReferralSyncedEvent } from '../../domain/events/referral-synced.event'; /** * 引荐关系 CDC 事件处理器 @@ -97,6 +98,9 @@ export class ReferralSyncedHandler { }); this.logger.log(`[CDC] Referral synced: ${accountSequence}, referrerId=${referrerUserId || 'none'}, depth=${depth}`); + + // 发布 ReferralSynced outbox 事件,实时同步到 mining-admin-service + await this.publishReferralOutboxEvent(tx, accountSequence, referrerAccountSequence, referrerUserId ? BigInt(referrerUserId) : null, originalUserId ? BigInt(originalUserId) : null, ancestorPath, depth); } private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { @@ -156,6 +160,9 @@ export class ReferralSyncedHandler { }); this.logger.log(`[CDC] Referral synced: ${accountSequence}`); + + // 发布 ReferralSynced outbox 事件,实时同步到 mining-admin-service + await this.publishReferralOutboxEvent(tx, accountSequence, referrerAccountSequence, referrerUserId ? BigInt(referrerUserId) : null, originalUserId ? BigInt(originalUserId) : null, ancestorPath, depth); } private async handleDelete(data: any): Promise { @@ -168,6 +175,43 @@ export class ReferralSyncedHandler { this.logger.warn(`[CDC] Referral delete event received: ${accountSequence} (not processed, keeping history)`); } + /** + * 在事务内发布 ReferralSynced outbox 事件 + * 确保 mining-admin-service 能实时收到推荐关系变更 + */ + private async publishReferralOutboxEvent( + tx: TransactionClient, + accountSequence: string, + referrerAccountSequence: string | null, + referrerUserId: bigint | null, + originalUserId: bigint | null, + ancestorPath: string | null, + depth: number, + ): Promise { + const event = new ReferralSyncedEvent( + accountSequence, + referrerAccountSequence, + referrerUserId, + originalUserId, + ancestorPath, + depth, + ); + + await tx.outboxEvent.create({ + data: { + aggregateType: ReferralSyncedEvent.AGGREGATE_TYPE, + aggregateId: accountSequence, + eventType: ReferralSyncedEvent.EVENT_TYPE, + topic: 'contribution.referralsynced', + key: accountSequence, + payload: event.toPayload(), + status: 'PENDING', + }, + }); + + this.logger.debug(`[CDC] Published ReferralSynced outbox event: account=${accountSequence}`); + } + /** * 将 BigInt[] 数组转换为逗号分隔的字符串 * @param ancestorPath BigInt 数组或 null diff --git a/backend/services/contribution-service/src/domain/events/adoption-fusdt-injection-requested.event.ts b/backend/services/contribution-service/src/domain/events/adoption-fusdt-injection-requested.event.ts new file mode 100644 index 00000000..6df9ee0b --- /dev/null +++ b/backend/services/contribution-service/src/domain/events/adoption-fusdt-injection-requested.event.ts @@ -0,0 +1,36 @@ +/** + * 认种 fUSDT 注入请求事件 + * 当认种状态变为 MINING_ENABLED 时触发 + * 通知 mining-blockchain-service 从注入钱包向做市商钱包转入 fUSDT + * + * 转账金额 = treeCount × 5760 + */ +export class AdoptionFusdtInjectionRequestedEvent { + static readonly EVENT_TYPE = 'AdoptionFusdtInjectionRequested'; + static readonly AGGREGATE_TYPE = 'Adoption'; + /** 每棵树对应的 fUSDT 注入金额 */ + static readonly FUSDT_PER_TREE = 5760; + + constructor( + public readonly originalAdoptionId: bigint, + public readonly accountSequence: string, + public readonly treeCount: number, + public readonly adoptionDate: Date, + ) {} + + get amount(): number { + return this.treeCount * AdoptionFusdtInjectionRequestedEvent.FUSDT_PER_TREE; + } + + toPayload(): Record { + return { + eventType: AdoptionFusdtInjectionRequestedEvent.EVENT_TYPE, + originalAdoptionId: this.originalAdoptionId.toString(), + accountSequence: this.accountSequence, + treeCount: this.treeCount, + adoptionDate: this.adoptionDate.toISOString(), + amount: this.amount.toString(), + fusdtPerTree: AdoptionFusdtInjectionRequestedEvent.FUSDT_PER_TREE, + }; + } +} diff --git a/backend/services/contribution-service/src/domain/events/index.ts b/backend/services/contribution-service/src/domain/events/index.ts index 0b8cce39..37213acb 100644 --- a/backend/services/contribution-service/src/domain/events/index.ts +++ b/backend/services/contribution-service/src/domain/events/index.ts @@ -9,3 +9,4 @@ export * from './network-progress-updated.event'; export * from './system-account-synced.event'; export * from './system-contribution-record-created.event'; export * from './unallocated-contribution-synced.event'; +export * from './adoption-fusdt-injection-requested.event'; diff --git a/backend/services/mining-blockchain-service/prisma/migrations/20250203000000_add_adoption_injection_records/migration.sql b/backend/services/mining-blockchain-service/prisma/migrations/20250203000000_add_adoption_injection_records/migration.sql new file mode 100644 index 00000000..b8b27b47 --- /dev/null +++ b/backend/services/mining-blockchain-service/prisma/migrations/20250203000000_add_adoption_injection_records/migration.sql @@ -0,0 +1,37 @@ +-- CreateTable +CREATE TABLE "adoption_injection_records" ( + "record_id" BIGSERIAL NOT NULL, + "adoption_id" VARCHAR(100) NOT NULL, + "account_sequence" VARCHAR(20) NOT NULL, + "tree_count" INTEGER NOT NULL, + "adoption_date" TIMESTAMP(3) NOT NULL, + "from_address" VARCHAR(42) NOT NULL, + "to_address" VARCHAR(42) NOT NULL, + "amount" DECIMAL(36,8) NOT NULL, + "chain_type" VARCHAR(20) NOT NULL, + "tx_hash" VARCHAR(66), + "block_number" BIGINT, + "gas_used" VARCHAR(50), + "memo" TEXT, + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "error_message" TEXT, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "adoption_injection_records_pkey" PRIMARY KEY ("record_id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "uk_adoption_id" ON "adoption_injection_records"("adoption_id"); + +-- CreateIndex +CREATE INDEX "idx_injection_account" ON "adoption_injection_records"("account_sequence"); + +-- CreateIndex +CREATE INDEX "idx_injection_status" ON "adoption_injection_records"("status"); + +-- CreateIndex +CREATE INDEX "idx_injection_tx_hash" ON "adoption_injection_records"("tx_hash"); + +-- CreateIndex +CREATE INDEX "idx_injection_created" ON "adoption_injection_records"("created_at"); diff --git a/backend/services/mining-blockchain-service/prisma/schema.prisma b/backend/services/mining-blockchain-service/prisma/schema.prisma index 706cd374..9ab31142 100644 --- a/backend/services/mining-blockchain-service/prisma/schema.prisma +++ b/backend/services/mining-blockchain-service/prisma/schema.prisma @@ -328,6 +328,48 @@ model MarketMakerBlockCheckpoint { @@map("market_maker_block_checkpoints") } +// ============================================ +// 认种 fUSDT 注入分类账(对账用) +// 记录每笔认种触发的 fUSDT 自动注入转账 +// ============================================ +model AdoptionInjectionRecord { + id BigInt @id @default(autoincrement()) @map("record_id") + + // 认种信息 + adoptionId String @map("adoption_id") @db.VarChar(100) // 1.0 原始认种ID + accountSequence String @map("account_sequence") @db.VarChar(20) // 用户账户序列号 + treeCount Int @map("tree_count") // 认种数量(棵) + adoptionDate DateTime @map("adoption_date") // 认种时间 + + // 转账信息 + fromAddress String @map("from_address") @db.VarChar(42) // 注入钱包地址 + toAddress String @map("to_address") @db.VarChar(42) // 做市商钱包地址 + amount Decimal @db.Decimal(36, 8) // 转账金额 (treeCount × 5760) + chainType String @map("chain_type") @db.VarChar(20) // KAVA + + // 链上结果 + txHash String? @map("tx_hash") @db.VarChar(66) + blockNumber BigInt? @map("block_number") + gasUsed String? @map("gas_used") @db.VarChar(50) + + // 分类账备注 + memo String? @db.Text + + // 状态: PENDING → PROCESSING → CONFIRMED / FAILED + status String @default("PENDING") @db.VarChar(20) + errorMessage String? @map("error_message") @db.Text + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + @@unique([adoptionId], name: "uk_adoption_id") + @@index([accountSequence], name: "idx_injection_account") + @@index([status], name: "idx_injection_status") + @@index([txHash], name: "idx_injection_tx_hash") + @@index([createdAt], name: "idx_injection_created") + @@map("adoption_injection_records") +} + // ============================================ // 池账户充值交易表 // 记录检测到的100亿销毁池和200万挖矿池的 fUSDT 充值 diff --git a/backend/services/mining-blockchain-service/src/application/application.module.ts b/backend/services/mining-blockchain-service/src/application/application.module.ts index c1e0980f..f2a401e7 100644 --- a/backend/services/mining-blockchain-service/src/application/application.module.ts +++ b/backend/services/mining-blockchain-service/src/application/application.module.ts @@ -6,6 +6,7 @@ import { MarketMakerDepositDetectionService } from './services/market-maker-depo import { PoolAccountDepositDetectionService } from './services/pool-account-deposit-detection.service'; import { OutboxPublisherService } from './services/outbox-publisher.service'; import { DepositAckConsumerService } from '@/infrastructure/kafka/deposit-ack-consumer.service'; +import { AdoptionInjectionHandler } from './event-handlers/adoption-injection.handler'; @Module({ imports: [InfrastructureModule, DomainModule], @@ -20,6 +21,8 @@ import { DepositAckConsumerService } from '@/infrastructure/kafka/deposit-ack-co OutboxPublisherService, // 充值 ACK 消费者(接收 wallet-service 的确认回执) DepositAckConsumerService, + // 认种 fUSDT 注入处理器(监听认种事件,自动转积分值到做市商) + AdoptionInjectionHandler, ], exports: [OutboxPublisherService], }) diff --git a/backend/services/mining-blockchain-service/src/application/event-handlers/adoption-injection.handler.ts b/backend/services/mining-blockchain-service/src/application/event-handlers/adoption-injection.handler.ts new file mode 100644 index 00000000..02d607db --- /dev/null +++ b/backend/services/mining-blockchain-service/src/application/event-handlers/adoption-injection.handler.ts @@ -0,0 +1,192 @@ +/** + * Adoption fUSDT Injection Handler + * + * When an adoption reaches MINING_ENABLED status, this handler: + * 1. Receives the AdoptionFusdtInjectionRequested event via Kafka + * 2. Creates a PENDING ledger record in adoption_injection_records + * 3. Transfers fUSDT from injection wallet to fUSDT market maker wallet + * 4. Updates ledger record with tx result (CONFIRMED or FAILED) + * 5. Publishes confirmation/failure Kafka event + * + * Amount = treeCount x 5760 + * Ledger memo includes: user ID, adoption quantity, adoption time, transfer amount + */ + +import { Injectable, Logger, OnModuleInit, Inject } from '@nestjs/common'; +import { + AdoptionInjectionConsumerService, + AdoptionInjectionPayload, +} from '@/infrastructure/kafka/adoption-injection-consumer.service'; +import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service'; +import { Erc20TransferService } from '@/domain/services/erc20-transfer.service'; +import { MpcSigningClient } from '@/infrastructure/mpc/mpc-signing.client'; +import { ChainTypeEnum } from '@/domain/enums'; +import { + ADOPTION_INJECTION_RECORD_REPOSITORY, + IAdoptionInjectionRecordRepository, +} from '@/domain/repositories'; + +@Injectable() +export class AdoptionInjectionHandler implements OnModuleInit { + private readonly logger = new Logger(AdoptionInjectionHandler.name); + + constructor( + private readonly injectionConsumer: AdoptionInjectionConsumerService, + private readonly eventPublisher: EventPublisherService, + private readonly transferService: Erc20TransferService, + private readonly mpcSigningClient: MpcSigningClient, + @Inject(ADOPTION_INJECTION_RECORD_REPOSITORY) + private readonly injectionRepo: IAdoptionInjectionRecordRepository, + ) {} + + onModuleInit() { + this.injectionConsumer.onAdoptionInjectionRequested( + this.handleInjection.bind(this), + ); + this.logger.log(`[INIT] AdoptionInjectionHandler registered`); + } + + private async handleInjection(payload: AdoptionInjectionPayload): Promise { + this.logger.log(`[HANDLE] ========== Adoption fUSDT Injection ==========`); + this.logger.log(`[HANDLE] adoptionId: ${payload.originalAdoptionId}`); + this.logger.log(`[HANDLE] accountSequence: ${payload.accountSequence}`); + this.logger.log(`[HANDLE] treeCount: ${payload.treeCount}`); + this.logger.log(`[HANDLE] adoptionDate: ${payload.adoptionDate}`); + this.logger.log(`[HANDLE] amount: ${payload.amount} fUSDT`); + + const chainType = ChainTypeEnum.KAVA; + + // Step 1: 幂等性检查 - 是否已处理过此认种 + const existing = await this.injectionRepo.findByAdoptionId(payload.originalAdoptionId); + if (existing && existing.status === 'CONFIRMED') { + this.logger.warn(`[HANDLE] Adoption ${payload.originalAdoptionId} already injected (txHash=${existing.txHash}), skipping`); + return; + } + + try { + // Step 2: Check injection wallet is configured + if (!this.transferService.isFusdtInjectionConfigured(chainType)) { + throw new Error('fUSDT Injection wallet not configured'); + } + + // Step 3: Get addresses + const injectionAddress = this.transferService.getFusdtInjectionAddress(); + const marketMakerAddress = this.mpcSigningClient.getFusdtMarketMakerAddress(); + if (!injectionAddress || !marketMakerAddress) { + throw new Error('fUSDT Injection or Market Maker address not configured'); + } + + // Step 4: Build ledger memo + const memo = [ + `认种注入(Adoption Injection)`, + `用户: ${payload.accountSequence}`, + `认种ID: ${payload.originalAdoptionId}`, + `认种数量: ${payload.treeCount}`, + `认种时间: ${payload.adoptionDate}`, + `转账金额: ${payload.amount} fUSDT`, + `单价: ${payload.fusdtPerTree} fUSDT/棵`, + ].join(' | '); + this.logger.log(`[HANDLE] Memo: ${memo}`); + + // Step 5: 写入分类账 - PENDING 状态(或更新已有的 FAILED 记录) + let ledgerRecord = existing; + if (!ledgerRecord) { + ledgerRecord = await this.injectionRepo.save({ + adoptionId: payload.originalAdoptionId, + accountSequence: payload.accountSequence, + treeCount: payload.treeCount, + adoptionDate: new Date(payload.adoptionDate), + fromAddress: injectionAddress, + toAddress: marketMakerAddress, + amount: payload.amount, + chainType: 'KAVA', + memo, + status: 'PROCESSING', + }); + this.logger.log(`[LEDGER] Created injection record id=${ledgerRecord.id}, status=PROCESSING`); + } else { + // 重试之前失败的记录 + ledgerRecord = await this.injectionRepo.save({ + ...ledgerRecord, + status: 'PROCESSING', + errorMessage: null, + }); + this.logger.log(`[LEDGER] Retrying failed injection record id=${ledgerRecord.id}`); + } + + // Step 6: Execute fUSDT transfer + this.logger.log(`[PROCESS] Transferring ${payload.amount} fUSDT to market maker ${marketMakerAddress}`); + const result = await this.transferService.transferFusdtAsInjectionWallet( + chainType, + marketMakerAddress, + payload.amount, + memo, + ); + + if (result.success && result.txHash) { + this.logger.log(`[SUCCESS] Adoption injection transfer confirmed!`); + this.logger.log(`[SUCCESS] TxHash: ${result.txHash}`); + this.logger.log(`[SUCCESS] Block: ${result.blockNumber}`); + + // Step 7a: 更新分类账 - CONFIRMED + await this.injectionRepo.markConfirmed( + ledgerRecord.id!, + result.txHash, + BigInt(result.blockNumber || 0), + result.gasUsed || '0', + ); + this.logger.log(`[LEDGER] Injection record id=${ledgerRecord.id} marked CONFIRMED`); + + // Publish success event + await this.eventPublisher.publish({ + eventType: 'mining_blockchain.adoption_injection.confirmed', + toPayload: () => ({ + originalAdoptionId: payload.originalAdoptionId, + accountSequence: payload.accountSequence, + treeCount: payload.treeCount, + adoptionDate: payload.adoptionDate, + amount: payload.amount, + txHash: result.txHash, + blockNumber: result.blockNumber, + memo, + }), + eventId: `adoption-injection-confirmed-${payload.originalAdoptionId}-${Date.now()}`, + occurredAt: new Date(), + }); + } else { + throw new Error(result.error || 'Injection transfer failed'); + } + } catch (error) { + this.logger.error( + `[ERROR] Adoption injection transfer failed for adoptionId=${payload.originalAdoptionId}`, + error, + ); + + // Step 7b: 更新分类账 - FAILED + const errorMsg = error instanceof Error ? error.message : 'Unknown error'; + if (existing?.id || (await this.injectionRepo.findByAdoptionId(payload.originalAdoptionId))?.id) { + const record = await this.injectionRepo.findByAdoptionId(payload.originalAdoptionId); + if (record?.id) { + await this.injectionRepo.markFailed(record.id, errorMsg); + this.logger.log(`[LEDGER] Injection record id=${record.id} marked FAILED: ${errorMsg}`); + } + } + + // Publish failure event + await this.eventPublisher.publish({ + eventType: 'mining_blockchain.adoption_injection.failed', + toPayload: () => ({ + originalAdoptionId: payload.originalAdoptionId, + accountSequence: payload.accountSequence, + treeCount: payload.treeCount, + amount: payload.amount, + error: errorMsg, + }), + eventId: `adoption-injection-failed-${payload.originalAdoptionId}-${Date.now()}`, + occurredAt: new Date(), + }); + + throw error; + } + } +} diff --git a/backend/services/mining-blockchain-service/src/domain/repositories/adoption-injection-record.repository.interface.ts b/backend/services/mining-blockchain-service/src/domain/repositories/adoption-injection-record.repository.interface.ts new file mode 100644 index 00000000..ff48f718 --- /dev/null +++ b/backend/services/mining-blockchain-service/src/domain/repositories/adoption-injection-record.repository.interface.ts @@ -0,0 +1,51 @@ +export const ADOPTION_INJECTION_RECORD_REPOSITORY = Symbol('ADOPTION_INJECTION_RECORD_REPOSITORY'); + +/** + * 认种 fUSDT 注入分类账 DTO + */ +export interface AdoptionInjectionRecordDto { + id?: bigint; + adoptionId: string; + accountSequence: string; + treeCount: number; + adoptionDate: Date; + fromAddress: string; + toAddress: string; + amount: string; + chainType: string; + txHash?: string | null; + blockNumber?: bigint | null; + gasUsed?: string | null; + memo?: string | null; + status: string; + errorMessage?: string | null; + createdAt?: Date; + updatedAt?: Date; +} + +export interface IAdoptionInjectionRecordRepository { + /** + * 保存注入记录(创建或更新) + */ + save(record: AdoptionInjectionRecordDto): Promise; + + /** + * 根据认种ID查找(幂等性检查) + */ + findByAdoptionId(adoptionId: string): Promise; + + /** + * 根据用户账户查找所有注入记录 + */ + findByAccountSequence(accountSequence: string): Promise; + + /** + * 更新转账结果(成功) + */ + markConfirmed(id: bigint, txHash: string, blockNumber: bigint, gasUsed: string): Promise; + + /** + * 更新转账结果(失败) + */ + markFailed(id: bigint, errorMessage: string): Promise; +} diff --git a/backend/services/mining-blockchain-service/src/domain/repositories/index.ts b/backend/services/mining-blockchain-service/src/domain/repositories/index.ts index 8fc16459..79c8aad7 100644 --- a/backend/services/mining-blockchain-service/src/domain/repositories/index.ts +++ b/backend/services/mining-blockchain-service/src/domain/repositories/index.ts @@ -5,3 +5,4 @@ export * from './transaction-request.repository.interface'; export * from './outbox-event.repository.interface'; export * from './market-maker-deposit.repository.interface'; export * from './pool-account-deposit.repository.interface'; +export * from './adoption-injection-record.repository.interface'; diff --git a/backend/services/mining-blockchain-service/src/domain/services/erc20-transfer.service.ts b/backend/services/mining-blockchain-service/src/domain/services/erc20-transfer.service.ts index 80caf1dc..5353e3bf 100644 --- a/backend/services/mining-blockchain-service/src/domain/services/erc20-transfer.service.ts +++ b/backend/services/mining-blockchain-service/src/domain/services/erc20-transfer.service.ts @@ -55,6 +55,10 @@ export interface IMpcSigningClient { isMiningPoolConfigured(): boolean; getMiningPoolAddress(): string; signMessageAsMiningPool(messageHash: string): Promise; + // fUSDT 注入钱包(认种自动转积分值到做市商) + isFusdtInjectionConfigured(): boolean; + getFusdtInjectionAddress(): string; + signMessageAsFusdtInjection(messageHash: string): Promise; } // 池账户类型(用于 transferTokenAsPoolAccount) @@ -81,6 +85,8 @@ export class Erc20TransferService { private readonly burnPoolAddress: string; // 200万挖矿池钱包地址 private readonly miningPoolAddress: string; + // fUSDT 注入钱包地址(认种自动转积分值到做市商) + private readonly fusdtInjectionAddress: string; private mpcSigningClient: IMpcSigningClient | null = null; constructor( @@ -93,6 +99,7 @@ export class Erc20TransferService { this.fusdtMarketMakerAddress = this.configService.get('FUSDT_MARKET_MAKER_ADDRESS', ''); this.burnPoolAddress = this.configService.get('BURN_POOL_WALLET_ADDRESS', ''); this.miningPoolAddress = this.configService.get('MINING_POOL_WALLET_ADDRESS', ''); + this.fusdtInjectionAddress = this.configService.get('FUSDT_INJECTION_WALLET_ADDRESS', ''); this.initializeWalletConfig(); } @@ -158,6 +165,13 @@ export class Erc20TransferService { } else { this.logger.warn('[INIT] FUSDT_MARKET_MAKER_ADDRESS not configured'); } + + // 检查 fUSDT 注入钱包地址配置 + if (this.fusdtInjectionAddress) { + this.logger.log(`[INIT] fUSDT Injection wallet configured: ${this.fusdtInjectionAddress}`); + } else { + this.logger.warn('[INIT] FUSDT_INJECTION_WALLET_ADDRESS not configured (adoption auto-injection disabled)'); + } } /** @@ -913,6 +927,213 @@ export class Erc20TransferService { return formatUnits(balance, decimals); } + // ============ fUSDT 注入钱包方法(认种自动转积分值到做市商) ============ + + /** + * 检查 fUSDT 注入钱包是否已配置 + */ + isFusdtInjectionConfigured(chainType: ChainTypeEnum): boolean { + try { + this.rpcProviderManager.getProvider(chainType); + return !!this.fusdtInjectionAddress && !!this.mpcSigningClient?.isFusdtInjectionConfigured(); + } catch { + return false; + } + } + + /** + * 获取 fUSDT 注入钱包地址 + */ + getFusdtInjectionAddress(): string | null { + return this.fusdtInjectionAddress || null; + } + + /** + * 获取 fUSDT 注入钱包的 fUSDT 链上余额 + */ + async getFusdtInjectionTokenBalance(chainType: ChainTypeEnum): Promise { + const provider = this.getProvider(chainType); + + if (!this.fusdtInjectionAddress) { + throw new Error('fUSDT Injection wallet address not configured'); + } + + const contractAddress = this.getTokenContract(chainType, 'FUSDT'); + if (!contractAddress) { + throw new Error(`fUSDT not configured for chain ${chainType}`); + } + + const contract = new Contract(contractAddress, ERC20_TRANSFER_ABI, provider); + const balance = await contract.balanceOf(this.fusdtInjectionAddress); + const decimals = await contract.decimals(); + + return formatUnits(balance, decimals); + } + + /** + * fUSDT 注入钱包转账到做市商钱包(认种触发) + * + * @param chainType 链类型 + * @param toAddress 做市商钱包地址 + * @param amount 转账金额 (人类可读格式) + * @param memo 分类账备注(含认种信息) + * @returns 转账结果 + */ + async transferFusdtAsInjectionWallet( + chainType: ChainTypeEnum, + toAddress: string, + amount: string, + memo?: string, + ): Promise { + this.logger.log(`[INJECTION-TRANSFER] Starting fUSDT injection transfer`); + this.logger.log(`[INJECTION-TRANSFER] From: ${this.fusdtInjectionAddress}`); + this.logger.log(`[INJECTION-TRANSFER] To: ${toAddress}`); + this.logger.log(`[INJECTION-TRANSFER] Amount: ${amount} fUSDT`); + if (memo) { + this.logger.log(`[INJECTION-TRANSFER] Memo: ${memo}`); + } + + const provider = this.getProvider(chainType); + + if (!this.mpcSigningClient || !this.mpcSigningClient.isFusdtInjectionConfigured()) { + const error = 'fUSDT Injection MPC signing not configured'; + this.logger.error(`[INJECTION-TRANSFER] ${error}`); + return { success: false, error }; + } + + if (!this.fusdtInjectionAddress) { + const error = 'fUSDT Injection wallet address not configured'; + this.logger.error(`[INJECTION-TRANSFER] ${error}`); + return { success: false, error }; + } + + try { + const config = this.chainConfig.getConfig(ChainType.fromEnum(chainType)); + const contractAddress = this.getTokenContract(chainType, 'FUSDT'); + + if (!contractAddress) { + const error = `fUSDT not configured for chain ${chainType}`; + this.logger.error(`[INJECTION-TRANSFER] ${error}`); + return { success: false, error }; + } + + const contract = new Contract(contractAddress, ERC20_TRANSFER_ABI, provider); + + const decimals = await contract.decimals(); + const amountInWei = parseUnits(amount, decimals); + + // 检查余额 + const balance = await contract.balanceOf(this.fusdtInjectionAddress); + this.logger.log(`[INJECTION-TRANSFER] Injection wallet balance: ${formatUnits(balance, decimals)} fUSDT`); + + if (balance < amountInWei) { + const error = `Insufficient fUSDT balance in injection wallet (need ${amount}, have ${formatUnits(balance, decimals)})`; + this.logger.error(`[INJECTION-TRANSFER] ${error}`); + return { success: false, error }; + } + + // 构建交易 + const nonce = await provider.getTransactionCount(this.fusdtInjectionAddress); + const feeData = await provider.getFeeData(); + const transferData = contract.interface.encodeFunctionData('transfer', [toAddress, amountInWei]); + + const gasEstimate = await provider.estimateGas({ + from: this.fusdtInjectionAddress, + to: contractAddress, + data: transferData, + }); + const gasLimit = gasEstimate * BigInt(120) / BigInt(100); + + const supportsEip1559 = feeData.maxFeePerGas && feeData.maxFeePerGas > BigInt(0); + + let tx: Transaction; + if (supportsEip1559) { + tx = Transaction.from({ + type: 2, + chainId: config.chainId, + nonce, + to: contractAddress, + data: transferData, + gasLimit, + maxFeePerGas: feeData.maxFeePerGas, + maxPriorityFeePerGas: feeData.maxPriorityFeePerGas, + }); + } else { + const gasPrice = feeData.gasPrice || BigInt(1000000000); + tx = Transaction.from({ + type: 0, + chainId: config.chainId, + nonce, + to: contractAddress, + data: transferData, + gasLimit, + gasPrice, + }); + } + + const unsignedTxHash = tx.unsignedHash; + + // 使用注入钱包 MPC 签名 + this.logger.log(`[INJECTION-TRANSFER] Requesting fUSDT Injection MPC signature...`); + const signatureHex = await this.mpcSigningClient!.signMessageAsFusdtInjection(unsignedTxHash); + + // 解析签名 + const normalizedSig = signatureHex.startsWith('0x') ? signatureHex : `0x${signatureHex}`; + const sigBytes = normalizedSig.slice(2); + const r = `0x${sigBytes.slice(0, 64)}`; + const s = `0x${sigBytes.slice(64, 128)}`; + + let signature: Signature | null = null; + for (const yParity of [0, 1] as const) { + try { + const testSig = Signature.from({ r, s, yParity }); + const recoveredAddress = recoverAddress(unsignedTxHash, testSig); + if (recoveredAddress.toLowerCase() === this.fusdtInjectionAddress.toLowerCase()) { + signature = testSig; + break; + } + } catch (e) { + this.logger.debug(`[INJECTION-TRANSFER] yParity=${yParity} failed: ${e}`); + } + } + + if (!signature) { + throw new Error('Failed to recover correct signature - address mismatch'); + } + + const signedTx = tx.clone(); + signedTx.signature = signature; + + this.logger.log(`[INJECTION-TRANSFER] Broadcasting transaction...`); + const txResponse = await provider.broadcastTransaction(signedTx.serialized); + this.logger.log(`[INJECTION-TRANSFER] Transaction sent: ${txResponse.hash}`); + + const receipt = await txResponse.wait(); + + if (receipt && receipt.status === 1) { + this.logger.log(`[INJECTION-TRANSFER] Transaction confirmed! Block: ${receipt.blockNumber}`); + this.rpcProviderManager.reportSuccess(chainType); + return { + success: true, + txHash: txResponse.hash, + gasUsed: receipt.gasUsed.toString(), + blockNumber: receipt.blockNumber, + }; + } else { + return { success: false, txHash: txResponse.hash, error: 'Transaction failed (reverted)' }; + } + } catch (error: any) { + if (this.isRpcConnectionError(error)) { + this.rpcProviderManager.reportFailure(chainType, error); + } + this.logger.error(`[INJECTION-TRANSFER] Transfer failed:`, error); + return { + success: false, + error: error.message || 'Unknown error during transfer', + }; + } + } + // ============ 池账户钱包方法 ============ /** diff --git a/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts b/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts index 8c31e918..f0aeaa67 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts @@ -3,7 +3,7 @@ import { HttpModule } from '@nestjs/axios'; import { JwtModule } from '@nestjs/jwt'; import { PrismaService } from './persistence/prisma/prisma.service'; import { RedisService, AddressCacheService } from './redis'; -import { EventPublisherService, WithdrawalEventConsumerService } from './kafka'; +import { EventPublisherService, WithdrawalEventConsumerService, AdoptionInjectionConsumerService } from './kafka'; import { EvmProviderAdapter, AddressDerivationAdapter, MnemonicDerivationAdapter, RecoveryMnemonicAdapter, BlockScannerService } from './blockchain'; import { MpcSigningClient } from './mpc'; import { DomainModule } from '@/domain/domain.module'; @@ -17,6 +17,7 @@ import { MARKET_MAKER_CHECKPOINT_REPOSITORY, POOL_ACCOUNT_DEPOSIT_REPOSITORY, POOL_ACCOUNT_CHECKPOINT_REPOSITORY, + ADOPTION_INJECTION_RECORD_REPOSITORY, } from '@/domain/repositories'; import { DepositTransactionRepositoryImpl, @@ -28,6 +29,7 @@ import { MarketMakerCheckpointRepositoryImpl, PoolAccountDepositRepositoryImpl, PoolAccountCheckpointRepositoryImpl, + AdoptionInjectionRecordRepositoryImpl, } from './persistence/repositories'; @Global() @@ -39,6 +41,7 @@ import { RedisService, EventPublisherService, WithdrawalEventConsumerService, + AdoptionInjectionConsumerService, MpcSigningClient, // 区块链适配器 @@ -88,12 +91,17 @@ import { provide: POOL_ACCOUNT_CHECKPOINT_REPOSITORY, useClass: PoolAccountCheckpointRepositoryImpl, }, + { + provide: ADOPTION_INJECTION_RECORD_REPOSITORY, + useClass: AdoptionInjectionRecordRepositoryImpl, + }, ], exports: [ PrismaService, RedisService, EventPublisherService, WithdrawalEventConsumerService, + AdoptionInjectionConsumerService, MpcSigningClient, EvmProviderAdapter, AddressDerivationAdapter, @@ -110,6 +118,7 @@ import { MARKET_MAKER_CHECKPOINT_REPOSITORY, POOL_ACCOUNT_DEPOSIT_REPOSITORY, POOL_ACCOUNT_CHECKPOINT_REPOSITORY, + ADOPTION_INJECTION_RECORD_REPOSITORY, ], }) export class InfrastructureModule {} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/adoption-injection-consumer.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/adoption-injection-consumer.service.ts new file mode 100644 index 00000000..44d18cac --- /dev/null +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/adoption-injection-consumer.service.ts @@ -0,0 +1,169 @@ +/** + * Adoption fUSDT Injection Event Consumer + * + * Subscribes to `cdc.contribution.outbox` topic (Debezium CDC) + * Filters for `AdoptionFusdtInjectionRequested` events + * Triggers fUSDT transfer from injection wallet to market maker wallet + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; + +export interface AdoptionInjectionPayload { + originalAdoptionId: string; + accountSequence: string; + treeCount: number; + adoptionDate: string; + amount: string; + fusdtPerTree: number; +} + +export type AdoptionInjectionEventHandler = (payload: AdoptionInjectionPayload) => Promise; + +@Injectable() +export class AdoptionInjectionConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(AdoptionInjectionConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isConnected = false; + private readonly topic: string; + + private injectionHandler?: AdoptionInjectionEventHandler; + + constructor(private readonly configService: ConfigService) { + this.topic = this.configService.get( + 'CDC_TOPIC_CONTRIBUTION_OUTBOX', + 'cdc.contribution.outbox', + ); + } + + async onModuleInit() { + const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; + const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'mining-blockchain-service'; + const groupId = 'mining-blockchain-adoption-injection'; + + this.logger.log(`[INIT] Adoption Injection Consumer initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] GroupId: ${groupId}`); + this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); + this.logger.log(`[INIT] Topic: ${this.topic}`); + + this.kafka = new Kafka({ + clientId, + brokers, + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 1000, + maxRetryTime: 300000, + retries: 15, + multiplier: 2, + restartOnFailure: async () => true, + }, + }); + + this.consumer = this.kafka.consumer({ + groupId, + sessionTimeout: 30000, + heartbeatInterval: 3000, + }); + + try { + this.logger.log(`[CONNECT] Connecting Adoption Injection consumer...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] Adoption Injection consumer connected successfully`); + + await this.consumer.subscribe({ + topics: [this.topic], + fromBeginning: false, + }); + this.logger.log(`[SUBSCRIBE] Subscribed to ${this.topic}`); + + await this.startConsuming(); + } catch (error) { + this.logger.error(`[ERROR] Failed to connect Adoption Injection consumer`, error); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + await this.consumer.disconnect(); + this.logger.log('Adoption Injection consumer disconnected'); + } + } + + /** + * Register handler for adoption injection events + */ + onAdoptionInjectionRequested(handler: AdoptionInjectionEventHandler): void { + this.injectionHandler = handler; + this.logger.log(`[REGISTER] AdoptionInjectionRequested handler registered`); + } + + private async startConsuming(): Promise { + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + try { + const value = message.value?.toString(); + if (!value) return; + + const parsed = JSON.parse(value); + + // Extract event_type from Debezium flattened outbox format + // ExtractNewRecordState flattens the message to just the row data + const eventType = parsed.event_type; + + // Only process AdoptionFusdtInjectionRequested events + if (eventType !== 'AdoptionFusdtInjectionRequested') { + return; + } + + // Skip non-create operations (DELETE, UPDATE) + const op = parsed.__op || parsed.op; + if (op && op !== 'c' && op !== 'r') { + return; + } + + this.logger.log(`[RECEIVE] AdoptionFusdtInjectionRequested event received`); + this.logger.log(`[RECEIVE] offset=${message.offset}, partition=${partition}`); + + // Parse the payload (JSON string from outbox_events table) + let payload: AdoptionInjectionPayload; + try { + const rawPayload = typeof parsed.payload === 'string' + ? JSON.parse(parsed.payload) + : parsed.payload; + payload = { + originalAdoptionId: rawPayload.originalAdoptionId, + accountSequence: rawPayload.accountSequence, + treeCount: Number(rawPayload.treeCount), + adoptionDate: rawPayload.adoptionDate, + amount: rawPayload.amount, + fusdtPerTree: Number(rawPayload.fusdtPerTree), + }; + } catch (parseErr) { + this.logger.error(`[ERROR] Failed to parse injection event payload`, parseErr); + return; + } + + this.logger.log(`[RECEIVE] adoptionId=${payload.originalAdoptionId}`); + this.logger.log(`[RECEIVE] accountSequence=${payload.accountSequence}`); + this.logger.log(`[RECEIVE] treeCount=${payload.treeCount}`); + this.logger.log(`[RECEIVE] amount=${payload.amount} fUSDT`); + + if (this.injectionHandler) { + await this.injectionHandler(payload); + this.logger.log(`[HANDLE] AdoptionInjectionRequested handler completed`); + } else { + this.logger.warn(`[HANDLE] No handler registered for AdoptionInjectionRequested`); + } + } catch (error) { + this.logger.error(`[ERROR] Error processing adoption injection event`, error); + } + }, + }); + + this.logger.log(`[START] Started consuming adoption injection events from ${this.topic}`); + } +} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts index df1e0281..dfc20d4d 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts @@ -2,3 +2,4 @@ export * from './event-publisher.service'; export * from './event-consumer.controller'; export * from './withdrawal-event-consumer.service'; export * from './deposit-ack-consumer.service'; +export * from './adoption-injection-consumer.service'; diff --git a/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts b/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts index 08203535..7e494c0b 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts @@ -48,6 +48,9 @@ export class MpcSigningClient { // 200万挖矿池钱包 private readonly miningPoolUsername: string; private readonly miningPoolAddress: string; + // fUSDT 注入钱包(认种自动转积分值到做市商) + private readonly fusdtInjectionUsername: string; + private readonly fusdtInjectionAddress: string; // MPC system 配置 private readonly mpcAccountServiceUrl: string; private readonly mpcJwtSecret: string; @@ -74,6 +77,9 @@ export class MpcSigningClient { // 200万挖矿池钱包配置 this.miningPoolUsername = this.configService.get('MINING_POOL_WALLET_USERNAME', ''); this.miningPoolAddress = this.configService.get('MINING_POOL_WALLET_ADDRESS', ''); + // fUSDT 注入钱包配置(认种自动转积分值到做市商) + this.fusdtInjectionUsername = this.configService.get('FUSDT_INJECTION_WALLET_USERNAME', ''); + this.fusdtInjectionAddress = this.configService.get('FUSDT_INJECTION_WALLET_ADDRESS', ''); // MPC system 配置 this.mpcAccountServiceUrl = this.configService.get('MPC_ACCOUNT_SERVICE_URL', 'http://localhost:4000'); this.mpcJwtSecret = this.configService.get('MPC_JWT_SECRET', ''); @@ -96,6 +102,9 @@ export class MpcSigningClient { if (!this.miningPoolUsername || !this.miningPoolAddress) { this.logger.warn('[INIT] Mining Pool wallet not configured'); } + if (!this.fusdtInjectionUsername || !this.fusdtInjectionAddress) { + this.logger.warn('[INIT] fUSDT Injection wallet not configured (adoption auto-injection disabled)'); + } if (!this.mpcJwtSecret) { this.logger.warn('[INIT] MPC_JWT_SECRET not configured - signing will fail'); } @@ -105,6 +114,7 @@ export class MpcSigningClient { this.logger.log(`[INIT] fUSDT Market Maker: ${this.fusdtMarketMakerAddress || '(not configured)'}`); this.logger.log(`[INIT] Burn Pool: ${this.burnPoolAddress || '(not configured)'}`); this.logger.log(`[INIT] Mining Pool: ${this.miningPoolAddress || '(not configured)'}`); + this.logger.log(`[INIT] fUSDT Injection: ${this.fusdtInjectionAddress || '(not configured)'}`); this.logger.log(`[INIT] MPC Account Service: ${this.mpcAccountServiceUrl}`); this.logger.log(`[INIT] Using HTTP direct call to mpc-system`); } @@ -214,6 +224,27 @@ export class MpcSigningClient { return this.signMessageWithUsername(this.miningPoolUsername, messageHash); } + // ============ fUSDT 注入钱包(认种自动转积分值到做市商) ============ + + isFusdtInjectionConfigured(): boolean { + return !!this.fusdtInjectionUsername && !!this.fusdtInjectionAddress; + } + + getFusdtInjectionAddress(): string { + return this.fusdtInjectionAddress; + } + + getFusdtInjectionUsername(): string { + return this.fusdtInjectionUsername; + } + + async signMessageAsFusdtInjection(messageHash: string): Promise { + if (!this.fusdtInjectionUsername) { + throw new Error('fUSDT Injection MPC username not configured'); + } + return this.signMessageWithUsername(this.fusdtInjectionUsername, messageHash); + } + /** * 签名消息(使用 C2C Bot 热钱包) * diff --git a/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/adoption-injection-record.repository.impl.ts b/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/adoption-injection-record.repository.impl.ts new file mode 100644 index 00000000..67b51f90 --- /dev/null +++ b/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/adoption-injection-record.repository.impl.ts @@ -0,0 +1,104 @@ +import { Injectable } from '@nestjs/common'; +import { Prisma } from '@prisma/client'; +import { PrismaService } from '../prisma/prisma.service'; +import { + IAdoptionInjectionRecordRepository, + AdoptionInjectionRecordDto, +} from '@/domain/repositories/adoption-injection-record.repository.interface'; + +@Injectable() +export class AdoptionInjectionRecordRepositoryImpl implements IAdoptionInjectionRecordRepository { + constructor(private readonly prisma: PrismaService) {} + + async save(record: AdoptionInjectionRecordDto): Promise { + const data = { + adoptionId: record.adoptionId, + accountSequence: record.accountSequence, + treeCount: record.treeCount, + adoptionDate: record.adoptionDate, + fromAddress: record.fromAddress, + toAddress: record.toAddress, + amount: new Prisma.Decimal(record.amount), + chainType: record.chainType, + txHash: record.txHash, + blockNumber: record.blockNumber, + gasUsed: record.gasUsed, + memo: record.memo, + status: record.status, + errorMessage: record.errorMessage, + }; + + if (record.id) { + const updated = await this.prisma.adoptionInjectionRecord.update({ + where: { id: record.id }, + data, + }); + return this.mapToDto(updated); + } else { + const created = await this.prisma.adoptionInjectionRecord.create({ + data, + }); + return this.mapToDto(created); + } + } + + async findByAdoptionId(adoptionId: string): Promise { + const record = await this.prisma.adoptionInjectionRecord.findUnique({ + where: { adoptionId }, + }); + return record ? this.mapToDto(record) : null; + } + + async findByAccountSequence(accountSequence: string): Promise { + const records = await this.prisma.adoptionInjectionRecord.findMany({ + where: { accountSequence }, + orderBy: { createdAt: 'desc' }, + }); + return records.map(this.mapToDto); + } + + async markConfirmed(id: bigint, txHash: string, blockNumber: bigint, gasUsed: string): Promise { + await this.prisma.adoptionInjectionRecord.update({ + where: { id }, + data: { + txHash, + blockNumber, + gasUsed, + status: 'CONFIRMED', + }, + }); + } + + async markFailed(id: bigint, errorMessage: string): Promise { + await this.prisma.adoptionInjectionRecord.update({ + where: { id }, + data: { + errorMessage, + status: 'FAILED', + }, + }); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private mapToDto(record: any): AdoptionInjectionRecordDto { + return { + id: record.id, + adoptionId: record.adoptionId, + accountSequence: record.accountSequence, + treeCount: record.treeCount, + adoptionDate: record.adoptionDate, + fromAddress: record.fromAddress, + toAddress: record.toAddress, + amount: record.amount.toString(), + chainType: record.chainType, + txHash: record.txHash, + blockNumber: record.blockNumber, + gasUsed: record.gasUsed, + memo: record.memo, + status: record.status, + errorMessage: record.errorMessage, + createdAt: record.createdAt, + updatedAt: record.updatedAt, + }; + } +} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/index.ts b/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/index.ts index 6493c741..f57705f0 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/index.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/index.ts @@ -7,3 +7,4 @@ export * from './market-maker-deposit.repository.impl'; export * from './market-maker-checkpoint.repository.impl'; export * from './pool-account-deposit.repository.impl'; export * from './pool-account-checkpoint.repository.impl'; +export * from './adoption-injection-record.repository.impl'; diff --git a/backend/services/mining-blockchain-service/src/main.ts b/backend/services/mining-blockchain-service/src/main.ts index d7907729..851729af 100644 --- a/backend/services/mining-blockchain-service/src/main.ts +++ b/backend/services/mining-blockchain-service/src/main.ts @@ -4,6 +4,8 @@ import { ConfigService } from '@nestjs/config'; import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'; import { MicroserviceOptions, Transport } from '@nestjs/microservices'; import { AppModule } from './app.module'; +import { Erc20TransferService } from './domain/services/erc20-transfer.service'; +import { ChainTypeEnum } from './domain/enums'; async function bootstrap() { const logger = new Logger('Bootstrap'); @@ -70,6 +72,11 @@ async function bootstrap() { logger.log(`Mining Blockchain service is running on port ${port}`); logger.log(`Swagger docs available at http://localhost:${port}/api`); + + // 异步检查 fUSDT 注入钱包余额(不阻塞启动) + checkInjectionWalletBalance(app, logger).catch(err => { + logger.error('[STARTUP] Failed to check injection wallet balance', err); + }); } /** @@ -121,6 +128,20 @@ function validateBlockchainConfig(configService: ConfigService, logger: Logger) warnings.push('[CONFIG WARNING] C2C_BOT_WALLET_USERNAME 未配置!C2C Bot MPC 签名功能将不可用。'); } + // fUSDT 注入钱包配置验证(认种自动转积分值到做市商) + const fusdtInjectionAddress = configService.get('FUSDT_INJECTION_WALLET_ADDRESS'); + const fusdtInjectionUsername = configService.get('FUSDT_INJECTION_WALLET_USERNAME'); + + logger.log(`[CONFIG] fUSDT 注入钱包配置(认种自动注入):`); + if (fusdtInjectionAddress) { + logger.log(` - 地址: ${fusdtInjectionAddress}`); + } else { + warnings.push('[CONFIG WARNING] FUSDT_INJECTION_WALLET_ADDRESS 未配置!认种自动 fUSDT 注入功能将不可用。'); + } + if (!fusdtInjectionUsername) { + warnings.push('[CONFIG WARNING] FUSDT_INJECTION_WALLET_USERNAME 未配置!认种自动 fUSDT 注入签名将不可用。'); + } + // KAVA 代币合约配置日志 const eUsdtContract = configService.get('blockchain.kava.eUsdtContract'); const fUsdtContract = configService.get('blockchain.kava.fUsdtContract'); @@ -139,4 +160,33 @@ function validateBlockchainConfig(configService: ConfigService, logger: Logger) } } +/** + * 检查 fUSDT 注入钱包余额 + * 如果余额为 0,输出警告日志 + */ +async function checkInjectionWalletBalance(app: any, logger: Logger) { + try { + const erc20Service = app.get(Erc20TransferService); + const injectionAddress = erc20Service.getFusdtInjectionAddress(); + + if (!injectionAddress) { + logger.warn('[BALANCE] fUSDT 注入钱包未配置,跳过余额检查'); + return; + } + + logger.log(`[BALANCE] 正在查询 fUSDT 注入钱包余额: ${injectionAddress}`); + const balance = await erc20Service.getFusdtInjectionTokenBalance(ChainTypeEnum.KAVA); + const balanceNum = parseFloat(balance); + + if (balanceNum === 0) { + logger.warn(`[BALANCE] ⚠️ fUSDT 注入钱包余额为 0!地址: ${injectionAddress}`); + logger.warn(`[BALANCE] ⚠️ 认种自动注入将因余额不足而失败,请及时充值!`); + } else { + logger.log(`[BALANCE] fUSDT 注入钱包余额: ${balance} fUSDT`); + } + } catch (error) { + logger.warn(`[BALANCE] 无法查询 fUSDT 注入钱包余额 (链可能未就绪): ${error instanceof Error ? error.message : error}`); + } +} + bootstrap();