feat(mining): publish CDC events for mining-admin-service sync
Add event publishing to enable mining-admin-service to sync data via Debezium CDC instead of direct API calls: - MiningConfigUpdated: Published every minute with distribution status - DailyMiningStatCreated: Published when daily stats are generated - MiningAccountUpdated: Method added for future per-account sync These events will be captured by Debezium monitoring the outbox_events table and forwarded to mining-admin-service via Kafka. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
9d65eef1b1
commit
7852b9d673
|
|
@ -5,6 +5,7 @@ import { MiningDistributionService } from '../services/mining-distribution.servi
|
|||
import { NetworkSyncService } from '../services/network-sync.service';
|
||||
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
|
||||
import { RedisService } from '../../infrastructure/redis/redis.service';
|
||||
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
|
||||
|
||||
@Injectable()
|
||||
export class MiningScheduler implements OnModuleInit {
|
||||
|
|
@ -16,6 +17,7 @@ export class MiningScheduler implements OnModuleInit {
|
|||
private readonly prisma: PrismaService,
|
||||
private readonly redis: RedisService,
|
||||
private readonly configService: ConfigService,
|
||||
private readonly outboxRepository: OutboxRepository,
|
||||
) {}
|
||||
|
||||
async onModuleInit() {
|
||||
|
|
@ -75,7 +77,7 @@ export class MiningScheduler implements OnModuleInit {
|
|||
});
|
||||
|
||||
// 创建每日统计
|
||||
await this.prisma.dailyMiningStat.create({
|
||||
const dailyStat = await this.prisma.dailyMiningStat.create({
|
||||
data: {
|
||||
date: yesterday,
|
||||
totalContribution: minuteStats._avg.totalContribution || 0,
|
||||
|
|
@ -86,6 +88,21 @@ export class MiningScheduler implements OnModuleInit {
|
|||
},
|
||||
});
|
||||
|
||||
// 发布 DailyMiningStatCreated 事件(供 mining-admin-service 同步)
|
||||
await this.outboxRepository.create({
|
||||
aggregateType: 'DailyMiningStat',
|
||||
aggregateId: dailyStat.id.toString(),
|
||||
eventType: 'DailyMiningStatCreated',
|
||||
payload: {
|
||||
date: yesterday.toISOString(),
|
||||
totalContribution: dailyStat.totalContribution.toString(),
|
||||
totalDistributed: dailyStat.totalDistributed.toString(),
|
||||
totalBurned: dailyStat.totalBurned.toString(),
|
||||
participantCount: dailyStat.participantCount,
|
||||
avgContributionRate: dailyStat.avgContributionRate.toString(),
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log(`Daily stats generated for ${yesterday.toISOString().split('T')[0]}`);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to generate daily stats', error);
|
||||
|
|
|
|||
|
|
@ -203,11 +203,14 @@ export class MiningDistributionService {
|
|||
// 标记已处理(过期时间2秒)
|
||||
await this.redis.set(processedKey, '1', 2);
|
||||
|
||||
// 每分钟记录一次日志
|
||||
// 每分钟记录一次日志并发布 CDC 事件
|
||||
if (isMinuteEnd) {
|
||||
this.logger.log(
|
||||
`Minute distribution: total=${totalDistributed.toFixed(8)}, users=${userParticipantCount}, system=${systemParticipantCount}, pending=${pendingParticipantCount}`,
|
||||
);
|
||||
|
||||
// 发布 MiningConfigUpdated 事件(供 mining-admin-service 同步)
|
||||
await this.publishMiningConfigUpdated(config, newRemaining);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to execute second distribution', error);
|
||||
|
|
@ -641,4 +644,67 @@ export class MiningDistributionService {
|
|||
now.setMilliseconds(0);
|
||||
return now;
|
||||
}
|
||||
|
||||
// ===========================================================================
|
||||
// CDC 事件发布(供 mining-admin-service 同步)
|
||||
// ===========================================================================
|
||||
|
||||
/**
|
||||
* 发布 MiningConfigUpdated 事件
|
||||
* 每分钟发布一次,同步挖矿配置到 mining-admin-service
|
||||
*/
|
||||
private async publishMiningConfigUpdated(
|
||||
config: any,
|
||||
newRemaining: ShareAmount,
|
||||
): Promise<void> {
|
||||
try {
|
||||
await this.outboxRepository.create({
|
||||
aggregateType: 'MiningConfig',
|
||||
aggregateId: config.id,
|
||||
eventType: 'MiningConfigUpdated',
|
||||
payload: {
|
||||
totalShares: config.totalShares.toString(),
|
||||
distributionPool: config.distributionPool.toString(),
|
||||
remainingDistribution: newRemaining.toString(),
|
||||
halvingPeriodYears: config.halvingPeriodYears,
|
||||
currentEra: config.currentEra,
|
||||
minuteDistribution: config.minuteDistribution.toString(),
|
||||
isActive: config.isActive,
|
||||
activatedAt: config.activatedAt?.toISOString(),
|
||||
},
|
||||
});
|
||||
this.logger.debug('Published MiningConfigUpdated event');
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to publish MiningConfigUpdated event', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布 MiningAccountUpdated 事件
|
||||
* 在用户账户更新后发布,同步账户数据到 mining-admin-service
|
||||
*/
|
||||
private async publishMiningAccountUpdated(
|
||||
accountSequence: string,
|
||||
totalMined: string,
|
||||
availableBalance: string,
|
||||
frozenBalance: string,
|
||||
totalContribution: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
await this.outboxRepository.create({
|
||||
aggregateType: 'MiningAccount',
|
||||
aggregateId: accountSequence,
|
||||
eventType: 'MiningAccountUpdated',
|
||||
payload: {
|
||||
accountSequence,
|
||||
totalMined,
|
||||
availableBalance,
|
||||
frozenBalance,
|
||||
totalContribution,
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to publish MiningAccountUpdated event for ${accountSequence}`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue