rwadurian/backend/services/trading-service/src/application/schedulers/price-broadcast.scheduler.ts

97 lines
3.0 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { PriceService } from '../services/price.service';
import { PriceGateway } from '../../api/gateways/price.gateway';
import { TradingConfigRepository } from '../../infrastructure/persistence/repositories/trading-config.repository';
/**
 * Scheduler that pushes the current price to connected clients once per second.
 *
 * Implemented with `setInterval` rather than `@Cron`: per the original author,
 * `@Cron` has a minimum precision of 1 second and `setInterval` is a better fit
 * for high-frequency tasks.
 *
 * The timer is started in `onModuleInit` and cleared in `onModuleDestroy`.
 */
@Injectable()
export class PriceBroadcastScheduler implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(PriceBroadcastScheduler.name);

  /** Handle of the running interval timer, or `null` when the broadcast is stopped. */
  private intervalId: NodeJS.Timeout | null = null;

  /** Last price that was broadcast; used only to log price changes. */
  private lastPrice: string | null = null;

  /** Number of ticks skipped because no client was connected (throttles the idle log). */
  private broadcastCount = 0;

  /** True while a tick is still awaiting its async work; prevents overlapping ticks. */
  private isBroadcasting = false;

  constructor(
    private readonly priceService: PriceService,
    private readonly priceGateway: PriceGateway,
    private readonly tradingConfigRepository: TradingConfigRepository,
  ) {}

  /** Starts the per-second broadcast when the module is initialized. */
  async onModuleInit(): Promise<void> {
    this.logger.log('🚀 Price broadcast scheduler initializing...');
    this.startBroadcast();
  }

  /** Stops the broadcast timer when the module is destroyed. */
  onModuleDestroy(): void {
    this.stopBroadcast();
  }

  /** Starts the 1-second interval timer; does nothing (besides a warning) if it is already running. */
  private startBroadcast(): void {
    if (this.intervalId) {
      this.logger.warn('⚠️ Broadcast already running, skipping start');
      return;
    }

    // Fire once per second. broadcastPrice() catches its own errors, so the
    // returned promise never rejects and can safely be left un-awaited.
    this.intervalId = setInterval(() => {
      void this.broadcastPrice();
    }, 1000);

    this.logger.log('✅ Price broadcast started (1 second interval)');
  }

  /** Clears the interval timer if one is running. */
  private stopBroadcast(): void {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
      this.logger.log('🛑 Price broadcast stopped');
    }
  }

  /**
   * Performs one broadcast tick.
   *
   * Skips the tick when the previous one is still in flight, when the gateway
   * reports zero connected clients, or when the trading config is missing or
   * not active. Otherwise fetches the current price and broadcasts it together
   * with the burn multiplier and the current timestamp. Errors are logged and
   * swallowed so a single failed tick never stops the timer.
   */
  private async broadcastPrice(): Promise<void> {
    // setInterval does not wait for async callbacks: if the repository or the
    // price service takes longer than the interval, ticks would otherwise
    // overlap, pile up, and could emit prices out of order.
    if (this.isBroadcasting) {
      return;
    }
    this.isBroadcasting = true;

    try {
      // Nothing to do when nobody is listening.
      const clientsCount = this.priceGateway.getConnectedClientsCount();
      if (clientsCount === 0) {
        // Emit the waiting log only on every 60th skipped tick.
        this.broadcastCount++;
        if (this.broadcastCount % 60 === 0) {
          this.logger.debug('⏳ No clients connected, waiting...');
        }
        return;
      }

      // Only broadcast while the trading system is active.
      const config = await this.tradingConfigRepository.getConfig();
      if (!config || !config.isActive) {
        this.logger.debug('⏸️ Trading system not active, skipping broadcast');
        return;
      }

      const priceInfo = await this.priceService.getCurrentPrice();

      this.priceGateway.broadcastPriceUpdate({
        price: priceInfo.price,
        burnMultiplier: priceInfo.burnMultiplier,
        timestamp: Date.now(),
      });

      // Log price changes (debugging aid).
      if (this.lastPrice !== priceInfo.price) {
        this.logger.log(`💰 Price changed: ${this.lastPrice} -> ${priceInfo.price}`);
        this.lastPrice = priceInfo.price;
      }
    } catch (error) {
      // Pass a stack string rather than the raw caught value, which may not be an Error.
      const stack = error instanceof Error ? error.stack : String(error);
      this.logger.error('❌ Failed to broadcast price', stack);
    } finally {
      this.isBroadcasting = false;
    }
  }
}