150 lines
4.5 KiB
TypeScript
150 lines
4.5 KiB
TypeScript
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||
import { ConfigService } from '@nestjs/config';
|
||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||
import { MiningDistributionService } from '../services/mining-distribution.service';
|
||
import { NetworkSyncService } from '../services/network-sync.service';
|
||
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
|
||
import { RedisService } from '../../infrastructure/redis/redis.service';
|
||
|
||
/**
 * Cron-driven scheduler for the mining module.
 *
 * Registers four periodic jobs (per-second distribution, daily stats roll-up,
 * hourly cleanup of old minute stats, and a 5-minute network data sync) and
 * triggers one network sync when the module starts.
 *
 * Every job catches and logs its own failures so that a rejected promise never
 * escapes a cron handler or aborts module initialization.
 */
@Injectable()
export class MiningScheduler implements OnModuleInit {
  /** Redis key of the lock guarding the daily stats job. */
  private static readonly DAILY_STATS_LOCK_KEY = 'mining:daily-stats:lock';

  /** Redis key of the lock guarding the network sync job. */
  private static readonly NETWORK_SYNC_LOCK_KEY = 'mining:network-sync:lock';

  /** Minute-level stats older than this many days are deleted by the cleanup job. */
  private static readonly MINUTE_STATS_RETENTION_DAYS = 7;

  private readonly logger = new Logger(MiningScheduler.name);

  constructor(
    private readonly distributionService: MiningDistributionService,
    private readonly networkSyncService: NetworkSyncService,
    private readonly prisma: PrismaService,
    private readonly redis: RedisService,
    private readonly configService: ConfigService,
  ) {}

  /**
   * Logs that the scheduler is up and performs an initial network data sync.
   * The sync is best-effort: its failures are logged, never thrown.
   */
  async onModuleInit(): Promise<void> {
    this.logger.log('Mining scheduler initialized');
    // Sync network-wide data once at startup.
    await this.syncNetworkData();
  }

  /**
   * Runs the mining distribution every second.
   * Errors are logged and swallowed so the next tick still runs.
   */
  @Cron(CronExpression.EVERY_SECOND)
  async executeSecondDistribution(): Promise<void> {
    try {
      await this.distributionService.executeSecondDistribution();
    } catch (error) {
      this.logger.error('Failed to execute second distribution', error);
    }
  }

  /**
   * Generates the daily statistics row for the previous day; scheduled at 00:00
   * every day.
   *
   * Aggregates the previous day's minute stats (sum of distributed and burned
   * amounts, average total contribution, maximum participant count) and stores
   * them as one `dailyMiningStat` record. Guarded by a Redis lock; if the lock
   * is not obtained the run is skipped.
   */
  @Cron('0 0 * * *')
  async generateDailyStats(): Promise<void> {
    await this.runExclusive(
      MiningScheduler.DAILY_STATS_LOCK_KEY,
      300, // lock TTL — presumably seconds; confirm against RedisService.acquireLock
      'Failed to generate daily stats',
      async () => {
        // Day boundaries are computed in the process's local time zone.
        const yesterday = new Date();
        yesterday.setDate(yesterday.getDate() - 1);
        yesterday.setHours(0, 0, 0, 0);

        const endOfYesterday = new Date(yesterday);
        endOfYesterday.setHours(23, 59, 59, 999);

        // Aggregate yesterday's minute-level stats.
        const minuteStats = await this.prisma.minuteMiningStat.aggregate({
          where: {
            minute: {
              gte: yesterday,
              lte: endOfYesterday,
            },
          },
          _sum: {
            totalDistributed: true,
            burnAmount: true,
          },
          _avg: {
            totalContribution: true,
          },
          _max: {
            participantCount: true,
          },
        });

        // Create the daily stats record; aggregates are null when no rows
        // matched, in which case 0 is stored.
        await this.prisma.dailyMiningStat.create({
          data: {
            date: yesterday,
            totalContribution: minuteStats._avg.totalContribution ?? 0,
            totalDistributed: minuteStats._sum.totalDistributed ?? 0,
            totalBurned: minuteStats._sum.burnAmount ?? 0,
            participantCount: minuteStats._max.participantCount ?? 0,
            avgContributionRate: 0, // TODO: compute
          },
        });

        // Format from local components: `yesterday` is local midnight, so a
        // UTC rendering (toISOString) would show the wrong calendar day in
        // time zones ahead of UTC.
        this.logger.log(
          `Daily stats generated for ${MiningScheduler.formatLocalDate(yesterday)}`,
        );
      },
    );
  }

  /**
   * Deletes minute-level stats older than the retention window (7 days);
   * scheduled at minute 0 of every hour. Errors are logged and swallowed.
   */
  @Cron('0 * * * *')
  async cleanupOldMinuteStats(): Promise<void> {
    try {
      const cutoff = new Date();
      cutoff.setDate(cutoff.getDate() - MiningScheduler.MINUTE_STATS_RETENTION_DAYS);

      const result = await this.prisma.minuteMiningStat.deleteMany({
        where: {
          minute: { lt: cutoff },
        },
      });

      if (result.count > 0) {
        this.logger.log(`Cleaned up ${result.count} old minute stats`);
      }
    } catch (error) {
      this.logger.error('Failed to cleanup old minute stats', error);
    }
  }

  /**
   * Syncs network-wide data from the contribution service every 5 minutes.
   *
   * This is a fallback in case Kafka events are lost; under normal conditions
   * the sync is driven in real time by Kafka events. Guarded by a Redis lock;
   * if the lock is not obtained the run is skipped.
   */
  @Cron(CronExpression.EVERY_5_MINUTES)
  async syncNetworkData(): Promise<void> {
    await this.runExclusive(
      MiningScheduler.NETWORK_SYNC_LOCK_KEY,
      30, // lock TTL — presumably seconds; confirm against RedisService.acquireLock
      'Failed to sync network data',
      async () => {
        const contributionServiceUrl = this.configService.get<string>(
          'CONTRIBUTION_SERVICE_URL',
          'http://localhost:3020',
        );

        const result =
          await this.networkSyncService.syncFromContributionService(contributionServiceUrl);
        if (result.success) {
          this.logger.debug(`Network sync completed: ${result.message}`);
        } else {
          this.logger.warn(`Network sync failed: ${result.message}`);
        }
      },
    );
  }

  /**
   * Runs `task` while holding the Redis lock `lockKey`, never throwing.
   *
   * - If the lock cannot be acquired (falsy lock value, or `acquireLock`
   *   throws), the task is skipped.
   * - An error thrown by `task` is logged with `failureMessage`.
   * - The lock is always released afterwards; a failure to release is logged
   *   rather than propagated.
   *
   * @param lockKey Redis key identifying the lock.
   * @param ttl Expiry passed through to `RedisService.acquireLock`.
   * @param failureMessage Log message used when `task` throws.
   * @param task The work to perform under the lock.
   */
  private async runExclusive(
    lockKey: string,
    ttl: number,
    failureMessage: string,
    task: () => Promise<void>,
  ): Promise<void> {
    const lockValue = await this.tryAcquireLock(lockKey, ttl);
    if (!lockValue) {
      return;
    }

    try {
      await task();
    } catch (error) {
      this.logger.error(failureMessage, error);
    } finally {
      try {
        await this.redis.releaseLock(lockKey, lockValue);
      } catch (error) {
        this.logger.error(`Failed to release lock ${lockKey}`, error);
      }
    }
  }

  /**
   * Attempts to acquire a Redis lock, converting a thrown error into a logged
   * failure.
   *
   * @returns The value returned by `acquireLock`, or `null` if it threw.
   */
  private async tryAcquireLock(lockKey: string, ttl: number) {
    try {
      return await this.redis.acquireLock(lockKey, ttl);
    } catch (error) {
      this.logger.error(`Failed to acquire lock ${lockKey}`, error);
      return null;
    }
  }

  /** Formats a date as `YYYY-MM-DD` using its local (not UTC) calendar fields. */
  private static formatLocalDate(date: Date): string {
    const month = String(date.getMonth() + 1).padStart(2, '0');
    const day = String(date.getDate()).padStart(2, '0');
    return `${date.getFullYear()}-${month}-${day}`;
  }
}
|