diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 6af8821a..628a3984 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -41,6 +41,7 @@ # admin -> mining-admin-service # auth -> auth-service # wallet -> mining-wallet-service +# blockchain -> mining-blockchain-service # set -e @@ -60,6 +61,7 @@ MINING_SERVICES=( "mining-admin-service" "auth-service" "mining-wallet-service" + "mining-blockchain-service" ) # Service Aliases @@ -71,6 +73,7 @@ declare -A SERVICE_ALIASES=( ["admin"]="mining-admin-service" ["auth"]="auth-service" ["wallet"]="mining-wallet-service" + ["blockchain"]="mining-blockchain-service" ) # 2.0 Databases @@ -81,6 +84,7 @@ MINING_DATABASES=( "rwa_mining_admin" "rwa_auth" "rwa_mining_wallet" + "rwa_blockchain" ) # Service to Database mapping @@ -91,6 +95,7 @@ declare -A SERVICE_DB=( ["mining-admin-service"]="rwa_mining_admin" ["auth-service"]="rwa_auth" ["mining-wallet-service"]="rwa_mining_wallet" + ["mining-blockchain-service"]="rwa_blockchain" ) # 2.0 Ports @@ -101,6 +106,7 @@ declare -A SERVICE_PORTS=( ["mining-admin-service"]="3023" ["auth-service"]="3024" ["mining-wallet-service"]="3025" + ["mining-blockchain-service"]="3026" ) # CDC Consumer Groups (all groups that need to be reset during full-reset) @@ -1546,6 +1552,7 @@ show_help() { echo " admin -> mining-admin-service" echo " auth -> auth-service" echo " wallet -> mining-wallet-service" + echo " blockchain -> mining-blockchain-service" echo "" echo -e "${BOLD}Examples:${NC}" echo " $0 up # Start all services" diff --git a/backend/services/docker-compose.2.0.yml b/backend/services/docker-compose.2.0.yml index da71f2bf..ba5ed04d 100644 --- a/backend/services/docker-compose.2.0.yml +++ b/backend/services/docker-compose.2.0.yml @@ -245,6 +245,61 @@ services: networks: - rwa-network + mining-blockchain-service: + build: + context: ./mining-blockchain-service + dockerfile: Dockerfile + container_name: rwa-mining-blockchain-service + environment: + NODE_ENV: production + TZ: Asia/Shanghai + PORT: 3026 + # PostgreSQL - 使用独立的数据库 + DATABASE_URL: postgresql://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@postgres:5432/rwa_blockchain?schema=public + # Redis - 使用 DB 16 隔离 + REDIS_HOST: redis + REDIS_PORT: 6379 + REDIS_PASSWORD: ${REDIS_PASSWORD:-} + REDIS_DB: 16 + # Kafka - 用于 MPC 签名通信和事件发布 + KAFKA_BROKERS: kafka:29092 + # JWT 配置 + JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production} + # 区块链配置 + NETWORK_MODE: ${NETWORK_MODE:-mainnet} + # KAVA 配置 + KAVA_RPC_URL: ${KAVA_RPC_URL:-https://evm.kava.io} + KAVA_CHAIN_ID: ${KAVA_CHAIN_ID:-2222} + KAVA_USDT_CONTRACT: ${KAVA_USDT_CONTRACT:-0xA9F3A35dBa8699c8C681D8db03F0c1A8CEB9D7c3} + # 积分股合约 (eUSDT - Energy USDT) + KAVA_EUSDT_CONTRACT: ${KAVA_EUSDT_CONTRACT:-0x7C3275D808eFbAE90C06C7E3A9AfDdcAa8563931} + # 积分值合约 (fUSDT - Future USDT) + KAVA_FUSDT_CONTRACT: ${KAVA_FUSDT_CONTRACT:-0x14dc4f7d3E4197438d058C3D156dd9826A161134} + # BSC 配置 + BSC_RPC_URL: ${BSC_RPC_URL:-https://bsc-dataseed.binance.org} + BSC_CHAIN_ID: ${BSC_CHAIN_ID:-56} + BSC_USDT_CONTRACT: ${BSC_USDT_CONTRACT:-0x55d398326f99059fF775485246999027B3197955} + # 热钱包配置 (MPC) + HOT_WALLET_USERNAME: ${HOT_WALLET_USERNAME:-} + HOT_WALLET_ADDRESS: ${HOT_WALLET_ADDRESS:-} + # 做市商 MPC 钱包配置(必须配置,否则充值监控不生效) + MARKET_MAKER_WALLET_ADDRESS: ${MARKET_MAKER_WALLET_ADDRESS:-} + MARKET_MAKER_MPC_USERNAME: ${MARKET_MAKER_MPC_USERNAME:-} + # 区块扫描配置 + BLOCK_SCAN_INTERVAL_MS: ${BLOCK_SCAN_INTERVAL_MS:-5000} + BLOCK_CONFIRMATIONS_REQUIRED: ${BLOCK_CONFIRMATIONS_REQUIRED:-12} + ports: + - "3026:3026" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3026/api/v2/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + restart: unless-stopped + networks: + - rwa-network + # =========================================================================== # Frontend Services (2.0) # =========================================================================== diff --git a/backend/services/mining-blockchain-service/prisma/schema.prisma b/backend/services/mining-blockchain-service/prisma/schema.prisma index 78230ecd..9b01d520 100644 --- a/backend/services/mining-blockchain-service/prisma/schema.prisma +++ b/backend/services/mining-blockchain-service/prisma/schema.prisma @@ -258,3 +258,72 @@ model BlockchainEvent { @@index([occurredAt], name: "idx_event_occurred") @@map("blockchain_events") } + +// ============================================ +// 做市商充值交易表 +// 记录检测到的做市商钱包 eUSDT/fUSDT 充值 +// ============================================ +model MarketMakerDeposit { + id BigInt @id @default(autoincrement()) @map("deposit_id") + + chainType String @map("chain_type") @db.VarChar(20) // KAVA + txHash String @unique @map("tx_hash") @db.VarChar(66) + + fromAddress String @map("from_address") @db.VarChar(42) + toAddress String @map("to_address") @db.VarChar(42) // 做市商钱包地址 + + // 代币类型: EUSDT (积分股) 或 FUSDT (积分值) + assetType String @map("asset_type") @db.VarChar(10) + tokenContract String @map("token_contract") @db.VarChar(42) + amount Decimal @db.Decimal(78, 0) // 原始金额 (wei单位) + amountFormatted Decimal @map("amount_formatted") @db.Decimal(36, 8) // 格式化金额 + + blockNumber BigInt @map("block_number") + blockTimestamp DateTime @map("block_timestamp") + logIndex Int @map("log_index") + + // 确认状态 + confirmations Int @default(0) + status String @default("DETECTED") @db.VarChar(20) // DETECTED, CONFIRMING, CONFIRMED, CREDITED, FAILED + + // 通知状态 (通知 trading-service 记账) + notifiedAt DateTime? @map("notified_at") + notifyAttempts Int @default(0) @map("notify_attempts") + lastNotifyError String? @map("last_notify_error") @db.Text + + // 记账状态 (trading-service 确认) + creditedAt DateTime? @map("credited_at") + creditReference String? @map("credit_reference") @db.VarChar(100) // 记账流水号 + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + @@index([chainType, status], name: "idx_mm_chain_status") + @@index([assetType], name: "idx_mm_asset_type") + @@index([blockNumber], name: "idx_mm_block") + @@index([status, notifiedAt], name: "idx_mm_pending_notify") + @@map("market_maker_deposits") +} + +// ============================================ +// 做市商区块扫描检查点 +// 独立于普通用户的扫描进度 +// ============================================ +model MarketMakerBlockCheckpoint { + id BigInt @id @default(autoincrement()) @map("checkpoint_id") + + chainType String @map("chain_type") @db.VarChar(20) + assetType String @map("asset_type") @db.VarChar(10) // EUSDT 或 FUSDT + + lastScannedBlock BigInt @map("last_scanned_block") + lastScannedAt DateTime @map("last_scanned_at") + + isHealthy Boolean @default(true) @map("is_healthy") + lastError String? @map("last_error") @db.Text + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + @@unique([chainType, assetType], name: "uk_mm_chain_asset") + @@map("market_maker_block_checkpoints") +} diff --git a/backend/services/mining-blockchain-service/src/application/application.module.ts b/backend/services/mining-blockchain-service/src/application/application.module.ts index 50a3c8c4..b0677eab 100644 --- a/backend/services/mining-blockchain-service/src/application/application.module.ts +++ b/backend/services/mining-blockchain-service/src/application/application.module.ts @@ -2,12 +2,15 @@ import { Module } from '@nestjs/common'; import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; import { DomainModule } from '@/domain/domain.module'; import { MpcTransferInitializerService } from './services/mpc-transfer-initializer.service'; +import { MarketMakerDepositDetectionService } from './services/market-maker-deposit-detection.service'; @Module({ imports: [InfrastructureModule, DomainModule], providers: [ // MPC 签名客户端注入 MpcTransferInitializerService, + // 做市商充值检测服务 + MarketMakerDepositDetectionService, ], exports: [], }) diff --git a/backend/services/mining-blockchain-service/src/application/services/market-maker-deposit-detection.service.ts b/backend/services/mining-blockchain-service/src/application/services/market-maker-deposit-detection.service.ts new file mode 100644 index 00000000..751bcd8e --- /dev/null +++ b/backend/services/mining-blockchain-service/src/application/services/market-maker-deposit-detection.service.ts @@ -0,0 +1,308 @@ +import { Injectable, Logger, Inject, OnModuleInit } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { EvmProviderAdapter, TransferEvent } from '@/infrastructure/blockchain/evm-provider.adapter'; +import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service'; +import { ConfirmationPolicyService } from '@/domain/services/confirmation-policy.service'; +import { ChainConfigService } from '@/domain/services/chain-config.service'; +import { + MARKET_MAKER_DEPOSIT_REPOSITORY, + IMarketMakerDepositRepository, + MarketMakerDepositDto, + MARKET_MAKER_CHECKPOINT_REPOSITORY, + IMarketMakerCheckpointRepository, +} from '@/domain/repositories/market-maker-deposit.repository.interface'; +import { + OUTBOX_EVENT_REPOSITORY, + IOutboxEventRepository, +} from '@/domain/repositories/outbox-event.repository.interface'; +import { ChainType, BlockNumber } from '@/domain/value-objects'; +import { ChainTypeEnum, MarketMakerAssetType, MarketMakerDepositStatus } from '@/domain/enums'; +import { MarketMakerDepositConfirmedEvent } from '@/domain/events'; + +interface TokenConfig { + assetType: MarketMakerAssetType; + contract: string; + name: string; +} + +/** + * 做市商充值检测服务 + * + * 负责扫描区块链、检测做市商钱包的 eUSDT/fUSDT 充值、更新确认状态 + * 当充值达到确认数后,通过 Kafka 通知 trading-service 进行入账 + */ +@Injectable() +export class MarketMakerDepositDetectionService implements OnModuleInit { + private readonly logger = new Logger(MarketMakerDepositDetectionService.name); + private readonly scanBatchSize: number; + private readonly marketMakerWallet: string; + private readonly tokenConfigs: TokenConfig[]; + private isEnabled: boolean = false; + + constructor( + private readonly configService: ConfigService, + private readonly evmProvider: EvmProviderAdapter, + private readonly eventPublisher: EventPublisherService, + private readonly confirmationPolicy: ConfirmationPolicyService, + private readonly chainConfig: ChainConfigService, + @Inject(MARKET_MAKER_DEPOSIT_REPOSITORY) + private readonly depositRepo: IMarketMakerDepositRepository, + @Inject(MARKET_MAKER_CHECKPOINT_REPOSITORY) + private readonly checkpointRepo: IMarketMakerCheckpointRepository, + @Inject(OUTBOX_EVENT_REPOSITORY) + private readonly outboxRepo: IOutboxEventRepository, + ) { + this.scanBatchSize = this.configService.get('blockchain.scanBatchSize', 100); + this.marketMakerWallet = this.configService.get('blockchain.marketMaker.walletAddress', ''); + + // 配置代币信息 + const eUsdtContract = this.configService.get('blockchain.kava.eUsdtContract', ''); + const fUsdtContract = this.configService.get('blockchain.kava.fUsdtContract', ''); + + this.tokenConfigs = [ + { + assetType: MarketMakerAssetType.EUSDT, + contract: eUsdtContract, + name: '积分股 (eUSDT)', + }, + { + assetType: MarketMakerAssetType.FUSDT, + contract: fUsdtContract, + name: '积分值 (fUSDT)', + }, + ]; + } + + async onModuleInit() { + // 验证配置 + if (!this.marketMakerWallet) { + this.logger.warn( + '[INIT] 做市商钱包地址未配置 (MARKET_MAKER_WALLET_ADDRESS),做市商充值监控功能已禁用', + ); + this.isEnabled = false; + return; + } + + // 验证代币合约配置 + const missingContracts = this.tokenConfigs.filter((t) => !t.contract); + if (missingContracts.length > 0) { + this.logger.warn( + `[INIT] 以下代币合约未配置: ${missingContracts.map((t) => t.name).join(', ')}`, + ); + } + + this.isEnabled = true; + this.logger.log(`[INIT] MarketMakerDepositDetectionService initialized`); + this.logger.log(`[INIT] 做市商钱包地址: ${this.marketMakerWallet}`); + this.tokenConfigs.forEach((t) => { + this.logger.log(`[INIT] ${t.name}: ${t.contract || '未配置'}`); + }); + } + + /** + * 定时扫描区块(每5秒) + */ + @Cron(CronExpression.EVERY_5_SECONDS) + async scanBlocks(): Promise { + if (!this.isEnabled) return; + + const chainType = ChainType.fromEnum(ChainTypeEnum.KAVA); + + // 扫描每种代币 + for (const tokenConfig of this.tokenConfigs) { + if (!tokenConfig.contract) continue; + + try { + await this.scanTokenDeposits(chainType, tokenConfig); + } catch (error) { + this.logger.error(`Error scanning ${tokenConfig.name} deposits:`, error); + await this.checkpointRepo.recordError( + chainType, + tokenConfig.assetType, + error instanceof Error ? error.message : 'Unknown error', + ); + } + } + } + + /** + * 扫描单种代币的充值 + */ + private async scanTokenDeposits(chainType: ChainType, tokenConfig: TokenConfig): Promise { + // 获取上次扫描位置 + let lastBlock = await this.checkpointRepo.getLastScannedBlock(chainType, tokenConfig.assetType); + + if (!lastBlock) { + // 首次扫描,从当前区块开始 + const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType); + lastBlock = currentBlock.subtract(10); // 从10个块前开始 + await this.checkpointRepo.initializeIfNotExists(chainType, tokenConfig.assetType, lastBlock); + } + + // 获取当前区块 + const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType); + + // 计算扫描范围 + const fromBlock = lastBlock.add(1); + let toBlock = fromBlock.add(this.scanBatchSize - 1); + + // 不超过当前区块 + if (toBlock.isGreaterThan(currentBlock)) { + toBlock = currentBlock; + } + + // 如果没有新区块,跳过 + if (fromBlock.isGreaterThan(currentBlock)) { + return; + } + + this.logger.debug( + `Scanning ${tokenConfig.name}: blocks ${fromBlock} to ${toBlock}`, + ); + + // 扫描 Transfer 事件 + const events = await this.evmProvider.scanTransferEvents( + chainType, + fromBlock, + toBlock, + tokenConfig.contract, + ); + + // 过滤出充值到做市商钱包的交易 + const deposits = events.filter( + (e) => e.to.toLowerCase() === this.marketMakerWallet.toLowerCase(), + ); + + // 处理充值 + for (const deposit of deposits) { + await this.processDeposit(chainType, tokenConfig, deposit); + } + + // 更新检查点 + if (toBlock.isGreaterThan(lastBlock)) { + await this.checkpointRepo.updateCheckpoint(chainType, tokenConfig.assetType, toBlock); + } + } + + /** + * 处理检测到的充值 + */ + private async processDeposit( + chainType: ChainType, + tokenConfig: TokenConfig, + event: TransferEvent, + ): Promise { + // 检查是否已处理 + if (await this.depositRepo.existsByTxHash(event.txHash)) { + this.logger.debug(`Market maker deposit already processed: ${event.txHash}`); + return; + } + + // 获取代币的 decimals + const tokenDecimals = await this.evmProvider.getTokenDecimals( + chainType, + tokenConfig.contract, + ); + + // 格式化金额 + const divisor = BigInt(10 ** tokenDecimals); + const integerPart = event.value / divisor; + const fractionalPart = event.value % divisor; + const amountFormatted = `${integerPart}.${fractionalPart.toString().padStart(tokenDecimals, '0')}`; + + // 创建充值记录 + const deposit: MarketMakerDepositDto = { + chainType: chainType.toString(), + txHash: event.txHash, + fromAddress: event.from, + toAddress: event.to, + assetType: tokenConfig.assetType, + tokenContract: tokenConfig.contract, + amount: event.value, + amountFormatted, + blockNumber: event.blockNumber, + blockTimestamp: event.blockTimestamp, + logIndex: event.logIndex, + confirmations: 0, + status: MarketMakerDepositStatus.DETECTED, + notifyAttempts: 0, + }; + + // 保存 + await this.depositRepo.save(deposit); + + this.logger.log( + `[DEPOSIT] ${tokenConfig.name} deposit detected: ${event.txHash.slice(0, 10)}... -> ${this.marketMakerWallet.slice(0, 10)}... (${amountFormatted})`, + ); + } + + /** + * 定时更新确认状态(每30秒) + */ + @Cron(CronExpression.EVERY_30_SECONDS) + async updateConfirmations(): Promise { + if (!this.isEnabled) return; + + const chainType = ChainType.fromEnum(ChainTypeEnum.KAVA); + + try { + await this.updateChainConfirmations(chainType); + } catch (error) { + this.logger.error(`Error updating market maker confirmations:`, error); + } + } + + /** + * 更新确认状态 + */ + private async updateChainConfirmations(chainType: ChainType): Promise { + const pendingDeposits = await this.depositRepo.findPendingConfirmation(chainType); + if (pendingDeposits.length === 0) return; + + const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType); + const requiredConfirmations = this.confirmationPolicy.getRequiredConfirmations(chainType); + + for (const deposit of pendingDeposits) { + const confirmations = Number(currentBlock.value - deposit.blockNumber); + const isConfirmed = confirmations >= requiredConfirmations; + + // 更新状态 + const newStatus = isConfirmed + ? MarketMakerDepositStatus.CONFIRMED + : MarketMakerDepositStatus.CONFIRMING; + + await this.depositRepo.updateConfirmations(deposit.id!, confirmations, newStatus); + + // 如果已确认,创建事件 + if (isConfirmed && deposit.status !== MarketMakerDepositStatus.CONFIRMED) { + const event = new MarketMakerDepositConfirmedEvent({ + depositId: deposit.id!.toString(), + chainType: deposit.chainType, + txHash: deposit.txHash, + fromAddress: deposit.fromAddress, + toAddress: deposit.toAddress, + assetType: deposit.assetType, + tokenContract: deposit.tokenContract, + amount: deposit.amount.toString(), + amountFormatted: deposit.amountFormatted, + confirmations, + blockNumber: deposit.blockNumber.toString(), + blockTimestamp: deposit.blockTimestamp.toISOString(), + }); + + // 写入 outbox,保证可靠投递 + await this.outboxRepo.create({ + eventType: event.eventType, + aggregateId: deposit.id!.toString(), + aggregateType: 'MarketMakerDeposit', + payload: event.toPayload(), + }); + + this.logger.log( + `[CONFIRMED] Market maker deposit confirmed: ${deposit.txHash.slice(0, 10)}... (${confirmations} confirmations, ${deposit.assetType})`, + ); + } + } + } +} diff --git a/backend/services/mining-blockchain-service/src/config/blockchain.config.ts b/backend/services/mining-blockchain-service/src/config/blockchain.config.ts index b7c7e012..446b1e71 100644 --- a/backend/services/mining-blockchain-service/src/config/blockchain.config.ts +++ b/backend/services/mining-blockchain-service/src/config/blockchain.config.ts @@ -10,6 +10,11 @@ import { registerAs } from '@nestjs/config'; * 测试网配置: * - BSC Testnet: Chain ID 97, 水龙头: https://testnet.bnbchain.org/faucet-smart * - KAVA Testnet: Chain ID 2221, 水龙头: https://faucet.kava.io + * + * 代币说明: + * - eUSDT (Energy USDT): 积分股代币,总供应量 100.02 亿 + * - fUSDT (Future USDT): 积分值代币,总供应量 1 万亿 + * - dUSDT (Durian USDT): 绿积分代币(旧版本,保留兼容) */ export default registerAs('blockchain', () => { const networkMode = process.env.NETWORK_MODE || 'mainnet'; @@ -33,14 +38,22 @@ export default registerAs('blockchain', () => { chainId: parseInt(process.env.KAVA_CHAIN_ID || '2221', 10), // 测试网 USDT 合约 (自定义部署的 TestUSDT) usdtContract: process.env.KAVA_USDT_CONTRACT || '0xc12f6A4A7Fd0965085B044A67a39CcA2ff7fe0dF', + // 积分股合约 (eUSDT - Energy USDT) + eUsdtContract: process.env.KAVA_EUSDT_CONTRACT || '0x7C3275D808eFbAE90C06C7E3A9AfDdcAa8563931', + // 积分值合约 (fUSDT - Future USDT) + fUsdtContract: process.env.KAVA_FUSDT_CONTRACT || '0x14dc4f7d3E4197438d058C3D156dd9826A161134', confirmations: parseInt(process.env.KAVA_CONFIRMATIONS || '3', 10), } : { // KAVA Mainnet rpcUrl: process.env.KAVA_RPC_URL || 'https://evm.kava.io', chainId: parseInt(process.env.KAVA_CHAIN_ID || '2222', 10), - // dUSDT (绿积分) 合约地址 - Durian USDT, 精度6位 + // dUSDT (绿积分) 合约地址 - Durian USDT, 精度6位(旧版本,保留兼容) usdtContract: process.env.KAVA_USDT_CONTRACT || '0xA9F3A35dBa8699c8C681D8db03F0c1A8CEB9D7c3', + // 积分股合约 (eUSDT - Energy USDT),总供应量 100.02 亿 + eUsdtContract: process.env.KAVA_EUSDT_CONTRACT || '0x7C3275D808eFbAE90C06C7E3A9AfDdcAa8563931', + // 积分值合约 (fUSDT - Future USDT),总供应量 1 万亿 + fUsdtContract: process.env.KAVA_FUSDT_CONTRACT || '0x14dc4f7d3E4197438d058C3D156dd9826A161134', confirmations: parseInt(process.env.KAVA_CONFIRMATIONS || '12', 10), }, @@ -61,5 +74,13 @@ export default registerAs('blockchain', () => { usdtContract: process.env.BSC_USDT_CONTRACT || '0x55d398326f99059fF775485246999027B3197955', confirmations: parseInt(process.env.BSC_CONFIRMATIONS || '15', 10), }, + + // 做市商 MPC 钱包配置 + marketMaker: { + // 做市商 MPC 钱包地址(用于充值监控和提现) + walletAddress: process.env.MARKET_MAKER_WALLET_ADDRESS || '', + // MPC 用户名(用于签名) + mpcUsername: process.env.MARKET_MAKER_MPC_USERNAME || '', + }, }; }); diff --git a/backend/services/mining-blockchain-service/src/domain/enums/index.ts b/backend/services/mining-blockchain-service/src/domain/enums/index.ts index 1248ea7d..c1c9d6e3 100644 --- a/backend/services/mining-blockchain-service/src/domain/enums/index.ts +++ b/backend/services/mining-blockchain-service/src/domain/enums/index.ts @@ -1,3 +1,5 @@ export * from './chain-type.enum'; export * from './deposit-status.enum'; export * from './transaction-status.enum'; +export * from './market-maker-asset-type.enum'; +export * from './market-maker-deposit-status.enum'; diff --git a/backend/services/mining-blockchain-service/src/domain/enums/market-maker-asset-type.enum.ts b/backend/services/mining-blockchain-service/src/domain/enums/market-maker-asset-type.enum.ts new file mode 100644 index 00000000..1bb17aff --- /dev/null +++ b/backend/services/mining-blockchain-service/src/domain/enums/market-maker-asset-type.enum.ts @@ -0,0 +1,9 @@ +/** + * 做市商资产类型 + */ +export enum MarketMakerAssetType { + /** 积分股 (Energy USDT) */ + EUSDT = 'EUSDT', + /** 积分值 (Future USDT) */ + FUSDT = 'FUSDT', +} diff --git a/backend/services/mining-blockchain-service/src/domain/enums/market-maker-deposit-status.enum.ts b/backend/services/mining-blockchain-service/src/domain/enums/market-maker-deposit-status.enum.ts new file mode 100644 index 00000000..e3acdf4b --- /dev/null +++ b/backend/services/mining-blockchain-service/src/domain/enums/market-maker-deposit-status.enum.ts @@ -0,0 +1,15 @@ +/** + * 做市商充值状态 + */ +export enum MarketMakerDepositStatus { + /** 已检测到 */ + DETECTED = 'DETECTED', + /** 确认中 */ + CONFIRMING = 'CONFIRMING', + /** 已确认 (达到确认数) */ + CONFIRMED = 'CONFIRMED', + /** 已入账 (trading-service 确认入账) */ + CREDITED = 'CREDITED', + /** 失败 */ + FAILED = 'FAILED', +} diff --git a/backend/services/mining-blockchain-service/src/domain/events/index.ts b/backend/services/mining-blockchain-service/src/domain/events/index.ts index b69c1096..96dbe253 100644 --- a/backend/services/mining-blockchain-service/src/domain/events/index.ts +++ b/backend/services/mining-blockchain-service/src/domain/events/index.ts @@ -3,3 +3,4 @@ export * from './deposit-detected.event'; export * from './deposit-confirmed.event'; export * from './wallet-address-created.event'; export * from './transaction-broadcasted.event'; +export * from './market-maker-deposit-confirmed.event'; diff --git a/backend/services/mining-blockchain-service/src/domain/events/market-maker-deposit-confirmed.event.ts b/backend/services/mining-blockchain-service/src/domain/events/market-maker-deposit-confirmed.event.ts new file mode 100644 index 00000000..2fc49365 --- /dev/null +++ b/backend/services/mining-blockchain-service/src/domain/events/market-maker-deposit-confirmed.event.ts @@ -0,0 +1,34 @@ +import { DomainEvent } from './domain-event.base'; + +export interface MarketMakerDepositConfirmedPayload { + depositId: string; + chainType: string; + txHash: string; + fromAddress: string; + toAddress: string; + assetType: 'EUSDT' | 'FUSDT'; // 积分股 或 积分值 + tokenContract: string; + amount: string; + amountFormatted: string; + confirmations: number; + blockNumber: string; + blockTimestamp: string; + [key: string]: unknown; +} + +/** + * 做市商充值确认事件 + * 当做市商钱包的 eUSDT/fUSDT 充值交易达到确认数时触发 + * 由 trading-service 消费,用于做市商账户入账 + */ +export class MarketMakerDepositConfirmedEvent extends DomainEvent { + readonly eventType = 'blockchain.market_maker.deposit.confirmed'; + + constructor(private readonly payload: MarketMakerDepositConfirmedPayload) { + super(); + } + + toPayload(): MarketMakerDepositConfirmedPayload { + return this.payload; + } +} diff --git a/backend/services/mining-blockchain-service/src/domain/repositories/index.ts b/backend/services/mining-blockchain-service/src/domain/repositories/index.ts index 17e87bdc..d11aeec9 100644 --- a/backend/services/mining-blockchain-service/src/domain/repositories/index.ts +++ b/backend/services/mining-blockchain-service/src/domain/repositories/index.ts @@ -3,3 +3,4 @@ export * from './monitored-address.repository.interface'; export * from './block-checkpoint.repository.interface'; export * from './transaction-request.repository.interface'; export * from './outbox-event.repository.interface'; +export * from './market-maker-deposit.repository.interface'; diff --git a/backend/services/mining-blockchain-service/src/domain/repositories/market-maker-deposit.repository.interface.ts b/backend/services/mining-blockchain-service/src/domain/repositories/market-maker-deposit.repository.interface.ts new file mode 100644 index 00000000..ab047f92 --- /dev/null +++ b/backend/services/mining-blockchain-service/src/domain/repositories/market-maker-deposit.repository.interface.ts @@ -0,0 +1,139 @@ +import { ChainType, TxHash, BlockNumber } from '@/domain/value-objects'; +import { MarketMakerAssetType, MarketMakerDepositStatus } from '@/domain/enums'; + +export const MARKET_MAKER_DEPOSIT_REPOSITORY = Symbol('MARKET_MAKER_DEPOSIT_REPOSITORY'); + +/** + * 做市商充值记录 DTO + */ +export interface MarketMakerDepositDto { + id?: bigint; + chainType: string; + txHash: string; + fromAddress: string; + toAddress: string; + assetType: MarketMakerAssetType; + tokenContract: string; + amount: bigint; + amountFormatted: string; + blockNumber: bigint; + blockTimestamp: Date; + logIndex: number; + confirmations: number; + status: MarketMakerDepositStatus; + notifiedAt?: Date | null; + notifyAttempts: number; + lastNotifyError?: string | null; + creditedAt?: Date | null; + creditReference?: string | null; + createdAt?: Date; + updatedAt?: Date; +} + +export interface IMarketMakerDepositRepository { + /** + * 保存充值记录 + */ + save(deposit: MarketMakerDepositDto): Promise; + + /** + * 根据ID查找 + */ + findById(id: bigint): Promise; + + /** + * 根据交易哈希查找 + */ + findByTxHash(txHash: string): Promise; + + /** + * 检查交易是否存在 + */ + existsByTxHash(txHash: string): Promise; + + /** + * 查找待确认的充值 + */ + findPendingConfirmation(chainType: ChainType): Promise; + + /** + * 查找待通知的充值 + */ + findPendingNotification(): Promise; + + /** + * 根据资产类型查找 + */ + findByAssetType( + chainType: ChainType, + assetType: MarketMakerAssetType, + limit?: number, + ): Promise; + + /** + * 更新确认状态 + */ + updateConfirmations( + id: bigint, + confirmations: number, + status: MarketMakerDepositStatus, + ): Promise; + + /** + * 更新通知状态 + */ + updateNotifyStatus( + id: bigint, + notifiedAt: Date, + notifyAttempts: number, + error?: string, + ): Promise; + + /** + * 更新入账状态 + */ + updateCreditedStatus( + id: bigint, + creditedAt: Date, + creditReference: string, + ): Promise; +} + +export const MARKET_MAKER_CHECKPOINT_REPOSITORY = Symbol('MARKET_MAKER_CHECKPOINT_REPOSITORY'); + +export interface MarketMakerCheckpointDto { + id?: bigint; + chainType: string; + assetType: MarketMakerAssetType; + lastScannedBlock: bigint; + lastScannedAt: Date; + isHealthy: boolean; + lastError?: string | null; +} + +export interface IMarketMakerCheckpointRepository { + /** + * 获取上次扫描的区块 + */ + getLastScannedBlock(chainType: ChainType, assetType: MarketMakerAssetType): Promise; + + /** + * 更新检查点 + */ + updateCheckpoint(chainType: ChainType, assetType: MarketMakerAssetType, blockNumber: BlockNumber): Promise; + + /** + * 初始化检查点(如果不存在) + */ + initializeIfNotExists(chainType: ChainType, assetType: MarketMakerAssetType, blockNumber: BlockNumber): Promise; + + /** + * 记录错误 + */ + recordError(chainType: ChainType, assetType: MarketMakerAssetType, error: string): Promise; + + /** + * 标记健康 + */ + markHealthy(chainType: ChainType, assetType: MarketMakerAssetType): Promise; +} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts b/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts index 00266b78..f8d4403d 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts @@ -12,6 +12,8 @@ import { BLOCK_CHECKPOINT_REPOSITORY, TRANSACTION_REQUEST_REPOSITORY, OUTBOX_EVENT_REPOSITORY, + MARKET_MAKER_DEPOSIT_REPOSITORY, + MARKET_MAKER_CHECKPOINT_REPOSITORY, } from '@/domain/repositories'; import { DepositTransactionRepositoryImpl, @@ -19,6 +21,8 @@ import { BlockCheckpointRepositoryImpl, TransactionRequestRepositoryImpl, OutboxEventRepositoryImpl, + MarketMakerDepositRepositoryImpl, + MarketMakerCheckpointRepositoryImpl, } from './persistence/repositories'; @Global() @@ -64,6 +68,14 @@ import { provide: OUTBOX_EVENT_REPOSITORY, useClass: OutboxEventRepositoryImpl, }, + { + provide: MARKET_MAKER_DEPOSIT_REPOSITORY, + useClass: MarketMakerDepositRepositoryImpl, + }, + { + provide: MARKET_MAKER_CHECKPOINT_REPOSITORY, + useClass: MarketMakerCheckpointRepositoryImpl, + }, ], exports: [ PrismaService, @@ -83,6 +95,8 @@ import { BLOCK_CHECKPOINT_REPOSITORY, TRANSACTION_REQUEST_REPOSITORY, OUTBOX_EVENT_REPOSITORY, + MARKET_MAKER_DEPOSIT_REPOSITORY, + MARKET_MAKER_CHECKPOINT_REPOSITORY, ], }) export class InfrastructureModule {} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/event-publisher.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/event-publisher.service.ts index dd4b4723..257e88a6 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/event-publisher.service.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/event-publisher.service.ts @@ -105,6 +105,8 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { 'blockchain.transaction.broadcasted': 'blockchain.transactions', // MPC 签名请求 - 发送到 mpc-service 消费的 topic 'blockchain.mpc.signing.requested': 'mpc.SigningRequested', + // 做市商充值事件 - 发送到 trading-service 消费的 topic + 'blockchain.market_maker.deposit.confirmed': 'blockchain.market_maker.deposits', }; return topicMap[eventType] || 'blockchain.events'; } diff --git a/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/index.ts b/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/index.ts index 8fc5c7a3..a0fe72fa 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/index.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/index.ts @@ -3,3 +3,5 @@ export * from './monitored-address.repository.impl'; export * from './block-checkpoint.repository.impl'; export * from './transaction-request.repository.impl'; export * from './outbox-event.repository.impl'; +export * from './market-maker-deposit.repository.impl'; +export * from './market-maker-checkpoint.repository.impl'; diff --git a/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/market-maker-checkpoint.repository.impl.ts b/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/market-maker-checkpoint.repository.impl.ts new file mode 100644 index 00000000..88e7376f --- /dev/null +++ b/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/market-maker-checkpoint.repository.impl.ts @@ -0,0 +1,123 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { IMarketMakerCheckpointRepository } from '@/domain/repositories/market-maker-deposit.repository.interface'; +import { ChainType, BlockNumber } from '@/domain/value-objects'; +import { MarketMakerAssetType } from '@/domain/enums'; + +@Injectable() +export class MarketMakerCheckpointRepositoryImpl implements IMarketMakerCheckpointRepository { + constructor(private readonly prisma: PrismaService) {} + + async getLastScannedBlock( + chainType: ChainType, + assetType: MarketMakerAssetType, + ): Promise { + const record = await this.prisma.marketMakerBlockCheckpoint.findUnique({ + where: { + uk_mm_chain_asset: { + chainType: chainType.toString(), + assetType, + }, + }, + }); + return record ? BlockNumber.create(record.lastScannedBlock) : null; + } + + async updateCheckpoint( + chainType: ChainType, + assetType: MarketMakerAssetType, + blockNumber: BlockNumber, + ): Promise { + await this.prisma.marketMakerBlockCheckpoint.upsert({ + where: { + uk_mm_chain_asset: { + chainType: chainType.toString(), + assetType, + }, + }, + update: { + lastScannedBlock: blockNumber.value, + lastScannedAt: new Date(), + isHealthy: true, + lastError: null, + }, + create: { + chainType: chainType.toString(), + assetType, + lastScannedBlock: blockNumber.value, + lastScannedAt: new Date(), + isHealthy: true, + }, + }); + } + + async initializeIfNotExists( + chainType: ChainType, + assetType: MarketMakerAssetType, + blockNumber: BlockNumber, + ): Promise { + const existing = await this.prisma.marketMakerBlockCheckpoint.findUnique({ + where: { + uk_mm_chain_asset: { + chainType: chainType.toString(), + assetType, + }, + }, + }); + + if (!existing) { + await this.prisma.marketMakerBlockCheckpoint.create({ + data: { + chainType: chainType.toString(), + assetType, + lastScannedBlock: blockNumber.value, + lastScannedAt: new Date(), + isHealthy: true, + }, + }); + } + } + + async recordError( + chainType: ChainType, + assetType: MarketMakerAssetType, + error: string, + ): Promise { + await this.prisma.marketMakerBlockCheckpoint.upsert({ + where: { + uk_mm_chain_asset: { + chainType: chainType.toString(), + assetType, + }, + }, + update: { + isHealthy: false, + lastError: error, + }, + create: { + chainType: chainType.toString(), + assetType, + lastScannedBlock: BigInt(0), + lastScannedAt: new Date(), + isHealthy: false, + lastError: error, + }, + }); + } + + async markHealthy( + chainType: ChainType, + assetType: MarketMakerAssetType, + ): Promise { + await this.prisma.marketMakerBlockCheckpoint.updateMany({ + where: { + chainType: chainType.toString(), + assetType, + }, + data: { + isHealthy: true, + lastError: null, + }, + }); + } +} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/market-maker-deposit.repository.impl.ts b/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/market-maker-deposit.repository.impl.ts new file mode 100644 index 00000000..59cff759 --- /dev/null +++ b/backend/services/mining-blockchain-service/src/infrastructure/persistence/repositories/market-maker-deposit.repository.impl.ts @@ -0,0 +1,181 @@ +import { Injectable } from '@nestjs/common'; +import { Prisma } from '@prisma/client'; +import { PrismaService } from '../prisma/prisma.service'; +import { + IMarketMakerDepositRepository, + MarketMakerDepositDto, +} from '@/domain/repositories/market-maker-deposit.repository.interface'; +import { ChainType, BlockNumber } from '@/domain/value-objects'; +import { MarketMakerAssetType, MarketMakerDepositStatus } from '@/domain/enums'; + +@Injectable() +export class MarketMakerDepositRepositoryImpl implements IMarketMakerDepositRepository { + constructor(private readonly prisma: PrismaService) {} + + async save(deposit: MarketMakerDepositDto): Promise { + const data = { + chainType: deposit.chainType, + txHash: deposit.txHash, + fromAddress: deposit.fromAddress, + toAddress: deposit.toAddress, + assetType: deposit.assetType, + tokenContract: deposit.tokenContract, + amount: new Prisma.Decimal(deposit.amount.toString()), + amountFormatted: new Prisma.Decimal(deposit.amountFormatted), + blockNumber: deposit.blockNumber, + blockTimestamp: deposit.blockTimestamp, + logIndex: deposit.logIndex, + confirmations: deposit.confirmations, + status: deposit.status, + notifiedAt: deposit.notifiedAt, + notifyAttempts: deposit.notifyAttempts, + lastNotifyError: deposit.lastNotifyError, + creditedAt: deposit.creditedAt, + creditReference: deposit.creditReference, + }; + + if (deposit.id) { + const updated = await this.prisma.marketMakerDeposit.update({ + where: { id: deposit.id }, + data, + }); + return this.mapToDto(updated); + } else { + const created = await this.prisma.marketMakerDeposit.create({ + data, + }); + return this.mapToDto(created); + } + } + + async findById(id: bigint): Promise { + const record = await this.prisma.marketMakerDeposit.findUnique({ + where: { id }, + }); + return record ? this.mapToDto(record) : null; + } + + async findByTxHash(txHash: string): Promise { + const record = await this.prisma.marketMakerDeposit.findUnique({ + where: { txHash }, + }); + return record ? this.mapToDto(record) : null; + } + + async existsByTxHash(txHash: string): Promise { + const count = await this.prisma.marketMakerDeposit.count({ + where: { txHash }, + }); + return count > 0; + } + + async findPendingConfirmation(chainType: ChainType): Promise { + const records = await this.prisma.marketMakerDeposit.findMany({ + where: { + chainType: chainType.toString(), + status: { + in: [MarketMakerDepositStatus.DETECTED, MarketMakerDepositStatus.CONFIRMING], + }, + }, + orderBy: { blockNumber: 'asc' }, + }); + return records.map(this.mapToDto); + } + + async findPendingNotification(): Promise { + const records = await this.prisma.marketMakerDeposit.findMany({ + where: { + status: MarketMakerDepositStatus.CONFIRMED, + notifiedAt: null, + }, + orderBy: { createdAt: 'asc' }, + take: 100, + }); + return records.map(this.mapToDto); + } + + async findByAssetType( + chainType: ChainType, + assetType: MarketMakerAssetType, + limit: number = 50, + ): Promise { + const records = await this.prisma.marketMakerDeposit.findMany({ + where: { + chainType: chainType.toString(), + assetType, + }, + orderBy: { createdAt: 'desc' }, + take: limit, + }); + return records.map(this.mapToDto); + } + + async updateConfirmations( + id: bigint, + confirmations: number, + status: MarketMakerDepositStatus, + ): Promise { + await this.prisma.marketMakerDeposit.update({ + where: { id }, + data: { confirmations, status }, + }); + } + + async updateNotifyStatus( + id: bigint, + notifiedAt: Date, + notifyAttempts: number, + error?: string, + ): Promise { + await this.prisma.marketMakerDeposit.update({ + where: { id }, + data: { + notifiedAt, + notifyAttempts, + lastNotifyError: error, + }, + }); + } + + async updateCreditedStatus( + id: bigint, + creditedAt: Date, + creditReference: string, + ): Promise { + await this.prisma.marketMakerDeposit.update({ + where: { id }, + data: { + creditedAt, + creditReference, + status: MarketMakerDepositStatus.CREDITED, + }, + }); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private mapToDto(record: any): MarketMakerDepositDto { + return { + id: record.id, + chainType: record.chainType, + txHash: record.txHash, + fromAddress: record.fromAddress, + toAddress: record.toAddress, + assetType: record.assetType as MarketMakerAssetType, + tokenContract: record.tokenContract, + amount: BigInt(record.amount.toString()), + amountFormatted: record.amountFormatted.toString(), + blockNumber: record.blockNumber, + blockTimestamp: record.blockTimestamp, + logIndex: record.logIndex, + confirmations: record.confirmations, + status: record.status as MarketMakerDepositStatus, + notifiedAt: record.notifiedAt, + notifyAttempts: record.notifyAttempts, + lastNotifyError: record.lastNotifyError, + creditedAt: record.creditedAt, + creditReference: record.creditReference, + createdAt: record.createdAt, + updatedAt: record.updatedAt, + }; + } +} diff --git a/backend/services/mining-blockchain-service/src/main.ts b/backend/services/mining-blockchain-service/src/main.ts index cf903db4..a4b387cf 100644 --- a/backend/services/mining-blockchain-service/src/main.ts +++ b/backend/services/mining-blockchain-service/src/main.ts @@ -61,6 +61,9 @@ async function bootstrap() { await app.startAllMicroservices(); logger.log('Kafka microservice started for MPC signing'); + // 验证关键配置 + validateBlockchainConfig(configService, logger); + // 启动 HTTP 服务 const port = configService.get('app.port', 3020); await app.listen(port); @@ -69,4 +72,69 @@ async function bootstrap() { logger.log(`Swagger docs available at http://localhost:${port}/api`); } +/** + * 验证区块链关键配置 + * 如果缺少必要配置,输出明确的错误日志 + */ +function validateBlockchainConfig(configService: ConfigService, logger: Logger) { + const errors: string[] = []; + const warnings: string[] = []; + + // 做市商钱包配置验证 + const marketMakerWallet = configService.get('blockchain.marketMaker.walletAddress'); + const marketMakerMpcUsername = configService.get('blockchain.marketMaker.mpcUsername'); + + if (!marketMakerWallet) { + errors.push( + '[CONFIG ERROR] MARKET_MAKER_WALLET_ADDRESS 未配置!做市商充值监控功能将不可用。' + + '请在 .env 中配置: MARKET_MAKER_WALLET_ADDRESS=0x...' + ); + } else { + logger.log(`[CONFIG] 做市商钱包地址: ${marketMakerWallet}`); + } + + if (!marketMakerMpcUsername) { + warnings.push( + '[CONFIG WARNING] MARKET_MAKER_MPC_USERNAME 未配置!做市商提现功能将不可用。' + + '请在 .env 中配置: MARKET_MAKER_MPC_USERNAME=your_mpc_username' + ); + } + + // 热钱包配置验证 + const hotWalletAddress = configService.get('HOT_WALLET_ADDRESS'); + const hotWalletUsername = configService.get('HOT_WALLET_USERNAME'); + + if (!hotWalletAddress) { + warnings.push( + '[CONFIG WARNING] HOT_WALLET_ADDRESS 未配置!普通用户的转账功能将不可用。' + ); + } + + if (!hotWalletUsername) { + warnings.push( + '[CONFIG WARNING] HOT_WALLET_USERNAME 未配置!MPC 签名功能将不可用。' + ); + } + + // KAVA 代币合约配置日志 + const eUsdtContract = configService.get('blockchain.kava.eUsdtContract'); + const fUsdtContract = configService.get('blockchain.kava.fUsdtContract'); + const usdtContract = configService.get('blockchain.kava.usdtContract'); + + logger.log(`[CONFIG] KAVA 代币合约配置:`); + logger.log(` - eUSDT (积分股): ${eUsdtContract || '未配置'}`); + logger.log(` - fUSDT (积分值): ${fUsdtContract || '未配置'}`); + logger.log(` - dUSDT (绿积分): ${usdtContract || '未配置'}`); + + // 输出所有警告 + warnings.forEach(warning => logger.warn(warning)); + + // 输出所有错误 + errors.forEach(error => logger.error(error)); + + if (errors.length > 0) { + logger.error(`[CONFIG] 发现 ${errors.length} 个配置错误,部分功能将不可用!`); + } +} + bootstrap(); diff --git a/backend/services/trading-service/prisma/schema.prisma b/backend/services/trading-service/prisma/schema.prisma index ed50994a..90b00db8 100644 --- a/backend/services/trading-service/prisma/schema.prisma +++ b/backend/services/trading-service/prisma/schema.prisma @@ -468,9 +468,16 @@ model MarketMakerConfig { id String @id @default(uuid()) name String @unique // 做市商名称,如 "MAIN_MARKET_MAKER" accountSequence String @unique @map("account_sequence") // 做市商专用交易账户 + + // ============ 区块链钱包配置(MPC)============ + // 做市商 Kava 链钱包地址(MPC 钱包,用于充值监控和提现) + kavaWalletAddress String? @map("kava_wallet_address") @db.VarChar(42) + // MPC 用户名(用于签名提现交易) + mpcUsername String? @map("mpc_username") @db.VarChar(100) + // 资金配置(从TradingAccount同步,此处仅用于快速查询) - cashBalance Decimal @default(0) @map("cash_balance") @db.Decimal(30, 8) // 资金池余额 - shareBalance Decimal @default(0) @map("share_balance") @db.Decimal(30, 8) // 持有积分股余额 + cashBalance Decimal @default(0) @map("cash_balance") @db.Decimal(30, 8) // 资金池余额(积分值/fUSDT) + shareBalance Decimal @default(0) @map("share_balance") @db.Decimal(30, 8) // 持有积分股余额(eUSDT) frozenCash Decimal @default(0) @map("frozen_cash") @db.Decimal(30, 8) // 冻结资金 frozenShares Decimal @default(0) @map("frozen_shares") @db.Decimal(30, 8) // 冻结积分股 @@ -519,6 +526,8 @@ model MarketMakerConfig { ledgers MarketMakerLedger[] makerOrders MarketMakerOrder[] + deposits MarketMakerDeposit[] + withdraws MarketMakerWithdraw[] @@map("market_maker_configs") } @@ -703,3 +712,129 @@ model C2cOrder { @@index([botPurchased]) @@map("c2c_orders") } + +// ==================== 做市商链上充提记录 ==================== + +// 做市商充值类型 +enum MarketMakerAssetType { + EUSDT // 积分股 (Energy USDT) + FUSDT // 积分值 (Future USDT) +} + +// 做市商充值状态 +enum MarketMakerDepositStatus { + DETECTED // 已检测到 + CONFIRMING // 确认中 + CONFIRMED // 已确认 + CREDITED // 已入账 + FAILED // 失败 +} + +// 做市商充值记录(从区块链检测到的充值) +model MarketMakerDeposit { + id String @id @default(uuid()) + marketMakerId String @map("market_maker_id") + + // 区块链信息 + chainType String @map("chain_type") @db.VarChar(20) // KAVA + txHash String @unique @map("tx_hash") @db.VarChar(66) // 交易哈希(唯一) + blockNumber BigInt @map("block_number") + blockTimestamp DateTime @map("block_timestamp") + + // 资产信息 + assetType MarketMakerAssetType @map("asset_type") // EUSDT 或 FUSDT + tokenContract String @map("token_contract") @db.VarChar(42) // 代币合约地址 + fromAddress String @map("from_address") @db.VarChar(42) // 来源地址 + toAddress String @map("to_address") @db.VarChar(42) // 做市商钱包地址 + amount Decimal @db.Decimal(36, 8) // 充值金额 + amountRaw Decimal @map("amount_raw") @db.Decimal(78, 0) // 原始金额(wei) + + // 确认状态 + confirmations Int @default(0) // 当前确认数 + requiredConfirms Int @default(12) @map("required_confirms") // 所需确认数 + status MarketMakerDepositStatus @default(DETECTED) + + // 入账信息 + creditedAt DateTime? @map("credited_at") // 入账时间 + creditedAmount Decimal? @map("credited_amount") @db.Decimal(36, 8) // 入账金额 + ledgerId String? @map("ledger_id") // 关联的 Ledger 记录 ID + + // 错误信息 + errorMessage String? @map("error_message") @db.Text + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + marketMaker MarketMakerConfig @relation(fields: [marketMakerId], references: [id]) + + @@index([marketMakerId]) + @@index([status]) + @@index([chainType, status]) + @@index([assetType]) + @@index([blockNumber]) + @@index([createdAt(sort: Desc)]) + @@map("market_maker_deposits") +} + +// 做市商提现状态 +enum MarketMakerWithdrawStatus { + PENDING // 待处理 + SIGNING // 签名中(MPC) + SIGNED // 已签名 + BROADCAST // 已广播 + CONFIRMING // 确认中 + COMPLETED // 已完成 + FAILED // 失败 + CANCELLED // 已取消 +} + +// 做市商提现记录 +model MarketMakerWithdraw { + id String @id @default(uuid()) + withdrawNo String @unique @map("withdraw_no") // 提现单号 + marketMakerId String @map("market_maker_id") + + // 资产信息 + assetType MarketMakerAssetType @map("asset_type") // EUSDT 或 FUSDT + amount Decimal @db.Decimal(36, 8) // 提现金额 + + // 区块链信息 + chainType String @map("chain_type") @db.VarChar(20) // KAVA + tokenContract String @map("token_contract") @db.VarChar(42) // 代币合约地址 + fromAddress String @map("from_address") @db.VarChar(42) // 做市商钱包地址 + toAddress String @map("to_address") @db.VarChar(42) // 目标地址 + + // 交易信息 + txHash String? @map("tx_hash") @db.VarChar(66) // 交易哈希 + blockNumber BigInt? @map("block_number") + confirmations Int @default(0) + gasUsed Decimal? @map("gas_used") @db.Decimal(36, 0) + gasPrice Decimal? @map("gas_price") @db.Decimal(36, 0) + + // 状态 + status MarketMakerWithdrawStatus @default(PENDING) + + // 关联 Ledger + ledgerId String? @map("ledger_id") // 冻结时的 Ledger 记录 ID + + // 错误信息 + errorMessage String? @map("error_message") @db.Text + retryCount Int @default(0) @map("retry_count") + + // 时间戳 + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + signedAt DateTime? @map("signed_at") + broadcastAt DateTime? @map("broadcast_at") + completedAt DateTime? @map("completed_at") + cancelledAt DateTime? @map("cancelled_at") + + marketMaker MarketMakerConfig @relation(fields: [marketMakerId], references: [id]) + + @@index([marketMakerId]) + @@index([status]) + @@index([assetType]) + @@index([txHash]) + @@index([createdAt(sort: Desc)]) + @@map("market_maker_withdraws") +} diff --git a/backend/services/trading-service/src/infrastructure/infrastructure.module.ts b/backend/services/trading-service/src/infrastructure/infrastructure.module.ts index cfa86128..7cdc2aa2 100644 --- a/backend/services/trading-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/trading-service/src/infrastructure/infrastructure.module.ts @@ -17,6 +17,7 @@ import { RedisService } from './redis/redis.service'; import { KafkaProducerService } from './kafka/kafka-producer.service'; import { UserRegisteredConsumer } from './kafka/consumers/user-registered.consumer'; import { CdcConsumerService } from './kafka/cdc-consumer.service'; +import { MarketMakerDepositConsumerService } from './kafka/market-maker-deposit-consumer.service'; import { BlockchainClient } from './blockchain/blockchain.client'; import { IdentityClient } from './identity/identity.client'; @@ -59,6 +60,7 @@ import { IdentityClient } from './identity/identity.client'; C2cOrderRepository, KafkaProducerService, CdcConsumerService, + MarketMakerDepositConsumerService, BlockchainClient, IdentityClient, { diff --git a/backend/services/trading-service/src/infrastructure/kafka/index.ts b/backend/services/trading-service/src/infrastructure/kafka/index.ts index 13e788e8..4b6eab1f 100644 --- a/backend/services/trading-service/src/infrastructure/kafka/index.ts +++ b/backend/services/trading-service/src/infrastructure/kafka/index.ts @@ -1 +1,3 @@ export * from './kafka-producer.service'; +export * from './cdc-consumer.service'; +export * from './market-maker-deposit-consumer.service'; diff --git a/backend/services/trading-service/src/infrastructure/kafka/market-maker-deposit-consumer.service.ts b/backend/services/trading-service/src/infrastructure/kafka/market-maker-deposit-consumer.service.ts new file mode 100644 index 00000000..12971a31 --- /dev/null +++ b/backend/services/trading-service/src/infrastructure/kafka/market-maker-deposit-consumer.service.ts @@ -0,0 +1,295 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; +import { PrismaService } from '../persistence/prisma/prisma.service'; +import { ProcessedEventRepository } from '../persistence/repositories/processed-event.repository'; +import { RedisService } from '../redis/redis.service'; +import { Prisma, MarketMakerAssetType, MarketMakerDepositStatus } from '@prisma/client'; + +// 4小时 TTL(秒) +const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; + +interface MarketMakerDepositConfirmedPayload { + depositId: string; + chainType: string; + txHash: string; + fromAddress: string; + toAddress: string; + assetType: 'EUSDT' | 'FUSDT'; + tokenContract: string; + amount: string; + amountFormatted: string; + confirmations: number; + blockNumber: string; + blockTimestamp: string; +} + +/** + * Market Maker Deposit Consumer Service + * 监听 mining-blockchain-service 的做市商充值确认事件 + * 当充值确认后,为做市商账户入账 + */ +@Injectable() +export class MarketMakerDepositConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(MarketMakerDepositConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isRunning = false; + + constructor( + private readonly configService: ConfigService, + private readonly prisma: PrismaService, + private readonly processedEventRepository: ProcessedEventRepository, + private readonly redis: RedisService, + ) { + const brokers = this.configService + .get('KAFKA_BROKERS', 'localhost:9092') + .split(','); + + this.kafka = new Kafka({ + clientId: 'trading-service-mm-deposit', + brokers, + }); + + this.consumer = this.kafka.consumer({ + groupId: this.configService.get( + 'MM_DEPOSIT_CONSUMER_GROUP', + 'trading-service-mm-deposit-group', + ), + }); + } + + async onModuleInit() { + await this.start(); + } + + async onModuleDestroy() { + await this.stop(); + } + + async start(): Promise { + if (this.isRunning) { + this.logger.warn('Market maker deposit consumer is already running'); + return; + } + + const topic = 'blockchain.market_maker.deposits'; + + try { + await this.consumer.connect(); + this.logger.log('Market maker deposit consumer connected'); + + await this.consumer.subscribe({ + topics: [topic], + fromBeginning: false, + }); + this.logger.log(`Subscribed to topic: ${topic}`); + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + + this.isRunning = true; + this.logger.log('Market maker deposit consumer started - listening for deposit confirmed events'); + } catch (error) { + this.logger.error('Failed to start market maker deposit consumer', error); + } + } + + async stop(): Promise { + if (!this.isRunning) { + return; + } + + try { + await this.consumer.disconnect(); + this.isRunning = false; + this.logger.log('Market maker deposit consumer stopped'); + } catch (error) { + this.logger.error('Failed to stop market maker deposit consumer', error); + } + } + + private async handleMessage(payload: EachMessagePayload): Promise { + const { topic, message } = payload; + + try { + if (!message.value) { + return; + } + + const eventData = JSON.parse(message.value.toString()); + + // 检查事件类型 + if (eventData.eventType !== 'blockchain.market_maker.deposit.confirmed') { + return; + } + + const depositPayload = eventData.payload as MarketMakerDepositConfirmedPayload; + await this.handleDepositConfirmed(eventData.eventId, depositPayload); + } catch (error) { + this.logger.error(`Error processing message from topic ${topic}`, error); + } + } + + /** + * 处理做市商充值确认事件 + */ + private async handleDepositConfirmed( + eventId: string, + payload: MarketMakerDepositConfirmedPayload, + ): Promise { + this.logger.log( + `Processing MarketMakerDepositConfirmed: ${payload.txHash}, amount: ${payload.amountFormatted} ${payload.assetType}`, + ); + + // 幂等性检查 + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`Event ${eventId} already processed, skipping`); + return; + } + + try { + // 查找做市商配置(通过钱包地址) + const marketMaker = await this.prisma.marketMakerConfig.findFirst({ + where: { + kavaWalletAddress: { + equals: payload.toAddress, + mode: 'insensitive', + }, + }, + }); + + if (!marketMaker) { + this.logger.error( + `Market maker not found for wallet address: ${payload.toAddress}`, + ); + await this.markEventProcessed(eventId); + return; + } + + // 使用事务处理入账 + await this.prisma.$transaction(async (tx) => { + // 1. 创建或更新充值记录 + const depositRecord = await tx.marketMakerDeposit.upsert({ + where: { txHash: payload.txHash }, + create: { + marketMakerId: marketMaker.id, + chainType: payload.chainType, + txHash: payload.txHash, + blockNumber: BigInt(payload.blockNumber), + blockTimestamp: new Date(payload.blockTimestamp), + assetType: payload.assetType as MarketMakerAssetType, + tokenContract: payload.tokenContract, + fromAddress: payload.fromAddress, + toAddress: payload.toAddress, + amount: new Prisma.Decimal(payload.amountFormatted), + amountRaw: new Prisma.Decimal(payload.amount), + confirmations: payload.confirmations, + requiredConfirms: 12, + status: MarketMakerDepositStatus.CONFIRMED, + }, + update: { + confirmations: payload.confirmations, + status: MarketMakerDepositStatus.CONFIRMED, + }, + }); + + // 检查是否已入账 + if (depositRecord.status === MarketMakerDepositStatus.CREDITED) { + this.logger.debug(`Deposit ${payload.txHash} already credited, skipping`); + return; + } + + // 2. 根据资产类型更新余额 + const assetField = payload.assetType === 'EUSDT' ? 'shareBalance' : 'cashBalance'; + const currentBalance = payload.assetType === 'EUSDT' + ? marketMaker.shareBalance + : marketMaker.cashBalance; + const newBalance = currentBalance.add(new Prisma.Decimal(payload.amountFormatted)); + + // 更新做市商余额 + await tx.marketMakerConfig.update({ + where: { id: marketMaker.id }, + data: { + [assetField]: newBalance, + }, + }); + + // 3. 创建流水记录 + const ledger = await tx.marketMakerLedger.create({ + data: { + marketMakerId: marketMaker.id, + type: 'DEPOSIT', + assetType: payload.assetType === 'EUSDT' ? 'SHARE' : 'CASH', + amount: new Prisma.Decimal(payload.amountFormatted), + balanceBefore: currentBalance, + balanceAfter: newBalance, + memo: `区块链充值: ${payload.txHash.slice(0, 10)}... (${payload.fromAddress.slice(0, 10)}...)`, + }, + }); + + // 4. 更新充值记录状态为已入账 + await tx.marketMakerDeposit.update({ + where: { id: depositRecord.id }, + data: { + status: MarketMakerDepositStatus.CREDITED, + creditedAt: new Date(), + creditedAmount: new Prisma.Decimal(payload.amountFormatted), + ledgerId: ledger.id, + }, + }); + + this.logger.log( + `[CREDITED] Market maker deposit: ${payload.txHash.slice(0, 10)}... | ` + + `${payload.assetType}: ${payload.amountFormatted} | ` + + `Balance: ${currentBalance} -> ${newBalance}`, + ); + }); + + // 标记为已处理 + await this.markEventProcessed(eventId); + } catch (error) { + this.logger.error( + `Failed to process deposit for ${payload.txHash}`, + error instanceof Error ? error.stack : error, + ); + throw error; + } + } + + private async isEventProcessed(eventId: string): Promise { + const redisKey = `trading:processed-event:mm-deposit:${eventId}`; + + const cached = await this.redis.get(redisKey); + if (cached) return true; + + const dbRecord = await this.processedEventRepository.findByEventId(eventId); + if (dbRecord) { + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + return true; + } + + return false; + } + + private async markEventProcessed(eventId: string): Promise { + const redisKey = `trading:processed-event:mm-deposit:${eventId}`; + + try { + await this.processedEventRepository.create({ + eventId, + eventType: 'MarketMakerDepositConfirmed', + sourceService: 'mining-blockchain-service', + }); + } catch (error) { + if (!(error instanceof Error && error.message.includes('Unique constraint'))) { + throw error; + } + } + + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + } +}