diff --git a/backend/services/mining-service/src/application/schedulers/mining.scheduler.ts b/backend/services/mining-service/src/application/schedulers/mining.scheduler.ts index b3003018..957a4076 100644 --- a/backend/services/mining-service/src/application/schedulers/mining.scheduler.ts +++ b/backend/services/mining-service/src/application/schedulers/mining.scheduler.ts @@ -5,6 +5,7 @@ import { MiningDistributionService } from '../services/mining-distribution.servi import { NetworkSyncService } from '../services/network-sync.service'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; import { RedisService } from '../../infrastructure/redis/redis.service'; +import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; @Injectable() export class MiningScheduler implements OnModuleInit { @@ -16,6 +17,7 @@ export class MiningScheduler implements OnModuleInit { private readonly prisma: PrismaService, private readonly redis: RedisService, private readonly configService: ConfigService, + private readonly outboxRepository: OutboxRepository, ) {} async onModuleInit() { @@ -75,7 +77,7 @@ export class MiningScheduler implements OnModuleInit { }); // 创建每日统计 - await this.prisma.dailyMiningStat.create({ + const dailyStat = await this.prisma.dailyMiningStat.create({ data: { date: yesterday, totalContribution: minuteStats._avg.totalContribution || 0, @@ -86,6 +88,21 @@ export class MiningScheduler implements OnModuleInit { }, }); + // 发布 DailyMiningStatCreated 事件(供 mining-admin-service 同步) + await this.outboxRepository.create({ + aggregateType: 'DailyMiningStat', + aggregateId: dailyStat.id.toString(), + eventType: 'DailyMiningStatCreated', + payload: { + date: yesterday.toISOString(), + totalContribution: dailyStat.totalContribution.toString(), + totalDistributed: dailyStat.totalDistributed.toString(), + totalBurned: dailyStat.totalBurned.toString(), + participantCount: dailyStat.participantCount, + avgContributionRate: dailyStat.avgContributionRate.toString(), + }, + }); + this.logger.log(`Daily stats generated for ${yesterday.toISOString().split('T')[0]}`); } catch (error) { this.logger.error('Failed to generate daily stats', error); diff --git a/backend/services/mining-service/src/application/services/mining-distribution.service.ts b/backend/services/mining-service/src/application/services/mining-distribution.service.ts index f763f164..d8ba6497 100644 --- a/backend/services/mining-service/src/application/services/mining-distribution.service.ts +++ b/backend/services/mining-service/src/application/services/mining-distribution.service.ts @@ -203,11 +203,14 @@ export class MiningDistributionService { // 标记已处理(过期时间2秒) await this.redis.set(processedKey, '1', 2); - // 每分钟记录一次日志 + // 每分钟记录一次日志并发布 CDC 事件 if (isMinuteEnd) { this.logger.log( `Minute distribution: total=${totalDistributed.toFixed(8)}, users=${userParticipantCount}, system=${systemParticipantCount}, pending=${pendingParticipantCount}`, ); + + // 发布 MiningConfigUpdated 事件(供 mining-admin-service 同步) + await this.publishMiningConfigUpdated(config, newRemaining); } } catch (error) { this.logger.error('Failed to execute second distribution', error); @@ -641,4 +644,67 @@ export class MiningDistributionService { now.setMilliseconds(0); return now; } + + // =========================================================================== + // CDC 事件发布(供 mining-admin-service 同步) + // =========================================================================== + + /** + * 发布 MiningConfigUpdated 事件 + * 每分钟发布一次,同步挖矿配置到 mining-admin-service + */ + private async publishMiningConfigUpdated( + config: any, + newRemaining: ShareAmount, + ): Promise { + try { + await this.outboxRepository.create({ + aggregateType: 'MiningConfig', + aggregateId: config.id, + eventType: 'MiningConfigUpdated', + payload: { + totalShares: config.totalShares.toString(), + distributionPool: config.distributionPool.toString(), + remainingDistribution: newRemaining.toString(), + halvingPeriodYears: config.halvingPeriodYears, + currentEra: config.currentEra, + minuteDistribution: config.minuteDistribution.toString(), + isActive: config.isActive, + activatedAt: config.activatedAt?.toISOString(), + }, + }); + this.logger.debug('Published MiningConfigUpdated event'); + } catch (error) { + this.logger.error('Failed to publish MiningConfigUpdated event', error); + } + } + + /** + * 发布 MiningAccountUpdated 事件 + * 在用户账户更新后发布,同步账户数据到 mining-admin-service + */ + private async publishMiningAccountUpdated( + accountSequence: string, + totalMined: string, + availableBalance: string, + frozenBalance: string, + totalContribution: string, + ): Promise { + try { + await this.outboxRepository.create({ + aggregateType: 'MiningAccount', + aggregateId: accountSequence, + eventType: 'MiningAccountUpdated', + payload: { + accountSequence, + totalMined, + availableBalance, + frozenBalance, + totalContribution, + }, + }); + } catch (error) { + this.logger.error(`Failed to publish MiningAccountUpdated event for ${accountSequence}`, error); + } + } }