feat(mining): sync pool balance via Kafka when mining distributes

- mining-service: publish MINING_MINUTE_DISTRIBUTED event to Kafka after
  each minute's mining distribution is completed
- mining-wallet-service: add MiningDistributionConsumer to consume the
  event and deduct from SHARE_POOL_B
- Add deductFromSharePoolB method in PoolAccountService
- This ensures the share pool balance displayed in mining-app reflects
  the actual remaining balance after mining distributions

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-15 21:30:35 -08:00
parent 7b3c222b24
commit 974d660544
5 changed files with 235 additions and 1 deletions

View File

@ -4,6 +4,7 @@ import { MiningAccountRepository } from '../../infrastructure/persistence/reposi
import { MiningConfigRepository } from '../../infrastructure/persistence/repositories/mining-config.repository';
import { BlackHoleRepository } from '../../infrastructure/persistence/repositories/black-hole.repository';
import { PriceSnapshotRepository } from '../../infrastructure/persistence/repositories/price-snapshot.repository';
import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { RedisService } from '../../infrastructure/redis/redis.service';
import { MiningCalculatorService } from '../../domain/services/mining-calculator.service';
@ -30,6 +31,7 @@ export class MiningDistributionService {
private readonly miningConfigRepository: MiningConfigRepository,
private readonly blackHoleRepository: BlackHoleRepository,
private readonly priceSnapshotRepository: PriceSnapshotRepository,
private readonly outboxRepository: OutboxRepository,
private readonly prisma: PrismaService,
private readonly redis: RedisService,
private readonly configService: ConfigService,
@ -190,7 +192,7 @@ export class MiningDistributionService {
}
/**
* MiningRecord
* MiningRecord Kafka mining-wallet-service
*/
private async writeMinuteRecords(minuteTime: Date): Promise<void> {
try {
@ -198,6 +200,13 @@ export class MiningDistributionService {
const pattern = `${this.MINUTE_ACCUMULATOR_PREFIX}${minuteTime.getTime()}:*`;
const keys = await this.redis.keys(pattern);
let totalMinuteDistributed = new Decimal(0);
const distributions: Array<{
accountSequence: string;
minedAmount: string;
contributionRatio: string;
}> = [];
for (const key of keys) {
const data = await this.redis.get(key);
if (!data) continue;
@ -219,9 +228,38 @@ export class MiningDistributionService {
},
});
// 累计该分钟的总分配量
totalMinuteDistributed = totalMinuteDistributed.plus(accumulated.minedAmount);
distributions.push({
accountSequence,
minedAmount: accumulated.minedAmount,
contributionRatio: accumulated.contributionRatio,
});
// 删除已处理的累积数据
await this.redis.del(key);
}
// 发布事件到 Kafka通知 mining-wallet-service 扣减池余额
if (!totalMinuteDistributed.isZero()) {
await this.outboxRepository.create({
aggregateType: 'MiningDistribution',
aggregateId: minuteTime.toISOString(),
eventType: 'MINING_MINUTE_DISTRIBUTED',
topic: 'mining.distribution.completed',
key: minuteTime.toISOString(),
payload: {
miningMinute: minuteTime.toISOString(),
totalDistributed: totalMinuteDistributed.toFixed(18),
participantCount: distributions.length,
// 只记录汇总数据,不记录每个用户的明细(减少消息大小)
},
});
this.logger.debug(
`Published MINING_MINUTE_DISTRIBUTED event: minute=${minuteTime.toISOString()}, total=${totalMinuteDistributed.toFixed(8)}`,
);
}
} catch (error) {
this.logger.error('Failed to write minute records', error);
}

View File

@ -14,6 +14,7 @@ import { ContributionExpiryScheduler } from './schedulers/contribution-expiry.sc
// Consumers (从 InfrastructureModule 移过来,因为依赖应用服务)
import { ContributionDistributionConsumer } from '../infrastructure/kafka/consumers/contribution-distribution.consumer';
import { UserRegisteredConsumer } from '../infrastructure/kafka/consumers/user-registered.consumer';
import { MiningDistributionConsumer } from '../infrastructure/kafka/consumers/mining-distribution.consumer';
@Module({
imports: [ScheduleModule.forRoot()],
@ -21,6 +22,7 @@ import { UserRegisteredConsumer } from '../infrastructure/kafka/consumers/user-r
// Kafka Consumers (微服务消息处理器需要是 Controller)
ContributionDistributionConsumer,
UserRegisteredConsumer,
MiningDistributionConsumer,
],
providers: [
// Services

View File

@ -201,6 +201,44 @@ export class PoolAccountService {
};
}
/**
* B扣减
* Kafka mining-service
*/
async deductFromSharePoolB(
amount: Decimal,
miningInfo: {
miningMinute: Date;
participantCount: number;
referenceId?: string;
},
): Promise<void> {
const sourcePool: PoolAccountType = 'SHARE_POOL_B';
const memo = `挖矿分配扣减, 分钟${miningInfo.miningMinute.toISOString()}, 参与人数${miningInfo.participantCount}, 总量${amount.toFixed(8)}`;
await this.poolAccountRepo.updateBalanceWithTransaction(
sourcePool,
amount.negated(), // 负数表示扣减
{
transactionType: 'MINING_DISTRIBUTE',
counterpartyType: 'SYSTEM_ACCOUNT',
referenceId: miningInfo.referenceId,
referenceType: 'MINING_MINUTE_SUMMARY',
memo,
metadata: {
miningMinute: miningInfo.miningMinute.toISOString(),
participantCount: miningInfo.participantCount,
totalDistributed: amount.toString(),
},
},
);
this.logger.log(
`Deducted ${amount.toFixed(8)} from SHARE_POOL_B for minute ${miningInfo.miningMinute.toISOString()}`,
);
}
/**
*
*/

View File

@ -0,0 +1,138 @@
import { Controller, Logger, OnModuleInit } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import Decimal from 'decimal.js';
import { RedisService } from '../../redis/redis.service';
import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository';
import { PoolAccountService } from '../../../application/services/pool-account.service';
import {
MiningMinuteDistributedEvent,
MiningMinuteDistributedPayload,
} from '../events/mining-distribution.event';
// 4小时 TTL
const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60;
/**
*
* mining-service
*/
@Controller()
export class MiningDistributionConsumer implements OnModuleInit {
private readonly logger = new Logger(MiningDistributionConsumer.name);
constructor(
private readonly redis: RedisService,
private readonly processedEventRepo: ProcessedEventRepository,
private readonly poolAccountService: PoolAccountService,
) {}
async onModuleInit() {
this.logger.log('MiningDistributionConsumer initialized');
}
/**
*
* Topic: mining.distribution.completed
*/
@EventPattern('mining.distribution.completed')
async handleMiningDistributed(@Payload() message: any): Promise<void> {
// 解析消息格式Outbox 发布的格式)
const event: MiningMinuteDistributedEvent = message.value || message;
const eventId = event.eventId || message.eventId;
if (!eventId) {
this.logger.warn('Received event without eventId, skipping');
return;
}
this.logger.debug(`Processing mining distribution event: ${eventId}`);
// 幂等性检查
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`Event ${eventId} already processed, skipping`);
return;
}
try {
await this.processDistribution(event.payload);
// 标记为已处理
await this.markEventProcessed(eventId, event.eventType);
this.logger.log(
`Mining distribution processed: minute=${event.payload.miningMinute}, amount=${event.payload.totalDistributed}`,
);
} catch (error) {
this.logger.error(
`Failed to process mining distribution for minute ${event.payload.miningMinute}`,
error instanceof Error ? error.stack : error,
);
throw error; // 让 Kafka 重试
}
}
/**
* SHARE_POOL_B
*/
private async processDistribution(
payload: MiningMinuteDistributedPayload,
): Promise<void> {
const amount = new Decimal(payload.totalDistributed);
if (amount.isZero()) {
this.logger.debug('Zero distribution amount, skipping');
return;
}
// 从 SHARE_POOL_B 扣减(挖矿分配来源)
await this.poolAccountService.deductFromSharePoolB(amount, {
miningMinute: new Date(payload.miningMinute),
participantCount: payload.participantCount,
referenceId: payload.miningMinute,
});
this.logger.debug(
`Deducted ${amount.toFixed(8)} from SHARE_POOL_B for minute ${payload.miningMinute}`,
);
}
/**
* - Redis + DB
*/
private async isEventProcessed(eventId: string): Promise<boolean> {
const redisKey = `processed-event:mining:${eventId}`;
// 1. 先检查 Redis 缓存(快速路径)
const cached = await this.redis.get(redisKey);
if (cached) return true;
// 2. 检查数据库
const dbRecord = await this.processedEventRepo.findByEventId(eventId);
if (dbRecord) {
// 回填 Redis 缓存
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
return true;
}
return false;
}
/**
*
*/
private async markEventProcessed(
eventId: string,
eventType: string,
): Promise<void> {
// 1. 写入数据库
await this.processedEventRepo.create({
eventId,
eventType,
sourceService: 'mining-service',
});
// 2. 写入 Redis 缓存
const redisKey = `processed-event:mining:${eventId}`;
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
}
}

View File

@ -0,0 +1,18 @@
/**
*
* mining-service
*/
export interface MiningMinuteDistributedEvent {
eventType: 'MINING_MINUTE_DISTRIBUTED';
eventId: string;
aggregateType: string;
aggregateId: string;
createdAt: string;
payload: MiningMinuteDistributedPayload;
}
export interface MiningMinuteDistributedPayload {
miningMinute: string; // ISO 时间字符串
totalDistributed: string; // 该分钟总分配量精度18位
participantCount: number; // 参与人数
}