rwadurian/backend/services/blockchain-service/src/infrastructure/blockchain/block-scanner.service.ts

125 lines
3.8 KiB
TypeScript

import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { EvmProviderAdapter, TransferEvent } from './evm-provider.adapter';
import { AddressCacheService } from '@/infrastructure/redis/address-cache.service';
import { ChainConfigService } from '@/domain/services/chain-config.service';
import { ChainType, BlockNumber } from '@/domain/value-objects';
import { ChainTypeEnum } from '@/domain/enums';
/**
 * A transfer event annotated with the chain it was found on.
 *
 * `BlockScannerService.scanChain` produces these for transfers whose recipient
 * is one of the monitored addresses.
 */
export interface DepositEvent extends TransferEvent {
/** Enum value of the chain on which the transfer was scanned. */
chainType: ChainTypeEnum;
}
/**
 * Asynchronous callback that receives a batch of deposit events.
 *
 * Registered through `BlockScannerService.registerDepositHandler`.
 */
export type DepositHandler = (deposits: DepositEvent[]) => Promise<void>;
/**
 * Block scanner service.
 *
 * Scans block ranges of a chain for USDT Transfer events and reports the
 * transfers whose recipient is one of the monitored addresses as deposits.
 */
@Injectable()
export class BlockScannerService implements OnModuleInit {
  /** Batch size used when the configured value is missing or unusable. */
  private static readonly DEFAULT_SCAN_BATCH_SIZE = 100;

  private readonly logger = new Logger(BlockScannerService.name);

  /** Maximum number of blocks covered by a single `executeScan` call (always >= 1). */
  private readonly scanBatchSize: number;

  /** Handler stored by `registerDepositHandler`; not invoked anywhere in this class. */
  private depositHandler?: DepositHandler;

  /** Per-chain scanning flag, initialised to `false` for every supported chain. */
  private readonly isScanning: Map<ChainTypeEnum, boolean> = new Map();

  constructor(
    private readonly configService: ConfigService,
    private readonly evmProvider: EvmProviderAdapter,
    private readonly addressCache: AddressCacheService,
    private readonly chainConfig: ChainConfigService,
  ) {
    // The generic on `get<number>` is only a compile-time hint, so the raw value
    // is validated before it is used for block-range arithmetic.
    this.scanBatchSize = this.resolveScanBatchSize(
      this.configService.get<number>(
        'blockchain.scanBatchSize',
        BlockScannerService.DEFAULT_SCAN_BATCH_SIZE,
      ),
    );
  }

  /**
   * Marks every supported chain as "not scanning" when the module starts.
   */
  async onModuleInit() {
    for (const chainType of this.chainConfig.getSupportedChains()) {
      this.isScanning.set(chainType, false);
    }
    this.logger.log('BlockScannerService initialized');
  }

  /**
   * Registers the deposit handler, replacing any previously registered one.
   *
   * @param handler Callback to store.
   */
  registerDepositHandler(handler: DepositHandler): void {
    this.depositHandler = handler;
    this.logger.log('Deposit handler registered');
  }

  /**
   * Scans a block range of the given chain for deposits.
   *
   * @param chainType Chain to scan.
   * @param fromBlock First block of the range passed to the provider.
   * @param toBlock Last block of the range passed to the provider.
   * @returns Transfer events whose recipient is a monitored address, each tagged
   *   with the chain's enum value. Empty when no address is monitored.
   */
  async scanChain(
    chainType: ChainType,
    fromBlock: BlockNumber,
    toBlock: BlockNumber,
  ): Promise<DepositEvent[]> {
    const config = this.chainConfig.getConfig(chainType);
    const deposits: DepositEvent[] = [];

    // Load every monitored address; comparison is done on lower-cased strings.
    const monitoredAddresses = await this.addressCache.getAllAddresses(chainType);
    const addressSet = new Set(monitoredAddresses.map((a) => a.toLowerCase()));
    if (addressSet.size === 0) {
      // NOTE(review): interpolating `chainType` relies on ChainType providing a
      // meaningful toString() — confirm.
      this.logger.debug(`No monitored addresses for ${chainType}, skipping scan`);
      return deposits;
    }

    // Fetch the USDT Transfer events emitted in the requested range.
    const events = await this.evmProvider.scanTransferEvents(
      chainType,
      fromBlock,
      toBlock,
      config.usdtContract,
    );

    // Keep only the transfers sent to a monitored address.
    for (const event of events) {
      if (addressSet.has(event.to.toLowerCase())) {
        deposits.push({
          ...event,
          chainType: chainType.value,
        });
        this.logger.log(
          `Deposit detected: ${event.txHash} -> ${event.to} (${event.value.toString()})`,
        );
      }
    }
    return deposits;
  }

  /**
   * Runs a single scan step (intended to be called by the application layer).
   *
   * The scanned range starts at `lastScannedBlock + 1` and covers at most
   * `scanBatchSize` blocks, never going past the provider's current block.
   *
   * @param chainType Chain to scan.
   * @param lastScannedBlock Last block that has already been scanned.
   * @returns The deposits found and the new last scanned block. When there is
   *   no new block, `deposits` is empty and `newLastBlock` is `lastScannedBlock`.
   *   Note that the cursor also advances when `scanChain` skips the range
   *   because no address is monitored.
   */
  async executeScan(
    chainType: ChainType,
    lastScannedBlock: BlockNumber,
  ): Promise<{ deposits: DepositEvent[]; newLastBlock: BlockNumber }> {
    const currentBlock = await this.evmProvider.getCurrentBlockNumber(chainType);
    const fromBlock = lastScannedBlock.add(1);

    // Nothing new on chain yet: leave the cursor where it is.
    if (fromBlock.isGreaterThan(currentBlock)) {
      return { deposits: [], newLastBlock: lastScannedBlock };
    }

    // Cap the batch at the current block.
    // NOTE(review): the range may include the chain head; no confirmation depth
    // is applied here — confirm that reorg handling lives elsewhere.
    const batchEnd = fromBlock.add(this.scanBatchSize - 1);
    const toBlock = batchEnd.isGreaterThan(currentBlock) ? currentBlock : batchEnd;

    // NOTE(review): relies on ChainType/BlockNumber providing toString() — confirm.
    this.logger.debug(`Scanning ${chainType}: blocks ${fromBlock} to ${toBlock}`);
    const deposits = await this.scanChain(chainType, fromBlock, toBlock);
    return { deposits, newLastBlock: toBlock };
  }

  /**
   * Normalises the configured batch size to a positive integer.
   *
   * Numeric strings are coerced and fractional values are floored. Anything
   * that does not yield an integer >= 1 is replaced by the default, with a
   * warning, so a bad setting cannot produce an empty or inverted scan range.
   */
  private resolveScanBatchSize(raw: unknown): number {
    const parsed = Math.floor(Number(raw));
    if (Number.isFinite(parsed) && parsed >= 1) {
      return parsed;
    }
    this.logger.warn(
      `Invalid blockchain.scanBatchSize "${String(raw)}", falling back to ${BlockScannerService.DEFAULT_SCAN_BATCH_SIZE}`,
    );
    return BlockScannerService.DEFAULT_SCAN_BATCH_SIZE;
  }
}