feat(adoption-injection): 认种自动 fUSDT 注入做市商钱包 + CDC outbox 修复
## contribution-service 改动 ### 1. CDC Outbox 实时发布修复 - adoption-synced.handler: handleCreate/handleUpdate 在同步数据后立即写入 AdoptionSynced outbox 事件,确保 mining-admin-service 实时接收认种同步 - referral-synced.handler: 同理,写入 ReferralSynced outbox 事件 - 之前只有手动调用 /admin/adoptions/publish-all 才会创建 outbox 事件 ### 2. 认种 fUSDT 注入事件 - 新增 AdoptionFusdtInjectionRequestedEvent 事件类 - 当认种状态变为 MINING_ENABLED 时触发 - 写入 outbox,topic: contribution.adoptionfusdtinjectionrequested - payload 含: adoptionId, accountSequence, treeCount, adoptionDate, amount - 转账金额 = treeCount × 5760 fUSDT ## mining-blockchain-service 改动 ### 3. fUSDT 注入钱包 MPC 签名支持 - mpc-signing.client: 新增 FUSDT_INJECTION_WALLET_USERNAME/ADDRESS 配置 isFusdtInjectionConfigured(), signMessageAsFusdtInjection() 等方法 - erc20-transfer.service: IMpcSigningClient 接口增加注入钱包方法 新增 transferFusdtAsInjectionWallet() 转账方法(含余额检查、MPC签名、广播) ### 4. 认种注入 Kafka 消费者 - adoption-injection-consumer.service: 订阅 cdc.contribution.outbox 过滤 AdoptionFusdtInjectionRequested 事件,解析 Debezium 扁平化消息格式 - adoption-injection.handler: 处理注入事件 - 幂等性检查(已确认的跳过) - 写入 adoption_injection_records 分类账(PROCESSING 状态) - 从注入钱包转 fUSDT 到做市商钱包 - 成功: markConfirmed (txHash, blockNumber, gasUsed) - 失败: markFailed (errorMessage),支持重试 - 发布 confirmed/failed Kafka 事件 ### 5. 分类账持久化 - Prisma schema: 新增 AdoptionInjectionRecord 模型 (adoption_injection_records 表) 字段: adoption_id, account_sequence, tree_count, adoption_date, from/to_address, amount, chain_type, tx_hash, block_number, gas_used, memo, status, error_message 索引: uk_adoption_id (唯一), idx_injection_account, idx_injection_status 等 - migration: 20250203000000_add_adoption_injection_records - Repository 接口 + 实现: save, findByAdoptionId, markConfirmed, markFailed ### 6. 启动余额检查 - main.ts: 启动时异步查询注入钱包 fUSDT 余额,余额为 0 时输出警告日志 新增注入钱包配置验证 ## 部署前需添加环境变量 (.env) FUSDT_INJECTION_WALLET_USERNAME=wallet-bff20b69 FUSDT_INJECTION_WALLET_ADDRESS=0x7BDB89dA47F16869c90446C41e70A00dDc432DBB Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
4a69fdd070
commit
4817d92507
|
|
@ -1,6 +1,8 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import Decimal from 'decimal.js';
|
||||
import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
|
||||
import { AdoptionSyncedEvent } from '../../domain/events/adoption-synced.event';
|
||||
import { AdoptionFusdtInjectionRequestedEvent } from '../../domain/events/adoption-fusdt-injection-requested.event';
|
||||
import { ContributionCalculationService } from '../services/contribution-calculation.service';
|
||||
import { ContributionRateService } from '../services/contribution-rate.service';
|
||||
|
||||
|
|
@ -148,8 +150,15 @@ export class AdoptionSyncedHandler {
|
|||
|
||||
this.logger.log(`[CDC] Adoption synced: orderId=${orderId}, status=${status}`);
|
||||
|
||||
// 只有 MINING_ENABLED 状态才触发算力计算
|
||||
// 发布 AdoptionSynced outbox 事件,实时同步到 mining-admin-service
|
||||
await this.publishAdoptionOutboxEvent(tx, originalAdoptionId, accountSequence, treeCount, new Date(createdAt), status, contributionPerTree);
|
||||
|
||||
// 只有 MINING_ENABLED 状态才触发算力计算和 fUSDT 注入
|
||||
const needsCalculation = status === 'MINING_ENABLED';
|
||||
if (needsCalculation) {
|
||||
await this.publishFusdtInjectionEvent(tx, originalAdoptionId, accountSequence, treeCount, new Date(createdAt));
|
||||
}
|
||||
|
||||
return {
|
||||
originalAdoptionId,
|
||||
needsCalculation,
|
||||
|
|
@ -209,9 +218,15 @@ export class AdoptionSyncedHandler {
|
|||
|
||||
this.logger.log(`[CDC] Adoption synced: orderId=${orderId}, status=${newStatus}`);
|
||||
|
||||
// 只有当 status 变为 MINING_ENABLED 且尚未计算过算力时,才触发算力计算
|
||||
// 发布 AdoptionSynced outbox 事件,实时同步到 mining-admin-service
|
||||
await this.publishAdoptionOutboxEvent(tx, originalAdoptionId, accountSequence, treeCount, new Date(createdAt), newStatus, contributionPerTree);
|
||||
|
||||
// 只有当 status 变为 MINING_ENABLED 且尚未计算过算力时,才触发算力计算和 fUSDT 注入
|
||||
const statusChangedToMiningEnabled = newStatus === 'MINING_ENABLED' && oldStatus !== 'MINING_ENABLED';
|
||||
const needsCalculation = statusChangedToMiningEnabled && !existingAdoption?.contributionDistributed;
|
||||
if (needsCalculation) {
|
||||
await this.publishFusdtInjectionEvent(tx, originalAdoptionId, accountSequence, treeCount, new Date(createdAt));
|
||||
}
|
||||
|
||||
return {
|
||||
originalAdoptionId,
|
||||
|
|
@ -229,4 +244,75 @@ export class AdoptionSyncedHandler {
|
|||
// 但通常不会发生删除操作
|
||||
this.logger.warn(`[CDC] Adoption delete event received: ${orderId}. This may require contribution rollback.`);
|
||||
}
|
||||
|
||||
/**
|
||||
* 在事务内发布 AdoptionSynced outbox 事件
|
||||
* 确保 mining-admin-service 能实时收到认种数据变更
|
||||
*/
|
||||
private async publishAdoptionOutboxEvent(
|
||||
tx: TransactionClient,
|
||||
originalAdoptionId: bigint,
|
||||
accountSequence: string,
|
||||
treeCount: number,
|
||||
adoptionDate: Date,
|
||||
status: string | null,
|
||||
contributionPerTree: Decimal,
|
||||
): Promise<void> {
|
||||
const event = new AdoptionSyncedEvent(
|
||||
originalAdoptionId,
|
||||
accountSequence,
|
||||
treeCount,
|
||||
adoptionDate,
|
||||
status,
|
||||
contributionPerTree.toString(),
|
||||
);
|
||||
|
||||
await tx.outboxEvent.create({
|
||||
data: {
|
||||
aggregateType: AdoptionSyncedEvent.AGGREGATE_TYPE,
|
||||
aggregateId: originalAdoptionId.toString(),
|
||||
eventType: AdoptionSyncedEvent.EVENT_TYPE,
|
||||
topic: 'contribution.adoptionsynced',
|
||||
key: originalAdoptionId.toString(),
|
||||
payload: event.toPayload(),
|
||||
status: 'PENDING',
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.debug(`[CDC] Published AdoptionSynced outbox event: orderId=${originalAdoptionId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* 在事务内发布 fUSDT 注入请求到 outbox
|
||||
* 当认种状态变为 MINING_ENABLED 时触发
|
||||
* mining-blockchain-service 会消费此事件并执行区块链转账
|
||||
*/
|
||||
private async publishFusdtInjectionEvent(
|
||||
tx: TransactionClient,
|
||||
originalAdoptionId: bigint,
|
||||
accountSequence: string,
|
||||
treeCount: number,
|
||||
adoptionDate: Date,
|
||||
): Promise<void> {
|
||||
const event = new AdoptionFusdtInjectionRequestedEvent(
|
||||
originalAdoptionId,
|
||||
accountSequence,
|
||||
treeCount,
|
||||
adoptionDate,
|
||||
);
|
||||
|
||||
await tx.outboxEvent.create({
|
||||
data: {
|
||||
aggregateType: AdoptionFusdtInjectionRequestedEvent.AGGREGATE_TYPE,
|
||||
aggregateId: originalAdoptionId.toString(),
|
||||
eventType: AdoptionFusdtInjectionRequestedEvent.EVENT_TYPE,
|
||||
topic: 'contribution.adoptionfusdtinjectionrequested',
|
||||
key: originalAdoptionId.toString(),
|
||||
payload: event.toPayload(),
|
||||
status: 'PENDING',
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log(`[CDC] Published fUSDT injection request: orderId=${originalAdoptionId}, amount=${event.amount} (${treeCount} trees × ${AdoptionFusdtInjectionRequestedEvent.FUSDT_PER_TREE})`);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
|
||||
import { ReferralSyncedEvent } from '../../domain/events/referral-synced.event';
|
||||
|
||||
/**
|
||||
* 引荐关系 CDC 事件处理器
|
||||
|
|
@ -97,6 +98,9 @@ export class ReferralSyncedHandler {
|
|||
});
|
||||
|
||||
this.logger.log(`[CDC] Referral synced: ${accountSequence}, referrerId=${referrerUserId || 'none'}, depth=${depth}`);
|
||||
|
||||
// 发布 ReferralSynced outbox 事件,实时同步到 mining-admin-service
|
||||
await this.publishReferralOutboxEvent(tx, accountSequence, referrerAccountSequence, referrerUserId ? BigInt(referrerUserId) : null, originalUserId ? BigInt(originalUserId) : null, ancestorPath, depth);
|
||||
}
|
||||
|
||||
private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
|
||||
|
|
@ -156,6 +160,9 @@ export class ReferralSyncedHandler {
|
|||
});
|
||||
|
||||
this.logger.log(`[CDC] Referral synced: ${accountSequence}`);
|
||||
|
||||
// 发布 ReferralSynced outbox 事件,实时同步到 mining-admin-service
|
||||
await this.publishReferralOutboxEvent(tx, accountSequence, referrerAccountSequence, referrerUserId ? BigInt(referrerUserId) : null, originalUserId ? BigInt(originalUserId) : null, ancestorPath, depth);
|
||||
}
|
||||
|
||||
private async handleDelete(data: any): Promise<void> {
|
||||
|
|
@ -168,6 +175,43 @@ export class ReferralSyncedHandler {
|
|||
this.logger.warn(`[CDC] Referral delete event received: ${accountSequence} (not processed, keeping history)`);
|
||||
}
|
||||
|
||||
/**
|
||||
* 在事务内发布 ReferralSynced outbox 事件
|
||||
* 确保 mining-admin-service 能实时收到推荐关系变更
|
||||
*/
|
||||
private async publishReferralOutboxEvent(
|
||||
tx: TransactionClient,
|
||||
accountSequence: string,
|
||||
referrerAccountSequence: string | null,
|
||||
referrerUserId: bigint | null,
|
||||
originalUserId: bigint | null,
|
||||
ancestorPath: string | null,
|
||||
depth: number,
|
||||
): Promise<void> {
|
||||
const event = new ReferralSyncedEvent(
|
||||
accountSequence,
|
||||
referrerAccountSequence,
|
||||
referrerUserId,
|
||||
originalUserId,
|
||||
ancestorPath,
|
||||
depth,
|
||||
);
|
||||
|
||||
await tx.outboxEvent.create({
|
||||
data: {
|
||||
aggregateType: ReferralSyncedEvent.AGGREGATE_TYPE,
|
||||
aggregateId: accountSequence,
|
||||
eventType: ReferralSyncedEvent.EVENT_TYPE,
|
||||
topic: 'contribution.referralsynced',
|
||||
key: accountSequence,
|
||||
payload: event.toPayload(),
|
||||
status: 'PENDING',
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.debug(`[CDC] Published ReferralSynced outbox event: account=${accountSequence}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 BigInt[] 数组转换为逗号分隔的字符串
|
||||
* @param ancestorPath BigInt 数组或 null
|
||||
|
|
|
|||
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* 认种 fUSDT 注入请求事件
|
||||
* 当认种状态变为 MINING_ENABLED 时触发
|
||||
* 通知 mining-blockchain-service 从注入钱包向做市商钱包转入 fUSDT
|
||||
*
|
||||
* 转账金额 = treeCount × 5760
|
||||
*/
|
||||
export class AdoptionFusdtInjectionRequestedEvent {
|
||||
static readonly EVENT_TYPE = 'AdoptionFusdtInjectionRequested';
|
||||
static readonly AGGREGATE_TYPE = 'Adoption';
|
||||
/** 每棵树对应的 fUSDT 注入金额 */
|
||||
static readonly FUSDT_PER_TREE = 5760;
|
||||
|
||||
constructor(
|
||||
public readonly originalAdoptionId: bigint,
|
||||
public readonly accountSequence: string,
|
||||
public readonly treeCount: number,
|
||||
public readonly adoptionDate: Date,
|
||||
) {}
|
||||
|
||||
get amount(): number {
|
||||
return this.treeCount * AdoptionFusdtInjectionRequestedEvent.FUSDT_PER_TREE;
|
||||
}
|
||||
|
||||
toPayload(): Record<string, any> {
|
||||
return {
|
||||
eventType: AdoptionFusdtInjectionRequestedEvent.EVENT_TYPE,
|
||||
originalAdoptionId: this.originalAdoptionId.toString(),
|
||||
accountSequence: this.accountSequence,
|
||||
treeCount: this.treeCount,
|
||||
adoptionDate: this.adoptionDate.toISOString(),
|
||||
amount: this.amount.toString(),
|
||||
fusdtPerTree: AdoptionFusdtInjectionRequestedEvent.FUSDT_PER_TREE,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -9,3 +9,4 @@ export * from './network-progress-updated.event';
|
|||
export * from './system-account-synced.event';
|
||||
export * from './system-contribution-record-created.event';
|
||||
export * from './unallocated-contribution-synced.event';
|
||||
export * from './adoption-fusdt-injection-requested.event';
|
||||
|
|
|
|||
|
|
@ -0,0 +1,37 @@
|
|||
-- CreateTable
|
||||
CREATE TABLE "adoption_injection_records" (
|
||||
"record_id" BIGSERIAL NOT NULL,
|
||||
"adoption_id" VARCHAR(100) NOT NULL,
|
||||
"account_sequence" VARCHAR(20) NOT NULL,
|
||||
"tree_count" INTEGER NOT NULL,
|
||||
"adoption_date" TIMESTAMP(3) NOT NULL,
|
||||
"from_address" VARCHAR(42) NOT NULL,
|
||||
"to_address" VARCHAR(42) NOT NULL,
|
||||
"amount" DECIMAL(36,8) NOT NULL,
|
||||
"chain_type" VARCHAR(20) NOT NULL,
|
||||
"tx_hash" VARCHAR(66),
|
||||
"block_number" BIGINT,
|
||||
"gas_used" VARCHAR(50),
|
||||
"memo" TEXT,
|
||||
"status" VARCHAR(20) NOT NULL DEFAULT 'PENDING',
|
||||
"error_message" TEXT,
|
||||
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"updated_at" TIMESTAMP(3) NOT NULL,
|
||||
|
||||
CONSTRAINT "adoption_injection_records_pkey" PRIMARY KEY ("record_id")
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "uk_adoption_id" ON "adoption_injection_records"("adoption_id");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "idx_injection_account" ON "adoption_injection_records"("account_sequence");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "idx_injection_status" ON "adoption_injection_records"("status");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "idx_injection_tx_hash" ON "adoption_injection_records"("tx_hash");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "idx_injection_created" ON "adoption_injection_records"("created_at");
|
||||
|
|
@ -328,6 +328,48 @@ model MarketMakerBlockCheckpoint {
|
|||
@@map("market_maker_block_checkpoints")
|
||||
}
|
||||
|
||||
// ============================================
|
||||
// 认种 fUSDT 注入分类账(对账用)
|
||||
// 记录每笔认种触发的 fUSDT 自动注入转账
|
||||
// ============================================
|
||||
model AdoptionInjectionRecord {
|
||||
id BigInt @id @default(autoincrement()) @map("record_id")
|
||||
|
||||
// 认种信息
|
||||
adoptionId String @map("adoption_id") @db.VarChar(100) // 1.0 原始认种ID
|
||||
accountSequence String @map("account_sequence") @db.VarChar(20) // 用户账户序列号
|
||||
treeCount Int @map("tree_count") // 认种数量(棵)
|
||||
adoptionDate DateTime @map("adoption_date") // 认种时间
|
||||
|
||||
// 转账信息
|
||||
fromAddress String @map("from_address") @db.VarChar(42) // 注入钱包地址
|
||||
toAddress String @map("to_address") @db.VarChar(42) // 做市商钱包地址
|
||||
amount Decimal @db.Decimal(36, 8) // 转账金额 (treeCount × 5760)
|
||||
chainType String @map("chain_type") @db.VarChar(20) // KAVA
|
||||
|
||||
// 链上结果
|
||||
txHash String? @map("tx_hash") @db.VarChar(66)
|
||||
blockNumber BigInt? @map("block_number")
|
||||
gasUsed String? @map("gas_used") @db.VarChar(50)
|
||||
|
||||
// 分类账备注
|
||||
memo String? @db.Text
|
||||
|
||||
// 状态: PENDING → PROCESSING → CONFIRMED / FAILED
|
||||
status String @default("PENDING") @db.VarChar(20)
|
||||
errorMessage String? @map("error_message") @db.Text
|
||||
|
||||
createdAt DateTime @default(now()) @map("created_at")
|
||||
updatedAt DateTime @updatedAt @map("updated_at")
|
||||
|
||||
@@unique([adoptionId], name: "uk_adoption_id")
|
||||
@@index([accountSequence], name: "idx_injection_account")
|
||||
@@index([status], name: "idx_injection_status")
|
||||
@@index([txHash], name: "idx_injection_tx_hash")
|
||||
@@index([createdAt], name: "idx_injection_created")
|
||||
@@map("adoption_injection_records")
|
||||
}
|
||||
|
||||
// ============================================
|
||||
// 池账户充值交易表
|
||||
// 记录检测到的100亿销毁池和200万挖矿池的 fUSDT 充值
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import { MarketMakerDepositDetectionService } from './services/market-maker-depo
|
|||
import { PoolAccountDepositDetectionService } from './services/pool-account-deposit-detection.service';
|
||||
import { OutboxPublisherService } from './services/outbox-publisher.service';
|
||||
import { DepositAckConsumerService } from '@/infrastructure/kafka/deposit-ack-consumer.service';
|
||||
import { AdoptionInjectionHandler } from './event-handlers/adoption-injection.handler';
|
||||
|
||||
@Module({
|
||||
imports: [InfrastructureModule, DomainModule],
|
||||
|
|
@ -20,6 +21,8 @@ import { DepositAckConsumerService } from '@/infrastructure/kafka/deposit-ack-co
|
|||
OutboxPublisherService,
|
||||
// 充值 ACK 消费者(接收 wallet-service 的确认回执)
|
||||
DepositAckConsumerService,
|
||||
// 认种 fUSDT 注入处理器(监听认种事件,自动转积分值到做市商)
|
||||
AdoptionInjectionHandler,
|
||||
],
|
||||
exports: [OutboxPublisherService],
|
||||
})
|
||||
|
|
|
|||
|
|
@ -0,0 +1,192 @@
|
|||
/**
|
||||
* Adoption fUSDT Injection Handler
|
||||
*
|
||||
* When an adoption reaches MINING_ENABLED status, this handler:
|
||||
* 1. Receives the AdoptionFusdtInjectionRequested event via Kafka
|
||||
* 2. Creates a PENDING ledger record in adoption_injection_records
|
||||
* 3. Transfers fUSDT from injection wallet to fUSDT market maker wallet
|
||||
* 4. Updates ledger record with tx result (CONFIRMED or FAILED)
|
||||
* 5. Publishes confirmation/failure Kafka event
|
||||
*
|
||||
* Amount = treeCount x 5760
|
||||
* Ledger memo includes: user ID, adoption quantity, adoption time, transfer amount
|
||||
*/
|
||||
|
||||
import { Injectable, Logger, OnModuleInit, Inject } from '@nestjs/common';
|
||||
import {
|
||||
AdoptionInjectionConsumerService,
|
||||
AdoptionInjectionPayload,
|
||||
} from '@/infrastructure/kafka/adoption-injection-consumer.service';
|
||||
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
|
||||
import { Erc20TransferService } from '@/domain/services/erc20-transfer.service';
|
||||
import { MpcSigningClient } from '@/infrastructure/mpc/mpc-signing.client';
|
||||
import { ChainTypeEnum } from '@/domain/enums';
|
||||
import {
|
||||
ADOPTION_INJECTION_RECORD_REPOSITORY,
|
||||
IAdoptionInjectionRecordRepository,
|
||||
} from '@/domain/repositories';
|
||||
|
||||
@Injectable()
|
||||
export class AdoptionInjectionHandler implements OnModuleInit {
|
||||
private readonly logger = new Logger(AdoptionInjectionHandler.name);
|
||||
|
||||
constructor(
|
||||
private readonly injectionConsumer: AdoptionInjectionConsumerService,
|
||||
private readonly eventPublisher: EventPublisherService,
|
||||
private readonly transferService: Erc20TransferService,
|
||||
private readonly mpcSigningClient: MpcSigningClient,
|
||||
@Inject(ADOPTION_INJECTION_RECORD_REPOSITORY)
|
||||
private readonly injectionRepo: IAdoptionInjectionRecordRepository,
|
||||
) {}
|
||||
|
||||
onModuleInit() {
|
||||
this.injectionConsumer.onAdoptionInjectionRequested(
|
||||
this.handleInjection.bind(this),
|
||||
);
|
||||
this.logger.log(`[INIT] AdoptionInjectionHandler registered`);
|
||||
}
|
||||
|
||||
private async handleInjection(payload: AdoptionInjectionPayload): Promise<void> {
|
||||
this.logger.log(`[HANDLE] ========== Adoption fUSDT Injection ==========`);
|
||||
this.logger.log(`[HANDLE] adoptionId: ${payload.originalAdoptionId}`);
|
||||
this.logger.log(`[HANDLE] accountSequence: ${payload.accountSequence}`);
|
||||
this.logger.log(`[HANDLE] treeCount: ${payload.treeCount}`);
|
||||
this.logger.log(`[HANDLE] adoptionDate: ${payload.adoptionDate}`);
|
||||
this.logger.log(`[HANDLE] amount: ${payload.amount} fUSDT`);
|
||||
|
||||
const chainType = ChainTypeEnum.KAVA;
|
||||
|
||||
// Step 1: 幂等性检查 - 是否已处理过此认种
|
||||
const existing = await this.injectionRepo.findByAdoptionId(payload.originalAdoptionId);
|
||||
if (existing && existing.status === 'CONFIRMED') {
|
||||
this.logger.warn(`[HANDLE] Adoption ${payload.originalAdoptionId} already injected (txHash=${existing.txHash}), skipping`);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Step 2: Check injection wallet is configured
|
||||
if (!this.transferService.isFusdtInjectionConfigured(chainType)) {
|
||||
throw new Error('fUSDT Injection wallet not configured');
|
||||
}
|
||||
|
||||
// Step 3: Get addresses
|
||||
const injectionAddress = this.transferService.getFusdtInjectionAddress();
|
||||
const marketMakerAddress = this.mpcSigningClient.getFusdtMarketMakerAddress();
|
||||
if (!injectionAddress || !marketMakerAddress) {
|
||||
throw new Error('fUSDT Injection or Market Maker address not configured');
|
||||
}
|
||||
|
||||
// Step 4: Build ledger memo
|
||||
const memo = [
|
||||
`认种注入(Adoption Injection)`,
|
||||
`用户: ${payload.accountSequence}`,
|
||||
`认种ID: ${payload.originalAdoptionId}`,
|
||||
`认种数量: ${payload.treeCount}`,
|
||||
`认种时间: ${payload.adoptionDate}`,
|
||||
`转账金额: ${payload.amount} fUSDT`,
|
||||
`单价: ${payload.fusdtPerTree} fUSDT/棵`,
|
||||
].join(' | ');
|
||||
this.logger.log(`[HANDLE] Memo: ${memo}`);
|
||||
|
||||
// Step 5: 写入分类账 - PENDING 状态(或更新已有的 FAILED 记录)
|
||||
let ledgerRecord = existing;
|
||||
if (!ledgerRecord) {
|
||||
ledgerRecord = await this.injectionRepo.save({
|
||||
adoptionId: payload.originalAdoptionId,
|
||||
accountSequence: payload.accountSequence,
|
||||
treeCount: payload.treeCount,
|
||||
adoptionDate: new Date(payload.adoptionDate),
|
||||
fromAddress: injectionAddress,
|
||||
toAddress: marketMakerAddress,
|
||||
amount: payload.amount,
|
||||
chainType: 'KAVA',
|
||||
memo,
|
||||
status: 'PROCESSING',
|
||||
});
|
||||
this.logger.log(`[LEDGER] Created injection record id=${ledgerRecord.id}, status=PROCESSING`);
|
||||
} else {
|
||||
// 重试之前失败的记录
|
||||
ledgerRecord = await this.injectionRepo.save({
|
||||
...ledgerRecord,
|
||||
status: 'PROCESSING',
|
||||
errorMessage: null,
|
||||
});
|
||||
this.logger.log(`[LEDGER] Retrying failed injection record id=${ledgerRecord.id}`);
|
||||
}
|
||||
|
||||
// Step 6: Execute fUSDT transfer
|
||||
this.logger.log(`[PROCESS] Transferring ${payload.amount} fUSDT to market maker ${marketMakerAddress}`);
|
||||
const result = await this.transferService.transferFusdtAsInjectionWallet(
|
||||
chainType,
|
||||
marketMakerAddress,
|
||||
payload.amount,
|
||||
memo,
|
||||
);
|
||||
|
||||
if (result.success && result.txHash) {
|
||||
this.logger.log(`[SUCCESS] Adoption injection transfer confirmed!`);
|
||||
this.logger.log(`[SUCCESS] TxHash: ${result.txHash}`);
|
||||
this.logger.log(`[SUCCESS] Block: ${result.blockNumber}`);
|
||||
|
||||
// Step 7a: 更新分类账 - CONFIRMED
|
||||
await this.injectionRepo.markConfirmed(
|
||||
ledgerRecord.id!,
|
||||
result.txHash,
|
||||
BigInt(result.blockNumber || 0),
|
||||
result.gasUsed || '0',
|
||||
);
|
||||
this.logger.log(`[LEDGER] Injection record id=${ledgerRecord.id} marked CONFIRMED`);
|
||||
|
||||
// Publish success event
|
||||
await this.eventPublisher.publish({
|
||||
eventType: 'mining_blockchain.adoption_injection.confirmed',
|
||||
toPayload: () => ({
|
||||
originalAdoptionId: payload.originalAdoptionId,
|
||||
accountSequence: payload.accountSequence,
|
||||
treeCount: payload.treeCount,
|
||||
adoptionDate: payload.adoptionDate,
|
||||
amount: payload.amount,
|
||||
txHash: result.txHash,
|
||||
blockNumber: result.blockNumber,
|
||||
memo,
|
||||
}),
|
||||
eventId: `adoption-injection-confirmed-${payload.originalAdoptionId}-${Date.now()}`,
|
||||
occurredAt: new Date(),
|
||||
});
|
||||
} else {
|
||||
throw new Error(result.error || 'Injection transfer failed');
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`[ERROR] Adoption injection transfer failed for adoptionId=${payload.originalAdoptionId}`,
|
||||
error,
|
||||
);
|
||||
|
||||
// Step 7b: 更新分类账 - FAILED
|
||||
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
|
||||
if (existing?.id || (await this.injectionRepo.findByAdoptionId(payload.originalAdoptionId))?.id) {
|
||||
const record = await this.injectionRepo.findByAdoptionId(payload.originalAdoptionId);
|
||||
if (record?.id) {
|
||||
await this.injectionRepo.markFailed(record.id, errorMsg);
|
||||
this.logger.log(`[LEDGER] Injection record id=${record.id} marked FAILED: ${errorMsg}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Publish failure event
|
||||
await this.eventPublisher.publish({
|
||||
eventType: 'mining_blockchain.adoption_injection.failed',
|
||||
toPayload: () => ({
|
||||
originalAdoptionId: payload.originalAdoptionId,
|
||||
accountSequence: payload.accountSequence,
|
||||
treeCount: payload.treeCount,
|
||||
amount: payload.amount,
|
||||
error: errorMsg,
|
||||
}),
|
||||
eventId: `adoption-injection-failed-${payload.originalAdoptionId}-${Date.now()}`,
|
||||
occurredAt: new Date(),
|
||||
});
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
export const ADOPTION_INJECTION_RECORD_REPOSITORY = Symbol('ADOPTION_INJECTION_RECORD_REPOSITORY');
|
||||
|
||||
/**
|
||||
* 认种 fUSDT 注入分类账 DTO
|
||||
*/
|
||||
export interface AdoptionInjectionRecordDto {
|
||||
id?: bigint;
|
||||
adoptionId: string;
|
||||
accountSequence: string;
|
||||
treeCount: number;
|
||||
adoptionDate: Date;
|
||||
fromAddress: string;
|
||||
toAddress: string;
|
||||
amount: string;
|
||||
chainType: string;
|
||||
txHash?: string | null;
|
||||
blockNumber?: bigint | null;
|
||||
gasUsed?: string | null;
|
||||
memo?: string | null;
|
||||
status: string;
|
||||
errorMessage?: string | null;
|
||||
createdAt?: Date;
|
||||
updatedAt?: Date;
|
||||
}
|
||||
|
||||
export interface IAdoptionInjectionRecordRepository {
|
||||
/**
|
||||
* 保存注入记录(创建或更新)
|
||||
*/
|
||||
save(record: AdoptionInjectionRecordDto): Promise<AdoptionInjectionRecordDto>;
|
||||
|
||||
/**
|
||||
* 根据认种ID查找(幂等性检查)
|
||||
*/
|
||||
findByAdoptionId(adoptionId: string): Promise<AdoptionInjectionRecordDto | null>;
|
||||
|
||||
/**
|
||||
* 根据用户账户查找所有注入记录
|
||||
*/
|
||||
findByAccountSequence(accountSequence: string): Promise<AdoptionInjectionRecordDto[]>;
|
||||
|
||||
/**
|
||||
* 更新转账结果(成功)
|
||||
*/
|
||||
markConfirmed(id: bigint, txHash: string, blockNumber: bigint, gasUsed: string): Promise<void>;
|
||||
|
||||
/**
|
||||
* 更新转账结果(失败)
|
||||
*/
|
||||
markFailed(id: bigint, errorMessage: string): Promise<void>;
|
||||
}
|
||||
|
|
@ -5,3 +5,4 @@ export * from './transaction-request.repository.interface';
|
|||
export * from './outbox-event.repository.interface';
|
||||
export * from './market-maker-deposit.repository.interface';
|
||||
export * from './pool-account-deposit.repository.interface';
|
||||
export * from './adoption-injection-record.repository.interface';
|
||||
|
|
|
|||
|
|
@ -55,6 +55,10 @@ export interface IMpcSigningClient {
|
|||
isMiningPoolConfigured(): boolean;
|
||||
getMiningPoolAddress(): string;
|
||||
signMessageAsMiningPool(messageHash: string): Promise<string>;
|
||||
// fUSDT 注入钱包(认种自动转积分值到做市商)
|
||||
isFusdtInjectionConfigured(): boolean;
|
||||
getFusdtInjectionAddress(): string;
|
||||
signMessageAsFusdtInjection(messageHash: string): Promise<string>;
|
||||
}
|
||||
|
||||
// 池账户类型(用于 transferTokenAsPoolAccount)
|
||||
|
|
@ -81,6 +85,8 @@ export class Erc20TransferService {
|
|||
private readonly burnPoolAddress: string;
|
||||
// 200万挖矿池钱包地址
|
||||
private readonly miningPoolAddress: string;
|
||||
// fUSDT 注入钱包地址(认种自动转积分值到做市商)
|
||||
private readonly fusdtInjectionAddress: string;
|
||||
private mpcSigningClient: IMpcSigningClient | null = null;
|
||||
|
||||
constructor(
|
||||
|
|
@ -93,6 +99,7 @@ export class Erc20TransferService {
|
|||
this.fusdtMarketMakerAddress = this.configService.get<string>('FUSDT_MARKET_MAKER_ADDRESS', '');
|
||||
this.burnPoolAddress = this.configService.get<string>('BURN_POOL_WALLET_ADDRESS', '');
|
||||
this.miningPoolAddress = this.configService.get<string>('MINING_POOL_WALLET_ADDRESS', '');
|
||||
this.fusdtInjectionAddress = this.configService.get<string>('FUSDT_INJECTION_WALLET_ADDRESS', '');
|
||||
this.initializeWalletConfig();
|
||||
}
|
||||
|
||||
|
|
@ -158,6 +165,13 @@ export class Erc20TransferService {
|
|||
} else {
|
||||
this.logger.warn('[INIT] FUSDT_MARKET_MAKER_ADDRESS not configured');
|
||||
}
|
||||
|
||||
// 检查 fUSDT 注入钱包地址配置
|
||||
if (this.fusdtInjectionAddress) {
|
||||
this.logger.log(`[INIT] fUSDT Injection wallet configured: ${this.fusdtInjectionAddress}`);
|
||||
} else {
|
||||
this.logger.warn('[INIT] FUSDT_INJECTION_WALLET_ADDRESS not configured (adoption auto-injection disabled)');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -913,6 +927,213 @@ export class Erc20TransferService {
|
|||
return formatUnits(balance, decimals);
|
||||
}
|
||||
|
||||
// ============ fUSDT 注入钱包方法(认种自动转积分值到做市商) ============
|
||||
|
||||
/**
|
||||
* 检查 fUSDT 注入钱包是否已配置
|
||||
*/
|
||||
isFusdtInjectionConfigured(chainType: ChainTypeEnum): boolean {
|
||||
try {
|
||||
this.rpcProviderManager.getProvider(chainType);
|
||||
return !!this.fusdtInjectionAddress && !!this.mpcSigningClient?.isFusdtInjectionConfigured();
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 fUSDT 注入钱包地址
|
||||
*/
|
||||
getFusdtInjectionAddress(): string | null {
|
||||
return this.fusdtInjectionAddress || null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 fUSDT 注入钱包的 fUSDT 链上余额
|
||||
*/
|
||||
async getFusdtInjectionTokenBalance(chainType: ChainTypeEnum): Promise<string> {
|
||||
const provider = this.getProvider(chainType);
|
||||
|
||||
if (!this.fusdtInjectionAddress) {
|
||||
throw new Error('fUSDT Injection wallet address not configured');
|
||||
}
|
||||
|
||||
const contractAddress = this.getTokenContract(chainType, 'FUSDT');
|
||||
if (!contractAddress) {
|
||||
throw new Error(`fUSDT not configured for chain ${chainType}`);
|
||||
}
|
||||
|
||||
const contract = new Contract(contractAddress, ERC20_TRANSFER_ABI, provider);
|
||||
const balance = await contract.balanceOf(this.fusdtInjectionAddress);
|
||||
const decimals = await contract.decimals();
|
||||
|
||||
return formatUnits(balance, decimals);
|
||||
}
|
||||
|
||||
/**
|
||||
* fUSDT 注入钱包转账到做市商钱包(认种触发)
|
||||
*
|
||||
* @param chainType 链类型
|
||||
* @param toAddress 做市商钱包地址
|
||||
* @param amount 转账金额 (人类可读格式)
|
||||
* @param memo 分类账备注(含认种信息)
|
||||
* @returns 转账结果
|
||||
*/
|
||||
async transferFusdtAsInjectionWallet(
|
||||
chainType: ChainTypeEnum,
|
||||
toAddress: string,
|
||||
amount: string,
|
||||
memo?: string,
|
||||
): Promise<TransferResult> {
|
||||
this.logger.log(`[INJECTION-TRANSFER] Starting fUSDT injection transfer`);
|
||||
this.logger.log(`[INJECTION-TRANSFER] From: ${this.fusdtInjectionAddress}`);
|
||||
this.logger.log(`[INJECTION-TRANSFER] To: ${toAddress}`);
|
||||
this.logger.log(`[INJECTION-TRANSFER] Amount: ${amount} fUSDT`);
|
||||
if (memo) {
|
||||
this.logger.log(`[INJECTION-TRANSFER] Memo: ${memo}`);
|
||||
}
|
||||
|
||||
const provider = this.getProvider(chainType);
|
||||
|
||||
if (!this.mpcSigningClient || !this.mpcSigningClient.isFusdtInjectionConfigured()) {
|
||||
const error = 'fUSDT Injection MPC signing not configured';
|
||||
this.logger.error(`[INJECTION-TRANSFER] ${error}`);
|
||||
return { success: false, error };
|
||||
}
|
||||
|
||||
if (!this.fusdtInjectionAddress) {
|
||||
const error = 'fUSDT Injection wallet address not configured';
|
||||
this.logger.error(`[INJECTION-TRANSFER] ${error}`);
|
||||
return { success: false, error };
|
||||
}
|
||||
|
||||
try {
|
||||
const config = this.chainConfig.getConfig(ChainType.fromEnum(chainType));
|
||||
const contractAddress = this.getTokenContract(chainType, 'FUSDT');
|
||||
|
||||
if (!contractAddress) {
|
||||
const error = `fUSDT not configured for chain ${chainType}`;
|
||||
this.logger.error(`[INJECTION-TRANSFER] ${error}`);
|
||||
return { success: false, error };
|
||||
}
|
||||
|
||||
const contract = new Contract(contractAddress, ERC20_TRANSFER_ABI, provider);
|
||||
|
||||
const decimals = await contract.decimals();
|
||||
const amountInWei = parseUnits(amount, decimals);
|
||||
|
||||
// 检查余额
|
||||
const balance = await contract.balanceOf(this.fusdtInjectionAddress);
|
||||
this.logger.log(`[INJECTION-TRANSFER] Injection wallet balance: ${formatUnits(balance, decimals)} fUSDT`);
|
||||
|
||||
if (balance < amountInWei) {
|
||||
const error = `Insufficient fUSDT balance in injection wallet (need ${amount}, have ${formatUnits(balance, decimals)})`;
|
||||
this.logger.error(`[INJECTION-TRANSFER] ${error}`);
|
||||
return { success: false, error };
|
||||
}
|
||||
|
||||
// 构建交易
|
||||
const nonce = await provider.getTransactionCount(this.fusdtInjectionAddress);
|
||||
const feeData = await provider.getFeeData();
|
||||
const transferData = contract.interface.encodeFunctionData('transfer', [toAddress, amountInWei]);
|
||||
|
||||
const gasEstimate = await provider.estimateGas({
|
||||
from: this.fusdtInjectionAddress,
|
||||
to: contractAddress,
|
||||
data: transferData,
|
||||
});
|
||||
const gasLimit = gasEstimate * BigInt(120) / BigInt(100);
|
||||
|
||||
const supportsEip1559 = feeData.maxFeePerGas && feeData.maxFeePerGas > BigInt(0);
|
||||
|
||||
let tx: Transaction;
|
||||
if (supportsEip1559) {
|
||||
tx = Transaction.from({
|
||||
type: 2,
|
||||
chainId: config.chainId,
|
||||
nonce,
|
||||
to: contractAddress,
|
||||
data: transferData,
|
||||
gasLimit,
|
||||
maxFeePerGas: feeData.maxFeePerGas,
|
||||
maxPriorityFeePerGas: feeData.maxPriorityFeePerGas,
|
||||
});
|
||||
} else {
|
||||
const gasPrice = feeData.gasPrice || BigInt(1000000000);
|
||||
tx = Transaction.from({
|
||||
type: 0,
|
||||
chainId: config.chainId,
|
||||
nonce,
|
||||
to: contractAddress,
|
||||
data: transferData,
|
||||
gasLimit,
|
||||
gasPrice,
|
||||
});
|
||||
}
|
||||
|
||||
const unsignedTxHash = tx.unsignedHash;
|
||||
|
||||
// 使用注入钱包 MPC 签名
|
||||
this.logger.log(`[INJECTION-TRANSFER] Requesting fUSDT Injection MPC signature...`);
|
||||
const signatureHex = await this.mpcSigningClient!.signMessageAsFusdtInjection(unsignedTxHash);
|
||||
|
||||
// 解析签名
|
||||
const normalizedSig = signatureHex.startsWith('0x') ? signatureHex : `0x${signatureHex}`;
|
||||
const sigBytes = normalizedSig.slice(2);
|
||||
const r = `0x${sigBytes.slice(0, 64)}`;
|
||||
const s = `0x${sigBytes.slice(64, 128)}`;
|
||||
|
||||
let signature: Signature | null = null;
|
||||
for (const yParity of [0, 1] as const) {
|
||||
try {
|
||||
const testSig = Signature.from({ r, s, yParity });
|
||||
const recoveredAddress = recoverAddress(unsignedTxHash, testSig);
|
||||
if (recoveredAddress.toLowerCase() === this.fusdtInjectionAddress.toLowerCase()) {
|
||||
signature = testSig;
|
||||
break;
|
||||
}
|
||||
} catch (e) {
|
||||
this.logger.debug(`[INJECTION-TRANSFER] yParity=${yParity} failed: ${e}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (!signature) {
|
||||
throw new Error('Failed to recover correct signature - address mismatch');
|
||||
}
|
||||
|
||||
const signedTx = tx.clone();
|
||||
signedTx.signature = signature;
|
||||
|
||||
this.logger.log(`[INJECTION-TRANSFER] Broadcasting transaction...`);
|
||||
const txResponse = await provider.broadcastTransaction(signedTx.serialized);
|
||||
this.logger.log(`[INJECTION-TRANSFER] Transaction sent: ${txResponse.hash}`);
|
||||
|
||||
const receipt = await txResponse.wait();
|
||||
|
||||
if (receipt && receipt.status === 1) {
|
||||
this.logger.log(`[INJECTION-TRANSFER] Transaction confirmed! Block: ${receipt.blockNumber}`);
|
||||
this.rpcProviderManager.reportSuccess(chainType);
|
||||
return {
|
||||
success: true,
|
||||
txHash: txResponse.hash,
|
||||
gasUsed: receipt.gasUsed.toString(),
|
||||
blockNumber: receipt.blockNumber,
|
||||
};
|
||||
} else {
|
||||
return { success: false, txHash: txResponse.hash, error: 'Transaction failed (reverted)' };
|
||||
}
|
||||
} catch (error: any) {
|
||||
if (this.isRpcConnectionError(error)) {
|
||||
this.rpcProviderManager.reportFailure(chainType, error);
|
||||
}
|
||||
this.logger.error(`[INJECTION-TRANSFER] Transfer failed:`, error);
|
||||
return {
|
||||
success: false,
|
||||
error: error.message || 'Unknown error during transfer',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// ============ 池账户钱包方法 ============
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import { HttpModule } from '@nestjs/axios';
|
|||
import { JwtModule } from '@nestjs/jwt';
|
||||
import { PrismaService } from './persistence/prisma/prisma.service';
|
||||
import { RedisService, AddressCacheService } from './redis';
|
||||
import { EventPublisherService, WithdrawalEventConsumerService } from './kafka';
|
||||
import { EventPublisherService, WithdrawalEventConsumerService, AdoptionInjectionConsumerService } from './kafka';
|
||||
import { EvmProviderAdapter, AddressDerivationAdapter, MnemonicDerivationAdapter, RecoveryMnemonicAdapter, BlockScannerService } from './blockchain';
|
||||
import { MpcSigningClient } from './mpc';
|
||||
import { DomainModule } from '@/domain/domain.module';
|
||||
|
|
@ -17,6 +17,7 @@ import {
|
|||
MARKET_MAKER_CHECKPOINT_REPOSITORY,
|
||||
POOL_ACCOUNT_DEPOSIT_REPOSITORY,
|
||||
POOL_ACCOUNT_CHECKPOINT_REPOSITORY,
|
||||
ADOPTION_INJECTION_RECORD_REPOSITORY,
|
||||
} from '@/domain/repositories';
|
||||
import {
|
||||
DepositTransactionRepositoryImpl,
|
||||
|
|
@ -28,6 +29,7 @@ import {
|
|||
MarketMakerCheckpointRepositoryImpl,
|
||||
PoolAccountDepositRepositoryImpl,
|
||||
PoolAccountCheckpointRepositoryImpl,
|
||||
AdoptionInjectionRecordRepositoryImpl,
|
||||
} from './persistence/repositories';
|
||||
|
||||
@Global()
|
||||
|
|
@ -39,6 +41,7 @@ import {
|
|||
RedisService,
|
||||
EventPublisherService,
|
||||
WithdrawalEventConsumerService,
|
||||
AdoptionInjectionConsumerService,
|
||||
MpcSigningClient,
|
||||
|
||||
// 区块链适配器
|
||||
|
|
@ -88,12 +91,17 @@ import {
|
|||
provide: POOL_ACCOUNT_CHECKPOINT_REPOSITORY,
|
||||
useClass: PoolAccountCheckpointRepositoryImpl,
|
||||
},
|
||||
{
|
||||
provide: ADOPTION_INJECTION_RECORD_REPOSITORY,
|
||||
useClass: AdoptionInjectionRecordRepositoryImpl,
|
||||
},
|
||||
],
|
||||
exports: [
|
||||
PrismaService,
|
||||
RedisService,
|
||||
EventPublisherService,
|
||||
WithdrawalEventConsumerService,
|
||||
AdoptionInjectionConsumerService,
|
||||
MpcSigningClient,
|
||||
EvmProviderAdapter,
|
||||
AddressDerivationAdapter,
|
||||
|
|
@ -110,6 +118,7 @@ import {
|
|||
MARKET_MAKER_CHECKPOINT_REPOSITORY,
|
||||
POOL_ACCOUNT_DEPOSIT_REPOSITORY,
|
||||
POOL_ACCOUNT_CHECKPOINT_REPOSITORY,
|
||||
ADOPTION_INJECTION_RECORD_REPOSITORY,
|
||||
],
|
||||
})
|
||||
export class InfrastructureModule {}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,169 @@
|
|||
/**
|
||||
* Adoption fUSDT Injection Event Consumer
|
||||
*
|
||||
* Subscribes to `cdc.contribution.outbox` topic (Debezium CDC)
|
||||
* Filters for `AdoptionFusdtInjectionRequested` events
|
||||
* Triggers fUSDT transfer from injection wallet to market maker wallet
|
||||
*/
|
||||
|
||||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
|
||||
|
||||
export interface AdoptionInjectionPayload {
|
||||
originalAdoptionId: string;
|
||||
accountSequence: string;
|
||||
treeCount: number;
|
||||
adoptionDate: string;
|
||||
amount: string;
|
||||
fusdtPerTree: number;
|
||||
}
|
||||
|
||||
export type AdoptionInjectionEventHandler = (payload: AdoptionInjectionPayload) => Promise<void>;
|
||||
|
||||
@Injectable()
|
||||
export class AdoptionInjectionConsumerService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(AdoptionInjectionConsumerService.name);
|
||||
private kafka: Kafka;
|
||||
private consumer: Consumer;
|
||||
private isConnected = false;
|
||||
private readonly topic: string;
|
||||
|
||||
private injectionHandler?: AdoptionInjectionEventHandler;
|
||||
|
||||
constructor(private readonly configService: ConfigService) {
|
||||
this.topic = this.configService.get<string>(
|
||||
'CDC_TOPIC_CONTRIBUTION_OUTBOX',
|
||||
'cdc.contribution.outbox',
|
||||
);
|
||||
}
|
||||
|
||||
async onModuleInit() {
|
||||
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092'];
|
||||
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'mining-blockchain-service';
|
||||
const groupId = 'mining-blockchain-adoption-injection';
|
||||
|
||||
this.logger.log(`[INIT] Adoption Injection Consumer initializing...`);
|
||||
this.logger.log(`[INIT] ClientId: ${clientId}`);
|
||||
this.logger.log(`[INIT] GroupId: ${groupId}`);
|
||||
this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
|
||||
this.logger.log(`[INIT] Topic: ${this.topic}`);
|
||||
|
||||
this.kafka = new Kafka({
|
||||
clientId,
|
||||
brokers,
|
||||
logLevel: logLevel.WARN,
|
||||
retry: {
|
||||
initialRetryTime: 1000,
|
||||
maxRetryTime: 300000,
|
||||
retries: 15,
|
||||
multiplier: 2,
|
||||
restartOnFailure: async () => true,
|
||||
},
|
||||
});
|
||||
|
||||
this.consumer = this.kafka.consumer({
|
||||
groupId,
|
||||
sessionTimeout: 30000,
|
||||
heartbeatInterval: 3000,
|
||||
});
|
||||
|
||||
try {
|
||||
this.logger.log(`[CONNECT] Connecting Adoption Injection consumer...`);
|
||||
await this.consumer.connect();
|
||||
this.isConnected = true;
|
||||
this.logger.log(`[CONNECT] Adoption Injection consumer connected successfully`);
|
||||
|
||||
await this.consumer.subscribe({
|
||||
topics: [this.topic],
|
||||
fromBeginning: false,
|
||||
});
|
||||
this.logger.log(`[SUBSCRIBE] Subscribed to ${this.topic}`);
|
||||
|
||||
await this.startConsuming();
|
||||
} catch (error) {
|
||||
this.logger.error(`[ERROR] Failed to connect Adoption Injection consumer`, error);
|
||||
}
|
||||
}
|
||||
|
||||
async onModuleDestroy() {
|
||||
if (this.isConnected) {
|
||||
await this.consumer.disconnect();
|
||||
this.logger.log('Adoption Injection consumer disconnected');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register handler for adoption injection events
|
||||
*/
|
||||
onAdoptionInjectionRequested(handler: AdoptionInjectionEventHandler): void {
|
||||
this.injectionHandler = handler;
|
||||
this.logger.log(`[REGISTER] AdoptionInjectionRequested handler registered`);
|
||||
}
|
||||
|
||||
private async startConsuming(): Promise<void> {
|
||||
await this.consumer.run({
|
||||
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
|
||||
try {
|
||||
const value = message.value?.toString();
|
||||
if (!value) return;
|
||||
|
||||
const parsed = JSON.parse(value);
|
||||
|
||||
// Extract event_type from Debezium flattened outbox format
|
||||
// ExtractNewRecordState flattens the message to just the row data
|
||||
const eventType = parsed.event_type;
|
||||
|
||||
// Only process AdoptionFusdtInjectionRequested events
|
||||
if (eventType !== 'AdoptionFusdtInjectionRequested') {
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip non-create operations (DELETE, UPDATE)
|
||||
const op = parsed.__op || parsed.op;
|
||||
if (op && op !== 'c' && op !== 'r') {
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.log(`[RECEIVE] AdoptionFusdtInjectionRequested event received`);
|
||||
this.logger.log(`[RECEIVE] offset=${message.offset}, partition=${partition}`);
|
||||
|
||||
// Parse the payload (JSON string from outbox_events table)
|
||||
let payload: AdoptionInjectionPayload;
|
||||
try {
|
||||
const rawPayload = typeof parsed.payload === 'string'
|
||||
? JSON.parse(parsed.payload)
|
||||
: parsed.payload;
|
||||
payload = {
|
||||
originalAdoptionId: rawPayload.originalAdoptionId,
|
||||
accountSequence: rawPayload.accountSequence,
|
||||
treeCount: Number(rawPayload.treeCount),
|
||||
adoptionDate: rawPayload.adoptionDate,
|
||||
amount: rawPayload.amount,
|
||||
fusdtPerTree: Number(rawPayload.fusdtPerTree),
|
||||
};
|
||||
} catch (parseErr) {
|
||||
this.logger.error(`[ERROR] Failed to parse injection event payload`, parseErr);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.log(`[RECEIVE] adoptionId=${payload.originalAdoptionId}`);
|
||||
this.logger.log(`[RECEIVE] accountSequence=${payload.accountSequence}`);
|
||||
this.logger.log(`[RECEIVE] treeCount=${payload.treeCount}`);
|
||||
this.logger.log(`[RECEIVE] amount=${payload.amount} fUSDT`);
|
||||
|
||||
if (this.injectionHandler) {
|
||||
await this.injectionHandler(payload);
|
||||
this.logger.log(`[HANDLE] AdoptionInjectionRequested handler completed`);
|
||||
} else {
|
||||
this.logger.warn(`[HANDLE] No handler registered for AdoptionInjectionRequested`);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`[ERROR] Error processing adoption injection event`, error);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log(`[START] Started consuming adoption injection events from ${this.topic}`);
|
||||
}
|
||||
}
|
||||
|
|
@ -2,3 +2,4 @@ export * from './event-publisher.service';
|
|||
export * from './event-consumer.controller';
|
||||
export * from './withdrawal-event-consumer.service';
|
||||
export * from './deposit-ack-consumer.service';
|
||||
export * from './adoption-injection-consumer.service';
|
||||
|
|
|
|||
|
|
@ -48,6 +48,9 @@ export class MpcSigningClient {
|
|||
// 200万挖矿池钱包
|
||||
private readonly miningPoolUsername: string;
|
||||
private readonly miningPoolAddress: string;
|
||||
// fUSDT 注入钱包(认种自动转积分值到做市商)
|
||||
private readonly fusdtInjectionUsername: string;
|
||||
private readonly fusdtInjectionAddress: string;
|
||||
// MPC system 配置
|
||||
private readonly mpcAccountServiceUrl: string;
|
||||
private readonly mpcJwtSecret: string;
|
||||
|
|
@ -74,6 +77,9 @@ export class MpcSigningClient {
|
|||
// 200万挖矿池钱包配置
|
||||
this.miningPoolUsername = this.configService.get<string>('MINING_POOL_WALLET_USERNAME', '');
|
||||
this.miningPoolAddress = this.configService.get<string>('MINING_POOL_WALLET_ADDRESS', '');
|
||||
// fUSDT 注入钱包配置(认种自动转积分值到做市商)
|
||||
this.fusdtInjectionUsername = this.configService.get<string>('FUSDT_INJECTION_WALLET_USERNAME', '');
|
||||
this.fusdtInjectionAddress = this.configService.get<string>('FUSDT_INJECTION_WALLET_ADDRESS', '');
|
||||
// MPC system 配置
|
||||
this.mpcAccountServiceUrl = this.configService.get<string>('MPC_ACCOUNT_SERVICE_URL', 'http://localhost:4000');
|
||||
this.mpcJwtSecret = this.configService.get<string>('MPC_JWT_SECRET', '');
|
||||
|
|
@ -96,6 +102,9 @@ export class MpcSigningClient {
|
|||
if (!this.miningPoolUsername || !this.miningPoolAddress) {
|
||||
this.logger.warn('[INIT] Mining Pool wallet not configured');
|
||||
}
|
||||
if (!this.fusdtInjectionUsername || !this.fusdtInjectionAddress) {
|
||||
this.logger.warn('[INIT] fUSDT Injection wallet not configured (adoption auto-injection disabled)');
|
||||
}
|
||||
if (!this.mpcJwtSecret) {
|
||||
this.logger.warn('[INIT] MPC_JWT_SECRET not configured - signing will fail');
|
||||
}
|
||||
|
|
@ -105,6 +114,7 @@ export class MpcSigningClient {
|
|||
this.logger.log(`[INIT] fUSDT Market Maker: ${this.fusdtMarketMakerAddress || '(not configured)'}`);
|
||||
this.logger.log(`[INIT] Burn Pool: ${this.burnPoolAddress || '(not configured)'}`);
|
||||
this.logger.log(`[INIT] Mining Pool: ${this.miningPoolAddress || '(not configured)'}`);
|
||||
this.logger.log(`[INIT] fUSDT Injection: ${this.fusdtInjectionAddress || '(not configured)'}`);
|
||||
this.logger.log(`[INIT] MPC Account Service: ${this.mpcAccountServiceUrl}`);
|
||||
this.logger.log(`[INIT] Using HTTP direct call to mpc-system`);
|
||||
}
|
||||
|
|
@ -214,6 +224,27 @@ export class MpcSigningClient {
|
|||
return this.signMessageWithUsername(this.miningPoolUsername, messageHash);
|
||||
}
|
||||
|
||||
// ============ fUSDT 注入钱包(认种自动转积分值到做市商) ============
|
||||
|
||||
isFusdtInjectionConfigured(): boolean {
|
||||
return !!this.fusdtInjectionUsername && !!this.fusdtInjectionAddress;
|
||||
}
|
||||
|
||||
getFusdtInjectionAddress(): string {
|
||||
return this.fusdtInjectionAddress;
|
||||
}
|
||||
|
||||
getFusdtInjectionUsername(): string {
|
||||
return this.fusdtInjectionUsername;
|
||||
}
|
||||
|
||||
async signMessageAsFusdtInjection(messageHash: string): Promise<string> {
|
||||
if (!this.fusdtInjectionUsername) {
|
||||
throw new Error('fUSDT Injection MPC username not configured');
|
||||
}
|
||||
return this.signMessageWithUsername(this.fusdtInjectionUsername, messageHash);
|
||||
}
|
||||
|
||||
/**
|
||||
* 签名消息(使用 C2C Bot 热钱包)
|
||||
*
|
||||
|
|
|
|||
|
|
@ -0,0 +1,104 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
import { Prisma } from '@prisma/client';
|
||||
import { PrismaService } from '../prisma/prisma.service';
|
||||
import {
|
||||
IAdoptionInjectionRecordRepository,
|
||||
AdoptionInjectionRecordDto,
|
||||
} from '@/domain/repositories/adoption-injection-record.repository.interface';
|
||||
|
||||
@Injectable()
|
||||
export class AdoptionInjectionRecordRepositoryImpl implements IAdoptionInjectionRecordRepository {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async save(record: AdoptionInjectionRecordDto): Promise<AdoptionInjectionRecordDto> {
|
||||
const data = {
|
||||
adoptionId: record.adoptionId,
|
||||
accountSequence: record.accountSequence,
|
||||
treeCount: record.treeCount,
|
||||
adoptionDate: record.adoptionDate,
|
||||
fromAddress: record.fromAddress,
|
||||
toAddress: record.toAddress,
|
||||
amount: new Prisma.Decimal(record.amount),
|
||||
chainType: record.chainType,
|
||||
txHash: record.txHash,
|
||||
blockNumber: record.blockNumber,
|
||||
gasUsed: record.gasUsed,
|
||||
memo: record.memo,
|
||||
status: record.status,
|
||||
errorMessage: record.errorMessage,
|
||||
};
|
||||
|
||||
if (record.id) {
|
||||
const updated = await this.prisma.adoptionInjectionRecord.update({
|
||||
where: { id: record.id },
|
||||
data,
|
||||
});
|
||||
return this.mapToDto(updated);
|
||||
} else {
|
||||
const created = await this.prisma.adoptionInjectionRecord.create({
|
||||
data,
|
||||
});
|
||||
return this.mapToDto(created);
|
||||
}
|
||||
}
|
||||
|
||||
async findByAdoptionId(adoptionId: string): Promise<AdoptionInjectionRecordDto | null> {
|
||||
const record = await this.prisma.adoptionInjectionRecord.findUnique({
|
||||
where: { adoptionId },
|
||||
});
|
||||
return record ? this.mapToDto(record) : null;
|
||||
}
|
||||
|
||||
async findByAccountSequence(accountSequence: string): Promise<AdoptionInjectionRecordDto[]> {
|
||||
const records = await this.prisma.adoptionInjectionRecord.findMany({
|
||||
where: { accountSequence },
|
||||
orderBy: { createdAt: 'desc' },
|
||||
});
|
||||
return records.map(this.mapToDto);
|
||||
}
|
||||
|
||||
async markConfirmed(id: bigint, txHash: string, blockNumber: bigint, gasUsed: string): Promise<void> {
|
||||
await this.prisma.adoptionInjectionRecord.update({
|
||||
where: { id },
|
||||
data: {
|
||||
txHash,
|
||||
blockNumber,
|
||||
gasUsed,
|
||||
status: 'CONFIRMED',
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async markFailed(id: bigint, errorMessage: string): Promise<void> {
|
||||
await this.prisma.adoptionInjectionRecord.update({
|
||||
where: { id },
|
||||
data: {
|
||||
errorMessage,
|
||||
status: 'FAILED',
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
private mapToDto(record: any): AdoptionInjectionRecordDto {
|
||||
return {
|
||||
id: record.id,
|
||||
adoptionId: record.adoptionId,
|
||||
accountSequence: record.accountSequence,
|
||||
treeCount: record.treeCount,
|
||||
adoptionDate: record.adoptionDate,
|
||||
fromAddress: record.fromAddress,
|
||||
toAddress: record.toAddress,
|
||||
amount: record.amount.toString(),
|
||||
chainType: record.chainType,
|
||||
txHash: record.txHash,
|
||||
blockNumber: record.blockNumber,
|
||||
gasUsed: record.gasUsed,
|
||||
memo: record.memo,
|
||||
status: record.status,
|
||||
errorMessage: record.errorMessage,
|
||||
createdAt: record.createdAt,
|
||||
updatedAt: record.updatedAt,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -7,3 +7,4 @@ export * from './market-maker-deposit.repository.impl';
|
|||
export * from './market-maker-checkpoint.repository.impl';
|
||||
export * from './pool-account-deposit.repository.impl';
|
||||
export * from './pool-account-checkpoint.repository.impl';
|
||||
export * from './adoption-injection-record.repository.impl';
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@ import { ConfigService } from '@nestjs/config';
|
|||
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';
|
||||
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
|
||||
import { AppModule } from './app.module';
|
||||
import { Erc20TransferService } from './domain/services/erc20-transfer.service';
|
||||
import { ChainTypeEnum } from './domain/enums';
|
||||
|
||||
async function bootstrap() {
|
||||
const logger = new Logger('Bootstrap');
|
||||
|
|
@ -70,6 +72,11 @@ async function bootstrap() {
|
|||
|
||||
logger.log(`Mining Blockchain service is running on port ${port}`);
|
||||
logger.log(`Swagger docs available at http://localhost:${port}/api`);
|
||||
|
||||
// 异步检查 fUSDT 注入钱包余额(不阻塞启动)
|
||||
checkInjectionWalletBalance(app, logger).catch(err => {
|
||||
logger.error('[STARTUP] Failed to check injection wallet balance', err);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -121,6 +128,20 @@ function validateBlockchainConfig(configService: ConfigService, logger: Logger)
|
|||
warnings.push('[CONFIG WARNING] C2C_BOT_WALLET_USERNAME 未配置!C2C Bot MPC 签名功能将不可用。');
|
||||
}
|
||||
|
||||
// fUSDT 注入钱包配置验证(认种自动转积分值到做市商)
|
||||
const fusdtInjectionAddress = configService.get<string>('FUSDT_INJECTION_WALLET_ADDRESS');
|
||||
const fusdtInjectionUsername = configService.get<string>('FUSDT_INJECTION_WALLET_USERNAME');
|
||||
|
||||
logger.log(`[CONFIG] fUSDT 注入钱包配置(认种自动注入):`);
|
||||
if (fusdtInjectionAddress) {
|
||||
logger.log(` - 地址: ${fusdtInjectionAddress}`);
|
||||
} else {
|
||||
warnings.push('[CONFIG WARNING] FUSDT_INJECTION_WALLET_ADDRESS 未配置!认种自动 fUSDT 注入功能将不可用。');
|
||||
}
|
||||
if (!fusdtInjectionUsername) {
|
||||
warnings.push('[CONFIG WARNING] FUSDT_INJECTION_WALLET_USERNAME 未配置!认种自动 fUSDT 注入签名将不可用。');
|
||||
}
|
||||
|
||||
// KAVA 代币合约配置日志
|
||||
const eUsdtContract = configService.get<string>('blockchain.kava.eUsdtContract');
|
||||
const fUsdtContract = configService.get<string>('blockchain.kava.fUsdtContract');
|
||||
|
|
@ -139,4 +160,33 @@ function validateBlockchainConfig(configService: ConfigService, logger: Logger)
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查 fUSDT 注入钱包余额
|
||||
* 如果余额为 0,输出警告日志
|
||||
*/
|
||||
async function checkInjectionWalletBalance(app: any, logger: Logger) {
|
||||
try {
|
||||
const erc20Service = app.get(Erc20TransferService);
|
||||
const injectionAddress = erc20Service.getFusdtInjectionAddress();
|
||||
|
||||
if (!injectionAddress) {
|
||||
logger.warn('[BALANCE] fUSDT 注入钱包未配置,跳过余额检查');
|
||||
return;
|
||||
}
|
||||
|
||||
logger.log(`[BALANCE] 正在查询 fUSDT 注入钱包余额: ${injectionAddress}`);
|
||||
const balance = await erc20Service.getFusdtInjectionTokenBalance(ChainTypeEnum.KAVA);
|
||||
const balanceNum = parseFloat(balance);
|
||||
|
||||
if (balanceNum === 0) {
|
||||
logger.warn(`[BALANCE] ⚠️ fUSDT 注入钱包余额为 0!地址: ${injectionAddress}`);
|
||||
logger.warn(`[BALANCE] ⚠️ 认种自动注入将因余额不足而失败,请及时充值!`);
|
||||
} else {
|
||||
logger.log(`[BALANCE] fUSDT 注入钱包余额: ${balance} fUSDT`);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(`[BALANCE] 无法查询 fUSDT 注入钱包余额 (链可能未就绪): ${error instanceof Error ? error.message : error}`);
|
||||
}
|
||||
}
|
||||
|
||||
bootstrap();
|
||||
|
|
|
|||
Loading…
Reference in New Issue