rwadurian/backend/services/blockchain-service/DEVELOPMENT_GUIDE.md

Blockchain Service Development Guide

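1. Service Overview

1.1 Service Role

blockchain-service is the blockchain infrastructure service of the RWA Durian Queen (榴莲女皇) platform. It is responsible for:

  • On-chain event listening: listens for ERC20 Transfer events to detect user deposits
  • Deposit crediting trigger: notifies wallet-service to credit the account once a deposit is detected
  • Balance queries: queries on-chain USDT and native-token balances
  • Transaction broadcasting: submits signed transactions to the chain
  • Address management: manages the platform's pool of deposit addresses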

1.2 Tech Stack

Component         Technology
Framework         NestJS 10.x
Language          TypeScript 5.x
Database          PostgreSQL 15 + Prisma
Message queue     Kafka
Cache             Redis
Blockchain        ethers.js 6.x
Containerization  Docker

1.3 Port Allocation

  • HTTP API: 3012
  • Database: rwa_blockchain (shared PostgreSQL)
  • Redis DB: 11
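
The allocation above can be captured in a small typed config loader. This is a minimal sketch, assuming environment variable names `PORT` and `REDIS_DB` (both hypothetical; only `DATABASE_URL` appears in this guide), with defaults taken from the values listed above.

```typescript
// Hypothetical config loader; PORT and REDIS_DB are assumed variable names.
interface ServiceConfig {
  httpPort: number;
  redisDb: number;
  databaseUrl: string | undefined;
}

// Parses an integer from an env value, falling back when it is missing or invalid.
function parseIntOr(value: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(value ?? '', 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

function loadServiceConfig(env: Record<string, string | undefined>): ServiceConfig {
  return {
    httpPort: parseIntOr(env.PORT, 3012), // HTTP API port from section 1.3
    redisDb: parseIntOr(env.REDIS_DB, 11), // Redis logical DB from section 1.3
    databaseUrl: env.DATABASE_URL,
  };
}
```

Passing the environment in as a parameter, rather than reading `process.env` directly, keeps the loader easy to unit test.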

2. Architecture Design

2.1 Hexagonal Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                          blockchain-service                          │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                        API Layer                             │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │   │
│  │  │ Health Ctrl  │  │ Balance Ctrl │  │ Internal Ctrl│       │   │
│  │  └──────────────┘  └──────────────┘  └──────────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Application Layer                         │   │
│  │  ┌──────────────────┐  ┌──────────────────┐                 │   │
│  │  │ DepositDetection │  │ BalanceQuery     │                 │   │
│  │  │ Service          │  │ Service          │                 │   │
│  │  └──────────────────┘  └──────────────────┘                 │   │
│  │  ┌──────────────────┐  ┌──────────────────┐                 │   │
│  │  │ TransactionBroad │  │ AddressRegistry  │                 │   │
│  │  │ castService      │  │ Service          │                 │   │
│  │  └──────────────────┘  └──────────────────┘                 │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      Domain Layer                            │   │
│  │  ┌────────────────────────────────────────────────────────┐ │   │
│  │  │ Aggregates                                              │ │   │
│  │  │ ┌────────────┐ ┌────────────┐ ┌────────────────────┐   │ │   │
│  │  │ │DepositTx   │ │MonitoredAddr│ │TransactionRequest │   │ │   │
│  │  │ └────────────┘ └────────────┘ └────────────────────┘   │ │   │
│  │  └────────────────────────────────────────────────────────┘ │   │
│  │  ┌────────────────────────────────────────────────────────┐ │   │
│  │  │ Domain Events                                           │ │   │
│  │  │ DepositDetected, TransactionBroadcasted, BlockScanned  │ │   │
│  │  └────────────────────────────────────────────────────────┘ │   │
│  │  ┌────────────────────────────────────────────────────────┐ │   │
│  │  │ Repository Interfaces (Ports)                          │ │   │
│  │  │ IDepositTxRepository, IMonitoredAddressRepository      │ │   │
│  │  └────────────────────────────────────────────────────────┘ │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                  Infrastructure Layer                        │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │   │
│  │  │ Prisma Repos │  │ Kafka        │  │ Redis Cache  │       │   │
│  │  └──────────────┘  └──────────────┘  └──────────────┘       │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │   │
│  │  │ EVM Provider │  │ Event        │  │ Block        │       │   │
│  │  │ Adapter      │  │ Listener     │  │ Scanner      │       │   │
│  │  └──────────────┘  └──────────────┘  └──────────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘

2.2 Directory Structure

blockchain-service/
├── prisma/
│   └── schema.prisma
├── src/
│   ├── api/                          # Inbound adapters (driving adapters)
│   │   ├── controllers/
│   │   │   ├── health.controller.ts
│   │   │   ├── balance.controller.ts
│   │   │   └── internal.controller.ts
│   │   └── dto/
│   │       ├── request/
│   │       │   └── query-balance.dto.ts
│   │       └── response/
│   │           └── balance.dto.ts
│   │
│   ├── application/                  # Application layer (use cases)
│   │   ├── services/
│   │   │   ├── deposit-detection.service.ts
│   │   │   ├── balance-query.service.ts
│   │   │   ├── transaction-broadcast.service.ts
│   │   │   └── address-registry.service.ts
│   │   ├── commands/
│   │   │   ├── register-address/
│   │   │   │   ├── register-address.command.ts
│   │   │   │   └── register-address.handler.ts
│   │   │   └── broadcast-transaction/
│   │   │       ├── broadcast-transaction.command.ts
│   │   │       └── broadcast-transaction.handler.ts
│   │   └── queries/
│   │       ├── get-balance/
│   │       │   ├── get-balance.query.ts
│   │       │   └── get-balance.handler.ts
│   │       └── get-deposit-history/
│   │           ├── get-deposit-history.query.ts
│   │           └── get-deposit-history.handler.ts
│   │
│   ├── domain/                       # Domain layer (core business logic)
│   │   ├── aggregates/
│   │   │   ├── deposit-transaction/
│   │   │   │   ├── deposit-transaction.aggregate.ts
│   │   │   │   ├── deposit-transaction.factory.ts
│   │   │   │   └── index.ts
│   │   │   ├── monitored-address/
│   │   │   │   ├── monitored-address.aggregate.ts
│   │   │   │   └── index.ts
│   │   │   └── transaction-request/
│   │   │       ├── transaction-request.aggregate.ts
│   │   │       └── index.ts
│   │   ├── entities/
│   │   │   └── block-checkpoint.entity.ts
│   │   ├── events/
│   │   │   ├── domain-event.base.ts
│   │   │   ├── deposit-detected.event.ts
│   │   │   ├── deposit-confirmed.event.ts
│   │   │   ├── transaction-broadcasted.event.ts
│   │   │   └── index.ts
│   │   ├── repositories/
│   │   │   ├── deposit-transaction.repository.interface.ts
│   │   │   ├── monitored-address.repository.interface.ts
│   │   │   ├── block-checkpoint.repository.interface.ts
│   │   │   └── index.ts
│   │   ├── services/
│   │   │   ├── confirmation-policy.service.ts
│   │   │   └── chain-config.service.ts
│   │   ├── value-objects/
│   │   │   ├── chain-type.vo.ts
│   │   │   ├── tx-hash.vo.ts
│   │   │   ├── evm-address.vo.ts
│   │   │   ├── token-amount.vo.ts
│   │   │   ├── block-number.vo.ts
│   │   │   └── index.ts
│   │   └── enums/
│   │       ├── deposit-status.enum.ts
│   │       ├── chain-type.enum.ts
│   │       └── index.ts
│   │
│   ├── infrastructure/               # Outbound adapters (driven adapters)
│   │   ├── blockchain/
│   │   │   ├── evm-provider.adapter.ts
│   │   │   ├── event-listener.service.ts
│   │   │   ├── block-scanner.service.ts
│   │   │   ├── transaction-sender.service.ts
│   │   │   └── blockchain.module.ts
│   │   ├── persistence/
│   │   │   ├── prisma/
│   │   │   │   └── prisma.service.ts
│   │   │   ├── repositories/
│   │   │   │   ├── deposit-transaction.repository.impl.ts
│   │   │   │   ├── monitored-address.repository.impl.ts
│   │   │   │   └── block-checkpoint.repository.impl.ts
│   │   │   └── mappers/
│   │   │       └── deposit-transaction.mapper.ts
│   │   ├── kafka/
│   │   │   ├── event-publisher.service.ts
│   │   │   ├── event-consumer.controller.ts
│   │   │   └── kafka.module.ts
│   │   ├── redis/
│   │   │   ├── redis.service.ts
│   │   │   ├── address-cache.service.ts
│   │   │   └── redis.module.ts
│   │   ├── external/
│   │   │   ├── wallet-service/
│   │   │   │   └── wallet-client.service.ts
│   │   │   └── identity-service/
│   │   │       └── identity-client.service.ts
│   │   └── infrastructure.module.ts
│   │
│   ├── config/
│   │   ├── app.config.ts
│   │   ├── database.config.ts
│   │   ├── kafka.config.ts
│   │   ├── redis.config.ts
│   │   ├── chain.config.ts
│   │   └── index.ts
│   │
│   ├── shared/
│   │   ├── decorators/
│   │   │   ├── public.decorator.ts
│   │   │   └── index.ts
│   │   ├── exceptions/
│   │   │   ├── domain.exception.ts
│   │   │   ├── blockchain.exception.ts
│   │   │   └── index.ts
│   │   ├── filters/
│   │   │   ├── global-exception.filter.ts
│   │   │   └── domain-exception.filter.ts
│   │   └── interceptors/
│   │       └── transform.interceptor.ts
│   │
│   ├── app.module.ts
│   └── main.ts
│
├── test/
│   ├── unit/
│   ├── integration/
│   └── e2e/
│
├── .env.example
├── Dockerfile
├── docker-compose.yml
├── package.json
├── tsconfig.json
├── nest-cli.json
└── DEVELOPMENT_GUIDE.md

3. Data Model Design

3.1 Prisma Schema

// prisma/schema.prisma

generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

// ============================================
// Monitored addresses table
// Stores the addresses that are watched for deposits
// ============================================
model MonitoredAddress {
  id            BigInt   @id @default(autoincrement()) @map("address_id")

  chainType     String   @map("chain_type") @db.VarChar(20)    // KAVA, BSC
  address       String   @db.VarChar(42)                        // 0x-prefixed address

  userId        BigInt   @map("user_id")                        // Associated user ID

  isActive      Boolean  @default(true) @map("is_active")       // Whether monitoring is active

  createdAt     DateTime @default(now()) @map("created_at")
  updatedAt     DateTime @updatedAt @map("updated_at")

  deposits      DepositTransaction[]

  @@unique([chainType, address], name: "uk_chain_address")
  @@index([userId], name: "idx_user")
  @@index([chainType, isActive], name: "idx_chain_active")
  @@map("monitored_addresses")
}

// ============================================
// Deposit transactions table (append-only)
// Records every detected deposit transaction
// ============================================
model DepositTransaction {
  id              BigInt   @id @default(autoincrement()) @map("deposit_id")

  chainType       String   @map("chain_type") @db.VarChar(20)
  txHash          String   @unique @map("tx_hash") @db.VarChar(66)

  fromAddress     String   @map("from_address") @db.VarChar(42)
  toAddress       String   @map("to_address") @db.VarChar(42)

  tokenContract   String   @map("token_contract") @db.VarChar(42)  // USDT contract address
  amount          Decimal  @db.Decimal(36, 18)                      // Raw amount
  amountFormatted Decimal  @map("amount_formatted") @db.Decimal(20, 8)  // Formatted amount

  blockNumber     BigInt   @map("block_number")
  blockTimestamp  DateTime @map("block_timestamp")
  logIndex        Int      @map("log_index")

  // Confirmation status
  confirmations   Int      @default(0)
  status          String   @default("DETECTED") @db.VarChar(20)    // DETECTED, CONFIRMING, CONFIRMED, NOTIFIED

  // Relations
  addressId       BigInt   @map("address_id")
  userId          BigInt   @map("user_id")

  // Notification status
  notifiedAt      DateTime? @map("notified_at")
  notifyAttempts  Int       @default(0) @map("notify_attempts")
  lastNotifyError String?   @map("last_notify_error") @db.Text

  createdAt       DateTime @default(now()) @map("created_at")
  updatedAt       DateTime @updatedAt @map("updated_at")

  monitoredAddress MonitoredAddress @relation(fields: [addressId], references: [id])

  @@index([chainType, status], name: "idx_chain_status")
  @@index([userId], name: "idx_deposit_user")
  @@index([blockNumber], name: "idx_block")
  @@index([status, notifiedAt], name: "idx_pending_notify")
  @@map("deposit_transactions")
}

// ============================================
// Block scan checkpoint (one row per chain)
// Records scan progress so scanning can resume where it left off
// ============================================
model BlockCheckpoint {
  id              BigInt   @id @default(autoincrement()) @map("checkpoint_id")

  chainType       String   @unique @map("chain_type") @db.VarChar(20)

  lastScannedBlock BigInt  @map("last_scanned_block")
  lastScannedAt    DateTime @map("last_scanned_at")

  // Health status
  isHealthy       Boolean  @default(true) @map("is_healthy")
  lastError       String?  @map("last_error") @db.Text

  createdAt       DateTime @default(now()) @map("created_at")
  updatedAt       DateTime @updatedAt @map("updated_at")

  @@map("block_checkpoints")
}

// ============================================
// Transaction broadcast requests table
// Records transactions that are pending broadcast or already broadcast
// ============================================
model TransactionRequest {
  id              BigInt   @id @default(autoincrement()) @map("request_id")

  chainType       String   @map("chain_type") @db.VarChar(20)

  // Request source
  sourceService   String   @map("source_service") @db.VarChar(50)
  sourceOrderId   String   @map("source_order_id") @db.VarChar(100)

  // Transaction data
  fromAddress     String   @map("from_address") @db.VarChar(42)
  toAddress       String   @map("to_address") @db.VarChar(42)
  value           Decimal  @db.Decimal(36, 18)
  data            String?  @db.Text                                // Contract call data

  // Signed payload (provided by the MPC service)
  signedTx        String?  @map("signed_tx") @db.Text

  // Broadcast result
  txHash          String?  @map("tx_hash") @db.VarChar(66)
  status          String   @default("PENDING") @db.VarChar(20)     // PENDING, SIGNED, BROADCASTED, CONFIRMED, FAILED

  // Gas information
  gasLimit        BigInt?  @map("gas_limit")
  gasPrice        Decimal? @map("gas_price") @db.Decimal(36, 18)
  nonce           Int?

  // Error information
  errorMessage    String?  @map("error_message") @db.Text
  retryCount      Int      @default(0) @map("retry_count")

  createdAt       DateTime @default(now()) @map("created_at")
  updatedAt       DateTime @updatedAt @map("updated_at")

  @@unique([sourceService, sourceOrderId], name: "uk_source_order")
  @@index([chainType, status], name: "idx_tx_chain_status")
  @@index([txHash], name: "idx_tx_hash")
  @@map("transaction_requests")
}

// ============================================
// Blockchain event log (append-only audit trail)
// ============================================
model BlockchainEvent {
  id            BigInt   @id @default(autoincrement()) @map("event_id")

  eventType     String   @map("event_type") @db.VarChar(50)

  aggregateId   String   @map("aggregate_id") @db.VarChar(100)
  aggregateType String   @map("aggregate_type") @db.VarChar(50)

  eventData     Json     @map("event_data")

  chainType     String?  @map("chain_type") @db.VarChar(20)
  txHash        String?  @map("tx_hash") @db.VarChar(66)

  occurredAt    DateTime @default(now()) @map("occurred_at") @db.Timestamp(6)

  @@index([aggregateType, aggregateId], name: "idx_event_aggregate")
  @@index([eventType], name: "idx_event_type")
  @@index([chainType], name: "idx_event_chain")
  @@index([occurredAt], name: "idx_event_occurred")
  @@map("blockchain_events")
}

4. Domain Layer Design

4.1 Aggregate Root: DepositTransaction

// src/domain/aggregates/deposit-transaction/deposit-transaction.aggregate.ts

import { DomainEvent, DepositDetectedEvent, DepositConfirmedEvent } from '@/domain/events';
import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';

export interface DepositTransactionProps {
  id?: bigint;
  chainType: ChainType;
  txHash: TxHash;
  fromAddress: EvmAddress;
  toAddress: EvmAddress;
  tokenContract: EvmAddress;
  amount: TokenAmount;
  blockNumber: BlockNumber;
  blockTimestamp: Date;
  logIndex: number;
  confirmations: number;
  status: DepositStatus;
  addressId: bigint;
  userId: bigint;
  notifiedAt?: Date;
  notifyAttempts: number;
  lastNotifyError?: string;
  createdAt?: Date;
  updatedAt?: Date;
}

export class DepositTransaction {
  private readonly _domainEvents: DomainEvent[] = [];

  private constructor(private props: DepositTransactionProps) {}

  // Getters
  get id(): bigint | undefined { return this.props.id; }
  get chainType(): ChainType { return this.props.chainType; }
  get txHash(): TxHash { return this.props.txHash; }
  get fromAddress(): EvmAddress { return this.props.fromAddress; }
  get toAddress(): EvmAddress { return this.props.toAddress; }
  get amount(): TokenAmount { return this.props.amount; }
  get blockNumber(): BlockNumber { return this.props.blockNumber; }
  get confirmations(): number { return this.props.confirmations; }
  get status(): DepositStatus { return this.props.status; }
  get userId(): bigint { return this.props.userId; }
  get isConfirmed(): boolean { return this.props.status === DepositStatus.CONFIRMED; }
  get isNotified(): boolean { return this.props.status === DepositStatus.NOTIFIED; }
  get domainEvents(): DomainEvent[] { return [...this._domainEvents]; }

  /**
   * Create a new deposit transaction (at detection time)
   */
  static create(params: {
    chainType: ChainType;
    txHash: TxHash;
    fromAddress: EvmAddress;
    toAddress: EvmAddress;
    tokenContract: EvmAddress;
    amount: TokenAmount;
    blockNumber: BlockNumber;
    blockTimestamp: Date;
    logIndex: number;
    addressId: bigint;
    userId: bigint;
  }): DepositTransaction {
    const deposit = new DepositTransaction({
      ...params,
      confirmations: 1,
      status: DepositStatus.DETECTED,
      notifyAttempts: 0,
    });

    deposit.addDomainEvent(new DepositDetectedEvent({
      chainType: params.chainType.value,
      txHash: params.txHash.value,
      fromAddress: params.fromAddress.value,
      toAddress: params.toAddress.value,
      amount: params.amount.formatted,
      userId: params.userId.toString(),
      blockNumber: params.blockNumber.value,
    }));

    return deposit;
  }

  /**
   * Rebuild the aggregate from persisted data
   */
  static reconstruct(props: DepositTransactionProps): DepositTransaction {
    return new DepositTransaction(props);
  }

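  /**
   * Update the confirmation count and advance the status accordingly
   */
  updateConfirmations(newConfirmations: number, requiredConfirmations: number): void {
    this.props.confirmations = newConfirmations;

    // A freshly detected deposit enters CONFIRMING on its first re-check,
    // so CONFIRMING is an observable state while confirmations accumulate
    if (this.props.status === DepositStatus.DETECTED) {
      this.props.status = DepositStatus.CONFIRMING;
    }

    // Once enough confirmations have accumulated, confirm the deposit
    if (newConfirmations >= requiredConfirmations && this.props.status === DepositStatus.CONFIRMING) {
      this.confirm();
    }
  }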

  /**
   * Confirm the deposit
   */
  private confirm(): void {
    if (this.props.status === DepositStatus.CONFIRMED) {
      return;
    }

    this.props.status = DepositStatus.CONFIRMED;

    this.addDomainEvent(new DepositConfirmedEvent({
      depositId: this.props.id!.toString(),
      chainType: this.props.chainType.value,
      txHash: this.props.txHash.value,
      amount: this.props.amount.formatted,
      userId: this.props.userId.toString(),
      confirmations: this.props.confirmations,
    }));
  }

  /**
   * Mark the deposit as notified
   */
  markNotified(): void {
    this.props.status = DepositStatus.NOTIFIED;
    this.props.notifiedAt = new Date();
  }

  /**
   * Record a failed notification attempt
   */
  recordNotifyFailure(error: string): void {
    this.props.notifyAttempts++;
    this.props.lastNotifyError = error;
  }

  private addDomainEvent(event: DomainEvent): void {
    this._domainEvents.push(event);
  }

  clearDomainEvents(): void {
    this._domainEvents.length = 0;
  }

  toProps(): DepositTransactionProps {
    return { ...this.props };
  }
}
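
The status values listed in the schema (DETECTED, CONFIRMING, CONFIRMED, NOTIFIED) can be read as a small state machine driven by the confirmation count. The sketch below is standalone and illustrative only: it assumes a deposit stays in CONFIRMING until the required count is reached, and `nextStatus` is a hypothetical helper rather than part of the aggregate.

```typescript
type Status = 'DETECTED' | 'CONFIRMING' | 'CONFIRMED' | 'NOTIFIED';

// Hypothetical pure helper: derives the next status from the confirmation count.
function nextStatus(current: Status, confirmations: number, required: number): Status {
  // Confirmed or notified deposits are never downgraded.
  if (current === 'CONFIRMED' || current === 'NOTIFIED') {
    return current;
  }
  // Below the threshold the deposit is still accumulating confirmations.
  return confirmations >= required ? 'CONFIRMED' : 'CONFIRMING';
}
```

Keeping the transition rule pure makes it straightforward to cover with table-driven unit tests before wiring it into the aggregate.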

4.2 Value Objects

// src/domain/value-objects/evm-address.vo.ts

import { ethers } from 'ethers';

export class EvmAddress {
  private constructor(private readonly _value: string) {}

  get value(): string {
    return this._value;
  }

  get checksummed(): string {
    return ethers.getAddress(this._value);
  }

  static create(address: string): EvmAddress {
    if (!ethers.isAddress(address)) {
      throw new Error(`Invalid EVM address: ${address}`);
    }
    return new EvmAddress(ethers.getAddress(address)); // Normalize to checksum format
  }

  equals(other: EvmAddress): boolean {
    return this._value.toLowerCase() === other._value.toLowerCase();
  }

  toString(): string {
    return this._value;
  }
}

// src/domain/value-objects/token-amount.vo.ts

import Decimal from 'decimal.js';

export class TokenAmount {
  private constructor(
    private readonly _raw: Decimal,      // Raw on-chain value (smallest unit, e.g. wei)
    private readonly _decimals: number,  // Token decimals
  ) {}

  get raw(): Decimal {
    return this._raw;
  }

  get decimals(): number {
    return this._decimals;
  }

  get formatted(): string {
    return this._raw.div(new Decimal(10).pow(this._decimals)).toFixed(8);
  }

  get value(): number {
    return parseFloat(this.formatted);
  }

  static create(raw: string | bigint, decimals: number): TokenAmount {
    return new TokenAmount(new Decimal(raw.toString()), decimals);
  }

  static fromFormatted(amount: string, decimals: number): TokenAmount {
    const raw = new Decimal(amount).mul(new Decimal(10).pow(decimals));
    return new TokenAmount(raw, decimals);
  }

  isZero(): boolean {
    return this._raw.isZero();
  }

  greaterThan(other: TokenAmount): boolean {
    return this._raw.greaterThan(other._raw);
  }
}
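
To make the raw versus formatted distinction concrete: a raw on-chain value is an integer count of the token's smallest unit, and the formatted value is that integer divided by 10^decimals. The sketch below shifts the decimal point with plain string operations so it runs without decimal.js. `formatRaw` is a hypothetical helper that assumes a non-negative integer string and, unlike the `formatted` getter above, keeps every digit instead of rounding to 8 places.

```typescript
// Hypothetical helper: places the decimal point `decimals` digits from the right.
// Assumes `raw` is a non-negative base-10 integer string.
function formatRaw(raw: string, decimals: number): string {
  if (!/^\d+$/.test(raw)) {
    throw new Error(`Expected a non-negative integer string, got: ${raw}`);
  }
  if (decimals === 0) {
    return raw;
  }
  // Left-pad so there is always at least one digit before the decimal point.
  const padded = raw.padStart(decimals + 1, '0');
  const whole = padded.slice(0, padded.length - decimals);
  const fraction = padded.slice(padded.length - decimals);
  return `${whole}.${fraction}`;
}

// The same 1.5 USDT has different raw values depending on token decimals
// (6 for KAVA USDT and 18 for BSC USDT in the chain config of section 5.1).
const onKava = formatRaw('1500000', 6);
const onBsc = formatRaw('1500000000000000000', 18);
```

This is why the decimals value must always travel with the raw amount: the raw integer alone does not say how much was transferred.
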

// src/domain/value-objects/chain-type.vo.ts

export enum ChainTypeEnum {
  KAVA = 'KAVA',
  BSC = 'BSC',
}

export class ChainType {
  private constructor(private readonly _value: ChainTypeEnum) {}

  get value(): ChainTypeEnum {
    return this._value;
  }

  get name(): string {
    switch (this._value) {
      case ChainTypeEnum.KAVA: return 'KAVA EVM';
      case ChainTypeEnum.BSC: return 'BSC';
    }
  }

  get requiredConfirmations(): number {
    switch (this._value) {
      case ChainTypeEnum.KAVA: return 12;
      case ChainTypeEnum.BSC: return 15;
    }
  }

  get blockTime(): number {
    switch (this._value) {
      case ChainTypeEnum.KAVA: return 6;  // about 6 seconds
      case ChainTypeEnum.BSC: return 3;   // about 3 seconds
    }
  }

  static create(value: string): ChainType {
    if (!Object.values(ChainTypeEnum).includes(value as ChainTypeEnum)) {
      throw new Error(`Invalid chain type: ${value}`);
    }
    return new ChainType(value as ChainTypeEnum);
  }

  static KAVA(): ChainType {
    return new ChainType(ChainTypeEnum.KAVA);
  }

  static BSC(): ChainType {
    return new ChainType(ChainTypeEnum.BSC);
  }

  equals(other: ChainType): boolean {
    return this._value === other._value;
  }
}
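
Multiplying `requiredConfirmations` by `blockTime` gives a rough estimate of how long a deposit takes to reach the confirmed state on each chain. This is a worked sketch using the values above; `estimateConfirmationSeconds` is a hypothetical helper, and real block times fluctuate, so treat the result as an approximation.

```typescript
// Values copied from the ChainType value object above.
const CHAIN_PARAMS = {
  KAVA: { requiredConfirmations: 12, blockTimeSeconds: 6 },
  BSC: { requiredConfirmations: 15, blockTimeSeconds: 3 },
} as const;

// Hypothetical helper: approximate seconds until a deposit is fully confirmed.
function estimateConfirmationSeconds(chain: keyof typeof CHAIN_PARAMS): number {
  const { requiredConfirmations, blockTimeSeconds } = CHAIN_PARAMS[chain];
  return requiredConfirmations * blockTimeSeconds;
}

// KAVA: 12 * 6 = 72 seconds; BSC: 15 * 3 = 45 seconds.
const kavaWait = estimateConfirmationSeconds('KAVA');
const bscWait = estimateConfirmationSeconds('BSC');
```

Although BSC requires more confirmations, its shorter block time means deposits there are expected to confirm sooner than on KAVA.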

4.3 Domain Events

// src/domain/events/deposit-detected.event.ts

import { DomainEvent } from './domain-event.base';

export interface DepositDetectedPayload {
  chainType: string;
  txHash: string;
  fromAddress: string;
  toAddress: string;
  amount: string;
  userId: string;
  blockNumber: number;
}

export class DepositDetectedEvent extends DomainEvent {
  constructor(public readonly payload: DepositDetectedPayload) {
    super();
  }

  get eventType(): string {
    return 'DepositDetected';
  }

  get aggregateId(): string {
    return this.payload.txHash;
  }

  get aggregateType(): string {
    return 'DepositTransaction';
  }
}

// src/domain/events/deposit-confirmed.event.ts

import { DomainEvent } from './domain-event.base';

export interface DepositConfirmedPayload {
  depositId: string;
  chainType: string;
  txHash: string;
  amount: string;
  userId: string;
  confirmations: number;
}

export class DepositConfirmedEvent extends DomainEvent {
  constructor(public readonly payload: DepositConfirmedPayload) {
    super();
  }

  get eventType(): string {
    return 'DepositConfirmed';
  }

  get aggregateId(): string {
    return this.payload.depositId;
  }

  get aggregateType(): string {
    return 'DepositTransaction';
  }
}
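
Both events extend `DomainEvent` from `domain-event.base.ts`, which is not shown in this section. The following is a sketch of what such a base class could look like. Only the `eventType`, `aggregateId`, and `aggregateType` accessors are taken from the subclasses above; the `eventId` and `occurredAt` fields, the id scheme, and the class names are assumptions made for illustration.

```typescript
// Hypothetical base class; the real domain-event.base.ts may differ.
abstract class DomainEventSketch {
  readonly eventId: string;
  readonly occurredAt: Date;

  protected constructor() {
    // Assumed id scheme (timestamp plus random suffix), for illustration only.
    this.eventId = `${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
    this.occurredAt = new Date();
  }

  abstract get eventType(): string;
  abstract get aggregateId(): string;
  abstract get aggregateType(): string;
}

// Example subclass mirroring the shape of DepositDetectedEvent.
class ExampleDepositDetected extends DomainEventSketch {
  constructor(readonly payload: { txHash: string }) {
    super();
  }

  get eventType(): string { return 'DepositDetected'; }
  get aggregateId(): string { return this.payload.txHash; }
  get aggregateType(): string { return 'DepositTransaction'; }
}

const exampleEvent = new ExampleDepositDetected({ txHash: '0xabc' });
```

The three accessors line up with the `eventType`, `aggregateId`, and `aggregateType` columns of the `BlockchainEvent` model, which suggests events are persisted to that table for auditing.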

4.4 Repository Interfaces (Ports)

// src/domain/repositories/deposit-transaction.repository.interface.ts

import { DepositTransaction } from '@/domain/aggregates/deposit-transaction';
import { ChainType, TxHash } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';

export interface IDepositTransactionRepository {
  save(deposit: DepositTransaction): Promise<DepositTransaction>;
  findById(id: bigint): Promise<DepositTransaction | null>;
  findByTxHash(txHash: TxHash): Promise<DepositTransaction | null>;
  existsByTxHash(txHash: TxHash): Promise<boolean>;
  findByStatus(status: DepositStatus): Promise<DepositTransaction[]>;
  findPendingNotification(limit?: number): Promise<DepositTransaction[]>;
  findByUserIdAndChain(userId: bigint, chainType: ChainType): Promise<DepositTransaction[]>;
  findUnconfirmed(chainType: ChainType, minBlockNumber: bigint): Promise<DepositTransaction[]>;
}

export const DEPOSIT_TRANSACTION_REPOSITORY = Symbol('DEPOSIT_TRANSACTION_REPOSITORY');

// src/domain/repositories/monitored-address.repository.interface.ts

import { ChainType, EvmAddress } from '@/domain/value-objects';

export interface MonitoredAddressData {
  id: bigint;
  chainType: ChainType;
  address: EvmAddress;
  userId: bigint;
  isActive: boolean;
  createdAt: Date;
}

export interface IMonitoredAddressRepository {
  save(data: Omit<MonitoredAddressData, 'id' | 'createdAt'>): Promise<MonitoredAddressData>;
  findByAddress(chainType: ChainType, address: EvmAddress): Promise<MonitoredAddressData | null>;
  findByUserId(userId: bigint): Promise<MonitoredAddressData[]>;
  findActiveByChain(chainType: ChainType): Promise<MonitoredAddressData[]>;
  getAllActiveAddresses(): Promise<Set<string>>;  // Returns a Set of all active addresses
  deactivate(id: bigint): Promise<void>;
}

export const MONITORED_ADDRESS_REPOSITORY = Symbol('MONITORED_ADDRESS_REPOSITORY');
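
Because the domain depends only on these interfaces, a unit test can swap the Prisma implementation for an in-memory one. The sketch below is simplified: it assumes plain strings and numbers in place of the `ChainType` and `EvmAddress` value objects and `bigint` ids, covers only part of `IMonitoredAddressRepository`, and uses a hypothetical class name.

```typescript
interface AddressRecord {
  id: number;
  chainType: string;
  address: string;
  userId: number;
  isActive: boolean;
  createdAt: Date;
}

// Hypothetical in-memory adapter for tests; not part of the service code.
class InMemoryMonitoredAddressRepository {
  private readonly rows = new Map<string, AddressRecord>();
  private nextId = 1;

  // Mirrors the (chainType, address) unique constraint; addresses compare case-insensitively.
  private key(chainType: string, address: string): string {
    return `${chainType}:${address.toLowerCase()}`;
  }

  async save(data: Omit<AddressRecord, 'id' | 'createdAt'>): Promise<AddressRecord> {
    const k = this.key(data.chainType, data.address);
    const existing = this.rows.get(k);
    const row: AddressRecord = {
      ...data,
      id: existing?.id ?? this.nextId++,
      createdAt: existing?.createdAt ?? new Date(),
    };
    this.rows.set(k, row);
    return row;
  }

  async findByAddress(chainType: string, address: string): Promise<AddressRecord | null> {
    return this.rows.get(this.key(chainType, address)) ?? null;
  }

  async findActiveByChain(chainType: string): Promise<AddressRecord[]> {
    return Array.from(this.rows.values()).filter(
      (row) => row.chainType === chainType && row.isActive,
    );
  }

  async deactivate(id: number): Promise<void> {
    this.rows.forEach((row) => {
      if (row.id === id) row.isActive = false;
    });
  }
}
```

In a NestJS test module, an instance like this could be registered under the `MONITORED_ADDRESS_REPOSITORY` token so application services run without a database.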

5. Infrastructure Layer Design

5.1 EVM Provider Adapter

// src/infrastructure/blockchain/evm-provider.adapter.ts

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ethers, JsonRpcProvider, WebSocketProvider, Contract } from 'ethers';
import { ChainTypeEnum } from '@/domain/value-objects';

interface ChainConfig {
  rpcUrl: string;
  wsUrl?: string;
  usdtContract: string;
  decimals: number;
  chainId: number;
}

const ERC20_ABI = [
  'event Transfer(address indexed from, address indexed to, uint256 value)',
  'function balanceOf(address owner) view returns (uint256)',
  'function decimals() view returns (uint8)',
];

@Injectable()
export class EvmProviderAdapter implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(EvmProviderAdapter.name);

  private rpcProviders: Map<ChainTypeEnum, JsonRpcProvider> = new Map();
  private wsProviders: Map<ChainTypeEnum, WebSocketProvider> = new Map();
  private contracts: Map<ChainTypeEnum, Contract> = new Map();

  private readonly chainConfigs: Record<ChainTypeEnum, ChainConfig>;

  constructor(private readonly configService: ConfigService) {
    // Built in the constructor rather than in a field initializer, so that
    // `configService` is guaranteed to be assigned before it is read
    // (field initializers run first when `useDefineForClassFields` is enabled).
    this.chainConfigs = {
      [ChainTypeEnum.KAVA]: {
        rpcUrl: this.configService.get('KAVA_RPC_URL', 'https://evm.kava.io'),
        wsUrl: this.configService.get('KAVA_WS_URL'),
        usdtContract: '0x919C1c267BC06a7039e03fcc2eF738525769109c',
        decimals: 6,
        chainId: 2222,
      },
      [ChainTypeEnum.BSC]: {
        rpcUrl: this.configService.get('BSC_RPC_URL', 'https://bsc-dataseed.binance.org'),
        wsUrl: this.configService.get('BSC_WS_URL'),
        usdtContract: '0x55d398326f99059fF775485246999027B3197955',
        decimals: 18,
        chainId: 56,
      },
    };
  }

  async onModuleInit(): Promise<void> {
    await this.initializeProviders();
  }

  async onModuleDestroy(): Promise<void> {
    await this.closeProviders();
  }

  private async initializeProviders(): Promise<void> {
    for (const [chainType, config] of Object.entries(this.chainConfigs)) {
      try {
        // RPC provider (used for queries)
        const rpcProvider = new JsonRpcProvider(config.rpcUrl, config.chainId, {
          staticNetwork: true,
        });
        this.rpcProviders.set(chainType as ChainTypeEnum, rpcProvider);

        // WebSocket provider (used for event listening)
        if (config.wsUrl) {
          const wsProvider = new WebSocketProvider(config.wsUrl, config.chainId);
          this.wsProviders.set(chainType as ChainTypeEnum, wsProvider);
        }

        // USDT Contract
        const provider = config.wsUrl
          ? this.wsProviders.get(chainType as ChainTypeEnum)!
          : rpcProvider;
        const contract = new Contract(config.usdtContract, ERC20_ABI, provider);
        this.contracts.set(chainType as ChainTypeEnum, contract);

        this.logger.log(`Initialized ${chainType} provider`);
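      } catch (error) {
        // `error` is `unknown` under strict settings, so narrow it before reading `.message`
        const message = error instanceof Error ? error.message : String(error);
        this.logger.error(`Failed to initialize ${chainType} provider: ${message}`);
      }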
    }
  }

  private async closeProviders(): Promise<void> {
    for (const wsProvider of this.wsProviders.values()) {
      await wsProvider.destroy();
    }
  }

  getRpcProvider(chainType: ChainTypeEnum): JsonRpcProvider {
    const provider = this.rpcProviders.get(chainType);
    if (!provider) {
      throw new Error(`No RPC provider for chain: ${chainType}`);
    }
    return provider;
  }

  getWsProvider(chainType: ChainTypeEnum): WebSocketProvider | undefined {
    return this.wsProviders.get(chainType);
  }

  getUsdtContract(chainType: ChainTypeEnum): Contract {
    const contract = this.contracts.get(chainType);
    if (!contract) {
      throw new Error(`No USDT contract for chain: ${chainType}`);
    }
    return contract;
  }

  getChainConfig(chainType: ChainTypeEnum): ChainConfig {
    return this.chainConfigs[chainType];
  }

  async getCurrentBlockNumber(chainType: ChainTypeEnum): Promise<bigint> {
    const provider = this.getRpcProvider(chainType);
    const blockNumber = await provider.getBlockNumber();
    return BigInt(blockNumber);
  }

  async getBalance(chainType: ChainTypeEnum, address: string): Promise<string> {
    const contract = this.getUsdtContract(chainType);
    const config = this.getChainConfig(chainType);
    const balance = await contract.balanceOf(address);
    return ethers.formatUnits(balance, config.decimals);
  }
}
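
`getCurrentBlockNumber` is what lets the service turn a transaction's block number into a confirmation count. The sketch below shows that arithmetic, assuming the common convention that a transaction has one confirmation as soon as its block is mined (consistent with `confirmations: 1` in `DepositTransaction.create`). `countConfirmations` is a hypothetical helper, not part of the adapter.

```typescript
// Hypothetical helper: confirmations of a transaction mined in `txBlock`
// when the chain head is at `currentBlock`.
function countConfirmations(currentBlock: bigint, txBlock: bigint): number {
  // The queried node may lag behind, or the block may have been reorganized away.
  if (currentBlock < txBlock) {
    return 0;
  }
  // The block containing the transaction counts as the first confirmation.
  return Number(currentBlock - txBlock) + 1;
}

// A deposit mined in block 100 reaches 12 confirmations when the head is at block 111.
const confirmationsAtHead111 = countConfirmations(BigInt(111), BigInt(100));
```

The result can be compared against `requiredConfirmations` for the chain before calling `updateConfirmations` on the aggregate.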

5.2 Event Listener Service

// src/infrastructure/blockchain/event-listener.service.ts

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { EvmProviderAdapter } from './evm-provider.adapter';
import { DepositDetectionService } from '@/application/services/deposit-detection.service';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { ChainTypeEnum } from '@/domain/value-objects';
import { Contract, EventLog } from 'ethers';

@Injectable()
export class EventListenerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(EventListenerService.name);
  private isListening = false;
  private listeners: Map<ChainTypeEnum, any> = new Map();

  constructor(
    private readonly evmProvider: EvmProviderAdapter,
    private readonly depositService: DepositDetectionService,
    private readonly addressCache: AddressCacheService,
  ) {}

  async onModuleInit(): Promise<void> {
    await this.startListening();
  }

  async onModuleDestroy(): Promise<void> {
    await this.stopListening();
  }

  async startListening(): Promise<void> {
    if (this.isListening) return;

    for (const chainType of Object.values(ChainTypeEnum)) {
      await this.subscribeToChain(chainType);
    }

    this.isListening = true;
    this.logger.log('Event listeners started');
  }

  private async subscribeToChain(chainType: ChainTypeEnum): Promise<void> {
    const wsProvider = this.evmProvider.getWsProvider(chainType);
    if (!wsProvider) {
      this.logger.warn(`No WebSocket provider for ${chainType}, using polling`);
      return;
    }

    const contract = this.evmProvider.getUsdtContract(chainType);

    // Listen for all Transfer events
    const filter = contract.filters.Transfer();

    // ethers v6 passes a ContractEventPayload as the last listener argument; the
    // EventLog (transaction hash, block number, log index) lives on `payload.log`.
    const listener = async (from: string, to: string, value: bigint, payload: { log: EventLog }) => {
      const { log } = payload;
      try {
        // Check whether the recipient is a platform-monitored address
        const isMonitored = await this.addressCache.isMonitoredAddress(chainType, to);
        if (!isMonitored) return;

        this.logger.log(`Detected deposit: ${chainType} ${log.transactionHash}`);

        await this.depositService.handleTransferEvent({
          chainType,
          txHash: log.transactionHash,
          fromAddress: from,
          toAddress: to,
          amount: value.toString(),
          blockNumber: log.blockNumber,
          logIndex: log.index,
        });
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        this.logger.error(`Error processing Transfer event: ${message}`);
      }
    };

    await contract.on(filter, listener);
    this.listeners.set(chainType, { contract, filter, listener });

    this.logger.log(`Subscribed to ${chainType} Transfer events`);
  }

  async stopListening(): Promise<void> {
    for (const [chainType, { contract, filter, listener }] of this.listeners) {
      contract.off(filter, listener);
      this.logger.log(`Unsubscribed from ${chainType}`);
    }
    this.listeners.clear();
    this.isListening = false;
  }
}
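
In ethers v6 the final argument passed to a contract event listener is a ContractEventPayload, and the transaction hash, block number and log index live on its `log` property. The sketch below shows that mapping in isolation. `LogLike`, `PayloadLike`, `TransferEventFields` and `toTransferEventFields` are hypothetical names introduced here so the example compiles without ethers installed; they are not part of the service.

```typescript
// Minimal structural stand-ins for the ethers v6 EventLog and
// ContractEventPayload types (assumption: only these fields are needed).
interface LogLike {
  transactionHash: string;
  blockNumber: number;
  index: number;
}

interface PayloadLike {
  log: LogLike;
}

// Mirrors the TransferEventData shape consumed by DepositDetectionService.
interface TransferEventFields {
  chainType: string;
  txHash: string;
  fromAddress: string;
  toAddress: string;
  amount: string;
  blockNumber: number;
  logIndex: number;
}

// Hypothetical helper: converts raw listener arguments into event fields.
function toTransferEventFields(
  chainType: string,
  from: string,
  to: string,
  value: bigint,
  payload: PayloadLike,
): TransferEventFields {
  return {
    chainType,
    txHash: payload.log.transactionHash,
    fromAddress: from,
    toAddress: to,
    amount: value.toString(), // keep the raw integer amount as a string
    blockNumber: payload.log.blockNumber,
    logIndex: payload.log.index,
  };
}
```

Keeping the mapping in a pure function like this makes it straightforward to unit test without a live provider.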

5.3 Block Scanner (Catch-up Scan Service)

// src/infrastructure/blockchain/block-scanner.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { EvmProviderAdapter } from './evm-provider.adapter';
import { DepositDetectionService } from '@/application/services/deposit-detection.service';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { BlockCheckpointRepository } from '@/infrastructure/persistence/repositories/block-checkpoint.repository.impl';
import { ChainTypeEnum } from '@/domain/value-objects';
import { ethers } from 'ethers';

const BATCH_SIZE = 1000;  // Number of blocks scanned per batch

@Injectable()
export class BlockScannerService {
  private readonly logger = new Logger(BlockScannerService.name);
  private isScanning: Map<ChainTypeEnum, boolean> = new Map();

  constructor(
    private readonly evmProvider: EvmProviderAdapter,
    private readonly depositService: DepositDetectionService,
    private readonly addressCache: AddressCacheService,
    private readonly checkpointRepo: BlockCheckpointRepository,
  ) {}

  /**
   * Scheduled catch-up scan (every 5 minutes)
   */
  @Cron(CronExpression.EVERY_5_MINUTES)
  async scheduledScan(): Promise<void> {
    for (const chainType of Object.values(ChainTypeEnum)) {
      await this.scanChain(chainType);
    }
  }

  /**
   * Scan the blocks of the given chain
   */
  async scanChain(chainType: ChainTypeEnum): Promise<void> {
    if (this.isScanning.get(chainType)) {
      this.logger.debug(`${chainType} scan already in progress`);
      return;
    }

    this.isScanning.set(chainType, true);

    try {
      const checkpoint = await this.checkpointRepo.findByChain(chainType);
      const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType);

      let fromBlock = checkpoint
        ? checkpoint.lastScannedBlock + 1n
        : (currentBlock > 1000n ? currentBlock - 1000n : 0n);  // First run: scan the most recent 1000 blocks, never below block 0

      while (fromBlock <= currentBlock) {
        const toBlock = fromBlock + BigInt(BATCH_SIZE) - 1n > currentBlock
          ? currentBlock
          : fromBlock + BigInt(BATCH_SIZE) - 1n;

        await this.scanBlockRange(chainType, fromBlock, toBlock);

        // Persist the checkpoint after each batch
        await this.checkpointRepo.updateCheckpoint(chainType, toBlock);

        fromBlock = toBlock + 1n;
      }

      this.logger.debug(`${chainType} scan completed up to block ${currentBlock}`);
    } catch (error) {
      this.logger.error(`${chainType} scan failed: ${error.message}`);
      await this.checkpointRepo.markUnhealthy(chainType, error.message);
    } finally {
      this.isScanning.set(chainType, false);
    }
  }

  /**
   * Scan the given block range
   */
  private async scanBlockRange(
    chainType: ChainTypeEnum,
    fromBlock: bigint,
    toBlock: bigint,
  ): Promise<void> {
    const contract = this.evmProvider.getUsdtContract(chainType);
    const monitoredAddresses = await this.addressCache.getMonitoredAddresses(chainType);

    if (monitoredAddresses.size === 0) {
      return;
    }

    // Query Transfer events whose recipient is a monitored address
    const filter = contract.filters.Transfer(null, [...monitoredAddresses]);
    const events = await contract.queryFilter(filter, Number(fromBlock), Number(toBlock));

    for (const event of events) {
      if (!('args' in event)) continue;

      const [from, to, value] = event.args;

      try {
        await this.depositService.handleTransferEvent({
          chainType,
          txHash: event.transactionHash,
          fromAddress: from,
          toAddress: to,
          amount: value.toString(),
          blockNumber: event.blockNumber,
          logIndex: event.index,
        });
      } catch (error) {
        // Ignore expected errors such as duplicate transactions
        if (!error.message.includes('already exists')) {
          this.logger.error(`Error processing event: ${error.message}`);
        }
      }
    }
  }

  /**
   * Manually trigger a full scan
   */
  async fullScan(chainType: ChainTypeEnum, fromBlock: bigint, toBlock: bigint): Promise<void> {
    this.logger.log(`Starting full scan for ${chainType} from ${fromBlock} to ${toBlock}`);
    await this.scanBlockRange(chainType, fromBlock, toBlock);
    this.logger.log(`Full scan completed for ${chainType}`);
  }
}
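
The batching loop in scanChain can be expressed as a pure function, which also makes it reusable for a manual full scan over a large range. This is a minimal sketch; `BlockRange` and `splitIntoBatches` are hypothetical names, not part of the service.

```typescript
interface BlockRange {
  fromBlock: bigint;
  toBlock: bigint;
}

// Splits the inclusive range [fromBlock, toBlock] into consecutive
// inclusive sub-ranges of at most batchSize blocks each.
function splitIntoBatches(fromBlock: bigint, toBlock: bigint, batchSize: bigint): BlockRange[] {
  if (batchSize <= 0n) {
    throw new RangeError('batchSize must be positive');
  }
  const ranges: BlockRange[] = [];
  let start = fromBlock;
  while (start <= toBlock) {
    const candidate = start + batchSize - 1n;
    const end = candidate > toBlock ? toBlock : candidate; // clamp the last batch
    ranges.push({ fromBlock: start, toBlock: end });
    start = end + 1n;
  }
  return ranges;
}
```

An empty array is returned when fromBlock is greater than toBlock, which matches the case where the checkpoint is already at the chain head.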

5.4 Address Cache Service

// src/infrastructure/redis/address-cache.service.ts

import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { RedisService } from './redis.service';
import { IMonitoredAddressRepository, MONITORED_ADDRESS_REPOSITORY } from '@/domain/repositories';
import { Inject } from '@nestjs/common';
import { ChainTypeEnum } from '@/domain/value-objects';

const CACHE_KEY_PREFIX = 'blockchain:addresses:';
const CACHE_TTL = 300;  // 5 minutes

@Injectable()
export class AddressCacheService implements OnModuleInit {
  private readonly logger = new Logger(AddressCacheService.name);
  private localCache: Map<ChainTypeEnum, Set<string>> = new Map();

  constructor(
    private readonly redis: RedisService,
    @Inject(MONITORED_ADDRESS_REPOSITORY)
    private readonly addressRepo: IMonitoredAddressRepository,
  ) {}

  async onModuleInit(): Promise<void> {
    await this.refreshCache();
  }

  /**
   * Check whether an address is monitored
   */
  async isMonitoredAddress(chainType: ChainTypeEnum, address: string): Promise<boolean> {
    const normalizedAddress = address.toLowerCase();

    // Check the local cache first
    const localSet = this.localCache.get(chainType);
    if (localSet?.has(normalizedAddress)) {
      return true;
    }

    // Fall back to Redis
    const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
    const exists = await this.redis.sismember(cacheKey, normalizedAddress);

    if (exists) {
      // Update the local cache
      if (!localSet) {
        this.localCache.set(chainType, new Set([normalizedAddress]));
      } else {
        localSet.add(normalizedAddress);
      }
    }

    return exists;
  }

  /**
   * Get all monitored addresses for a chain
   */
  async getMonitoredAddresses(chainType: ChainTypeEnum): Promise<Set<string>> {
    const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
    const addresses = await this.redis.smembers(cacheKey);
    return new Set(addresses);
  }

  /**
   * Add a monitored address
   */
  async addAddress(chainType: ChainTypeEnum, address: string): Promise<void> {
    const normalizedAddress = address.toLowerCase();
    const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;

    await this.redis.sadd(cacheKey, normalizedAddress);

    // Update the local cache
    let localSet = this.localCache.get(chainType);
    if (!localSet) {
      localSet = new Set();
      this.localCache.set(chainType, localSet);
    }
    localSet.add(normalizedAddress);
  }

  /**
   * Remove a monitored address
   */
  async removeAddress(chainType: ChainTypeEnum, address: string): Promise<void> {
    const normalizedAddress = address.toLowerCase();
    const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;

    await this.redis.srem(cacheKey, normalizedAddress);

    // Update the local cache
    this.localCache.get(chainType)?.delete(normalizedAddress);
  }

  /**
   * Refresh the cache from the database
   */
  async refreshCache(): Promise<void> {
    this.logger.log('Refreshing address cache from database...');

    for (const chainType of Object.values(ChainTypeEnum)) {
      const addresses = await this.addressRepo.findActiveByChain(
        { value: chainType } as any
      );

      const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
      const addressSet = new Set(addresses.map(a => a.address.value.toLowerCase()));

      // Update Redis: always clear the old set so removed addresses do not linger
      await this.redis.del(cacheKey);
      if (addressSet.size > 0) {
        await this.redis.sadd(cacheKey, ...addressSet);
      }

      // Update the local cache
      this.localCache.set(chainType, addressSet);

      this.logger.log(`Loaded ${addressSet.size} addresses for ${chainType}`);
    }
  }
}
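
refreshCache replaces the Redis set with a delete followed by an add, so the set is briefly empty between the two commands. One possible alternative, sketched here under the assumption that the current set can be read first, is to compute the difference and apply only the changes. `AddressDiff` and `diffAddressSets` are hypothetical names.

```typescript
interface AddressDiff {
  toAdd: string[];    // present in the database, missing from the cache
  toRemove: string[]; // present in the cache, no longer in the database
}

// Compares the cached address set with the desired (database) set.
// Both sets are expected to hold lower-cased addresses already.
function diffAddressSets(cached: ReadonlySet<string>, desired: ReadonlySet<string>): AddressDiff {
  const toAdd = Array.from(desired).filter((address) => !cached.has(address));
  const toRemove = Array.from(cached).filter((address) => !desired.has(address));
  return { toAdd, toRemove };
}
```

The caller would then issue one add for `toAdd` and one remove for `toRemove`, skipping either when the list is empty, so addresses that did not change never leave the set.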

6. Application Layer Design

6.1 Deposit Detection Service

// src/application/services/deposit-detection.service.ts

import { Injectable, Inject, Logger } from '@nestjs/common';
import {
  IDepositTransactionRepository,
  DEPOSIT_TRANSACTION_REPOSITORY,
  IMonitoredAddressRepository,
  MONITORED_ADDRESS_REPOSITORY,
} from '@/domain/repositories';
import { DepositTransaction } from '@/domain/aggregates/deposit-transaction';
import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber, ChainTypeEnum } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import { WalletClientService } from '@/infrastructure/external/wallet-service/wallet-client.service';
import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter';

export interface TransferEventData {
  chainType: ChainTypeEnum;
  txHash: string;
  fromAddress: string;
  toAddress: string;
  amount: string;
  blockNumber: number;
  logIndex: number;
}

@Injectable()
export class DepositDetectionService {
  private readonly logger = new Logger(DepositDetectionService.name);

  constructor(
    @Inject(DEPOSIT_TRANSACTION_REPOSITORY)
    private readonly depositRepo: IDepositTransactionRepository,
    @Inject(MONITORED_ADDRESS_REPOSITORY)
    private readonly addressRepo: IMonitoredAddressRepository,
    private readonly eventPublisher: EventPublisherService,
    private readonly walletClient: WalletClientService,
    private readonly evmProvider: EvmProviderAdapter,
  ) {}

  /**
   * Handle a detected Transfer event
   */
  async handleTransferEvent(data: TransferEventData): Promise<void> {
    const txHash = TxHash.create(data.txHash);

    // Skip transactions that have already been processed
    const exists = await this.depositRepo.existsByTxHash(txHash);
    if (exists) {
      this.logger.debug(`Deposit ${data.txHash} already exists, skipping`);
      return;
    }

    // Look up the monitored address record
    const chainType = ChainType.create(data.chainType);
    const toAddress = EvmAddress.create(data.toAddress);
    const addressInfo = await this.addressRepo.findByAddress(chainType, toAddress);

    if (!addressInfo) {
      this.logger.warn(`Address ${data.toAddress} not found in monitored addresses`);
      return;
    }

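    // Fetch the block timestamp
    const block = await this.evmProvider.getRpcProvider(data.chainType).getBlock(data.blockNumber);
    if (!block) {
      throw new Error(`Block ${data.blockNumber} not found on ${data.chainType}`);
    }
    const blockTimestamp = new Date(block.timestamp * 1000);

    // Load the chain configuration (USDT contract address and token decimals)
    const chainConfig = this.evmProvider.getChainConfig(data.chainType);

    // Create the deposit record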
    const deposit = DepositTransaction.create({
      chainType,
      txHash,
      fromAddress: EvmAddress.create(data.fromAddress),
      toAddress,
      tokenContract: EvmAddress.create(chainConfig.usdtContract),
      amount: TokenAmount.create(data.amount, chainConfig.decimals),
      blockNumber: BlockNumber.create(data.blockNumber),
      blockTimestamp,
      logIndex: data.logIndex,
      addressId: addressInfo.id,
      userId: addressInfo.userId,
    });

    // Save the deposit and publish its domain events
    await this.depositRepo.save(deposit);
    await this.eventPublisher.publishAll(deposit.domainEvents);
    deposit.clearDomainEvents();

    this.logger.log(`Deposit detected: ${data.txHash}, amount: ${deposit.amount.formatted}`);

    // Check the confirmation count
    await this.checkConfirmationsAndNotify(deposit);
  }

  /**
   * Check the confirmation count and notify wallet-service
   */
  async checkConfirmationsAndNotify(deposit: DepositTransaction): Promise<void> {
    const chainType = deposit.chainType;
    const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType.value as ChainTypeEnum);
    const confirmations = Number(currentBlock - deposit.blockNumber.value) + 1;
    const requiredConfirmations = chainType.requiredConfirmations;

    deposit.updateConfirmations(confirmations, requiredConfirmations);

    if (deposit.isConfirmed && !deposit.isNotified) {
      try {
        // Call wallet-service to credit the deposit
        await this.walletClient.handleDeposit({
          userId: deposit.userId.toString(),
          amount: deposit.amount.value,
          chainType: deposit.chainType.value,
          txHash: deposit.txHash.value,
        });

        deposit.markNotified();
        this.logger.log(`Deposit notified: ${deposit.txHash.value}`);
      } catch (error) {
        deposit.recordNotifyFailure(error.message);
        this.logger.error(`Failed to notify deposit: ${error.message}`);
      }
    }

    await this.depositRepo.save(deposit);
    await this.eventPublisher.publishAll(deposit.domainEvents);
    deposit.clearDomainEvents();
  }

  /**
   * Scheduled task: update the confirmation count of unconfirmed deposits
   */
  async updatePendingConfirmations(): Promise<void> {
    for (const chainType of Object.values(ChainTypeEnum)) {
      const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType);
      const minBlock = currentBlock - 100n;  // Only process deposits from the last 100 blocks

      const pendingDeposits = await this.depositRepo.findUnconfirmed(
        ChainType.create(chainType),
        minBlock,
      );

      for (const deposit of pendingDeposits) {
        await this.checkConfirmationsAndNotify(deposit);
      }
    }
  }

  /**
   * Scheduled task: retry failed notifications
   */
  async retryFailedNotifications(): Promise<void> {
    const pendingNotifications = await this.depositRepo.findPendingNotification(50);

    for (const deposit of pendingNotifications) {
      if (deposit.toProps().notifyAttempts >= 10) {
        this.logger.error(`Deposit ${deposit.txHash.value} exceeded max retry attempts`);
        continue;
      }

      await this.checkConfirmationsAndNotify(deposit);
    }
  }
}
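
handleTransferEvent deduplicates on the transaction hash alone, yet a single transaction can emit several Transfer logs, for example when a contract pays out to multiple monitored addresses. One option is to key deduplication on chain, transaction hash and log index together. The sketch below illustrates the idea in memory; `TransferLogId`, `depositDedupKey` and `SeenTransfers` are hypothetical names, and a real implementation would more likely rely on a unique database constraint.

```typescript
interface TransferLogId {
  chainType: string;
  txHash: string;
  logIndex: number;
}

// Builds a stable key that identifies one Transfer log, not one transaction.
function depositDedupKey(id: TransferLogId): string {
  return `${id.chainType}:${id.txHash.toLowerCase()}:${id.logIndex}`;
}

// In-memory tracker used only to demonstrate the key; not a persistence layer.
class SeenTransfers {
  private readonly seen = new Set<string>();

  // Returns true the first time a log is offered and false for repeats.
  markIfNew(id: TransferLogId): boolean {
    const key = depositDedupKey(id);
    if (this.seen.has(key)) {
      return false;
    }
    this.seen.add(key);
    return true;
  }
}
```

With this key the listener and the catch-up scanner can both report the same log without creating a duplicate, while two logs in one transaction are still recorded separately.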

6.2 Balance Query Service

// src/application/services/balance-query.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { formatEther } from 'ethers';
import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter';
import { ChainTypeEnum } from '@/domain/value-objects';

export interface BalanceResult {
  chainType: string;
  address: string;
  usdtBalance: string;
  nativeBalance: string;
}

@Injectable()
export class BalanceQueryService {
  private readonly logger = new Logger(BalanceQueryService.name);

  constructor(private readonly evmProvider: EvmProviderAdapter) {}

  /**
   * Query the USDT and native-token balances of a single address
   */
  async getBalance(chainType: ChainTypeEnum, address: string): Promise<BalanceResult> {
    const [usdtBalance, nativeBalance] = await Promise.all([
      this.evmProvider.getBalance(chainType, address),
      this.getNativeBalance(chainType, address),
    ]);

    return {
      chainType,
      address,
      usdtBalance,
      nativeBalance,
    };
  }

  /**
   * Query balances for multiple addresses in parallel
   */
  async getBatchBalances(
    addresses: Array<{ chainType: ChainTypeEnum; address: string }>,
  ): Promise<BalanceResult[]> {
    return Promise.all(
      addresses.map(({ chainType, address }) => this.getBalance(chainType, address)),
    );
  }

  // Native balances use 18 decimals, so formatEther applies
  private async getNativeBalance(chainType: ChainTypeEnum, address: string): Promise<string> {
    const provider = this.evmProvider.getRpcProvider(chainType);
    const balance = await provider.getBalance(address);
    return formatEther(balance);
  }
}
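
The native balance above is formatted with 18 decimals, whereas token decimals come from the chain configuration (`chainConfig.decimals` in section 6.1) and need not be 18. The following dependency-free sketch shows decimals-aware formatting of a raw integer balance. `formatTokenAmount` is a hypothetical name; inside the service, ethers' `formatUnits` would be the natural choice, and its exact output format may differ from this sketch, which trims trailing zeros.

```typescript
// Formats a raw integer token amount as a decimal string.
// Example: raw = 1500000n with decimals = 6 represents 1.5 tokens.
function formatTokenAmount(raw: bigint, decimals: number): string {
  if (!Number.isInteger(decimals) || decimals < 0) {
    throw new RangeError('decimals must be a non-negative integer');
  }
  const negative = raw < 0n;
  const absolute = negative ? -raw : raw;
  const base = 10n ** BigInt(decimals);
  const whole = absolute / base;
  const fraction = absolute % base;

  let text = whole.toString();
  if (fraction > 0n) {
    // Left-pad the fraction to the full width, then drop trailing zeros.
    const padded = fraction.toString().padStart(decimals, '0');
    text += '.' + padded.replace(/0+$/, '');
  }
  return negative ? '-' + text : text;
}
```

Working on bigint throughout avoids the precision loss that would occur if the raw amount were converted to a JavaScript number first.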

7. Kafka Event Design

7.1 Published Events

// src/infrastructure/kafka/event-publisher.service.ts

export const BLOCKCHAIN_TOPICS = {
  DEPOSIT_DETECTED: 'blockchain.DepositDetected',
  DEPOSIT_CONFIRMED: 'blockchain.DepositConfirmed',
  TRANSACTION_BROADCASTED: 'blockchain.TransactionBroadcasted',
  TRANSACTION_CONFIRMED: 'blockchain.TransactionConfirmed',
  BLOCK_SCANNED: 'blockchain.BlockScanned',
} as const;

7.2 Consumed Events

// src/infrastructure/kafka/event-consumer.controller.ts

import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices';
import { AddressRegistryService } from '@/application/services/address-registry.service';

@Controller()
export class EventConsumerController {
  private readonly logger = new Logger(EventConsumerController.name);

  constructor(private readonly addressRegistry: AddressRegistryService) {}

  /**
   * Listen for user-created events and register monitored addresses
   */
  @MessagePattern('identity.UserAccountCreated')
  async handleUserCreated(
    @Payload() message: any,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    this.logger.log(`Received UserAccountCreated: ${message.payload.userId}`);

    // Register the user's wallet addresses in the monitored list
    // Note: the address details may only arrive in a later WalletBound event
  }

  /**
   * Listen for wallet-bound events and register monitored addresses
   */
  @MessagePattern('identity.WalletBound')
  async handleWalletBound(
    @Payload() message: any,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    this.logger.log(`Received WalletBound: userId=${message.payload.userId}`);

    await this.addressRegistry.registerAddress({
      userId: BigInt(message.payload.userId),
      chainType: message.payload.chainType,
      address: message.payload.address,
    });
  }

  /**
   * Listen for multiple-wallets-bound events
   */
  @MessagePattern('identity.MultipleWalletsBound')
  async handleMultipleWalletsBound(
    @Payload() message: any,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    this.logger.log(`Received MultipleWalletsBound: userId=${message.payload.userId}`);

    for (const wallet of message.payload.wallets) {
      await this.addressRegistry.registerAddress({
        userId: BigInt(message.payload.userId),
        chainType: wallet.chainType,
        address: wallet.address,
      });
    }
  }
}
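
The handlers above type the message as `any` and pass `userId` straight to `BigInt`, which throws on a non-numeric string. A small type guard can reject malformed payloads before they reach the registry. This is a sketch only: the field names follow the handler code, while `WalletBoundPayload` and `isWalletBoundPayload` are hypothetical names and the 40-hex-character address check assumes EVM addresses.

```typescript
interface WalletBoundPayload {
  userId: string | number;
  chainType: string;
  address: string;
}

// Returns true only when the value has the fields registerAddress needs.
function isWalletBoundPayload(value: unknown): value is WalletBoundPayload {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const record = value as Record<string, unknown>;
  const userId = record['userId'];
  const chainType = record['chainType'];
  const address = record['address'];

  // userId must be convertible by BigInt(): a digit string or a safe integer.
  const userIdOk =
    (typeof userId === 'string' && /^\d+$/.test(userId)) ||
    (typeof userId === 'number' && Number.isSafeInteger(userId) && userId >= 0);
  // Assumption: EVM-style address, 0x followed by 40 hex characters.
  const addressOk = typeof address === 'string' && /^0x[0-9a-fA-F]{40}$/.test(address);

  return userIdOk && typeof chainType === 'string' && chainType.length > 0 && addressOk;
}
```

A handler could log and skip any message for which the guard returns false, so a single bad event does not fail the consumer.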

8. API Design

8.1 Health Check

// src/api/controllers/health.controller.ts

@Controller('blockchain/health')
export class HealthController {
  @Get()
  @Public()
  async check(): Promise<{ status: string; chains: Record<string, boolean> }> {
    // Check the connection status of each chain
  }
}

8.2 Balance Query (Internal API)

// src/api/controllers/balance.controller.ts

@Controller('blockchain/balance')
export class BalanceController {
  @Get(':chainType/:address')
  @Public()  // Called by internal services
  async getBalance(
    @Param('chainType') chainType: string,
    @Param('address') address: string,
  ): Promise<BalanceResult> {
    // ...
  }
}

9. Configuration Design

9.1 Environment Variables

# .env.example

# Service configuration
PORT=3012
NODE_ENV=development

# Database
DATABASE_URL=postgresql://rwa_user:password@localhost:5432/rwa_blockchain

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=11

# Kafka
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=blockchain-service

# Blockchain RPC
KAVA_RPC_URL=https://evm.kava.io
KAVA_WS_URL=wss://evm.kava.io/ws
BSC_RPC_URL=https://bsc-dataseed.binance.org
BSC_WS_URL=wss://bsc-ws-node.nariox.org:443

# Inter-service calls
WALLET_SERVICE_URL=http://localhost:3001
IDENTITY_SERVICE_URL=http://localhost:3000
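
A service that depends on this many variables benefits from failing fast at startup when one is missing. The sketch below validates a subset of the variables listed above. `BlockchainServiceEnv` and `loadEnv` are hypothetical names, and which variables are treated as required is an assumption made for illustration.

```typescript
interface BlockchainServiceEnv {
  port: number;
  databaseUrl: string;
  redisDb: number;
  kafkaBrokers: string[];
  kavaRpcUrl: string;
  bscRpcUrl: string;
  walletServiceUrl: string;
}

// Reads and validates settings from an environment-like record.
// In the service this would typically be called with process.env.
function loadEnv(env: Record<string, string | undefined>): BlockchainServiceEnv {
  const required = (key: string): string => {
    const value = env[key];
    if (value === undefined || value.trim() === '') {
      throw new Error(`Missing required environment variable: ${key}`);
    }
    return value.trim();
  };
  const integer = (key: string): number => {
    const parsed = Number(required(key));
    if (!Number.isInteger(parsed) || parsed < 0) {
      throw new Error(`${key} must be a non-negative integer`);
    }
    return parsed;
  };

  return {
    port: integer('PORT'),
    databaseUrl: required('DATABASE_URL'),
    redisDb: integer('REDIS_DB'),
    // KAFKA_BROKERS is a comma-separated list of host:port pairs.
    kafkaBrokers: required('KAFKA_BROKERS')
      .split(',')
      .map((broker) => broker.trim())
      .filter((broker) => broker.length > 0),
    kavaRpcUrl: required('KAVA_RPC_URL'),
    bscRpcUrl: required('BSC_RPC_URL'),
    walletServiceUrl: required('WALLET_SERVICE_URL'),
  };
}
```

The WebSocket URLs are left out of the required set here because the listener already falls back when no WebSocket provider is configured.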

10. Deployment Configuration

10.1 docker-compose.yml Configuration

Add the following to the main docker-compose.yml:

blockchain-service:
  build:
    context: ./blockchain-service
    dockerfile: Dockerfile
  container_name: rwa-blockchain-service
  ports:
    - "3012:3012"
  environment:
    - NODE_ENV=production
    - DATABASE_URL=postgresql://${POSTGRES_USER:-rwa_user}:${POSTGRES_PASSWORD:-rwa_secure_password}@postgres:5432/rwa_blockchain?schema=public
    - REDIS_HOST=redis
    - REDIS_PORT=6379
    - REDIS_DB=11
    - KAFKA_BROKERS=kafka:29092
    - KAFKA_CLIENT_ID=blockchain-service
    - KAVA_RPC_URL=${KAVA_RPC_URL:-https://evm.kava.io}
    - KAVA_WS_URL=${KAVA_WS_URL}
    - BSC_RPC_URL=${BSC_RPC_URL:-https://bsc-dataseed.binance.org}
    - BSC_WS_URL=${BSC_WS_URL}
    - WALLET_SERVICE_URL=http://rwa-wallet-service:3001
    - IDENTITY_SERVICE_URL=http://rwa-identity-service:3000
  depends_on:
    postgres:
      condition: service_healthy
    redis:
      condition: service_healthy
    kafka:
      condition: service_healthy
  networks:
    - rwa-network
  restart: unless-stopped
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:3012/api/v1/blockchain/health"]
    interval: 30s
    timeout: 10s
    retries: 3
    start_period: 40s

10.2 Kong Route Configuration

# Add to kong.yml
- name: blockchain-service
  url: http://192.168.1.111:3012
  routes:
    - name: blockchain-api
      paths:
        - /api/v1/blockchain
      strip_path: false

11. Initialization Script

Add rwa_blockchain to scripts/init-databases.sh:

for db in rwa_identity rwa_wallet ... rwa_blockchain; do

12. Key Design Decisions

12.1 Dual Safeguard Mechanism

Real-time listener (WebSocket)    Catch-up scan (scheduled)
              │                               │
              ▼                               ▼
       Detect deposit ─────────────────> Deduplicate
              │                               │
              └───────────────┬───────────────┘
                              ▼
                     Check confirmations
                              │
                              ▼
                    Notify wallet-service

12.2 Confirmation Policy

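| Chain | Required confirmations | Approx. wait time |
|-------|------------------------|-------------------|
| KAVA  | 12 blocks              | ~72 s             |
| BSC   | 15 blocks              | ~45 s             |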
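
The wait times follow from required confirmations multiplied by block time. The sketch below reproduces that arithmetic and the confirmation count used in section 6.1. The block times of 6 s for KAVA and 3 s for BSC are assumptions derived from the figures above (72 / 12 and 45 / 15); `ConfirmationPolicy`, `CONFIRMATION_POLICIES`, `countConfirmations` and `estimatedWaitSeconds` are hypothetical names.

```typescript
interface ConfirmationPolicy {
  requiredConfirmations: number;
  blockTimeSeconds: number; // assumed average block time
}

// Values taken from the policy above; block times are derived assumptions.
const CONFIRMATION_POLICIES = {
  KAVA: { requiredConfirmations: 12, blockTimeSeconds: 6 },
  BSC: { requiredConfirmations: 15, blockTimeSeconds: 3 },
} as const;

// The block that contains the deposit counts as the first confirmation.
function countConfirmations(currentBlock: bigint, depositBlock: bigint): number {
  if (currentBlock < depositBlock) {
    return 0; // the node has not reached the deposit block yet
  }
  return Number(currentBlock - depositBlock) + 1;
}

// Approximate time until a fresh deposit reaches the required confirmations.
function estimatedWaitSeconds(policy: ConfirmationPolicy): number {
  return policy.requiredConfirmations * policy.blockTimeSeconds;
}
```

Actual block times vary with network conditions, so these figures are estimates rather than guarantees.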

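12.3 Failure Retry Policy

  • Failed notifications are retried at most 10 times
  • Exponential backoff: 1s, 2s, 4s, 8s, ...
  • An alert is raised once the maximum number of retries is exceeded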
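
The backoff schedule can be computed from the number of failed attempts recorded on the deposit. This is a minimal sketch of one way to do it; `MAX_NOTIFY_ATTEMPTS`, `BASE_DELAY_MS` and `backoffDelayMs` are hypothetical names, and returning `null` to signal "stop retrying and alert" is a design choice made for this example.

```typescript
const MAX_NOTIFY_ATTEMPTS = 10; // matches the retry limit stated above
const BASE_DELAY_MS = 1000;     // first retry waits 1 second

// failedAttempts is the number of notification attempts that have failed so far.
// Returns the delay before the next attempt, or null when the limit is reached.
function backoffDelayMs(failedAttempts: number): number | null {
  if (!Number.isInteger(failedAttempts) || failedAttempts < 1) {
    throw new RangeError('failedAttempts must be a positive integer');
  }
  if (failedAttempts >= MAX_NOTIFY_ATTEMPTS) {
    return null; // give up and raise an alert
  }
  // 1 -> 1s, 2 -> 2s, 3 -> 4s, 4 -> 8s, ...
  return BASE_DELAY_MS * 2 ** (failedAttempts - 1);
}
```

A scheduled retry task could store the resulting next-retry time with the deposit and skip records whose time has not yet arrived.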

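13. Testing Strategy

13.1 Unit Tests

  • Domain object tests (DepositTransaction, ValueObjects)
  • Confirmation count calculation logic
  • Event generation logic

13.2 Integration Tests

  • Blockchain provider connectivity
  • Event listening and parsing
  • Database reads and writes

13.3 E2E Tests

  • Simulation of the complete deposit flow
  • Verification of the catch-up scan logic
  • Verification of failure retries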

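14. Monitoring Metrics

  • blockchain_deposit_detected_total - number of deposits detected
  • blockchain_deposit_confirmed_total - number of deposits confirmed
  • blockchain_notify_failed_total - number of failed notifications
  • blockchain_block_scan_lag - scan lag (current block minus last scanned block)
  • blockchain_rpc_latency - RPC call latency
  • blockchain_ws_reconnect_total - number of WebSocket reconnects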

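15. Migration Plan

Migrating from identity-service

  1. Move the BlockchainQueryService logic into BalanceQueryService
  2. Split the DepositService logic:
    • Address retrieval → stays in identity-service
    • Balance queries → move to blockchain-service
  3. Add the event listener and catch-up scan logic
  4. identity-service publishes the WalletBound event
  5. blockchain-service consumes the event and registers the monitored address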

Document version: 1.0.0 Last updated: 2025-12-03