From 974d66054482daf54e0f90f94f8aa68f9e8fdf08 Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 15 Jan 2026 21:30:35 -0800 Subject: [PATCH] feat(mining): sync pool balance via Kafka when mining distributes - mining-service: publish MINING_MINUTE_DISTRIBUTED event to Kafka after each minute's mining distribution is completed - mining-wallet-service: add MiningDistributionConsumer to consume the event and deduct from SHARE_POOL_B - Add deductFromSharePoolB method in PoolAccountService - This ensures the share pool balance displayed in mining-app reflects the actual remaining balance after mining distributions Co-Authored-By: Claude Opus 4.5 --- .../services/mining-distribution.service.ts | 40 ++++- .../src/application/application.module.ts | 2 + .../services/pool-account.service.ts | 38 +++++ .../consumers/mining-distribution.consumer.ts | 138 ++++++++++++++++++ .../kafka/events/mining-distribution.event.ts | 18 +++ 5 files changed, 235 insertions(+), 1 deletion(-) create mode 100644 backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/mining-distribution.consumer.ts create mode 100644 backend/services/mining-wallet-service/src/infrastructure/kafka/events/mining-distribution.event.ts diff --git a/backend/services/mining-service/src/application/services/mining-distribution.service.ts b/backend/services/mining-service/src/application/services/mining-distribution.service.ts index 92c4c181..aa12e2bc 100644 --- a/backend/services/mining-service/src/application/services/mining-distribution.service.ts +++ b/backend/services/mining-service/src/application/services/mining-distribution.service.ts @@ -4,6 +4,7 @@ import { MiningAccountRepository } from '../../infrastructure/persistence/reposi import { MiningConfigRepository } from '../../infrastructure/persistence/repositories/mining-config.repository'; import { BlackHoleRepository } from '../../infrastructure/persistence/repositories/black-hole.repository'; import { PriceSnapshotRepository } from '../../infrastructure/persistence/repositories/price-snapshot.repository'; +import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; import { RedisService } from '../../infrastructure/redis/redis.service'; import { MiningCalculatorService } from '../../domain/services/mining-calculator.service'; @@ -30,6 +31,7 @@ export class MiningDistributionService { private readonly miningConfigRepository: MiningConfigRepository, private readonly blackHoleRepository: BlackHoleRepository, private readonly priceSnapshotRepository: PriceSnapshotRepository, + private readonly outboxRepository: OutboxRepository, private readonly prisma: PrismaService, private readonly redis: RedisService, private readonly configService: ConfigService, @@ -190,7 +192,7 @@ export class MiningDistributionService { } /** - * 写入每分钟汇总的MiningRecord + * 写入每分钟汇总的MiningRecord,并发布事件到 Kafka 通知 mining-wallet-service 扣减池余额 */ private async writeMinuteRecords(minuteTime: Date): Promise { try { @@ -198,6 +200,13 @@ export class MiningDistributionService { const pattern = `${this.MINUTE_ACCUMULATOR_PREFIX}${minuteTime.getTime()}:*`; const keys = await this.redis.keys(pattern); + let totalMinuteDistributed = new Decimal(0); + const distributions: Array<{ + accountSequence: string; + minedAmount: string; + contributionRatio: string; + }> = []; + for (const key of keys) { const data = await this.redis.get(key); if (!data) continue; @@ -219,9 +228,38 @@ export class MiningDistributionService { }, }); + // 累计该分钟的总分配量 + totalMinuteDistributed = totalMinuteDistributed.plus(accumulated.minedAmount); + distributions.push({ + accountSequence, + minedAmount: accumulated.minedAmount, + contributionRatio: accumulated.contributionRatio, + }); + // 删除已处理的累积数据 await this.redis.del(key); } + + // 发布事件到 Kafka,通知 mining-wallet-service 扣减池余额 + if (!totalMinuteDistributed.isZero()) { + await this.outboxRepository.create({ + aggregateType: 'MiningDistribution', + aggregateId: minuteTime.toISOString(), + eventType: 'MINING_MINUTE_DISTRIBUTED', + topic: 'mining.distribution.completed', + key: minuteTime.toISOString(), + payload: { + miningMinute: minuteTime.toISOString(), + totalDistributed: totalMinuteDistributed.toFixed(18), + participantCount: distributions.length, + // 只记录汇总数据,不记录每个用户的明细(减少消息大小) + }, + }); + + this.logger.debug( + `Published MINING_MINUTE_DISTRIBUTED event: minute=${minuteTime.toISOString()}, total=${totalMinuteDistributed.toFixed(8)}`, + ); + } } catch (error) { this.logger.error('Failed to write minute records', error); } diff --git a/backend/services/mining-wallet-service/src/application/application.module.ts b/backend/services/mining-wallet-service/src/application/application.module.ts index f39c22af..622b123f 100644 --- a/backend/services/mining-wallet-service/src/application/application.module.ts +++ b/backend/services/mining-wallet-service/src/application/application.module.ts @@ -14,6 +14,7 @@ import { ContributionExpiryScheduler } from './schedulers/contribution-expiry.sc // Consumers (从 InfrastructureModule 移过来,因为依赖应用服务) import { ContributionDistributionConsumer } from '../infrastructure/kafka/consumers/contribution-distribution.consumer'; import { UserRegisteredConsumer } from '../infrastructure/kafka/consumers/user-registered.consumer'; +import { MiningDistributionConsumer } from '../infrastructure/kafka/consumers/mining-distribution.consumer'; @Module({ imports: [ScheduleModule.forRoot()], @@ -21,6 +22,7 @@ import { UserRegisteredConsumer } from '../infrastructure/kafka/consumers/user-r // Kafka Consumers (微服务消息处理器需要是 Controller) ContributionDistributionConsumer, UserRegisteredConsumer, + MiningDistributionConsumer, ], providers: [ // Services diff --git a/backend/services/mining-wallet-service/src/application/services/pool-account.service.ts b/backend/services/mining-wallet-service/src/application/services/pool-account.service.ts index fe3507c7..4a1659f4 100644 --- a/backend/services/mining-wallet-service/src/application/services/pool-account.service.ts +++ b/backend/services/mining-wallet-service/src/application/services/pool-account.service.ts @@ -201,6 +201,44 @@ export class PoolAccountService { }; } + /** + * 从积分股池B扣减(挖矿分配汇总) + * 由 Kafka 消费者调用,处理 mining-service 发布的挖矿分配事件 + */ + async deductFromSharePoolB( + amount: Decimal, + miningInfo: { + miningMinute: Date; + participantCount: number; + referenceId?: string; + }, + ): Promise { + const sourcePool: PoolAccountType = 'SHARE_POOL_B'; + + const memo = `挖矿分配扣减, 分钟${miningInfo.miningMinute.toISOString()}, 参与人数${miningInfo.participantCount}, 总量${amount.toFixed(8)}`; + + await this.poolAccountRepo.updateBalanceWithTransaction( + sourcePool, + amount.negated(), // 负数表示扣减 + { + transactionType: 'MINING_DISTRIBUTE', + counterpartyType: 'SYSTEM_ACCOUNT', + referenceId: miningInfo.referenceId, + referenceType: 'MINING_MINUTE_SUMMARY', + memo, + metadata: { + miningMinute: miningInfo.miningMinute.toISOString(), + participantCount: miningInfo.participantCount, + totalDistributed: amount.toString(), + }, + }, + ); + + this.logger.log( + `Deducted ${amount.toFixed(8)} from SHARE_POOL_B for minute ${miningInfo.miningMinute.toISOString()}`, + ); + } + /** * 用户划入流通池(准备卖出) */ diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/mining-distribution.consumer.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/mining-distribution.consumer.ts new file mode 100644 index 00000000..dd0ba92a --- /dev/null +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/mining-distribution.consumer.ts @@ -0,0 +1,138 @@ +import { Controller, Logger, OnModuleInit } from '@nestjs/common'; +import { EventPattern, Payload } from '@nestjs/microservices'; +import Decimal from 'decimal.js'; +import { RedisService } from '../../redis/redis.service'; +import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository'; +import { PoolAccountService } from '../../../application/services/pool-account.service'; +import { + MiningMinuteDistributedEvent, + MiningMinuteDistributedPayload, +} from '../events/mining-distribution.event'; + +// 4小时 TTL(秒) +const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; + +/** + * 挖矿分配事件消费者 + * 监听 mining-service 发布的挖矿分配事件,扣减积分股池余额 + */ +@Controller() +export class MiningDistributionConsumer implements OnModuleInit { + private readonly logger = new Logger(MiningDistributionConsumer.name); + + constructor( + private readonly redis: RedisService, + private readonly processedEventRepo: ProcessedEventRepository, + private readonly poolAccountService: PoolAccountService, + ) {} + + async onModuleInit() { + this.logger.log('MiningDistributionConsumer initialized'); + } + + /** + * 处理挖矿分配事件 + * Topic: mining.distribution.completed + */ + @EventPattern('mining.distribution.completed') + async handleMiningDistributed(@Payload() message: any): Promise { + // 解析消息格式(Outbox 发布的格式) + const event: MiningMinuteDistributedEvent = message.value || message; + const eventId = event.eventId || message.eventId; + + if (!eventId) { + this.logger.warn('Received event without eventId, skipping'); + return; + } + + this.logger.debug(`Processing mining distribution event: ${eventId}`); + + // 幂等性检查 + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`Event ${eventId} already processed, skipping`); + return; + } + + try { + await this.processDistribution(event.payload); + + // 标记为已处理 + await this.markEventProcessed(eventId, event.eventType); + + this.logger.log( + `Mining distribution processed: minute=${event.payload.miningMinute}, amount=${event.payload.totalDistributed}`, + ); + } catch (error) { + this.logger.error( + `Failed to process mining distribution for minute ${event.payload.miningMinute}`, + error instanceof Error ? error.stack : error, + ); + throw error; // 让 Kafka 重试 + } + } + + /** + * 处理分配:从 SHARE_POOL_B 扣减分配量 + */ + private async processDistribution( + payload: MiningMinuteDistributedPayload, + ): Promise { + const amount = new Decimal(payload.totalDistributed); + + if (amount.isZero()) { + this.logger.debug('Zero distribution amount, skipping'); + return; + } + + // 从 SHARE_POOL_B 扣减(挖矿分配来源) + await this.poolAccountService.deductFromSharePoolB(amount, { + miningMinute: new Date(payload.miningMinute), + participantCount: payload.participantCount, + referenceId: payload.miningMinute, + }); + + this.logger.debug( + `Deducted ${amount.toFixed(8)} from SHARE_POOL_B for minute ${payload.miningMinute}`, + ); + } + + /** + * 幂等性检查 - Redis + DB 双重检查 + */ + private async isEventProcessed(eventId: string): Promise { + const redisKey = `processed-event:mining:${eventId}`; + + // 1. 先检查 Redis 缓存(快速路径) + const cached = await this.redis.get(redisKey); + if (cached) return true; + + // 2. 检查数据库 + const dbRecord = await this.processedEventRepo.findByEventId(eventId); + if (dbRecord) { + // 回填 Redis 缓存 + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + return true; + } + + return false; + } + + /** + * 标记事件为已处理 + */ + private async markEventProcessed( + eventId: string, + eventType: string, + ): Promise { + // 1. 写入数据库 + await this.processedEventRepo.create({ + eventId, + eventType, + sourceService: 'mining-service', + }); + + // 2. 写入 Redis 缓存 + const redisKey = `processed-event:mining:${eventId}`; + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + } +} diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/events/mining-distribution.event.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/events/mining-distribution.event.ts new file mode 100644 index 00000000..fb61ecd0 --- /dev/null +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/events/mining-distribution.event.ts @@ -0,0 +1,18 @@ +/** + * 挖矿分配事件 + * 来自 mining-service,每分钟发布一次 + */ +export interface MiningMinuteDistributedEvent { + eventType: 'MINING_MINUTE_DISTRIBUTED'; + eventId: string; + aggregateType: string; + aggregateId: string; + createdAt: string; + payload: MiningMinuteDistributedPayload; +} + +export interface MiningMinuteDistributedPayload { + miningMinute: string; // ISO 时间字符串 + totalDistributed: string; // 该分钟总分配量(精度18位) + participantCount: number; // 参与人数 +}