feat(mining-blockchain): RPC端点自动故障转移,连续失败3分钟后切换备选节点

问题:Kava主网RPC (evm.kava.io) 偶发503,导致mining-blockchain-service
所有链上操作失败(转账、余额查询、区块扫描等)。

方案:新增RpcProviderManager单例服务,统一管理各链的JsonRpcProvider实例,
当某个RPC端点连续失败超过3分钟后自动轮转到下一个备选端点。

新增文件:
- rpc-provider-manager.service.ts: 核心故障转移管理器
  · 每条链维护 provider/urls/currentIndex/failureState
  · reportSuccess() 重置失败状态
  · reportFailure() 记录失败,>=3分钟触发 switchToNextUrl()
  · 轮转创建新 JsonRpcProvider,替换旧实例
  · 每30秒记录一次失败日志,避免日志刷屏

修改文件:
- blockchain.config.ts: 新增 rpcUrls 配置字段(KAVA_RPC_URLS/BSC_RPC_URLS)
- chain-config.service.ts: 解析逗号分隔的URL列表,回退到单个rpcUrl
- domain.module.ts: 注册并导出 RpcProviderManager
- index.ts: 导出 RpcProviderManager
- evm-provider.adapter.ts: 委托RpcProviderManager获取provider,
  所有公开方法通过executeWithFailover包裹,自动上报成功/失败
- erc20-transfer.service.ts: 移除本地providers Map,改用RpcProviderManager,
  新增isRpcConnectionError()区分RPC网络错误与合约执行错误
- docker-compose.2.0.yml: 添加KAVA_RPC_URLS默认4个端点
- .env.example: 添加KAVA_RPC_URLS配置说明

默认端点仍为 evm.kava.io,备选: evm.kava-rpc.com,
kava-evm-rpc.publicnode.com, rpc.ankr.com/kava_evm

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-01 07:06:31 -08:00
parent ef663c0c08
commit 3635369a8a
9 changed files with 451 additions and 141 deletions

View File

@ -284,6 +284,8 @@ services:
NETWORK_MODE: ${NETWORK_MODE:-mainnet}
# KAVA 配置
KAVA_RPC_URL: ${KAVA_RPC_URL:-https://evm.kava.io}
# RPC 故障转移:逗号分隔的多个端点,主端点失败 3 分钟后自动切换
KAVA_RPC_URLS: ${KAVA_RPC_URLS:-https://evm.kava.io,https://evm.kava-rpc.com,https://kava-evm-rpc.publicnode.com,https://rpc.ankr.com/kava_evm}
KAVA_CHAIN_ID: ${KAVA_CHAIN_ID:-2222}
KAVA_USDT_CONTRACT: ${KAVA_USDT_CONTRACT:-0xA9F3A35dBa8699c8C681D8db03F0c1A8CEB9D7c3}
# 积分股合约 (eUSDT - Energy USDT)

View File

@ -46,6 +46,10 @@ KAFKA_GROUP_ID=mining-blockchain-service-group
# =============================================================================
# Official KAVA EVM RPC endpoint
KAVA_RPC_URL=https://evm.kava.io
# RPC 故障转移端点列表(逗号分隔,可选)
# 当主端点持续失败 3 分钟后,自动切换到下一个备选端点
# 不配置时仅使用 KAVA_RPC_URL 单个端点
# KAVA_RPC_URLS=https://evm.kava.io,https://evm.kava-rpc.com,https://kava-evm-rpc.publicnode.com,https://rpc.ankr.com/kava_evm
KAVA_CHAIN_ID=2222
# dUSDT (绿积分) 合约地址 - Durian USDT, 精度6位
# 合约链接: https://kavascan.com/address/0xA9F3A35dBa8699c8C681D8db03F0c1A8CEB9D7c3

View File

@ -35,6 +35,8 @@ export default registerAs('blockchain', () => {
? {
// KAVA Testnet
rpcUrl: process.env.KAVA_RPC_URL || 'https://evm.testnet.kava.io',
// 逗号分隔的多个 RPC URL用于故障转移可选不配置则仅使用 rpcUrl
rpcUrls: process.env.KAVA_RPC_URLS || '',
chainId: parseInt(process.env.KAVA_CHAIN_ID || '2221', 10),
// 测试网 USDT 合约 (自定义部署的 TestUSDT)
usdtContract: process.env.KAVA_USDT_CONTRACT || '0xc12f6A4A7Fd0965085B044A67a39CcA2ff7fe0dF',
@ -47,6 +49,8 @@ export default registerAs('blockchain', () => {
: {
// KAVA Mainnet
rpcUrl: process.env.KAVA_RPC_URL || 'https://evm.kava.io',
// 逗号分隔的多个 RPC URL用于故障转移可选不配置则仅使用 rpcUrl
rpcUrls: process.env.KAVA_RPC_URLS || '',
chainId: parseInt(process.env.KAVA_CHAIN_ID || '2222', 10),
// dUSDT (绿积分) 合约地址 - Durian USDT, 精度6位旧版本保留兼容
usdtContract: process.env.KAVA_USDT_CONTRACT || '0xA9F3A35dBa8699c8C681D8db03F0c1A8CEB9D7c3',
@ -62,6 +66,7 @@ export default registerAs('blockchain', () => {
? {
// BSC Testnet (BNB Smart Chain Testnet)
rpcUrl: process.env.BSC_RPC_URL || 'https://data-seed-prebsc-1-s1.binance.org:8545',
rpcUrls: process.env.BSC_RPC_URLS || '',
chainId: parseInt(process.env.BSC_CHAIN_ID || '97', 10),
// BSC Testnet 官方测试 USDT 合约
usdtContract: process.env.BSC_USDT_CONTRACT || '0x337610d27c682E347C9cD60BD4b3b107C9d34dDd',
@ -70,6 +75,7 @@ export default registerAs('blockchain', () => {
: {
// BSC Mainnet
rpcUrl: process.env.BSC_RPC_URL || 'https://bsc-dataseed.binance.org',
rpcUrls: process.env.BSC_RPC_URLS || '',
chainId: parseInt(process.env.BSC_CHAIN_ID || '56', 10),
usdtContract: process.env.BSC_USDT_CONTRACT || '0x55d398326f99059fF775485246999027B3197955',
confirmations: parseInt(process.env.BSC_CONFIRMATIONS || '15', 10),

View File

@ -1,9 +1,9 @@
import { Module } from '@nestjs/common';
import { ConfirmationPolicyService, ChainConfigService } from './services';
import { ConfirmationPolicyService, ChainConfigService, RpcProviderManager } from './services';
import { Erc20TransferService } from './services/erc20-transfer.service';
@Module({
providers: [ConfirmationPolicyService, ChainConfigService, Erc20TransferService],
exports: [ConfirmationPolicyService, ChainConfigService, Erc20TransferService],
providers: [ConfirmationPolicyService, ChainConfigService, RpcProviderManager, Erc20TransferService],
exports: [ConfirmationPolicyService, ChainConfigService, RpcProviderManager, Erc20TransferService],
})
export class DomainModule {}

View File

@ -7,6 +7,8 @@ export interface ChainConfig {
chainType: ChainTypeEnum;
chainId: number;
rpcUrl: string;
/** RPC URL 列表(含主端点和备选端点),用于故障转移 */
rpcUrls: string[];
usdtContract: string;
eUsdtContract: string; // 积分股代币 (Energy USDT)
fUsdtContract: string; // 积分值代币 (Future USDT)
@ -44,6 +46,13 @@ export class ChainConfigService {
'blockchain.kava.rpcUrl',
this.isTestnet ? 'https://evm.testnet.kava.io' : 'https://evm.kava.io',
),
rpcUrls: this.parseRpcUrls(
'blockchain.kava.rpcUrls',
this.configService.get<string>(
'blockchain.kava.rpcUrl',
this.isTestnet ? 'https://evm.testnet.kava.io' : 'https://evm.kava.io',
),
),
// dUSDT (绿积分) 合约地址 - Durian USDT, 精度6位
usdtContract: this.configService.get<string>(
'blockchain.kava.usdtContract',
@ -73,6 +82,13 @@ export class ChainConfigService {
'blockchain.bsc.rpcUrl',
this.isTestnet ? 'https://data-seed-prebsc-1-s1.binance.org:8545' : 'https://bsc-dataseed.binance.org',
),
rpcUrls: this.parseRpcUrls(
'blockchain.bsc.rpcUrls',
this.configService.get<string>(
'blockchain.bsc.rpcUrl',
this.isTestnet ? 'https://data-seed-prebsc-1-s1.binance.org:8545' : 'https://bsc-dataseed.binance.org',
),
),
usdtContract: this.configService.get<string>(
'blockchain.bsc.usdtContract',
this.isTestnet ? '0x337610d27c682E347C9cD60BD4b3b107C9d34dDd' : '0x55d398326f99059fF775485246999027B3197955',
@ -129,4 +145,24 @@ export class ChainConfigService {
isSupported(chainType: ChainType): boolean {
return this.configs.has(chainType.value);
}
/**
* RPC URL
*
* URL KAVA_RPC_URLS使
* 退 rpcUrl
*/
private parseRpcUrls(configKey: string, fallbackUrl: string): string[] {
const urlsStr = this.configService.get<string>(configKey, '');
if (urlsStr) {
const urls = urlsStr
.split(',')
.map((u) => u.trim())
.filter((u) => u.length > 0);
if (urls.length > 0) {
return urls;
}
}
return [fallbackUrl];
}
}

View File

@ -10,6 +10,7 @@ import {
recoverAddress,
} from 'ethers';
import { ChainConfigService } from './chain-config.service';
import { RpcProviderManager } from './rpc-provider-manager.service';
import { ChainType } from '@/domain/value-objects';
import { ChainTypeEnum } from '@/domain/enums';
@ -59,7 +60,6 @@ export const MPC_SIGNING_CLIENT = Symbol('MPC_SIGNING_CLIENT');
@Injectable()
export class Erc20TransferService {
private readonly logger = new Logger(Erc20TransferService.name);
private readonly providers: Map<ChainTypeEnum, JsonRpcProvider> = new Map();
// C2C Bot 热钱包地址
private readonly hotWalletAddress: string;
// eUSDT (积分股) 做市商钱包地址
@ -71,11 +71,12 @@ export class Erc20TransferService {
constructor(
private readonly configService: ConfigService,
private readonly chainConfig: ChainConfigService,
private readonly rpcProviderManager: RpcProviderManager,
) {
this.hotWalletAddress = this.configService.get<string>('HOT_WALLET_ADDRESS', '');
this.eusdtMarketMakerAddress = this.configService.get<string>('EUSDT_MARKET_MAKER_ADDRESS', '');
this.fusdtMarketMakerAddress = this.configService.get<string>('FUSDT_MARKET_MAKER_ADDRESS', '');
this.initializeProviders();
this.initializeWalletConfig();
}
/**
@ -86,19 +87,40 @@ export class Erc20TransferService {
this.logger.log(`[INIT] MPC Signing Client injected`);
}
private initializeProviders(): void {
// 为每条支持的链创建 Provider
for (const chainType of this.chainConfig.getSupportedChains()) {
try {
const config = this.chainConfig.getConfig(ChainType.fromEnum(chainType));
const provider = new JsonRpcProvider(config.rpcUrl, config.chainId);
this.providers.set(chainType, provider);
this.logger.log(`[INIT] Provider initialized for ${chainType}: ${config.rpcUrl}`);
} catch (error) {
this.logger.error(`[INIT] Failed to initialize provider for ${chainType}`, error);
}
}
/**
* provider RpcProviderManager
*/
private getProvider(chainType: ChainTypeEnum): JsonRpcProvider {
return this.rpcProviderManager.getProvider(chainType);
}
/**
* RPC
* RPC 503revert
*/
private isRpcConnectionError(error: any): boolean {
const message = (error?.message || '').toLowerCase();
return (
message.includes('could not detect network') ||
message.includes('connection refused') ||
message.includes('timeout') ||
message.includes('econnrefused') ||
message.includes('enotfound') ||
message.includes('503') ||
message.includes('502') ||
message.includes('server error') ||
message.includes('missing response') ||
message.includes('request failed') ||
error?.code === 'NETWORK_ERROR' ||
error?.code === 'SERVER_ERROR' ||
error?.code === 'TIMEOUT'
);
}
/**
* provider RpcProviderManager
*/
private initializeWalletConfig(): void {
// 检查热钱包地址配置
if (this.hotWalletAddress) {
this.logger.log(`[INIT] C2C Bot wallet address configured: ${this.hotWalletAddress}`);
@ -147,10 +169,7 @@ export class Erc20TransferService {
* USDT
*/
async getHotWalletBalance(chainType: ChainTypeEnum): Promise<string> {
const provider = this.providers.get(chainType);
if (!provider) {
throw new Error(`Provider not configured for chain: ${chainType}`);
}
const provider = this.getProvider(chainType);
if (!this.hotWalletAddress) {
throw new Error('Hot wallet address not configured');
@ -183,12 +202,7 @@ export class Erc20TransferService {
this.logger.log(`[TRANSFER] To: ${toAddress}`);
this.logger.log(`[TRANSFER] Amount: ${amount} USDT`);
const provider = this.providers.get(chainType);
if (!provider) {
const error = `Provider not configured for chain: ${chainType}`;
this.logger.error(`[TRANSFER] ${error}`);
return { success: false, error };
}
const provider = this.getProvider(chainType);
if (!this.mpcSigningClient || !this.mpcSigningClient.isConfigured()) {
const error = 'MPC signing client not configured';
@ -343,6 +357,7 @@ export class Erc20TransferService {
this.logger.log(`[TRANSFER] Block: ${receipt.blockNumber}`);
this.logger.log(`[TRANSFER] Gas used: ${receipt.gasUsed.toString()}`);
this.rpcProviderManager.reportSuccess(chainType);
return {
success: true,
txHash: txResponse.hash,
@ -355,6 +370,9 @@ export class Erc20TransferService {
return { success: false, txHash: txResponse.hash, error };
}
} catch (error: any) {
if (this.isRpcConnectionError(error)) {
this.rpcProviderManager.reportFailure(chainType, error);
}
this.logger.error(`[TRANSFER] Transfer failed:`, error);
return {
success: false,
@ -384,10 +402,7 @@ export class Erc20TransferService {
*
*/
async getTokenBalance(chainType: ChainTypeEnum, tokenType: TokenType): Promise<string> {
const provider = this.providers.get(chainType);
if (!provider) {
throw new Error(`Provider not configured for chain: ${chainType}`);
}
const provider = this.getProvider(chainType);
if (!this.hotWalletAddress) {
throw new Error('Hot wallet address not configured');
@ -426,12 +441,7 @@ export class Erc20TransferService {
this.logger.log(`[TRANSFER] To: ${toAddress}`);
this.logger.log(`[TRANSFER] Amount: ${amount} ${tokenType}`);
const provider = this.providers.get(chainType);
if (!provider) {
const error = `Provider not configured for chain: ${chainType}`;
this.logger.error(`[TRANSFER] ${error}`);
return { success: false, error };
}
const provider = this.getProvider(chainType);
if (!this.mpcSigningClient || !this.mpcSigningClient.isConfigured()) {
const error = 'MPC signing client not configured';
@ -576,6 +586,7 @@ export class Erc20TransferService {
this.logger.log(`[TRANSFER] Block: ${receipt.blockNumber}`);
this.logger.log(`[TRANSFER] Gas used: ${receipt.gasUsed.toString()}`);
this.rpcProviderManager.reportSuccess(chainType);
return {
success: true,
txHash: txResponse.hash,
@ -588,6 +599,9 @@ export class Erc20TransferService {
return { success: false, txHash: txResponse.hash, error };
}
} catch (error: any) {
if (this.isRpcConnectionError(error)) {
this.rpcProviderManager.reportFailure(chainType, error);
}
this.logger.error(`[TRANSFER] Transfer failed:`, error);
return {
success: false,
@ -600,27 +614,36 @@ export class Erc20TransferService {
* C2C Bot
*/
isConfigured(chainType: ChainTypeEnum): boolean {
return this.providers.has(chainType) &&
!!this.hotWalletAddress &&
!!this.mpcSigningClient?.isConfigured();
try {
this.rpcProviderManager.getProvider(chainType);
return !!this.hotWalletAddress && !!this.mpcSigningClient?.isConfigured();
} catch {
return false;
}
}
/**
* eUSDT
*/
isEusdtMarketMakerConfigured(chainType: ChainTypeEnum): boolean {
return this.providers.has(chainType) &&
!!this.eusdtMarketMakerAddress &&
!!this.mpcSigningClient?.isEusdtMarketMakerConfigured();
try {
this.rpcProviderManager.getProvider(chainType);
return !!this.eusdtMarketMakerAddress && !!this.mpcSigningClient?.isEusdtMarketMakerConfigured();
} catch {
return false;
}
}
/**
* fUSDT
*/
isFusdtMarketMakerConfigured(chainType: ChainTypeEnum): boolean {
return this.providers.has(chainType) &&
!!this.fusdtMarketMakerAddress &&
!!this.mpcSigningClient?.isFusdtMarketMakerConfigured();
try {
this.rpcProviderManager.getProvider(chainType);
return !!this.fusdtMarketMakerAddress && !!this.mpcSigningClient?.isFusdtMarketMakerConfigured();
} catch {
return false;
}
}
/**
@ -649,12 +672,7 @@ export class Erc20TransferService {
this.logger.log(`[MM-TRANSFER] To: ${toAddress}`);
this.logger.log(`[MM-TRANSFER] Amount: ${amount} ${tokenType}`);
const provider = this.providers.get(chainType);
if (!provider) {
const error = `Provider not configured for chain: ${chainType}`;
this.logger.error(`[MM-TRANSFER] ${error}`);
return { success: false, error };
}
const provider = this.getProvider(chainType);
// 检查对应钱包是否配置
if (isEusdt) {
@ -810,6 +828,7 @@ export class Erc20TransferService {
this.logger.log(`[MM-TRANSFER] Block: ${receipt.blockNumber}`);
this.logger.log(`[MM-TRANSFER] Gas used: ${receipt.gasUsed.toString()}`);
this.rpcProviderManager.reportSuccess(chainType);
return {
success: true,
txHash: txResponse.hash,
@ -822,6 +841,9 @@ export class Erc20TransferService {
return { success: false, txHash: txResponse.hash, error };
}
} catch (error: any) {
if (this.isRpcConnectionError(error)) {
this.rpcProviderManager.reportFailure(chainType, error);
}
this.logger.error(`[MM-TRANSFER] Transfer failed:`, error);
return {
success: false,
@ -834,10 +856,7 @@ export class Erc20TransferService {
* eUSDT
*/
async getEusdtMarketMakerTokenBalance(chainType: ChainTypeEnum): Promise<string> {
const provider = this.providers.get(chainType);
if (!provider) {
throw new Error(`Provider not configured for chain: ${chainType}`);
}
const provider = this.getProvider(chainType);
if (!this.eusdtMarketMakerAddress) {
throw new Error('eUSDT Market Maker wallet address not configured');
@ -859,10 +878,7 @@ export class Erc20TransferService {
* fUSDT
*/
async getFusdtMarketMakerTokenBalance(chainType: ChainTypeEnum): Promise<string> {
const provider = this.providers.get(chainType);
if (!provider) {
throw new Error(`Provider not configured for chain: ${chainType}`);
}
const provider = this.getProvider(chainType);
if (!this.fusdtMarketMakerAddress) {
throw new Error('fUSDT Market Maker wallet address not configured');

View File

@ -1,2 +1,3 @@
export * from './confirmation-policy.service';
export * from './chain-config.service';
export * from './rpc-provider-manager.service';

View File

@ -0,0 +1,213 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { JsonRpcProvider } from 'ethers';
import { ChainConfigService } from './chain-config.service';
import { ChainTypeEnum } from '@/domain/enums';
/**
* RPC
*/
interface RpcHealthState {
/** 当前活跃的 JsonRpcProvider 实例 */
provider: JsonRpcProvider;
/** 该链可用的所有 RPC URL 列表(第一个为默认主端点) */
urls: string[];
/** 当前使用的 URL 在 urls 数组中的索引 */
currentIndex: number;
/** 该链的 chainId用于创建新 provider */
chainId: number;
/** 首次连续失败的时间戳null 表示当前健康) */
firstFailureAt: number | null;
/** 连续失败次数(用于日志) */
consecutiveFailures: number;
}
/**
* RPC Provider
*
* JsonRpcProvider RPC
* FAILOVER_THRESHOLD_MS 3
*
*
* 使:
* - EvmProviderAdapter Erc20TransferService provider
* - RPC reportSuccess(chain)
* - RPC reportFailure(chain, error)
* - URL
*
* :
* - KAVA_RPC_URLS: 逗号分隔的多个 Kava RPC URL使 KAVA_RPC_URL
* - BSC_RPC_URLS: 逗号分隔的多个 BSC RPC URL使 BSC_RPC_URL
*/
@Injectable()
export class RpcProviderManager implements OnModuleInit {
private readonly logger = new Logger(RpcProviderManager.name);
private readonly healthStates: Map<ChainTypeEnum, RpcHealthState> = new Map();
/** 持续失败多久后触发端点切换(毫秒),默认 3 分钟 */
private readonly FAILOVER_THRESHOLD_MS = 3 * 60 * 1000;
constructor(private readonly chainConfig: ChainConfigService) {}
onModuleInit(): void {
this.initializeAllChains();
}
/**
* provider
* ChainConfig rpcUrls provider
*/
private initializeAllChains(): void {
for (const chainType of this.chainConfig.getSupportedChains()) {
const config = this.chainConfig.getConfig(
// ChainType value object 需要从 enum 创建
{ value: chainType, toString: () => chainType } as any,
);
const urls = config.rpcUrls;
const primaryUrl = urls[0];
const provider = new JsonRpcProvider(primaryUrl, config.chainId);
this.healthStates.set(chainType, {
provider,
urls,
currentIndex: 0,
chainId: config.chainId,
firstFailureAt: null,
consecutiveFailures: 0,
});
if (urls.length > 1) {
this.logger.log(
`[INIT] ${chainType} RPC 端点列表 (${urls.length} 个): ${urls.join(', ')}`,
);
} else {
this.logger.log(
`[INIT] ${chainType} RPC 端点: ${primaryUrl}(未配置备选端点)`,
);
}
}
}
/**
* provider
*
* @param chain
* @returns JsonRpcProvider
* @throws Error
*/
getProvider(chain: ChainTypeEnum): JsonRpcProvider {
const state = this.healthStates.get(chain);
if (!state) {
throw new Error(`RPC Provider 未初始化: ${chain}`);
}
return state.provider;
}
/**
* 使 RPC URL/
*/
getCurrentUrl(chain: ChainTypeEnum): string {
const state = this.healthStates.get(chain);
return state ? state.urls[state.currentIndex] : 'unknown';
}
/**
* RPC URL
*/
getUrlCount(chain: ChainTypeEnum): number {
const state = this.healthStates.get(chain);
return state ? state.urls.length : 0;
}
/**
* RPC
*
*/
reportSuccess(chain: ChainTypeEnum): void {
const state = this.healthStates.get(chain);
if (!state) return;
// 如果之前处于失败状态,记录恢复日志
if (state.firstFailureAt !== null) {
this.logger.log(
`[${chain}] RPC 恢复正常: ${state.urls[state.currentIndex]}` +
` (之前连续失败 ${state.consecutiveFailures} 次)`,
);
}
state.firstFailureAt = null;
state.consecutiveFailures = 0;
}
/**
* RPC
*
* FAILOVER_THRESHOLD_MS
*
*
* @param chain
* @param error
*/
reportFailure(chain: ChainTypeEnum, error?: Error): void {
const state = this.healthStates.get(chain);
if (!state) return;
const now = Date.now();
state.consecutiveFailures++;
// 首次失败:记录起始时间
if (state.firstFailureAt === null) {
state.firstFailureAt = now;
this.logger.warn(
`[${chain}] RPC 开始失败: ${state.urls[state.currentIndex]}` +
`${error?.message || 'unknown error'}`,
);
return;
}
// 检查是否超过故障转移阈值
const elapsedMs = now - state.firstFailureAt;
if (elapsedMs >= this.FAILOVER_THRESHOLD_MS) {
this.switchToNextUrl(chain, state);
} else {
// 每 30 秒输出一条持续失败日志,避免日志洪水
if (state.consecutiveFailures % 6 === 0) {
this.logger.warn(
`[${chain}] RPC 持续失败中 (${Math.round(elapsedMs / 1000)}s / ` +
`${this.FAILOVER_THRESHOLD_MS / 1000}s): ` +
`${state.urls[state.currentIndex]}`,
);
}
}
}
/**
* RPC URL
*
* urls JsonRpcProvider
* URL provider
*/
private switchToNextUrl(chain: ChainTypeEnum, state: RpcHealthState): void {
const oldUrl = state.urls[state.currentIndex];
// 轮转到下一个 URL
state.currentIndex = (state.currentIndex + 1) % state.urls.length;
const newUrl = state.urls[state.currentIndex];
if (state.urls.length === 1) {
this.logger.error(
`[${chain}] 仅有一个 RPC URL无法切换到备选端点将重新创建 provider: ${newUrl}`,
);
} else {
this.logger.warn(
`[${chain}] === RPC 端点切换 === ${oldUrl}${newUrl}`,
);
}
// 创建新的 provider 实例ethers.js v6 的 JsonRpcProvider 创建后 URL 不可变)
state.provider = new JsonRpcProvider(newUrl, state.chainId);
// 重置失败状态,给新端点一个全新的 3 分钟窗口
state.firstFailureAt = null;
state.consecutiveFailures = 0;
}
}

View File

@ -1,6 +1,6 @@
import { Injectable, Logger } from '@nestjs/common';
import { JsonRpcProvider, Contract } from 'ethers';
import { ChainConfigService } from '@/domain/services/chain-config.service';
import { RpcProviderManager } from '@/domain/services/rpc-provider-manager.service';
import { ChainType, BlockNumber, TokenAmount } from '@/domain/value-objects';
import { ChainTypeEnum } from '@/domain/enums';
@ -28,53 +28,71 @@ export interface TransferEvent {
/**
* EVM
* EVM
*
* EVM RpcProviderManager provider
* RPC /
*/
@Injectable()
export class EvmProviderAdapter {
private readonly logger = new Logger(EvmProviderAdapter.name);
private readonly providers: Map<ChainTypeEnum, JsonRpcProvider> = new Map();
constructor(private readonly chainConfig: ChainConfigService) {
this.initializeProviders();
}
private initializeProviders(): void {
for (const chainType of this.chainConfig.getSupportedChains()) {
const config = this.chainConfig.getConfig(ChainType.fromEnum(chainType));
const provider = new JsonRpcProvider(config.rpcUrl, config.chainId);
this.providers.set(chainType, provider);
this.logger.log(`Initialized provider for ${chainType}: ${config.rpcUrl}`);
}
}
constructor(private readonly rpcProviderManager: RpcProviderManager) {}
/**
* provider RpcProviderManager
*/
private getProvider(chainType: ChainType): JsonRpcProvider {
const provider = this.providers.get(chainType.value);
if (!provider) {
throw new Error(`No provider for chain: ${chainType.toString()}`);
return this.rpcProviderManager.getProvider(chainType.value);
}
/**
* RPC /
*
* RPC
* - reportSuccess()
* - reportFailure() 3
* - re-throw
*/
private async executeWithFailover<T>(
chainType: ChainType,
operation: () => Promise<T>,
): Promise<T> {
try {
const result = await operation();
this.rpcProviderManager.reportSuccess(chainType.value);
return result;
} catch (error) {
this.rpcProviderManager.reportFailure(
chainType.value,
error instanceof Error ? error : new Error(String(error)),
);
throw error;
}
return provider;
}
/**
*
*/
async getCurrentBlockNumber(chainType: ChainType): Promise<BlockNumber> {
const provider = this.getProvider(chainType);
const blockNumber = await provider.getBlockNumber();
return BlockNumber.create(blockNumber);
return this.executeWithFailover(chainType, async () => {
const provider = this.getProvider(chainType);
const blockNumber = await provider.getBlockNumber();
return BlockNumber.create(blockNumber);
});
}
/**
*
*/
async getBlockTimestamp(chainType: ChainType, blockNumber: BlockNumber): Promise<Date> {
const provider = this.getProvider(chainType);
const block = await provider.getBlock(blockNumber.asNumber);
if (!block) {
throw new Error(`Block not found: ${blockNumber.toString()}`);
}
return new Date(block.timestamp * 1000);
return this.executeWithFailover(chainType, async () => {
const provider = this.getProvider(chainType);
const block = await provider.getBlock(blockNumber.asNumber);
if (!block) {
throw new Error(`Block not found: ${blockNumber.toString()}`);
}
return new Date(block.timestamp * 1000);
});
}
/**
@ -86,38 +104,40 @@ export class EvmProviderAdapter {
toBlock: BlockNumber,
tokenContract: string,
): Promise<TransferEvent[]> {
const provider = this.getProvider(chainType);
const contract = new Contract(tokenContract, ERC20_TRANSFER_EVENT_ABI, provider);
return this.executeWithFailover(chainType, async () => {
const provider = this.getProvider(chainType);
const contract = new Contract(tokenContract, ERC20_TRANSFER_EVENT_ABI, provider);
const filter = contract.filters.Transfer();
const logs = await contract.queryFilter(filter, fromBlock.asNumber, toBlock.asNumber);
const filter = contract.filters.Transfer();
const logs = await contract.queryFilter(filter, fromBlock.asNumber, toBlock.asNumber);
const events: TransferEvent[] = [];
const events: TransferEvent[] = [];
for (const log of logs) {
const block = await provider.getBlock(log.blockNumber);
if (!block) continue;
for (const log of logs) {
const block = await provider.getBlock(log.blockNumber);
if (!block) continue;
const parsedLog = contract.interface.parseLog({
topics: log.topics as string[],
data: log.data,
});
if (parsedLog) {
events.push({
txHash: log.transactionHash,
logIndex: log.index,
blockNumber: BigInt(log.blockNumber),
blockTimestamp: new Date(block.timestamp * 1000),
from: parsedLog.args[0],
to: parsedLog.args[1],
value: parsedLog.args[2],
tokenContract,
const parsedLog = contract.interface.parseLog({
topics: log.topics as string[],
data: log.data,
});
}
}
return events;
if (parsedLog) {
events.push({
txHash: log.transactionHash,
logIndex: log.index,
blockNumber: BigInt(log.blockNumber),
blockTimestamp: new Date(block.timestamp * 1000),
from: parsedLog.args[0],
to: parsedLog.args[1],
value: parsedLog.args[2],
tokenContract,
});
}
}
return events;
});
}
/**
@ -128,42 +148,50 @@ export class EvmProviderAdapter {
tokenContract: string,
address: string,
): Promise<TokenAmount> {
const provider = this.getProvider(chainType);
const contract = new Contract(tokenContract, ERC20_BALANCE_ABI, provider);
const [balance, decimals] = await Promise.all([
contract.balanceOf(address),
contract.decimals(),
]);
return TokenAmount.fromRaw(balance, Number(decimals));
return this.executeWithFailover(chainType, async () => {
const provider = this.getProvider(chainType);
const contract = new Contract(tokenContract, ERC20_BALANCE_ABI, provider);
const [balance, decimals] = await Promise.all([
contract.balanceOf(address),
contract.decimals(),
]);
return TokenAmount.fromRaw(balance, Number(decimals));
});
}
/**
* ERC20 decimals
*/
async getTokenDecimals(chainType: ChainType, tokenContract: string): Promise<number> {
const provider = this.getProvider(chainType);
const contract = new Contract(tokenContract, ERC20_BALANCE_ABI, provider);
const decimals = await contract.decimals();
return Number(decimals);
return this.executeWithFailover(chainType, async () => {
const provider = this.getProvider(chainType);
const contract = new Contract(tokenContract, ERC20_BALANCE_ABI, provider);
const decimals = await contract.decimals();
return Number(decimals);
});
}
/**
*
*/
async getNativeBalance(chainType: ChainType, address: string): Promise<TokenAmount> {
const provider = this.getProvider(chainType);
const balance = await provider.getBalance(address);
return TokenAmount.fromRaw(balance, 18);
return this.executeWithFailover(chainType, async () => {
const provider = this.getProvider(chainType);
const balance = await provider.getBalance(address);
return TokenAmount.fromRaw(balance, 18);
});
}
/**
* 广
*/
async broadcastTransaction(chainType: ChainType, signedTx: string): Promise<string> {
const provider = this.getProvider(chainType);
const txResponse = await provider.broadcastTransaction(signedTx);
this.logger.log(`Transaction broadcasted: ${txResponse.hash}`);
return txResponse.hash;
return this.executeWithFailover(chainType, async () => {
const provider = this.getProvider(chainType);
const txResponse = await provider.broadcastTransaction(signedTx);
this.logger.log(`Transaction broadcasted: ${txResponse.hash}`);
return txResponse.hash;
});
}
/**
@ -174,9 +202,11 @@ export class EvmProviderAdapter {
txHash: string,
confirmations: number = 1,
): Promise<boolean> {
const provider = this.getProvider(chainType);
const receipt = await provider.waitForTransaction(txHash, confirmations);
return receipt !== null && receipt.status === 1;
return this.executeWithFailover(chainType, async () => {
const provider = this.getProvider(chainType);
const receipt = await provider.waitForTransaction(txHash, confirmations);
return receipt !== null && receipt.status === 1;
});
}
/**
@ -187,12 +217,14 @@ export class EvmProviderAdapter {
txHash: string,
requiredConfirmations: number,
): Promise<boolean> {
const provider = this.getProvider(chainType);
const receipt = await provider.getTransactionReceipt(txHash);
if (!receipt) return false;
return this.executeWithFailover(chainType, async () => {
const provider = this.getProvider(chainType);
const receipt = await provider.getTransactionReceipt(txHash);
if (!receipt) return false;
const currentBlock = await provider.getBlockNumber();
const confirmations = currentBlock - receipt.blockNumber;
return confirmations >= requiredConfirmations;
const currentBlock = await provider.getBlockNumber();
const confirmations = currentBlock - receipt.blockNumber;
return confirmations >= requiredConfirmations;
});
}
}