78 KiB
78 KiB
Blockchain Service 开发指南
1. 服务概述
1.1 服务定位
blockchain-service 是 RWA 榴莲女皇平台的区块链基础设施服务,负责:
- 链上事件监听:监听 ERC20 Transfer 事件,检测用户充值
- 充值入账触发:检测到充值后通知 wallet-service 入账
- 余额查询:查询链上 USDT/原生代币余额
- 交易广播:提交签名后的交易到链上
- 地址管理:管理平台充值地址池
1.2 技术栈
| 组件 | 技术选型 |
|---|---|
| 框架 | NestJS 10.x |
| 语言 | TypeScript 5.x |
| 数据库 | PostgreSQL 15 + Prisma |
| 消息队列 | Kafka |
| 缓存 | Redis |
| 区块链 | ethers.js 6.x |
| 容器化 | Docker |
1.3 端口分配
- HTTP API:
3012 - 数据库:
rwa_blockchain(共享 PostgreSQL) - Redis DB:
11
2. 架构设计
2.1 六边形架构 (Hexagonal Architecture)
┌─────────────────────────────────────────────────────────────────────┐
│ blockchain-service │
├─────────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ API Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Health Ctrl │ │ Balance Ctrl │ │ Internal Ctrl│ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Application Layer │ │
│ │ ┌──────────────────┐ ┌──────────────────┐ │ │
│ │ │ DepositDetection │ │ BalanceQuery │ │ │
│ │ │ Service │ │ Service │ │ │
│ │ └──────────────────┘ └──────────────────┘ │ │
│ │ ┌──────────────────┐ ┌──────────────────┐ │ │
│ │ │ TransactionBroad │ │ AddressRegistry │ │ │
│ │ │ castService │ │ Service │ │ │
│ │ └──────────────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Domain Layer │ │
│ │ ┌────────────────────────────────────────────────────────┐ │ │
│ │ │ Aggregates │ │ │
│ │ │ ┌────────────┐ ┌────────────┐ ┌────────────────────┐ │ │ │
│ │ │ │DepositTx │ │MonitoredAddr│ │TransactionRequest │ │ │ │
│ │ │ └────────────┘ └────────────┘ └────────────────────┘ │ │ │
│ │ └────────────────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────────────────┐ │ │
│ │ │ Domain Events │ │ │
│ │ │ DepositDetected, TransactionBroadcasted, BlockScanned │ │ │
│ │ └────────────────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────────────────┐ │ │
│ │ │ Repository Interfaces (Ports) │ │ │
│ │ │ IDepositTxRepository, IMonitoredAddressRepository │ │ │
│ │ └────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Infrastructure Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Prisma Repos │ │ Kafka │ │ Redis Cache │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ EVM Provider │ │ Event │ │ Block │ │ │
│ │ │ Adapter │ │ Listener │ │ Scanner │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
2.2 目录结构
blockchain-service/
├── prisma/
│ └── schema.prisma
├── src/
│ ├── modules/ # NestJS 模块定义 (与其他服务保持一致)
│ │ ├── api.module.ts # API 层模块
│ │ ├── application.module.ts # 应用层模块
│ │ ├── domain.module.ts # 领域层模块
│ │ └── infrastructure.module.ts # 基础设施层模块
│ │
│ ├── api/ # 入站适配器 (Driving Adapters)
│ │ ├── controllers/
│ │ │ ├── health.controller.ts
│ │ │ ├── balance.controller.ts
│ │ │ └── internal.controller.ts
│ │ └── dto/
│ │ ├── request/
│ │ │ └── query-balance.dto.ts
│ │ └── response/
│ │ └── balance.dto.ts
│ │
│ ├── application/ # 应用层 (Use Cases)
│ │ ├── services/
│ │ │ ├── deposit-detection.service.ts
│ │ │ ├── balance-query.service.ts
│ │ │ ├── transaction-broadcast.service.ts
│ │ │ └── address-registry.service.ts
│ │ ├── commands/
│ │ │ ├── register-address/
│ │ │ │ ├── register-address.command.ts
│ │ │ │ └── register-address.handler.ts
│ │ │ └── broadcast-transaction/
│ │ │ ├── broadcast-transaction.command.ts
│ │ │ └── broadcast-transaction.handler.ts
│ │ └── queries/
│ │ ├── get-balance/
│ │ │ ├── get-balance.query.ts
│ │ │ └── get-balance.handler.ts
│ │ └── get-deposit-history/
│ │ ├── get-deposit-history.query.ts
│ │ └── get-deposit-history.handler.ts
│ │
│ ├── domain/ # 领域层 (核心业务)
│ │ ├── aggregates/
│ │ │ ├── aggregate-root.base.ts # 聚合根基类 (统一事件管理)
│ │ │ ├── deposit-transaction/
│ │ │ │ ├── deposit-transaction.aggregate.ts
│ │ │ │ ├── deposit-transaction.factory.ts
│ │ │ │ └── index.ts
│ │ │ ├── monitored-address/
│ │ │ │ ├── monitored-address.aggregate.ts
│ │ │ │ └── index.ts
│ │ │ └── transaction-request/
│ │ │ ├── transaction-request.aggregate.ts
│ │ │ └── index.ts
│ │ ├── entities/
│ │ │ └── block-checkpoint.entity.ts
│ │ ├── events/
│ │ │ ├── domain-event.base.ts
│ │ │ ├── deposit-detected.event.ts
│ │ │ ├── deposit-confirmed.event.ts
│ │ │ ├── transaction-broadcasted.event.ts
│ │ │ └── index.ts
│ │ ├── repositories/
│ │ │ ├── deposit-transaction.repository.interface.ts
│ │ │ ├── monitored-address.repository.interface.ts
│ │ │ ├── block-checkpoint.repository.interface.ts
│ │ │ └── index.ts
│ │ ├── services/
│ │ │ ├── confirmation-policy.service.ts
│ │ │ └── chain-config.service.ts
│ │ ├── value-objects/
│ │ │ ├── chain-type.vo.ts
│ │ │ ├── tx-hash.vo.ts
│ │ │ ├── evm-address.vo.ts
│ │ │ ├── token-amount.vo.ts
│ │ │ ├── block-number.vo.ts
│ │ │ └── index.ts
│ │ └── enums/
│ │ ├── deposit-status.enum.ts
│ │ ├── chain-type.enum.ts
│ │ └── index.ts
│ │
│ ├── infrastructure/ # 出站适配器 (Driven Adapters)
│ │ ├── blockchain/
│ │ │ ├── evm-provider.adapter.ts
│ │ │ ├── event-listener.service.ts
│ │ │ ├── block-scanner.service.ts
│ │ │ ├── transaction-sender.service.ts
│ │ │ └── blockchain.module.ts
│ │ ├── persistence/
│ │ │ ├── prisma/
│ │ │ │ └── prisma.service.ts
│ │ │ ├── repositories/
│ │ │ │ ├── deposit-transaction.repository.impl.ts
│ │ │ │ ├── monitored-address.repository.impl.ts
│ │ │ │ └── block-checkpoint.repository.impl.ts
│ │ │ └── mappers/
│ │ │ ├── deposit-transaction.mapper.ts
│ │ │ ├── monitored-address.mapper.ts
│ │ │ └── transaction-request.mapper.ts
│ │ ├── kafka/
│ │ │ ├── event-publisher.service.ts
│ │ │ ├── event-consumer.controller.ts
│ │ │ └── kafka.module.ts
│ │ ├── redis/
│ │ │ ├── redis.service.ts
│ │ │ ├── address-cache.service.ts
│ │ │ └── redis.module.ts
│ │ ├── external/
│ │ │ ├── wallet-service/
│ │ │ │ └── wallet-client.service.ts
│ │ │ └── identity-service/
│ │ │ └── identity-client.service.ts
│ │ └── infrastructure.module.ts
│ │
│ ├── config/
│ │ ├── app.config.ts
│ │ ├── database.config.ts
│ │ ├── kafka.config.ts
│ │ ├── redis.config.ts
│ │ ├── chain.config.ts
│ │ └── index.ts
│ │
│ ├── shared/
│ │ ├── decorators/
│ │ │ ├── public.decorator.ts
│ │ │ └── index.ts
│ │ ├── exceptions/
│ │ │ ├── domain.exception.ts
│ │ │ ├── blockchain.exception.ts
│ │ │ └── index.ts
│ │ ├── filters/
│ │ │ ├── global-exception.filter.ts
│ │ │ └── domain-exception.filter.ts
│ │ └── interceptors/
│ │ └── transform.interceptor.ts
│ │
│ ├── app.module.ts
│ └── main.ts
│
├── test/
│ ├── unit/
│ ├── integration/
│ └── e2e/
│
├── .env.example
├── Dockerfile
├── docker-compose.yml
├── package.json
├── tsconfig.json
├── nest-cli.json
└── DEVELOPMENT_GUIDE.md
2.3 模块化设计 (与其他服务保持一致)
为了与 identity-service、wallet-service、leaderboard-service 等服务保持架构一致性,采用分层模块化设计:
// src/modules/infrastructure.module.ts
import { Global, Module } from '@nestjs/common';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { RedisService } from '@/infrastructure/redis/redis.service';
import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import { WalletClientService } from '@/infrastructure/external/wallet-service/wallet-client.service';
import { IdentityClientService } from '@/infrastructure/external/identity-service/identity-client.service';
@Global()
@Module({
providers: [
PrismaService,
RedisService,
EvmProviderAdapter,
EventPublisherService,
WalletClientService,
IdentityClientService,
],
exports: [
PrismaService,
RedisService,
EvmProviderAdapter,
EventPublisherService,
WalletClientService,
IdentityClientService,
],
})
export class InfrastructureModule {}
// src/modules/domain.module.ts
import { Module } from '@nestjs/common';
import { InfrastructureModule } from './infrastructure.module';
import {
DEPOSIT_TRANSACTION_REPOSITORY,
MONITORED_ADDRESS_REPOSITORY,
BLOCK_CHECKPOINT_REPOSITORY,
} from '@/domain/repositories';
import { DepositTransactionRepositoryImpl } from '@/infrastructure/persistence/repositories/deposit-transaction.repository.impl';
import { MonitoredAddressRepositoryImpl } from '@/infrastructure/persistence/repositories/monitored-address.repository.impl';
import { BlockCheckpointRepositoryImpl } from '@/infrastructure/persistence/repositories/block-checkpoint.repository.impl';
import { ConfirmationPolicyService } from '@/domain/services/confirmation-policy.service';
import { ChainConfigService } from '@/domain/services/chain-config.service';
@Module({
imports: [InfrastructureModule],
providers: [
// 仓储实现 (依赖倒置)
{
provide: DEPOSIT_TRANSACTION_REPOSITORY,
useClass: DepositTransactionRepositoryImpl,
},
{
provide: MONITORED_ADDRESS_REPOSITORY,
useClass: MonitoredAddressRepositoryImpl,
},
{
provide: BLOCK_CHECKPOINT_REPOSITORY,
useClass: BlockCheckpointRepositoryImpl,
},
// 领域服务
ConfirmationPolicyService,
ChainConfigService,
],
exports: [
DEPOSIT_TRANSACTION_REPOSITORY,
MONITORED_ADDRESS_REPOSITORY,
BLOCK_CHECKPOINT_REPOSITORY,
ConfirmationPolicyService,
ChainConfigService,
],
})
export class DomainModule {}
// src/modules/application.module.ts
import { Module } from '@nestjs/common';
import { DomainModule } from './domain.module';
import { InfrastructureModule } from './infrastructure.module';
import { DepositDetectionService } from '@/application/services/deposit-detection.service';
import { BalanceQueryService } from '@/application/services/balance-query.service';
import { TransactionBroadcastService } from '@/application/services/transaction-broadcast.service';
import { AddressRegistryService } from '@/application/services/address-registry.service';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { EventListenerService } from '@/infrastructure/blockchain/event-listener.service';
import { BlockScannerService } from '@/infrastructure/blockchain/block-scanner.service';
@Module({
imports: [DomainModule, InfrastructureModule],
providers: [
// 应用服务
DepositDetectionService,
BalanceQueryService,
TransactionBroadcastService,
AddressRegistryService,
// 基础设施服务 (需要应用层依赖)
AddressCacheService,
EventListenerService,
BlockScannerService,
],
exports: [
DepositDetectionService,
BalanceQueryService,
TransactionBroadcastService,
AddressRegistryService,
AddressCacheService,
],
})
export class ApplicationModule {}
// src/modules/api.module.ts
import { Module } from '@nestjs/common';
import { ApplicationModule } from './application.module';
import { HealthController } from '@/api/controllers/health.controller';
import { BalanceController } from '@/api/controllers/balance.controller';
import { InternalController } from '@/api/controllers/internal.controller';
import { EventConsumerController } from '@/infrastructure/kafka/event-consumer.controller';
@Module({
imports: [ApplicationModule],
controllers: [
HealthController,
BalanceController,
InternalController,
EventConsumerController,
],
})
export class ApiModule {}
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ScheduleModule } from '@nestjs/schedule';
import { ApiModule } from '@/modules/api.module';
import appConfig from '@/config/app.config';
import databaseConfig from '@/config/database.config';
import kafkaConfig from '@/config/kafka.config';
import redisConfig from '@/config/redis.config';
import chainConfig from '@/config/chain.config';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
load: [appConfig, databaseConfig, kafkaConfig, redisConfig, chainConfig],
}),
ScheduleModule.forRoot(),
ApiModule,
],
})
export class AppModule {}
模块依赖关系图:
┌─────────────────────────────────────────────────────────────┐
│ AppModule │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ ApiModule │ │
│ │ Controllers: Health, Balance, Internal, EventConsumer │ │
│ └───────────────────────┬───────────────────────────────┘ │
│ │ imports │
│ ┌───────────────────────▼───────────────────────────────┐ │
│ │ ApplicationModule │ │
│ │ Services: DepositDetection, BalanceQuery, ... │ │
│ └───────────────────────┬───────────────────────────────┘ │
│ │ imports │
│ ┌───────────────────────▼───────────────────────────────┐ │
│ │ DomainModule │ │
│ │ Repositories (interfaces → impl), Domain Services │ │
│ └───────────────────────┬───────────────────────────────┘ │
│ │ imports │
│ ┌───────────────────────▼───────────────────────────────┐ │
│ │ InfrastructureModule (@Global) │ │
│ │ Prisma, Redis, Kafka, EVM Provider, External Clients │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
3. 数据模型设计
3.1 Prisma Schema
// prisma/schema.prisma
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
// ============================================
// 监控地址表
// 存储需要监听充值的地址
// ============================================
model MonitoredAddress {
id BigInt @id @default(autoincrement()) @map("address_id")
chainType String @map("chain_type") @db.VarChar(20) // KAVA, BSC
address String @db.VarChar(42) // 0x地址
userId BigInt @map("user_id") // 关联用户ID
isActive Boolean @default(true) @map("is_active") // 是否激活监听
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
deposits DepositTransaction[]
@@unique([chainType, address], name: "uk_chain_address")
@@index([userId], name: "idx_user")
@@index([chainType, isActive], name: "idx_chain_active")
@@map("monitored_addresses")
}
// ============================================
// 充值交易表 (Append-Only)
// 记录检测到的所有充值交易
// ============================================
model DepositTransaction {
id BigInt @id @default(autoincrement()) @map("deposit_id")
chainType String @map("chain_type") @db.VarChar(20)
txHash String @unique @map("tx_hash") @db.VarChar(66)
fromAddress String @map("from_address") @db.VarChar(42)
toAddress String @map("to_address") @db.VarChar(42)
tokenContract String @map("token_contract") @db.VarChar(42) // USDT合约地址
amount Decimal @db.Decimal(36, 18) // 原始金额
amountFormatted Decimal @map("amount_formatted") @db.Decimal(20, 8) // 格式化金额
blockNumber BigInt @map("block_number")
blockTimestamp DateTime @map("block_timestamp")
logIndex Int @map("log_index")
// 确认状态
confirmations Int @default(0)
status String @default("DETECTED") @db.VarChar(20) // DETECTED, CONFIRMING, CONFIRMED, NOTIFIED
// 关联
addressId BigInt @map("address_id")
userId BigInt @map("user_id")
// 通知状态
notifiedAt DateTime? @map("notified_at")
notifyAttempts Int @default(0) @map("notify_attempts")
lastNotifyError String? @map("last_notify_error") @db.Text
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
monitoredAddress MonitoredAddress @relation(fields: [addressId], references: [id])
@@index([chainType, status], name: "idx_chain_status")
@@index([userId], name: "idx_deposit_user")
@@index([blockNumber], name: "idx_block")
@@index([status, notifiedAt], name: "idx_pending_notify")
@@map("deposit_transactions")
}
// ============================================
// 区块扫描检查点 (每条链一条记录)
// 记录扫描进度,用于断点续扫
// ============================================
model BlockCheckpoint {
id BigInt @id @default(autoincrement()) @map("checkpoint_id")
chainType String @unique @map("chain_type") @db.VarChar(20)
lastScannedBlock BigInt @map("last_scanned_block")
lastScannedAt DateTime @map("last_scanned_at")
// 健康状态
isHealthy Boolean @default(true) @map("is_healthy")
lastError String? @map("last_error") @db.Text
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@map("block_checkpoints")
}
// ============================================
// 交易广播请求表
// 记录待广播和已广播的交易
// ============================================
model TransactionRequest {
id BigInt @id @default(autoincrement()) @map("request_id")
chainType String @map("chain_type") @db.VarChar(20)
// 请求来源
sourceService String @map("source_service") @db.VarChar(50)
sourceOrderId String @map("source_order_id") @db.VarChar(100)
// 交易数据
fromAddress String @map("from_address") @db.VarChar(42)
toAddress String @map("to_address") @db.VarChar(42)
value Decimal @db.Decimal(36, 18)
data String? @db.Text // 合约调用数据
// 签名数据 (由 MPC 服务提供)
signedTx String? @map("signed_tx") @db.Text
// 广播结果
txHash String? @map("tx_hash") @db.VarChar(66)
status String @default("PENDING") @db.VarChar(20) // PENDING, SIGNED, BROADCASTED, CONFIRMED, FAILED
// Gas 信息
gasLimit BigInt? @map("gas_limit")
gasPrice Decimal? @map("gas_price") @db.Decimal(36, 18)
nonce Int?
// 错误信息
errorMessage String? @map("error_message") @db.Text
retryCount Int @default(0) @map("retry_count")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@unique([sourceService, sourceOrderId], name: "uk_source_order")
@@index([chainType, status], name: "idx_tx_chain_status")
@@index([txHash], name: "idx_tx_hash")
@@map("transaction_requests")
}
// ============================================
// 区块链事件日志 (Append-Only 审计)
// ============================================
model BlockchainEvent {
id BigInt @id @default(autoincrement()) @map("event_id")
eventType String @map("event_type") @db.VarChar(50)
aggregateId String @map("aggregate_id") @db.VarChar(100)
aggregateType String @map("aggregate_type") @db.VarChar(50)
eventData Json @map("event_data")
chainType String? @map("chain_type") @db.VarChar(20)
txHash String? @map("tx_hash") @db.VarChar(66)
occurredAt DateTime @default(now()) @map("occurred_at") @db.Timestamp(6)
@@index([aggregateType, aggregateId], name: "idx_event_aggregate")
@@index([eventType], name: "idx_event_type")
@@index([chainType], name: "idx_event_chain")
@@index([occurredAt], name: "idx_event_occurred")
@@map("blockchain_events")
}
4. 领域层设计
4.1 聚合根基类 (与其他服务保持一致)
为了与 authorization-service、wallet-service 等服务保持架构一致性,抽取统一的聚合根基类:
// src/domain/aggregates/aggregate-root.base.ts
import { DomainEvent } from '@/domain/events/domain-event.base';
/**
* 聚合根基类
*
* 所有聚合根都应继承此基类,统一管理领域事件的收集和清理。
* 参考: authorization-service/src/domain/aggregates/aggregate-root.base.ts
*/
export abstract class AggregateRoot<TId = bigint> {
private readonly _domainEvents: DomainEvent[] = [];
/**
* 聚合根唯一标识
*/
abstract get id(): TId | undefined;
/**
* 获取所有待发布的领域事件
*/
get domainEvents(): ReadonlyArray<DomainEvent> {
return [...this._domainEvents];
}
/**
* 添加领域事件
* @param event 领域事件
*/
protected addDomainEvent(event: DomainEvent): void {
this._domainEvents.push(event);
}
/**
* 清空领域事件(在事件发布后调用)
*/
clearDomainEvents(): void {
this._domainEvents.length = 0;
}
/**
* 检查是否有待发布的领域事件
*/
hasDomainEvents(): boolean {
return this._domainEvents.length > 0;
}
}
4.2 聚合根:DepositTransaction
// src/domain/aggregates/deposit-transaction/deposit-transaction.aggregate.ts
import { AggregateRoot } from '../aggregate-root.base';
import { DomainEvent, DepositDetectedEvent, DepositConfirmedEvent } from '@/domain/events';
import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';
export interface DepositTransactionProps {
id?: bigint;
chainType: ChainType;
txHash: TxHash;
fromAddress: EvmAddress;
toAddress: EvmAddress;
tokenContract: EvmAddress;
amount: TokenAmount;
blockNumber: BlockNumber;
blockTimestamp: Date;
logIndex: number;
confirmations: number;
status: DepositStatus;
addressId: bigint;
userId: bigint;
notifiedAt?: Date;
notifyAttempts: number;
lastNotifyError?: string;
createdAt?: Date;
updatedAt?: Date;
}
export class DepositTransaction extends AggregateRoot<bigint> {
private constructor(private props: DepositTransactionProps) {
super();
}
// Getters
get id(): bigint | undefined { return this.props.id; }
get chainType(): ChainType { return this.props.chainType; }
get txHash(): TxHash { return this.props.txHash; }
get fromAddress(): EvmAddress { return this.props.fromAddress; }
get toAddress(): EvmAddress { return this.props.toAddress; }
get amount(): TokenAmount { return this.props.amount; }
get blockNumber(): BlockNumber { return this.props.blockNumber; }
get confirmations(): number { return this.props.confirmations; }
get status(): DepositStatus { return this.props.status; }
get userId(): bigint { return this.props.userId; }
get isConfirmed(): boolean { return this.props.status === DepositStatus.CONFIRMED; }
get isNotified(): boolean { return this.props.status === DepositStatus.NOTIFIED; }
get domainEvents(): DomainEvent[] { return [...this._domainEvents]; }
/**
* 创建新的充值交易(检测到时)
*/
static create(params: {
chainType: ChainType;
txHash: TxHash;
fromAddress: EvmAddress;
toAddress: EvmAddress;
tokenContract: EvmAddress;
amount: TokenAmount;
blockNumber: BlockNumber;
blockTimestamp: Date;
logIndex: number;
addressId: bigint;
userId: bigint;
}): DepositTransaction {
const deposit = new DepositTransaction({
...params,
confirmations: 1,
status: DepositStatus.DETECTED,
notifyAttempts: 0,
});
deposit.addDomainEvent(new DepositDetectedEvent({
chainType: params.chainType.value,
txHash: params.txHash.value,
fromAddress: params.fromAddress.value,
toAddress: params.toAddress.value,
amount: params.amount.formatted,
userId: params.userId.toString(),
blockNumber: params.blockNumber.value,
}));
return deposit;
}
/**
* 从数据库重建
*/
static reconstruct(props: DepositTransactionProps): DepositTransaction {
return new DepositTransaction(props);
}
/**
* 更新确认数
*/
updateConfirmations(newConfirmations: number, requiredConfirmations: number): void {
this.props.confirmations = newConfirmations;
if (newConfirmations >= requiredConfirmations && this.props.status === DepositStatus.DETECTED) {
this.props.status = DepositStatus.CONFIRMING;
}
if (newConfirmations >= requiredConfirmations && this.props.status === DepositStatus.CONFIRMING) {
this.confirm();
}
}
/**
* 确认充值
*/
private confirm(): void {
if (this.props.status === DepositStatus.CONFIRMED) {
return;
}
this.props.status = DepositStatus.CONFIRMED;
this.addDomainEvent(new DepositConfirmedEvent({
depositId: this.props.id!.toString(),
chainType: this.props.chainType.value,
txHash: this.props.txHash.value,
amount: this.props.amount.formatted,
userId: this.props.userId.toString(),
confirmations: this.props.confirmations,
}));
}
/**
* 标记已通知
*/
markNotified(): void {
this.props.status = DepositStatus.NOTIFIED;
this.props.notifiedAt = new Date();
}
/**
* 记录通知失败
*/
recordNotifyFailure(error: string): void {
this.props.notifyAttempts++;
this.props.lastNotifyError = error;
}
// 注意:addDomainEvent 和 clearDomainEvents 方法已在 AggregateRoot 基类中定义
toProps(): DepositTransactionProps {
return { ...this.props };
}
}
4.3 值对象
4.3.1 EVM 地址值对象
// src/domain/value-objects/evm-address.vo.ts
import { ethers } from 'ethers';
export class EvmAddress {
private constructor(private readonly _value: string) {}
get value(): string {
return this._value;
}
get checksummed(): string {
return ethers.getAddress(this._value);
}
static create(address: string): EvmAddress {
if (!ethers.isAddress(address)) {
throw new Error(`Invalid EVM address: ${address}`);
}
return new EvmAddress(ethers.getAddress(address)); // 标准化为 checksum 格式
}
equals(other: EvmAddress): boolean {
return this._value.toLowerCase() === other._value.toLowerCase();
}
toString(): string {
return this._value;
}
}
4.3.2 代币金额值对象
// src/domain/value-objects/token-amount.vo.ts
import Decimal from 'decimal.js';
export class TokenAmount {
private constructor(
private readonly _raw: Decimal, // 链上原始值 (wei)
private readonly _decimals: number, // 代币精度
) {}
get raw(): Decimal {
return this._raw;
}
get decimals(): number {
return this._decimals;
}
get formatted(): string {
return this._raw.div(new Decimal(10).pow(this._decimals)).toFixed(8);
}
get value(): number {
return parseFloat(this.formatted);
}
static create(raw: string | bigint, decimals: number): TokenAmount {
return new TokenAmount(new Decimal(raw.toString()), decimals);
}
static fromFormatted(amount: string, decimals: number): TokenAmount {
const raw = new Decimal(amount).mul(new Decimal(10).pow(decimals));
return new TokenAmount(raw, decimals);
}
isZero(): boolean {
return this._raw.isZero();
}
greaterThan(other: TokenAmount): boolean {
return this._raw.greaterThan(other._raw);
}
}
4.3.3 链类型值对象
// src/domain/value-objects/chain-type.vo.ts
export enum ChainTypeEnum {
KAVA = 'KAVA',
BSC = 'BSC',
}
export class ChainType {
private constructor(private readonly _value: ChainTypeEnum) {}
get value(): ChainTypeEnum {
return this._value;
}
get name(): string {
switch (this._value) {
case ChainTypeEnum.KAVA: return 'KAVA EVM';
case ChainTypeEnum.BSC: return 'BSC';
}
}
get requiredConfirmations(): number {
switch (this._value) {
case ChainTypeEnum.KAVA: return 12;
case ChainTypeEnum.BSC: return 15;
}
}
get blockTime(): number {
switch (this._value) {
case ChainTypeEnum.KAVA: return 6; // 约 6 秒
case ChainTypeEnum.BSC: return 3; // 约 3 秒
}
}
static create(value: string): ChainType {
if (!Object.values(ChainTypeEnum).includes(value as ChainTypeEnum)) {
throw new Error(`Invalid chain type: ${value}`);
}
return new ChainType(value as ChainTypeEnum);
}
static KAVA(): ChainType {
return new ChainType(ChainTypeEnum.KAVA);
}
static BSC(): ChainType {
return new ChainType(ChainTypeEnum.BSC);
}
equals(other: ChainType): boolean {
return this._value === other._value;
}
}
4.3.4 区块号值对象
// src/domain/value-objects/block-number.vo.ts
/**
* 区块号值对象
*
* 封装区块号,确保类型安全和业务规则验证
*/
export class BlockNumber {
private constructor(private readonly _value: bigint) {}
get value(): bigint {
return this._value;
}
/**
* 创建区块号值对象
* @param value 区块号(支持 number 或 bigint)
* @throws Error 如果区块号为负数
*/
static create(value: number | bigint): BlockNumber {
const bn = BigInt(value);
if (bn < 0n) {
throw new Error(`Invalid block number: ${value}. Block number cannot be negative.`);
}
return new BlockNumber(bn);
}
/**
* 计算两个区块之间的差值
*/
diff(other: BlockNumber): bigint {
return this._value - other._value;
}
/**
* 检查是否大于另一个区块号
*/
greaterThan(other: BlockNumber): boolean {
return this._value > other._value;
}
/**
* 检查是否小于另一个区块号
*/
lessThan(other: BlockNumber): boolean {
return this._value < other._value;
}
/**
* 加上指定数量的区块
*/
add(blocks: number | bigint): BlockNumber {
return new BlockNumber(this._value + BigInt(blocks));
}
/**
* 减去指定数量的区块
*/
subtract(blocks: number | bigint): BlockNumber {
const result = this._value - BigInt(blocks);
if (result < 0n) {
throw new Error('Block number cannot be negative after subtraction');
}
return new BlockNumber(result);
}
equals(other: BlockNumber): boolean {
return this._value === other._value;
}
toString(): string {
return this._value.toString();
}
toNumber(): number {
return Number(this._value);
}
}
4.3.5 交易哈希值对象
// src/domain/value-objects/tx-hash.vo.ts
/**
* 交易哈希值对象
*
* 封装 EVM 交易哈希,确保格式正确(0x + 64位十六进制字符)
*/
export class TxHash {
private static readonly PATTERN = /^0x[a-fA-F0-9]{64}$/;
private constructor(private readonly _value: string) {}
get value(): string {
return this._value;
}
/**
* 获取标准化(小写)的哈希值
*/
get normalized(): string {
return this._value.toLowerCase();
}
/**
* 创建交易哈希值对象
* @param hash 交易哈希字符串
* @throws Error 如果哈希格式无效
*/
static create(hash: string): TxHash {
if (!this.isValid(hash)) {
throw new Error(`Invalid transaction hash: ${hash}. Expected format: 0x + 64 hex characters.`);
}
// 统一存储为小写格式
return new TxHash(hash.toLowerCase());
}
/**
* 验证交易哈希格式是否有效
*/
static isValid(hash: string): boolean {
return this.PATTERN.test(hash);
}
/**
* 获取缩短显示格式 (0x1234...abcd)
*/
toShortString(prefixLength = 6, suffixLength = 4): string {
return `${this._value.slice(0, prefixLength + 2)}...${this._value.slice(-suffixLength)}`;
}
equals(other: TxHash): boolean {
return this._value === other._value;
}
toString(): string {
return this._value;
}
}
4.3.6 值对象导出索引
// src/domain/value-objects/index.ts
export { ChainType, ChainTypeEnum } from './chain-type.vo';
export { EvmAddress } from './evm-address.vo';
export { TokenAmount } from './token-amount.vo';
export { BlockNumber } from './block-number.vo';
export { TxHash } from './tx-hash.vo';
4.4 领域事件
// src/domain/events/deposit-detected.event.ts
import { DomainEvent } from './domain-event.base';
export interface DepositDetectedPayload {
chainType: string;
txHash: string;
fromAddress: string;
toAddress: string;
amount: string;
userId: string;
blockNumber: number;
}
export class DepositDetectedEvent extends DomainEvent {
constructor(public readonly payload: DepositDetectedPayload) {
super();
}
get eventType(): string {
return 'DepositDetected';
}
get aggregateId(): string {
return this.payload.txHash;
}
get aggregateType(): string {
return 'DepositTransaction';
}
}
// src/domain/events/deposit-confirmed.event.ts
import { DomainEvent } from './domain-event.base';
export interface DepositConfirmedPayload {
depositId: string;
chainType: string;
txHash: string;
amount: string;
userId: string;
confirmations: number;
}
export class DepositConfirmedEvent extends DomainEvent {
constructor(public readonly payload: DepositConfirmedPayload) {
super();
}
get eventType(): string {
return 'DepositConfirmed';
}
get aggregateId(): string {
return this.payload.depositId;
}
get aggregateType(): string {
return 'DepositTransaction';
}
}
4.5 仓储接口 (Ports)
// src/domain/repositories/deposit-transaction.repository.interface.ts
import { DepositTransaction } from '@/domain/aggregates/deposit-transaction';
import { ChainType, TxHash } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';
export interface IDepositTransactionRepository {
save(deposit: DepositTransaction): Promise<DepositTransaction>;
findById(id: bigint): Promise<DepositTransaction | null>;
findByTxHash(txHash: TxHash): Promise<DepositTransaction | null>;
existsByTxHash(txHash: TxHash): Promise<boolean>;
findByStatus(status: DepositStatus): Promise<DepositTransaction[]>;
findPendingNotification(limit?: number): Promise<DepositTransaction[]>;
findByUserIdAndChain(userId: bigint, chainType: ChainType): Promise<DepositTransaction[]>;
findUnconfirmed(chainType: ChainType, minBlockNumber: bigint): Promise<DepositTransaction[]>;
}
export const DEPOSIT_TRANSACTION_REPOSITORY = Symbol('DEPOSIT_TRANSACTION_REPOSITORY');
// src/domain/repositories/monitored-address.repository.interface.ts
import { ChainType, EvmAddress } from '@/domain/value-objects';
export interface MonitoredAddressData {
id: bigint;
chainType: ChainType;
address: EvmAddress;
userId: bigint;
isActive: boolean;
createdAt: Date;
}
export interface IMonitoredAddressRepository {
save(data: Omit<MonitoredAddressData, 'id' | 'createdAt'>): Promise<MonitoredAddressData>;
findByAddress(chainType: ChainType, address: EvmAddress): Promise<MonitoredAddressData | null>;
findByUserId(userId: bigint): Promise<MonitoredAddressData[]>;
findActiveByChain(chainType: ChainType): Promise<MonitoredAddressData[]>;
getAllActiveAddresses(): Promise<Set<string>>; // 返回所有激活地址的 Set
deactivate(id: bigint): Promise<void>;
}
export const MONITORED_ADDRESS_REPOSITORY = Symbol('MONITORED_ADDRESS_REPOSITORY');
5. 基础设施层设计
5.1 EVM Provider 适配器
// src/infrastructure/blockchain/evm-provider.adapter.ts
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ethers, JsonRpcProvider, WebSocketProvider, Contract } from 'ethers';
import { ChainTypeEnum } from '@/domain/value-objects';
interface ChainConfig {
rpcUrl: string;
wsUrl?: string;
usdtContract: string;
decimals: number;
chainId: number;
}
const ERC20_ABI = [
'event Transfer(address indexed from, address indexed to, uint256 value)',
'function balanceOf(address owner) view returns (uint256)',
'function decimals() view returns (uint8)',
];
@Injectable()
export class EvmProviderAdapter implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(EvmProviderAdapter.name);
private rpcProviders: Map<ChainTypeEnum, JsonRpcProvider> = new Map();
private wsProviders: Map<ChainTypeEnum, WebSocketProvider> = new Map();
private contracts: Map<ChainTypeEnum, Contract> = new Map();
private readonly chainConfigs: Record<ChainTypeEnum, ChainConfig> = {
[ChainTypeEnum.KAVA]: {
rpcUrl: this.configService.get('KAVA_RPC_URL', 'https://evm.kava.io'),
wsUrl: this.configService.get('KAVA_WS_URL'),
usdtContract: '0x919C1c267BC06a7039e03fcc2eF738525769109c',
decimals: 6,
chainId: 2222,
},
[ChainTypeEnum.BSC]: {
rpcUrl: this.configService.get('BSC_RPC_URL', 'https://bsc-dataseed.binance.org'),
wsUrl: this.configService.get('BSC_WS_URL'),
usdtContract: '0x55d398326f99059fF775485246999027B3197955',
decimals: 18,
chainId: 56,
},
};
constructor(private readonly configService: ConfigService) {}
async onModuleInit(): Promise<void> {
await this.initializeProviders();
}
async onModuleDestroy(): Promise<void> {
await this.closeProviders();
}
private async initializeProviders(): Promise<void> {
for (const [chainType, config] of Object.entries(this.chainConfigs)) {
try {
// RPC Provider (用于查询)
const rpcProvider = new JsonRpcProvider(config.rpcUrl, config.chainId, {
staticNetwork: true,
});
this.rpcProviders.set(chainType as ChainTypeEnum, rpcProvider);
// WebSocket Provider (用于事件监听)
if (config.wsUrl) {
const wsProvider = new WebSocketProvider(config.wsUrl, config.chainId);
this.wsProviders.set(chainType as ChainTypeEnum, wsProvider);
}
// USDT Contract
const provider = config.wsUrl
? this.wsProviders.get(chainType as ChainTypeEnum)!
: rpcProvider;
const contract = new Contract(config.usdtContract, ERC20_ABI, provider);
this.contracts.set(chainType as ChainTypeEnum, contract);
this.logger.log(`Initialized ${chainType} provider`);
} catch (error) {
this.logger.error(`Failed to initialize ${chainType} provider: ${error.message}`);
}
}
}
private async closeProviders(): Promise<void> {
for (const wsProvider of this.wsProviders.values()) {
await wsProvider.destroy();
}
}
getRpcProvider(chainType: ChainTypeEnum): JsonRpcProvider {
const provider = this.rpcProviders.get(chainType);
if (!provider) {
throw new Error(`No RPC provider for chain: ${chainType}`);
}
return provider;
}
getWsProvider(chainType: ChainTypeEnum): WebSocketProvider | undefined {
return this.wsProviders.get(chainType);
}
getUsdtContract(chainType: ChainTypeEnum): Contract {
const contract = this.contracts.get(chainType);
if (!contract) {
throw new Error(`No USDT contract for chain: ${chainType}`);
}
return contract;
}
getChainConfig(chainType: ChainTypeEnum): ChainConfig {
return this.chainConfigs[chainType];
}
async getCurrentBlockNumber(chainType: ChainTypeEnum): Promise<bigint> {
const provider = this.getRpcProvider(chainType);
const blockNumber = await provider.getBlockNumber();
return BigInt(blockNumber);
}
async getBalance(chainType: ChainTypeEnum, address: string): Promise<string> {
const contract = this.getUsdtContract(chainType);
const config = this.getChainConfig(chainType);
const balance = await contract.balanceOf(address);
return ethers.formatUnits(balance, config.decimals);
}
}
5.2 事件监听器服务
// src/infrastructure/blockchain/event-listener.service.ts
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { EvmProviderAdapter } from './evm-provider.adapter';
import { DepositDetectionService } from '@/application/services/deposit-detection.service';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { ChainTypeEnum } from '@/domain/value-objects';
import { Contract, EventLog } from 'ethers';
@Injectable()
export class EventListenerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(EventListenerService.name);
private isListening = false;
private listeners: Map<ChainTypeEnum, any> = new Map();
constructor(
private readonly evmProvider: EvmProviderAdapter,
private readonly depositService: DepositDetectionService,
private readonly addressCache: AddressCacheService,
) {}
async onModuleInit(): Promise<void> {
await this.startListening();
}
async onModuleDestroy(): Promise<void> {
await this.stopListening();
}
async startListening(): Promise<void> {
if (this.isListening) return;
for (const chainType of Object.values(ChainTypeEnum)) {
await this.subscribeToChain(chainType);
}
this.isListening = true;
this.logger.log('Event listeners started');
}
private async subscribeToChain(chainType: ChainTypeEnum): Promise<void> {
const wsProvider = this.evmProvider.getWsProvider(chainType);
if (!wsProvider) {
this.logger.warn(`No WebSocket provider for ${chainType}, using polling`);
return;
}
const contract = this.evmProvider.getUsdtContract(chainType);
// 监听所有 Transfer 事件
const filter = contract.filters.Transfer();
const listener = async (from: string, to: string, value: bigint, event: EventLog) => {
try {
// 检查目标地址是否是平台监控地址
const isMonitored = await this.addressCache.isMonitoredAddress(chainType, to);
if (!isMonitored) return;
this.logger.log(`Detected deposit: ${chainType} ${event.transactionHash}`);
await this.depositService.handleTransferEvent({
chainType,
txHash: event.transactionHash,
fromAddress: from,
toAddress: to,
amount: value.toString(),
blockNumber: event.blockNumber,
logIndex: event.index,
});
} catch (error) {
this.logger.error(`Error processing Transfer event: ${error.message}`);
}
};
contract.on(filter, listener);
this.listeners.set(chainType, { contract, filter, listener });
this.logger.log(`Subscribed to ${chainType} Transfer events`);
}
async stopListening(): Promise<void> {
for (const [chainType, { contract, filter, listener }] of this.listeners) {
contract.off(filter, listener);
this.logger.log(`Unsubscribed from ${chainType}`);
}
this.listeners.clear();
this.isListening = false;
}
}
5.3 区块扫描器(补扫服务)
// src/infrastructure/blockchain/block-scanner.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { EvmProviderAdapter } from './evm-provider.adapter';
import { DepositDetectionService } from '@/application/services/deposit-detection.service';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { BlockCheckpointRepository } from '@/infrastructure/persistence/repositories/block-checkpoint.repository.impl';
import { ChainTypeEnum } from '@/domain/value-objects';
import { ethers } from 'ethers';
const BATCH_SIZE = 1000; // 每次扫描的区块数
@Injectable()
export class BlockScannerService {
private readonly logger = new Logger(BlockScannerService.name);
private isScanning: Map<ChainTypeEnum, boolean> = new Map();
constructor(
private readonly evmProvider: EvmProviderAdapter,
private readonly depositService: DepositDetectionService,
private readonly addressCache: AddressCacheService,
private readonly checkpointRepo: BlockCheckpointRepository,
) {}
/**
* 定时补扫任务 (每 5 分钟)
*/
@Cron(CronExpression.EVERY_5_MINUTES)
async scheduledScan(): Promise<void> {
for (const chainType of Object.values(ChainTypeEnum)) {
await this.scanChain(chainType);
}
}
/**
* 扫描指定链的区块
*/
async scanChain(chainType: ChainTypeEnum): Promise<void> {
if (this.isScanning.get(chainType)) {
this.logger.debug(`${chainType} scan already in progress`);
return;
}
this.isScanning.set(chainType, true);
try {
const checkpoint = await this.checkpointRepo.findByChain(chainType);
const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType);
let fromBlock = checkpoint
? checkpoint.lastScannedBlock + 1n
: currentBlock - 1000n; // 首次启动扫描最近 1000 块
while (fromBlock <= currentBlock) {
const toBlock = fromBlock + BigInt(BATCH_SIZE) - 1n > currentBlock
? currentBlock
: fromBlock + BigInt(BATCH_SIZE) - 1n;
await this.scanBlockRange(chainType, fromBlock, toBlock);
// 更新检查点
await this.checkpointRepo.updateCheckpoint(chainType, toBlock);
fromBlock = toBlock + 1n;
}
this.logger.debug(`${chainType} scan completed up to block ${currentBlock}`);
} catch (error) {
this.logger.error(`${chainType} scan failed: ${error.message}`);
await this.checkpointRepo.markUnhealthy(chainType, error.message);
} finally {
this.isScanning.set(chainType, false);
}
}
/**
* 扫描指定区块范围
*/
private async scanBlockRange(
chainType: ChainTypeEnum,
fromBlock: bigint,
toBlock: bigint,
): Promise<void> {
const contract = this.evmProvider.getUsdtContract(chainType);
const monitoredAddresses = await this.addressCache.getMonitoredAddresses(chainType);
if (monitoredAddresses.size === 0) {
return;
}
// 查询 Transfer 事件
const filter = contract.filters.Transfer(null, [...monitoredAddresses]);
const events = await contract.queryFilter(filter, Number(fromBlock), Number(toBlock));
for (const event of events) {
if (!('args' in event)) continue;
const [from, to, value] = event.args;
try {
await this.depositService.handleTransferEvent({
chainType,
txHash: event.transactionHash,
fromAddress: from,
toAddress: to,
amount: value.toString(),
blockNumber: event.blockNumber,
logIndex: event.index,
});
} catch (error) {
// 重复交易等预期错误,忽略
if (!error.message.includes('already exists')) {
this.logger.error(`Error processing event: ${error.message}`);
}
}
}
}
/**
* 手动触发全量扫描
*/
async fullScan(chainType: ChainTypeEnum, fromBlock: bigint, toBlock: bigint): Promise<void> {
this.logger.log(`Starting full scan for ${chainType} from ${fromBlock} to ${toBlock}`);
await this.scanBlockRange(chainType, fromBlock, toBlock);
this.logger.log(`Full scan completed for ${chainType}`);
}
}
5.4 地址缓存服务
// src/infrastructure/redis/address-cache.service.ts
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { RedisService } from './redis.service';
import { IMonitoredAddressRepository, MONITORED_ADDRESS_REPOSITORY } from '@/domain/repositories';
import { Inject } from '@nestjs/common';
import { ChainTypeEnum } from '@/domain/value-objects';
const CACHE_KEY_PREFIX = 'blockchain:addresses:';
const CACHE_TTL = 300; // 5 分钟
@Injectable()
export class AddressCacheService implements OnModuleInit {
private readonly logger = new Logger(AddressCacheService.name);
private localCache: Map<ChainTypeEnum, Set<string>> = new Map();
constructor(
private readonly redis: RedisService,
@Inject(MONITORED_ADDRESS_REPOSITORY)
private readonly addressRepo: IMonitoredAddressRepository,
) {}
async onModuleInit(): Promise<void> {
await this.refreshCache();
}
/**
* 检查地址是否被监控
*/
async isMonitoredAddress(chainType: ChainTypeEnum, address: string): Promise<boolean> {
const normalizedAddress = address.toLowerCase();
// 先查本地缓存
const localSet = this.localCache.get(chainType);
if (localSet?.has(normalizedAddress)) {
return true;
}
// 再查 Redis
const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
const exists = await this.redis.sismember(cacheKey, normalizedAddress);
if (exists) {
// 更新本地缓存
if (!localSet) {
this.localCache.set(chainType, new Set([normalizedAddress]));
} else {
localSet.add(normalizedAddress);
}
}
return exists;
}
/**
* 获取链的所有监控地址
*/
async getMonitoredAddresses(chainType: ChainTypeEnum): Promise<Set<string>> {
const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
const addresses = await this.redis.smembers(cacheKey);
return new Set(addresses);
}
/**
* 添加监控地址
*/
async addAddress(chainType: ChainTypeEnum, address: string): Promise<void> {
const normalizedAddress = address.toLowerCase();
const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
await this.redis.sadd(cacheKey, normalizedAddress);
// 更新本地缓存
let localSet = this.localCache.get(chainType);
if (!localSet) {
localSet = new Set();
this.localCache.set(chainType, localSet);
}
localSet.add(normalizedAddress);
}
/**
* 移除监控地址
*/
async removeAddress(chainType: ChainTypeEnum, address: string): Promise<void> {
const normalizedAddress = address.toLowerCase();
const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
await this.redis.srem(cacheKey, normalizedAddress);
// 更新本地缓存
this.localCache.get(chainType)?.delete(normalizedAddress);
}
/**
* 从数据库刷新缓存
*/
async refreshCache(): Promise<void> {
this.logger.log('Refreshing address cache from database...');
for (const chainType of Object.values(ChainTypeEnum)) {
const addresses = await this.addressRepo.findActiveByChain(
{ value: chainType } as any
);
const cacheKey = `${CACHE_KEY_PREFIX}${chainType}`;
const addressSet = new Set(addresses.map(a => a.address.value.toLowerCase()));
// 更新 Redis
if (addressSet.size > 0) {
await this.redis.del(cacheKey);
await this.redis.sadd(cacheKey, ...addressSet);
}
// 更新本地缓存
this.localCache.set(chainType, addressSet);
this.logger.log(`Loaded ${addressSet.size} addresses for ${chainType}`);
}
}
}
6. 应用层设计
6.1 充值检测服务
// src/application/services/deposit-detection.service.ts
import { Injectable, Inject, Logger } from '@nestjs/common';
import {
IDepositTransactionRepository,
DEPOSIT_TRANSACTION_REPOSITORY,
IMonitoredAddressRepository,
MONITORED_ADDRESS_REPOSITORY,
} from '@/domain/repositories';
import { DepositTransaction } from '@/domain/aggregates/deposit-transaction';
import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber, ChainTypeEnum } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import { WalletClientService } from '@/infrastructure/external/wallet-service/wallet-client.service';
import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter';
export interface TransferEventData {
chainType: ChainTypeEnum;
txHash: string;
fromAddress: string;
toAddress: string;
amount: string;
blockNumber: number;
logIndex: number;
}
@Injectable()
export class DepositDetectionService {
private readonly logger = new Logger(DepositDetectionService.name);
constructor(
@Inject(DEPOSIT_TRANSACTION_REPOSITORY)
private readonly depositRepo: IDepositTransactionRepository,
@Inject(MONITORED_ADDRESS_REPOSITORY)
private readonly addressRepo: IMonitoredAddressRepository,
private readonly eventPublisher: EventPublisherService,
private readonly walletClient: WalletClientService,
private readonly evmProvider: EvmProviderAdapter,
) {}
/**
* 处理检测到的 Transfer 事件
*/
async handleTransferEvent(data: TransferEventData): Promise<void> {
const txHash = TxHash.create(data.txHash);
// 检查是否已处理
const exists = await this.depositRepo.existsByTxHash(txHash);
if (exists) {
this.logger.debug(`Deposit ${data.txHash} already exists, skipping`);
return;
}
// 查找监控地址信息
const chainType = ChainType.create(data.chainType);
const toAddress = EvmAddress.create(data.toAddress);
const addressInfo = await this.addressRepo.findByAddress(chainType, toAddress);
if (!addressInfo) {
this.logger.warn(`Address ${data.toAddress} not found in monitored addresses`);
return;
}
// 获取区块时间
const block = await this.evmProvider.getRpcProvider(data.chainType).getBlock(data.blockNumber);
const blockTimestamp = new Date(block!.timestamp * 1000);
// 获取代币精度
const chainConfig = this.evmProvider.getChainConfig(data.chainType);
// 创建充值记录
const deposit = DepositTransaction.create({
chainType,
txHash,
fromAddress: EvmAddress.create(data.fromAddress),
toAddress,
tokenContract: EvmAddress.create(chainConfig.usdtContract),
amount: TokenAmount.create(data.amount, chainConfig.decimals),
blockNumber: BlockNumber.create(data.blockNumber),
blockTimestamp,
logIndex: data.logIndex,
addressId: addressInfo.id,
userId: addressInfo.userId,
});
// 保存并发布事件
await this.depositRepo.save(deposit);
await this.eventPublisher.publishAll(deposit.domainEvents);
deposit.clearDomainEvents();
this.logger.log(`Deposit detected: ${data.txHash}, amount: ${deposit.amount.formatted}`);
// 检查确认数
await this.checkConfirmationsAndNotify(deposit);
}
/**
* 检查确认数并通知 wallet-service
*/
async checkConfirmationsAndNotify(deposit: DepositTransaction): Promise<void> {
const chainType = deposit.chainType;
const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType.value as ChainTypeEnum);
const confirmations = Number(currentBlock - deposit.blockNumber.value) + 1;
const requiredConfirmations = chainType.requiredConfirmations;
deposit.updateConfirmations(confirmations, requiredConfirmations);
if (deposit.isConfirmed && !deposit.isNotified) {
try {
// 调用 wallet-service 入账
await this.walletClient.handleDeposit({
userId: deposit.userId.toString(),
amount: deposit.amount.value,
chainType: deposit.chainType.value,
txHash: deposit.txHash.value,
});
deposit.markNotified();
this.logger.log(`Deposit notified: ${deposit.txHash.value}`);
} catch (error) {
deposit.recordNotifyFailure(error.message);
this.logger.error(`Failed to notify deposit: ${error.message}`);
}
}
await this.depositRepo.save(deposit);
await this.eventPublisher.publishAll(deposit.domainEvents);
deposit.clearDomainEvents();
}
/**
* 定时任务:更新未确认交易的确认数
*/
async updatePendingConfirmations(): Promise<void> {
for (const chainType of Object.values(ChainTypeEnum)) {
const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType);
const minBlock = currentBlock - 100n; // 只处理最近 100 块内的
const pendingDeposits = await this.depositRepo.findUnconfirmed(
ChainType.create(chainType),
minBlock,
);
for (const deposit of pendingDeposits) {
await this.checkConfirmationsAndNotify(deposit);
}
}
}
/**
* 定时任务:重试失败的通知
*/
async retryFailedNotifications(): Promise<void> {
const pendingNotifications = await this.depositRepo.findPendingNotification(50);
for (const deposit of pendingNotifications) {
if (deposit.toProps().notifyAttempts >= 10) {
this.logger.error(`Deposit ${deposit.txHash.value} exceeded max retry attempts`);
continue;
}
await this.checkConfirmationsAndNotify(deposit);
}
}
}
6.2 余额查询服务
// src/application/services/balance-query.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter';
import { ChainTypeEnum } from '@/domain/value-objects';
export interface BalanceResult {
chainType: string;
address: string;
usdtBalance: string;
nativeBalance: string;
}
@Injectable()
export class BalanceQueryService {
private readonly logger = new Logger(BalanceQueryService.name);
constructor(private readonly evmProvider: EvmProviderAdapter) {}
/**
* 查询单个地址的余额
*/
async getBalance(chainType: ChainTypeEnum, address: string): Promise<BalanceResult> {
const [usdtBalance, nativeBalance] = await Promise.all([
this.evmProvider.getBalance(chainType, address),
this.getNativeBalance(chainType, address),
]);
return {
chainType,
address,
usdtBalance,
nativeBalance,
};
}
/**
* 批量查询余额
*/
async getBatchBalances(
addresses: Array<{ chainType: ChainTypeEnum; address: string }>,
): Promise<BalanceResult[]> {
return Promise.all(
addresses.map(({ chainType, address }) => this.getBalance(chainType, address)),
);
}
private async getNativeBalance(chainType: ChainTypeEnum, address: string): Promise<string> {
const provider = this.evmProvider.getRpcProvider(chainType);
const balance = await provider.getBalance(address);
const { ethers } = await import('ethers');
return ethers.formatEther(balance);
}
}
7. Kafka 事件设计
7.1 发布的事件
// src/infrastructure/kafka/event-publisher.service.ts
export const BLOCKCHAIN_TOPICS = {
DEPOSIT_DETECTED: 'blockchain.DepositDetected',
DEPOSIT_CONFIRMED: 'blockchain.DepositConfirmed',
TRANSACTION_BROADCASTED: 'blockchain.TransactionBroadcasted',
TRANSACTION_CONFIRMED: 'blockchain.TransactionConfirmed',
BLOCK_SCANNED: 'blockchain.BlockScanned',
} as const;
7.2 消费的事件
// src/infrastructure/kafka/event-consumer.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices';
import { AddressRegistryService } from '@/application/services/address-registry.service';
@Controller()
export class EventConsumerController {
private readonly logger = new Logger(EventConsumerController.name);
constructor(private readonly addressRegistry: AddressRegistryService) {}
/**
* 监听用户创建事件,注册监控地址
*/
@MessagePattern('identity.UserAccountCreated')
async handleUserCreated(
@Payload() message: any,
@Ctx() context: KafkaContext,
): Promise<void> {
this.logger.log(`Received UserAccountCreated: ${message.payload.userId}`);
// 注册用户的钱包地址到监控列表
// 注意:地址信息可能在后续的 WalletBound 事件中
}
/**
* 监听钱包绑定事件,注册监控地址
*/
@MessagePattern('identity.WalletBound')
async handleWalletBound(
@Payload() message: any,
@Ctx() context: KafkaContext,
): Promise<void> {
this.logger.log(`Received WalletBound: userId=${message.payload.userId}`);
await this.addressRegistry.registerAddress({
userId: BigInt(message.payload.userId),
chainType: message.payload.chainType,
address: message.payload.address,
});
}
/**
* 监听多钱包绑定事件
*/
@MessagePattern('identity.MultipleWalletsBound')
async handleMultipleWalletsBound(
@Payload() message: any,
@Ctx() context: KafkaContext,
): Promise<void> {
this.logger.log(`Received MultipleWalletsBound: userId=${message.payload.userId}`);
for (const wallet of message.payload.wallets) {
await this.addressRegistry.registerAddress({
userId: BigInt(message.payload.userId),
chainType: wallet.chainType,
address: wallet.address,
});
}
}
}
8. API 设计
8.1 健康检查
// src/api/controllers/health.controller.ts
@Controller('blockchain/health')
export class HealthController {
@Get()
@Public()
async check(): Promise<{ status: string; chains: Record<string, boolean> }> {
// 检查各链连接状态
}
}
8.2 余额查询 (内部 API)
// src/api/controllers/balance.controller.ts
@Controller('blockchain/balance')
export class BalanceController {
@Get(':chainType/:address')
@Public() // 内部服务调用
async getBalance(
@Param('chainType') chainType: string,
@Param('address') address: string,
): Promise<BalanceResult> {
// ...
}
}
9. 配置设计
9.1 环境变量
# .env.example
# 服务配置
PORT=3012
NODE_ENV=development
# 数据库
DATABASE_URL=postgresql://rwa_user:password@localhost:5432/rwa_blockchain
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=11
# Kafka
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=blockchain-service
# 区块链 RPC
KAVA_RPC_URL=https://evm.kava.io
KAVA_WS_URL=wss://evm.kava.io/ws
BSC_RPC_URL=https://bsc-dataseed.binance.org
BSC_WS_URL=wss://bsc-ws-node.nariox.org:443
# 服务间调用
WALLET_SERVICE_URL=http://localhost:3001
IDENTITY_SERVICE_URL=http://localhost:3000
10. 部署配置
10.1 docker-compose.yml 配置
在主 docker-compose.yml 中添加:
blockchain-service:
build:
context: ./blockchain-service
dockerfile: Dockerfile
container_name: rwa-blockchain-service
ports:
- "3012:3012"
environment:
- NODE_ENV=production
- DATABASE_URL=postgresql://${POSTGRES_USER:-rwa_user}:${POSTGRES_PASSWORD:-rwa_secure_password}@postgres:5432/rwa_blockchain?schema=public
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_DB=11
- KAFKA_BROKERS=kafka:29092
- KAFKA_CLIENT_ID=blockchain-service
- KAVA_RPC_URL=${KAVA_RPC_URL:-https://evm.kava.io}
- KAVA_WS_URL=${KAVA_WS_URL}
- BSC_RPC_URL=${BSC_RPC_URL:-https://bsc-dataseed.binance.org}
- BSC_WS_URL=${BSC_WS_URL}
- WALLET_SERVICE_URL=http://rwa-wallet-service:3001
- IDENTITY_SERVICE_URL=http://rwa-identity-service:3000
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
kafka:
condition: service_healthy
networks:
- rwa-network
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3012/api/v1/blockchain/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
10.2 Kong 路由配置
# 在 kong.yml 中添加
- name: blockchain-service
url: http://192.168.1.111:3012
routes:
- name: blockchain-api
paths:
- /api/v1/blockchain
strip_path: false
11. 初始化脚本
在 scripts/init-databases.sh 中添加 rwa_blockchain:
for db in rwa_identity rwa_wallet ... rwa_blockchain; do
12. 关键设计决策
12.1 双重保障机制
实时监听 (WebSocket) 补扫任务 (定时)
│ │
▼ ▼
检测充值 ───────────────> 去重处理
│ │
└──────────┬───────────┘
▼
确认数检查
│
▼
通知 wallet-service
12.2 确认数策略
| 链 | 要求确认数 | 约等待时间 |
|---|---|---|
| KAVA | 12 块 | ~72 秒 |
| BSC | 15 块 | ~45 秒 |
12.3 失败重试策略
- 通知失败最多重试 10 次
- 指数退避:1s, 2s, 4s, 8s, ...
- 超过最大重试次数告警
13. 测试策略
13.1 单元测试
- 领域对象测试 (DepositTransaction, ValueObjects)
- 确认数计算逻辑
- 事件生成逻辑
13.2 集成测试
- 区块链 Provider 连接
- 事件监听和解析
- 数据库读写
13.3 E2E 测试
- 完整充值流程模拟
- 补扫逻辑验证
- 失败重试验证
14. 监控指标
blockchain_deposit_detected_total- 检测到的充值数blockchain_deposit_confirmed_total- 确认的充值数blockchain_notify_failed_total- 通知失败数blockchain_block_scan_lag- 扫描延迟 (当前块 - 已扫描块)blockchain_rpc_latency- RPC 调用延迟blockchain_ws_reconnect_total- WebSocket 重连次数
15. 迁移计划
从 identity-service 迁移
- 将
BlockchainQueryService逻辑迁移到BalanceQueryService - 将
DepositService逻辑拆分:- 地址获取 → 保留在 identity-service
- 余额查询 → 迁移到 blockchain-service
- 新增事件监听、补扫逻辑
- identity-service 发布
WalletBound事件 - blockchain-service 消费事件并注册监控地址
16. 架构兼容性说明
16.1 与其他服务的架构对比
本服务严格遵循与其他微服务相同的 DDD + 六边形架构 + 微服务 设计原则:
| 特性 | blockchain-service | identity-service | wallet-service | admin-service |
|---|---|---|---|---|
| DDD 分层 | ✅ | ✅ | ✅ | ✅ |
| 六边形架构 | ✅ | ✅ | ✅ | ✅ |
| 模块化设计 | ✅ (modules/) | ✅ | ✅ | ✅ |
| 聚合根基类 | ✅ (AggregateRoot) | ✅ | ✅ | ✅ |
| 值对象 | 5个 (ChainType, EvmAddress, TokenAmount, BlockNumber, TxHash) | 2个+ | 5个+ | 7个+ |
| 领域事件 | ✅ | ❌ | ✅ | ❌ |
| CQRS | ✅ | ✅ | ✅ | ✅ |
| Kafka 集成 | ✅ | ✅ | ❌ | ❌ |
| Redis 缓存 | ✅ | ✅ | ❌ | ❌ |
| 定时任务 | ✅ (@Cron) | ❌ | ❌ | ❌ |
16.2 关键架构原则
依赖倒置原则 (Dependency Inversion)
┌─────────────────────────────────────────────────────────┐
│ Domain Layer │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Repository Interfaces (Ports) │ │
│ │ - IDepositTransactionRepository │ │
│ │ - IMonitoredAddressRepository │ │
│ └─────────────────────────────────────────────────┘ │
│ ▲ │
│ │ 依赖 │
│ │ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Infrastructure Layer │ │
│ │ Repository Implementations │ │
│ │ - DepositTransactionRepositoryImpl │ │
│ │ - MonitoredAddressRepositoryImpl │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
六边形架构端口与适配器
| 类型 | 端口 (Port) | 适配器 (Adapter) |
|---|---|---|
| 入站 (Driving) | Controllers | HTTP API |
| 入站 (Driving) | Kafka Consumer | Event Handler |
| 出站 (Driven) | Repository Interfaces | Prisma Repositories |
| 出站 (Driven) | Cache Interface | Redis Cache |
| 出站 (Driven) | Event Publisher | Kafka Producer |
| 出站 (Driven) | External Services | HTTP Clients |
16.3 命名约定
为与其他服务保持一致,请遵循以下命名约定:
| 类型 | 命名规则 | 示例 |
|---|---|---|
| 聚合根 | *.aggregate.ts |
deposit-transaction.aggregate.ts |
| 值对象 | *.vo.ts |
chain-type.vo.ts, block-number.vo.ts |
| 领域事件 | *.event.ts |
deposit-detected.event.ts |
| 仓储接口 | *.repository.interface.ts |
deposit-transaction.repository.interface.ts |
| 仓储实现 | *.repository.impl.ts |
deposit-transaction.repository.impl.ts |
| 映射器 | *.mapper.ts |
deposit-transaction.mapper.ts |
| 模块 | *.module.ts |
api.module.ts, domain.module.ts |
16.4 Symbol Token 注入规范
使用 Symbol 作为依赖注入 Token,与其他服务保持一致:
// 定义 (domain/repositories/index.ts)
export const DEPOSIT_TRANSACTION_REPOSITORY = Symbol('DEPOSIT_TRANSACTION_REPOSITORY');
export const MONITORED_ADDRESS_REPOSITORY = Symbol('MONITORED_ADDRESS_REPOSITORY');
export const BLOCK_CHECKPOINT_REPOSITORY = Symbol('BLOCK_CHECKPOINT_REPOSITORY');
// 注册 (modules/domain.module.ts)
{
provide: DEPOSIT_TRANSACTION_REPOSITORY,
useClass: DepositTransactionRepositoryImpl,
}
// 注入 (application/services/*.ts)
constructor(
@Inject(DEPOSIT_TRANSACTION_REPOSITORY)
private readonly depositRepo: IDepositTransactionRepository,
) {}
文档版本: 1.1.0 最后更新: 2025-12-03 变更说明: 添加模块化设计、聚合根基类、完整值对象定义、架构兼容性说明