feat(wallet): sync burn events from trading-service to deduct SHARE_POOL_A

Add Kafka consumer to listen for burn events (minute burn and sell burn)
from trading-service and deduct from SHARE_POOL_A (100B pool), updating
BLACK_HOLE_POOL balance accordingly.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-15 23:24:43 -08:00
parent 88368d1705
commit 8ab11c8f50
4 changed files with 315 additions and 0 deletions

View File

@ -15,6 +15,7 @@ import { ContributionExpiryScheduler } from './schedulers/contribution-expiry.sc
import { ContributionDistributionConsumer } from '../infrastructure/kafka/consumers/contribution-distribution.consumer';
import { UserRegisteredConsumer } from '../infrastructure/kafka/consumers/user-registered.consumer';
import { MiningDistributionConsumer } from '../infrastructure/kafka/consumers/mining-distribution.consumer';
import { BurnConsumer } from '../infrastructure/kafka/consumers/burn.consumer';
@Module({
imports: [ScheduleModule.forRoot()],
@ -23,6 +24,7 @@ import { MiningDistributionConsumer } from '../infrastructure/kafka/consumers/mi
ContributionDistributionConsumer,
UserRegisteredConsumer,
MiningDistributionConsumer,
BurnConsumer,
],
providers: [
// Services

View File

@ -239,6 +239,76 @@ export class PoolAccountService {
);
}
/**
* A扣减
* Kafka trading-service
* 100亿 SHARE_POOL_A
*/
async deductFromSharePoolAForBurn(
amount: Decimal,
burnInfo: {
burnMinute: Date;
burnRecordId: string;
sourceType: 'MINUTE_BURN' | 'SELL_BURN';
sourceAccountSeq?: string;
sourceOrderNo?: string;
},
): Promise<void> {
const sourcePool: PoolAccountType = 'SHARE_POOL_A';
const memo =
burnInfo.sourceType === 'MINUTE_BURN'
? `每分钟销毁扣减, 分钟${burnInfo.burnMinute.toISOString()}, 销毁量${amount.toFixed(8)}`
: `卖出销毁扣减, 账户${burnInfo.sourceAccountSeq}, 订单${burnInfo.sourceOrderNo}, 销毁量${amount.toFixed(8)}`;
await this.poolAccountRepo.updateBalanceWithTransaction(
sourcePool,
amount.negated(), // 负数表示扣减
{
transactionType: 'BURN',
counterpartyType: 'POOL',
counterpartyPoolType: 'BLACK_HOLE_POOL',
referenceId: burnInfo.burnRecordId,
referenceType: burnInfo.sourceType,
memo,
metadata: {
burnMinute: burnInfo.burnMinute.toISOString(),
burnRecordId: burnInfo.burnRecordId,
sourceType: burnInfo.sourceType,
sourceAccountSeq: burnInfo.sourceAccountSeq,
sourceOrderNo: burnInfo.sourceOrderNo,
burnAmount: amount.toString(),
},
},
);
// 同时增加黑洞池的余额
await this.poolAccountRepo.updateBalanceWithTransaction(
'BLACK_HOLE_POOL',
amount, // 正数表示增加
{
transactionType: 'BURN',
counterpartyType: 'POOL',
counterpartyPoolType: 'SHARE_POOL_A',
referenceId: burnInfo.burnRecordId,
referenceType: burnInfo.sourceType,
memo: `销毁入账, 来源${sourcePool}, ${memo}`,
metadata: {
burnMinute: burnInfo.burnMinute.toISOString(),
burnRecordId: burnInfo.burnRecordId,
sourceType: burnInfo.sourceType,
sourceAccountSeq: burnInfo.sourceAccountSeq,
sourceOrderNo: burnInfo.sourceOrderNo,
burnAmount: amount.toString(),
},
},
);
this.logger.log(
`Burned ${amount.toFixed(8)} from SHARE_POOL_A to BLACK_HOLE_POOL (${burnInfo.sourceType})`,
);
}
/**
*
*/

View File

@ -0,0 +1,195 @@
import { Controller, Logger, OnModuleInit } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import Decimal from 'decimal.js';
import { RedisService } from '../../redis/redis.service';
import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository';
import { PoolAccountService } from '../../../application/services/pool-account.service';
import {
MinuteBurnExecutedEvent,
MinuteBurnExecutedPayload,
BurnExecutedEvent,
BurnExecutedPayload,
} from '../events/burn.event';
// 4小时 TTL
const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60;
/**
*
* trading-service SHARE_POOL_A
*/
@Controller()
export class BurnConsumer implements OnModuleInit {
private readonly logger = new Logger(BurnConsumer.name);
constructor(
private readonly redis: RedisService,
private readonly processedEventRepo: ProcessedEventRepository,
private readonly poolAccountService: PoolAccountService,
) {}
async onModuleInit() {
this.logger.log('BurnConsumer initialized');
}
/**
*
* Topic: trading.burns
* EventType: burn.minute-executed
*/
@EventPattern('trading.burns')
async handleBurnEvent(@Payload() message: any): Promise<void> {
// 解析消息格式Outbox 发布的格式)
const eventData = message.value || message;
const eventId = eventData.eventId || message.eventId;
const eventType = eventData.eventType;
if (!eventId) {
this.logger.warn('Received burn event without eventId, skipping');
return;
}
// 根据事件类型分发处理
if (eventType === 'burn.minute-executed') {
await this.handleMinuteBurn(eventId, eventData as MinuteBurnExecutedEvent);
} else if (eventType === 'burn.executed') {
await this.handleSellBurn(eventId, eventData as BurnExecutedEvent);
} else {
this.logger.debug(`Unknown burn event type: ${eventType}, skipping`);
}
}
/**
*
*/
private async handleMinuteBurn(
eventId: string,
event: MinuteBurnExecutedEvent,
): Promise<void> {
this.logger.debug(`Processing minute burn event: ${eventId}`);
// 幂等性检查
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`Event ${eventId} already processed, skipping`);
return;
}
try {
const payload = event.payload;
const burnAmount = new Decimal(payload.burnAmount);
if (burnAmount.isZero()) {
this.logger.debug('Zero burn amount, skipping');
return;
}
// 从 SHARE_POOL_A 扣减销毁量
await this.poolAccountService.deductFromSharePoolAForBurn(burnAmount, {
burnMinute: new Date(payload.burnMinute),
burnRecordId: payload.burnRecordId,
sourceType: 'MINUTE_BURN',
});
// 标记为已处理
await this.markEventProcessed(eventId, 'burn.minute-executed');
this.logger.log(
`Minute burn processed: amount=${burnAmount.toFixed(8)}, minute=${payload.burnMinute}`,
);
} catch (error) {
this.logger.error(
`Failed to process minute burn event ${eventId}`,
error instanceof Error ? error.stack : error,
);
throw error; // 让 Kafka 重试
}
}
/**
*
*/
private async handleSellBurn(
eventId: string,
event: BurnExecutedEvent,
): Promise<void> {
this.logger.debug(`Processing sell burn event: ${eventId}`);
// 幂等性检查
if (await this.isEventProcessed(eventId)) {
this.logger.debug(`Event ${eventId} already processed, skipping`);
return;
}
try {
const payload = event.payload;
const burnAmount = new Decimal(payload.burnAmount);
if (burnAmount.isZero()) {
this.logger.debug('Zero burn amount, skipping');
return;
}
// 从 SHARE_POOL_A 扣减销毁量
await this.poolAccountService.deductFromSharePoolAForBurn(burnAmount, {
burnMinute: new Date(payload.executedAt),
burnRecordId: payload.burnRecordId,
sourceType: 'SELL_BURN',
sourceAccountSeq: payload.sourceAccountSeq,
sourceOrderNo: payload.sourceOrderNo,
});
// 标记为已处理
await this.markEventProcessed(eventId, 'burn.executed');
this.logger.log(
`Sell burn processed: amount=${burnAmount.toFixed(8)}, account=${payload.sourceAccountSeq}`,
);
} catch (error) {
this.logger.error(
`Failed to process sell burn event ${eventId}`,
error instanceof Error ? error.stack : error,
);
throw error; // 让 Kafka 重试
}
}
/**
* - Redis + DB
*/
private async isEventProcessed(eventId: string): Promise<boolean> {
const redisKey = `processed-event:burn:${eventId}`;
// 1. 先检查 Redis 缓存(快速路径)
const cached = await this.redis.get(redisKey);
if (cached) return true;
// 2. 检查数据库
const dbRecord = await this.processedEventRepo.findByEventId(eventId);
if (dbRecord) {
// 回填 Redis 缓存
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
return true;
}
return false;
}
/**
*
*/
private async markEventProcessed(
eventId: string,
eventType: string,
): Promise<void> {
// 1. 写入数据库
await this.processedEventRepo.create({
eventId,
eventType,
sourceService: 'trading-service',
});
// 2. 写入 Redis 缓存
const redisKey = `processed-event:burn:${eventId}`;
await this.redis.set(redisKey, '1', IDEMPOTENCY_TTL_SECONDS);
}
}

View File

@ -0,0 +1,48 @@
/**
*
* trading-service
*/
/**
*
*/
export interface MinuteBurnExecutedEvent {
eventId: string;
eventType: 'burn.minute-executed';
aggregateType: string;
aggregateId: string;
createdAt: string;
payload: MinuteBurnExecutedPayload;
}
export interface MinuteBurnExecutedPayload {
burnRecordId: string;
burnMinute: string;
burnAmount: string;
totalBurned: string;
remainingTarget: string;
executedAt: string;
}
/**
*
*/
export interface BurnExecutedEvent {
eventId: string;
eventType: 'burn.executed';
aggregateType: string;
aggregateId: string;
createdAt: string;
payload: BurnExecutedPayload;
}
export interface BurnExecutedPayload {
burnRecordId: string;
sourceType: 'SELL' | 'SCHEDULED';
sourceAccountSeq?: string;
sourceOrderNo?: string;
burnAmount: string;
burnMultiplier?: string;
remainingTarget: string;
executedAt: string;
}