From 84d619edf9b0856abc8f14631e303b3cc9e5ff1b Mon Sep 17 00:00:00 2001 From: Developer Date: Wed, 3 Dec 2025 20:23:56 -0800 Subject: [PATCH] =?UTF-8?q?docs(blockchain-service):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=8C=BA=E5=9D=97=E9=93=BE=E6=9C=8D=E5=8A=A1=E5=BC=80=E5=8F=91?= =?UTF-8?q?=E6=8C=87=E5=8D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 定义 blockchain-service 领域边界和职责 - 设计 DDD + 六边形架构目录结构 - 设计 Prisma 数据模型 (MonitoredAddress, DepositTransaction, BlockCheckpoint, TransactionRequest) - 设计领域层 (聚合根、值对象、领域事件、仓储接口) - 设计基础设施层 (EVM Provider、事件监听器、区块扫描器、地址缓存) - 设计应用层 (充值检测服务、余额查询服务) - 定义 Kafka 事件和消费者 - 配置 Docker 部署和 Kong 路由 - 制定从 identity-service 迁移计划 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../blockchain-service/DEVELOPMENT_GUIDE.md | 1877 +++++++++++++++++ 1 file changed, 1877 insertions(+) create mode 100644 backend/services/blockchain-service/DEVELOPMENT_GUIDE.md diff --git a/backend/services/blockchain-service/DEVELOPMENT_GUIDE.md b/backend/services/blockchain-service/DEVELOPMENT_GUIDE.md new file mode 100644 index 00000000..b8aa17aa --- /dev/null +++ b/backend/services/blockchain-service/DEVELOPMENT_GUIDE.md @@ -0,0 +1,1877 @@ +# Blockchain Service 开发指南 + +## 1. 服务概述 + +### 1.1 服务定位 + +blockchain-service 是 RWA 榴莲女皇平台的区块链基础设施服务,负责: + +- **链上事件监听**:监听 ERC20 Transfer 事件,检测用户充值 +- **充值入账触发**:检测到充值后通知 wallet-service 入账 +- **余额查询**:查询链上 USDT/原生代币余额 +- **交易广播**:提交签名后的交易到链上 +- **地址管理**:管理平台充值地址池 + +### 1.2 技术栈 + +| 组件 | 技术选型 | +|------|----------| +| 框架 | NestJS 10.x | +| 语言 | TypeScript 5.x | +| 数据库 | PostgreSQL 15 + Prisma | +| 消息队列 | Kafka | +| 缓存 | Redis | +| 区块链 | ethers.js 6.x | +| 容器化 | Docker | + +### 1.3 端口分配 + +- HTTP API: `3012` +- 数据库: `rwa_blockchain` (共享 PostgreSQL) +- Redis DB: `11` + +--- + +## 2. 架构设计 + +### 2.1 六边形架构 (Hexagonal Architecture) + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ blockchain-service │ +├─────────────────────────────────────────────────────────────────────┤ +│ ┌─────────────────────────────────────────────────────────────┐ │ +│ │ API Layer │ │ +│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ +│ │ │ Health Ctrl │ │ Balance Ctrl │ │ Internal Ctrl│ │ │ +│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ +│ └─────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ┌─────────────────────────────────────────────────────────────┐ │ +│ │ Application Layer │ │ +│ │ ┌──────────────────┐ ┌──────────────────┐ │ │ +│ │ │ DepositDetection │ │ BalanceQuery │ │ │ +│ │ │ Service │ │ Service │ │ │ +│ │ └──────────────────┘ └──────────────────┘ │ │ +│ │ ┌──────────────────┐ ┌──────────────────┐ │ │ +│ │ │ TransactionBroad │ │ AddressRegistry │ │ │ +│ │ │ castService │ │ Service │ │ │ +│ │ └──────────────────┘ └──────────────────┘ │ │ +│ └─────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ┌─────────────────────────────────────────────────────────────┐ │ +│ │ Domain Layer │ │ +│ │ ┌────────────────────────────────────────────────────────┐ │ │ +│ │ │ Aggregates │ │ │ +│ │ │ ┌────────────┐ ┌────────────┐ ┌────────────────────┐ │ │ │ +│ │ │ │DepositTx │ │MonitoredAddr│ │TransactionRequest │ │ │ │ +│ │ │ └────────────┘ └────────────┘ └────────────────────┘ │ │ │ +│ │ └────────────────────────────────────────────────────────┘ │ │ +│ │ ┌────────────────────────────────────────────────────────┐ │ │ +│ │ │ Domain Events │ │ │ +│ │ │ DepositDetected, TransactionBroadcasted, BlockScanned │ │ │ +│ │ └────────────────────────────────────────────────────────┘ │ │ +│ │ ┌────────────────────────────────────────────────────────┐ │ │ +│ │ │ Repository Interfaces (Ports) │ │ │ +│ │ │ IDepositTxRepository, IMonitoredAddressRepository │ │ │ +│ │ └────────────────────────────────────────────────────────┘ │ │ +│ └─────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ┌─────────────────────────────────────────────────────────────┐ │ +│ │ Infrastructure Layer │ │ +│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ +│ │ │ Prisma Repos │ │ Kafka │ │ Redis Cache │ │ │ +│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ +│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ +│ │ │ EVM Provider │ │ Event │ │ Block │ │ │ +│ │ │ Adapter │ │ Listener │ │ Scanner │ │ │ +│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ +│ └─────────────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +### 2.2 目录结构 + +``` +blockchain-service/ +├── prisma/ +│ └── schema.prisma +├── src/ +│ ├── api/ # 入站适配器 (Driving Adapters) +│ │ ├── controllers/ +│ │ │ ├── health.controller.ts +│ │ │ ├── balance.controller.ts +│ │ │ └── internal.controller.ts +│ │ └── dto/ +│ │ ├── request/ +│ │ │ └── query-balance.dto.ts +│ │ └── response/ +│ │ └── balance.dto.ts +│ │ +│ ├── application/ # 应用层 (Use Cases) +│ │ ├── services/ +│ │ │ ├── deposit-detection.service.ts +│ │ │ ├── balance-query.service.ts +│ │ │ ├── transaction-broadcast.service.ts +│ │ │ └── address-registry.service.ts +│ │ ├── commands/ +│ │ │ ├── register-address/ +│ │ │ │ ├── register-address.command.ts +│ │ │ │ └── register-address.handler.ts +│ │ │ └── broadcast-transaction/ +│ │ │ ├── broadcast-transaction.command.ts +│ │ │ └── broadcast-transaction.handler.ts +│ │ └── queries/ +│ │ ├── get-balance/ +│ │ │ ├── get-balance.query.ts +│ │ │ └── get-balance.handler.ts +│ │ └── get-deposit-history/ +│ │ ├── get-deposit-history.query.ts +│ │ └── get-deposit-history.handler.ts +│ │ +│ ├── domain/ # 领域层 (核心业务) +│ │ ├── aggregates/ +│ │ │ ├── deposit-transaction/ +│ │ │ │ ├── deposit-transaction.aggregate.ts +│ │ │ │ ├── deposit-transaction.factory.ts +│ │ │ │ └── index.ts +│ │ │ ├── monitored-address/ +│ │ │ │ ├── monitored-address.aggregate.ts +│ │ │ │ └── index.ts +│ │ │ └── transaction-request/ +│ │ │ ├── transaction-request.aggregate.ts +│ │ │ └── index.ts +│ │ ├── entities/ +│ │ │ └── block-checkpoint.entity.ts +│ │ ├── events/ +│ │ │ ├── domain-event.base.ts +│ │ │ ├── deposit-detected.event.ts +│ │ │ ├── deposit-confirmed.event.ts +│ │ │ ├── transaction-broadcasted.event.ts +│ │ │ └── index.ts +│ │ ├── repositories/ +│ │ │ ├── deposit-transaction.repository.interface.ts +│ │ │ ├── monitored-address.repository.interface.ts +│ │ │ ├── block-checkpoint.repository.interface.ts +│ │ │ └── index.ts +│ │ ├── services/ +│ │ │ ├── confirmation-policy.service.ts +│ │ │ └── chain-config.service.ts +│ │ ├── value-objects/ +│ │ │ ├── chain-type.vo.ts +│ │ │ ├── tx-hash.vo.ts +│ │ │ ├── evm-address.vo.ts +│ │ │ ├── token-amount.vo.ts +│ │ │ ├── block-number.vo.ts +│ │ │ └── index.ts +│ │ └── enums/ +│ │ ├── deposit-status.enum.ts +│ │ ├── chain-type.enum.ts +│ │ └── index.ts +│ │ +│ ├── infrastructure/ # 出站适配器 (Driven Adapters) +│ │ ├── blockchain/ +│ │ │ ├── evm-provider.adapter.ts +│ │ │ ├── event-listener.service.ts +│ │ │ ├── block-scanner.service.ts +│ │ │ ├── transaction-sender.service.ts +│ │ │ └── blockchain.module.ts +│ │ ├── persistence/ +│ │ │ ├── prisma/ +│ │ │ │ └── prisma.service.ts +│ │ │ ├── repositories/ +│ │ │ │ ├── deposit-transaction.repository.impl.ts +│ │ │ │ ├── monitored-address.repository.impl.ts +│ │ │ │ └── block-checkpoint.repository.impl.ts +│ │ │ └── mappers/ +│ │ │ └── deposit-transaction.mapper.ts +│ │ ├── kafka/ +│ │ │ ├── event-publisher.service.ts +│ │ │ ├── event-consumer.controller.ts +│ │ │ └── kafka.module.ts +│ │ ├── redis/ +│ │ │ ├── redis.service.ts +│ │ │ ├── address-cache.service.ts +│ │ │ └── redis.module.ts +│ │ ├── external/ +│ │ │ ├── wallet-service/ +│ │ │ │ └── wallet-client.service.ts +│ │ │ └── identity-service/ +│ │ │ └── identity-client.service.ts +│ │ └── infrastructure.module.ts +│ │ +│ ├── config/ +│ │ ├── app.config.ts +│ │ ├── database.config.ts +│ │ ├── kafka.config.ts +│ │ ├── redis.config.ts +│ │ ├── chain.config.ts +│ │ └── index.ts +│ │ +│ ├── shared/ +│ │ ├── decorators/ +│ │ │ ├── public.decorator.ts +│ │ │ └── index.ts +│ │ ├── exceptions/ +│ │ │ ├── domain.exception.ts +│ │ │ ├── blockchain.exception.ts +│ │ │ └── index.ts +│ │ ├── filters/ +│ │ │ ├── global-exception.filter.ts +│ │ │ └── domain-exception.filter.ts +│ │ └── interceptors/ +│ │ └── transform.interceptor.ts +│ │ +│ ├── app.module.ts +│ └── main.ts +│ +├── test/ +│ ├── unit/ +│ ├── integration/ +│ └── e2e/ +│ +├── .env.example +├── Dockerfile +├── docker-compose.yml +├── package.json +├── tsconfig.json +├── nest-cli.json +└── DEVELOPMENT_GUIDE.md +``` + +--- + +## 3. 数据模型设计 + +### 3.1 Prisma Schema + +```prisma +// prisma/schema.prisma + +generator client { + provider = "prisma-client-js" +} + +datasource db { + provider = "postgresql" + url = env("DATABASE_URL") +} + +// ============================================ +// 监控地址表 +// 存储需要监听充值的地址 +// ============================================ +model MonitoredAddress { + id BigInt @id @default(autoincrement()) @map("address_id") + + chainType String @map("chain_type") @db.VarChar(20) // KAVA, BSC + address String @db.VarChar(42) // 0x地址 + + userId BigInt @map("user_id") // 关联用户ID + + isActive Boolean @default(true) @map("is_active") // 是否激活监听 + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + deposits DepositTransaction[] + + @@unique([chainType, address], name: "uk_chain_address") + @@index([userId], name: "idx_user") + @@index([chainType, isActive], name: "idx_chain_active") + @@map("monitored_addresses") +} + +// ============================================ +// 充值交易表 (Append-Only) +// 记录检测到的所有充值交易 +// ============================================ +model DepositTransaction { + id BigInt @id @default(autoincrement()) @map("deposit_id") + + chainType String @map("chain_type") @db.VarChar(20) + txHash String @unique @map("tx_hash") @db.VarChar(66) + + fromAddress String @map("from_address") @db.VarChar(42) + toAddress String @map("to_address") @db.VarChar(42) + + tokenContract String @map("token_contract") @db.VarChar(42) // USDT合约地址 + amount Decimal @db.Decimal(36, 18) // 原始金额 + amountFormatted Decimal @map("amount_formatted") @db.Decimal(20, 8) // 格式化金额 + + blockNumber BigInt @map("block_number") + blockTimestamp DateTime @map("block_timestamp") + logIndex Int @map("log_index") + + // 确认状态 + confirmations Int @default(0) + status String @default("DETECTED") @db.VarChar(20) // DETECTED, CONFIRMING, CONFIRMED, NOTIFIED + + // 关联 + addressId BigInt @map("address_id") + userId BigInt @map("user_id") + + // 通知状态 + notifiedAt DateTime? @map("notified_at") + notifyAttempts Int @default(0) @map("notify_attempts") + lastNotifyError String? @map("last_notify_error") @db.Text + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + monitoredAddress MonitoredAddress @relation(fields: [addressId], references: [id]) + + @@index([chainType, status], name: "idx_chain_status") + @@index([userId], name: "idx_deposit_user") + @@index([blockNumber], name: "idx_block") + @@index([status, notifiedAt], name: "idx_pending_notify") + @@map("deposit_transactions") +} + +// ============================================ +// 区块扫描检查点 (每条链一条记录) +// 记录扫描进度,用于断点续扫 +// ============================================ +model BlockCheckpoint { + id BigInt @id @default(autoincrement()) @map("checkpoint_id") + + chainType String @unique @map("chain_type") @db.VarChar(20) + + lastScannedBlock BigInt @map("last_scanned_block") + lastScannedAt DateTime @map("last_scanned_at") + + // 健康状态 + isHealthy Boolean @default(true) @map("is_healthy") + lastError String? @map("last_error") @db.Text + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + @@map("block_checkpoints") +} + +// ============================================ +// 交易广播请求表 +// 记录待广播和已广播的交易 +// ============================================ +model TransactionRequest { + id BigInt @id @default(autoincrement()) @map("request_id") + + chainType String @map("chain_type") @db.VarChar(20) + + // 请求来源 + sourceService String @map("source_service") @db.VarChar(50) + sourceOrderId String @map("source_order_id") @db.VarChar(100) + + // 交易数据 + fromAddress String @map("from_address") @db.VarChar(42) + toAddress String @map("to_address") @db.VarChar(42) + value Decimal @db.Decimal(36, 18) + data String? @db.Text // 合约调用数据 + + // 签名数据 (由 MPC 服务提供) + signedTx String? @map("signed_tx") @db.Text + + // 广播结果 + txHash String? @map("tx_hash") @db.VarChar(66) + status String @default("PENDING") @db.VarChar(20) // PENDING, SIGNED, BROADCASTED, CONFIRMED, FAILED + + // Gas 信息 + gasLimit BigInt? @map("gas_limit") + gasPrice Decimal? @map("gas_price") @db.Decimal(36, 18) + nonce Int? + + // 错误信息 + errorMessage String? @map("error_message") @db.Text + retryCount Int @default(0) @map("retry_count") + + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + + @@unique([sourceService, sourceOrderId], name: "uk_source_order") + @@index([chainType, status], name: "idx_tx_chain_status") + @@index([txHash], name: "idx_tx_hash") + @@map("transaction_requests") +} + +// ============================================ +// 区块链事件日志 (Append-Only 审计) +// ============================================ +model BlockchainEvent { + id BigInt @id @default(autoincrement()) @map("event_id") + + eventType String @map("event_type") @db.VarChar(50) + + aggregateId String @map("aggregate_id") @db.VarChar(100) + aggregateType String @map("aggregate_type") @db.VarChar(50) + + eventData Json @map("event_data") + + chainType String? @map("chain_type") @db.VarChar(20) + txHash String? @map("tx_hash") @db.VarChar(66) + + occurredAt DateTime @default(now()) @map("occurred_at") @db.Timestamp(6) + + @@index([aggregateType, aggregateId], name: "idx_event_aggregate") + @@index([eventType], name: "idx_event_type") + @@index([chainType], name: "idx_event_chain") + @@index([occurredAt], name: "idx_event_occurred") + @@map("blockchain_events") +} +``` + +--- + +## 4. 领域层设计 + +### 4.1 聚合根:DepositTransaction + +```typescript +// src/domain/aggregates/deposit-transaction/deposit-transaction.aggregate.ts + +import { DomainEvent, DepositDetectedEvent, DepositConfirmedEvent } from '@/domain/events'; +import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber } from '@/domain/value-objects'; +import { DepositStatus } from '@/domain/enums'; + +export interface DepositTransactionProps { + id?: bigint; + chainType: ChainType; + txHash: TxHash; + fromAddress: EvmAddress; + toAddress: EvmAddress; + tokenContract: EvmAddress; + amount: TokenAmount; + blockNumber: BlockNumber; + blockTimestamp: Date; + logIndex: number; + confirmations: number; + status: DepositStatus; + addressId: bigint; + userId: bigint; + notifiedAt?: Date; + notifyAttempts: number; + lastNotifyError?: string; + createdAt?: Date; + updatedAt?: Date; +} + +export class DepositTransaction { + private readonly _domainEvents: DomainEvent[] = []; + + private constructor(private props: DepositTransactionProps) {} + + // Getters + get id(): bigint | undefined { return this.props.id; } + get chainType(): ChainType { return this.props.chainType; } + get txHash(): TxHash { return this.props.txHash; } + get fromAddress(): EvmAddress { return this.props.fromAddress; } + get toAddress(): EvmAddress { return this.props.toAddress; } + get amount(): TokenAmount { return this.props.amount; } + get blockNumber(): BlockNumber { return this.props.blockNumber; } + get confirmations(): number { return this.props.confirmations; } + get status(): DepositStatus { return this.props.status; } + get userId(): bigint { return this.props.userId; } + get isConfirmed(): boolean { return this.props.status === DepositStatus.CONFIRMED; } + get isNotified(): boolean { return this.props.status === DepositStatus.NOTIFIED; } + get domainEvents(): DomainEvent[] { return [...this._domainEvents]; } + + /** + * 创建新的充值交易(检测到时) + */ + static create(params: { + chainType: ChainType; + txHash: TxHash; + fromAddress: EvmAddress; + toAddress: EvmAddress; + tokenContract: EvmAddress; + amount: TokenAmount; + blockNumber: BlockNumber; + blockTimestamp: Date; + logIndex: number; + addressId: bigint; + userId: bigint; + }): DepositTransaction { + const deposit = new DepositTransaction({ + ...params, + confirmations: 1, + status: DepositStatus.DETECTED, + notifyAttempts: 0, + }); + + deposit.addDomainEvent(new DepositDetectedEvent({ + chainType: params.chainType.value, + txHash: params.txHash.value, + fromAddress: params.fromAddress.value, + toAddress: params.toAddress.value, + amount: params.amount.formatted, + userId: params.userId.toString(), + blockNumber: params.blockNumber.value, + })); + + return deposit; + } + + /** + * 从数据库重建 + */ + static reconstruct(props: DepositTransactionProps): DepositTransaction { + return new DepositTransaction(props); + } + + /** + * 更新确认数 + */ + updateConfirmations(newConfirmations: number, requiredConfirmations: number): void { + this.props.confirmations = newConfirmations; + + if (newConfirmations >= requiredConfirmations && this.props.status === DepositStatus.DETECTED) { + this.props.status = DepositStatus.CONFIRMING; + } + + if (newConfirmations >= requiredConfirmations && this.props.status === DepositStatus.CONFIRMING) { + this.confirm(); + } + } + + /** + * 确认充值 + */ + private confirm(): void { + if (this.props.status === DepositStatus.CONFIRMED) { + return; + } + + this.props.status = DepositStatus.CONFIRMED; + + this.addDomainEvent(new DepositConfirmedEvent({ + depositId: this.props.id!.toString(), + chainType: this.props.chainType.value, + txHash: this.props.txHash.value, + amount: this.props.amount.formatted, + userId: this.props.userId.toString(), + confirmations: this.props.confirmations, + })); + } + + /** + * 标记已通知 + */ + markNotified(): void { + this.props.status = DepositStatus.NOTIFIED; + this.props.notifiedAt = new Date(); + } + + /** + * 记录通知失败 + */ + recordNotifyFailure(error: string): void { + this.props.notifyAttempts++; + this.props.lastNotifyError = error; + } + + private addDomainEvent(event: DomainEvent): void { + this._domainEvents.push(event); + } + + clearDomainEvents(): void { + this._domainEvents.length = 0; + } + + toProps(): DepositTransactionProps { + return { ...this.props }; + } +} +``` + +### 4.2 值对象 + +```typescript +// src/domain/value-objects/evm-address.vo.ts + +import { ethers } from 'ethers'; + +export class EvmAddress { + private constructor(private readonly _value: string) {} + + get value(): string { + return this._value; + } + + get checksummed(): string { + return ethers.getAddress(this._value); + } + + static create(address: string): EvmAddress { + if (!ethers.isAddress(address)) { + throw new Error(`Invalid EVM address: ${address}`); + } + return new EvmAddress(ethers.getAddress(address)); // 标准化为 checksum 格式 + } + + equals(other: EvmAddress): boolean { + return this._value.toLowerCase() === other._value.toLowerCase(); + } + + toString(): string { + return this._value; + } +} +``` + +```typescript +// src/domain/value-objects/token-amount.vo.ts + +import Decimal from 'decimal.js'; + +export class TokenAmount { + private constructor( + private readonly _raw: Decimal, // 链上原始值 (wei) + private readonly _decimals: number, // 代币精度 + ) {} + + get raw(): Decimal { + return this._raw; + } + + get decimals(): number { + return this._decimals; + } + + get formatted(): string { + return this._raw.div(new Decimal(10).pow(this._decimals)).toFixed(8); + } + + get value(): number { + return parseFloat(this.formatted); + } + + static create(raw: string | bigint, decimals: number): TokenAmount { + return new TokenAmount(new Decimal(raw.toString()), decimals); + } + + static fromFormatted(amount: string, decimals: number): TokenAmount { + const raw = new Decimal(amount).mul(new Decimal(10).pow(decimals)); + return new TokenAmount(raw, decimals); + } + + isZero(): boolean { + return this._raw.isZero(); + } + + greaterThan(other: TokenAmount): boolean { + return this._raw.greaterThan(other._raw); + } +} +``` + +```typescript +// src/domain/value-objects/chain-type.vo.ts + +export enum ChainTypeEnum { + KAVA = 'KAVA', + BSC = 'BSC', +} + +export class ChainType { + private constructor(private readonly _value: ChainTypeEnum) {} + + get value(): ChainTypeEnum { + return this._value; + } + + get name(): string { + switch (this._value) { + case ChainTypeEnum.KAVA: return 'KAVA EVM'; + case ChainTypeEnum.BSC: return 'BSC'; + } + } + + get requiredConfirmations(): number { + switch (this._value) { + case ChainTypeEnum.KAVA: return 12; + case ChainTypeEnum.BSC: return 15; + } + } + + get blockTime(): number { + switch (this._value) { + case ChainTypeEnum.KAVA: return 6; // 约 6 秒 + case ChainTypeEnum.BSC: return 3; // 约 3 秒 + } + } + + static create(value: string): ChainType { + if (!Object.values(ChainTypeEnum).includes(value as ChainTypeEnum)) { + throw new Error(`Invalid chain type: ${value}`); + } + return new ChainType(value as ChainTypeEnum); + } + + static KAVA(): ChainType { + return new ChainType(ChainTypeEnum.KAVA); + } + + static BSC(): ChainType { + return new ChainType(ChainTypeEnum.BSC); + } + + equals(other: ChainType): boolean { + return this._value === other._value; + } +} +``` + +### 4.3 领域事件 + +```typescript +// src/domain/events/deposit-detected.event.ts + +import { DomainEvent } from './domain-event.base'; + +export interface DepositDetectedPayload { + chainType: string; + txHash: string; + fromAddress: string; + toAddress: string; + amount: string; + userId: string; + blockNumber: number; +} + +export class DepositDetectedEvent extends DomainEvent { + constructor(public readonly payload: DepositDetectedPayload) { + super(); + } + + get eventType(): string { + return 'DepositDetected'; + } + + get aggregateId(): string { + return this.payload.txHash; + } + + get aggregateType(): string { + return 'DepositTransaction'; + } +} +``` + +```typescript +// src/domain/events/deposit-confirmed.event.ts + +import { DomainEvent } from './domain-event.base'; + +export interface DepositConfirmedPayload { + depositId: string; + chainType: string; + txHash: string; + amount: string; + userId: string; + confirmations: number; +} + +export class DepositConfirmedEvent extends DomainEvent { + constructor(public readonly payload: DepositConfirmedPayload) { + super(); + } + + get eventType(): string { + return 'DepositConfirmed'; + } + + get aggregateId(): string { + return this.payload.depositId; + } + + get aggregateType(): string { + return 'DepositTransaction'; + } +} +``` + +### 4.4 仓储接口 (Ports) + +```typescript +// src/domain/repositories/deposit-transaction.repository.interface.ts + +import { DepositTransaction } from '@/domain/aggregates/deposit-transaction'; +import { ChainType, TxHash } from '@/domain/value-objects'; +import { DepositStatus } from '@/domain/enums'; + +export interface IDepositTransactionRepository { + save(deposit: DepositTransaction): Promise; + findById(id: bigint): Promise; + findByTxHash(txHash: TxHash): Promise; + existsByTxHash(txHash: TxHash): Promise; + findByStatus(status: DepositStatus): Promise; + findPendingNotification(limit?: number): Promise; + findByUserIdAndChain(userId: bigint, chainType: ChainType): Promise; + findUnconfirmed(chainType: ChainType, minBlockNumber: bigint): Promise; +} + +export const DEPOSIT_TRANSACTION_REPOSITORY = Symbol('DEPOSIT_TRANSACTION_REPOSITORY'); +``` + +```typescript +// src/domain/repositories/monitored-address.repository.interface.ts + +import { ChainType, EvmAddress } from '@/domain/value-objects'; + +export interface MonitoredAddressData { + id: bigint; + chainType: ChainType; + address: EvmAddress; + userId: bigint; + isActive: boolean; + createdAt: Date; +} + +export interface IMonitoredAddressRepository { + save(data: Omit): Promise; + findByAddress(chainType: ChainType, address: EvmAddress): Promise; + findByUserId(userId: bigint): Promise; + findActiveByChain(chainType: ChainType): Promise; + getAllActiveAddresses(): Promise>; // 返回所有激活地址的 Set + deactivate(id: bigint): Promise; +} + +export const MONITORED_ADDRESS_REPOSITORY = Symbol('MONITORED_ADDRESS_REPOSITORY'); +``` + +--- + +## 5. 基础设施层设计 + +### 5.1 EVM Provider 适配器 + +```typescript +// src/infrastructure/blockchain/evm-provider.adapter.ts + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { ethers, JsonRpcProvider, WebSocketProvider, Contract } from 'ethers'; +import { ChainTypeEnum } from '@/domain/value-objects'; + +interface ChainConfig { + rpcUrl: string; + wsUrl?: string; + usdtContract: string; + decimals: number; + chainId: number; +} + +const ERC20_ABI = [ + 'event Transfer(address indexed from, address indexed to, uint256 value)', + 'function balanceOf(address owner) view returns (uint256)', + 'function decimals() view returns (uint8)', +]; + +@Injectable() +export class EvmProviderAdapter implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(EvmProviderAdapter.name); + + private rpcProviders: Map = new Map(); + private wsProviders: Map = new Map(); + private contracts: Map = new Map(); + + private readonly chainConfigs: Record = { + [ChainTypeEnum.KAVA]: { + rpcUrl: this.configService.get('KAVA_RPC_URL', 'https://evm.kava.io'), + wsUrl: this.configService.get('KAVA_WS_URL'), + usdtContract: '0x919C1c267BC06a7039e03fcc2eF738525769109c', + decimals: 6, + chainId: 2222, + }, + [ChainTypeEnum.BSC]: { + rpcUrl: this.configService.get('BSC_RPC_URL', 'https://bsc-dataseed.binance.org'), + wsUrl: this.configService.get('BSC_WS_URL'), + usdtContract: '0x55d398326f99059fF775485246999027B3197955', + decimals: 18, + chainId: 56, + }, + }; + + constructor(private readonly configService: ConfigService) {} + + async onModuleInit(): Promise { + await this.initializeProviders(); + } + + async onModuleDestroy(): Promise { + await this.closeProviders(); + } + + private async initializeProviders(): Promise { + for (const [chainType, config] of Object.entries(this.chainConfigs)) { + try { + // RPC Provider (用于查询) + const rpcProvider = new JsonRpcProvider(config.rpcUrl, config.chainId, { + staticNetwork: true, + }); + this.rpcProviders.set(chainType as ChainTypeEnum, rpcProvider); + + // WebSocket Provider (用于事件监听) + if (config.wsUrl) { + const wsProvider = new WebSocketProvider(config.wsUrl, config.chainId); + this.wsProviders.set(chainType as ChainTypeEnum, wsProvider); + } + + // USDT Contract + const provider = config.wsUrl + ? this.wsProviders.get(chainType as ChainTypeEnum)! + : rpcProvider; + const contract = new Contract(config.usdtContract, ERC20_ABI, provider); + this.contracts.set(chainType as ChainTypeEnum, contract); + + this.logger.log(`Initialized ${chainType} provider`); + } catch (error) { + this.logger.error(`Failed to initialize ${chainType} provider: ${error.message}`); + } + } + } + + private async closeProviders(): Promise { + for (const wsProvider of this.wsProviders.values()) { + await wsProvider.destroy(); + } + } + + getRpcProvider(chainType: ChainTypeEnum): JsonRpcProvider { + const provider = this.rpcProviders.get(chainType); + if (!provider) { + throw new Error(`No RPC provider for chain: ${chainType}`); + } + return provider; + } + + getWsProvider(chainType: ChainTypeEnum): WebSocketProvider | undefined { + return this.wsProviders.get(chainType); + } + + getUsdtContract(chainType: ChainTypeEnum): Contract { + const contract = this.contracts.get(chainType); + if (!contract) { + throw new Error(`No USDT contract for chain: ${chainType}`); + } + return contract; + } + + getChainConfig(chainType: ChainTypeEnum): ChainConfig { + return this.chainConfigs[chainType]; + } + + async getCurrentBlockNumber(chainType: ChainTypeEnum): Promise { + const provider = this.getRpcProvider(chainType); + const blockNumber = await provider.getBlockNumber(); + return BigInt(blockNumber); + } + + async getBalance(chainType: ChainTypeEnum, address: string): Promise { + const contract = this.getUsdtContract(chainType); + const config = this.getChainConfig(chainType); + const balance = await contract.balanceOf(address); + return ethers.formatUnits(balance, config.decimals); + } +} +``` + +### 5.2 事件监听器服务 + +```typescript +// src/infrastructure/blockchain/event-listener.service.ts + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { EvmProviderAdapter } from './evm-provider.adapter'; +import { DepositDetectionService } from '@/application/services/deposit-detection.service'; +import { AddressCacheService } from '@/infrastructure/redis/address-cache.service'; +import { ChainTypeEnum } from '@/domain/value-objects'; +import { Contract, EventLog } from 'ethers'; + +@Injectable() +export class EventListenerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(EventListenerService.name); + private isListening = false; + private listeners: Map = new Map(); + + constructor( + private readonly evmProvider: EvmProviderAdapter, + private readonly depositService: DepositDetectionService, + private readonly addressCache: AddressCacheService, + ) {} + + async onModuleInit(): Promise { + await this.startListening(); + } + + async onModuleDestroy(): Promise { + await this.stopListening(); + } + + async startListening(): Promise { + if (this.isListening) return; + + for (const chainType of Object.values(ChainTypeEnum)) { + await this.subscribeToChain(chainType); + } + + this.isListening = true; + this.logger.log('Event listeners started'); + } + + private async subscribeToChain(chainType: ChainTypeEnum): Promise { + const wsProvider = this.evmProvider.getWsProvider(chainType); + if (!wsProvider) { + this.logger.warn(`No WebSocket provider for ${chainType}, using polling`); + return; + } + + const contract = this.evmProvider.getUsdtContract(chainType); + + // 监听所有 Transfer 事件 + const filter = contract.filters.Transfer(); + + const listener = async (from: string, to: string, value: bigint, event: EventLog) => { + try { + // 检查目标地址是否是平台监控地址 + const isMonitored = await this.addressCache.isMonitoredAddress(chainType, to); + if (!isMonitored) return; + + this.logger.log(`Detected deposit: ${chainType} ${event.transactionHash}`); + + await this.depositService.handleTransferEvent({ + chainType, + txHash: event.transactionHash, + fromAddress: from, + toAddress: to, + amount: value.toString(), + blockNumber: event.blockNumber, + logIndex: event.index, + }); + } catch (error) { + this.logger.error(`Error processing Transfer event: ${error.message}`); + } + }; + + contract.on(filter, listener); + this.listeners.set(chainType, { contract, filter, listener }); + + this.logger.log(`Subscribed to ${chainType} Transfer events`); + } + + async stopListening(): Promise { + for (const [chainType, { contract, filter, listener }] of this.listeners) { + contract.off(filter, listener); + this.logger.log(`Unsubscribed from ${chainType}`); + } + this.listeners.clear(); + this.isListening = false; + } +} +``` + +### 5.3 区块扫描器(补扫服务) + +```typescript +// src/infrastructure/blockchain/block-scanner.service.ts + +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { EvmProviderAdapter } from './evm-provider.adapter'; +import { DepositDetectionService } from '@/application/services/deposit-detection.service'; +import { AddressCacheService } from '@/infrastructure/redis/address-cache.service'; +import { BlockCheckpointRepository } from '@/infrastructure/persistence/repositories/block-checkpoint.repository.impl'; +import { ChainTypeEnum } from '@/domain/value-objects'; +import { ethers } from 'ethers'; + +const BATCH_SIZE = 1000; // 每次扫描的区块数 + +@Injectable() +export class BlockScannerService { + private readonly logger = new Logger(BlockScannerService.name); + private isScanning: Map = new Map(); + + constructor( + private readonly evmProvider: EvmProviderAdapter, + private readonly depositService: DepositDetectionService, + private readonly addressCache: AddressCacheService, + private readonly checkpointRepo: BlockCheckpointRepository, + ) {} + + /** + * 定时补扫任务 (每 5 分钟) + */ + @Cron(CronExpression.EVERY_5_MINUTES) + async scheduledScan(): Promise { + for (const chainType of Object.values(ChainTypeEnum)) { + await this.scanChain(chainType); + } + } + + /** + * 扫描指定链的区块 + */ + async scanChain(chainType: ChainTypeEnum): Promise { + if (this.isScanning.get(chainType)) { + this.logger.debug(`${chainType} scan already in progress`); + return; + } + + this.isScanning.set(chainType, true); + + try { + const checkpoint = await this.checkpointRepo.findByChain(chainType); + const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType); + + let fromBlock = checkpoint + ? checkpoint.lastScannedBlock + 1n + : currentBlock - 1000n; // 首次启动扫描最近 1000 块 + + while (fromBlock <= currentBlock) { + const toBlock = fromBlock + BigInt(BATCH_SIZE) - 1n > currentBlock + ? currentBlock + : fromBlock + BigInt(BATCH_SIZE) - 1n; + + await this.scanBlockRange(chainType, fromBlock, toBlock); + + // 更新检查点 + await this.checkpointRepo.updateCheckpoint(chainType, toBlock); + + fromBlock = toBlock + 1n; + } + + this.logger.debug(`${chainType} scan completed up to block ${currentBlock}`); + } catch (error) { + this.logger.error(`${chainType} scan failed: ${error.message}`); + await this.checkpointRepo.markUnhealthy(chainType, error.message); + } finally { + this.isScanning.set(chainType, false); + } + } + + /** + * 扫描指定区块范围 + */ + private async scanBlockRange( + chainType: ChainTypeEnum, + fromBlock: bigint, + toBlock: bigint, + ): Promise { + const contract = this.evmProvider.getUsdtContract(chainType); + const monitoredAddresses = await this.addressCache.getMonitoredAddresses(chainType); + + if (monitoredAddresses.size === 0) { + return; + } + + // 查询 Transfer 事件 + const filter = contract.filters.Transfer(null, [...monitoredAddresses]); + const events = await contract.queryFilter(filter, Number(fromBlock), Number(toBlock)); + + for (const event of events) { + if (!('args' in event)) continue; + + const [from, to, value] = event.args; + + try { + await this.depositService.handleTransferEvent({ + chainType, + txHash: event.transactionHash, + fromAddress: from, + toAddress: to, + amount: value.toString(), + blockNumber: event.blockNumber, + logIndex: event.index, + }); + } catch (error) { + // 重复交易等预期错误,忽略 + if (!error.message.includes('already exists')) { + this.logger.error(`Error processing event: ${error.message}`); + } + } + } + } + + /** + * 手动触发全量扫描 + */ + async fullScan(chainType: ChainTypeEnum, fromBlock: bigint, toBlock: bigint): Promise { + this.logger.log(`Starting full scan for ${chainType} from ${fromBlock} to ${toBlock}`); + await this.scanBlockRange(chainType, fromBlock, toBlock); + this.logger.log(`Full scan completed for ${chainType}`); + } +} +``` + +### 5.4 地址缓存服务 + +```typescript +// src/infrastructure/redis/address-cache.service.ts + +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { RedisService } from './redis.service'; +import { IMonitoredAddressRepository, MONITORED_ADDRESS_REPOSITORY } from '@/domain/repositories'; +import { Inject } from '@nestjs/common'; +import { ChainTypeEnum } from '@/domain/value-objects'; + +const CACHE_KEY_PREFIX = 'blockchain:addresses:'; +const CACHE_TTL = 300; // 5 分钟 + +@Injectable() +export class AddressCacheService implements OnModuleInit { + private readonly logger = new Logger(AddressCacheService.name); + private localCache: Map> = new Map(); + + constructor( + private readonly redis: RedisService, + @Inject(MONITORED_ADDRESS_REPOSITORY) + private readonly addressRepo: IMonitoredAddressRepository, + ) {} + + async onModuleInit(): Promise { + await this.refreshCache(); + } + + /** + * 检查地址是否被监控 + */ + async isMonitoredAddress(chainType: ChainTypeEnum, address: string): Promise { + const normalizedAddress = address.toLowerCase(); + + // 先查本地缓存 + const localSet = this.localCache.get(chainType); + if (localSet?.has(normalizedAddress)) { + return true; + } + + // 再查 Redis + const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`; + const exists = await this.redis.sismember(cacheKey, normalizedAddress); + + if (exists) { + // 更新本地缓存 + if (!localSet) { + this.localCache.set(chainType, new Set([normalizedAddress])); + } else { + localSet.add(normalizedAddress); + } + } + + return exists; + } + + /** + * 获取链的所有监控地址 + */ + async getMonitoredAddresses(chainType: ChainTypeEnum): Promise> { + const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`; + const addresses = await this.redis.smembers(cacheKey); + return new Set(addresses); + } + + /** + * 添加监控地址 + */ + async addAddress(chainType: ChainTypeEnum, address: string): Promise { + const normalizedAddress = address.toLowerCase(); + const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`; + + await this.redis.sadd(cacheKey, normalizedAddress); + + // 更新本地缓存 + let localSet = this.localCache.get(chainType); + if (!localSet) { + localSet = new Set(); + this.localCache.set(chainType, localSet); + } + localSet.add(normalizedAddress); + } + + /** + * 移除监控地址 + */ + async removeAddress(chainType: ChainTypeEnum, address: string): Promise { + const normalizedAddress = address.toLowerCase(); + const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`; + + await this.redis.srem(cacheKey, normalizedAddress); + + // 更新本地缓存 + this.localCache.get(chainType)?.delete(normalizedAddress); + } + + /** + * 从数据库刷新缓存 + */ + async refreshCache(): Promise { + this.logger.log('Refreshing address cache from database...'); + + for (const chainType of Object.values(ChainTypeEnum)) { + const addresses = await this.addressRepo.findActiveByChain( + { value: chainType } as any + ); + + const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`; + const addressSet = new Set(addresses.map(a => a.address.value.toLowerCase())); + + // 更新 Redis + if (addressSet.size > 0) { + await this.redis.del(cacheKey); + await this.redis.sadd(cacheKey, ...addressSet); + } + + // 更新本地缓存 + this.localCache.set(chainType, addressSet); + + this.logger.log(`Loaded ${addressSet.size} addresses for ${chainType}`); + } + } +} +``` + +--- + +## 6. 应用层设计 + +### 6.1 充值检测服务 + +```typescript +// src/application/services/deposit-detection.service.ts + +import { Injectable, Inject, Logger } from '@nestjs/common'; +import { + IDepositTransactionRepository, + DEPOSIT_TRANSACTION_REPOSITORY, + IMonitoredAddressRepository, + MONITORED_ADDRESS_REPOSITORY, +} from '@/domain/repositories'; +import { DepositTransaction } from '@/domain/aggregates/deposit-transaction'; +import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber, ChainTypeEnum } from '@/domain/value-objects'; +import { DepositStatus } from '@/domain/enums'; +import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service'; +import { WalletClientService } from '@/infrastructure/external/wallet-service/wallet-client.service'; +import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter'; + +export interface TransferEventData { + chainType: ChainTypeEnum; + txHash: string; + fromAddress: string; + toAddress: string; + amount: string; + blockNumber: number; + logIndex: number; +} + +@Injectable() +export class DepositDetectionService { + private readonly logger = new Logger(DepositDetectionService.name); + + constructor( + @Inject(DEPOSIT_TRANSACTION_REPOSITORY) + private readonly depositRepo: IDepositTransactionRepository, + @Inject(MONITORED_ADDRESS_REPOSITORY) + private readonly addressRepo: IMonitoredAddressRepository, + private readonly eventPublisher: EventPublisherService, + private readonly walletClient: WalletClientService, + private readonly evmProvider: EvmProviderAdapter, + ) {} + + /** + * 处理检测到的 Transfer 事件 + */ + async handleTransferEvent(data: TransferEventData): Promise { + const txHash = TxHash.create(data.txHash); + + // 检查是否已处理 + const exists = await this.depositRepo.existsByTxHash(txHash); + if (exists) { + this.logger.debug(`Deposit ${data.txHash} already exists, skipping`); + return; + } + + // 查找监控地址信息 + const chainType = ChainType.create(data.chainType); + const toAddress = EvmAddress.create(data.toAddress); + const addressInfo = await this.addressRepo.findByAddress(chainType, toAddress); + + if (!addressInfo) { + this.logger.warn(`Address ${data.toAddress} not found in monitored addresses`); + return; + } + + // 获取区块时间 + const block = await this.evmProvider.getRpcProvider(data.chainType).getBlock(data.blockNumber); + const blockTimestamp = new Date(block!.timestamp * 1000); + + // 获取代币精度 + const chainConfig = this.evmProvider.getChainConfig(data.chainType); + + // 创建充值记录 + const deposit = DepositTransaction.create({ + chainType, + txHash, + fromAddress: EvmAddress.create(data.fromAddress), + toAddress, + tokenContract: EvmAddress.create(chainConfig.usdtContract), + amount: TokenAmount.create(data.amount, chainConfig.decimals), + blockNumber: BlockNumber.create(data.blockNumber), + blockTimestamp, + logIndex: data.logIndex, + addressId: addressInfo.id, + userId: addressInfo.userId, + }); + + // 保存并发布事件 + await this.depositRepo.save(deposit); + await this.eventPublisher.publishAll(deposit.domainEvents); + deposit.clearDomainEvents(); + + this.logger.log(`Deposit detected: ${data.txHash}, amount: ${deposit.amount.formatted}`); + + // 检查确认数 + await this.checkConfirmationsAndNotify(deposit); + } + + /** + * 检查确认数并通知 wallet-service + */ + async checkConfirmationsAndNotify(deposit: DepositTransaction): Promise { + const chainType = deposit.chainType; + const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType.value as ChainTypeEnum); + const confirmations = Number(currentBlock - deposit.blockNumber.value) + 1; + const requiredConfirmations = chainType.requiredConfirmations; + + deposit.updateConfirmations(confirmations, requiredConfirmations); + + if (deposit.isConfirmed && !deposit.isNotified) { + try { + // 调用 wallet-service 入账 + await this.walletClient.handleDeposit({ + userId: deposit.userId.toString(), + amount: deposit.amount.value, + chainType: deposit.chainType.value, + txHash: deposit.txHash.value, + }); + + deposit.markNotified(); + this.logger.log(`Deposit notified: ${deposit.txHash.value}`); + } catch (error) { + deposit.recordNotifyFailure(error.message); + this.logger.error(`Failed to notify deposit: ${error.message}`); + } + } + + await this.depositRepo.save(deposit); + await this.eventPublisher.publishAll(deposit.domainEvents); + deposit.clearDomainEvents(); + } + + /** + * 定时任务:更新未确认交易的确认数 + */ + async updatePendingConfirmations(): Promise { + for (const chainType of Object.values(ChainTypeEnum)) { + const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType); + const minBlock = currentBlock - 100n; // 只处理最近 100 块内的 + + const pendingDeposits = await this.depositRepo.findUnconfirmed( + ChainType.create(chainType), + minBlock, + ); + + for (const deposit of pendingDeposits) { + await this.checkConfirmationsAndNotify(deposit); + } + } + } + + /** + * 定时任务:重试失败的通知 + */ + async retryFailedNotifications(): Promise { + const pendingNotifications = await this.depositRepo.findPendingNotification(50); + + for (const deposit of pendingNotifications) { + if (deposit.toProps().notifyAttempts >= 10) { + this.logger.error(`Deposit ${deposit.txHash.value} exceeded max retry attempts`); + continue; + } + + await this.checkConfirmationsAndNotify(deposit); + } + } +} +``` + +### 6.2 余额查询服务 + +```typescript +// src/application/services/balance-query.service.ts + +import { Injectable, Logger } from '@nestjs/common'; +import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter'; +import { ChainTypeEnum } from '@/domain/value-objects'; + +export interface BalanceResult { + chainType: string; + address: string; + usdtBalance: string; + nativeBalance: string; +} + +@Injectable() +export class BalanceQueryService { + private readonly logger = new Logger(BalanceQueryService.name); + + constructor(private readonly evmProvider: EvmProviderAdapter) {} + + /** + * 查询单个地址的余额 + */ + async getBalance(chainType: ChainTypeEnum, address: string): Promise { + const [usdtBalance, nativeBalance] = await Promise.all([ + this.evmProvider.getBalance(chainType, address), + this.getNativeBalance(chainType, address), + ]); + + return { + chainType, + address, + usdtBalance, + nativeBalance, + }; + } + + /** + * 批量查询余额 + */ + async getBatchBalances( + addresses: Array<{ chainType: ChainTypeEnum; address: string }>, + ): Promise { + return Promise.all( + addresses.map(({ chainType, address }) => this.getBalance(chainType, address)), + ); + } + + private async getNativeBalance(chainType: ChainTypeEnum, address: string): Promise { + const provider = this.evmProvider.getRpcProvider(chainType); + const balance = await provider.getBalance(address); + const { ethers } = await import('ethers'); + return ethers.formatEther(balance); + } +} +``` + +--- + +## 7. Kafka 事件设计 + +### 7.1 发布的事件 + +```typescript +// src/infrastructure/kafka/event-publisher.service.ts + +export const BLOCKCHAIN_TOPICS = { + DEPOSIT_DETECTED: 'blockchain.DepositDetected', + DEPOSIT_CONFIRMED: 'blockchain.DepositConfirmed', + TRANSACTION_BROADCASTED: 'blockchain.TransactionBroadcasted', + TRANSACTION_CONFIRMED: 'blockchain.TransactionConfirmed', + BLOCK_SCANNED: 'blockchain.BlockScanned', +} as const; +``` + +### 7.2 消费的事件 + +```typescript +// src/infrastructure/kafka/event-consumer.controller.ts + +import { Controller, Logger } from '@nestjs/common'; +import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices'; +import { AddressRegistryService } from '@/application/services/address-registry.service'; + +@Controller() +export class EventConsumerController { + private readonly logger = new Logger(EventConsumerController.name); + + constructor(private readonly addressRegistry: AddressRegistryService) {} + + /** + * 监听用户创建事件,注册监控地址 + */ + @MessagePattern('identity.UserAccountCreated') + async handleUserCreated( + @Payload() message: any, + @Ctx() context: KafkaContext, + ): Promise { + this.logger.log(`Received UserAccountCreated: ${message.payload.userId}`); + + // 注册用户的钱包地址到监控列表 + // 注意:地址信息可能在后续的 WalletBound 事件中 + } + + /** + * 监听钱包绑定事件,注册监控地址 + */ + @MessagePattern('identity.WalletBound') + async handleWalletBound( + @Payload() message: any, + @Ctx() context: KafkaContext, + ): Promise { + this.logger.log(`Received WalletBound: userId=${message.payload.userId}`); + + await this.addressRegistry.registerAddress({ + userId: BigInt(message.payload.userId), + chainType: message.payload.chainType, + address: message.payload.address, + }); + } + + /** + * 监听多钱包绑定事件 + */ + @MessagePattern('identity.MultipleWalletsBound') + async handleMultipleWalletsBound( + @Payload() message: any, + @Ctx() context: KafkaContext, + ): Promise { + this.logger.log(`Received MultipleWalletsBound: userId=${message.payload.userId}`); + + for (const wallet of message.payload.wallets) { + await this.addressRegistry.registerAddress({ + userId: BigInt(message.payload.userId), + chainType: wallet.chainType, + address: wallet.address, + }); + } + } +} +``` + +--- + +## 8. API 设计 + +### 8.1 健康检查 + +```typescript +// src/api/controllers/health.controller.ts + +@Controller('blockchain/health') +export class HealthController { + @Get() + @Public() + async check(): Promise<{ status: string; chains: Record }> { + // 检查各链连接状态 + } +} +``` + +### 8.2 余额查询 (内部 API) + +```typescript +// src/api/controllers/balance.controller.ts + +@Controller('blockchain/balance') +export class BalanceController { + @Get(':chainType/:address') + @Public() // 内部服务调用 + async getBalance( + @Param('chainType') chainType: string, + @Param('address') address: string, + ): Promise { + // ... + } +} +``` + +--- + +## 9. 配置设计 + +### 9.1 环境变量 + +```bash +# .env.example + +# 服务配置 +PORT=3012 +NODE_ENV=development + +# 数据库 +DATABASE_URL=postgresql://rwa_user:password@localhost:5432/rwa_blockchain + +# Redis +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_DB=11 + +# Kafka +KAFKA_BROKERS=localhost:9092 +KAFKA_CLIENT_ID=blockchain-service + +# 区块链 RPC +KAVA_RPC_URL=https://evm.kava.io +KAVA_WS_URL=wss://evm.kava.io/ws +BSC_RPC_URL=https://bsc-dataseed.binance.org +BSC_WS_URL=wss://bsc-ws-node.nariox.org:443 + +# 服务间调用 +WALLET_SERVICE_URL=http://localhost:3001 +IDENTITY_SERVICE_URL=http://localhost:3000 +``` + +--- + +## 10. 部署配置 + +### 10.1 docker-compose.yml 配置 + +在主 `docker-compose.yml` 中添加: + +```yaml +blockchain-service: + build: + context: ./blockchain-service + dockerfile: Dockerfile + container_name: rwa-blockchain-service + ports: + - "3012:3012" + environment: + - NODE_ENV=production + - DATABASE_URL=postgresql://${POSTGRES_USER:-rwa_user}:${POSTGRES_PASSWORD:-rwa_secure_password}@postgres:5432/rwa_blockchain?schema=public + - REDIS_HOST=redis + - REDIS_PORT=6379 + - REDIS_DB=11 + - KAFKA_BROKERS=kafka:29092 + - KAFKA_CLIENT_ID=blockchain-service + - KAVA_RPC_URL=${KAVA_RPC_URL:-https://evm.kava.io} + - KAVA_WS_URL=${KAVA_WS_URL} + - BSC_RPC_URL=${BSC_RPC_URL:-https://bsc-dataseed.binance.org} + - BSC_WS_URL=${BSC_WS_URL} + - WALLET_SERVICE_URL=http://rwa-wallet-service:3001 + - IDENTITY_SERVICE_URL=http://rwa-identity-service:3000 + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + kafka: + condition: service_healthy + networks: + - rwa-network + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3012/api/v1/blockchain/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s +``` + +### 10.2 Kong 路由配置 + +```yaml +# 在 kong.yml 中添加 +- name: blockchain-service + url: http://192.168.1.111:3012 + routes: + - name: blockchain-api + paths: + - /api/v1/blockchain + strip_path: false +``` + +--- + +## 11. 初始化脚本 + +在 `scripts/init-databases.sh` 中添加 `rwa_blockchain`: + +```bash +for db in rwa_identity rwa_wallet ... rwa_blockchain; do +``` + +--- + +## 12. 关键设计决策 + +### 12.1 双重保障机制 + +``` +实时监听 (WebSocket) 补扫任务 (定时) + │ │ + ▼ ▼ + 检测充值 ───────────────> 去重处理 + │ │ + └──────────┬───────────┘ + ▼ + 确认数检查 + │ + ▼ + 通知 wallet-service +``` + +### 12.2 确认数策略 + +| 链 | 要求确认数 | 约等待时间 | +|----|-----------|-----------| +| KAVA | 12 块 | ~72 秒 | +| BSC | 15 块 | ~45 秒 | + +### 12.3 失败重试策略 + +- 通知失败最多重试 10 次 +- 指数退避:1s, 2s, 4s, 8s, ... +- 超过最大重试次数告警 + +--- + +## 13. 测试策略 + +### 13.1 单元测试 + +- 领域对象测试 (DepositTransaction, ValueObjects) +- 确认数计算逻辑 +- 事件生成逻辑 + +### 13.2 集成测试 + +- 区块链 Provider 连接 +- 事件监听和解析 +- 数据库读写 + +### 13.3 E2E 测试 + +- 完整充值流程模拟 +- 补扫逻辑验证 +- 失败重试验证 + +--- + +## 14. 监控指标 + +- `blockchain_deposit_detected_total` - 检测到的充值数 +- `blockchain_deposit_confirmed_total` - 确认的充值数 +- `blockchain_notify_failed_total` - 通知失败数 +- `blockchain_block_scan_lag` - 扫描延迟 (当前块 - 已扫描块) +- `blockchain_rpc_latency` - RPC 调用延迟 +- `blockchain_ws_reconnect_total` - WebSocket 重连次数 + +--- + +## 15. 迁移计划 + +### 从 identity-service 迁移 + +1. 将 `BlockchainQueryService` 逻辑迁移到 `BalanceQueryService` +2. 将 `DepositService` 逻辑拆分: + - 地址获取 → 保留在 identity-service + - 余额查询 → 迁移到 blockchain-service +3. 新增事件监听、补扫逻辑 +4. identity-service 发布 `WalletBound` 事件 +5. blockchain-service 消费事件并注册监控地址 + +--- + +*文档版本: 1.0.0* +*最后更新: 2025-12-03*