feat(blockchain-service): implement complete blockchain service with DDD + Hexagonal architecture

- Domain layer: ChainType, EvmAddress, TxHash, TokenAmount, BlockNumber value objects
- Domain events: DepositDetected, DepositConfirmed, WalletAddressCreated, TransactionBroadcasted
- Aggregates: DepositTransaction, MonitoredAddress, TransactionRequest
- Infrastructure: Prisma ORM, Redis cache, Kafka messaging, EVM blockchain adapters
- Application services: AddressDerivation, DepositDetection, BalanceQuery
- API: Health, Balance, Internal controllers with Swagger documentation
- Deployment: Docker, docker-compose, deploy.sh, health-check scripts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-06 20:54:58 -08:00
parent 6ff1868944
commit 50388c1115
107 changed files with 15186 additions and 0 deletions

View File

@ -0,0 +1,19 @@
{
"permissions": {
"allow": [
"Bash(Select-Object -First 50)",
"Bash(find:*)",
"Bash(npm install:*)",
"Bash(npx prisma generate:*)",
"Bash(npm run build:*)",
"Bash(npx prisma migrate dev:*)",
"Bash(copy:*)",
"Bash(docker compose:*)",
"Bash(powershell:*)",
"Bash(npm run lint)",
"Bash(git add:*)"
],
"deny": [],
"ask": []
}
}

View File

@ -0,0 +1,13 @@
node_modules
dist
.git
.gitignore
.env
.env.local
*.md
.vscode
.idea
coverage
test
*.log
npm-debug.log

View File

@ -0,0 +1,36 @@
# Application
NODE_ENV=development
PORT=3012
SERVICE_NAME=blockchain-service
# Database
DATABASE_URL=postgresql://rwa:rwa_password@localhost:5432/rwa_blockchain?schema=public
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=11
REDIS_PASSWORD=
# Kafka
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=blockchain-service
KAFKA_GROUP_ID=blockchain-service-group
# Blockchain - KAVA (EVM)
KAVA_RPC_URL=https://evm.kava.io
KAVA_CHAIN_ID=2222
KAVA_USDT_CONTRACT=0x919C1c267BC06a7039e03fcc2eF738525769109c
# Blockchain - BSC
BSC_RPC_URL=https://bsc-dataseed.binance.org
BSC_CHAIN_ID=56
BSC_USDT_CONTRACT=0x55d398326f99059fF775485246999027B3197955
# Block Scanner
BLOCK_SCAN_INTERVAL_MS=5000
BLOCK_CONFIRMATIONS_REQUIRED=12
BLOCK_SCAN_BATCH_SIZE=100
# Logging
LOG_LEVEL=debug

View File

@ -0,0 +1,25 @@
module.exports = {
parser: '@typescript-eslint/parser',
parserOptions: {
project: 'tsconfig.json',
tsconfigRootDir: __dirname,
sourceType: 'module',
},
plugins: ['@typescript-eslint/eslint-plugin'],
extends: [
'plugin:@typescript-eslint/recommended',
'plugin:prettier/recommended',
],
root: true,
env: {
node: true,
jest: true,
},
ignorePatterns: ['.eslintrc.js'],
rules: {
'@typescript-eslint/interface-name-prefix': 'off',
'@typescript-eslint/explicit-function-return-type': 'off',
'@typescript-eslint/explicit-module-boundary-types': 'off',
'@typescript-eslint/no-explicit-any': 'warn',
},
};

View File

@ -0,0 +1,32 @@
# Dependencies
node_modules/
# Build
dist/
# Environment
.env
.env.local
.env.*.local
# IDE
.idea/
.vscode/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Logs
logs/
*.log
npm-debug.log*
# Test
coverage/
# Prisma
prisma/*.db
prisma/*.db-journal

View File

@ -0,0 +1,7 @@
{
"singleQuote": true,
"trailingComma": "all",
"tabWidth": 2,
"semi": true,
"printWidth": 100
}

View File

@ -0,0 +1,51 @@
# Build stage
FROM node:20-alpine AS builder
WORKDIR /app
# Copy package files
COPY package*.json ./
COPY prisma ./prisma/
# Install dependencies
RUN npm ci
# Generate Prisma client
RUN npx prisma generate
# Copy source code
COPY . .
# Build
RUN npm run build
# Production stage
FROM node:20-alpine AS production
WORKDIR /app
# Copy package files
COPY package*.json ./
COPY prisma ./prisma/
# Install production dependencies only
RUN npm ci --only=production
# Generate Prisma client
RUN npx prisma generate
# Copy built application
COPY --from=builder /app/dist ./dist
# Set environment
ENV NODE_ENV=production
# Expose port
EXPOSE 3012
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD wget --no-verbose --tries=1 --spider http://localhost:3012/health || exit 1
# Start application
CMD ["node", "dist/main.js"]

View File

@ -0,0 +1,158 @@
#!/bin/bash
# =============================================================================
# Blockchain Service - Individual Deployment Script
# =============================================================================
set -e
SERVICE_NAME="blockchain-service"
CONTAINER_NAME="rwa-blockchain-service"
IMAGE_NAME="services-blockchain-service"
PORT=3012
# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'
log_info() { echo -e "${BLUE}[INFO]${NC} $1"; }
log_success() { echo -e "${GREEN}[OK]${NC} $1"; }
log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; }
log_error() { echo -e "${RED}[ERROR]${NC} $1"; }
# Get script directory
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SERVICES_DIR="$(dirname "$SCRIPT_DIR")"
# Load environment
if [ -f "$SERVICES_DIR/.env" ]; then
export $(cat "$SERVICES_DIR/.env" | grep -v '^#' | xargs)
fi
case "$1" in
build)
log_info "Building $SERVICE_NAME..."
docker build -t "$IMAGE_NAME" "$SCRIPT_DIR"
log_success "$SERVICE_NAME built successfully"
;;
build-no-cache)
log_info "Building $SERVICE_NAME (no cache)..."
docker build --no-cache -t "$IMAGE_NAME" "$SCRIPT_DIR"
log_success "$SERVICE_NAME built successfully"
;;
start)
log_info "Starting $SERVICE_NAME..."
cd "$SERVICES_DIR"
docker compose up -d "$SERVICE_NAME"
log_success "$SERVICE_NAME started"
;;
stop)
log_info "Stopping $SERVICE_NAME..."
docker stop "$CONTAINER_NAME" 2>/dev/null || true
docker rm "$CONTAINER_NAME" 2>/dev/null || true
log_success "$SERVICE_NAME stopped"
;;
restart)
$0 stop
$0 start
;;
logs)
docker logs -f "$CONTAINER_NAME"
;;
logs-tail)
docker logs --tail 100 "$CONTAINER_NAME"
;;
status)
if docker ps --format '{{.Names}}' | grep -q "^${CONTAINER_NAME}$"; then
log_success "$SERVICE_NAME is running"
docker ps --filter "name=$CONTAINER_NAME" --format "table {{.Status}}\t{{.Ports}}"
else
log_warn "$SERVICE_NAME is not running"
fi
;;
health)
log_info "Checking health of $SERVICE_NAME..."
if curl -sf "http://localhost:$PORT/health" > /dev/null 2>&1; then
log_success "$SERVICE_NAME is healthy"
else
log_error "$SERVICE_NAME health check failed"
exit 1
fi
;;
migrate)
log_info "Running migrations for $SERVICE_NAME..."
docker exec "$CONTAINER_NAME" npx prisma migrate deploy
log_success "Migrations completed"
;;
migrate-dev)
log_info "Running dev migrations for $SERVICE_NAME..."
docker exec "$CONTAINER_NAME" npx prisma migrate dev
;;
prisma-studio)
log_info "Starting Prisma Studio..."
docker exec -it "$CONTAINER_NAME" npx prisma studio
;;
shell)
log_info "Opening shell in $SERVICE_NAME container..."
docker exec -it "$CONTAINER_NAME" sh
;;
test)
log_info "Running tests for $SERVICE_NAME..."
cd "$SCRIPT_DIR"
npm test
;;
scan-blocks)
log_info "Manually triggering block scan..."
curl -X POST "http://localhost:$PORT/internal/scan-blocks" \
-H "Content-Type: application/json"
;;
check-balance)
if [ -z "$2" ] || [ -z "$3" ]; then
log_error "Usage: $0 check-balance <chain> <address>"
log_info "Example: $0 check-balance KAVA 0x1234..."
exit 1
fi
log_info "Checking balance on $2 for $3..."
curl -s "http://localhost:$PORT/balance?chainType=$2&address=$3" | jq '.'
;;
*)
echo "Usage: $0 {build|build-no-cache|start|stop|restart|logs|logs-tail|status|health|migrate|migrate-dev|prisma-studio|shell|test|scan-blocks|check-balance}"
echo ""
echo "Commands:"
echo " build - Build Docker image"
echo " build-no-cache - Build Docker image without cache"
echo " start - Start the service"
echo " stop - Stop the service"
echo " restart - Restart the service"
echo " logs - Follow logs"
echo " logs-tail - Show last 100 log lines"
echo " status - Show service status"
echo " health - Check service health"
echo " migrate - Run database migrations"
echo " migrate-dev - Run dev migrations"
echo " prisma-studio - Open Prisma Studio"
echo " shell - Open shell in container"
echo " test - Run tests locally"
echo " scan-blocks - Manually trigger block scanning"
echo " check-balance - Check address balance (usage: check-balance <chain> <address>)"
exit 1
;;
esac

View File

@ -0,0 +1,82 @@
version: '3.8'
services:
blockchain-service:
build:
context: .
dockerfile: Dockerfile
container_name: blockchain-service
ports:
- "3012:3012"
environment:
- NODE_ENV=development
- PORT=3012
- DATABASE_URL=postgresql://rwa:rwa_password@postgres:5432/rwa_blockchain?schema=public
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_DB=11
- KAFKA_BROKERS=kafka:9092
- KAFKA_CLIENT_ID=blockchain-service
- KAFKA_GROUP_ID=blockchain-service-group
- KAVA_RPC_URL=https://evm.kava.io
- BSC_RPC_URL=https://bsc-dataseed.binance.org
depends_on:
- postgres
- redis
- kafka
networks:
- rwa-network
restart: unless-stopped
postgres:
image: postgres:15-alpine
container_name: blockchain-postgres
environment:
- POSTGRES_USER=rwa
- POSTGRES_PASSWORD=rwa_password
- POSTGRES_DB=rwa_blockchain
volumes:
- postgres-data:/var/lib/postgresql/data
ports:
- "5432:5432"
networks:
- rwa-network
redis:
image: redis:7-alpine
container_name: blockchain-redis
command: redis-server --appendonly yes
volumes:
- redis-data:/data
ports:
- "6379:6379"
networks:
- rwa-network
kafka:
image: bitnami/kafka:3.6
container_name: blockchain-kafka
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
volumes:
- kafka-data:/bitnami/kafka
ports:
- "9092:9092"
networks:
- rwa-network
networks:
rwa-network:
driver: bridge
volumes:
postgres-data:
redis-data:
kafka-data:

View File

@ -0,0 +1,17 @@
{
"$schema": "https://json.schemastore.org/nest-cli",
"collection": "@nestjs/schematics",
"sourceRoot": "src",
"compilerOptions": {
"deleteOutDir": true,
"plugins": [
{
"name": "@nestjs/swagger",
"options": {
"classValidatorShim": true,
"introspectComments": true
}
}
]
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,90 @@
{
"name": "blockchain-service",
"version": "1.0.0",
"description": "RWA Blockchain Infrastructure Service - Address derivation, deposit detection, transaction broadcast",
"author": "RWA Team",
"private": true,
"license": "UNLICENSED",
"prisma": {
"schema": "prisma/schema.prisma",
"seed": "ts-node prisma/seed.ts"
},
"scripts": {
"build": "nest build",
"format": "prettier --write \"src/**/*.ts\" \"test/**/*.ts\"",
"start": "nest start",
"start:dev": "nest start --watch",
"start:debug": "nest start --debug --watch",
"start:prod": "node dist/main",
"lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix",
"test": "jest",
"test:watch": "jest --watch",
"test:cov": "jest --coverage",
"test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand",
"test:e2e": "jest --config ./test/jest-e2e.json",
"prisma:generate": "prisma generate",
"prisma:migrate": "prisma migrate dev",
"prisma:migrate:prod": "prisma migrate deploy",
"prisma:studio": "prisma studio"
},
"dependencies": {
"@nestjs/axios": "^3.0.0",
"@nestjs/common": "^10.0.0",
"@nestjs/config": "^3.1.1",
"@nestjs/core": "^10.0.0",
"@nestjs/microservices": "^10.0.0",
"@nestjs/platform-express": "^10.0.0",
"@nestjs/schedule": "^4.0.0",
"@nestjs/swagger": "^7.1.17",
"@prisma/client": "^5.7.0",
"@scure/bip32": "^1.3.2",
"bech32": "^2.0.0",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"ethers": "^6.9.0",
"ioredis": "^5.3.2",
"kafkajs": "^2.2.4",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.8.1",
"uuid": "^9.0.0"
},
"devDependencies": {
"@nestjs/cli": "^10.0.0",
"@nestjs/schematics": "^10.0.0",
"@nestjs/testing": "^10.0.0",
"@types/express": "^4.17.17",
"@types/jest": "^29.5.2",
"@types/node": "^20.3.1",
"@types/supertest": "^6.0.0",
"@types/uuid": "^9.0.0",
"@typescript-eslint/eslint-plugin": "^6.0.0",
"@typescript-eslint/parser": "^6.0.0",
"eslint": "^8.42.0",
"eslint-config-prettier": "^9.0.0",
"eslint-plugin-prettier": "^5.0.0",
"jest": "^29.5.0",
"prettier": "^3.0.0",
"prisma": "^5.7.0",
"source-map-support": "^0.5.21",
"supertest": "^6.3.3",
"ts-jest": "^29.1.0",
"ts-loader": "^9.4.3",
"ts-node": "^10.9.1",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.1.3"
},
"jest": {
"moduleFileExtensions": ["js", "json", "ts"],
"rootDir": "src",
"testRegex": ".*\\.spec\\.ts$",
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
},
"collectCoverageFrom": ["**/*.(t|j)s"],
"coverageDirectory": "../coverage",
"testEnvironment": "node",
"moduleNameMapper": {
"^@/(.*)$": "<rootDir>/$1"
}
}
}

View File

@ -0,0 +1,173 @@
// This is your Prisma schema file,
// learn more about it in the docs: https://pris.ly/d/prisma-schema
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
// ============================================
// 监控地址表
// 存储需要监听充值的地址
// ============================================
model MonitoredAddress {
id BigInt @id @default(autoincrement()) @map("address_id")
chainType String @map("chain_type") @db.VarChar(20) // KAVA, BSC
address String @db.VarChar(42) // 0x地址
userId BigInt @map("user_id") // 关联用户ID
isActive Boolean @default(true) @map("is_active") // 是否激活监听
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
deposits DepositTransaction[]
@@unique([chainType, address], name: "uk_chain_address")
@@index([userId], name: "idx_user")
@@index([chainType, isActive], name: "idx_chain_active")
@@map("monitored_addresses")
}
// ============================================
// 充值交易表 (Append-Only)
// 记录检测到的所有充值交易
// ============================================
model DepositTransaction {
id BigInt @id @default(autoincrement()) @map("deposit_id")
chainType String @map("chain_type") @db.VarChar(20)
txHash String @unique @map("tx_hash") @db.VarChar(66)
fromAddress String @map("from_address") @db.VarChar(42)
toAddress String @map("to_address") @db.VarChar(42)
tokenContract String @map("token_contract") @db.VarChar(42) // USDT合约地址
amount Decimal @db.Decimal(36, 18) // 原始金额
amountFormatted Decimal @map("amount_formatted") @db.Decimal(20, 8) // 格式化金额
blockNumber BigInt @map("block_number")
blockTimestamp DateTime @map("block_timestamp")
logIndex Int @map("log_index")
// 确认状态
confirmations Int @default(0)
status String @default("DETECTED") @db.VarChar(20) // DETECTED, CONFIRMING, CONFIRMED, NOTIFIED
// 关联
addressId BigInt @map("address_id")
userId BigInt @map("user_id")
// 通知状态
notifiedAt DateTime? @map("notified_at")
notifyAttempts Int @default(0) @map("notify_attempts")
lastNotifyError String? @map("last_notify_error") @db.Text
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
monitoredAddress MonitoredAddress @relation(fields: [addressId], references: [id])
@@index([chainType, status], name: "idx_chain_status")
@@index([userId], name: "idx_deposit_user")
@@index([blockNumber], name: "idx_block")
@@index([status, notifiedAt], name: "idx_pending_notify")
@@map("deposit_transactions")
}
// ============================================
// 区块扫描检查点 (每条链一条记录)
// 记录扫描进度,用于断点续扫
// ============================================
model BlockCheckpoint {
id BigInt @id @default(autoincrement()) @map("checkpoint_id")
chainType String @unique @map("chain_type") @db.VarChar(20)
lastScannedBlock BigInt @map("last_scanned_block")
lastScannedAt DateTime @map("last_scanned_at")
// 健康状态
isHealthy Boolean @default(true) @map("is_healthy")
lastError String? @map("last_error") @db.Text
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@map("block_checkpoints")
}
// ============================================
// 交易广播请求表
// 记录待广播和已广播的交易
// ============================================
model TransactionRequest {
id BigInt @id @default(autoincrement()) @map("request_id")
chainType String @map("chain_type") @db.VarChar(20)
// 请求来源
sourceService String @map("source_service") @db.VarChar(50)
sourceOrderId String @map("source_order_id") @db.VarChar(100)
// 交易数据
fromAddress String @map("from_address") @db.VarChar(42)
toAddress String @map("to_address") @db.VarChar(42)
value Decimal @db.Decimal(36, 18)
data String? @db.Text // 合约调用数据
// 签名数据 (由 MPC 服务提供)
signedTx String? @map("signed_tx") @db.Text
// 广播结果
txHash String? @map("tx_hash") @db.VarChar(66)
status String @default("PENDING") @db.VarChar(20) // PENDING, SIGNED, BROADCASTED, CONFIRMED, FAILED
// Gas 信息
gasLimit BigInt? @map("gas_limit")
gasPrice Decimal? @map("gas_price") @db.Decimal(36, 18)
nonce Int?
// 错误信息
errorMessage String? @map("error_message") @db.Text
retryCount Int @default(0) @map("retry_count")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@unique([sourceService, sourceOrderId], name: "uk_source_order")
@@index([chainType, status], name: "idx_tx_chain_status")
@@index([txHash], name: "idx_tx_hash")
@@map("transaction_requests")
}
// ============================================
// 区块链事件日志 (Append-Only 审计)
// ============================================
model BlockchainEvent {
id BigInt @id @default(autoincrement()) @map("event_id")
eventType String @map("event_type") @db.VarChar(50)
aggregateId String @map("aggregate_id") @db.VarChar(100)
aggregateType String @map("aggregate_type") @db.VarChar(50)
eventData Json @map("event_data")
chainType String? @map("chain_type") @db.VarChar(20)
txHash String? @map("tx_hash") @db.VarChar(66)
occurredAt DateTime @default(now()) @map("occurred_at") @db.Timestamp(6)
@@index([aggregateType, aggregateId], name: "idx_event_aggregate")
@@index([eventType], name: "idx_event_type")
@@index([chainType], name: "idx_event_chain")
@@index([occurredAt], name: "idx_event_occurred")
@@map("blockchain_events")
}

View File

@ -0,0 +1,235 @@
# 测试和健康检查脚本
## 使用流程
### 1⃣ 启动基础服务
```bash
# 启动 Redis
redis-server --daemonize yes
# 或使用 Docker
docker compose up -d redis
```
### 2⃣ 启动 Blockchain Service
```bash
# 在项目根目录
npm run start:dev
```
### 3⃣ 运行健康检查
```bash
# 进入 scripts 目录
cd scripts
# 运行健康检查
./health-check.sh
```
**期望输出:**
```
🏥 开始健康检查...
=== 数据库服务 ===
Checking PostgreSQL ... ✓ OK
=== 缓存服务 ===
Checking Redis ... ✓ OK
=== 消息队列服务 ===
Checking Kafka ... ✓ OK
=== 区块链 RPC ===
Checking KAVA RPC ... ✓ OK
Checking BSC RPC ... ✓ OK
=== 应用服务 ===
Checking Blockchain Service ... ✓ OK
=== API 文档 ===
Checking Swagger UI ... ✓ OK
======================================
健康检查完成!
正常: 7
异常: 0
======================================
✓ 所有服务正常!
现在可以运行测试:
./scripts/quick-test.sh
```
### 4⃣ 运行快速功能测试
```bash
./quick-test.sh
```
这个脚本会自动测试所有核心功能:
- ✅ 健康检查
- ✅ 余额查询(单链/多链)
- ✅ 地址派生
- ✅ 用户地址查询
- ✅ 错误场景处理
- ✅ API 文档可访问性
---
## 脚本说明
### `health-check.sh`
- **作用**: 检查所有依赖服务是否正常运行
- **使用场景**: 部署前、调试时
- **检查项目**:
- PostgreSQL 数据库
- Redis 缓存
- Kafka 消息队列
- KAVA/BSC RPC 端点
- Blockchain Service 应用
### `quick-test.sh`
- **作用**: 快速测试所有核心 API 功能
- **使用场景**: 验证功能完整性、回归测试
- **前置条件**: `health-check.sh` 通过
### `start-all.sh`
- **作用**: 一键启动所有服务
- **使用场景**: 初次启动、快速启动环境
- **前置条件**: 依赖已安装
### `stop-service.sh`
- **作用**: 停止 Blockchain Service
- **使用场景**: 需要停止服务时
### `rebuild-kafka.sh`
- **作用**: 重建 Kafka 容器
- **使用场景**: Kafka 配置变更后
---
## 主要 API 端点
| 端点 | 方法 | 描述 |
|------|------|------|
| `/health` | GET | 健康检查 |
| `/health/ready` | GET | 就绪检查 |
| `/balance` | GET | 查询单链余额 |
| `/balance/multi-chain` | GET | 查询多链余额 |
| `/internal/derive-address` | POST | 从公钥派生地址 |
| `/internal/user/:userId/addresses` | GET | 获取用户地址 |
| `/api` | GET | Swagger 文档 |
---
## 部署脚本 (deploy.sh)
主部署脚本位于项目根目录,提供以下命令:
```bash
# 构建 Docker 镜像
./deploy.sh build
# 启动服务
./deploy.sh start
# 停止服务
./deploy.sh stop
# 重启服务
./deploy.sh restart
# 查看日志
./deploy.sh logs
# 健康检查
./deploy.sh health
# 运行数据库迁移
./deploy.sh migrate
# 打开 Prisma Studio
./deploy.sh prisma-studio
# 进入容器 shell
./deploy.sh shell
# 查询余额
./deploy.sh check-balance KAVA 0x1234...
# 触发区块扫描
./deploy.sh scan-blocks
```
---
## 常见问题
### Q: 为什么 RPC 检查失败?
**A:** 检查网络连接,或者 RPC 端点可能暂时不可用
### Q: Redis 启动失败?
**A:** 检查是否已经在运行
```bash
ps aux | grep redis
redis-cli shutdown # 如果已运行
redis-server --daemonize yes
```
### Q: Kafka 连接失败?
**A:** 重建 Kafka 容器
```bash
./scripts/rebuild-kafka.sh
```
---
## 完整测试流程
```bash
# 1. 进入项目目录
cd ~/work/rwadurian/backend/services/blockchain-service
# 2. 安装依赖(首次)
npm install
# 3. 生成 Prisma Client
npx prisma generate
# 4. 运行数据库迁移
npx prisma migrate dev
# 5. 启动所有服务
./scripts/start-all.sh
# 6. 运行健康检查
./scripts/health-check.sh
# 7. 运行快速测试
./scripts/quick-test.sh
# 8. 运行完整测试
npm test
npm run test:e2e
```
---
## 区块链特定测试
### 测试余额查询
```bash
# KAVA 链
curl "http://localhost:3012/balance?chainType=KAVA&address=0x..."
# 多链查询
curl "http://localhost:3012/balance/multi-chain?address=0x..."
```
### 测试地址派生
```bash
curl -X POST "http://localhost:3012/internal/derive-address" \
-H "Content-Type: application/json" \
-d '{
"userId": "12345",
"publicKey": "0x02..."
}'
```

View File

@ -0,0 +1,93 @@
#!/bin/bash
# 健康检查脚本 - 检查所有依赖服务是否正常
echo "🏥 开始健康检查..."
echo ""
GREEN='\033[0;32m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'
# 检查计数
PASS=0
FAIL=0
FAILED_SERVICES=()
# 检查函数
check_service() {
local service_name=$1
local check_command=$2
local fix_command=$3
echo -n "Checking $service_name ... "
if eval "$check_command" > /dev/null 2>&1; then
echo -e "${GREEN}✓ OK${NC}"
PASS=$((PASS + 1))
else
echo -e "${RED}✗ FAIL${NC}"
FAIL=$((FAIL + 1))
FAILED_SERVICES+=("$service_name:$fix_command")
fi
}
# 检查 PostgreSQL
echo -e "${YELLOW}=== 数据库服务 ===${NC}"
check_service "PostgreSQL" "pg_isready -h localhost -p 5432" "sudo systemctl start postgresql"
# 检查 Redis (支持 Docker 和本地)
echo -e "${YELLOW}=== 缓存服务 ===${NC}"
if command -v redis-cli &> /dev/null; then
check_service "Redis" "redis-cli -h localhost -p 6379 ping" "docker start blockchain-service-redis-1 或 redis-server --daemonize yes"
elif command -v docker &> /dev/null; then
check_service "Redis" "docker exec blockchain-service-redis-1 redis-cli ping" "docker start blockchain-service-redis-1"
else
check_service "Redis" "nc -zv localhost 6379" "docker start blockchain-service-redis-1"
fi
# 检查 Kafka
echo -e "${YELLOW}=== 消息队列服务 ===${NC}"
check_service "Kafka" "nc -zv localhost 9092" "启动 Kafka (需要手动启动)"
# 检查区块链 RPC
echo -e "${YELLOW}=== 区块链 RPC ===${NC}"
check_service "KAVA RPC" "curl -sf https://evm.kava.io -X POST -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"method\":\"eth_blockNumber\",\"params\":[],\"id\":1}'" "检查网络连接或 RPC 端点"
check_service "BSC RPC" "curl -sf https://bsc-dataseed.binance.org -X POST -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"method\":\"eth_blockNumber\",\"params\":[],\"id\":1}'" "检查网络连接或 RPC 端点"
# 检查应用服务
echo -e "${YELLOW}=== 应用服务 ===${NC}"
check_service "Blockchain Service" "curl -f http://localhost:3012/health" "npm run start:dev"
# 检查 Swagger 文档
echo -e "${YELLOW}=== API 文档 ===${NC}"
check_service "Swagger UI" "curl -f http://localhost:3012/api" "等待 Blockchain Service 启动"
echo ""
echo -e "${YELLOW}======================================${NC}"
echo -e "${YELLOW}健康检查完成!${NC}"
echo -e "${GREEN}正常: $PASS${NC}"
echo -e "${RED}异常: $FAIL${NC}"
echo -e "${YELLOW}======================================${NC}"
if [ $FAIL -eq 0 ]; then
echo -e "${GREEN}✓ 所有服务正常!${NC}"
echo ""
echo -e "${BLUE}现在可以运行测试:${NC}"
echo " ./scripts/quick-test.sh"
exit 0
else
echo -e "${RED}✗ 存在异常的服务!${NC}"
echo ""
echo -e "${BLUE}修复建议:${NC}"
for service_info in "${FAILED_SERVICES[@]}"; do
service_name="${service_info%%:*}"
fix_command="${service_info#*:}"
echo -e "${YELLOW}$service_name:${NC} $fix_command"
done
echo ""
echo -e "${BLUE}或者运行一键启动脚本:${NC}"
echo " ./scripts/start-all.sh"
exit 1
fi

View File

@ -0,0 +1,119 @@
#!/bin/bash
# 快速测试脚本 - 在本地环境快速验证核心功能
set -e
echo "🚀 开始快速测试 Blockchain Service..."
echo ""
# 颜色定义
GREEN='\033[0;32m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
BASE_URL="http://localhost:3012"
# 测试结果统计
PASS=0
FAIL=0
# 测试函数
test_api() {
local test_name=$1
local method=$2
local endpoint=$3
local data=$4
local expected_status=$5
echo -n "Testing: $test_name ... "
if [ -n "$data" ]; then
response=$(curl -s -w "\n%{http_code}" -X $method "$BASE_URL$endpoint" \
-H "Content-Type: application/json" \
-d "$data")
else
response=$(curl -s -w "\n%{http_code}" -X $method "$BASE_URL$endpoint" \
-H "Content-Type: application/json")
fi
status=$(echo "$response" | tail -n1)
body=$(echo "$response" | head -n-1)
if [ "$status" -eq "$expected_status" ]; then
echo -e "${GREEN}✓ PASS${NC}"
PASS=$((PASS + 1))
if command -v jq &> /dev/null && [ -n "$body" ]; then
echo "$body" | jq '.' 2>/dev/null || echo "$body"
else
echo "$body"
fi
else
echo -e "${RED}✗ FAIL${NC} (Expected: $expected_status, Got: $status)"
FAIL=$((FAIL + 1))
echo "$body"
fi
echo ""
}
# 1. 健康检查
echo -e "${YELLOW}=== 1. 健康检查 ===${NC}"
test_api "Health Check" "GET" "/health" "" 200
test_api "Ready Check" "GET" "/health/ready" "" 200
# 2. 余额查询测试
echo -e "${YELLOW}=== 2. 余额查询 ===${NC}"
# 使用一个已知的测试地址 (Binance Hot Wallet)
TEST_ADDRESS="0x8894E0a0c962CB723c1976a4421c95949bE2D4E3"
test_api "Query KAVA Balance" "GET" "/balance?chainType=KAVA&address=$TEST_ADDRESS" "" 200
test_api "Query Multi-Chain Balance" "GET" "/balance/multi-chain?address=$TEST_ADDRESS" "" 200
# 3. 地址派生测试
echo -e "${YELLOW}=== 3. 地址派生测试 ===${NC}"
# 测试用压缩公钥 (仅用于测试)
TEST_PUBLIC_KEY="0x02b4632d08485ff1df2db55b9dafd23347d1c47a457072a1e87be26896549a8737"
TEST_USER_ID="999999"
test_api "Derive Address" "POST" "/internal/derive-address" \
"{\"userId\": \"$TEST_USER_ID\", \"publicKey\": \"$TEST_PUBLIC_KEY\"}" \
201
# 4. 获取用户地址
echo -e "${YELLOW}=== 4. 获取用户地址 ===${NC}"
test_api "Get User Addresses" "GET" "/internal/user/$TEST_USER_ID/addresses" "" 200
# 5. 错误场景测试
echo -e "${YELLOW}=== 5. 错误场景测试 ===${NC}"
# 无效地址格式
test_api "Invalid Address Format" "GET" "/balance?chainType=KAVA&address=invalid" "" 400
# 无效链类型
test_api "Invalid Chain Type" "GET" "/balance?chainType=INVALID&address=$TEST_ADDRESS" "" 400
# 无效公钥格式
test_api "Invalid Public Key" "POST" "/internal/derive-address" \
"{\"userId\": \"1\", \"publicKey\": \"invalid\"}" \
400
# 6. API 文档测试
echo -e "${YELLOW}=== 6. API 文档 ===${NC}"
test_api "Swagger API Docs" "GET" "/api" "" 200
# 总结
echo ""
echo -e "${YELLOW}======================================${NC}"
echo -e "${YELLOW}测试完成!${NC}"
echo -e "${GREEN}通过: $PASS${NC}"
echo -e "${RED}失败: $FAIL${NC}"
echo -e "${YELLOW}======================================${NC}"
if [ $FAIL -eq 0 ]; then
echo -e "${GREEN}✓ 所有测试通过!${NC}"
exit 0
else
echo -e "${RED}✗ 存在失败的测试!${NC}"
exit 1
fi

View File

@ -0,0 +1,74 @@
#!/bin/bash
# 重建 Kafka 容器以应用新的配置
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
BLUE='\033[0;34m'
NC='\033[0m'
echo -e "${YELLOW}🔄 重建 Kafka 容器...${NC}"
echo ""
# 1. 停止 Blockchain Service (如果在运行)
echo -e "${BLUE}步骤 1: 停止 Blockchain Service${NC}"
PID=$(lsof -ti :3012 2>/dev/null)
if [ ! -z "$PID" ]; then
echo "停止 Blockchain Service (PID: $PID)..."
kill $PID
sleep 2
echo -e "${GREEN}✓ Blockchain Service 已停止${NC}"
else
echo -e "${YELLOW}⚠️ Blockchain Service 未在运行${NC}"
fi
echo ""
# 2. 停止并删除 Kafka 容器
echo -e "${BLUE}步骤 2: 停止并删除旧容器${NC}"
docker compose stop kafka 2>/dev/null || true
docker compose rm -f kafka 2>/dev/null || true
echo -e "${GREEN}✓ 旧容器已删除${NC}"
echo ""
# 3. 重新创建容器
echo -e "${BLUE}步骤 3: 创建新容器${NC}"
docker compose up -d kafka
echo "等待 Kafka 启动..."
sleep 20
echo -e "${GREEN}✓ Kafka 容器已创建${NC}"
echo ""
# 4. 验证配置
echo -e "${BLUE}步骤 4: 验证配置${NC}"
CONTAINER_NAME=$(docker compose ps -q kafka 2>/dev/null)
if [ ! -z "$CONTAINER_NAME" ]; then
ADVERTISED=$(docker inspect "$CONTAINER_NAME" 2>/dev/null | grep -A 1 "KAFKA_ADVERTISED_LISTENERS" | head -1)
if echo "$ADVERTISED" | grep -q "localhost:9092"; then
echo -e "${GREEN}✓ Kafka 配置已更新!${NC}"
echo "$ADVERTISED"
else
echo -e "${YELLOW}⚠ Kafka 配置可能需要检查${NC}"
fi
else
echo -e "${YELLOW}⚠ 未找到 Kafka 容器${NC}"
fi
echo ""
# 5. 测试连接
echo -e "${BLUE}步骤 5: 测试 Kafka 连接${NC}"
if nc -zv localhost 9092 2>&1 | grep -q "succeeded\|Connected"; then
echo -e "${GREEN}✓ Kafka 端口可访问${NC}"
else
echo -e "${RED}✗ Kafka 端口不可访问${NC}"
exit 1
fi
echo ""
echo -e "${YELLOW}======================================${NC}"
echo -e "${GREEN}✓ Kafka 重建完成!${NC}"
echo -e "${YELLOW}======================================${NC}"
echo ""
echo -e "${BLUE}下一步:${NC}"
echo "1. 启动 Blockchain Service: npm run start:dev"
echo "2. 运行健康检查: ./scripts/health-check.sh"
echo "3. 运行快速测试: ./scripts/quick-test.sh"

View File

@ -0,0 +1,66 @@
#!/bin/bash
# 一键启动所有服务
set -e
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
echo -e "${YELLOW}🚀 启动所有服务...${NC}"
echo ""
# 1. 启动 Redis
echo -e "${YELLOW}启动 Redis...${NC}"
if ! pgrep -x "redis-server" > /dev/null; then
if command -v redis-server &> /dev/null; then
redis-server --daemonize yes
echo -e "${GREEN}✓ Redis 已启动${NC}"
else
echo -e "${YELLOW}⚠ Redis 未安装,尝试使用 Docker...${NC}"
docker start blockchain-service-redis-1 2>/dev/null || docker compose up -d redis
fi
else
echo -e "${GREEN}✓ Redis 已在运行${NC}"
fi
# 2. 检查 PostgreSQL
echo -e "${YELLOW}检查 PostgreSQL...${NC}"
if pg_isready -h localhost -p 5432 > /dev/null 2>&1; then
echo -e "${GREEN}✓ PostgreSQL 已在运行${NC}"
else
echo -e "${YELLOW}⚠ PostgreSQL 未运行,请手动启动${NC}"
fi
# 3. 检查 Kafka
echo -e "${YELLOW}检查 Kafka...${NC}"
if nc -zv localhost 9092 > /dev/null 2>&1; then
echo -e "${GREEN}✓ Kafka 已在运行${NC}"
else
echo -e "${YELLOW}⚠ Kafka 未运行,请手动启动${NC}"
fi
# 4. 启动 Blockchain Service
echo -e "${YELLOW}启动 Blockchain Service...${NC}"
cd "$(dirname "$0")/.."
npm run start:dev &
# 等待服务启动
echo "等待服务启动 (最多 30 秒)..."
for i in {1..30}; do
if curl -f http://localhost:3012/health > /dev/null 2>&1; then
echo -e "${GREEN}✓ Blockchain Service 已启动${NC}"
break
fi
sleep 1
echo -n "."
done
echo ""
echo -e "${GREEN}✓ 所有服务已启动!${NC}"
echo ""
echo "运行健康检查:"
echo " ./scripts/health-check.sh"
echo ""
echo "运行快速测试:"
echo " ./scripts/quick-test.sh"

View File

@ -0,0 +1,44 @@
#!/bin/bash
# 停止 Blockchain Service
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m'
echo -e "${YELLOW}🛑 停止 Blockchain Service...${NC}"
# 查找监听 3012 端口的进程
PID=$(lsof -ti :3012)
if [ -z "$PID" ]; then
echo -e "${YELLOW}⚠️ Blockchain Service 未在运行${NC}"
exit 0
fi
echo "找到进程: PID=$PID"
# 尝试优雅停止
echo "发送 SIGTERM 信号..."
kill $PID
# 等待进程结束
for i in {1..10}; do
if ! kill -0 $PID 2>/dev/null; then
echo -e "${GREEN}✓ Blockchain Service 已停止${NC}"
exit 0
fi
sleep 1
echo -n "."
done
echo ""
echo -e "${YELLOW}⚠️ 进程未响应,强制停止...${NC}"
kill -9 $PID
if ! kill -0 $PID 2>/dev/null; then
echo -e "${GREEN}✓ Blockchain Service 已强制停止${NC}"
else
echo -e "${RED}✗ 无法停止进程${NC}"
exit 1
fi

View File

@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { ApplicationModule } from '@/application/application.module';
import { HealthController, BalanceController, InternalController } from './controllers';
@Module({
imports: [ApplicationModule],
controllers: [HealthController, BalanceController, InternalController],
})
export class ApiModule {}

View File

@ -0,0 +1,36 @@
import { Controller, Get, Query } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger';
import { BalanceQueryService } from '@/application/services/balance-query.service';
import { QueryBalanceDto, QueryMultiChainBalanceDto } from '../dto/request';
import { BalanceResponseDto, MultiChainBalanceResponseDto } from '../dto/response';
import { ChainType } from '@/domain/value-objects';
@ApiTags('Balance')
@Controller('balance')
export class BalanceController {
constructor(private readonly balanceService: BalanceQueryService) {}
@Get()
@ApiOperation({ summary: '查询单链余额' })
@ApiResponse({ status: 200, description: '余额信息', type: BalanceResponseDto })
async getBalance(@Query() dto: QueryBalanceDto): Promise<BalanceResponseDto> {
if (!dto.chainType) {
throw new Error('chainType is required');
}
const chainType = ChainType.create(dto.chainType);
return this.balanceService.getBalance(chainType, dto.address);
}
@Get('multi-chain')
@ApiOperation({ summary: '查询多链余额' })
@ApiResponse({ status: 200, description: '多链余额信息', type: MultiChainBalanceResponseDto })
async getMultiChainBalance(
@Query() dto: QueryMultiChainBalanceDto,
): Promise<MultiChainBalanceResponseDto> {
const balances = await this.balanceService.getBalances(dto.address, dto.chainTypes);
return {
address: dto.address,
balances,
};
}
}

View File

@ -0,0 +1,28 @@
import { Controller, Get } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger';
@ApiTags('Health')
@Controller('health')
export class HealthController {
@Get()
@ApiOperation({ summary: '健康检查' })
@ApiResponse({ status: 200, description: '服务健康' })
check() {
return {
status: 'ok',
service: 'blockchain-service',
timestamp: new Date().toISOString(),
};
}
@Get('ready')
@ApiOperation({ summary: '就绪检查' })
@ApiResponse({ status: 200, description: '服务就绪' })
ready() {
return {
status: 'ready',
service: 'blockchain-service',
timestamp: new Date().toISOString(),
};
}
}

View File

@ -0,0 +1,3 @@
export * from './health.controller';
export * from './balance.controller';
export * from './internal.controller';

View File

@ -0,0 +1,48 @@
import { Controller, Post, Body, Get, Param } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger';
import { AddressDerivationService } from '@/application/services/address-derivation.service';
import { DeriveAddressDto } from '../dto/request';
import { DeriveAddressResponseDto } from '../dto/response';
/**
* API
*
*/
@ApiTags('Internal')
@Controller('internal')
export class InternalController {
constructor(private readonly addressDerivationService: AddressDerivationService) {}
@Post('derive-address')
@ApiOperation({ summary: '从公钥派生地址' })
@ApiResponse({ status: 201, description: '派生成功', type: DeriveAddressResponseDto })
async deriveAddress(@Body() dto: DeriveAddressDto): Promise<DeriveAddressResponseDto> {
const result = await this.addressDerivationService.deriveAndRegister(
BigInt(dto.userId),
dto.publicKey,
);
return {
userId: result.userId.toString(),
publicKey: result.publicKey,
addresses: result.addresses.map((a) => ({
chainType: a.chainType,
address: a.address,
})),
};
}
@Get('user/:userId/addresses')
@ApiOperation({ summary: '获取用户的所有地址' })
async getUserAddresses(@Param('userId') userId: string) {
const addresses = await this.addressDerivationService.getUserAddresses(BigInt(userId));
return {
userId,
addresses: addresses.map((a) => ({
chainType: a.chainType.toString(),
address: a.address.toString(),
isActive: a.isActive,
})),
};
}
}

View File

@ -0,0 +1,2 @@
export * from './request';
export * from './response';

View File

@ -0,0 +1,15 @@
import { IsString, IsNumberString } from 'class-validator';
import { ApiProperty } from '@nestjs/swagger';
export class DeriveAddressDto {
@ApiProperty({ description: '用户ID', example: '12345' })
@IsNumberString()
userId: string;
@ApiProperty({
description: '压缩公钥 (33 bytes, 0x02/0x03 开头)',
example: '0x02abc123...',
})
@IsString()
publicKey: string;
}

View File

@ -0,0 +1,2 @@
export * from './query-balance.dto';
export * from './derive-address.dto';

View File

@ -0,0 +1,34 @@
import { IsString, IsOptional, IsEnum, IsArray } from 'class-validator';
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { ChainTypeEnum } from '@/domain/enums';
export class QueryBalanceDto {
@ApiProperty({ description: '钱包地址', example: '0x1234...' })
@IsString()
address: string;
@ApiPropertyOptional({
description: '链类型',
enum: ChainTypeEnum,
example: ChainTypeEnum.KAVA,
})
@IsOptional()
@IsEnum(ChainTypeEnum)
chainType?: ChainTypeEnum;
}
export class QueryMultiChainBalanceDto {
@ApiProperty({ description: '钱包地址', example: '0x1234...' })
@IsString()
address: string;
@ApiPropertyOptional({
description: '链类型列表',
type: [String],
enum: ChainTypeEnum,
})
@IsOptional()
@IsArray()
@IsEnum(ChainTypeEnum, { each: true })
chainTypes?: ChainTypeEnum[];
}

View File

@ -0,0 +1,20 @@
import { ApiProperty } from '@nestjs/swagger';
export class DerivedAddressDto {
@ApiProperty({ description: '链类型' })
chainType: string;
@ApiProperty({ description: '钱包地址' })
address: string;
}
export class DeriveAddressResponseDto {
@ApiProperty({ description: '用户ID' })
userId: string;
@ApiProperty({ description: '公钥' })
publicKey: string;
@ApiProperty({ description: '派生的地址列表', type: [DerivedAddressDto] })
addresses: DerivedAddressDto[];
}

View File

@ -0,0 +1,26 @@
import { ApiProperty } from '@nestjs/swagger';
export class BalanceResponseDto {
@ApiProperty({ description: '链类型' })
chainType: string;
@ApiProperty({ description: '钱包地址' })
address: string;
@ApiProperty({ description: 'USDT 余额' })
usdtBalance: string;
@ApiProperty({ description: '原生代币余额' })
nativeBalance: string;
@ApiProperty({ description: '原生代币符号' })
nativeSymbol: string;
}
export class MultiChainBalanceResponseDto {
@ApiProperty({ description: '钱包地址' })
address: string;
@ApiProperty({ description: '各链余额', type: [BalanceResponseDto] })
balances: BalanceResponseDto[];
}

View File

@ -0,0 +1,2 @@
export * from './balance.dto';
export * from './address.dto';

View File

@ -0,0 +1,17 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ScheduleModule } from '@nestjs/schedule';
import { ApiModule } from '@/api/api.module';
import { appConfig, databaseConfig, redisConfig, kafkaConfig, blockchainConfig } from '@/config';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
load: [appConfig, databaseConfig, redisConfig, kafkaConfig, blockchainConfig],
}),
ScheduleModule.forRoot(),
ApiModule,
],
})
export class AppModule {}

View File

@ -0,0 +1,24 @@
import { Module } from '@nestjs/common';
import { InfrastructureModule } from '@/infrastructure/infrastructure.module';
import { AddressDerivationService, DepositDetectionService, BalanceQueryService } from './services';
import { MpcKeygenCompletedHandler } from './event-handlers';
@Module({
imports: [InfrastructureModule],
providers: [
// 应用服务
AddressDerivationService,
DepositDetectionService,
BalanceQueryService,
// 事件处理器
MpcKeygenCompletedHandler,
],
exports: [
AddressDerivationService,
DepositDetectionService,
BalanceQueryService,
MpcKeygenCompletedHandler,
],
})
export class ApplicationModule {}

View File

@ -0,0 +1 @@
export * from './mpc-keygen-completed.handler';

View File

@ -0,0 +1,38 @@
import { Injectable, Logger } from '@nestjs/common';
import { AddressDerivationService } from '../services/address-derivation.service';
export interface MpcKeygenCompletedPayload {
userId: string;
deviceId: string;
publicKey: string;
keyType: string;
}
/**
* MPC
*/
@Injectable()
export class MpcKeygenCompletedHandler {
private readonly logger = new Logger(MpcKeygenCompletedHandler.name);
constructor(private readonly addressDerivationService: AddressDerivationService) {}
/**
* MPC
*/
async handle(payload: MpcKeygenCompletedPayload): Promise<void> {
this.logger.log(`Handling MPC keygen completed for user: ${payload.userId}`);
try {
const result = await this.addressDerivationService.deriveAndRegister(
BigInt(payload.userId),
payload.publicKey,
);
this.logger.log(`Derived ${result.addresses.length} addresses for user ${payload.userId}`);
} catch (error) {
this.logger.error(`Failed to derive addresses for user ${payload.userId}:`, error);
throw error;
}
}
}

View File

@ -0,0 +1,98 @@
import { Injectable, Logger, Inject } from '@nestjs/common';
import {
AddressDerivationAdapter,
DerivedAddress,
} from '@/infrastructure/blockchain/address-derivation.adapter';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import {
MONITORED_ADDRESS_REPOSITORY,
IMonitoredAddressRepository,
} from '@/domain/repositories/monitored-address.repository.interface';
import { MonitoredAddress } from '@/domain/aggregates/monitored-address';
import { WalletAddressCreatedEvent } from '@/domain/events';
import { ChainType, EvmAddress } from '@/domain/value-objects';
export interface DeriveAddressResult {
userId: bigint;
publicKey: string;
addresses: DerivedAddress[];
}
/**
*
* MPC
*/
@Injectable()
export class AddressDerivationService {
private readonly logger = new Logger(AddressDerivationService.name);
constructor(
private readonly addressDerivation: AddressDerivationAdapter,
private readonly addressCache: AddressCacheService,
private readonly eventPublisher: EventPublisherService,
@Inject(MONITORED_ADDRESS_REPOSITORY)
private readonly monitoredAddressRepo: IMonitoredAddressRepository,
) {}
/**
*
*/
async deriveAndRegister(userId: bigint, publicKey: string): Promise<DeriveAddressResult> {
this.logger.log(`Deriving addresses for user ${userId} from public key`);
// 1. 派生所有链的地址
const derivedAddresses = this.addressDerivation.deriveAllAddresses(publicKey);
// 2. 为每个链注册监控地址
for (const derived of derivedAddresses) {
const chainType = ChainType.fromEnum(derived.chainType);
const address = EvmAddress.create(derived.address);
// 检查是否已存在
const exists = await this.monitoredAddressRepo.existsByChainAndAddress(chainType, address);
if (!exists) {
// 创建监控地址
const monitored = MonitoredAddress.create({
chainType,
address,
userId,
});
await this.monitoredAddressRepo.save(monitored);
// 添加到缓存
await this.addressCache.addAddress(chainType, address.lowercase);
this.logger.log(`Registered address: ${derived.chainType} - ${derived.address}`);
} else {
this.logger.debug(`Address already registered: ${derived.chainType} - ${derived.address}`);
}
}
// 3. 发布钱包地址创建事件
const event = new WalletAddressCreatedEvent({
userId: userId.toString(),
publicKey,
addresses: derivedAddresses.map((a) => ({
chainType: a.chainType,
address: a.address,
})),
});
await this.eventPublisher.publish(event);
return {
userId,
publicKey,
addresses: derivedAddresses,
};
}
/**
*
*/
async getUserAddresses(userId: bigint): Promise<MonitoredAddress[]> {
return this.monitoredAddressRepo.findByUserId(userId);
}
}

View File

@ -0,0 +1,89 @@
import { Injectable, Logger } from '@nestjs/common';
import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter';
import { ChainConfigService } from '@/domain/services/chain-config.service';
import { ChainType } from '@/domain/value-objects';
import { ChainTypeEnum } from '@/domain/enums';
export interface BalanceResult {
chainType: string;
address: string;
usdtBalance: string;
nativeBalance: string;
nativeSymbol: string;
}
/**
*
*/
@Injectable()
export class BalanceQueryService {
private readonly logger = new Logger(BalanceQueryService.name);
constructor(
private readonly evmProvider: EvmProviderAdapter,
private readonly chainConfig: ChainConfigService,
) {}
/**
*
*/
async getBalance(chainType: ChainType, address: string): Promise<BalanceResult> {
const config = this.chainConfig.getConfig(chainType);
const [usdtBalance, nativeBalance] = await Promise.all([
this.evmProvider.getTokenBalance(chainType, config.usdtContract, address),
this.evmProvider.getNativeBalance(chainType, address),
]);
return {
chainType: chainType.toString(),
address,
usdtBalance: usdtBalance.formatted,
nativeBalance: nativeBalance.formatted,
nativeSymbol: config.nativeSymbol,
};
}
/**
*
*/
async getBalances(address: string, chainTypes?: ChainTypeEnum[]): Promise<BalanceResult[]> {
const chains = chainTypes || this.chainConfig.getSupportedChains();
const results: BalanceResult[] = [];
for (const chainTypeEnum of chains) {
try {
const chainType = ChainType.fromEnum(chainTypeEnum);
const balance = await this.getBalance(chainType, address);
results.push(balance);
} catch (error) {
this.logger.error(`Error querying balance for ${chainTypeEnum}:`, error);
}
}
return results;
}
/**
*
*/
async getBatchBalances(
chainType: ChainType,
addresses: string[],
): Promise<Map<string, BalanceResult>> {
const results = new Map<string, BalanceResult>();
await Promise.all(
addresses.map(async (address) => {
try {
const balance = await this.getBalance(chainType, address);
results.set(address.toLowerCase(), balance);
} catch (error) {
this.logger.error(`Error querying balance for ${address}:`, error);
}
}),
);
return results;
}
}

View File

@ -0,0 +1,210 @@
import { Injectable, Logger, Inject, OnModuleInit } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import {
BlockScannerService,
DepositEvent,
} from '@/infrastructure/blockchain/block-scanner.service';
import { EvmProviderAdapter } from '@/infrastructure/blockchain/evm-provider.adapter';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { ConfirmationPolicyService } from '@/domain/services/confirmation-policy.service';
import { ChainConfigService } from '@/domain/services/chain-config.service';
import {
DEPOSIT_TRANSACTION_REPOSITORY,
IDepositTransactionRepository,
} from '@/domain/repositories/deposit-transaction.repository.interface';
import {
MONITORED_ADDRESS_REPOSITORY,
IMonitoredAddressRepository,
} from '@/domain/repositories/monitored-address.repository.interface';
import {
BLOCK_CHECKPOINT_REPOSITORY,
IBlockCheckpointRepository,
} from '@/domain/repositories/block-checkpoint.repository.interface';
import { DepositTransaction } from '@/domain/aggregates/deposit-transaction';
import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber } from '@/domain/value-objects';
/**
*
*
*/
@Injectable()
export class DepositDetectionService implements OnModuleInit {
private readonly logger = new Logger(DepositDetectionService.name);
constructor(
private readonly blockScanner: BlockScannerService,
private readonly evmProvider: EvmProviderAdapter,
private readonly eventPublisher: EventPublisherService,
private readonly addressCache: AddressCacheService,
private readonly confirmationPolicy: ConfirmationPolicyService,
private readonly chainConfig: ChainConfigService,
@Inject(DEPOSIT_TRANSACTION_REPOSITORY)
private readonly depositRepo: IDepositTransactionRepository,
@Inject(MONITORED_ADDRESS_REPOSITORY)
private readonly monitoredAddressRepo: IMonitoredAddressRepository,
@Inject(BLOCK_CHECKPOINT_REPOSITORY)
private readonly checkpointRepo: IBlockCheckpointRepository,
) {}
async onModuleInit() {
// 初始化地址缓存
await this.initializeAddressCache();
this.logger.log('DepositDetectionService initialized');
}
/**
*
*/
private async initializeAddressCache(): Promise<void> {
for (const chainTypeEnum of this.chainConfig.getSupportedChains()) {
const chainType = ChainType.fromEnum(chainTypeEnum);
const addresses = await this.monitoredAddressRepo.getAllActiveAddresses(chainType);
await this.addressCache.reloadCache(chainType, addresses);
this.logger.log(`Loaded ${addresses.length} addresses for ${chainTypeEnum} into cache`);
}
}
/**
* 5
*/
@Cron(CronExpression.EVERY_5_SECONDS)
async scanBlocks(): Promise<void> {
for (const chainTypeEnum of this.chainConfig.getSupportedChains()) {
try {
await this.scanChain(ChainType.fromEnum(chainTypeEnum));
} catch (error) {
this.logger.error(`Error scanning ${chainTypeEnum}:`, error);
await this.checkpointRepo.recordError(
ChainType.fromEnum(chainTypeEnum),
error instanceof Error ? error.message : 'Unknown error',
);
}
}
}
/**
*
*/
private async scanChain(chainType: ChainType): Promise<void> {
// 获取上次扫描位置
let lastBlock = await this.checkpointRepo.getLastScannedBlock(chainType);
if (!lastBlock) {
// 首次扫描,从当前区块开始
const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType);
lastBlock = currentBlock.subtract(10); // 从10个块前开始
await this.checkpointRepo.initializeIfNotExists(chainType, lastBlock);
}
// 执行扫描
const { deposits, newLastBlock } = await this.blockScanner.executeScan(chainType, lastBlock);
// 处理检测到的充值
for (const deposit of deposits) {
await this.processDeposit(deposit);
}
// 更新检查点
if (newLastBlock.isGreaterThan(lastBlock)) {
await this.checkpointRepo.updateCheckpoint(chainType, newLastBlock);
}
}
/**
*
*/
private async processDeposit(event: DepositEvent): Promise<void> {
const txHash = TxHash.create(event.txHash);
// 检查是否已处理
if (await this.depositRepo.existsByTxHash(txHash)) {
this.logger.debug(`Deposit already processed: ${event.txHash}`);
return;
}
const chainType = ChainType.fromEnum(event.chainType);
// 查找监控地址以获取用户ID
const monitoredAddress = await this.monitoredAddressRepo.findByChainAndAddress(
chainType,
EvmAddress.fromUnchecked(event.to),
);
if (!monitoredAddress || !monitoredAddress.id) {
this.logger.warn(`Monitored address not found: ${event.to}`);
return;
}
// 创建充值记录
const deposit = DepositTransaction.create({
chainType,
txHash,
fromAddress: EvmAddress.fromUnchecked(event.from),
toAddress: EvmAddress.fromUnchecked(event.to),
tokenContract: EvmAddress.fromUnchecked(event.tokenContract),
amount: TokenAmount.fromRaw(event.value, 18),
blockNumber: BlockNumber.create(event.blockNumber),
blockTimestamp: event.blockTimestamp,
logIndex: event.logIndex,
addressId: monitoredAddress.id,
userId: monitoredAddress.userId,
});
// 保存
await this.depositRepo.save(deposit);
// 发布事件
for (const domainEvent of deposit.domainEvents) {
await this.eventPublisher.publish(domainEvent);
}
deposit.clearDomainEvents();
this.logger.log(
`New deposit saved: ${txHash.toShort()} -> ${event.to} (${deposit.amount.formatted} USDT)`,
);
}
/**
* 30
*/
@Cron(CronExpression.EVERY_30_SECONDS)
async updateConfirmations(): Promise<void> {
for (const chainTypeEnum of this.chainConfig.getSupportedChains()) {
try {
await this.updateChainConfirmations(ChainType.fromEnum(chainTypeEnum));
} catch (error) {
this.logger.error(`Error updating confirmations for ${chainTypeEnum}:`, error);
}
}
}
/**
*
*/
private async updateChainConfirmations(chainType: ChainType): Promise<void> {
const pendingDeposits = await this.depositRepo.findPendingConfirmation(chainType);
if (pendingDeposits.length === 0) return;
const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType);
const requiredConfirmations = this.confirmationPolicy.getRequiredConfirmations(chainType);
for (const deposit of pendingDeposits) {
deposit.updateConfirmations(currentBlock, requiredConfirmations);
await this.depositRepo.save(deposit);
// 发布确认事件
for (const event of deposit.domainEvents) {
await this.eventPublisher.publish(event);
}
deposit.clearDomainEvents();
if (deposit.isConfirmed) {
this.logger.log(
`Deposit confirmed: ${deposit.txHash.toShort()} (${deposit.confirmations} confirmations)`,
);
}
}
}
}

View File

@ -0,0 +1,3 @@
export * from './address-derivation.service';
export * from './deposit-detection.service';
export * from './balance-query.service';

View File

@ -0,0 +1,7 @@
import { registerAs } from '@nestjs/config';
export default registerAs('app', () => ({
nodeEnv: process.env.NODE_ENV || 'development',
port: parseInt(process.env.PORT || '3012', 10),
serviceName: process.env.SERVICE_NAME || 'blockchain-service',
}));

View File

@ -0,0 +1,24 @@
import { registerAs } from '@nestjs/config';
export default registerAs('blockchain', () => ({
// 通用配置
scanIntervalMs: parseInt(process.env.BLOCK_SCAN_INTERVAL_MS || '5000', 10),
confirmationsRequired: parseInt(process.env.BLOCK_CONFIRMATIONS_REQUIRED || '12', 10),
scanBatchSize: parseInt(process.env.BLOCK_SCAN_BATCH_SIZE || '100', 10),
// KAVA 配置
kava: {
rpcUrl: process.env.KAVA_RPC_URL || 'https://evm.kava.io',
chainId: parseInt(process.env.KAVA_CHAIN_ID || '2222', 10),
usdtContract: process.env.KAVA_USDT_CONTRACT || '0x919C1c267BC06a7039e03fcc2eF738525769109c',
confirmations: parseInt(process.env.KAVA_CONFIRMATIONS || '12', 10),
},
// BSC 配置
bsc: {
rpcUrl: process.env.BSC_RPC_URL || 'https://bsc-dataseed.binance.org',
chainId: parseInt(process.env.BSC_CHAIN_ID || '56', 10),
usdtContract: process.env.BSC_USDT_CONTRACT || '0x55d398326f99059fF775485246999027B3197955',
confirmations: parseInt(process.env.BSC_CONFIRMATIONS || '15', 10),
},
}));

View File

@ -0,0 +1,5 @@
import { registerAs } from '@nestjs/config';
export default registerAs('database', () => ({
url: process.env.DATABASE_URL,
}));

View File

@ -0,0 +1,5 @@
export { default as appConfig } from './app.config';
export { default as databaseConfig } from './database.config';
export { default as redisConfig } from './redis.config';
export { default as kafkaConfig } from './kafka.config';
export { default as blockchainConfig } from './blockchain.config';

View File

@ -0,0 +1,7 @@
import { registerAs } from '@nestjs/config';
export default registerAs('kafka', () => ({
brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
clientId: process.env.KAFKA_CLIENT_ID || 'blockchain-service',
groupId: process.env.KAFKA_GROUP_ID || 'blockchain-service-group',
}));

View File

@ -0,0 +1,8 @@
import { registerAs } from '@nestjs/config';
export default registerAs('redis', () => ({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
db: parseInt(process.env.REDIS_DB || '11', 10),
password: process.env.REDIS_PASSWORD || undefined,
}));

View File

@ -0,0 +1,44 @@
import { DomainEvent } from '@/domain/events/domain-event.base';
/**
*
*
*
*/
export abstract class AggregateRoot<TId = bigint> {
protected readonly _domainEvents: DomainEvent[] = [];
/**
*
*/
abstract get id(): TId | undefined;
/**
*
*/
get domainEvents(): ReadonlyArray<DomainEvent> {
return [...this._domainEvents];
}
/**
*
* @param event
*/
protected addDomainEvent(event: DomainEvent): void {
this._domainEvents.push(event);
}
/**
*
*/
clearDomainEvents(): void {
this._domainEvents.length = 0;
}
/**
*
*/
hasDomainEvents(): boolean {
return this._domainEvents.length > 0;
}
}

View File

@ -0,0 +1,203 @@
import { AggregateRoot } from '../aggregate-root.base';
import { DepositDetectedEvent, DepositConfirmedEvent } from '@/domain/events';
import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';
export interface DepositTransactionProps {
id?: bigint;
chainType: ChainType;
txHash: TxHash;
fromAddress: EvmAddress;
toAddress: EvmAddress;
tokenContract: EvmAddress;
amount: TokenAmount;
blockNumber: BlockNumber;
blockTimestamp: Date;
logIndex: number;
confirmations: number;
status: DepositStatus;
addressId: bigint;
userId: bigint;
notifiedAt?: Date;
notifyAttempts: number;
lastNotifyError?: string;
createdAt?: Date;
updatedAt?: Date;
}
export class DepositTransaction extends AggregateRoot<bigint> {
private props: DepositTransactionProps;
private constructor(props: DepositTransactionProps) {
super();
this.props = props;
}
// Getters
get id(): bigint | undefined {
return this.props.id;
}
get chainType(): ChainType {
return this.props.chainType;
}
get txHash(): TxHash {
return this.props.txHash;
}
get fromAddress(): EvmAddress {
return this.props.fromAddress;
}
get toAddress(): EvmAddress {
return this.props.toAddress;
}
get tokenContract(): EvmAddress {
return this.props.tokenContract;
}
get amount(): TokenAmount {
return this.props.amount;
}
get blockNumber(): BlockNumber {
return this.props.blockNumber;
}
get blockTimestamp(): Date {
return this.props.blockTimestamp;
}
get logIndex(): number {
return this.props.logIndex;
}
get confirmations(): number {
return this.props.confirmations;
}
get status(): DepositStatus {
return this.props.status;
}
get addressId(): bigint {
return this.props.addressId;
}
get userId(): bigint {
return this.props.userId;
}
get notifiedAt(): Date | undefined {
return this.props.notifiedAt;
}
get notifyAttempts(): number {
return this.props.notifyAttempts;
}
get lastNotifyError(): string | undefined {
return this.props.lastNotifyError;
}
get createdAt(): Date | undefined {
return this.props.createdAt;
}
get updatedAt(): Date | undefined {
return this.props.updatedAt;
}
get isConfirmed(): boolean {
return this.props.status === DepositStatus.CONFIRMED;
}
get isNotified(): boolean {
return this.props.status === DepositStatus.NOTIFIED;
}
/**
*
*/
static create(params: {
chainType: ChainType;
txHash: TxHash;
fromAddress: EvmAddress;
toAddress: EvmAddress;
tokenContract: EvmAddress;
amount: TokenAmount;
blockNumber: BlockNumber;
blockTimestamp: Date;
logIndex: number;
addressId: bigint;
userId: bigint;
}): DepositTransaction {
const deposit = new DepositTransaction({
...params,
confirmations: 0,
status: DepositStatus.DETECTED,
notifyAttempts: 0,
});
deposit.addDomainEvent(
new DepositDetectedEvent({
depositId: '0', // Will be set after persistence
chainType: params.chainType.toString(),
txHash: params.txHash.toString(),
fromAddress: params.fromAddress.toString(),
toAddress: params.toAddress.toString(),
tokenContract: params.tokenContract.toString(),
amount: params.amount.raw.toString(),
amountFormatted: params.amount.toFixed(8),
blockNumber: params.blockNumber.toString(),
blockTimestamp: params.blockTimestamp.toISOString(),
userId: params.userId.toString(),
}),
);
return deposit;
}
/**
*
*/
static reconstitute(props: DepositTransactionProps): DepositTransaction {
return new DepositTransaction(props);
}
/**
*
*/
updateConfirmations(currentBlockNumber: BlockNumber, requiredConfirmations: number): void {
const confirmations = Number(currentBlockNumber.diff(this.props.blockNumber));
this.props.confirmations = Math.max(0, confirmations);
if (
this.props.confirmations >= requiredConfirmations &&
this.props.status === DepositStatus.DETECTED
) {
this.confirm();
} else if (this.props.status === DepositStatus.DETECTED) {
this.props.status = DepositStatus.CONFIRMING;
}
}
/**
*
*/
private confirm(): void {
this.props.status = DepositStatus.CONFIRMED;
this.addDomainEvent(
new DepositConfirmedEvent({
depositId: this.props.id?.toString() ?? '0',
chainType: this.props.chainType.toString(),
txHash: this.props.txHash.toString(),
toAddress: this.props.toAddress.toString(),
amount: this.props.amount.raw.toString(),
amountFormatted: this.props.amount.toFixed(8),
confirmations: this.props.confirmations,
userId: this.props.userId.toString(),
}),
);
}
/**
*
*/
markAsNotified(): void {
this.props.status = DepositStatus.NOTIFIED;
this.props.notifiedAt = new Date();
}
/**
*
*/
recordNotifyFailure(error: string): void {
this.props.notifyAttempts += 1;
this.props.lastNotifyError = error;
}
}

View File

@ -0,0 +1 @@
export * from './deposit-transaction.aggregate';

View File

@ -0,0 +1,4 @@
export * from './aggregate-root.base';
export * from './deposit-transaction';
export * from './monitored-address';
export * from './transaction-request';

View File

@ -0,0 +1 @@
export * from './monitored-address.aggregate';

View File

@ -0,0 +1,79 @@
import { AggregateRoot } from '../aggregate-root.base';
import { ChainType, EvmAddress } from '@/domain/value-objects';
export interface MonitoredAddressProps {
id?: bigint;
chainType: ChainType;
address: EvmAddress;
userId: bigint;
isActive: boolean;
createdAt?: Date;
updatedAt?: Date;
}
export class MonitoredAddress extends AggregateRoot<bigint> {
private props: MonitoredAddressProps;
private constructor(props: MonitoredAddressProps) {
super();
this.props = props;
}
// Getters
get id(): bigint | undefined {
return this.props.id;
}
get chainType(): ChainType {
return this.props.chainType;
}
get address(): EvmAddress {
return this.props.address;
}
get userId(): bigint {
return this.props.userId;
}
get isActive(): boolean {
return this.props.isActive;
}
get createdAt(): Date | undefined {
return this.props.createdAt;
}
get updatedAt(): Date | undefined {
return this.props.updatedAt;
}
/**
*
*/
static create(params: {
chainType: ChainType;
address: EvmAddress;
userId: bigint;
}): MonitoredAddress {
return new MonitoredAddress({
...params,
isActive: true,
});
}
/**
*
*/
static reconstitute(props: MonitoredAddressProps): MonitoredAddress {
return new MonitoredAddress(props);
}
/**
*
*/
activate(): void {
this.props.isActive = true;
}
/**
*
*/
deactivate(): void {
this.props.isActive = false;
}
}

View File

@ -0,0 +1 @@
export * from './transaction-request.aggregate';

View File

@ -0,0 +1,185 @@
import { AggregateRoot } from '../aggregate-root.base';
import { TransactionBroadcastedEvent } from '@/domain/events';
import { ChainType, TxHash, EvmAddress, TokenAmount } from '@/domain/value-objects';
import { TransactionStatus } from '@/domain/enums';
import { Decimal } from '@prisma/client/runtime/library';
export interface TransactionRequestProps {
id?: bigint;
chainType: ChainType;
sourceService: string;
sourceOrderId: string;
fromAddress: EvmAddress;
toAddress: EvmAddress;
value: TokenAmount;
data?: string;
signedTx?: string;
txHash?: TxHash;
status: TransactionStatus;
gasLimit?: bigint;
gasPrice?: Decimal;
nonce?: number;
errorMessage?: string;
retryCount: number;
createdAt?: Date;
updatedAt?: Date;
}
export class TransactionRequest extends AggregateRoot<bigint> {
private props: TransactionRequestProps;
private constructor(props: TransactionRequestProps) {
super();
this.props = props;
}
// Getters
get id(): bigint | undefined {
return this.props.id;
}
get chainType(): ChainType {
return this.props.chainType;
}
get sourceService(): string {
return this.props.sourceService;
}
get sourceOrderId(): string {
return this.props.sourceOrderId;
}
get fromAddress(): EvmAddress {
return this.props.fromAddress;
}
get toAddress(): EvmAddress {
return this.props.toAddress;
}
get value(): TokenAmount {
return this.props.value;
}
get data(): string | undefined {
return this.props.data;
}
get signedTx(): string | undefined {
return this.props.signedTx;
}
get txHash(): TxHash | undefined {
return this.props.txHash;
}
get status(): TransactionStatus {
return this.props.status;
}
get gasLimit(): bigint | undefined {
return this.props.gasLimit;
}
get gasPrice(): Decimal | undefined {
return this.props.gasPrice;
}
get nonce(): number | undefined {
return this.props.nonce;
}
get errorMessage(): string | undefined {
return this.props.errorMessage;
}
get retryCount(): number {
return this.props.retryCount;
}
get createdAt(): Date | undefined {
return this.props.createdAt;
}
get updatedAt(): Date | undefined {
return this.props.updatedAt;
}
get isPending(): boolean {
return this.props.status === TransactionStatus.PENDING;
}
get isBroadcasted(): boolean {
return this.props.status === TransactionStatus.BROADCASTED;
}
get isFailed(): boolean {
return this.props.status === TransactionStatus.FAILED;
}
/**
*
*/
static create(params: {
chainType: ChainType;
sourceService: string;
sourceOrderId: string;
fromAddress: EvmAddress;
toAddress: EvmAddress;
value: TokenAmount;
data?: string;
}): TransactionRequest {
return new TransactionRequest({
...params,
status: TransactionStatus.PENDING,
retryCount: 0,
});
}
/**
*
*/
static reconstitute(props: TransactionRequestProps): TransactionRequest {
return new TransactionRequest(props);
}
/**
*
*/
setSignedTransaction(signedTx: string, gasLimit: bigint, gasPrice: Decimal, nonce: number): void {
this.props.signedTx = signedTx;
this.props.gasLimit = gasLimit;
this.props.gasPrice = gasPrice;
this.props.nonce = nonce;
this.props.status = TransactionStatus.SIGNED;
}
/**
* 广
*/
markAsBroadcasted(txHash: TxHash): void {
this.props.txHash = txHash;
this.props.status = TransactionStatus.BROADCASTED;
this.addDomainEvent(
new TransactionBroadcastedEvent({
requestId: this.props.id?.toString() ?? '0',
chainType: this.props.chainType.toString(),
txHash: txHash.toString(),
fromAddress: this.props.fromAddress.toString(),
toAddress: this.props.toAddress.toString(),
value: this.props.value.raw.toString(),
sourceService: this.props.sourceService,
sourceOrderId: this.props.sourceOrderId,
}),
);
}
/**
*
*/
markAsConfirmed(): void {
this.props.status = TransactionStatus.CONFIRMED;
}
/**
*
*/
markAsFailed(errorMessage: string): void {
this.props.status = TransactionStatus.FAILED;
this.props.errorMessage = errorMessage;
this.props.retryCount += 1;
}
/**
*
*/
retry(): void {
this.props.status = TransactionStatus.PENDING;
this.props.errorMessage = undefined;
this.props.signedTx = undefined;
this.props.txHash = undefined;
}
}

View File

@ -0,0 +1,8 @@
import { Module } from '@nestjs/common';
import { ConfirmationPolicyService, ChainConfigService } from './services';
@Module({
providers: [ConfirmationPolicyService, ChainConfigService],
exports: [ConfirmationPolicyService, ChainConfigService],
})
export class DomainModule {}

View File

@ -0,0 +1,7 @@
/**
*
*/
export enum ChainTypeEnum {
KAVA = 'KAVA',
BSC = 'BSC',
}

View File

@ -0,0 +1,13 @@
/**
*
*/
export enum DepositStatus {
/** 已检测到 */
DETECTED = 'DETECTED',
/** 确认中 */
CONFIRMING = 'CONFIRMING',
/** 已确认 */
CONFIRMED = 'CONFIRMED',
/** 已通知 */
NOTIFIED = 'NOTIFIED',
}

View File

@ -0,0 +1,3 @@
export * from './chain-type.enum';
export * from './deposit-status.enum';
export * from './transaction-status.enum';

View File

@ -0,0 +1,15 @@
/**
*
*/
export enum TransactionStatus {
/** 待处理 */
PENDING = 'PENDING',
/** 已签名 */
SIGNED = 'SIGNED',
/** 已广播 */
BROADCASTED = 'BROADCASTED',
/** 已确认 */
CONFIRMED = 'CONFIRMED',
/** 失败 */
FAILED = 'FAILED',
}

View File

@ -0,0 +1,29 @@
import { DomainEvent } from './domain-event.base';
export interface DepositConfirmedPayload {
depositId: string;
chainType: string;
txHash: string;
toAddress: string;
amount: string;
amountFormatted: string;
confirmations: number;
userId: string;
[key: string]: unknown;
}
/**
*
*
*/
export class DepositConfirmedEvent extends DomainEvent {
readonly eventType = 'blockchain.deposit.confirmed';
constructor(private readonly payload: DepositConfirmedPayload) {
super();
}
toPayload(): DepositConfirmedPayload {
return this.payload;
}
}

View File

@ -0,0 +1,32 @@
import { DomainEvent } from './domain-event.base';
export interface DepositDetectedPayload {
depositId: string;
chainType: string;
txHash: string;
fromAddress: string;
toAddress: string;
tokenContract: string;
amount: string;
amountFormatted: string;
blockNumber: string;
blockTimestamp: string;
userId: string;
[key: string]: unknown;
}
/**
*
*
*/
export class DepositDetectedEvent extends DomainEvent {
readonly eventType = 'blockchain.deposit.detected';
constructor(private readonly payload: DepositDetectedPayload) {
super();
}
toPayload(): DepositDetectedPayload {
return this.payload;
}
}

View File

@ -0,0 +1,17 @@
import { v4 as uuidv4 } from 'uuid';
/**
*
*/
export abstract class DomainEvent {
readonly eventId: string;
readonly occurredAt: Date;
abstract readonly eventType: string;
constructor() {
this.eventId = uuidv4();
this.occurredAt = new Date();
}
abstract toPayload(): Record<string, unknown>;
}

View File

@ -0,0 +1,5 @@
export * from './domain-event.base';
export * from './deposit-detected.event';
export * from './deposit-confirmed.event';
export * from './wallet-address-created.event';
export * from './transaction-broadcasted.event';

View File

@ -0,0 +1,29 @@
import { DomainEvent } from './domain-event.base';
export interface TransactionBroadcastedPayload {
requestId: string;
chainType: string;
txHash: string;
fromAddress: string;
toAddress: string;
value: string;
sourceService: string;
sourceOrderId: string;
[key: string]: unknown;
}
/**
* 广
* 广
*/
export class TransactionBroadcastedEvent extends DomainEvent {
readonly eventType = 'blockchain.transaction.broadcasted';
constructor(private readonly payload: TransactionBroadcastedPayload) {
super();
}
toPayload(): TransactionBroadcastedPayload {
return this.payload;
}
}

View File

@ -0,0 +1,27 @@
import { DomainEvent } from './domain-event.base';
export interface WalletAddressCreatedPayload {
userId: string;
publicKey: string;
addresses: {
chainType: string;
address: string;
}[];
[key: string]: unknown;
}
/**
*
*
*/
export class WalletAddressCreatedEvent extends DomainEvent {
readonly eventType = 'blockchain.wallet.address.created';
constructor(private readonly payload: WalletAddressCreatedPayload) {
super();
}
toPayload(): WalletAddressCreatedPayload {
return this.payload;
}
}

View File

@ -0,0 +1,43 @@
import { ChainType, BlockNumber } from '@/domain/value-objects';
export const BLOCK_CHECKPOINT_REPOSITORY = Symbol('BLOCK_CHECKPOINT_REPOSITORY');
export interface BlockCheckpointData {
chainType: string;
lastScannedBlock: bigint;
lastScannedAt: Date;
isHealthy: boolean;
lastError?: string;
}
export interface IBlockCheckpointRepository {
/**
*
*/
getLastScannedBlock(chainType: ChainType): Promise<BlockNumber | null>;
/**
*
*/
updateCheckpoint(chainType: ChainType, blockNumber: BlockNumber): Promise<void>;
/**
*
*/
recordError(chainType: ChainType, error: string): Promise<void>;
/**
*
*/
markHealthy(chainType: ChainType): Promise<void>;
/**
*
*/
getCheckpoint(chainType: ChainType): Promise<BlockCheckpointData | null>;
/**
*
*/
initializeIfNotExists(chainType: ChainType, startBlock: BlockNumber): Promise<void>;
}

View File

@ -0,0 +1,47 @@
import { DepositTransaction } from '@/domain/aggregates/deposit-transaction';
import { ChainType, TxHash } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';
export const DEPOSIT_TRANSACTION_REPOSITORY = Symbol('DEPOSIT_TRANSACTION_REPOSITORY');
export interface IDepositTransactionRepository {
/**
*
*/
save(deposit: DepositTransaction): Promise<DepositTransaction>;
/**
* ID查找
*/
findById(id: bigint): Promise<DepositTransaction | null>;
/**
*
*/
findByTxHash(txHash: TxHash): Promise<DepositTransaction | null>;
/**
*
*/
findByStatus(chainType: ChainType, status: DepositStatus): Promise<DepositTransaction[]>;
/**
*
*/
findPendingConfirmation(chainType: ChainType): Promise<DepositTransaction[]>;
/**
*
*/
findPendingNotification(): Promise<DepositTransaction[]>;
/**
* ID查找
*/
findByUserId(userId: bigint, limit?: number): Promise<DepositTransaction[]>;
/**
*
*/
existsByTxHash(txHash: TxHash): Promise<boolean>;
}

View File

@ -0,0 +1,4 @@
export * from './deposit-transaction.repository.interface';
export * from './monitored-address.repository.interface';
export * from './block-checkpoint.repository.interface';
export * from './transaction-request.repository.interface';

View File

@ -0,0 +1,44 @@
import { MonitoredAddress } from '@/domain/aggregates/monitored-address';
import { ChainType, EvmAddress } from '@/domain/value-objects';
export const MONITORED_ADDRESS_REPOSITORY = Symbol('MONITORED_ADDRESS_REPOSITORY');
export interface IMonitoredAddressRepository {
/**
*
*/
save(address: MonitoredAddress): Promise<MonitoredAddress>;
/**
* ID查找
*/
findById(id: bigint): Promise<MonitoredAddress | null>;
/**
*
*/
findByChainAndAddress(
chainType: ChainType,
address: EvmAddress,
): Promise<MonitoredAddress | null>;
/**
*
*/
findActiveByChain(chainType: ChainType): Promise<MonitoredAddress[]>;
/**
* ID查找
*/
findByUserId(userId: bigint): Promise<MonitoredAddress[]>;
/**
*
*/
existsByChainAndAddress(chainType: ChainType, address: EvmAddress): Promise<boolean>;
/**
*
*/
getAllActiveAddresses(chainType: ChainType): Promise<string[]>;
}

View File

@ -0,0 +1,41 @@
import { TransactionRequest } from '@/domain/aggregates/transaction-request';
import { ChainType, TxHash } from '@/domain/value-objects';
export const TRANSACTION_REQUEST_REPOSITORY = Symbol('TRANSACTION_REQUEST_REPOSITORY');
export interface ITransactionRequestRepository {
/**
*
*/
save(request: TransactionRequest): Promise<TransactionRequest>;
/**
* ID查找
*/
findById(id: bigint): Promise<TransactionRequest | null>;
/**
*
*/
findByTxHash(txHash: TxHash): Promise<TransactionRequest | null>;
/**
*
*/
findBySource(sourceService: string, sourceOrderId: string): Promise<TransactionRequest | null>;
/**
*
*/
findPending(chainType: ChainType): Promise<TransactionRequest[]>;
/**
* 广
*/
findBroadcasted(chainType: ChainType): Promise<TransactionRequest[]>;
/**
*
*/
findRetryable(chainType: ChainType, maxRetries: number): Promise<TransactionRequest[]>;
}

View File

@ -0,0 +1,82 @@
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ChainType } from '@/domain/value-objects';
import { ChainTypeEnum } from '@/domain/enums';
export interface ChainConfig {
chainType: ChainTypeEnum;
chainId: number;
rpcUrl: string;
usdtContract: string;
nativeSymbol: string;
blockTime: number; // 平均出块时间(秒)
}
/**
*
*/
@Injectable()
export class ChainConfigService {
private readonly configs: Map<ChainTypeEnum, ChainConfig>;
constructor(private readonly configService: ConfigService) {
this.configs = new Map();
this.initializeConfigs();
}
private initializeConfigs(): void {
// KAVA 配置
this.configs.set(ChainTypeEnum.KAVA, {
chainType: ChainTypeEnum.KAVA,
chainId: this.configService.get<number>('blockchain.kava.chainId', 2222),
rpcUrl: this.configService.get<string>('blockchain.kava.rpcUrl', 'https://evm.kava.io'),
usdtContract: this.configService.get<string>(
'blockchain.kava.usdtContract',
'0x919C1c267BC06a7039e03fcc2eF738525769109c',
),
nativeSymbol: 'KAVA',
blockTime: 6,
});
// BSC 配置
this.configs.set(ChainTypeEnum.BSC, {
chainType: ChainTypeEnum.BSC,
chainId: this.configService.get<number>('blockchain.bsc.chainId', 56),
rpcUrl: this.configService.get<string>(
'blockchain.bsc.rpcUrl',
'https://bsc-dataseed.binance.org',
),
usdtContract: this.configService.get<string>(
'blockchain.bsc.usdtContract',
'0x55d398326f99059fF775485246999027B3197955',
),
nativeSymbol: 'BNB',
blockTime: 3,
});
}
/**
*
*/
getConfig(chainType: ChainType): ChainConfig {
const config = this.configs.get(chainType.value);
if (!config) {
throw new Error(`Unsupported chain type: ${chainType.toString()}`);
}
return config;
}
/**
*
*/
getSupportedChains(): ChainTypeEnum[] {
return Array.from(this.configs.keys());
}
/**
*
*/
isSupported(chainType: ChainType): boolean {
return this.configs.has(chainType.value);
}
}

View File

@ -0,0 +1,42 @@
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ChainType } from '@/domain/value-objects';
import { ChainTypeEnum } from '@/domain/enums';
/**
*
*
*/
@Injectable()
export class ConfirmationPolicyService {
private readonly defaultConfirmations: number;
constructor(private readonly configService: ConfigService) {
this.defaultConfirmations = this.configService.get<number>(
'blockchain.confirmationsRequired',
12,
);
}
/**
*
*/
getRequiredConfirmations(chainType: ChainType): number {
// 可以根据不同链配置不同的确认数
switch (chainType.value) {
case ChainTypeEnum.KAVA:
return this.configService.get<number>('blockchain.kava.confirmations', 12);
case ChainTypeEnum.BSC:
return this.configService.get<number>('blockchain.bsc.confirmations', 15);
default:
return this.defaultConfirmations;
}
}
/**
*
*/
isConfirmed(chainType: ChainType, confirmations: number): boolean {
return confirmations >= this.getRequiredConfirmations(chainType);
}
}

View File

@ -0,0 +1,2 @@
export * from './confirmation-policy.service';
export * from './chain-config.service';

View File

@ -0,0 +1,61 @@
/**
*
*/
export class BlockNumber {
private readonly _value: bigint;
private constructor(value: bigint) {
this._value = value;
}
static create(value: bigint | number | string): BlockNumber {
const num = BigInt(value);
if (num < 0n) {
throw new Error(`Block number cannot be negative: ${value}`);
}
return new BlockNumber(num);
}
get value(): bigint {
return this._value;
}
get asNumber(): number {
return Number(this._value);
}
equals(other: BlockNumber): boolean {
return this._value === other._value;
}
isGreaterThan(other: BlockNumber): boolean {
return this._value > other._value;
}
isLessThan(other: BlockNumber): boolean {
return this._value < other._value;
}
add(blocks: number): BlockNumber {
return new BlockNumber(this._value + BigInt(blocks));
}
subtract(blocks: number): BlockNumber {
const result = this._value - BigInt(blocks);
if (result < 0n) {
throw new Error('Block number cannot be negative');
}
return new BlockNumber(result);
}
/**
*
*/
diff(other: BlockNumber): bigint {
return this._value - other._value;
}
toString(): string {
return this._value.toString();
}
}

View File

@ -0,0 +1,48 @@
import { ChainTypeEnum } from '../enums';
/**
*
*/
export class ChainType {
private readonly _value: ChainTypeEnum;
private constructor(value: ChainTypeEnum) {
this._value = value;
}
static create(value: string): ChainType {
const normalized = value.toUpperCase() as ChainTypeEnum;
if (!Object.values(ChainTypeEnum).includes(normalized)) {
throw new Error(`Invalid chain type: ${value}`);
}
return new ChainType(normalized);
}
static fromEnum(value: ChainTypeEnum): ChainType {
return new ChainType(value);
}
static KAVA(): ChainType {
return new ChainType(ChainTypeEnum.KAVA);
}
static BSC(): ChainType {
return new ChainType(ChainTypeEnum.BSC);
}
get value(): ChainTypeEnum {
return this._value;
}
equals(other: ChainType): boolean {
return this._value === other._value;
}
toString(): string {
return this._value;
}
isEVM(): boolean {
return [ChainTypeEnum.KAVA, ChainTypeEnum.BSC].includes(this._value);
}
}

View File

@ -0,0 +1,41 @@
import { getAddress, isAddress } from 'ethers';
/**
* EVM
*/
export class EvmAddress {
private readonly _value: string;
private constructor(value: string) {
this._value = value;
}
static create(value: string): EvmAddress {
if (!isAddress(value)) {
throw new Error(`Invalid EVM address: ${value}`);
}
// 使用 checksum 格式
const checksumAddress = getAddress(value);
return new EvmAddress(checksumAddress);
}
static fromUnchecked(value: string): EvmAddress {
return new EvmAddress(value.toLowerCase());
}
get value(): string {
return this._value;
}
get lowercase(): string {
return this._value.toLowerCase();
}
equals(other: EvmAddress): boolean {
return this._value.toLowerCase() === other._value.toLowerCase();
}
toString(): string {
return this._value;
}
}

View File

@ -0,0 +1,5 @@
export * from './chain-type.vo';
export * from './evm-address.vo';
export * from './tx-hash.vo';
export * from './token-amount.vo';
export * from './block-number.vo';

View File

@ -0,0 +1,99 @@
import { Decimal } from '@prisma/client/runtime/library';
/**
*
*/
export class TokenAmount {
private readonly _raw: bigint;
private readonly _decimals: number;
private constructor(raw: bigint, decimals: number) {
this._raw = raw;
this._decimals = decimals;
}
/**
*
*/
static fromRaw(raw: bigint, decimals: number = 18): TokenAmount {
return new TokenAmount(raw, decimals);
}
/**
*
*/
static fromFormatted(formatted: string, decimals: number = 18): TokenAmount {
const [intPart, decPart = ''] = formatted.split('.');
const paddedDec = decPart.padEnd(decimals, '0').slice(0, decimals);
const raw = BigInt(intPart + paddedDec);
return new TokenAmount(raw, decimals);
}
/**
* Decimal
*/
static fromDecimal(decimal: Decimal, decimals: number = 18): TokenAmount {
const raw = BigInt(decimal.toFixed(0));
return new TokenAmount(raw, decimals);
}
get raw(): bigint {
return this._raw;
}
get decimals(): number {
return this._decimals;
}
/**
*
*/
get formatted(): string {
const rawStr = this._raw.toString().padStart(this._decimals + 1, '0');
const intPart = rawStr.slice(0, -this._decimals) || '0';
const decPart = rawStr.slice(-this._decimals);
// 移除尾随零
const trimmedDec = decPart.replace(/0+$/, '');
return trimmedDec ? `${intPart}.${trimmedDec}` : intPart;
}
/**
*
*/
toFixed(places: number = 8): string {
const rawStr = this._raw.toString().padStart(this._decimals + 1, '0');
const intPart = rawStr.slice(0, -this._decimals) || '0';
const decPart = rawStr.slice(-this._decimals).padEnd(places, '0').slice(0, places);
return `${intPart}.${decPart}`;
}
/**
* Decimal
*/
toDecimal(): Decimal {
return new Decimal(this._raw.toString());
}
/**
* Decimal
*/
toFormattedDecimal(): Decimal {
return new Decimal(this.toFixed(8));
}
equals(other: TokenAmount): boolean {
return this._raw === other._raw && this._decimals === other._decimals;
}
isZero(): boolean {
return this._raw === 0n;
}
isPositive(): boolean {
return this._raw > 0n;
}
toString(): string {
return this.formatted;
}
}

View File

@ -0,0 +1,42 @@
/**
*
*/
export class TxHash {
private readonly _value: string;
private constructor(value: string) {
this._value = value;
}
static create(value: string): TxHash {
// 验证格式: 0x 开头64位十六进制
const normalized = value.toLowerCase();
if (!/^0x[a-f0-9]{64}$/.test(normalized)) {
throw new Error(`Invalid transaction hash: ${value}`);
}
return new TxHash(normalized);
}
static fromUnchecked(value: string): TxHash {
return new TxHash(value.toLowerCase());
}
get value(): string {
return this._value;
}
equals(other: TxHash): boolean {
return this._value === other._value;
}
toString(): string {
return this._value;
}
/**
*
*/
toShort(): string {
return `${this._value.slice(0, 10)}...${this._value.slice(-8)}`;
}
}

View File

@ -0,0 +1,139 @@
import { Injectable, Logger } from '@nestjs/common';
import { keccak256, getBytes } from 'ethers';
import { EvmAddress } from '@/domain/value-objects';
import { ChainTypeEnum } from '@/domain/enums';
export interface DerivedAddress {
chainType: ChainTypeEnum;
address: string;
}
/**
*
* MPC
*/
@Injectable()
export class AddressDerivationAdapter {
private readonly logger = new Logger(AddressDerivationAdapter.name);
/**
* EVM
*
* @param compressedPublicKey (33 bytes, 0x02/0x03 )
* @returns EVM
*/
deriveEvmAddress(compressedPublicKey: string): string {
// 移除 0x 前缀
const pubKeyHex = compressedPublicKey.replace('0x', '');
// 验证压缩公钥格式
if (pubKeyHex.length !== 66) {
throw new Error(`Invalid compressed public key length: ${pubKeyHex.length}, expected 66`);
}
const prefix = pubKeyHex.slice(0, 2);
if (prefix !== '02' && prefix !== '03') {
throw new Error(`Invalid compressed public key prefix: ${prefix}, expected 02 or 03`);
}
// 解压缩公钥
const uncompressedPubKey = this.decompressPublicKey(pubKeyHex);
// 移除 04 前缀(非压缩公钥标识)
const pubKeyWithoutPrefix = uncompressedPubKey.slice(2);
// Keccak256 哈希
const hash = keccak256(getBytes('0x' + pubKeyWithoutPrefix));
// 取最后 20 bytes 作为地址
const address = '0x' + hash.slice(-40);
return address;
}
/**
*
*
* 使 secp256k1 线
*/
private decompressPublicKey(compressedPubKeyHex: string): string {
const p = BigInt('0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F');
const prefix = parseInt(compressedPubKeyHex.slice(0, 2), 16);
const x = BigInt('0x' + compressedPubKeyHex.slice(2));
// y² = x³ + 7 (mod p)
const ySquared = (x ** 3n + 7n) % p;
// 计算模平方根
let y = this.modPow(ySquared, (p + 1n) / 4n, p);
// 根据前缀选择 y 的奇偶性
const isYOdd = y % 2n === 1n;
const shouldBeOdd = prefix === 0x03;
if (isYOdd !== shouldBeOdd) {
y = p - y;
}
// 返回非压缩格式 (04 + x + y)
const xHex = x.toString(16).padStart(64, '0');
const yHex = y.toString(16).padStart(64, '0');
return '04' + xHex + yHex;
}
/**
*
*/
private modPow(base: bigint, exp: bigint, mod: bigint): bigint {
let result = 1n;
base = base % mod;
while (exp > 0n) {
if (exp % 2n === 1n) {
result = (result * base) % mod;
}
exp = exp / 2n;
base = (base * base) % mod;
}
return result;
}
/**
*
*/
deriveAllAddresses(compressedPublicKey: string): DerivedAddress[] {
const addresses: DerivedAddress[] = [];
// EVM 链共用同一个地址
const evmAddress = this.deriveEvmAddress(compressedPublicKey);
// KAVA (EVM)
addresses.push({
chainType: ChainTypeEnum.KAVA,
address: evmAddress,
});
// BSC (EVM)
addresses.push({
chainType: ChainTypeEnum.BSC,
address: evmAddress,
});
this.logger.log(`Derived addresses from public key: ${addresses.length} chains`);
return addresses;
}
/**
*
*/
validateEvmAddress(address: string): boolean {
try {
EvmAddress.create(address);
return true;
} catch {
return false;
}
}
}

View File

@ -0,0 +1,124 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { EvmProviderAdapter, TransferEvent } from './evm-provider.adapter';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { ChainConfigService } from '@/domain/services/chain-config.service';
import { ChainType, BlockNumber } from '@/domain/value-objects';
import { ChainTypeEnum } from '@/domain/enums';
export interface DepositEvent extends TransferEvent {
chainType: ChainTypeEnum;
}
export type DepositHandler = (deposits: DepositEvent[]) => Promise<void>;
/**
*
*
*/
@Injectable()
export class BlockScannerService implements OnModuleInit {
private readonly logger = new Logger(BlockScannerService.name);
private readonly scanBatchSize: number;
private depositHandler?: DepositHandler;
private isScanning: Map<ChainTypeEnum, boolean> = new Map();
constructor(
private readonly configService: ConfigService,
private readonly evmProvider: EvmProviderAdapter,
private readonly addressCache: AddressCacheService,
private readonly chainConfig: ChainConfigService,
) {
this.scanBatchSize = this.configService.get<number>('blockchain.scanBatchSize', 100);
}
async onModuleInit() {
// 初始化扫描状态
for (const chainType of this.chainConfig.getSupportedChains()) {
this.isScanning.set(chainType, false);
}
this.logger.log('BlockScannerService initialized');
}
/**
*
*/
registerDepositHandler(handler: DepositHandler): void {
this.depositHandler = handler;
this.logger.log('Deposit handler registered');
}
/**
*
*/
async scanChain(
chainType: ChainType,
fromBlock: BlockNumber,
toBlock: BlockNumber,
): Promise<DepositEvent[]> {
const config = this.chainConfig.getConfig(chainType);
const deposits: DepositEvent[] = [];
// 获取所有监控地址
const monitoredAddresses = await this.addressCache.getAllAddresses(chainType);
const addressSet = new Set(monitoredAddresses.map((a) => a.toLowerCase()));
if (addressSet.size === 0) {
this.logger.debug(`No monitored addresses for ${chainType}, skipping scan`);
return deposits;
}
// 扫描 USDT Transfer 事件
const events = await this.evmProvider.scanTransferEvents(
chainType,
fromBlock,
toBlock,
config.usdtContract,
);
// 过滤出充值到监控地址的交易
for (const event of events) {
if (addressSet.has(event.to.toLowerCase())) {
deposits.push({
...event,
chainType: chainType.value,
});
this.logger.log(
`Deposit detected: ${event.txHash} -> ${event.to} (${event.value.toString()})`,
);
}
}
return deposits;
}
/**
*
*/
async executeScan(
chainType: ChainType,
lastScannedBlock: BlockNumber,
): Promise<{ deposits: DepositEvent[]; newLastBlock: BlockNumber }> {
const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType);
// 计算扫描范围
const fromBlock = lastScannedBlock.add(1);
let toBlock = fromBlock.add(this.scanBatchSize - 1);
// 不超过当前区块
if (toBlock.isGreaterThan(currentBlock)) {
toBlock = currentBlock;
}
// 如果没有新区块,返回空
if (fromBlock.isGreaterThan(currentBlock)) {
return { deposits: [], newLastBlock: lastScannedBlock };
}
this.logger.debug(`Scanning ${chainType}: blocks ${fromBlock} to ${toBlock}`);
const deposits = await this.scanChain(chainType, fromBlock, toBlock);
return { deposits, newLastBlock: toBlock };
}
}

View File

@ -0,0 +1,182 @@
import { Injectable, Logger } from '@nestjs/common';
import { JsonRpcProvider, Contract } from 'ethers';
import { ChainConfigService } from '@/domain/services/chain-config.service';
import { ChainType, BlockNumber, TokenAmount } from '@/domain/value-objects';
import { ChainTypeEnum } from '@/domain/enums';
// ERC20 Transfer 事件 ABI
const ERC20_TRANSFER_EVENT_ABI = [
'event Transfer(address indexed from, address indexed to, uint256 value)',
];
// ERC20 balanceOf ABI
const ERC20_BALANCE_ABI = ['function balanceOf(address owner) view returns (uint256)'];
export interface TransferEvent {
txHash: string;
logIndex: number;
blockNumber: bigint;
blockTimestamp: Date;
from: string;
to: string;
value: bigint;
tokenContract: string;
}
/**
* EVM
* EVM
*/
@Injectable()
export class EvmProviderAdapter {
private readonly logger = new Logger(EvmProviderAdapter.name);
private readonly providers: Map<ChainTypeEnum, JsonRpcProvider> = new Map();
constructor(private readonly chainConfig: ChainConfigService) {
this.initializeProviders();
}
private initializeProviders(): void {
for (const chainType of this.chainConfig.getSupportedChains()) {
const config = this.chainConfig.getConfig(ChainType.fromEnum(chainType));
const provider = new JsonRpcProvider(config.rpcUrl, config.chainId);
this.providers.set(chainType, provider);
this.logger.log(`Initialized provider for ${chainType}: ${config.rpcUrl}`);
}
}
private getProvider(chainType: ChainType): JsonRpcProvider {
const provider = this.providers.get(chainType.value);
if (!provider) {
throw new Error(`No provider for chain: ${chainType.toString()}`);
}
return provider;
}
/**
*
*/
async getCurrentBlockNumber(chainType: ChainType): Promise<BlockNumber> {
const provider = this.getProvider(chainType);
const blockNumber = await provider.getBlockNumber();
return BlockNumber.create(blockNumber);
}
/**
*
*/
async getBlockTimestamp(chainType: ChainType, blockNumber: BlockNumber): Promise<Date> {
const provider = this.getProvider(chainType);
const block = await provider.getBlock(blockNumber.asNumber);
if (!block) {
throw new Error(`Block not found: ${blockNumber.toString()}`);
}
return new Date(block.timestamp * 1000);
}
/**
* ERC20 Transfer
*/
async scanTransferEvents(
chainType: ChainType,
fromBlock: BlockNumber,
toBlock: BlockNumber,
tokenContract: string,
): Promise<TransferEvent[]> {
const provider = this.getProvider(chainType);
const contract = new Contract(tokenContract, ERC20_TRANSFER_EVENT_ABI, provider);
const filter = contract.filters.Transfer();
const logs = await contract.queryFilter(filter, fromBlock.asNumber, toBlock.asNumber);
const events: TransferEvent[] = [];
for (const log of logs) {
const block = await provider.getBlock(log.blockNumber);
if (!block) continue;
const parsedLog = contract.interface.parseLog({
topics: log.topics as string[],
data: log.data,
});
if (parsedLog) {
events.push({
txHash: log.transactionHash,
logIndex: log.index,
blockNumber: BigInt(log.blockNumber),
blockTimestamp: new Date(block.timestamp * 1000),
from: parsedLog.args[0],
to: parsedLog.args[1],
value: parsedLog.args[2],
tokenContract,
});
}
}
return events;
}
/**
* ERC20
*/
async getTokenBalance(
chainType: ChainType,
tokenContract: string,
address: string,
): Promise<TokenAmount> {
const provider = this.getProvider(chainType);
const contract = new Contract(tokenContract, ERC20_BALANCE_ABI, provider);
const balance = await contract.balanceOf(address);
return TokenAmount.fromRaw(balance, 18);
}
/**
*
*/
async getNativeBalance(chainType: ChainType, address: string): Promise<TokenAmount> {
const provider = this.getProvider(chainType);
const balance = await provider.getBalance(address);
return TokenAmount.fromRaw(balance, 18);
}
/**
* 广
*/
async broadcastTransaction(chainType: ChainType, signedTx: string): Promise<string> {
const provider = this.getProvider(chainType);
const txResponse = await provider.broadcastTransaction(signedTx);
this.logger.log(`Transaction broadcasted: ${txResponse.hash}`);
return txResponse.hash;
}
/**
*
*/
async waitForTransaction(
chainType: ChainType,
txHash: string,
confirmations: number = 1,
): Promise<boolean> {
const provider = this.getProvider(chainType);
const receipt = await provider.waitForTransaction(txHash, confirmations);
return receipt !== null && receipt.status === 1;
}
/**
*
*/
async isTransactionConfirmed(
chainType: ChainType,
txHash: string,
requiredConfirmations: number,
): Promise<boolean> {
const provider = this.getProvider(chainType);
const receipt = await provider.getTransactionReceipt(txHash);
if (!receipt) return false;
const currentBlock = await provider.getBlockNumber();
const confirmations = currentBlock - receipt.blockNumber;
return confirmations >= requiredConfirmations;
}
}

View File

@ -0,0 +1,3 @@
export * from './evm-provider.adapter';
export * from './address-derivation.adapter';
export * from './block-scanner.service';

View File

@ -0,0 +1,69 @@
import { Global, Module } from '@nestjs/common';
import { PrismaService } from './persistence/prisma/prisma.service';
import { RedisService, AddressCacheService } from './redis';
import { EventPublisherService } from './kafka';
import { EvmProviderAdapter, AddressDerivationAdapter, BlockScannerService } from './blockchain';
import { DomainModule } from '@/domain/domain.module';
import {
DEPOSIT_TRANSACTION_REPOSITORY,
MONITORED_ADDRESS_REPOSITORY,
BLOCK_CHECKPOINT_REPOSITORY,
TRANSACTION_REQUEST_REPOSITORY,
} from '@/domain/repositories';
import {
DepositTransactionRepositoryImpl,
MonitoredAddressRepositoryImpl,
BlockCheckpointRepositoryImpl,
TransactionRequestRepositoryImpl,
} from './persistence/repositories';
@Global()
@Module({
imports: [DomainModule],
providers: [
// 核心服务
PrismaService,
RedisService,
EventPublisherService,
// 区块链适配器
EvmProviderAdapter,
AddressDerivationAdapter,
BlockScannerService,
// 缓存服务
AddressCacheService,
// 仓储实现 (依赖倒置)
{
provide: DEPOSIT_TRANSACTION_REPOSITORY,
useClass: DepositTransactionRepositoryImpl,
},
{
provide: MONITORED_ADDRESS_REPOSITORY,
useClass: MonitoredAddressRepositoryImpl,
},
{
provide: BLOCK_CHECKPOINT_REPOSITORY,
useClass: BlockCheckpointRepositoryImpl,
},
{
provide: TRANSACTION_REQUEST_REPOSITORY,
useClass: TransactionRequestRepositoryImpl,
},
],
exports: [
PrismaService,
RedisService,
EventPublisherService,
EvmProviderAdapter,
AddressDerivationAdapter,
BlockScannerService,
AddressCacheService,
DEPOSIT_TRANSACTION_REPOSITORY,
MONITORED_ADDRESS_REPOSITORY,
BLOCK_CHECKPOINT_REPOSITORY,
TRANSACTION_REQUEST_REPOSITORY,
],
})
export class InfrastructureModule {}

View File

@ -0,0 +1,36 @@
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
interface MpcKeygenCompletedEvent {
eventId: string;
eventType: string;
occurredAt: string;
payload: {
userId: string;
deviceId: string;
publicKey: string;
keyType: string;
};
}
/**
* Kafka
*
*/
@Controller()
export class EventConsumerController {
private readonly logger = new Logger(EventConsumerController.name);
/**
* MPC
* mpc-service
*/
@EventPattern('mpc.keygen.completed')
async handleMpcKeygenCompleted(@Payload() event: MpcKeygenCompletedEvent): Promise<void> {
this.logger.log(`Received MPC keygen completed event: ${event.eventId}`);
this.logger.debug(`User: ${event.payload.userId}, PublicKey: ${event.payload.publicKey}`);
// TODO: 调用 AddressDerivationService 派生地址
// 这将在应用层实现
}
}

View File

@ -0,0 +1,77 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Producer, logLevel } from 'kafkajs';
import { DomainEvent } from '@/domain/events/domain-event.base';
@Injectable()
export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(EventPublisherService.name);
private readonly kafka: Kafka;
private readonly producer: Producer;
constructor(private readonly configService: ConfigService) {
this.kafka = new Kafka({
clientId: this.configService.get<string>('kafka.clientId'),
brokers: this.configService.get<string[]>('kafka.brokers') || ['localhost:9092'],
logLevel: logLevel.WARN,
});
this.producer = this.kafka.producer();
}
async onModuleInit() {
await this.producer.connect();
this.logger.log('Kafka producer connected');
}
async onModuleDestroy() {
await this.producer.disconnect();
this.logger.log('Kafka producer disconnected');
}
/**
*
*/
async publish(event: DomainEvent): Promise<void> {
const topic = this.getTopicForEvent(event.eventType);
const message = {
key: event.eventId,
value: JSON.stringify({
eventId: event.eventId,
eventType: event.eventType,
occurredAt: event.occurredAt.toISOString(),
payload: event.toPayload(),
}),
headers: {
eventType: event.eventType,
source: 'blockchain-service',
},
};
await this.producer.send({
topic,
messages: [message],
});
this.logger.debug(`Published event: ${event.eventType} to topic: ${topic}`);
}
/**
*
*/
async publishAll(events: DomainEvent[]): Promise<void> {
for (const event of events) {
await this.publish(event);
}
}
private getTopicForEvent(eventType: string): string {
// 事件类型到 topic 的映射
const topicMap: Record<string, string> = {
'blockchain.deposit.detected': 'blockchain.deposits',
'blockchain.deposit.confirmed': 'blockchain.deposits',
'blockchain.wallet.address.created': 'blockchain.wallets',
'blockchain.transaction.broadcasted': 'blockchain.transactions',
};
return topicMap[eventType] || 'blockchain.events';
}
}

View File

@ -0,0 +1,2 @@
export * from './event-publisher.service';
export * from './event-consumer.controller';

View File

@ -0,0 +1,60 @@
import { DepositTransaction as PrismaDepositTransaction } from '@prisma/client';
import {
DepositTransaction,
DepositTransactionProps,
} from '@/domain/aggregates/deposit-transaction';
import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';
export class DepositTransactionMapper {
static toDomain(prisma: PrismaDepositTransaction): DepositTransaction {
const props: DepositTransactionProps = {
id: prisma.id,
chainType: ChainType.create(prisma.chainType),
txHash: TxHash.fromUnchecked(prisma.txHash),
fromAddress: EvmAddress.fromUnchecked(prisma.fromAddress),
toAddress: EvmAddress.fromUnchecked(prisma.toAddress),
tokenContract: EvmAddress.fromUnchecked(prisma.tokenContract),
amount: TokenAmount.fromDecimal(prisma.amount, 18),
blockNumber: BlockNumber.create(prisma.blockNumber),
blockTimestamp: prisma.blockTimestamp,
logIndex: prisma.logIndex,
confirmations: prisma.confirmations,
status: prisma.status as DepositStatus,
addressId: prisma.addressId,
userId: prisma.userId,
notifiedAt: prisma.notifiedAt ?? undefined,
notifyAttempts: prisma.notifyAttempts,
lastNotifyError: prisma.lastNotifyError ?? undefined,
createdAt: prisma.createdAt,
updatedAt: prisma.updatedAt,
};
return DepositTransaction.reconstitute(props);
}
static toPersistence(
domain: DepositTransaction,
): Omit<PrismaDepositTransaction, 'id' | 'createdAt' | 'updatedAt'> & { id?: bigint } {
return {
id: domain.id,
chainType: domain.chainType.toString(),
txHash: domain.txHash.toString(),
fromAddress: domain.fromAddress.toString(),
toAddress: domain.toAddress.toString(),
tokenContract: domain.tokenContract.toString(),
amount: domain.amount.toDecimal(),
amountFormatted: domain.amount.toFormattedDecimal(),
blockNumber: domain.blockNumber.value,
blockTimestamp: domain.blockTimestamp,
logIndex: domain.logIndex,
confirmations: domain.confirmations,
status: domain.status,
addressId: domain.addressId,
userId: domain.userId,
notifiedAt: domain.notifiedAt ?? null,
notifyAttempts: domain.notifyAttempts,
lastNotifyError: domain.lastNotifyError ?? null,
};
}
}

View File

@ -0,0 +1,3 @@
export * from './deposit-transaction.mapper';
export * from './monitored-address.mapper';
export * from './transaction-request.mapper';

View File

@ -0,0 +1,31 @@
import { MonitoredAddress as PrismaMonitoredAddress } from '@prisma/client';
import { MonitoredAddress, MonitoredAddressProps } from '@/domain/aggregates/monitored-address';
import { ChainType, EvmAddress } from '@/domain/value-objects';
export class MonitoredAddressMapper {
static toDomain(prisma: PrismaMonitoredAddress): MonitoredAddress {
const props: MonitoredAddressProps = {
id: prisma.id,
chainType: ChainType.create(prisma.chainType),
address: EvmAddress.fromUnchecked(prisma.address),
userId: prisma.userId,
isActive: prisma.isActive,
createdAt: prisma.createdAt,
updatedAt: prisma.updatedAt,
};
return MonitoredAddress.reconstitute(props);
}
static toPersistence(
domain: MonitoredAddress,
): Omit<PrismaMonitoredAddress, 'id' | 'createdAt' | 'updatedAt'> & { id?: bigint } {
return {
id: domain.id,
chainType: domain.chainType.toString(),
address: domain.address.lowercase,
userId: domain.userId,
isActive: domain.isActive,
};
}
}

View File

@ -0,0 +1,57 @@
import { TransactionRequest as PrismaTransactionRequest } from '@prisma/client';
import {
TransactionRequest,
TransactionRequestProps,
} from '@/domain/aggregates/transaction-request';
import { ChainType, TxHash, EvmAddress, TokenAmount } from '@/domain/value-objects';
import { TransactionStatus } from '@/domain/enums';
export class TransactionRequestMapper {
static toDomain(prisma: PrismaTransactionRequest): TransactionRequest {
const props: TransactionRequestProps = {
id: prisma.id,
chainType: ChainType.create(prisma.chainType),
sourceService: prisma.sourceService,
sourceOrderId: prisma.sourceOrderId,
fromAddress: EvmAddress.fromUnchecked(prisma.fromAddress),
toAddress: EvmAddress.fromUnchecked(prisma.toAddress),
value: TokenAmount.fromDecimal(prisma.value, 18),
data: prisma.data ?? undefined,
signedTx: prisma.signedTx ?? undefined,
txHash: prisma.txHash ? TxHash.fromUnchecked(prisma.txHash) : undefined,
status: prisma.status as TransactionStatus,
gasLimit: prisma.gasLimit ?? undefined,
gasPrice: prisma.gasPrice ?? undefined,
nonce: prisma.nonce ?? undefined,
errorMessage: prisma.errorMessage ?? undefined,
retryCount: prisma.retryCount,
createdAt: prisma.createdAt,
updatedAt: prisma.updatedAt,
};
return TransactionRequest.reconstitute(props);
}
static toPersistence(
domain: TransactionRequest,
): Omit<PrismaTransactionRequest, 'id' | 'createdAt' | 'updatedAt'> & { id?: bigint } {
return {
id: domain.id,
chainType: domain.chainType.toString(),
sourceService: domain.sourceService,
sourceOrderId: domain.sourceOrderId,
fromAddress: domain.fromAddress.toString(),
toAddress: domain.toAddress.toString(),
value: domain.value.toDecimal(),
data: domain.data ?? null,
signedTx: domain.signedTx ?? null,
txHash: domain.txHash?.toString() ?? null,
status: domain.status,
gasLimit: domain.gasLimit ?? null,
gasPrice: domain.gasPrice ?? null,
nonce: domain.nonce ?? null,
errorMessage: domain.errorMessage ?? null,
retryCount: domain.retryCount,
};
}
}

View File

@ -0,0 +1,17 @@
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
@Injectable()
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(PrismaService.name);
async onModuleInit() {
await this.$connect();
this.logger.log('Prisma connected to database');
}
async onModuleDestroy() {
await this.$disconnect();
this.logger.log('Prisma disconnected from database');
}
}

View File

@ -0,0 +1,90 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import {
IBlockCheckpointRepository,
BlockCheckpointData,
} from '@/domain/repositories/block-checkpoint.repository.interface';
import { ChainType, BlockNumber } from '@/domain/value-objects';
@Injectable()
export class BlockCheckpointRepositoryImpl implements IBlockCheckpointRepository {
constructor(private readonly prisma: PrismaService) {}
async getLastScannedBlock(chainType: ChainType): Promise<BlockNumber | null> {
const record = await this.prisma.blockCheckpoint.findUnique({
where: { chainType: chainType.toString() },
});
return record ? BlockNumber.create(record.lastScannedBlock) : null;
}
async updateCheckpoint(chainType: ChainType, blockNumber: BlockNumber): Promise<void> {
await this.prisma.blockCheckpoint.upsert({
where: { chainType: chainType.toString() },
update: {
lastScannedBlock: blockNumber.value,
lastScannedAt: new Date(),
isHealthy: true,
lastError: null,
},
create: {
chainType: chainType.toString(),
lastScannedBlock: blockNumber.value,
lastScannedAt: new Date(),
isHealthy: true,
},
});
}
async recordError(chainType: ChainType, error: string): Promise<void> {
await this.prisma.blockCheckpoint.update({
where: { chainType: chainType.toString() },
data: {
isHealthy: false,
lastError: error,
},
});
}
async markHealthy(chainType: ChainType): Promise<void> {
await this.prisma.blockCheckpoint.update({
where: { chainType: chainType.toString() },
data: {
isHealthy: true,
lastError: null,
},
});
}
async getCheckpoint(chainType: ChainType): Promise<BlockCheckpointData | null> {
const record = await this.prisma.blockCheckpoint.findUnique({
where: { chainType: chainType.toString() },
});
if (!record) return null;
return {
chainType: record.chainType,
lastScannedBlock: record.lastScannedBlock,
lastScannedAt: record.lastScannedAt,
isHealthy: record.isHealthy,
lastError: record.lastError ?? undefined,
};
}
async initializeIfNotExists(chainType: ChainType, startBlock: BlockNumber): Promise<void> {
const existing = await this.prisma.blockCheckpoint.findUnique({
where: { chainType: chainType.toString() },
});
if (!existing) {
await this.prisma.blockCheckpoint.create({
data: {
chainType: chainType.toString(),
lastScannedBlock: startBlock.value,
lastScannedAt: new Date(),
isHealthy: true,
},
});
}
}
}

View File

@ -0,0 +1,97 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { DepositTransactionMapper } from '../mappers/deposit-transaction.mapper';
import { IDepositTransactionRepository } from '@/domain/repositories/deposit-transaction.repository.interface';
import { DepositTransaction } from '@/domain/aggregates/deposit-transaction';
import { ChainType, TxHash } from '@/domain/value-objects';
import { DepositStatus } from '@/domain/enums';
@Injectable()
export class DepositTransactionRepositoryImpl implements IDepositTransactionRepository {
constructor(private readonly prisma: PrismaService) {}
async save(deposit: DepositTransaction): Promise<DepositTransaction> {
const data = DepositTransactionMapper.toPersistence(deposit);
if (deposit.id) {
const updated = await this.prisma.depositTransaction.update({
where: { id: deposit.id },
data,
});
return DepositTransactionMapper.toDomain(updated);
} else {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { id: _, ...createData } = data;
const created = await this.prisma.depositTransaction.create({
data: createData,
});
return DepositTransactionMapper.toDomain(created);
}
}
async findById(id: bigint): Promise<DepositTransaction | null> {
const record = await this.prisma.depositTransaction.findUnique({
where: { id },
});
return record ? DepositTransactionMapper.toDomain(record) : null;
}
async findByTxHash(txHash: TxHash): Promise<DepositTransaction | null> {
const record = await this.prisma.depositTransaction.findUnique({
where: { txHash: txHash.toString() },
});
return record ? DepositTransactionMapper.toDomain(record) : null;
}
async findByStatus(chainType: ChainType, status: DepositStatus): Promise<DepositTransaction[]> {
const records = await this.prisma.depositTransaction.findMany({
where: {
chainType: chainType.toString(),
status,
},
orderBy: { blockNumber: 'asc' },
});
return records.map(DepositTransactionMapper.toDomain);
}
async findPendingConfirmation(chainType: ChainType): Promise<DepositTransaction[]> {
const records = await this.prisma.depositTransaction.findMany({
where: {
chainType: chainType.toString(),
status: {
in: [DepositStatus.DETECTED, DepositStatus.CONFIRMING],
},
},
orderBy: { blockNumber: 'asc' },
});
return records.map(DepositTransactionMapper.toDomain);
}
async findPendingNotification(): Promise<DepositTransaction[]> {
const records = await this.prisma.depositTransaction.findMany({
where: {
status: DepositStatus.CONFIRMED,
notifiedAt: null,
},
orderBy: { createdAt: 'asc' },
take: 100,
});
return records.map(DepositTransactionMapper.toDomain);
}
async findByUserId(userId: bigint, limit: number = 50): Promise<DepositTransaction[]> {
const records = await this.prisma.depositTransaction.findMany({
where: { userId },
orderBy: { createdAt: 'desc' },
take: limit,
});
return records.map(DepositTransactionMapper.toDomain);
}
async existsByTxHash(txHash: TxHash): Promise<boolean> {
const count = await this.prisma.depositTransaction.count({
where: { txHash: txHash.toString() },
});
return count > 0;
}
}

View File

@ -0,0 +1,4 @@
export * from './deposit-transaction.repository.impl';
export * from './monitored-address.repository.impl';
export * from './block-checkpoint.repository.impl';
export * from './transaction-request.repository.impl';

View File

@ -0,0 +1,90 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { MonitoredAddressMapper } from '../mappers/monitored-address.mapper';
import { IMonitoredAddressRepository } from '@/domain/repositories/monitored-address.repository.interface';
import { MonitoredAddress } from '@/domain/aggregates/monitored-address';
import { ChainType, EvmAddress } from '@/domain/value-objects';
@Injectable()
export class MonitoredAddressRepositoryImpl implements IMonitoredAddressRepository {
constructor(private readonly prisma: PrismaService) {}
async save(address: MonitoredAddress): Promise<MonitoredAddress> {
const data = MonitoredAddressMapper.toPersistence(address);
if (address.id) {
const updated = await this.prisma.monitoredAddress.update({
where: { id: address.id },
data,
});
return MonitoredAddressMapper.toDomain(updated);
} else {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { id: _, ...createData } = data;
const created = await this.prisma.monitoredAddress.create({
data: createData,
});
return MonitoredAddressMapper.toDomain(created);
}
}
async findById(id: bigint): Promise<MonitoredAddress | null> {
const record = await this.prisma.monitoredAddress.findUnique({
where: { id },
});
return record ? MonitoredAddressMapper.toDomain(record) : null;
}
async findByChainAndAddress(
chainType: ChainType,
address: EvmAddress,
): Promise<MonitoredAddress | null> {
const record = await this.prisma.monitoredAddress.findUnique({
where: {
uk_chain_address: {
chainType: chainType.toString(),
address: address.lowercase,
},
},
});
return record ? MonitoredAddressMapper.toDomain(record) : null;
}
async findActiveByChain(chainType: ChainType): Promise<MonitoredAddress[]> {
const records = await this.prisma.monitoredAddress.findMany({
where: {
chainType: chainType.toString(),
isActive: true,
},
});
return records.map(MonitoredAddressMapper.toDomain);
}
async findByUserId(userId: bigint): Promise<MonitoredAddress[]> {
const records = await this.prisma.monitoredAddress.findMany({
where: { userId },
});
return records.map(MonitoredAddressMapper.toDomain);
}
async existsByChainAndAddress(chainType: ChainType, address: EvmAddress): Promise<boolean> {
const count = await this.prisma.monitoredAddress.count({
where: {
chainType: chainType.toString(),
address: address.lowercase,
},
});
return count > 0;
}
async getAllActiveAddresses(chainType: ChainType): Promise<string[]> {
const records = await this.prisma.monitoredAddress.findMany({
where: {
chainType: chainType.toString(),
isActive: true,
},
select: { address: true },
});
return records.map((r) => r.address.toLowerCase());
}
}

View File

@ -0,0 +1,94 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { TransactionRequestMapper } from '../mappers/transaction-request.mapper';
import { ITransactionRequestRepository } from '@/domain/repositories/transaction-request.repository.interface';
import { TransactionRequest } from '@/domain/aggregates/transaction-request';
import { ChainType, TxHash } from '@/domain/value-objects';
import { TransactionStatus } from '@/domain/enums';
@Injectable()
export class TransactionRequestRepositoryImpl implements ITransactionRequestRepository {
constructor(private readonly prisma: PrismaService) {}
async save(request: TransactionRequest): Promise<TransactionRequest> {
const data = TransactionRequestMapper.toPersistence(request);
if (request.id) {
const updated = await this.prisma.transactionRequest.update({
where: { id: request.id },
data,
});
return TransactionRequestMapper.toDomain(updated);
} else {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { id: _, ...createData } = data;
const created = await this.prisma.transactionRequest.create({
data: createData,
});
return TransactionRequestMapper.toDomain(created);
}
}
async findById(id: bigint): Promise<TransactionRequest | null> {
const record = await this.prisma.transactionRequest.findUnique({
where: { id },
});
return record ? TransactionRequestMapper.toDomain(record) : null;
}
async findByTxHash(txHash: TxHash): Promise<TransactionRequest | null> {
const record = await this.prisma.transactionRequest.findFirst({
where: { txHash: txHash.toString() },
});
return record ? TransactionRequestMapper.toDomain(record) : null;
}
async findBySource(
sourceService: string,
sourceOrderId: string,
): Promise<TransactionRequest | null> {
const record = await this.prisma.transactionRequest.findUnique({
where: {
uk_source_order: {
sourceService,
sourceOrderId,
},
},
});
return record ? TransactionRequestMapper.toDomain(record) : null;
}
async findPending(chainType: ChainType): Promise<TransactionRequest[]> {
const records = await this.prisma.transactionRequest.findMany({
where: {
chainType: chainType.toString(),
status: TransactionStatus.PENDING,
},
orderBy: { createdAt: 'asc' },
});
return records.map(TransactionRequestMapper.toDomain);
}
async findBroadcasted(chainType: ChainType): Promise<TransactionRequest[]> {
const records = await this.prisma.transactionRequest.findMany({
where: {
chainType: chainType.toString(),
status: TransactionStatus.BROADCASTED,
},
orderBy: { createdAt: 'asc' },
});
return records.map(TransactionRequestMapper.toDomain);
}
async findRetryable(chainType: ChainType, maxRetries: number): Promise<TransactionRequest[]> {
const records = await this.prisma.transactionRequest.findMany({
where: {
chainType: chainType.toString(),
status: TransactionStatus.FAILED,
retryCount: { lt: maxRetries },
},
orderBy: { createdAt: 'asc' },
});
return records.map(TransactionRequestMapper.toDomain);
}
}

View File

@ -0,0 +1,83 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { RedisService } from './redis.service';
import { ChainType } from '@/domain/value-objects';
/**
*
*
*/
@Injectable()
export class AddressCacheService implements OnModuleInit {
private readonly logger = new Logger(AddressCacheService.name);
private readonly CACHE_KEY_PREFIX = 'blockchain:monitored_addresses:';
constructor(private readonly redis: RedisService) {}
async onModuleInit() {
this.logger.log('AddressCacheService initialized');
}
private getCacheKey(chainType: ChainType): string {
return `${this.CACHE_KEY_PREFIX}${chainType.toString().toLowerCase()}`;
}
/**
*
*/
async isMonitored(chainType: ChainType, address: string): Promise<boolean> {
const key = this.getCacheKey(chainType);
return this.redis.sismember(key, address.toLowerCase());
}
/**
*
*/
async addAddress(chainType: ChainType, address: string): Promise<void> {
const key = this.getCacheKey(chainType);
await this.redis.sadd(key, address.toLowerCase());
this.logger.debug(`Added address to cache: ${chainType} - ${address}`);
}
/**
*
*/
async removeAddress(chainType: ChainType, address: string): Promise<void> {
const key = this.getCacheKey(chainType);
await this.redis.srem(key, address.toLowerCase());
this.logger.debug(`Removed address from cache: ${chainType} - ${address}`);
}
/**
*
*/
async addAddresses(chainType: ChainType, addresses: string[]): Promise<void> {
if (addresses.length === 0) return;
const key = this.getCacheKey(chainType);
const lowercased = addresses.map((a) => a.toLowerCase());
await this.redis.sadd(key, ...lowercased);
this.logger.debug(`Added ${addresses.length} addresses to cache: ${chainType}`);
}
/**
*
*/
async getAllAddresses(chainType: ChainType): Promise<string[]> {
const key = this.getCacheKey(chainType);
return this.redis.smembers(key);
}
/**
*
*/
async reloadCache(chainType: ChainType, addresses: string[]): Promise<void> {
const key = this.getCacheKey(chainType);
// 先删除旧缓存
await this.redis.del(key);
// 重新添加
if (addresses.length > 0) {
const lowercased = addresses.map((a) => a.toLowerCase());
await this.redis.sadd(key, ...lowercased);
}
this.logger.log(`Reloaded cache for ${chainType}: ${addresses.length} addresses`);
}
}

View File

@ -0,0 +1,2 @@
export * from './redis.service';
export * from './address-cache.service';

View File

@ -0,0 +1,67 @@
import { Injectable, OnModuleDestroy, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import Redis from 'ioredis';
@Injectable()
export class RedisService implements OnModuleDestroy {
private readonly logger = new Logger(RedisService.name);
private readonly client: Redis;
constructor(private readonly configService: ConfigService) {
this.client = new Redis({
host: this.configService.get<string>('redis.host'),
port: this.configService.get<number>('redis.port'),
db: this.configService.get<number>('redis.db'),
password: this.configService.get<string>('redis.password') || undefined,
});
this.client.on('connect', () => {
this.logger.log('Redis connected');
});
this.client.on('error', (err) => {
this.logger.error('Redis error', err);
});
}
onModuleDestroy() {
this.client.disconnect();
}
getClient(): Redis {
return this.client;
}
async get(key: string): Promise<string | null> {
return this.client.get(key);
}
async set(key: string, value: string, ttlSeconds?: number): Promise<void> {
if (ttlSeconds) {
await this.client.set(key, value, 'EX', ttlSeconds);
} else {
await this.client.set(key, value);
}
}
async del(key: string): Promise<void> {
await this.client.del(key);
}
async sismember(key: string, member: string): Promise<boolean> {
const result = await this.client.sismember(key, member);
return result === 1;
}
async sadd(key: string, ...members: string[]): Promise<number> {
return this.client.sadd(key, ...members);
}
async srem(key: string, ...members: string[]): Promise<number> {
return this.client.srem(key, ...members);
}
async smembers(key: string): Promise<string[]> {
return this.client.smembers(key);
}
}

View File

@ -0,0 +1,69 @@
import { NestFactory } from '@nestjs/core';
import { ValidationPipe, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const logger = new Logger('Bootstrap');
const app = await NestFactory.create(AppModule);
const configService = app.get(ConfigService);
// 全局验证管道
app.useGlobalPipes(
new ValidationPipe({
whitelist: true,
transform: true,
forbidNonWhitelisted: true,
}),
);
// CORS
app.enableCors();
// Swagger 文档
const swaggerConfig = new DocumentBuilder()
.setTitle('Blockchain Service API')
.setDescription('RWA 区块链基础设施服务 API')
.setVersion('1.0')
.addTag('Health', '健康检查')
.addTag('Balance', '余额查询')
.addTag('Internal', '内部接口')
.build();
const document = SwaggerModule.createDocument(app, swaggerConfig);
SwaggerModule.setup('api', app, document);
// Kafka 微服务
const kafkaBrokers = configService.get<string[]>('kafka.brokers') || ['localhost:9092'];
const kafkaGroupId = configService.get<string>('kafka.groupId') || 'blockchain-service-group';
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'blockchain-service',
brokers: kafkaBrokers,
},
consumer: {
groupId: kafkaGroupId,
},
},
});
// 启动微服务
await app.startAllMicroservices();
logger.log('Kafka microservice started');
// 启动 HTTP 服务
const port = configService.get<number>('app.port', 3012);
await app.listen(port);
logger.log(`Blockchain service is running on port ${port}`);
logger.log(`Swagger docs available at http://localhost:${port}/api`);
}
bootstrap();

Some files were not shown because too many files have changed in this diff Show More