From 8ab11c8f507445008c516d2b54ed6e88cfc8af35 Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 15 Jan 2026 23:24:43 -0800 Subject: [PATCH] feat(wallet): sync burn events from trading-service to deduct SHARE_POOL_A Add Kafka consumer to listen for burn events (minute burn and sell burn) from trading-service and deduct from SHARE_POOL_A (100B pool), updating BLACK_HOLE_POOL balance accordingly. Co-Authored-By: Claude Opus 4.5 --- .../src/application/application.module.ts | 2 + .../services/pool-account.service.ts | 70 +++++++ .../kafka/consumers/burn.consumer.ts | 195 ++++++++++++++++++ .../infrastructure/kafka/events/burn.event.ts | 48 +++++ 4 files changed, 315 insertions(+) create mode 100644 backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/burn.consumer.ts create mode 100644 backend/services/mining-wallet-service/src/infrastructure/kafka/events/burn.event.ts diff --git a/backend/services/mining-wallet-service/src/application/application.module.ts b/backend/services/mining-wallet-service/src/application/application.module.ts index 622b123f..646f1bf0 100644 --- a/backend/services/mining-wallet-service/src/application/application.module.ts +++ b/backend/services/mining-wallet-service/src/application/application.module.ts @@ -15,6 +15,7 @@ import { ContributionExpiryScheduler } from './schedulers/contribution-expiry.sc import { ContributionDistributionConsumer } from '../infrastructure/kafka/consumers/contribution-distribution.consumer'; import { UserRegisteredConsumer } from '../infrastructure/kafka/consumers/user-registered.consumer'; import { MiningDistributionConsumer } from '../infrastructure/kafka/consumers/mining-distribution.consumer'; +import { BurnConsumer } from '../infrastructure/kafka/consumers/burn.consumer'; @Module({ imports: [ScheduleModule.forRoot()], @@ -23,6 +24,7 @@ import { MiningDistributionConsumer } from '../infrastructure/kafka/consumers/mi ContributionDistributionConsumer, UserRegisteredConsumer, MiningDistributionConsumer, + BurnConsumer, ], providers: [ // Services diff --git a/backend/services/mining-wallet-service/src/application/services/pool-account.service.ts b/backend/services/mining-wallet-service/src/application/services/pool-account.service.ts index 4a1659f4..5c33ca60 100644 --- a/backend/services/mining-wallet-service/src/application/services/pool-account.service.ts +++ b/backend/services/mining-wallet-service/src/application/services/pool-account.service.ts @@ -239,6 +239,76 @@ export class PoolAccountService { ); } + /** + * 从积分股池A扣减(销毁) + * 由 Kafka 消费者调用,处理 trading-service 发布的销毁事件 + * 销毁的积分股从 100亿的 SHARE_POOL_A 中扣减 + */ + async deductFromSharePoolAForBurn( + amount: Decimal, + burnInfo: { + burnMinute: Date; + burnRecordId: string; + sourceType: 'MINUTE_BURN' | 'SELL_BURN'; + sourceAccountSeq?: string; + sourceOrderNo?: string; + }, + ): Promise { + const sourcePool: PoolAccountType = 'SHARE_POOL_A'; + + const memo = + burnInfo.sourceType === 'MINUTE_BURN' + ? `每分钟销毁扣减, 分钟${burnInfo.burnMinute.toISOString()}, 销毁量${amount.toFixed(8)}` + : `卖出销毁扣减, 账户${burnInfo.sourceAccountSeq}, 订单${burnInfo.sourceOrderNo}, 销毁量${amount.toFixed(8)}`; + + await this.poolAccountRepo.updateBalanceWithTransaction( + sourcePool, + amount.negated(), // 负数表示扣减 + { + transactionType: 'BURN', + counterpartyType: 'POOL', + counterpartyPoolType: 'BLACK_HOLE_POOL', + referenceId: burnInfo.burnRecordId, + referenceType: burnInfo.sourceType, + memo, + metadata: { + burnMinute: burnInfo.burnMinute.toISOString(), + burnRecordId: burnInfo.burnRecordId, + sourceType: burnInfo.sourceType, + sourceAccountSeq: burnInfo.sourceAccountSeq, + sourceOrderNo: burnInfo.sourceOrderNo, + burnAmount: amount.toString(), + }, + }, + ); + + // 同时增加黑洞池的余额 + await this.poolAccountRepo.updateBalanceWithTransaction( + 'BLACK_HOLE_POOL', + amount, // 正数表示增加 + { + transactionType: 'BURN', + counterpartyType: 'POOL', + counterpartyPoolType: 'SHARE_POOL_A', + referenceId: burnInfo.burnRecordId, + referenceType: burnInfo.sourceType, + memo: `销毁入账, 来源${sourcePool}, ${memo}`, + metadata: { + burnMinute: burnInfo.burnMinute.toISOString(), + burnRecordId: burnInfo.burnRecordId, + sourceType: burnInfo.sourceType, + sourceAccountSeq: burnInfo.sourceAccountSeq, + sourceOrderNo: burnInfo.sourceOrderNo, + burnAmount: amount.toString(), + }, + }, + ); + + this.logger.log( + `Burned ${amount.toFixed(8)} from SHARE_POOL_A to BLACK_HOLE_POOL (${burnInfo.sourceType})`, + ); + } + /** * 用户划入流通池(准备卖出) */ diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/burn.consumer.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/burn.consumer.ts new file mode 100644 index 00000000..b42f0e30 --- /dev/null +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/burn.consumer.ts @@ -0,0 +1,195 @@ +import { Controller, Logger, OnModuleInit } from '@nestjs/common'; +import { EventPattern, Payload } from '@nestjs/microservices'; +import Decimal from 'decimal.js'; +import { RedisService } from '../../redis/redis.service'; +import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository'; +import { PoolAccountService } from '../../../application/services/pool-account.service'; +import { + MinuteBurnExecutedEvent, + MinuteBurnExecutedPayload, + BurnExecutedEvent, + BurnExecutedPayload, +} from '../events/burn.event'; + +// 4小时 TTL(秒) +const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; + +/** + * 销毁事件消费者 + * 监听 trading-service 发布的销毁事件,从 SHARE_POOL_A 扣减 + */ +@Controller() +export class BurnConsumer implements OnModuleInit { + private readonly logger = new Logger(BurnConsumer.name); + + constructor( + private readonly redis: RedisService, + private readonly processedEventRepo: ProcessedEventRepository, + private readonly poolAccountService: PoolAccountService, + ) {} + + async onModuleInit() { + this.logger.log('BurnConsumer initialized'); + } + + /** + * 处理每分钟定时销毁事件 + * Topic: trading.burns + * EventType: burn.minute-executed + */ + @EventPattern('trading.burns') + async handleBurnEvent(@Payload() message: any): Promise { + // 解析消息格式(Outbox 发布的格式) + const eventData = message.value || message; + const eventId = eventData.eventId || message.eventId; + const eventType = eventData.eventType; + + if (!eventId) { + this.logger.warn('Received burn event without eventId, skipping'); + return; + } + + // 根据事件类型分发处理 + if (eventType === 'burn.minute-executed') { + await this.handleMinuteBurn(eventId, eventData as MinuteBurnExecutedEvent); + } else if (eventType === 'burn.executed') { + await this.handleSellBurn(eventId, eventData as BurnExecutedEvent); + } else { + this.logger.debug(`Unknown burn event type: ${eventType}, skipping`); + } + } + + /** + * 处理每分钟定时销毁 + */ + private async handleMinuteBurn( + eventId: string, + event: MinuteBurnExecutedEvent, + ): Promise { + this.logger.debug(`Processing minute burn event: ${eventId}`); + + // 幂等性检查 + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`Event ${eventId} already processed, skipping`); + return; + } + + try { + const payload = event.payload; + const burnAmount = new Decimal(payload.burnAmount); + + if (burnAmount.isZero()) { + this.logger.debug('Zero burn amount, skipping'); + return; + } + + // 从 SHARE_POOL_A 扣减销毁量 + await this.poolAccountService.deductFromSharePoolAForBurn(burnAmount, { + burnMinute: new Date(payload.burnMinute), + burnRecordId: payload.burnRecordId, + sourceType: 'MINUTE_BURN', + }); + + // 标记为已处理 + await this.markEventProcessed(eventId, 'burn.minute-executed'); + + this.logger.log( + `Minute burn processed: amount=${burnAmount.toFixed(8)}, minute=${payload.burnMinute}`, + ); + } catch (error) { + this.logger.error( + `Failed to process minute burn event ${eventId}`, + error instanceof Error ? error.stack : error, + ); + throw error; // 让 Kafka 重试 + } + } + + /** + * 处理卖出销毁 + */ + private async handleSellBurn( + eventId: string, + event: BurnExecutedEvent, + ): Promise { + this.logger.debug(`Processing sell burn event: ${eventId}`); + + // 幂等性检查 + if (await this.isEventProcessed(eventId)) { + this.logger.debug(`Event ${eventId} already processed, skipping`); + return; + } + + try { + const payload = event.payload; + const burnAmount = new Decimal(payload.burnAmount); + + if (burnAmount.isZero()) { + this.logger.debug('Zero burn amount, skipping'); + return; + } + + // 从 SHARE_POOL_A 扣减销毁量 + await this.poolAccountService.deductFromSharePoolAForBurn(burnAmount, { + burnMinute: new Date(payload.executedAt), + burnRecordId: payload.burnRecordId, + sourceType: 'SELL_BURN', + sourceAccountSeq: payload.sourceAccountSeq, + sourceOrderNo: payload.sourceOrderNo, + }); + + // 标记为已处理 + await this.markEventProcessed(eventId, 'burn.executed'); + + this.logger.log( + `Sell burn processed: amount=${burnAmount.toFixed(8)}, account=${payload.sourceAccountSeq}`, + ); + } catch (error) { + this.logger.error( + `Failed to process sell burn event ${eventId}`, + error instanceof Error ? error.stack : error, + ); + throw error; // 让 Kafka 重试 + } + } + + /** + * 幂等性检查 - Redis + DB 双重检查 + */ + private async isEventProcessed(eventId: string): Promise { + const redisKey = `processed-event:burn:${eventId}`; + + // 1. 先检查 Redis 缓存(快速路径) + const cached = await this.redis.get(redisKey); + if (cached) return true; + + // 2. 检查数据库 + const dbRecord = await this.processedEventRepo.findByEventId(eventId); + if (dbRecord) { + // 回填 Redis 缓存 + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + return true; + } + + return false; + } + + /** + * 标记事件为已处理 + */ + private async markEventProcessed( + eventId: string, + eventType: string, + ): Promise { + // 1. 写入数据库 + await this.processedEventRepo.create({ + eventId, + eventType, + sourceService: 'trading-service', + }); + + // 2. 写入 Redis 缓存 + const redisKey = `processed-event:burn:${eventId}`; + await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS); + } +} diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/events/burn.event.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/events/burn.event.ts new file mode 100644 index 00000000..bf720955 --- /dev/null +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/events/burn.event.ts @@ -0,0 +1,48 @@ +/** + * 销毁事件定义 + * 来自 trading-service + */ + +/** + * 每分钟定时销毁事件 + */ +export interface MinuteBurnExecutedEvent { + eventId: string; + eventType: 'burn.minute-executed'; + aggregateType: string; + aggregateId: string; + createdAt: string; + payload: MinuteBurnExecutedPayload; +} + +export interface MinuteBurnExecutedPayload { + burnRecordId: string; + burnMinute: string; + burnAmount: string; + totalBurned: string; + remainingTarget: string; + executedAt: string; +} + +/** + * 卖出销毁事件 + */ +export interface BurnExecutedEvent { + eventId: string; + eventType: 'burn.executed'; + aggregateType: string; + aggregateId: string; + createdAt: string; + payload: BurnExecutedPayload; +} + +export interface BurnExecutedPayload { + burnRecordId: string; + sourceType: 'SELL' | 'SCHEDULED'; + sourceAccountSeq?: string; + sourceOrderNo?: string; + burnAmount: string; + burnMultiplier?: string; + remainingTarget: string; + executedAt: string; +}