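Blockchain Service Development Guide

1. Service Overview

1.1 Service Positioning

blockchain-service is the blockchain infrastructure service of the RWA Durian Queen (榴莲皇后) platform. It is responsible for:

  • Public key → address derivation: derives multi-chain wallet addresses (EVM/Cosmos) from the MPC public key
  • On-chain event listening: listens for ERC20 Transfer events to detect user deposits
  • Deposit crediting trigger: notifies wallet-service to credit the account once a deposit is detected
  • Balance queries: queries on-chain USDT and native token balances
  • Transaction broadcasting: submits signed transactions to the chain
  • Address management: manages the platform's deposit address pool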

1.4 Relationship with Other Services

┌────────────────────┐     ┌────────────────────┐     ┌────────────────────┐
│  identity-service  │     │    mpc-service     │     │ blockchain-service │
├────────────────────┤     ├────────────────────┤     ├────────────────────┤
│ - User accounts    │     │ - Share generation │     │ - Pubkey → address │
│ - Device binding   │     │ - Signing coord.   │     │ - Deposit detection│
│ - KYC verification │     │ - Share storage    │     │ - Tx broadcast     │
│ - Authentication   │     │ - Threshold policy │     │ - Balance queries  │
└─────────┬──────────┘     └─────────┬──────────┘     └─────────┬──────────┘
          │                          │                          │
          │ MpcKeygenRequested       │ KeygenCompleted          │
          │─────────────────────────>│ (with publicKey)         │
          │                          │─────────────────────────>│
          │                          │                          │ derive address
          │                WalletAddressCreated                 │
          │<────────────────────────────────────────────────────│
          │ (stores the userId ↔ walletAddress mapping)         │
          │                          │                          │
          │                   wallet-service                    │
          │                          │                          │
          │                WalletAddressCreated                 │
          │<────────────────────────────────────────────────────│
          │ (stores wallet addresses, manages balances)         │

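Responsibility Boundaries

  • identity-service: cares only about user identity and does not handle blockchain technical details
  • mpc-service: cares only about the MPC protocol and publishes an event once the public key is generated
  • blockchain-service: encapsulates all blockchain technology, including address derivation and chain interaction
  • wallet-service: manages the business logic of user wallets and does not interact with the chain directly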
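
The event flow above can be sketched as two payload types plus a handler that turns a KeygenCompleted event into a WalletAddressCreated payload. This is only a sketch: the payload fields (`userId`, `publicKey`, `addresses`) and the injected `deriveAddress` function are assumptions for illustration, not the actual event contracts.

```typescript
// Hypothetical payload shapes; the real event schemas may differ.
interface KeygenCompletedPayload {
  userId: string;
  publicKey: string; // hex-encoded MPC public key
}

interface WalletAddressCreatedPayload {
  userId: string;
  publicKey: string;
  addresses: { chainType: string; address: string }[];
}

// Derivation is injected so the handler stays independent of any chain library.
type DeriveAddress = (publicKey: string, chainType: string) => string;

function handleKeygenCompleted(
  event: KeygenCompletedPayload,
  chains: string[],
  deriveAddress: DeriveAddress,
): WalletAddressCreatedPayload {
  return {
    userId: event.userId,
    publicKey: event.publicKey,
    // One derived address per supported chain.
    addresses: chains.map((chainType) => ({
      chainType,
      address: deriveAddress(event.publicKey, chainType),
    })),
  };
}
```

Keeping the derivation behind a function parameter mirrors the boundary rule: only blockchain-service needs to know how a public key becomes an address.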

1.2 Technology Stack

Component          Technology
Framework          NestJS 10.x
Language           TypeScript 5.x
Database           PostgreSQL 15 + Prisma
Message queue      Kafka
Cache              Redis
Blockchain         ethers.js 6.x
Containerization   Docker

1.3 Port Allocation

  • HTTP API: 3012
  • Database: rwa_blockchain (shared PostgreSQL instance)
  • Redis DB: 11

2. Architecture Design

2.1 Hexagonal Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                          blockchain-service                          │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                        API Layer                             │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │   │
│  │  │ Health Ctrl  │  │ Balance Ctrl │  │ Internal Ctrl│       │   │
│  │  └──────────────┘  └──────────────┘  └──────────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Application Layer                         │   │
│  │  ┌──────────────────┐  ┌──────────────────┐                 │   │
│  │  │ DepositDetection │  │ BalanceQuery     │                 │   │
│  │  │ Service          │  │ Service          │                 │   │
│  │  └──────────────────┘  └──────────────────┘                 │   │
│  │  ┌──────────────────┐  ┌──────────────────┐                 │   │
│  │  │ TransactionBroad │  │ AddressRegistry  │                 │   │
│  │  │ castService      │  │ Service          │                 │   │
│  │  └──────────────────┘  └──────────────────┘                 │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      Domain Layer                            │   │
│  │  ┌────────────────────────────────────────────────────────┐ │   │
│  │  │ Aggregates                                              │ │   │
│  │  │ ┌────────────┐ ┌────────────┐ ┌────────────────────┐   │ │   │
│  │  │ │DepositTx   │ │MonitoredAddr│ │TransactionRequest │   │ │   │
│  │  │ └────────────┘ └────────────┘ └────────────────────┘   │ │   │
│  │  └────────────────────────────────────────────────────────┘ │   │
│  │  ┌────────────────────────────────────────────────────────┐ │   │
│  │  │ Domain Events                                           │ │   │
│  │  │ DepositDetected, TransactionBroadcasted, BlockScanned  │ │   │
│  │  └────────────────────────────────────────────────────────┘ │   │
│  │  ┌────────────────────────────────────────────────────────┐ │   │
│  │  │ Repository Interfaces (Ports)                          │ │   │
│  │  │ IDepositTxRepository, IMonitoredAddressRepository      │ │   │
│  │  └────────────────────────────────────────────────────────┘ │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                  Infrastructure Layer                        │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │   │
│  │  │ Prisma Repos │  │ Kafka        │  │ Redis Cache  │       │   │
│  │  └──────────────┘  └──────────────┘  └──────────────┘       │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │   │
│  │  │ EVM Provider │  │ Event        │  │ Block        │       │   │
│  │  │ Adapter      │  │ Listener     │  │ Scanner      │       │   │
│  │  └──────────────┘  └──────────────┘  └──────────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘
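
The layering above rests on the ports-and-adapters idea: the domain declares an interface (port), infrastructure supplies an implementation (adapter), and application services depend only on the port. The sketch below illustrates this with an in-memory adapter. The names `MonitoredAddressPort`, `InMemoryMonitoredAddressAdapter`, and `AddressLookupUseCase` are hypothetical and stand in for the real repository interface and its Prisma-backed implementation.

```typescript
interface MonitoredAddressRecord {
  chainType: string;
  address: string;
  userId: bigint;
}

// Port: declared by the domain layer, unaware of Prisma or Redis.
interface MonitoredAddressPort {
  save(record: MonitoredAddressRecord): Promise<void>;
  findByAddress(chainType: string, address: string): Promise<MonitoredAddressRecord | undefined>;
}

// Driven adapter: an in-memory stand-in for a database-backed implementation.
class InMemoryMonitoredAddressAdapter implements MonitoredAddressPort {
  private readonly rows = new Map<string, MonitoredAddressRecord>();

  // EVM addresses are case-insensitive, so the lookup key is lowercased.
  private key(chainType: string, address: string): string {
    return `${chainType}:${address.toLowerCase()}`;
  }

  async save(record: MonitoredAddressRecord): Promise<void> {
    this.rows.set(this.key(record.chainType, record.address), record);
  }

  async findByAddress(chainType: string, address: string): Promise<MonitoredAddressRecord | undefined> {
    return this.rows.get(this.key(chainType, address));
  }
}

// Application service: depends on the port only, so adapters can be swapped in tests.
class AddressLookupUseCase {
  constructor(private readonly addresses: MonitoredAddressPort) {}

  async ownerOf(chainType: string, address: string): Promise<bigint | undefined> {
    const row = await this.addresses.findByAddress(chainType, address);
    return row?.userId;
  }
}
```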

2.2 Directory Structure

blockchain-service/
├── prisma/
│   └── schema.prisma
├── src/
│   ├── modules/                      # NestJS module definitions (consistent with other services)
│   │   ├── api.module.ts             # API layer module
│   │   ├── application.module.ts     # Application layer module
│   │   ├── domain.module.ts          # Domain layer module
│   │   └── infrastructure.module.ts  # Infrastructure layer module
│   │
│   ├── api/                          # Inbound adapters (Driving Adapters)
│   │   ├── controllers/
│   │   │   ├── health.controller.ts
│   │   │   ├── balance.controller.ts
│   │   │   └── internal.controller.ts
│   │   └── dto/
│   │       ├── request/
│   │       │   └── query-balance.dto.ts
│   │       └── response/
│   │           └── balance.dto.ts
│   │
│   ├── application/                  # Application layer (Use Cases)
│   │   ├── services/
│   │   │   ├── deposit-detection.service.ts
│   │   │   ├── balance-query.service.ts
│   │   │   ├── transaction-broadcast.service.ts
│   │   │   ├── address-registry.service.ts
│   │   │   └── address-derivation.service.ts    # Public key → address derivation
│   │   ├── commands/
│   │   │   ├── register-address/
│   │   │   │   ├── register-address.command.ts
│   │   │   │   └── register-address.handler.ts
│   │   │   └── broadcast-transaction/
│   │   │       ├── broadcast-transaction.command.ts
│   │   │       └── broadcast-transaction.handler.ts
│   │   └── queries/
│   │       ├── get-balance/
│   │       │   ├── get-balance.query.ts
│   │       │   └── get-balance.handler.ts
│   │       └── get-deposit-history/
│   │           ├── get-deposit-history.query.ts
│   │           └── get-deposit-history.handler.ts
│   │
│   ├── domain/                       # Domain layer (core business logic)
│   │   ├── aggregates/
│   │   │   ├── aggregate-root.base.ts           # Aggregate root base class (unified event handling)
│   │   │   ├── deposit-transaction/
│   │   │   │   ├── deposit-transaction.aggregate.ts
│   │   │   │   ├── deposit-transaction.factory.ts
│   │   │   │   └── index.ts
│   │   │   ├── monitored-address/
│   │   │   │   ├── monitored-address.aggregate.ts
│   │   │   │   └── index.ts
│   │   │   └── transaction-request/
│   │   │       ├── transaction-request.aggregate.ts
│   │   │       └── index.ts
│   │   ├── entities/
│   │   │   └── block-checkpoint.entity.ts
│   │   ├── events/
│   │   │   ├── domain-event.base.ts
│   │   │   ├── deposit-detected.event.ts
│   │   │   ├── deposit-confirmed.event.ts
│   │   │   ├── transaction-broadcasted.event.ts
│   │   │   ├── wallet-address-created.event.ts  # Address derivation completed event
│   │   │   └── index.ts
│   │   ├── repositories/
│   │   │   ├── deposit-transaction.repository.interface.ts
│   │   │   ├── monitored-address.repository.interface.ts
│   │   │   ├── block-checkpoint.repository.interface.ts
│   │   │   └── index.ts
│   │   ├── services/
│   │   │   ├── confirmation-policy.service.ts
│   │   │   └── chain-config.service.ts
│   │   ├── value-objects/
│   │   │   ├── chain-type.vo.ts
│   │   │   ├── tx-hash.vo.ts
│   │   │   ├── evm-address.vo.ts
│   │   │   ├── token-amount.vo.ts
│   │   │   ├── block-number.vo.ts
│   │   │   └── index.ts
│   │   └── enums/
│   │       ├── deposit-status.enum.ts
│   │       ├── chain-type.enum.ts
│   │       └── index.ts
│   │
│   ├── infrastructure/               # Outbound adapters (Driven Adapters)
│   │   ├── blockchain/
│   │   │   ├── evm-provider.adapter.ts
│   │   │   ├── event-listener.service.ts
│   │   │   ├── block-scanner.service.ts
│   │   │   ├── transaction-sender.service.ts
│   │   │   ├── address-derivation.adapter.ts    # Address derivation adapter
│   │   │   └── blockchain.module.ts
│   │   ├── persistence/
│   │   │   ├── prisma/
│   │   │   │   └── prisma.service.ts
│   │   │   ├── repositories/
│   │   │   │   ├── deposit-transaction.repository.impl.ts
│   │   │   │   ├── monitored-address.repository.impl.ts
│   │   │   │   └── block-checkpoint.repository.impl.ts
│   │   │   └── mappers/
│   │   │       ├── deposit-transaction.mapper.ts
│   │   │       ├── monitored-address.mapper.ts
│   │   │       └── transaction-request.mapper.ts
│   │   ├── kafka/
│   │   │   ├── event-publisher.service.ts
│   │   │   ├── event-consumer.controller.ts
│   │   │   └── kafka.module.ts
│   │   ├── redis/
│   │   │   ├── redis.service.ts
│   │   │   ├── address-cache.service.ts
│   │   │   └── redis.module.ts
│   │   ├── external/
│   │   │   ├── wallet-service/
│   │   │   │   └── wallet-client.service.ts
│   │   │   └── identity-service/
│   │   │       └── identity-client.service.ts
│   │   └── infrastructure.module.ts
│   │
│   ├── config/
│   │   ├── app.config.ts
│   │   ├── database.config.ts
│   │   ├── kafka.config.ts
│   │   ├── redis.config.ts
│   │   ├── chain.config.ts
│   │   └── index.ts
│   │
│   ├── shared/
│   │   ├── decorators/
│   │   │   ├── public.decorator.ts
│   │   │   └── index.ts
│   │   ├── exceptions/
│   │   │   ├── domain.exception.ts
│   │   │   ├── blockchain.exception.ts
│   │   │   └── index.ts
│   │   ├── filters/
│   │   │   ├── global-exception.filter.ts
│   │   │   └── domain-exception.filter.ts
│   │   └── interceptors/
│   │       └── transform.interceptor.ts
│   │
│   ├── app.module.ts
│   └── main.ts
│
├── test/
│   ├── unit/
│   ├── integration/
│   └── e2e/
│
├── .env.example
├── Dockerfile
├── docker-compose.yml
├── package.json
├── tsconfig.json
├── nest-cli.json
└── DEVELOPMENT_GUIDE.md

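2.3 Modular Design (Consistent with Other Services)

To stay architecturally consistent with identity-service, wallet-service, leaderboard-service, and the other services, this service uses a layered module design: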

// src/modules/infrastructure.module.ts
import { Global, Module } from '@nestjs/common';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { RedisService } from '@/infrastructure/redis/redis.service';
import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import { WalletClientService } from '@/infrastructure/external/wallet-service/wallet-client.service';
import { IdentityClientService } from '@/infrastructure/external/identity-service/identity-client.service';

@Global()
@Module({
  providers: [
    PrismaService,
    RedisService,
    EvmProviderAdapter,
    EventPublisherService,
    WalletClientService,
    IdentityClientService,
  ],
  exports: [
    PrismaService,
    RedisService,
    EvmProviderAdapter,
    EventPublisherService,
    WalletClientService,
    IdentityClientService,
  ],
})
export class InfrastructureModule {}

// src/modules/domain.module.ts
import { Module } from '@nestjs/common';
import { InfrastructureModule } from './infrastructure.module';
import {
  DEPOSIT_TRANSACTION_REPOSITORY,
  MONITORED_ADDRESS_REPOSITORY,
  BLOCK_CHECKPOINT_REPOSITORY,
} from '@/domain/repositories';
import { DepositTransactionRepositoryImpl } from '@/infrastructure/persistence/repositories/deposit-transaction.repository.impl';
import { MonitoredAddressRepositoryImpl } from '@/infrastructure/persistence/repositories/monitored-address.repository.impl';
import { BlockCheckpointRepositoryImpl } from '@/infrastructure/persistence/repositories/block-checkpoint.repository.impl';
import { ConfirmationPolicyService } from '@/domain/services/confirmation-policy.service';
import { ChainConfigService } from '@/domain/services/chain-config.service';

@Module({
  imports: [InfrastructureModule],
  providers: [
    // Repository implementations (dependency inversion)
    {
      provide: DEPOSIT_TRANSACTION_REPOSITORY,
      useClass: DepositTransactionRepositoryImpl,
    },
    {
      provide: MONITORED_ADDRESS_REPOSITORY,
      useClass: MonitoredAddressRepositoryImpl,
    },
    {
      provide: BLOCK_CHECKPOINT_REPOSITORY,
      useClass: BlockCheckpointRepositoryImpl,
    },
    // Domain services
    ConfirmationPolicyService,
    ChainConfigService,
  ],
  exports: [
    DEPOSIT_TRANSACTION_REPOSITORY,
    MONITORED_ADDRESS_REPOSITORY,
    BLOCK_CHECKPOINT_REPOSITORY,
    ConfirmationPolicyService,
    ChainConfigService,
  ],
})
export class DomainModule {}

// src/modules/application.module.ts
import { Module } from '@nestjs/common';
import { DomainModule } from './domain.module';
import { InfrastructureModule } from './infrastructure.module';
import { DepositDetectionService } from '@/application/services/deposit-detection.service';
import { BalanceQueryService } from '@/application/services/balance-query.service';
import { TransactionBroadcastService } from '@/application/services/transaction-broadcast.service';
import { AddressRegistryService } from '@/application/services/address-registry.service';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { EventListenerService } from '@/infrastructure/blockchain/event-listener.service';
import { BlockScannerService } from '@/infrastructure/blockchain/block-scanner.service';

@Module({
  imports: [DomainModule, InfrastructureModule],
  providers: [
    // Application services
    DepositDetectionService,
    BalanceQueryService,
    TransactionBroadcastService,
    AddressRegistryService,
    // Infrastructure services (these depend on the application layer)
    AddressCacheService,
    EventListenerService,
    BlockScannerService,
  ],
  exports: [
    DepositDetectionService,
    BalanceQueryService,
    TransactionBroadcastService,
    AddressRegistryService,
    AddressCacheService,
  ],
})
export class ApplicationModule {}

// src/modules/api.module.ts
import { Module } from '@nestjs/common';
import { ApplicationModule } from './application.module';
import { HealthController } from '@/api/controllers/health.controller';
import { BalanceController } from '@/api/controllers/balance.controller';
import { InternalController } from '@/api/controllers/internal.controller';
import { EventConsumerController } from '@/infrastructure/kafka/event-consumer.controller';

@Module({
  imports: [ApplicationModule],
  controllers: [
    HealthController,
    BalanceController,
    InternalController,
    EventConsumerController,
  ],
})
export class ApiModule {}

// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ScheduleModule } from '@nestjs/schedule';
import { ApiModule } from '@/modules/api.module';
import appConfig from '@/config/app.config';
import databaseConfig from '@/config/database.config';
import kafkaConfig from '@/config/kafka.config';
import redisConfig from '@/config/redis.config';
import chainConfig from '@/config/chain.config';

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
      load: [appConfig, databaseConfig, kafkaConfig, redisConfig, chainConfig],
    }),
    ScheduleModule.forRoot(),
    ApiModule,
  ],
})
export class AppModule {}

Module dependency diagram:

┌─────────────────────────────────────────────────────────────┐
│                       AppModule                             │
│  ┌───────────────────────────────────────────────────────┐ │
│  │                     ApiModule                          │ │
│  │  Controllers: Health, Balance, Internal, EventConsumer │ │
│  └───────────────────────┬───────────────────────────────┘ │
│                          │ imports                          │
│  ┌───────────────────────▼───────────────────────────────┐ │
│  │                 ApplicationModule                      │ │
│  │  Services: DepositDetection, BalanceQuery, ...        │ │
│  └───────────────────────┬───────────────────────────────┘ │
│                          │ imports                          │
│  ┌───────────────────────▼───────────────────────────────┐ │
│  │                   DomainModule                         │ │
│  │  Repositories (interfaces → impl), Domain Services    │ │
│  └───────────────────────┬───────────────────────────────┘ │
│                          │ imports                          │
│  ┌───────────────────────▼───────────────────────────────┐ │
│  │               InfrastructureModule (@Global)          │ │
│  │  Prisma, Redis, Kafka, EVM Provider, External Clients │ │
│  └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
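
DomainModule binds repository tokens such as BLOCK_CHECKPOINT_REPOSITORY to implementation classes, but the token definitions are not shown in this guide. The sketch below shows the idea behind a `{ provide: TOKEN, useClass: Impl }` binding with a toy registry. It assumes the tokens are symbols (they could equally be strings), and `ToyContainer`, `BlockCheckpointRepositoryLike`, and `FixedCheckpointRepository` are hypothetical names, not NestJS APIs.

```typescript
// Injection token: a runtime value that stands in for an interface,
// because TypeScript interfaces do not exist at runtime.
const BLOCK_CHECKPOINT_REPOSITORY = Symbol('BLOCK_CHECKPOINT_REPOSITORY');

interface BlockCheckpointRepositoryLike {
  getLastScannedBlock(chainType: string): bigint | undefined;
}

// A stand-in implementation that knows a checkpoint for KAVA only.
class FixedCheckpointRepository implements BlockCheckpointRepositoryLike {
  getLastScannedBlock(chainType: string): bigint | undefined {
    return chainType === 'KAVA' ? 100n : undefined;
  }
}

// Toy registry that mimics binding a token to an implementation.
class ToyContainer {
  private readonly bindings = new Map<symbol, () => unknown>();

  bind<T>(token: symbol, factory: () => T): void {
    this.bindings.set(token, factory);
  }

  resolve<T>(token: symbol): T {
    const factory = this.bindings.get(token);
    if (!factory) {
      throw new Error(`No provider bound for ${String(token)}`);
    }
    return factory() as T;
  }
}
```

Consumers ask for the token rather than the class, which is what lets the domain layer stay unaware of the concrete repository.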

3. Data Model Design

3.1 Prisma Schema

// prisma/schema.prisma

generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

// ============================================
// Monitored addresses table
// Stores the addresses to watch for deposits
// ============================================
model MonitoredAddress {
  id            BigInt   @id @default(autoincrement()) @map("address_id")

  chainType     String   @map("chain_type") @db.VarChar(20)    // KAVA, BSC
  address       String   @db.VarChar(42)                        // 0x-prefixed address

  userId        BigInt   @map("user_id")                        // Associated user ID

  isActive      Boolean  @default(true) @map("is_active")       // Whether monitoring is active

  createdAt     DateTime @default(now()) @map("created_at")
  updatedAt     DateTime @updatedAt @map("updated_at")

  deposits      DepositTransaction[]

  @@unique([chainType, address], name: "uk_chain_address")
  @@index([userId], name: "idx_user")
  @@index([chainType, isActive], name: "idx_chain_active")
  @@map("monitored_addresses")
}

// ============================================
// Deposit transactions table (append-only)
// Records every detected deposit transaction
// ============================================
model DepositTransaction {
  id              BigInt   @id @default(autoincrement()) @map("deposit_id")

  chainType       String   @map("chain_type") @db.VarChar(20)
  txHash          String   @unique @map("tx_hash") @db.VarChar(66)

  fromAddress     String   @map("from_address") @db.VarChar(42)
  toAddress       String   @map("to_address") @db.VarChar(42)

  tokenContract   String   @map("token_contract") @db.VarChar(42)  // USDT contract address
  amount          Decimal  @db.Decimal(36, 18)                      // Raw amount
  amountFormatted Decimal  @map("amount_formatted") @db.Decimal(20, 8)  // Formatted amount

  blockNumber     BigInt   @map("block_number")
  blockTimestamp  DateTime @map("block_timestamp")
  logIndex        Int      @map("log_index")

  // Confirmation status
  confirmations   Int      @default(0)
  status          String   @default("DETECTED") @db.VarChar(20)    // DETECTED, CONFIRMING, CONFIRMED, NOTIFIED

  // Relations
  addressId       BigInt   @map("address_id")
  userId          BigInt   @map("user_id")

  // Notification status
  notifiedAt      DateTime? @map("notified_at")
  notifyAttempts  Int       @default(0) @map("notify_attempts")
  lastNotifyError String?   @map("last_notify_error") @db.Text

  createdAt       DateTime @default(now()) @map("created_at")
  updatedAt       DateTime @updatedAt @map("updated_at")

  monitoredAddress MonitoredAddress @relation(fields: [addressId], references: [id])

  @@index([chainType, status], name: "idx_chain_status")
  @@index([userId], name: "idx_deposit_user")
  @@index([blockNumber], name: "idx_block")
  @@index([status, notifiedAt], name: "idx_pending_notify")
  @@map("deposit_transactions")
}

// ============================================
// Block scan checkpoints (one record per chain)
// Records scan progress so that scanning can resume after an interruption
// ============================================
model BlockCheckpoint {
  id              BigInt   @id @default(autoincrement()) @map("checkpoint_id")

  chainType       String   @unique @map("chain_type") @db.VarChar(20)

  lastScannedBlock BigInt  @map("last_scanned_block")
  lastScannedAt    DateTime @map("last_scanned_at")

  // Health status
  isHealthy       Boolean  @default(true) @map("is_healthy")
  lastError       String?  @map("last_error") @db.Text

  createdAt       DateTime @default(now()) @map("created_at")
  updatedAt       DateTime @updatedAt @map("updated_at")

  @@map("block_checkpoints")
}

// ============================================
// Transaction broadcast requests table
// Records transactions that are pending broadcast or already broadcast
// ============================================
model TransactionRequest {
  id              BigInt   @id @default(autoincrement()) @map("request_id")

  chainType       String   @map("chain_type") @db.VarChar(20)

  // Request source
  sourceService   String   @map("source_service") @db.VarChar(50)
  sourceOrderId   String   @map("source_order_id") @db.VarChar(100)

  // Transaction data
  fromAddress     String   @map("from_address") @db.VarChar(42)
  toAddress       String   @map("to_address") @db.VarChar(42)
  value           Decimal  @db.Decimal(36, 18)
  data            String?  @db.Text                                // Contract call data

  // Signed data (provided by the MPC service)
  signedTx        String?  @map("signed_tx") @db.Text

  // Broadcast result
  txHash          String?  @map("tx_hash") @db.VarChar(66)
  status          String   @default("PENDING") @db.VarChar(20)     // PENDING, SIGNED, BROADCASTED, CONFIRMED, FAILED

  // Gas information
  gasLimit        BigInt?  @map("gas_limit")
  gasPrice        Decimal? @map("gas_price") @db.Decimal(36, 18)
  nonce           Int?

  // Error information
  errorMessage    String?  @map("error_message") @db.Text
  retryCount      Int      @default(0) @map("retry_count")

  createdAt       DateTime @default(now()) @map("created_at")
  updatedAt       DateTime @updatedAt @map("updated_at")

  @@unique([sourceService, sourceOrderId], name: "uk_source_order")
  @@index([chainType, status], name: "idx_tx_chain_status")
  @@index([txHash], name: "idx_tx_hash")
  @@map("transaction_requests")
}

// ============================================
// Blockchain event log (append-only audit trail)
// ============================================
model BlockchainEvent {
  id            BigInt   @id @default(autoincrement()) @map("event_id")

  eventType     String   @map("event_type") @db.VarChar(50)

  aggregateId   String   @map("aggregate_id") @db.VarChar(100)
  aggregateType String   @map("aggregate_type") @db.VarChar(50)

  eventData     Json     @map("event_data")

  chainType     String?  @map("chain_type") @db.VarChar(20)
  txHash        String?  @map("tx_hash") @db.VarChar(66)

  occurredAt    DateTime @default(now()) @map("occurred_at") @db.Timestamp(6)

  @@index([aggregateType, aggregateId], name: "idx_event_aggregate")
  @@index([eventType], name: "idx_event_type")
  @@index([chainType], name: "idx_event_chain")
  @@index([occurredAt], name: "idx_event_occurred")
  @@map("blockchain_events")
}
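
The BlockCheckpoint model exists so that scanning can resume where it stopped. One way to turn `lastScannedBlock` into the next range to scan is sketched below. The function name `nextScanRange` and the `maxBatchSize` parameter are hypothetical; the actual scanner may pick its ranges differently, for example by staying a few blocks behind the chain head.

```typescript
interface ScanRange {
  fromBlock: bigint; // inclusive
  toBlock: bigint;   // inclusive
}

/**
 * Computes the next inclusive block range to scan,
 * or undefined when the scanner has already caught up.
 */
function nextScanRange(
  lastScannedBlock: bigint,
  latestBlock: bigint,
  maxBatchSize: bigint,
): ScanRange | undefined {
  if (maxBatchSize <= 0n) {
    throw new Error('maxBatchSize must be positive');
  }
  const fromBlock = lastScannedBlock + 1n;
  if (fromBlock > latestBlock) {
    return undefined; // nothing new to scan
  }
  // Cap the range so a long outage is replayed in bounded batches.
  const batchEnd = fromBlock + maxBatchSize - 1n;
  const toBlock = batchEnd < latestBlock ? batchEnd : latestBlock;
  return { fromBlock, toBlock };
}
```

After a range has been processed, `toBlock` would be written back as the new `lastScannedBlock`, so a restart continues from the following block.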

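4. Domain Layer Design

4.1 Aggregate Root Base Class (Consistent with Other Services)

To stay architecturally consistent with authorization-service, wallet-service, and the other services, a shared aggregate root base class is extracted: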

// src/domain/aggregates/aggregate-root.base.ts

import { DomainEvent } from '@/domain/events/domain-event.base';

/**
 * Aggregate root base class
 *
 * All aggregate roots should extend this class so that domain events are collected and cleared in one place.
 * Reference: authorization-service/src/domain/aggregates/aggregate-root.base.ts
 */
export abstract class AggregateRoot<TId = bigint> {
  private readonly _domainEvents: DomainEvent[] = [];

  /**
   * Unique identifier of the aggregate root
   */
  abstract get id(): TId | undefined;

  /**
   * Returns all domain events pending publication
   */
  get domainEvents(): ReadonlyArray<DomainEvent> {
    return [...this._domainEvents];
  }

  /**
   * Adds a domain event
   * @param event the domain event
   */
  protected addDomainEvent(event: DomainEvent): void {
    this._domainEvents.push(event);
  }

  /**
   * Clears the domain events (call after they have been published)
   */
  clearDomainEvents(): void {
    this._domainEvents.length = 0;
  }

  /**
   * Checks whether any domain events are pending publication
   */
  hasDomainEvents(): boolean {
    return this._domainEvents.length > 0;
  }
}
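
The base class is meant to be used in a collect, publish, clear cycle. The sketch below shows that cycle with simplified stand-ins so that it is self-contained. `SketchEvent`, `SketchAggregateRoot`, `SketchCounter`, and `publishAndClear` are hypothetical names, and the real publisher (Kafka) is replaced by a callback.

```typescript
// Minimal stand-ins; the real classes live under src/domain.
interface SketchEvent {
  readonly eventType: string;
}

abstract class SketchAggregateRoot {
  private readonly events: SketchEvent[] = [];

  get domainEvents(): ReadonlyArray<SketchEvent> {
    return [...this.events];
  }

  protected addDomainEvent(event: SketchEvent): void {
    this.events.push(event);
  }

  clearDomainEvents(): void {
    this.events.length = 0;
  }
}

// An aggregate records an event whenever its state changes.
class SketchCounter extends SketchAggregateRoot {
  private count = 0;

  increment(): void {
    this.count += 1;
    this.addDomainEvent({ eventType: `counter.incremented.${this.count}` });
  }
}

// Events are cleared only after every one of them has been published,
// so a failed publish leaves them on the aggregate for a retry.
async function publishAndClear(
  aggregate: SketchAggregateRoot,
  publish: (event: SketchEvent) => Promise<void>,
): Promise<number> {
  const pending = aggregate.domainEvents;
  for (const event of pending) {
    await publish(event);
  }
  aggregate.clearDomainEvents();
  return pending.length;
}
```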

4.2 Aggregate Root: DepositTransaction

// src/domain/aggregates/deposit-transaction/deposit-transaction.aggregate.ts

import { AggregateRoot } from '../aggregate-root.base';
import { DomainEvent, DepositDetectedEvent, DepositConfirmedEvent } from '@/domain/events';
import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';

export interface DepositTransactionProps {
  id?: bigint;
  chainType: ChainType;
  txHash: TxHash;
  fromAddress: EvmAddress;
  toAddress: EvmAddress;
  tokenContract: EvmAddress;
  amount: TokenAmount;
  blockNumber: BlockNumber;
  blockTimestamp: Date;
  logIndex: number;
  confirmations: number;
  status: DepositStatus;
  addressId: bigint;
  userId: bigint;
  notifiedAt?: Date;
  notifyAttempts: number;
  lastNotifyError?: string;
  createdAt?: Date;
  updatedAt?: Date;
}

export class DepositTransaction extends AggregateRoot<bigint> {
  private constructor(private props: DepositTransactionProps) {
    super();
  }

  // Getters
  get id(): bigint | undefined { return this.props.id; }
  get chainType(): ChainType { return this.props.chainType; }
  get txHash(): TxHash { return this.props.txHash; }
  get fromAddress(): EvmAddress { return this.props.fromAddress; }
  get toAddress(): EvmAddress { return this.props.toAddress; }
  get amount(): TokenAmount { return this.props.amount; }
  get blockNumber(): BlockNumber { return this.props.blockNumber; }
  get confirmations(): number { return this.props.confirmations; }
  get status(): DepositStatus { return this.props.status; }
  get userId(): bigint { return this.props.userId; }
  get isConfirmed(): boolean { return this.props.status === DepositStatus.CONFIRMED; }
  get isNotified(): boolean { return this.props.status === DepositStatus.NOTIFIED; }
  // domainEvents is inherited from AggregateRoot; _domainEvents is private to the base class and cannot be read here

  /**
   * Creates a new deposit transaction (when it is first detected)
   */
  static create(params: {
    chainType: ChainType;
    txHash: TxHash;
    fromAddress: EvmAddress;
    toAddress: EvmAddress;
    tokenContract: EvmAddress;
    amount: TokenAmount;
    blockNumber: BlockNumber;
    blockTimestamp: Date;
    logIndex: number;
    addressId: bigint;
    userId: bigint;
  }): DepositTransaction {
    const deposit = new DepositTransaction({
      ...params,
      confirmations: 1,
      status: DepositStatus.DETECTED,
      notifyAttempts: 0,
    });

    deposit.addDomainEvent(new DepositDetectedEvent({
      chainType: params.chainType.value,
      txHash: params.txHash.value,
      fromAddress: params.fromAddress.value,
      toAddress: params.toAddress.value,
      amount: params.amount.formatted,
      userId: params.userId.toString(),
      blockNumber: params.blockNumber.value,
    }));

    return deposit;
  }

  /**
   * Reconstructs the aggregate from the database
   */
  static reconstruct(props: DepositTransactionProps): DepositTransaction {
    return new DepositTransaction(props);
  }

  /**
   * Updates the confirmation count
   */
  updateConfirmations(newConfirmations: number, requiredConfirmations: number): void {
    this.props.confirmations = newConfirmations;

    // DETECTED -> CONFIRMING as soon as confirmations are being tracked
    if (this.props.status === DepositStatus.DETECTED) {
      this.props.status = DepositStatus.CONFIRMING;
    }

    // CONFIRMING -> CONFIRMED once the required confirmation count is reached
    if (newConfirmations >= requiredConfirmations && this.props.status === DepositStatus.CONFIRMING) {
      this.confirm();
    }
  }

  /**
   * Confirms the deposit
   */
  private confirm(): void {
    if (this.props.status === DepositStatus.CONFIRMED) {
      return;
    }

    this.props.status = DepositStatus.CONFIRMED;

    this.addDomainEvent(new DepositConfirmedEvent({
      depositId: this.props.id!.toString(),
      chainType: this.props.chainType.value,
      txHash: this.props.txHash.value,
      amount: this.props.amount.formatted,
      userId: this.props.userId.toString(),
      confirmations: this.props.confirmations,
    }));
  }

  /**
   * Marks the deposit as notified
   */
  markNotified(): void {
    this.props.status = DepositStatus.NOTIFIED;
    this.props.notifiedAt = new Date();
  }

  /**
   * Records a notification failure
   */
  recordNotifyFailure(error: string): void {
    this.props.notifyAttempts++;
    this.props.lastNotifyError = error;
  }

  // Note: addDomainEvent and clearDomainEvents are already defined in the AggregateRoot base class

  toProps(): DepositTransactionProps {
    return { ...this.props };
  }
}
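
The aggregate moves through the statuses DETECTED, CONFIRMING, CONFIRMED, and NOTIFIED, but methods such as markNotified() set the status without checking the current one. A transition table is one way to make the allowed moves explicit. The table below is an assumption inferred from the status names, and `canTransition` and `assertTransition` are hypothetical helpers.

```typescript
type DepositStatusName = 'DETECTED' | 'CONFIRMING' | 'CONFIRMED' | 'NOTIFIED';

// Allowed forward transitions (assumed, not taken from the service code).
const ALLOWED_TRANSITIONS: Record<DepositStatusName, DepositStatusName[]> = {
  DETECTED: ['CONFIRMING'],
  CONFIRMING: ['CONFIRMED'],
  CONFIRMED: ['NOTIFIED'],
  NOTIFIED: [], // terminal state
};

function canTransition(from: DepositStatusName, to: DepositStatusName): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}

// Throws when a status change would skip or reverse a step.
function assertTransition(from: DepositStatusName, to: DepositStatusName): void {
  if (!canTransition(from, to)) {
    throw new Error(`Illegal deposit status transition: ${from} -> ${to}`);
  }
}
```

With such a guard, marking a deposit as notified before it is confirmed would fail loudly instead of silently skipping the confirmation step.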

4.3 Value Objects

4.3.1 EVM Address Value Object

// src/domain/value-objects/evm-address.vo.ts

import { ethers } from 'ethers';

export class EvmAddress {
  private constructor(private readonly _value: string) {}

  get value(): string {
    return this._value;
  }

  get checksummed(): string {
    return ethers.getAddress(this._value);
  }

  static create(address: string): EvmAddress {
    if (!ethers.isAddress(address)) {
      throw new Error(`Invalid EVM address: ${address}`);
    }
    return new EvmAddress(ethers.getAddress(address)); // normalized to checksum format
  }

  equals(other: EvmAddress): boolean {
    return this._value.toLowerCase() === other._value.toLowerCase();
  }

  toString(): string {
    return this._value;
  }
}

4.3.2 Token Amount Value Object

// src/domain/value-objects/token-amount.vo.ts

import Decimal from 'decimal.js';

export class TokenAmount {
  private constructor(
    private readonly _raw: Decimal,      // Raw on-chain value (smallest unit, e.g. wei)
    private readonly _decimals: number,  // Token decimals
  ) {}

  get raw(): Decimal {
    return this._raw;
  }

  get decimals(): number {
    return this._decimals;
  }

  get formatted(): string {
    return this._raw.div(new Decimal(10).pow(this._decimals)).toFixed(8);
  }

  get value(): number {
    return parseFloat(this.formatted);
  }

  static create(raw: string | bigint, decimals: number): TokenAmount {
    return new TokenAmount(new Decimal(raw.toString()), decimals);
  }

  static fromFormatted(amount: string, decimals: number): TokenAmount {
    const raw = new Decimal(amount).mul(new Decimal(10).pow(decimals));
    return new TokenAmount(raw, decimals);
  }

  isZero(): boolean {
    return this._raw.isZero();
  }

  greaterThan(other: TokenAmount): boolean {
    return this._raw.greaterThan(other._raw);
  }
}
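
The conversion behind `formatted` is a division of the raw on-chain integer by 10 to the power of the token's decimals. For example, a raw value of 1234567 for a token with 6 decimals is 1.234567, shown as 1.23456700 with 8 fractional digits. The dependency-free sketch below performs the same conversion with bigint arithmetic. `formatRawAmount` is a hypothetical helper, and unlike decimal.js `toFixed`, which rounds, it truncates extra digits.

```typescript
/**
 * Formats a raw on-chain integer amount as a decimal string with exactly
 * `displayDecimals` fractional digits, truncating any further digits.
 */
function formatRawAmount(raw: bigint, tokenDecimals: number, displayDecimals = 8): string {
  if (raw < 0n) {
    throw new Error('raw amount must be non-negative');
  }
  const base = 10n ** BigInt(tokenDecimals);
  const whole = raw / base; // integer part
  // Left-pad the remainder so leading zeros of the fraction are kept.
  const fraction = (raw % base).toString().padStart(tokenDecimals, '0');
  const shown = fraction.slice(0, displayDecimals).padEnd(displayDecimals, '0');
  return displayDecimals > 0 ? `${whole}.${shown}` : whole.toString();
}
```

Keeping amounts as strings or integers end to end avoids the precision loss that the `value` getter accepts when it converts to a JavaScript number.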

4.3.3 Chain Type Value Object

// src/domain/value-objects/chain-type.vo.ts

export enum ChainTypeEnum {
  KAVA = 'KAVA',
  BSC = 'BSC',
}

export class ChainType {
  private constructor(private readonly _value: ChainTypeEnum) {}

  get value(): ChainTypeEnum {
    return this._value;
  }

  get name(): string {
    switch (this._value) {
      case ChainTypeEnum.KAVA: return 'KAVA EVM';
      case ChainTypeEnum.BSC: return 'BSC';
    }
  }

  get requiredConfirmations(): number {
    switch (this._value) {
      case ChainTypeEnum.KAVA: return 12;
      case ChainTypeEnum.BSC: return 15;
    }
  }

  get blockTime(): number {
    switch (this._value) {
      case ChainTypeEnum.KAVA: return 6;  // about 6 seconds
      case ChainTypeEnum.BSC: return 3;   // about 3 seconds
    }
  }

  static create(value: string): ChainType {
    if (!Object.values(ChainTypeEnum).includes(value as ChainTypeEnum)) {
      throw new Error(`Invalid chain type: ${value}`);
    }
    return new ChainType(value as ChainTypeEnum);
  }

  static KAVA(): ChainType {
    return new ChainType(ChainTypeEnum.KAVA);
  }

  static BSC(): ChainType {
    return new ChainType(ChainTypeEnum.BSC);
  }

  equals(other: ChainType): boolean {
    return this._value === other._value;
  }
}
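
Multiplying `requiredConfirmations` by `blockTime` gives a rough estimate of how long a deposit waits before it counts as confirmed: 12 × 6 s = 72 s on KAVA and 15 × 3 s = 45 s on BSC. The sketch below restates the values from ChainType in a plain lookup table. `CHAIN_TIMING` and `estimatedConfirmationSeconds` are hypothetical names, and real block times vary.

```typescript
type SupportedChain = 'KAVA' | 'BSC';

interface ChainTiming {
  requiredConfirmations: number;
  blockTimeSeconds: number; // approximate
}

// Same figures as the ChainType value object above.
const CHAIN_TIMING: Record<SupportedChain, ChainTiming> = {
  KAVA: { requiredConfirmations: 12, blockTimeSeconds: 6 },
  BSC: { requiredConfirmations: 15, blockTimeSeconds: 3 },
};

// Rough wait between detection and confirmation, in seconds.
function estimatedConfirmationSeconds(chain: SupportedChain): number {
  const timing = CHAIN_TIMING[chain];
  return timing.requiredConfirmations * timing.blockTimeSeconds;
}
```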

4.3.4 Block number value object

// src/domain/value-objects/block-number.vo.ts

/**
 * Block number value object
 *
 * Wraps a block number to provide type safety and enforce business-rule validation
 */
export class BlockNumber {
  private constructor(private readonly _value: bigint) {}

  get value(): bigint {
    return this._value;
  }

  /**
   * Creates a block number value object
   * @param value block number (number or bigint)
   * @throws Error if the block number is negative
   */
  static create(value: number | bigint): BlockNumber {
    const bn = BigInt(value);
    if (bn < 0n) {
      throw new Error(`Invalid block number: ${value}. Block number cannot be negative.`);
    }
    return new BlockNumber(bn);
  }

  /**
   * Returns the difference between this block number and another
   */
  diff(other: BlockNumber): bigint {
    return this._value - other._value;
  }

  /**
   * Checks whether this block number is greater than another
   */
  greaterThan(other: BlockNumber): boolean {
    return this._value > other._value;
  }

  /**
   * Checks whether this block number is less than another
   */
  lessThan(other: BlockNumber): boolean {
    return this._value < other._value;
  }

  /**
   * Adds the given number of blocks
   */
  add(blocks: number | bigint): BlockNumber {
    return new BlockNumber(this._value + BigInt(blocks));
  }

  /**
   * Subtracts the given number of blocks
   */
  subtract(blocks: number | bigint): BlockNumber {
    const result = this._value - BigInt(blocks);
    if (result < 0n) {
      throw new Error('Block number cannot be negative after subtraction');
    }
    return new BlockNumber(result);
  }

  equals(other: BlockNumber): boolean {
    return this._value === other._value;
  }

  toString(): string {
    return this._value.toString();
  }

  toNumber(): number {
    return Number(this._value);
  }
}

4.3.5 Transaction hash value object

// src/domain/value-objects/tx-hash.vo.ts

/**
 * Transaction hash value object
 *
 * Wraps an EVM transaction hash and ensures it is well-formed: 0x followed by 64 hexadecimal characters
 */
export class TxHash {
  private static readonly PATTERN = /^0x[a-fA-F0-9]{64}$/;

  private constructor(private readonly _value: string) {}

  get value(): string {
    return this._value;
  }

  /**
   * Returns the normalized (lowercase) hash
   */
  get normalized(): string {
    return this._value.toLowerCase();
  }

  /**
   * Creates a transaction hash value object
   * @param hash transaction hash string
   * @throws Error if the hash format is invalid
   */
  static create(hash: string): TxHash {
    if (!this.isValid(hash)) {
      throw new Error(`Invalid transaction hash: ${hash}. Expected format: 0x + 64 hex characters.`);
    }
    // Always store the hash in lowercase
    return new TxHash(hash.toLowerCase());
  }

  /**
   * Checks whether a transaction hash is well-formed
   */
  static isValid(hash: string): boolean {
    return this.PATTERN.test(hash);
  }

  /**
   * Returns a shortened display form (0x123456...abcd with the default lengths)
   */
  toShortString(prefixLength = 6, suffixLength = 4): string {
    return `${this._value.slice(0, prefixLength + 2)}...${this._value.slice(-suffixLength)}`;
  }

  equals(other: TxHash): boolean {
    return this._value === other._value;
  }

  toString(): string {
    return this._value;
  }
}
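
Because `TxHash.create` lowercases its input, two copies of the same hash that differ only in letter case compare equal, which matters for the duplicate check on deposits. The standalone functions below are a sketch of that behaviour for experimentation; `normalizeTxHash` and `shortenTxHash` are hypothetical names that mirror the class logic rather than replace it.

```typescript
const TX_HASH_PATTERN = /^0x[a-fA-F0-9]{64}$/;

/** Lowercases a valid hash so that differently cased copies compare equal. */
export function normalizeTxHash(hash: string): string {
  if (!TX_HASH_PATTERN.test(hash)) {
    throw new Error(`Invalid transaction hash: ${hash}`);
  }
  return hash.toLowerCase();
}

/** Mirrors TxHash.toShortString: "0x" plus prefixLength characters, "...", then suffixLength characters. */
export function shortenTxHash(hash: string, prefixLength = 6, suffixLength = 4): string {
  const normalized = normalizeTxHash(hash);
  return `${normalized.slice(0, prefixLength + 2)}...${normalized.slice(-suffixLength)}`;
}

const upper = '0x' + 'AB'.repeat(32);
const lower = '0x' + 'ab'.repeat(32);
console.log(normalizeTxHash(upper) === normalizeTxHash(lower)); // true
console.log(shortenTxHash(upper)); // 0xababab...abab
```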

4.3.6 Value object export index

// src/domain/value-objects/index.ts

export { ChainType, ChainTypeEnum } from './chain-type.vo';
export { EvmAddress } from './evm-address.vo';
export { TokenAmount } from './token-amount.vo';
export { BlockNumber } from './block-number.vo';
export { TxHash } from './tx-hash.vo';

4.4 Domain events

4.4.1 WalletAddressCreated event

// src/domain/events/wallet-address-created.event.ts

import { DomainEvent } from './domain-event.base';

export interface WalletAddressCreatedPayload {
  userId: string;
  username: string;
  publicKey: string;           // compressed public key (33 bytes, hex)
  addresses: Array<{
    chainType: string;         // BSC, KAVA, KAVA_COSMOS, DST
    address: string;           // derived address
    addressType: string;       // EVM or COSMOS
  }>;
  mpcSessionId: string;        // MPC session ID
  delegateShare?: {
    partyId: string;
    partyIndex: number;
    encryptedShare: string;
  };
}

export class WalletAddressCreatedEvent extends DomainEvent {
  constructor(public readonly payload: WalletAddressCreatedPayload) {
    super();
  }

  get eventType(): string {
    return 'WalletAddressCreated';
  }

  get aggregateId(): string {
    return this.payload.userId;
  }

  get aggregateType(): string {
    return 'WalletAddress';
  }
}

// src/domain/events/deposit-detected.event.ts

import { DomainEvent } from './domain-event.base';

export interface DepositDetectedPayload {
  chainType: string;
  txHash: string;
  fromAddress: string;
  toAddress: string;
  amount: string;
  userId: string;
  blockNumber: number;
}

export class DepositDetectedEvent extends DomainEvent {
  constructor(public readonly payload: DepositDetectedPayload) {
    super();
  }

  get eventType(): string {
    return 'DepositDetected';
  }

  get aggregateId(): string {
    return this.payload.txHash;
  }

  get aggregateType(): string {
    return 'DepositTransaction';
  }
}

// src/domain/events/deposit-confirmed.event.ts

import { DomainEvent } from './domain-event.base';

export interface DepositConfirmedPayload {
  depositId: string;
  chainType: string;
  txHash: string;
  amount: string;
  userId: string;
  confirmations: number;
}

export class DepositConfirmedEvent extends DomainEvent {
  constructor(public readonly payload: DepositConfirmedPayload) {
    super();
  }

  get eventType(): string {
    return 'DepositConfirmed';
  }

  get aggregateId(): string {
    return this.payload.depositId;
  }

  get aggregateType(): string {
    return 'DepositTransaction';
  }
}

4.5 Repository interfaces (Ports)

// src/domain/repositories/deposit-transaction.repository.interface.ts

import { DepositTransaction } from '@/domain/aggregates/deposit-transaction';
import { ChainType, TxHash } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';

export interface IDepositTransactionRepository {
  save(deposit: DepositTransaction): Promise<DepositTransaction>;
  findById(id: bigint): Promise<DepositTransaction | null>;
  findByTxHash(txHash: TxHash): Promise<DepositTransaction | null>;
  existsByTxHash(txHash: TxHash): Promise<boolean>;
  findByStatus(status: DepositStatus): Promise<DepositTransaction[]>;
  findPendingNotification(limit?: number): Promise<DepositTransaction[]>;
  findByUserIdAndChain(userId: bigint, chainType: ChainType): Promise<DepositTransaction[]>;
  findUnconfirmed(chainType: ChainType, minBlockNumber: bigint): Promise<DepositTransaction[]>;
}

export const DEPOSIT_TRANSACTION_REPOSITORY = Symbol('DEPOSIT_TRANSACTION_REPOSITORY');

// src/domain/repositories/monitored-address.repository.interface.ts

import { ChainType, EvmAddress } from '@/domain/value-objects';

export interface MonitoredAddressData {
  id: bigint;
  chainType: ChainType;
  address: EvmAddress;
  userId: bigint;
  isActive: boolean;
  createdAt: Date;
}

export interface IMonitoredAddressRepository {
  save(data: Omit<MonitoredAddressData, 'id' | 'createdAt'>): Promise<MonitoredAddressData>;
  findByAddress(chainType: ChainType, address: EvmAddress): Promise<MonitoredAddressData | null>;
  findByUserId(userId: bigint): Promise<MonitoredAddressData[]>;
  findActiveByChain(chainType: ChainType): Promise<MonitoredAddressData[]>;
  getAllActiveAddresses(): Promise<Set<string>>;  // returns the set of all active addresses
  deactivate(id: bigint): Promise<void>;
}

export const MONITORED_ADDRESS_REPOSITORY = Symbol('MONITORED_ADDRESS_REPOSITORY');

5. Infrastructure layer design

5.1 EVM provider adapter

// src/infrastructure/blockchain/evm-provider.adapter.ts

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ethers, JsonRpcProvider, WebSocketProvider, Contract } from 'ethers';
import { ChainTypeEnum } from '@/domain/value-objects';

interface ChainConfig {
  rpcUrl: string;
  wsUrl?: string;
  usdtContract: string;
  decimals: number;
  chainId: number;
}

const ERC20_ABI = [
  'event Transfer(address indexed from, address indexed to, uint256 value)',
  'function balanceOf(address owner) view returns (uint256)',
  'function decimals() view returns (uint8)',
];

@Injectable()
export class EvmProviderAdapter implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(EvmProviderAdapter.name);

  private rpcProviders: Map<ChainTypeEnum, JsonRpcProvider> = new Map();
  private wsProviders: Map<ChainTypeEnum, WebSocketProvider> = new Map();
  private contracts: Map<ChainTypeEnum, Contract> = new Map();

  private readonly chainConfigs: Record<ChainTypeEnum, ChainConfig> = {
    [ChainTypeEnum.KAVA]: {
      rpcUrl: this.configService.get('KAVA_RPC_URL', 'https://evm.kava.io'),
      wsUrl: this.configService.get('KAVA_WS_URL'),
      usdtContract: '0x919C1c267BC06a7039e03fcc2eF738525769109c',
      decimals: 6,
      chainId: 2222,
    },
    [ChainTypeEnum.BSC]: {
      rpcUrl: this.configService.get('BSC_RPC_URL', 'https://bsc-dataseed.binance.org'),
      wsUrl: this.configService.get('BSC_WS_URL'),
      usdtContract: '0x55d398326f99059fF775485246999027B3197955',
      decimals: 18,
      chainId: 56,
    },
  };

  constructor(private readonly configService: ConfigService) {}

  async onModuleInit(): Promise<void> {
    await this.initializeProviders();
  }

  async onModuleDestroy(): Promise<void> {
    await this.closeProviders();
  }

  private async initializeProviders(): Promise<void> {
    for (const [chainType, config] of Object.entries(this.chainConfigs)) {
      try {
        // RPC provider (used for queries)
        const rpcProvider = new JsonRpcProvider(config.rpcUrl, config.chainId, {
          staticNetwork: true,
        });
        this.rpcProviders.set(chainType as ChainTypeEnum, rpcProvider);

        // WebSocket provider (used for event subscriptions)
        if (config.wsUrl) {
          const wsProvider = new WebSocketProvider(config.wsUrl, config.chainId);
          this.wsProviders.set(chainType as ChainTypeEnum, wsProvider);
        }

        // USDT Contract
        const provider = config.wsUrl
          ? this.wsProviders.get(chainType as ChainTypeEnum)!
          : rpcProvider;
        const contract = new Contract(config.usdtContract, ERC20_ABI, provider);
        this.contracts.set(chainType as ChainTypeEnum, contract);

        this.logger.log(`Initialized ${chainType} provider`);
      } catch (error) {
        this.logger.error(`Failed to initialize ${chainType} provider: ${error.message}`);
      }
    }
  }

  private async closeProviders(): Promise<void> {
    for (const wsProvider of this.wsProviders.values()) {
      await wsProvider.destroy();
    }
  }

  getRpcProvider(chainType: ChainTypeEnum): JsonRpcProvider {
    const provider = this.rpcProviders.get(chainType);
    if (!provider) {
      throw new Error(`No RPC provider for chain: ${chainType}`);
    }
    return provider;
  }

  getWsProvider(chainType: ChainTypeEnum): WebSocketProvider | undefined {
    return this.wsProviders.get(chainType);
  }

  getUsdtContract(chainType: ChainTypeEnum): Contract {
    const contract = this.contracts.get(chainType);
    if (!contract) {
      throw new Error(`No USDT contract for chain: ${chainType}`);
    }
    return contract;
  }

  getChainConfig(chainType: ChainTypeEnum): ChainConfig {
    return this.chainConfigs[chainType];
  }

  async getCurrentBlockNumber(chainType: ChainTypeEnum): Promise<bigint> {
    const provider = this.getRpcProvider(chainType);
    const blockNumber = await provider.getBlockNumber();
    return BigInt(blockNumber);
  }

  async getBalance(chainType: ChainTypeEnum, address: string): Promise<string> {
    const contract = this.getUsdtContract(chainType);
    const config = this.getChainConfig(chainType);
    const balance = await contract.balanceOf(address);
    return ethers.formatUnits(balance, config.decimals);
  }
}

5.2 Event listener service

// src/infrastructure/blockchain/event-listener.service.ts

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { EvmProviderAdapter } from './evm-provider.adapter';
import { DepositDetectionService } from '@/application/services/deposit-detection.service';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { ChainTypeEnum } from '@/domain/value-objects';
import { Contract, EventLog } from 'ethers';

@Injectable()
export class EventListenerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(EventListenerService.name);
  private isListening = false;
  private listeners: Map<ChainTypeEnum, any> = new Map();

  constructor(
    private readonly evmProvider: EvmProviderAdapter,
    private readonly depositService: DepositDetectionService,
    private readonly addressCache: AddressCacheService,
  ) {}

  async onModuleInit(): Promise<void> {
    await this.startListening();
  }

  async onModuleDestroy(): Promise<void> {
    await this.stopListening();
  }

  async startListening(): Promise<void> {
    if (this.isListening) return;

    for (const chainType of Object.values(ChainTypeEnum)) {
      await this.subscribeToChain(chainType);
    }

    this.isListening = true;
    this.logger.log('Event listeners started');
  }

  private async subscribeToChain(chainType: ChainTypeEnum): Promise<void> {
    const wsProvider = this.evmProvider.getWsProvider(chainType);
    if (!wsProvider) {
      this.logger.warn(`No WebSocket provider for ${chainType}; relying on the scheduled block scan`);
      return;
    }

    const contract = this.evmProvider.getUsdtContract(chainType);

    // Listen for all Transfer events. Subscribing by event name is the form documented
    // for ethers v6: the listener receives the decoded arguments (from, to, value)
    // followed by an event payload whose `log` property is the EventLog.
    const filter = 'Transfer';

    const listener = async (from: string, to: string, value: bigint, payload: { log: EventLog }) => {
      const log = payload.log;
      try {
        // Check whether the recipient is a platform-monitored address
        const isMonitored = await this.addressCache.isMonitoredAddress(chainType, to);
        if (!isMonitored) return;

        this.logger.log(`Detected deposit: ${chainType} ${log.transactionHash}`);

        await this.depositService.handleTransferEvent({
          chainType,
          txHash: log.transactionHash,
          fromAddress: from,
          toAddress: to,
          amount: value.toString(),
          blockNumber: log.blockNumber,
          logIndex: log.index,
        });
      } catch (error) {
        this.logger.error(`Error processing Transfer event: ${error.message}`);
      }
    };

    contract.on(filter, listener);
    this.listeners.set(chainType, { contract, filter, listener });

    this.logger.log(`Subscribed to ${chainType} Transfer events`);
  }

  async stopListening(): Promise<void> {
    for (const [chainType, { contract, filter, listener }] of this.listeners) {
      contract.off(filter, listener);
      this.logger.log(`Unsubscribed from ${chainType}`);
    }
    this.listeners.clear();
    this.isListening = false;
  }
}

5.3 Address derivation adapter

// src/infrastructure/blockchain/address-derivation.adapter.ts

import { Injectable, Logger } from '@nestjs/common';
import { createHash } from 'crypto';
import { bech32 } from 'bech32';
import { ethers } from 'ethers';
import * as secp256k1 from 'secp256k1';

export enum AddressType {
  EVM = 'EVM',
  COSMOS = 'COSMOS',
}

export interface DerivedAddress {
  chainType: string;
  address: string;
  addressType: AddressType;
}

/**
 * Address derivation adapter
 *
 * Derives multi-chain wallet addresses from a compressed secp256k1 public key
 * - EVM chains (BSC, KAVA EVM): keccak256(uncompressed_pubkey[1:])[-20:]
 * - Cosmos chains (KAVA, DST): bech32(ripemd160(sha256(compressed_pubkey)))
 */
@Injectable()
export class AddressDerivationAdapter {
  private readonly logger = new Logger(AddressDerivationAdapter.name);

  /**
   * Derives addresses for all supported chains from a compressed public key
   * @param compressedPubKeyHex 33-byte compressed public key (hex, without the 0x prefix)
   */
  deriveAllAddresses(compressedPubKeyHex: string): DerivedAddress[] {
    const pubKeyBytes = Buffer.from(compressedPubKeyHex, 'hex');

    if (pubKeyBytes.length !== 33) {
      throw new Error(`Invalid compressed public key length: ${pubKeyBytes.length}, expected 33`);
    }

    const addresses: DerivedAddress[] = [];

    // EVM address (BSC, KAVA EVM)
    const evmAddress = this.deriveEvmAddress(pubKeyBytes);
    addresses.push({ chainType: 'BSC', address: evmAddress, addressType: AddressType.EVM });
    addresses.push({ chainType: 'KAVA', address: evmAddress, addressType: AddressType.EVM });

    // Cosmos addresses (KAVA native uses the kava prefix, DST uses the dst prefix)
    const kavaCosmosAddress = this.deriveCosmosAddress(pubKeyBytes, 'kava');
    const dstAddress = this.deriveCosmosAddress(pubKeyBytes, 'dst');
    addresses.push({ chainType: 'KAVA_COSMOS', address: kavaCosmosAddress, addressType: AddressType.COSMOS });
    addresses.push({ chainType: 'DST', address: dstAddress, addressType: AddressType.COSMOS });

    this.logger.log(`Derived ${addresses.length} addresses from public key`);
    return addresses;
  }

  /**
   * Derives the EVM address
   * 1. Decompress the public key (33 bytes → 65 bytes)
   * 2. Drop the prefix byte of the uncompressed key (leaving 64 bytes)
   * 3. Take the last 20 bytes of the keccak256 hash
   */
  private deriveEvmAddress(compressedPubKey: Buffer): string {
    // Decompress the public key with secp256k1
    const uncompressedPubKey = Buffer.from(
      secp256k1.publicKeyConvert(compressedPubKey, false)
    );

    // Drop the 0x04 prefix, leaving 64 bytes
    const pubKeyWithoutPrefix = uncompressedPubKey.slice(1);

    // keccak256 hash, then take the last 20 bytes
    const hash = ethers.keccak256(pubKeyWithoutPrefix);
    const address = '0x' + hash.slice(-40);

    return ethers.getAddress(address); // return in checksum format
  }

  /**
   * Derives a Cosmos address
   * 1. SHA256(compressed_pubkey)
   * 2. RIPEMD160(sha256_result)
   * 3. bech32 encoding
   */
  private deriveCosmosAddress(compressedPubKey: Buffer, prefix: string): string {
    // SHA256
    const sha256Hash = createHash('sha256').update(compressedPubKey).digest();

    // RIPEMD160
    const ripemd160Hash = createHash('ripemd160').update(sha256Hash).digest();

    // Bech32 encoding
    const words = bech32.toWords(ripemd160Hash);
    return bech32.encode(prefix, words);
  }

  /**
   * Validates the address format
   */
  validateAddress(chainType: string, address: string): boolean {
    switch (chainType) {
      case 'BSC':
      case 'KAVA':
        return ethers.isAddress(address);
      case 'KAVA_COSMOS':
        return /^kava1[a-z0-9]{38}$/.test(address);
      case 'DST':
        return /^dst1[a-z0-9]{38}$/.test(address);
      default:
        return false;
    }
  }
}
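
The `{38}` in the Cosmos validation patterns follows from the encoding: a 20-byte RIPEMD160 hash regroups into 32 five-bit words, and bech32 appends a 6-character checksum, giving 38 characters after the `1` separator. The sketch below writes out the two steps that `bech32.toWords` and `bech32.encode` perform, following BIP-173. It is for illustration only; `toWords` and `encodeBech32` here are local functions, not the library's, and the all-zero hash in the example is a placeholder rather than a real key hash.

```typescript
// Minimal bech32 encoder following BIP-173 (illustration only; use the bech32 package in the service).
const CHARSET = 'qpzry9x8gf2tvdw0s3jn54khce6mua7l';
const GENERATOR = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3];

/** BCH checksum over a sequence of 5-bit values. */
function polymod(values: number[]): number {
  let chk = 1;
  for (const v of values) {
    const top = chk >>> 25;
    chk = ((chk & 0x1ffffff) << 5) ^ v;
    for (let i = 0; i < 5; i++) {
      if ((top >>> i) & 1) chk ^= GENERATOR[i];
    }
  }
  return chk;
}

/** Expands the human-readable prefix into the values covered by the checksum. */
function hrpExpand(hrp: string): number[] {
  const codes = [...hrp].map((c) => c.charCodeAt(0));
  return [...codes.map((c) => c >>> 5), 0, ...codes.map((c) => c & 31)];
}

/** Regroups 8-bit bytes into 5-bit words, zero-padding the final word if needed. */
export function toWords(bytes: Uint8Array): number[] {
  const words: number[] = [];
  let acc = 0;
  let bits = 0;
  for (const byte of bytes) {
    acc = ((acc << 8) | byte) & 0xfff; // keep only the bits still in play
    bits += 8;
    while (bits >= 5) {
      bits -= 5;
      words.push((acc >>> bits) & 31);
    }
  }
  if (bits > 0) words.push((acc << (5 - bits)) & 31);
  return words;
}

/** Encodes prefix + "1" + data words + 6 checksum characters. */
export function encodeBech32(hrp: string, words: number[]): string {
  const mod = polymod([...hrpExpand(hrp), ...words, 0, 0, 0, 0, 0, 0]) ^ 1;
  const checksum = [0, 1, 2, 3, 4, 5].map((i) => (mod >>> (5 * (5 - i))) & 31);
  return hrp + '1' + [...words, ...checksum].map((w) => CHARSET[w]).join('');
}

const placeholderHash = new Uint8Array(20); // stands in for ripemd160(sha256(pubkey))
const words = toWords(placeholderHash);
console.log(words.length); // 32
console.log(encodeBech32('kava', words).length); // 43 = "kava" + "1" + 38
```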

5.4 Block scanner (backfill service)

// src/infrastructure/blockchain/block-scanner.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { EvmProviderAdapter } from './evm-provider.adapter';
import { DepositDetectionService } from '@/application/services/deposit-detection.service';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { BlockCheckpointRepository } from '@/infrastructure/persistence/repositories/block-checkpoint.repository.impl';
import { ChainTypeEnum } from '@/domain/value-objects';
import { ethers } from 'ethers';

const BATCH_SIZE = 1000;  // number of blocks per scan batch

@Injectable()
export class BlockScannerService {
  private readonly logger = new Logger(BlockScannerService.name);
  private isScanning: Map<ChainTypeEnum, boolean> = new Map();

  constructor(
    private readonly evmProvider: EvmProviderAdapter,
    private readonly depositService: DepositDetectionService,
    private readonly addressCache: AddressCacheService,
    private readonly checkpointRepo: BlockCheckpointRepository,
  ) {}

  /**
   * Scheduled backfill scan (every 5 minutes)
   */
  @Cron(CronExpression.EVERY_5_MINUTES)
  async scheduledScan(): Promise<void> {
    for (const chainType of Object.values(ChainTypeEnum)) {
      await this.scanChain(chainType);
    }
  }

  /**
   * Scans blocks on the given chain
   */
  async scanChain(chainType: ChainTypeEnum): Promise<void> {
    if (this.isScanning.get(chainType)) {
      this.logger.debug(`${chainType} scan already in progress`);
      return;
    }

    this.isScanning.set(chainType, true);

    try {
      const checkpoint = await this.checkpointRepo.findByChain(chainType);
      const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType);

      let fromBlock = checkpoint
        ? checkpoint.lastScannedBlock + 1n
        : (currentBlock > 1000n ? currentBlock - 1000n : 0n);  // on first start, scan the latest 1000 blocks

      while (fromBlock <= currentBlock) {
        const toBlock = fromBlock + BigInt(BATCH_SIZE) - 1n > currentBlock
          ? currentBlock
          : fromBlock + BigInt(BATCH_SIZE) - 1n;

        await this.scanBlockRange(chainType, fromBlock, toBlock);

        // Update the checkpoint
        await this.checkpointRepo.updateCheckpoint(chainType, toBlock);

        fromBlock = toBlock + 1n;
      }

      this.logger.debug(`${chainType} scan completed up to block ${currentBlock}`);
    } catch (error) {
      this.logger.error(`${chainType} scan failed: ${error.message}`);
      await this.checkpointRepo.markUnhealthy(chainType, error.message);
    } finally {
      this.isScanning.set(chainType, false);
    }
  }

  /**
   * Scans the given block range
   */
  private async scanBlockRange(
    chainType: ChainTypeEnum,
    fromBlock: bigint,
    toBlock: bigint,
  ): Promise<void> {
    const contract = this.evmProvider.getUsdtContract(chainType);
    const monitoredAddresses = await this.addressCache.getMonitoredAddresses(chainType);

    if (monitoredAddresses.size === 0) {
      return;
    }

    // Query Transfer events
    const filter = contract.filters.Transfer(null, [...monitoredAddresses]);
    const events = await contract.queryFilter(filter, Number(fromBlock), Number(toBlock));

    for (const event of events) {
      if (!('args' in event)) continue;

      const [from, to, value] = event.args;

      try {
        await this.depositService.handleTransferEvent({
          chainType,
          txHash: event.transactionHash,
          fromAddress: from,
          toAddress: to,
          amount: value.toString(),
          blockNumber: event.blockNumber,
          logIndex: event.index,
        });
      } catch (error) {
        // Expected errors such as duplicate transactions are ignored
        if (!error.message.includes('already exists')) {
          this.logger.error(`Error processing event: ${error.message}`);
        }
      }
    }
  }

  /**
   * Manually triggers a full scan
   */
  async fullScan(chainType: ChainTypeEnum, fromBlock: bigint, toBlock: bigint): Promise<void> {
    this.logger.log(`Starting full scan for ${chainType} from ${fromBlock} to ${toBlock}`);
    await this.scanBlockRange(chainType, fromBlock, toBlock);
    this.logger.log(`Full scan completed for ${chainType}`);
  }
}
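
The loop in `scanChain` walks from the checkpoint to the chain head in inclusive windows of `BATCH_SIZE` blocks. The same arithmetic can be isolated in a small pure function, sketched below; `splitIntoBatches` is a hypothetical helper name, not part of the service.

```typescript
/** Splits the inclusive range [from, to] into consecutive inclusive batches. */
export function splitIntoBatches(
  from: bigint,
  to: bigint,
  batchSize: bigint,
): Array<[bigint, bigint]> {
  if (batchSize <= 0n) {
    throw new Error('batchSize must be positive');
  }
  const batches: Array<[bigint, bigint]> = [];
  for (let start = from; start <= to; start += batchSize) {
    const candidateEnd = start + batchSize - 1n;
    batches.push([start, candidateEnd > to ? to : candidateEnd]);
  }
  return batches;
}

for (const [start, end] of splitIntoBatches(1n, 2500n, 1000n)) {
  console.log(`${start}-${end}`);
}
// 1-1000
// 1001-2000
// 2001-2500
```

Note that `fullScan` as written passes its whole range to a single query. If the RPC endpoint limits the block span of a log query, which many public endpoints do, a helper like this could be reused there as well.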

5.5 Address cache service

// src/infrastructure/redis/address-cache.service.ts

import { Inject, Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { RedisService } from './redis.service';
import { IMonitoredAddressRepository, MONITORED_ADDRESS_REPOSITORY } from '@/domain/repositories';
import { ChainTypeEnum } from '@/domain/value-objects';

const CACHE_KEY_PREFIX = 'blockchain:addresses:';
const CACHE_TTL = 300;  // 5 minutes

@Injectable()
export class AddressCacheService implements OnModuleInit {
  private readonly logger = new Logger(AddressCacheService.name);
  private localCache: Map<ChainTypeEnum, Set<string>> = new Map();

  constructor(
    private readonly redis: RedisService,
    @Inject(MONITORED_ADDRESS_REPOSITORY)
    private readonly addressRepo: IMonitoredAddressRepository,
  ) {}

  async onModuleInit(): Promise<void> {
    await this.refreshCache();
  }

  /**
   * Checks whether an address is monitored
   */
  async isMonitoredAddress(chainType: ChainTypeEnum, address: string): Promise<boolean> {
    const normalizedAddress = address.toLowerCase();

    // Check the local cache first
    const localSet = this.localCache.get(chainType);
    if (localSet?.has(normalizedAddress)) {
      return true;
    }

    // Then check Redis
    const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
    const exists = await this.redis.sismember(cacheKey, normalizedAddress);

    if (exists) {
      // Update the local cache
      if (!localSet) {
        this.localCache.set(chainType, new Set([normalizedAddress]));
      } else {
        localSet.add(normalizedAddress);
      }
    }

    return exists;
  }

  /**
   * Returns all monitored addresses for a chain
   */
  async getMonitoredAddresses(chainType: ChainTypeEnum): Promise<Set<string>> {
    const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
    const addresses = await this.redis.smembers(cacheKey);
    return new Set(addresses);
  }

  /**
   * Adds a monitored address
   */
  async addAddress(chainType: ChainTypeEnum, address: string): Promise<void> {
    const normalizedAddress = address.toLowerCase();
    const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;

    await this.redis.sadd(cacheKey, normalizedAddress);

    // Update the local cache
    let localSet = this.localCache.get(chainType);
    if (!localSet) {
      localSet = new Set();
      this.localCache.set(chainType, localSet);
    }
    localSet.add(normalizedAddress);
  }

  /**
   * Removes a monitored address
   */
  async removeAddress(chainType: ChainTypeEnum, address: string): Promise<void> {
    const normalizedAddress = address.toLowerCase();
    const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;

    await this.redis.srem(cacheKey, normalizedAddress);

    // Update the local cache
    this.localCache.get(chainType)?.delete(normalizedAddress);
  }

  /**
   * Refreshes the cache from the database
   */
  async refreshCache(): Promise<void> {
    this.logger.log('Refreshing address cache from database...');

    for (const chainType of Object.values(ChainTypeEnum)) {
      const addresses = await this.addressRepo.findActiveByChain(
        { value: chainType } as any
      );

      const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
      const addressSet = new Set(addresses.map(a => a.address.value.toLowerCase()));

      // Update Redis; always clear the old set so deactivated addresses do not linger
      await this.redis.del(cacheKey);
      if (addressSet.size > 0) {
        await this.redis.sadd(cacheKey, ...addressSet);
      }

      // Update the local cache
      this.localCache.set(chainType, addressSet);

      this.logger.log(`Loaded ${addressSet.size} addresses for ${chainType}`);
    }
  }
}
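
The lookup order in `isMonitoredAddress` (local set first, then Redis, then promote a hit into the local set) can be exercised without a Redis instance. The sketch below assumes a hypothetical `RemoteSet` interface standing in for `RedisService`, and a counter `remoteCalls` added only to make the behaviour observable; neither exists in the service.

```typescript
// Hypothetical stand-in for the Redis set lookup used by AddressCacheService.
interface RemoteSet {
  sismember(key: string, member: string): Promise<boolean>;
}

export class TwoLevelAddressLookup {
  private readonly local = new Map<string, Set<string>>();
  private readonly remote: RemoteSet;
  remoteCalls = 0; // instrumentation for the example only

  constructor(remote: RemoteSet) {
    this.remote = remote;
  }

  async isMonitored(chain: string, address: string): Promise<boolean> {
    // Addresses are compared in lowercase, so mixed-case (checksummed) input still matches
    const normalized = address.toLowerCase();

    let localSet = this.local.get(chain);
    if (!localSet) {
      localSet = new Set<string>();
      this.local.set(chain, localSet);
    }
    if (localSet.has(normalized)) return true; // level 1: in-process hit

    this.remoteCalls++;
    const found = await this.remote.sismember(`blockchain:addresses:${chain}`, normalized);
    if (found) localSet.add(normalized); // promote the hit to level 1
    return found;
  }
}
```

Only positive results are promoted, as in the service code, so every transfer to a non-monitored address still costs one remote lookup. On a busy token contract that may be worth measuring before relying on the WebSocket listener alone.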

6. Application layer design

6.1 Deposit detection service

// src/application/services/deposit-detection.service.ts

import { Injectable, Inject, Logger } from '@nestjs/common';
import {
  IDepositTransactionRepository,
  DEPOSIT_TRANSACTION_REPOSITORY,
  IMonitoredAddressRepository,
  MONITORED_ADDRESS_REPOSITORY,
} from '@/domain/repositories';
import { DepositTransaction } from '@/domain/aggregates/deposit-transaction';
import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber, ChainTypeEnum } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import { WalletClientService } from '@/infrastructure/external/wallet-service/wallet-client.service';
import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter';

export interface TransferEventData {
  chainType: ChainTypeEnum;
  txHash: string;
  fromAddress: string;
  toAddress: string;
  amount: string;
  blockNumber: number;
  logIndex: number;
}

@Injectable()
export class DepositDetectionService {
  private readonly logger = new Logger(DepositDetectionService.name);

  constructor(
    @Inject(DEPOSIT_TRANSACTION_REPOSITORY)
    private readonly depositRepo: IDepositTransactionRepository,
    @Inject(MONITORED_ADDRESS_REPOSITORY)
    private readonly addressRepo: IMonitoredAddressRepository,
    private readonly eventPublisher: EventPublisherService,
    private readonly walletClient: WalletClientService,
    private readonly evmProvider: EvmProviderAdapter,
  ) {}

  /**
   * Handles a detected Transfer event
   */
  async handleTransferEvent(data: TransferEventData): Promise<void> {
    const txHash = TxHash.create(data.txHash);

    // Skip if already processed
    const exists = await this.depositRepo.existsByTxHash(txHash);
    if (exists) {
      this.logger.debug(`Deposit ${data.txHash} already exists, skipping`);
      return;
    }

    // Look up the monitored address record
    const chainType = ChainType.create(data.chainType);
    const toAddress = EvmAddress.create(data.toAddress);
    const addressInfo = await this.addressRepo.findByAddress(chainType, toAddress);

    if (!addressInfo) {
      this.logger.warn(`Address ${data.toAddress} not found in monitored addresses`);
      return;
    }

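    // Fetch the block timestamp
    const block = await this.evmProvider.getRpcProvider(data.chainType).getBlock(data.blockNumber);
    if (!block) {
      throw new Error(`Block ${data.blockNumber} not found on ${data.chainType}`);
    }
    const blockTimestamp = new Date(block.timestamp * 1000);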

    // Get the token decimals
    const chainConfig = this.evmProvider.getChainConfig(data.chainType);

    // Create the deposit record
    const deposit = DepositTransaction.create({
      chainType,
      txHash,
      fromAddress: EvmAddress.create(data.fromAddress),
      toAddress,
      tokenContract: EvmAddress.create(chainConfig.usdtContract),
      amount: TokenAmount.create(data.amount, chainConfig.decimals),
      blockNumber: BlockNumber.create(data.blockNumber),
      blockTimestamp,
      logIndex: data.logIndex,
      addressId: addressInfo.id,
      userId: addressInfo.userId,
    });

    // Save and publish events
    await this.depositRepo.save(deposit);
    await this.eventPublisher.publishAll(deposit.domainEvents);
    deposit.clearDomainEvents();

    this.logger.log(`Deposit detected: ${data.txHash}, amount: ${deposit.amount.formatted}`);

    // Check confirmations
    await this.checkConfirmationsAndNotify(deposit);
  }

  /**
   * Checks the confirmation count and notifies wallet-service
   */
  async checkConfirmationsAndNotify(deposit: DepositTransaction): Promise<void> {
    const chainType = deposit.chainType;
    const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType.value as ChainTypeEnum);
    const confirmations = Number(currentBlock - deposit.blockNumber.value) + 1;
    const requiredConfirmations = chainType.requiredConfirmations;

    deposit.updateConfirmations(confirmations, requiredConfirmations);

    if (deposit.isConfirmed && !deposit.isNotified) {
      try {
        // Call wallet-service to credit the deposit
        await this.walletClient.handleDeposit({
          userId: deposit.userId.toString(),
          amount: deposit.amount.value,
          chainType: deposit.chainType.value,
          txHash: deposit.txHash.value,
        });

        deposit.markNotified();
        this.logger.log(`Deposit notified: ${deposit.txHash.value}`);
      } catch (error) {
        deposit.recordNotifyFailure(error.message);
        this.logger.error(`Failed to notify deposit: ${error.message}`);
      }
    }

    await this.depositRepo.save(deposit);
    await this.eventPublisher.publishAll(deposit.domainEvents);
    deposit.clearDomainEvents();
  }

  /**
   * Scheduled task: updates confirmation counts for unconfirmed deposits
   */
  async updatePendingConfirmations(): Promise<void> {
    for (const chainType of Object.values(ChainTypeEnum)) {
      const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType);
      const minBlock = currentBlock - 100n;  // only handle deposits within the latest 100 blocks

      const pendingDeposits = await this.depositRepo.findUnconfirmed(
        ChainType.create(chainType),
        minBlock,
      );

      for (const deposit of pendingDeposits) {
        await this.checkConfirmationsAndNotify(deposit);
      }
    }
  }

  /**
   * Scheduled task: retries failed notifications
   */
  async retryFailedNotifications(): Promise<void> {
    const pendingNotifications = await this.depositRepo.findPendingNotification(50);

    for (const deposit of pendingNotifications) {
      if (deposit.toProps().notifyAttempts >= 10) {
        this.logger.error(`Deposit ${deposit.txHash.value} exceeded max retry attempts`);
        continue;
      }

      await this.checkConfirmationsAndNotify(deposit);
    }
  }
}
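
`checkConfirmationsAndNotify` counts the block that contains the transaction as the first confirmation, hence the `+ 1`. The sketch below isolates that arithmetic. Two points are assumptions rather than facts from the code above: the guard for a lagging RPC node, and the `>=` comparison in `isConfirmed`, since `DepositTransaction.updateConfirmations` is not shown in this section.

```typescript
/** Confirmations as computed in checkConfirmationsAndNotify: the containing block counts as 1. */
export function countConfirmations(currentBlock: bigint, depositBlock: bigint): number {
  // Assumption: treat a head that is behind the deposit block (lagging node) as 0 confirmations
  if (currentBlock < depositBlock) return 0;
  return Number(currentBlock - depositBlock) + 1;
}

/** Assumption: a deposit is confirmed once it has at least the required confirmations. */
export function isConfirmed(currentBlock: bigint, depositBlock: bigint, required: number): boolean {
  return countConfirmations(currentBlock, depositBlock) >= required;
}

console.log(countConfirmations(100n, 100n)); // 1
console.log(isConfirmed(110n, 100n, 12));    // false (11 confirmations)
console.log(isConfirmed(111n, 100n, 12));    // true  (12 confirmations)
```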

6.2 Balance query service

// src/application/services/balance-query.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter';
import { ChainTypeEnum } from '@/domain/value-objects';

export interface BalanceResult {
  chainType: string;
  address: string;
  usdtBalance: string;
  nativeBalance: string;
}

@Injectable()
export class BalanceQueryService {
  private readonly logger = new Logger(BalanceQueryService.name);

  constructor(private readonly evmProvider: EvmProviderAdapter) {}

  /**
   * 查询单个地址的余额
   */
  async getBalance(chainType: ChainTypeEnum, address: string): Promise<BalanceResult> {
    const [usdtBalance, nativeBalance] = await Promise.all([
      this.evmProvider.getBalance(chainType, address),
      this.getNativeBalance(chainType, address),
    ]);

    return {
      chainType,
      address,
      usdtBalance,
      nativeBalance,
    };
  }

  /**
   * Query balances for a batch of addresses.
   */
  async getBatchBalances(
    addresses: Array<{ chainType: ChainTypeEnum; address: string }>,
  ): Promise<BalanceResult[]> {
    return Promise.all(
      addresses.map(({ chainType, address }) => this.getBalance(chainType, address)),
    );
  }

  private async getNativeBalance(chainType: ChainTypeEnum, address: string): Promise<string> {
    const provider = this.evmProvider.getRpcProvider(chainType);
    const balance = await provider.getBalance(address);
    const { ethers } = await import('ethers');
    return ethers.formatEther(balance);
  }
}
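
As written, getBatchBalances starts every lookup at once, and a single failed RPC call rejects the whole batch. One possible way to soften both effects is sketched below. The helper name `mapInChunks`, the `Settled` type, and the chunk size are assumptions made for illustration and are not part of the service code.

```typescript
// Hypothetical helper, not part of the service code above.
type Settled<T> =
  | { ok: true; value: T }
  | { ok: false; error: string };

/**
 * Runs `fetchOne` over `items` in chunks of `chunkSize`, so at most
 * `chunkSize` calls are in flight at a time, and reports failures
 * per item instead of rejecting the whole batch.
 */
async function mapInChunks<I, O>(
  items: readonly I[],
  chunkSize: number,
  fetchOne: (item: I) => Promise<O>,
): Promise<Settled<O>[]> {
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new RangeError('chunkSize must be a positive integer');
  }
  const results: Settled<O>[] = [];
  for (let i = 0; i < items.length; i += chunkSize) {
    const chunk = items.slice(i, i + chunkSize);
    const settled = await Promise.allSettled(chunk.map((item) => fetchOne(item)));
    for (const s of settled) {
      if (s.status === 'fulfilled') {
        results.push({ ok: true, value: s.value });
      } else {
        const reason: unknown = s.reason;
        results.push({
          ok: false,
          error: reason instanceof Error ? reason.message : String(reason),
        });
      }
    }
  }
  return results;
}
```

Inside the service, `fetchOne` would wrap `this.getBalance(chainType, address)`; callers can then decide whether a partial result is acceptable.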

7. Kafka Event Design

7.1 Published Events

// src/infrastructure/kafka/event-publisher.service.ts

export const BLOCKCHAIN_TOPICS = {
  // Address derivation completed (sent to identity-service and wallet-service)
  WALLET_ADDRESS_CREATED: 'blockchain.WalletAddressCreated',

  // Deposit events
  DEPOSIT_DETECTED: 'blockchain.DepositDetected',
  DEPOSIT_CONFIRMED: 'blockchain.DepositConfirmed',

  // Transaction events
  TRANSACTION_BROADCASTED: 'blockchain.TransactionBroadcasted',
  TRANSACTION_CONFIRMED: 'blockchain.TransactionConfirmed',

  // Block scanning events
  BLOCK_SCANNED: 'blockchain.BlockScanned',
} as const;
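
Because the topic map is declared `as const`, the literal topic names can be lifted into a union type and checked at runtime. The sketch below shows one way to do that; `BlockchainTopic` and `isBlockchainTopic` are illustrative names that do not appear in the service.

```typescript
// Re-declared so the sketch is self-contained; values copied from the listing above.
const BLOCKCHAIN_TOPICS = {
  WALLET_ADDRESS_CREATED: 'blockchain.WalletAddressCreated',
  DEPOSIT_DETECTED: 'blockchain.DepositDetected',
  DEPOSIT_CONFIRMED: 'blockchain.DepositConfirmed',
  TRANSACTION_BROADCASTED: 'blockchain.TransactionBroadcasted',
  TRANSACTION_CONFIRMED: 'blockchain.TransactionConfirmed',
  BLOCK_SCANNED: 'blockchain.BlockScanned',
} as const;

// Union of the literal topic strings, derived from the constant.
type BlockchainTopic = (typeof BLOCKCHAIN_TOPICS)[keyof typeof BLOCKCHAIN_TOPICS];

const KNOWN_TOPICS: ReadonlySet<string> = new Set<string>(Object.values(BLOCKCHAIN_TOPICS));

/** Hypothetical type guard: narrows an arbitrary string to a known topic. */
function isBlockchainTopic(topic: string): topic is BlockchainTopic {
  return KNOWN_TOPICS.has(topic);
}
```

A publisher that accepts `BlockchainTopic` rather than `string` turns a mistyped topic name into a compile-time error.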

7.2 Consumed Events

// src/infrastructure/kafka/mpc-event-consumer.service.ts

// Events published by mpc-service that this service consumes
export const MPC_CONSUME_TOPICS = {
  KEYGEN_COMPLETED: 'mpc.KeygenCompleted',
  SESSION_FAILED: 'mpc.SessionFailed',
} as const;

export interface KeygenCompletedPayload {
  sessionId: string;
  partyId: string;
  publicKey: string;           // 33-byte compressed public key (hex)
  shareId: string;
  threshold: string;           // "2-of-3"
  extraPayload?: {
    userId: string;
    username: string;
    delegateShare?: {
      partyId: string;
      partyIndex: number;
      encryptedShare: string;
    };
    serverParties?: string[];
  };
}
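
The interface only describes the payload at compile time, while a Kafka message is untrusted input at runtime. Below is a minimal sketch of two checks suggested by the field comments. It assumes the key is a secp256k1 compressed key and that an optional `0x` prefix may be present; neither detail is stated in the payload definition, and both helper names are hypothetical.

```typescript
// Compressed secp256k1 keys are 33 bytes: a 0x02/0x03 prefix byte plus a 32-byte X coordinate.
// Assumption: the hex string may or may not carry a leading "0x".
const COMPRESSED_PUBKEY_HEX = /^(0x)?0[23][0-9a-fA-F]{64}$/;

// Matches threshold strings such as "2-of-3".
const THRESHOLD = /^(\d+)-of-(\d+)$/;

function isCompressedPublicKeyHex(value: unknown): value is string {
  return typeof value === 'string' && COMPRESSED_PUBKEY_HEX.test(value);
}

/** Parses "t-of-n"; returns null for malformed or impossible thresholds. */
function parseThreshold(value: string): { t: number; n: number } | null {
  const match = THRESHOLD.exec(value);
  if (!match) return null;
  const t = Number(match[1]);
  const n = Number(match[2]);
  return t >= 1 && t <= n ? { t, n } : null;
}
```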

7.3 Handling the MPC KeygenCompleted Event

// src/application/event-handlers/mpc-keygen-completed.handler.ts

import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { MpcEventConsumerService, KeygenCompletedPayload } from '@/infrastructure/kafka/mpc-event-consumer.service';
import { AddressDerivationAdapter } from '@/infrastructure/blockchain/address-derivation.adapter';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import { WalletAddressCreatedEvent } from '@/domain/events';

@Injectable()
export class MpcKeygenCompletedHandler implements OnModuleInit {
  private readonly logger = new Logger(MpcKeygenCompletedHandler.name);

  constructor(
    private readonly mpcEventConsumer: MpcEventConsumerService,
    private readonly addressDerivation: AddressDerivationAdapter,
    private readonly eventPublisher: EventPublisherService,
  ) {}

  async onModuleInit() {
    this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this));
    this.logger.log('Registered KeygenCompleted event handler');
  }

  private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise<void> {
    const userId = payload.extraPayload?.userId;
    const username = payload.extraPayload?.username;
    const publicKey = payload.publicKey;

    if (!userId || !username || !publicKey) {
      this.logger.error('Missing required fields in KeygenCompleted payload');
      return;
    }

    this.logger.log(`[DERIVE] Processing keygen completion for userId=${userId}`);
    this.logger.log(`[DERIVE] PublicKey: ${publicKey.substring(0, 20)}...`);

    try {
      // Derive the addresses for every supported chain from the public key
      const addresses = this.addressDerivation.deriveAllAddresses(publicKey);

      this.logger.log(`[DERIVE] Derived ${addresses.length} addresses:`);
      addresses.forEach(addr => {
        this.logger.log(`[DERIVE]   ${addr.chainType}: ${addr.address}`);
      });

      // Publish the WalletAddressCreated event
      const event = new WalletAddressCreatedEvent({
        userId,
        username,
        publicKey,
        addresses: addresses.map(a => ({
          chainType: a.chainType,
          address: a.address,
          addressType: a.addressType,
        })),
        mpcSessionId: payload.sessionId,
        delegateShare: payload.extraPayload?.delegateShare,
      });

      await this.eventPublisher.publish(event);
      this.logger.log(`[DERIVE] Published WalletAddressCreated event for userId=${userId}`);

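    } catch (error) {
      // `error` is `unknown` under strict TypeScript settings, so narrow it before reading its fields.
      const err = error instanceof Error ? error : new Error(String(error));
      this.logger.error(`[DERIVE] Failed to derive addresses: ${err.message}`, err.stack);
    }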
  }
}

7.4 Consumed Identity/Wallet Events

// src/infrastructure/kafka/event-consumer.controller.ts

import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices';
import { AddressRegistryService } from '@/application/services/address-registry.service';

@Controller()
export class EventConsumerController {
  private readonly logger = new Logger(EventConsumerController.name);

  constructor(private readonly addressRegistry: AddressRegistryService) {}

  /**
   * Listen for user-created events so that the user's addresses can be monitored.
   */
  @MessagePattern('identity.UserAccountCreated')
  async handleUserCreated(
    @Payload() message: any,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    this.logger.log(`Received UserAccountCreated: ${message.payload.userId}`);

    // Register the user's wallet addresses in the monitoring list.
    // Note: the address details may only arrive in a later WalletBound event.
  }

  /**
   * Listen for wallet-bound events and register the address for monitoring.
   */
  @MessagePattern('identity.WalletBound')
  async handleWalletBound(
    @Payload() message: any,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    this.logger.log(`Received WalletBound: userId=${message.payload.userId}`);

    await this.addressRegistry.registerAddress({
      userId: BigInt(message.payload.userId),
      chainType: message.payload.chainType,
      address: message.payload.address,
    });
  }

  /**
   * Listen for events that bind several wallets at once.
   */
  @MessagePattern('identity.MultipleWalletsBound')
  async handleMultipleWalletsBound(
    @Payload() message: any,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    this.logger.log(`Received MultipleWalletsBound: userId=${message.payload.userId}`);

    for (const wallet of message.payload.wallets) {
      await this.addressRegistry.registerAddress({
        userId: BigInt(message.payload.userId),
        chainType: wallet.chainType,
        address: wallet.address,
      });
    }
  }
}
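
The handlers above call `BigInt(message.payload.userId)` directly, and `BigInt()` throws for input such as `"abc"` or `1.5`. A minimal sketch of a defensive parser follows. It assumes user IDs are positive integers sent as a JSON string or number, and `parseUserId` is a hypothetical helper, not an existing function in the service.

```typescript
/**
 * Parses a user id that arrives as a JSON string or number.
 * Returns null instead of throwing on malformed input.
 * Assumption: valid ids are positive integers.
 */
function parseUserId(raw: unknown): bigint | null {
  if (typeof raw === 'number') {
    return Number.isSafeInteger(raw) && raw > 0 ? BigInt(raw) : null;
  }
  if (typeof raw === 'string' && /^[1-9][0-9]*$/.test(raw)) {
    return BigInt(raw);
  }
  return null;
}
```

A handler can then log and skip the message when the result is null, rather than letting the exception escape from the consumer.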

8. API Design

8.1 Health Check

// src/api/controllers/health.controller.ts

@Controller('blockchain/health')
export class HealthController {
  @Get()
  @Public()
  async check(): Promise<{ status: string; chains: Record<string, boolean> }> {
    // Check the connection status of each chain
  }
}

8.2 Balance Query (Internal API)

// src/api/controllers/balance.controller.ts

@Controller('blockchain/balance')
export class BalanceController {
  @Get(':chainType/:address')
  @Public()  // internal service-to-service calls
  async getBalance(
    @Param('chainType') chainType: string,
    @Param('address') address: string,
  ): Promise<BalanceResult> {
    // ...
  }
}
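
Both route parameters arrive as raw strings, so the elided controller body would normally validate them before calling the query service. A minimal sketch of that validation follows. It assumes `ChainTypeEnum` is a string enum with `KAVA` and `BSC` members (the real definition lives in `@/domain/value-objects` and may differ), and both helper names are hypothetical.

```typescript
// Assumption: a string enum with these members; the real enum may differ.
enum ChainTypeEnum {
  KAVA = 'KAVA',
  BSC = 'BSC',
}

/** Maps a raw path parameter to the enum, case-insensitively; null if unsupported. */
function parseChainType(raw: string): ChainTypeEnum | null {
  const upper = raw.trim().toUpperCase();
  return (Object.values(ChainTypeEnum) as string[]).includes(upper)
    ? (upper as ChainTypeEnum)
    : null;
}

/** Shape check only: 0x followed by 40 hex characters. No checksum validation. */
function looksLikeEvmAddress(raw: string): boolean {
  return /^0x[0-9a-fA-F]{40}$/.test(raw);
}
```

In the controller, a null chain type or a malformed address would typically be answered with a 400 response before any RPC call is made.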

9. Configuration Design

9.1 Environment Variables

# .env.example

# Service
PORT=3012
NODE_ENV=development

# Database
DATABASE_URL=postgresql://rwa_user:password@localhost:5432/rwa_blockchain

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=11

# Kafka
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=blockchain-service

# Blockchain RPC
KAVA_RPC_URL=https://evm.kava.io
KAVA_WS_URL=wss://evm.kava.io/ws
BSC_RPC_URL=https://bsc-dataseed.binance.org
BSC_WS_URL=wss://bsc-ws-node.nariox.org:443

# Service-to-service calls
WALLET_SERVICE_URL=http://localhost:3001
IDENTITY_SERVICE_URL=http://localhost:3000
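
A service that depends on this many variables benefits from failing fast at startup when one is missing. A minimal sketch of such a check is shown below. The variable names come from the example file above, while `loadConfig`, the `BlockchainServiceConfig` shape, and the choice of which variables are mandatory are assumptions.

```typescript
interface BlockchainServiceConfig {
  port: number;
  databaseUrl: string;
  kafkaBrokers: string[];
  kavaRpcUrl: string;
  bscRpcUrl: string;
}

/** Reads settings from an env-like record and throws on the first missing or malformed value. */
function loadConfig(env: Record<string, string | undefined>): BlockchainServiceConfig {
  const required = (key: string): string => {
    const value = env[key];
    if (value === undefined || value.trim() === '') {
      throw new Error(`Missing required environment variable: ${key}`);
    }
    return value.trim();
  };

  // PORT falls back to 3012, the value used in the example file.
  const port = Number(env['PORT'] ?? '3012');
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid PORT: ${env['PORT']}`);
  }

  return {
    port,
    databaseUrl: required('DATABASE_URL'),
    // KAFKA_BROKERS is treated as a comma-separated list.
    kafkaBrokers: required('KAFKA_BROKERS')
      .split(',')
      .map((broker) => broker.trim())
      .filter((broker) => broker.length > 0),
    kavaRpcUrl: required('KAVA_RPC_URL'),
    bscRpcUrl: required('BSC_RPC_URL'),
  };
}
```

At startup this would be called as `loadConfig(process.env)`.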

10. Deployment Configuration

10.1 docker-compose.yml Configuration

Add the following to the main docker-compose.yml:

blockchain-service:
  build:
    context: ./blockchain-service
    dockerfile: Dockerfile
  container_name: rwa-blockchain-service
  ports:
    - "3012:3012"
  environment:
    - NODE_ENV=production
    - DATABASE_URL=postgresql://${POSTGRES_USER:-rwa_user}:${POSTGRES_PASSWORD:-rwa_secure_password}@postgres:5432/rwa_blockchain?schema=public
    - REDIS_HOST=redis
    - REDIS_PORT=6379
    - REDIS_DB=11
    - KAFKA_BROKERS=kafka:29092
    - KAFKA_CLIENT_ID=blockchain-service
    - KAVA_RPC_URL=${KAVA_RPC_URL:-https://evm.kava.io}
    - KAVA_WS_URL=${KAVA_WS_URL}
    - BSC_RPC_URL=${BSC_RPC_URL:-https://bsc-dataseed.binance.org}
    - BSC_WS_URL=${BSC_WS_URL}
    - WALLET_SERVICE_URL=http://rwa-wallet-service:3001
    - IDENTITY_SERVICE_URL=http://rwa-identity-service:3000
  depends_on:
    postgres:
      condition: service_healthy
    redis:
      condition: service_healthy
    kafka:
      condition: service_healthy
  networks:
    - rwa-network
  restart: unless-stopped
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:3012/api/v1/blockchain/health"]
    interval: 30s
    timeout: 10s
    retries: 3
    start_period: 40s

10.2 Kong Route Configuration

# Add to kong.yml
- name: blockchain-service
  url: http://192.168.1.111:3012
  routes:
    - name: blockchain-api
      paths:
        - /api/v1/blockchain
      strip_path: false

11. Initialization Script

Add rwa_blockchain to scripts/init-databases.sh:

for db in rwa_identity rwa_wallet ... rwa_blockchain; do

12. Key Design Decisions

12.1 Dual Safeguard Mechanism

Live listener (WebSocket)     Backfill scan (scheduled)
            │                             │
            ▼                             ▼
     Detect deposit ───────────────> Deduplicate
            │                             │
            └──────────────┬──────────────┘
                           ▼
                  Check confirmations
                           │
                           ▼
                 Notify wallet-service
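
Since the live listener and the backfill scan can both report the same transfer, the deduplication step needs a stable key. The sketch below illustrates the idea with an in-memory set. The `DetectedTransfer` shape and the key format are assumptions, and a real deployment would more likely rely on a unique database constraint so that the guarantee survives restarts.

```typescript
// Hypothetical shape of a detected Transfer log; field names are illustrative.
interface DetectedTransfer {
  chainType: string;
  txHash: string;
  logIndex: number;
}

/** One transaction can emit several Transfer logs, so the key includes the log index. */
function dedupeKey(t: DetectedTransfer): string {
  return `${t.chainType}:${t.txHash.toLowerCase()}:${t.logIndex}`;
}

class InMemoryDeduplicator {
  private readonly seen = new Set<string>();

  /** Returns true the first time a transfer is offered and false for repeats. */
  accept(t: DetectedTransfer): boolean {
    const key = dedupeKey(t);
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    return true;
  }
}
```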

12.2 Confirmation Policy

| Chain | Required confirmations | Approx. wait |
| --- | --- | --- |
| KAVA | 12 blocks | ~72 s |
| BSC | 15 blocks | ~45 s |
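
The wait times follow from required confirmations multiplied by block time: the figures above imply roughly 6 s per block on KAVA (12 × 6 = 72) and 3 s on BSC (15 × 3 = 45). A minimal sketch of the confirmation check itself is shown below. It assumes the convention that a transaction included in the current head block already has one confirmation, and the function names are illustrative.

```typescript
// Required confirmations per chain, taken from the policy above.
const REQUIRED_CONFIRMATIONS: Record<string, bigint> = { KAVA: 12n, BSC: 15n };

/** Assumed convention: a transaction in the current head block has 1 confirmation. */
function countConfirmations(currentBlock: bigint, txBlock: bigint): bigint {
  return currentBlock >= txBlock ? currentBlock - txBlock + 1n : 0n;
}

function isConfirmed(chain: string, currentBlock: bigint, txBlock: bigint): boolean {
  const required = REQUIRED_CONFIRMATIONS[chain];
  if (required === undefined) {
    throw new Error(`Unknown chain: ${chain}`);
  }
  return countConfirmations(currentBlock, txBlock) >= required;
}
```

If the service counts confirmations as `currentBlock - txBlock` without the `+ 1`, the thresholds are reached one block later; either convention works as long as it is applied consistently.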

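12.3 Failure Retry Policy

  • Failed notifications are retried at most 10 times
  • Exponential backoff: 1s, 2s, 4s, 8s, ...
  • An alert is raised once the maximum retry count is exceeded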
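
The backoff schedule above doubles the delay on every attempt, that is, delay = 1 s × 2^(attempt − 1). A minimal sketch of that calculation follows; the function name and the use of `null` to signal an exhausted budget are assumptions.

```typescript
const MAX_NOTIFY_ATTEMPTS = 10; // from the policy above
const BASE_DELAY_MS = 1000;

/**
 * Delay before retry number `attempt` (1-based): 1s, 2s, 4s, 8s, ...
 * Returns null once the attempt budget is spent, which is the point
 * at which the policy calls for an alert.
 */
function backoffDelayMs(attempt: number): number | null {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer');
  }
  if (attempt > MAX_NOTIFY_ATTEMPTS) return null;
  return BASE_DELAY_MS * 2 ** (attempt - 1);
}
```

With these constants the tenth retry waits 512 s, so the full schedule spans about 17 minutes; adding a cap or random jitter is a common refinement but is not part of the stated policy.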

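13. Testing Strategy

13.1 Unit Tests

  • Domain object tests (DepositTransaction, ValueObjects)
  • Confirmation-count calculation logic
  • Event generation logic

13.2 Integration Tests

  • Blockchain provider connectivity
  • Event listening and parsing
  • Database reads and writes

13.3 E2E Tests

  • Full deposit flow simulation
  • Backfill scan verification
  • Failure retry verification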

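14. Monitoring Metrics

  • blockchain_deposit_detected_total - number of deposits detected
  • blockchain_deposit_confirmed_total - number of deposits confirmed
  • blockchain_notify_failed_total - number of failed notifications
  • blockchain_block_scan_lag - scan lag (current block minus last scanned block)
  • blockchain_rpc_latency - RPC call latency
  • blockchain_ws_reconnect_total - number of WebSocket reconnects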

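15. Migration Plan

Migrating from identity-service

  1. Move the BlockchainQueryService logic into BalanceQueryService
  2. Split the DepositService logic:
    • Address lookup → stays in identity-service
    • Balance queries → move to blockchain-service
  3. Add event listening and backfill scanning
  4. identity-service publishes the WalletBound event
  5. blockchain-service consumes the event and registers the monitored address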

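16. Architecture Compatibility Notes

16.1 Comparison with Other Services

This service strictly follows the same DDD + hexagonal architecture + microservice design principles as the other microservices:

| Feature | blockchain-service | identity-service | wallet-service | admin-service |
| --- | --- | --- | --- | --- |
| DDD layering | | | | |
| Hexagonal architecture | | | | |
| Modular design (modules/) | | | | |
| Aggregate root base class (AggregateRoot) | | | | |
| Value objects | 5 (ChainType, EvmAddress, TokenAmount, BlockNumber, TxHash) | 2+ | 5+ | 7+ |
| Domain events | | | | |
| CQRS | | | | |
| Kafka integration | | | | |
| Redis cache | | | | |
| Scheduled tasks (@Cron) | | | | |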

16.2 Key Architectural Principles

Dependency Inversion Principle

┌─────────────────────────────────────────────────────────┐
│                    Domain Layer                         │
│  ┌─────────────────────────────────────────────────┐   │
│  │  Repository Interfaces (Ports)                   │   │
│  │  - IDepositTransactionRepository                 │   │
│  │  - IMonitoredAddressRepository                   │   │
│  └─────────────────────────────────────────────────┘   │
│                         ▲                               │
│                         │ depends on                    │
│                         │                               │
│  ┌─────────────────────────────────────────────────┐   │
│  │  Infrastructure Layer                            │   │
│  │  Repository Implementations                      │   │
│  │  - DepositTransactionRepositoryImpl              │   │
│  │  - MonitoredAddressRepositoryImpl                │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘

Hexagonal Architecture: Ports and Adapters

| Type | Port | Adapter |
| --- | --- | --- |
| Inbound (Driving) | Controllers | HTTP API |
| Inbound (Driving) | Kafka Consumer | Event Handler |
| Outbound (Driven) | Repository Interfaces | Prisma Repositories |
| Outbound (Driven) | Cache Interface | Redis Cache |
| Outbound (Driven) | Event Publisher | Kafka Producer |
| Outbound (Driven) | External Services | HTTP Clients |

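16.3 Naming Conventions

To stay consistent with the other services, follow these naming conventions:

| Type | Naming rule | Example |
| --- | --- | --- |
| Aggregate root | `*.aggregate.ts` | deposit-transaction.aggregate.ts |
| Value object | `*.vo.ts` | chain-type.vo.ts, block-number.vo.ts |
| Domain event | `*.event.ts` | deposit-detected.event.ts |
| Repository interface | `*.repository.interface.ts` | deposit-transaction.repository.interface.ts |
| Repository implementation | `*.repository.impl.ts` | deposit-transaction.repository.impl.ts |
| Mapper | `*.mapper.ts` | deposit-transaction.mapper.ts |
| Module | `*.module.ts` | api.module.ts, domain.module.ts |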

16.4 Symbol Token Injection Convention

Use Symbols as dependency injection tokens, consistent with the other services:

// Definition (domain/repositories/index.ts)
export const DEPOSIT_TRANSACTION_REPOSITORY = Symbol('DEPOSIT_TRANSACTION_REPOSITORY');
export const MONITORED_ADDRESS_REPOSITORY = Symbol('MONITORED_ADDRESS_REPOSITORY');
export const BLOCK_CHECKPOINT_REPOSITORY = Symbol('BLOCK_CHECKPOINT_REPOSITORY');

// Registration (modules/domain.module.ts)
{
  provide: DEPOSIT_TRANSACTION_REPOSITORY,
  useClass: DepositTransactionRepositoryImpl,
}

// Injection (application/services/*.ts)
constructor(
  @Inject(DEPOSIT_TRANSACTION_REPOSITORY)
  private readonly depositRepo: IDepositTransactionRepository,
) {}
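
One reason Symbols work well as tokens is that two Symbols are never equal, even when their descriptions match, so tokens defined in different modules cannot collide the way plain strings can. The toy registry below illustrates this. `TinyContainer` is a hypothetical stand-in for explanation only and does not reflect how the NestJS container is implemented.

```typescript
// Two symbols with the same description are still distinct tokens.
const TOKEN_A: symbol = Symbol('DEPOSIT_TRANSACTION_REPOSITORY');
const TOKEN_B: symbol = Symbol('DEPOSIT_TRANSACTION_REPOSITORY');

// Toy registry standing in for a DI container.
class TinyContainer {
  private readonly providers = new Map<symbol, unknown>();

  register<T>(token: symbol, instance: T): void {
    this.providers.set(token, instance);
  }

  resolve<T>(token: symbol): T {
    if (!this.providers.has(token)) {
      throw new Error(`No provider for ${token.toString()}`);
    }
    return this.providers.get(token) as T;
  }
}
```

This is also why the tokens are exported from a single file: every consumer has to import the same Symbol instance rather than recreate it.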

17. MPC Integration Event Flow

17.1 End-to-End Event Flow Diagram

┌──────────────────────────────────────────────────────────────────────────────┐
│                        MPC Wallet Creation Event Flow                        │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  [1] User creates an account                                                 │
│       │                                                                      │
│       ▼                                                                      │
│  ┌─────────────────┐                                                         │
│  │ identity-service│                                                         │
│  │                 │ ────────────────────────────────────────┐               │
│  │ Publishes:      │                                         │               │
│  │ mpc.KeygenRequested                                       │               │
│  └─────────────────┘                                         │               │
│                                                              ▼               │
│                                                    ┌─────────────────┐       │
│  [2] MPC key generation                            │   mpc-service   │       │
│                                                    │                 │       │
│       ┌────────────────────────────────────────────│ Consumes:       │       │
│       │                                            │ mpc.KeygenRequested     │
│       │                                            │                 │       │
│       │  [3] Public key generated                  │ Publishes:      │       │
│       │       │                                    │ mpc.KeygenStarted       │
│       │       │                                    │ mpc.KeygenCompleted     │
│       │       ▼                                    └─────────────────┘       │
│       │  publicKey: 33 bytes                               │                 │
│       │                                                    │                 │
│       ▼                                                    ▼                 │
│  ┌──────────────────────────────────────────────────────────────────┐        │
│  │                        blockchain-service                        │        │
│  │                                                                  │        │
│  │  [4] Consumes: mpc.KeygenCompleted                               │        │
│  │       │                                                          │        │
│  │       ▼                                                          │        │
│  │  ┌──────────────────────────────────────────────────────────┐    │        │
│  │  │ AddressDerivationAdapter                                 │    │        │
│  │  │                                                          │    │        │
│  │  │  publicKey ──┬──> deriveEvmAddress() ──> 0x1234...       │    │        │
│  │  │              │                           (BSC, KAVA)     │    │        │
│  │  │              │                                           │    │        │
│  │  │              └──> deriveCosmosAddress() ──> kava1...     │    │        │
│  │  │                                             dst1...      │    │        │
│  │  └──────────────────────────────────────────────────────────┘    │        │
│  │       │                                                          │        │
│  │       ▼                                                          │        │
│  │  [5] Publishes: blockchain.WalletAddressCreated                  │        │
│  │       │                                                          │        │
│  └───────┼──────────────────────────────────────────────────────────┘        │
│          │                                                                   │
│          ├──────────────────────────────┐                                    │
│          │                              │                                    │
│          ▼                              ▼                                    │
│  ┌─────────────────┐           ┌─────────────────┐                           │
│  │ identity-service│           │  wallet-service │                           │
│  │                 │           │                 │                           │
│  │ [6] Stores link:│           │ [7] Wallet data:│                           │
│  │ userId ↔ address│           │ address, balance│                           │
│  └─────────────────┘           └─────────────────┘                           │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘

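17.2 Event Topic Summary

| Topic | Publisher | Consumers | Description |
| --- | --- | --- | --- |
| mpc.KeygenRequested | identity-service | mpc-service | Requests MPC key generation |
| mpc.KeygenStarted | mpc-service | identity-service | Key generation has started |
| mpc.KeygenCompleted | mpc-service | blockchain-service | Key generation finished; carries the public key |
| mpc.SessionFailed | mpc-service | identity-service, blockchain-service | Session failed |
| blockchain.WalletAddressCreated | blockchain-service | identity-service, wallet-service | Address derivation completed |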

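17.3 Key Design Decisions

  1. Separation of responsibilities

    • mpc-service handles only the MPC protocol and knows nothing about blockchain address formats
    • blockchain-service encapsulates all blockchain technical details
    • identity-service never performs the public key → address conversion itself
  2. Multi-chain extensibility

    • Supporting a new chain type only requires adding derivation logic to AddressDerivationAdapter
    • The event format stays the same, so downstream services need no changes
  3. Event sourcing

    • All state changes are communicated through events
    • Event replay and failure recovery are supported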

Document version: 1.2.0 | Last updated: 2025-12-06 | Changes: added the public key → address derivation responsibility, MPC event integration, AddressDerivationAdapter, and the WalletAddressCreated event