feat: 省市团队账户及待领取奖励逐笔追踪

1. authorization-service:
   - 省团队权益分配改用系统省团队账户 (7+provinceCode)
   - 市团队权益分配改用系统市团队账户 (6+cityCode)
   - 无推荐链时不再进总部,而是进对应团队账户

2. wallet-service:
   - 新增 pending_rewards 表支持逐笔追踪待领取奖励
   - ensureRegionAccounts 同时创建区域账户和团队账户
   - 新增 getPendingRewards API 查询待领取列表
   - 新增 settleUserPendingRewards 用户认种后结算
   - 新增 processExpiredRewards 定时处理过期奖励
   - PlantingCreatedHandler 监听认种事件触发结算
   - ExpiredRewardsScheduler 每小时处理过期奖励

账户格式:
- 省区域: 9+provinceCode (如 9440000)
- 市区域: 8+cityCode (如 8440100)
- 省团队: 7+provinceCode (如 7440000)
- 市团队: 6+cityCode (如 6440100)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-13 03:32:47 -08:00
parent 4d3290f029
commit 8f81c46d75
22 changed files with 1016 additions and 80 deletions

View File

@ -1111,7 +1111,8 @@ export class AuthorizationApplicationService {
`[getProvinceTeamRewardDistribution] accountSequence=${accountSequence}, provinceCode=${provinceCode}, treeCount=${treeCount}`,
)
const HEADQUARTERS_ACCOUNT_SEQUENCE = '1'
// 系统省团队账户ID格式: 7 + 省份代码
const systemProvinceTeamAccountSequence = `7${provinceCode.padStart(6, '0')}`
// 1. 获取用户的祖先链
const ancestorAccountSequences = await this.referralServiceClient.getReferralChain(accountSequence)
@ -1119,7 +1120,7 @@ export class AuthorizationApplicationService {
if (ancestorAccountSequences.length === 0) {
return {
distributions: [
{ accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '无推荐链,进总部社区' },
{ accountSequence: systemProvinceTeamAccountSequence, treeCount, reason: '无推荐链,进系统省团队账户' },
],
}
}
@ -1133,7 +1134,7 @@ export class AuthorizationApplicationService {
if (ancestorAuthProvinces.length === 0) {
return {
distributions: [
{ accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '推荐链上无授权省公司,进总部社区' },
{ accountSequence: systemProvinceTeamAccountSequence, treeCount, reason: '推荐链上无授权省公司,进系统省团队账户' },
],
}
}
@ -1157,7 +1158,7 @@ export class AuthorizationApplicationService {
if (!nearestAuthProvince) {
return {
distributions: [
{ accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '未找到匹配的授权省公司,进总部社区' },
{ accountSequence: systemProvinceTeamAccountSequence, treeCount, reason: '未找到匹配的授权省公司,进系统省团队账户' },
],
}
}
@ -1183,8 +1184,8 @@ export class AuthorizationApplicationService {
)
// 6. 查找上级(用于接收考核前的权益)
let parentAccountSequence: string = HEADQUARTERS_ACCOUNT_SEQUENCE
let parentReason = '上级为总部社区'
let parentAccountSequence: string = systemProvinceTeamAccountSequence
let parentReason = '上级为系统省团队账户'
for (let i = nearestIndex + 1; i < ancestorAccountSequences.length; i++) {
const ancestorSeq = ancestorAccountSequences[i]
@ -1408,7 +1409,8 @@ export class AuthorizationApplicationService {
`[getCityTeamRewardDistribution] accountSequence=${accountSequence}, cityCode=${cityCode}, treeCount=${treeCount}`,
)
const HEADQUARTERS_ACCOUNT_SEQUENCE = '1'
// 系统市团队账户ID格式: 6 + 城市代码
const systemCityTeamAccountSequence = `6${cityCode.padStart(6, '0')}`
// 1. 获取用户的祖先链
const ancestorAccountSequences = await this.referralServiceClient.getReferralChain(accountSequence)
@ -1416,7 +1418,7 @@ export class AuthorizationApplicationService {
if (ancestorAccountSequences.length === 0) {
return {
distributions: [
{ accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '无推荐链,进总部社区' },
{ accountSequence: systemCityTeamAccountSequence, treeCount, reason: '无推荐链,进系统市团队账户' },
],
}
}
@ -1430,7 +1432,7 @@ export class AuthorizationApplicationService {
if (ancestorAuthCities.length === 0) {
return {
distributions: [
{ accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '推荐链上无授权市公司,进总部社区' },
{ accountSequence: systemCityTeamAccountSequence, treeCount, reason: '推荐链上无授权市公司,进系统市团队账户' },
],
}
}
@ -1454,7 +1456,7 @@ export class AuthorizationApplicationService {
if (!nearestAuthCity) {
return {
distributions: [
{ accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '未找到匹配的授权市公司,进总部社区' },
{ accountSequence: systemCityTeamAccountSequence, treeCount, reason: '未找到匹配的授权市公司,进系统市团队账户' },
],
}
}
@ -1480,8 +1482,8 @@ export class AuthorizationApplicationService {
)
// 6. 查找上级
let parentAccountSequence: string = HEADQUARTERS_ACCOUNT_SEQUENCE
let parentReason = '上级为总部社区'
let parentAccountSequence: string = systemCityTeamAccountSequence
let parentReason = '上级为系统市团队账户'
for (let i = nearestIndex + 1; i < ancestorAccountSequences.length; i++) {
const ancestorSeq = ancestorAccountSequences[i]

View File

@ -0,0 +1,34 @@
-- CreateTable: pending_rewards
-- 待领取奖励表,逐笔记录每笔待领取奖励,支持独立过期时间
CREATE TABLE "pending_rewards" (
"pending_reward_id" BIGSERIAL NOT NULL,
"account_sequence" VARCHAR(20) NOT NULL,
"user_id" BIGINT NOT NULL,
"usdt_amount" DECIMAL(20,8) NOT NULL,
"hashpower_amount" DECIMAL(20,8) NOT NULL,
"source_order_id" VARCHAR(100) NOT NULL,
"allocation_type" VARCHAR(50) NOT NULL,
"expire_at" TIMESTAMP(3) NOT NULL,
"status" VARCHAR(20) NOT NULL DEFAULT 'PENDING',
"settled_at" TIMESTAMP(3),
"expired_at" TIMESTAMP(3),
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "pending_rewards_pkey" PRIMARY KEY ("pending_reward_id")
);
-- CreateIndex
CREATE INDEX "pending_rewards_account_sequence_status_idx" ON "pending_rewards"("account_sequence", "status");
-- CreateIndex
CREATE INDEX "pending_rewards_user_id_status_idx" ON "pending_rewards"("user_id", "status");
-- CreateIndex: 用于定时任务查询过期奖励
CREATE INDEX "pending_rewards_status_expire_at_idx" ON "pending_rewards"("status", "expire_at");
-- CreateIndex
CREATE INDEX "pending_rewards_source_order_id_idx" ON "pending_rewards"("source_order_id");
-- CreateIndex
CREATE INDEX "pending_rewards_created_at_idx" ON "pending_rewards"("created_at");

View File

@ -200,6 +200,43 @@ model WithdrawalOrder {
@@index([createdAt])
}
// ============================================
// 待领取奖励表 (逐笔记录)
// 每笔待领取奖励独立跟踪过期时间
// ============================================
model PendingReward {
id BigInt @id @default(autoincrement()) @map("pending_reward_id")
accountSequence String @map("account_sequence") @db.VarChar(20) // 跨服务关联标识
userId BigInt @map("user_id") // 保留兼容
// 奖励金额
usdtAmount Decimal @map("usdt_amount") @db.Decimal(20, 8)
hashpowerAmount Decimal @map("hashpower_amount") @db.Decimal(20, 8)
// 来源信息
sourceOrderId String @map("source_order_id") @db.VarChar(100) // 触发奖励的认种订单
allocationType String @map("allocation_type") @db.VarChar(50) // 分配类型 (SHARE_RIGHT, TEAM_RIGHT 等)
// 过期时间
expireAt DateTime @map("expire_at")
// 状态: PENDING(待领取) / SETTLED(已结算) / EXPIRED(已过期)
status String @default("PENDING") @map("status") @db.VarChar(20)
// 结算/过期时间
settledAt DateTime? @map("settled_at")
expiredAt DateTime? @map("expired_at")
createdAt DateTime @default(now()) @map("created_at")
@@map("pending_rewards")
@@index([accountSequence, status])
@@index([userId, status])
@@index([status, expireAt]) // 用于定时任务查询过期奖励
@@index([sourceOrderId])
@@index([createdAt])
}
// ============================================
// 已处理事件表 (幂等性检查)
// 用于确保 Kafka 事件不会被重复处理

View File

@ -10,7 +10,8 @@ import {
} from './controllers';
import { InternalWalletController } from './controllers/internal-wallet.controller';
import { WalletApplicationService } from '@/application/services';
import { DepositConfirmedHandler } from '@/application/event-handlers';
import { DepositConfirmedHandler, PlantingCreatedHandler } from '@/application/event-handlers';
import { ExpiredRewardsScheduler } from '@/application/schedulers';
import { JwtStrategy } from '@/shared/strategies/jwt.strategy';
@Module({
@ -34,6 +35,8 @@ import { JwtStrategy } from '@/shared/strategies/jwt.strategy';
providers: [
WalletApplicationService,
DepositConfirmedHandler,
PlantingCreatedHandler,
ExpiredRewardsScheduler,
JwtStrategy,
],
})

View File

@ -75,4 +75,22 @@ export class WalletController {
): Promise<WithdrawalListItemDTO[]> {
return this.walletService.getWithdrawals(user.userId);
}
@Get('pending-rewards')
@ApiOperation({ summary: '查询待领取奖励列表', description: '获取用户的逐笔待领取奖励,包含过期时间' })
@ApiResponse({ status: 200, description: '待领取奖励列表' })
async getPendingRewards(
@CurrentUser() user: CurrentUserPayload,
): Promise<Array<{
id: string;
usdtAmount: number;
hashpowerAmount: number;
sourceOrderId: string;
allocationType: string;
expireAt: string;
status: string;
createdAt: string;
}>> {
return this.walletService.getPendingRewards(user.accountSequence);
}
}

View File

@ -1,5 +1,6 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ScheduleModule } from '@nestjs/schedule';
import { APP_FILTER, APP_INTERCEPTOR, APP_GUARD } from '@nestjs/core';
import { ApiModule } from '@/api/api.module';
import { InfrastructureModule } from '@/infrastructure/infrastructure.module';
@ -18,6 +19,7 @@ import { JwtAuthGuard } from '@/shared/guards/jwt-auth.guard';
// Also load from process.env (system environment variables)
ignoreEnvFile: false,
}),
ScheduleModule.forRoot(),
InfrastructureModule,
ApiModule,
],

View File

@ -1 +1,2 @@
export * from './deposit-confirmed.handler';
export * from './planting-created.handler';

View File

@ -0,0 +1,48 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { PlantingEventConsumerService, PlantingCreatedPayload } from '@/infrastructure/kafka';
import { WalletApplicationService } from '@/application/services';
/**
*
*
*
* 1. PENDING SETTLED
* 2.
*/
@Injectable()
export class PlantingCreatedHandler implements OnModuleInit {
private readonly logger = new Logger(PlantingCreatedHandler.name);
constructor(
private readonly plantingEventConsumer: PlantingEventConsumerService,
private readonly walletService: WalletApplicationService,
) {}
onModuleInit() {
this.plantingEventConsumer.onPlantingCreated(this.handlePlantingCreated.bind(this));
this.logger.log('PlantingCreatedHandler registered');
}
private async handlePlantingCreated(payload: PlantingCreatedPayload): Promise<void> {
const { orderNo, accountSequence, userId, treeCount } = payload;
this.logger.log(`[PLANTING] User ${accountSequence} planted ${treeCount} tree(s), order: ${orderNo}`);
try {
// 用户认种后,结算其所有待领取奖励
const result = await this.walletService.settleUserPendingRewards(accountSequence);
if (result.settledCount > 0) {
this.logger.log(
`[PLANTING] Settled ${result.settledCount} pending rewards for ${accountSequence}: ` +
`${result.totalUsdt} USDT, ${result.totalHashpower} hashpower`,
);
} else {
this.logger.debug(`[PLANTING] No pending rewards to settle for ${accountSequence}`);
}
} catch (error) {
this.logger.error(`[PLANTING] Failed to settle pending rewards for ${accountSequence}`, error);
// 不抛出异常,避免阻塞 Kafka 消费
}
}
}

View File

@ -0,0 +1,56 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { WalletApplicationService } from '@/application/services';
/**
*
*
*
* 1. pending_rewards status=PENDING expire_at < now
* 2. EXPIRED
* 3. (S0000000001)
*/
@Injectable()
export class ExpiredRewardsScheduler {
private readonly logger = new Logger(ExpiredRewardsScheduler.name);
constructor(
private readonly walletService: WalletApplicationService,
) {}
/**
*
*/
@Cron(CronExpression.EVERY_HOUR)
async handleExpiredRewards(): Promise<void> {
this.logger.log('Starting expired rewards processing job...');
try {
const result = await this.walletService.processExpiredRewards(100);
if (result.processedCount > 0) {
this.logger.log(
`Expired rewards job completed: processed=${result.processedCount}, ` +
`totalExpiredUsdt=${result.totalExpiredUsdt}, ` +
`transferredToHQ=${result.transferredToHeadquarters}`,
);
} else {
this.logger.debug('No expired rewards to process');
}
} catch (error) {
this.logger.error('Failed to process expired rewards', error);
}
}
/**
*
*/
async triggerManually(batchSize = 100): Promise<{
processedCount: number;
totalExpiredUsdt: number;
transferredToHeadquarters: number;
}> {
this.logger.log('Manually triggered expired rewards processing');
return this.walletService.processExpiredRewards(batchSize);
}
}

View File

@ -0,0 +1 @@
export * from './expired-rewards.scheduler';

View File

@ -5,8 +5,9 @@ import {
IDepositOrderRepository, DEPOSIT_ORDER_REPOSITORY,
ISettlementOrderRepository, SETTLEMENT_ORDER_REPOSITORY,
IWithdrawalOrderRepository, WITHDRAWAL_ORDER_REPOSITORY,
IPendingRewardRepository, PENDING_REWARD_REPOSITORY,
} from '@/domain/repositories';
import { LedgerEntry, DepositOrder, SettlementOrder, WithdrawalOrder } from '@/domain/aggregates';
import { LedgerEntry, DepositOrder, SettlementOrder, WithdrawalOrder, PendingRewardStatus } from '@/domain/aggregates';
import {
UserId, Money, Hashpower, LedgerEntryType, AssetType, ChainType, SettleCurrency,
} from '@/domain/value-objects';
@ -82,6 +83,8 @@ export class WalletApplicationService {
private readonly settlementRepo: ISettlementOrderRepository,
@Inject(WITHDRAWAL_ORDER_REPOSITORY)
private readonly withdrawalRepo: IWithdrawalOrderRepository,
@Inject(PENDING_REWARD_REPOSITORY)
private readonly pendingRewardRepo: IPendingRewardRepository,
private readonly walletCacheService: WalletCacheService,
private readonly eventPublisher: EventPublisherService,
) {}
@ -685,72 +688,74 @@ export class WalletApplicationService {
);
}
/**
*
* S开头 migration seed
*
*
* - S0000000001: 总部社区 (user_id = -1)
* - S0000000002: 成本费账户 (user_id = -2)
* - S0000000003: 运营费账户 (user_id = -3)
* - S0000000004: RWA底池 (user_id = -4)
*/
private async allocateToSystemAccount(
allocation: FundAllocationItem,
orderId: string,
): Promise<void> {
this.logger.debug(
`System account allocation: ${allocation.amount} USDT to ${allocation.targetId} for ${allocation.allocationType}`,
);
const targetId = allocation.targetId;
if (!targetId.startsWith('S')) {
this.logger.warn(`Invalid system account format: ${targetId}`);
return;
}
// 获取系统账户(已由 migration seed 创建)
const wallet = await this.walletRepo.findByAccountSequence(targetId);
if (!wallet) {
this.logger.error(`System account not found: ${targetId}`);
return;
}
const amount = Money.USDT(allocation.amount);
// 系统账户直接增加可用余额(不需要待领取/过期机制)
wallet.addAvailableBalance(amount);
await this.walletRepo.save(wallet);
// 记录流水
const ledgerEntry = LedgerEntry.create({
accountSequence: wallet.accountSequence,
userId: wallet.userId,
entryType: LedgerEntryType.SYSTEM_ALLOCATION,
amount,
refOrderId: orderId,
memo: `${allocation.allocationType} - system account allocation`,
payloadJson: {
allocationType: allocation.allocationType,
metadata: allocation.metadata,
},
});
await this.ledgerRepo.save(ledgerEntry);
this.logger.debug(
`Allocated ${allocation.amount} USDT to system account ${targetId} for ${allocation.allocationType}`,
);
}
/**
*
* S开头 migration seed
*
*
* - S0000000001: 总部社区 (user_id = -1)
* - S0000000002: 成本费账户 (user_id = -2)
* - S0000000003: 运营费账户 (user_id = -3)
* - S0000000004: RWA底池 (user_id = -4)
*/
private async allocateToSystemAccount(
allocation: FundAllocationItem,
orderId: string,
): Promise<void> {
this.logger.debug(
`System account allocation: ${allocation.amount} USDT to ${allocation.targetId} for ${allocation.allocationType}`,
);
const targetId = allocation.targetId;
if (!targetId.startsWith('S')) {
this.logger.warn(`Invalid system account format: ${targetId}`);
return;
}
// 获取系统账户(已由 migration seed 创建)
const wallet = await this.walletRepo.findByAccountSequence(targetId);
if (!wallet) {
this.logger.error(`System account not found: ${targetId}`);
return;
}
const amount = Money.USDT(allocation.amount);
// 系统账户直接增加可用余额(不需要待领取/过期机制)
wallet.addAvailableBalance(amount);
await this.walletRepo.save(wallet);
// 记录流水
const ledgerEntry = LedgerEntry.create({
accountSequence: wallet.accountSequence,
userId: wallet.userId,
entryType: LedgerEntryType.SYSTEM_ALLOCATION,
amount,
refOrderId: orderId,
memo: `${allocation.allocationType} - system account allocation`,
payloadJson: {
allocationType: allocation.allocationType,
metadata: allocation.metadata,
},
});
await this.ledgerRepo.save(ledgerEntry);
this.logger.debug(
`Allocated ${allocation.amount} USDT to system account ${targetId} for ${allocation.allocationType}`,
);
}
// =============== Region Accounts ===============
/**
*
* /
*
* /
*
* :
* - 省区域账户: 9 + provinceCode (: 9440000)
* - 市区域账户: 8 + cityCode (: 8440100)
* - 省区域账户: 9 + provinceCode (: 9440000) -
* - 市区域账户: 8 + cityCode (: 8440100) -
* - 省团队账户: 7 + provinceCode (: 7440000) -
* - 市团队账户: 6 + cityCode (: 6440100) -
*/
async ensureRegionAccounts(params: {
provinceCode: string;
@ -760,6 +765,8 @@ export class WalletApplicationService {
}): Promise<{
provinceAccount: { accountSequence: string; created: boolean };
cityAccount: { accountSequence: string; created: boolean };
provinceTeamAccount: { accountSequence: string; created: boolean };
cityTeamAccount: { accountSequence: string; created: boolean };
}> {
const { provinceCode, provinceName, cityCode, cityName } = params;
@ -771,9 +778,17 @@ export class WalletApplicationService {
const cityAccountSequence = `8${cityCode}`;
const cityUserId = BigInt(cityAccountSequence);
this.logger.log(`确保区域账户存在: 省=${provinceAccountSequence}, 市=${cityAccountSequence}`);
// 省团队账户: 7 + provinceCode
const provinceTeamAccountSequence = `7${provinceCode}`;
const provinceTeamUserId = BigInt(provinceTeamAccountSequence);
// 检查省账户是否已存在
// 市团队账户: 6 + cityCode
const cityTeamAccountSequence = `6${cityCode}`;
const cityTeamUserId = BigInt(cityTeamAccountSequence);
this.logger.log(`确保区域和团队账户存在: 省区域=${provinceAccountSequence}, 市区域=${cityAccountSequence}, 省团队=${provinceTeamAccountSequence}, 市团队=${cityTeamAccountSequence}`);
// 检查省区域账户是否已存在
let provinceWallet = await this.walletRepo.findByAccountSequence(provinceAccountSequence);
let provinceCreated = false;
if (!provinceWallet) {
@ -782,7 +797,7 @@ export class WalletApplicationService {
this.logger.log(`创建省区域账户: ${provinceAccountSequence} (${provinceName})`);
}
// 检查市账户是否已存在
// 检查市区域账户是否已存在
let cityWallet = await this.walletRepo.findByAccountSequence(cityAccountSequence);
let cityCreated = false;
if (!cityWallet) {
@ -791,6 +806,24 @@ export class WalletApplicationService {
this.logger.log(`创建市区域账户: ${cityAccountSequence} (${cityName})`);
}
// 检查省团队账户是否已存在
let provinceTeamWallet = await this.walletRepo.findByAccountSequence(provinceTeamAccountSequence);
let provinceTeamCreated = false;
if (!provinceTeamWallet) {
provinceTeamWallet = await this.walletRepo.getOrCreate(provinceTeamAccountSequence, provinceTeamUserId);
provinceTeamCreated = true;
this.logger.log(`创建省团队账户: ${provinceTeamAccountSequence} (${provinceName}团队)`);
}
// 检查市团队账户是否已存在
let cityTeamWallet = await this.walletRepo.findByAccountSequence(cityTeamAccountSequence);
let cityTeamCreated = false;
if (!cityTeamWallet) {
cityTeamWallet = await this.walletRepo.getOrCreate(cityTeamAccountSequence, cityTeamUserId);
cityTeamCreated = true;
this.logger.log(`创建市团队账户: ${cityTeamAccountSequence} (${cityName}团队)`);
}
return {
provinceAccount: {
accountSequence: provinceAccountSequence,
@ -800,6 +833,14 @@ export class WalletApplicationService {
accountSequence: cityAccountSequence,
created: cityCreated,
},
provinceTeamAccount: {
accountSequence: provinceTeamAccountSequence,
created: provinceTeamCreated,
},
cityTeamAccount: {
accountSequence: cityTeamAccountSequence,
created: cityTeamCreated,
},
};
}
@ -1145,4 +1186,165 @@ export class WalletApplicationService {
totalPages: result.totalPages,
};
}
// =============== Pending Rewards ===============
/**
*
* pending_rewards
*/
async getPendingRewards(accountSequence: string): Promise<Array<{
id: string;
usdtAmount: number;
hashpowerAmount: number;
sourceOrderId: string;
allocationType: string;
expireAt: string;
status: string;
createdAt: string;
}>> {
const rewards = await this.pendingRewardRepo.findByAccountSequence(accountSequence);
return rewards.map(r => ({
id: r.id.toString(),
usdtAmount: r.usdtAmount.value,
hashpowerAmount: r.hashpowerAmount.value,
sourceOrderId: r.sourceOrderId,
allocationType: r.allocationType,
expireAt: r.expireAt.toISOString(),
status: r.status,
createdAt: r.createdAt.toISOString(),
}));
}
/**
*
* PENDING SETTLED
*
*/
async settleUserPendingRewards(accountSequence: string): Promise<{
settledCount: number;
totalUsdt: number;
totalHashpower: number;
}> {
this.logger.log(`[settleUserPendingRewards] accountSequence=${accountSequence}`);
// 查找所有 PENDING 状态的奖励
const pendingRewards = await this.pendingRewardRepo.findByAccountSequence(
accountSequence,
PendingRewardStatus.PENDING,
);
if (pendingRewards.length === 0) {
return { settledCount: 0, totalUsdt: 0, totalHashpower: 0 };
}
let totalUsdt = 0;
let totalHashpower = 0;
// 标记为已结算
for (const reward of pendingRewards) {
reward.markAsSettled();
totalUsdt += reward.usdtAmount.value;
totalHashpower += reward.hashpowerAmount.value;
}
// 批量更新状态
await this.pendingRewardRepo.updateAll(pendingRewards);
// 更新钱包可结算余额
const wallet = await this.walletRepo.findByAccountSequence(accountSequence);
if (wallet) {
// 将待领取转为可结算
wallet.addSettleableReward(
Money.USDT(totalUsdt),
Hashpower.create(totalHashpower),
);
await this.walletRepo.save(wallet);
// 记录流水
if (totalUsdt > 0) {
const ledgerEntry = LedgerEntry.create({
accountSequence,
userId: wallet.userId,
entryType: LedgerEntryType.REWARD_TO_SETTLEABLE,
amount: Money.USDT(totalUsdt),
memo: `${pendingRewards.length} pending rewards settled`,
});
await this.ledgerRepo.save(ledgerEntry);
}
await this.walletCacheService.invalidateWallet(wallet.userId.value);
}
this.logger.log(
`[settleUserPendingRewards] Settled ${pendingRewards.length} rewards: ${totalUsdt} USDT, ${totalHashpower} hashpower`,
);
return {
settledCount: pendingRewards.length,
totalUsdt,
totalHashpower,
};
}
/**
*
* PENDING EXPIRED
* (S0000000001)
*/
async processExpiredRewards(batchSize = 100): Promise<{
processedCount: number;
totalExpiredUsdt: number;
transferredToHeadquarters: number;
}> {
const now = new Date();
this.logger.log(`[processExpiredRewards] Processing expired rewards at ${now.toISOString()}`);
// 查找已过期的 PENDING 奖励
const expiredRewards = await this.pendingRewardRepo.findExpiredPending(now, batchSize);
if (expiredRewards.length === 0) {
return { processedCount: 0, totalExpiredUsdt: 0, transferredToHeadquarters: 0 };
}
let totalExpiredUsdt = 0;
// 标记为已过期
for (const reward of expiredRewards) {
reward.markAsExpired();
totalExpiredUsdt += reward.usdtAmount.value;
}
// 批量更新状态
await this.pendingRewardRepo.updateAll(expiredRewards);
// 将过期金额转入总部社区账户
const headquartersAccountSequence = 'S0000000001';
const hqWallet = await this.walletRepo.findByAccountSequence(headquartersAccountSequence);
if (hqWallet && totalExpiredUsdt > 0) {
hqWallet.addAvailableBalance(Money.USDT(totalExpiredUsdt));
await this.walletRepo.save(hqWallet);
// 记录流水
const ledgerEntry = LedgerEntry.create({
accountSequence: headquartersAccountSequence,
userId: hqWallet.userId,
entryType: LedgerEntryType.SYSTEM_ALLOCATION,
amount: Money.USDT(totalExpiredUsdt),
memo: `Expired rewards from ${expiredRewards.length} pending entries`,
});
await this.ledgerRepo.save(ledgerEntry);
this.logger.log(
`[processExpiredRewards] Transferred ${totalExpiredUsdt} USDT to headquarters from ${expiredRewards.length} expired rewards`,
);
}
return {
processedCount: expiredRewards.length,
totalExpiredUsdt,
transferredToHeadquarters: totalExpiredUsdt,
};
}
}

View File

@ -3,3 +3,4 @@ export * from './ledger-entry.aggregate';
export * from './deposit-order.aggregate';
export * from './settlement-order.aggregate';
export * from './withdrawal-order.aggregate';
export * from './pending-reward.aggregate';

View File

@ -0,0 +1,159 @@
import Decimal from 'decimal.js';
import { UserId, Money, Hashpower } from '@/domain/value-objects';
export enum PendingRewardStatus {
PENDING = 'PENDING',
SETTLED = 'SETTLED',
EXPIRED = 'EXPIRED',
}
export class PendingReward {
private readonly _id: bigint;
private readonly _accountSequence: string;
private readonly _userId: UserId;
private readonly _usdtAmount: Money;
private readonly _hashpowerAmount: Hashpower;
private readonly _sourceOrderId: string;
private readonly _allocationType: string;
private readonly _expireAt: Date;
private _status: PendingRewardStatus;
private _settledAt: Date | null;
private _expiredAt: Date | null;
private readonly _createdAt: Date;
private constructor(
id: bigint,
accountSequence: string,
userId: UserId,
usdtAmount: Money,
hashpowerAmount: Hashpower,
sourceOrderId: string,
allocationType: string,
expireAt: Date,
status: PendingRewardStatus,
settledAt: Date | null,
expiredAt: Date | null,
createdAt: Date,
) {
this._id = id;
this._accountSequence = accountSequence;
this._userId = userId;
this._usdtAmount = usdtAmount;
this._hashpowerAmount = hashpowerAmount;
this._sourceOrderId = sourceOrderId;
this._allocationType = allocationType;
this._expireAt = expireAt;
this._status = status;
this._settledAt = settledAt;
this._expiredAt = expiredAt;
this._createdAt = createdAt;
}
// Getters
get id(): bigint { return this._id; }
get accountSequence(): string { return this._accountSequence; }
get userId(): UserId { return this._userId; }
get usdtAmount(): Money { return this._usdtAmount; }
get hashpowerAmount(): Hashpower { return this._hashpowerAmount; }
get sourceOrderId(): string { return this._sourceOrderId; }
get allocationType(): string { return this._allocationType; }
get expireAt(): Date { return this._expireAt; }
get status(): PendingRewardStatus { return this._status; }
get settledAt(): Date | null { return this._settledAt; }
get expiredAt(): Date | null { return this._expiredAt; }
get createdAt(): Date { return this._createdAt; }
get isPending(): boolean { return this._status === PendingRewardStatus.PENDING; }
get isSettled(): boolean { return this._status === PendingRewardStatus.SETTLED; }
get isExpired(): boolean { return this._status === PendingRewardStatus.EXPIRED; }
/**
*
*/
static create(params: {
accountSequence: string;
userId: UserId;
usdtAmount: Money;
hashpowerAmount: Hashpower;
sourceOrderId: string;
allocationType: string;
expireAt: Date;
}): PendingReward {
return new PendingReward(
BigInt(0), // Will be set by database
params.accountSequence,
params.userId,
params.usdtAmount,
params.hashpowerAmount,
params.sourceOrderId,
params.allocationType,
params.expireAt,
PendingRewardStatus.PENDING,
null,
null,
new Date(),
);
}
/**
*
*/
static reconstruct(params: {
id: bigint;
accountSequence: string;
userId: bigint;
usdtAmount: Decimal;
hashpowerAmount: Decimal;
sourceOrderId: string;
allocationType: string;
expireAt: Date;
status: string;
settledAt: Date | null;
expiredAt: Date | null;
createdAt: Date;
}): PendingReward {
return new PendingReward(
params.id,
params.accountSequence,
UserId.create(params.userId),
Money.USDT(params.usdtAmount),
Hashpower.create(params.hashpowerAmount),
params.sourceOrderId,
params.allocationType,
params.expireAt,
params.status as PendingRewardStatus,
params.settledAt,
params.expiredAt,
params.createdAt,
);
}
/**
* ()
*/
markAsSettled(): void {
if (this._status !== PendingRewardStatus.PENDING) {
throw new Error(`Cannot settle reward in status: ${this._status}`);
}
this._status = PendingRewardStatus.SETTLED;
this._settledAt = new Date();
}
/**
* (24)
*/
markAsExpired(): void {
if (this._status !== PendingRewardStatus.PENDING) {
throw new Error(`Cannot expire reward in status: ${this._status}`);
}
this._status = PendingRewardStatus.EXPIRED;
this._expiredAt = new Date();
}
/**
*
*/
isOverdue(now: Date = new Date()): boolean {
return this._status === PendingRewardStatus.PENDING && now > this._expireAt;
}
}

View File

@ -318,6 +318,31 @@ export class WalletAccount {
}));
}
/**
* pending_rewards
* pending_rewards
*/
addSettleableReward(usdtAmount: Money, hashpowerAmount: Hashpower): void {
this.ensureActive();
this._rewards = {
...this._rewards,
settleableUsdt: this._rewards.settleableUsdt.add(usdtAmount),
settleableHashpower: this._rewards.settleableHashpower.add(hashpowerAmount),
};
// 增加算力
this._hashpower = this._hashpower.add(hashpowerAmount);
this._updatedAt = new Date();
this.addDomainEvent(new RewardMovedToSettleableEvent({
userId: this._userId.toString(),
walletId: this._walletId.toString(),
usdtAmount: usdtAmount.value.toString(),
hashpowerAmount: hashpowerAmount.value.toString(),
}));
}
// 奖励过期
expirePendingRewards(): void {
if (this._rewards.pendingUsdt.isZero() && this._rewards.pendingHashpower.isZero()) {

View File

@ -3,3 +3,4 @@ export * from './ledger-entry.repository.interface';
export * from './deposit-order.repository.interface';
export * from './settlement-order.repository.interface';
export * from './withdrawal-order.repository.interface';
export * from './pending-reward.repository.interface';

View File

@ -0,0 +1,26 @@
import { PendingReward, PendingRewardStatus } from '@/domain/aggregates';
export interface IPendingRewardRepository {
save(reward: PendingReward): Promise<PendingReward>;
saveAll(rewards: PendingReward[]): Promise<void>;
update(reward: PendingReward): Promise<void>;
updateAll(rewards: PendingReward[]): Promise<void>;
findById(id: bigint): Promise<PendingReward | null>;
findByAccountSequence(accountSequence: string, status?: PendingRewardStatus): Promise<PendingReward[]>;
findByUserId(userId: bigint, status?: PendingRewardStatus): Promise<PendingReward[]>;
findBySourceOrderId(sourceOrderId: string): Promise<PendingReward[]>;
/**
* PENDING
*
*/
findExpiredPending(now: Date, limit?: number): Promise<PendingReward[]>;
/**
*
*/
sumPendingByAccountSequence(accountSequence: string): Promise<{ usdtTotal: number; hashpowerTotal: number }>;
}
export const PENDING_REWARD_REPOSITORY = Symbol('IPendingRewardRepository');

View File

@ -6,6 +6,7 @@ import {
DepositOrderRepositoryImpl,
SettlementOrderRepositoryImpl,
WithdrawalOrderRepositoryImpl,
PendingRewardRepositoryImpl,
} from './persistence/repositories';
import {
WALLET_ACCOUNT_REPOSITORY,
@ -13,6 +14,7 @@ import {
DEPOSIT_ORDER_REPOSITORY,
SETTLEMENT_ORDER_REPOSITORY,
WITHDRAWAL_ORDER_REPOSITORY,
PENDING_REWARD_REPOSITORY,
} from '@/domain/repositories';
import { RedisModule } from './redis';
import { KafkaModule } from './kafka';
@ -38,6 +40,10 @@ const repositories = [
provide: WITHDRAWAL_ORDER_REPOSITORY,
useClass: WithdrawalOrderRepositoryImpl,
},
{
provide: PENDING_REWARD_REPOSITORY,
useClass: PendingRewardRepositoryImpl,
},
];
@Global()

View File

@ -1,3 +1,4 @@
export * from './kafka.module';
export * from './event-publisher.service';
export * from './deposit-event-consumer.service';
export * from './planting-event-consumer.service';

View File

@ -3,6 +3,7 @@ import { ConfigModule, ConfigService } from '@nestjs/config';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { EventPublisherService } from './event-publisher.service';
import { DepositEventConsumerService } from './deposit-event-consumer.service';
import { PlantingEventConsumerService } from './planting-event-consumer.service';
// [已屏蔽] 前端直接从 reward-service 查询,不再订阅 reward-service 消息
// import { RewardEventConsumerController } from './reward-event-consumer.controller';
// import { EventAckPublisher } from './event-ack.publisher';
@ -34,7 +35,7 @@ import { PrismaService } from '../persistence/prisma/prisma.service';
// [已屏蔽] 前端直接从 reward-service 查询,不再订阅 reward-service 消息
// controllers: [RewardEventConsumerController],
controllers: [],
providers: [PrismaService, EventPublisherService, DepositEventConsumerService],
exports: [EventPublisherService, DepositEventConsumerService, ClientsModule],
providers: [PrismaService, EventPublisherService, DepositEventConsumerService, PlantingEventConsumerService],
exports: [EventPublisherService, DepositEventConsumerService, PlantingEventConsumerService, ClientsModule],
})
export class KafkaModule {}

View File

@ -0,0 +1,144 @@
/**
* Planting Event Consumer Service for Wallet Service
*
* Consumes planting events from planting-service via Kafka.
* When a user plants a tree, settles their pending rewards.
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
export const PLANTING_TOPICS = {
PLANTING_EVENTS: 'planting-events',
} as const;
export interface PlantingCreatedPayload {
orderNo: string;
accountSequence: string;
userId: string;
treeCount: number;
totalAmount: string;
createdAt: string;
}
export type PlantingEventHandler = (payload: PlantingCreatedPayload) => Promise<void>;
@Injectable()
export class PlantingEventConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(PlantingEventConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private isConnected = false;
private plantingCreatedHandler?: PlantingEventHandler;
constructor(private readonly configService: ConfigService) {}
async onModuleInit() {
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092'];
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'wallet-service';
const groupId = 'wallet-service-planting-events';
this.logger.log(`[INIT] Planting Event Consumer initializing...`);
this.logger.log(`[INIT] ClientId: ${clientId}`);
this.logger.log(`[INIT] GroupId: ${groupId}`);
this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
this.logger.log(`[INIT] Topics: ${Object.values(PLANTING_TOPICS).join(', ')}`);
this.kafka = new Kafka({
clientId: `${clientId}-planting`,
brokers,
logLevel: logLevel.WARN,
retry: {
initialRetryTime: 100,
retries: 8,
},
});
this.consumer = this.kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
try {
this.logger.log(`[CONNECT] Connecting Planting Event consumer...`);
await this.consumer.connect();
this.isConnected = true;
this.logger.log(`[CONNECT] Planting Event consumer connected successfully`);
await this.consumer.subscribe({
topics: Object.values(PLANTING_TOPICS),
fromBeginning: false,
});
this.logger.log(`[SUBSCRIBE] Subscribed to planting topics`);
await this.startConsuming();
} catch (error) {
this.logger.error(`[ERROR] Failed to connect Planting Event consumer`, error);
}
}
async onModuleDestroy() {
if (this.isConnected) {
await this.consumer.disconnect();
this.logger.log('Planting Event consumer disconnected');
}
}
/**
* Register handler for planting created events
*/
onPlantingCreated(handler: PlantingEventHandler): void {
this.plantingCreatedHandler = handler;
this.logger.log(`[REGISTER] PlantingCreated handler registered`);
}
private async startConsuming(): Promise<void> {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
const offset = message.offset;
this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`);
try {
const value = message.value?.toString();
if (!value) {
this.logger.warn(`[RECEIVE] Empty message received on ${topic}`);
return;
}
this.logger.debug(`[RECEIVE] Raw message: ${value.substring(0, 500)}...`);
const parsed = JSON.parse(value);
const eventType = parsed.eventType || parsed.type;
const payload = parsed.payload || parsed;
this.logger.log(`[RECEIVE] Event type: ${eventType}`);
// 监听认种创建事件 - 用户认种后,结算其所有待领取奖励
if (eventType === 'planting.planting.created' || eventType === 'PlantingOrderCreated') {
this.logger.log(`[HANDLE] Processing PlantingCreated event`);
this.logger.log(`[HANDLE] orderNo: ${payload.orderNo}`);
this.logger.log(`[HANDLE] accountSequence: ${payload.accountSequence}`);
this.logger.log(`[HANDLE] userId: ${payload.userId}`);
this.logger.log(`[HANDLE] treeCount: ${payload.treeCount}`);
if (this.plantingCreatedHandler) {
await this.plantingCreatedHandler(payload as PlantingCreatedPayload);
this.logger.log(`[HANDLE] PlantingCreated handler completed`);
} else {
this.logger.warn(`[HANDLE] No handler registered for PlantingCreated`);
}
} else {
this.logger.debug(`[RECEIVE] Ignoring event type: ${eventType}`);
}
} catch (error) {
this.logger.error(`[ERROR] Error processing planting event from ${topic}`, error);
}
},
});
this.logger.log(`[START] Started consuming planting events`);
}
}

View File

@ -3,3 +3,4 @@ export * from './ledger-entry.repository.impl';
export * from './deposit-order.repository.impl';
export * from './settlement-order.repository.impl';
export * from './withdrawal-order.repository.impl';
export * from './pending-reward.repository.impl';

View File

@ -0,0 +1,167 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { IPendingRewardRepository } from '@/domain/repositories';
import { PendingReward, PendingRewardStatus } from '@/domain/aggregates';
import Decimal from 'decimal.js';
@Injectable()
export class PendingRewardRepositoryImpl implements IPendingRewardRepository {
constructor(private readonly prisma: PrismaService) {}
async save(reward: PendingReward): Promise<PendingReward> {
const created = await this.prisma.pendingReward.create({
data: {
accountSequence: reward.accountSequence,
userId: reward.userId.value,
usdtAmount: reward.usdtAmount.toDecimal(),
hashpowerAmount: reward.hashpowerAmount.decimal,
sourceOrderId: reward.sourceOrderId,
allocationType: reward.allocationType,
expireAt: reward.expireAt,
status: reward.status,
settledAt: reward.settledAt,
expiredAt: reward.expiredAt,
},
});
return this.toDomain(created);
}
async saveAll(rewards: PendingReward[]): Promise<void> {
await this.prisma.pendingReward.createMany({
data: rewards.map(reward => ({
accountSequence: reward.accountSequence,
userId: reward.userId.value,
usdtAmount: reward.usdtAmount.toDecimal(),
hashpowerAmount: reward.hashpowerAmount.decimal,
sourceOrderId: reward.sourceOrderId,
allocationType: reward.allocationType,
expireAt: reward.expireAt,
status: reward.status,
settledAt: reward.settledAt,
expiredAt: reward.expiredAt,
})),
});
}
async update(reward: PendingReward): Promise<void> {
await this.prisma.pendingReward.update({
where: { id: reward.id },
data: {
status: reward.status,
settledAt: reward.settledAt,
expiredAt: reward.expiredAt,
},
});
}
async updateAll(rewards: PendingReward[]): Promise<void> {
await this.prisma.$transaction(
rewards.map(reward =>
this.prisma.pendingReward.update({
where: { id: reward.id },
data: {
status: reward.status,
settledAt: reward.settledAt,
expiredAt: reward.expiredAt,
},
}),
),
);
}
async findById(id: bigint): Promise<PendingReward | null> {
const record = await this.prisma.pendingReward.findUnique({
where: { id },
});
return record ? this.toDomain(record) : null;
}
async findByAccountSequence(accountSequence: string, status?: PendingRewardStatus): Promise<PendingReward[]> {
const records = await this.prisma.pendingReward.findMany({
where: {
accountSequence,
...(status && { status }),
},
orderBy: { createdAt: 'desc' },
});
return records.map(r => this.toDomain(r));
}
async findByUserId(userId: bigint, status?: PendingRewardStatus): Promise<PendingReward[]> {
const records = await this.prisma.pendingReward.findMany({
where: {
userId,
...(status && { status }),
},
orderBy: { createdAt: 'desc' },
});
return records.map(r => this.toDomain(r));
}
async findBySourceOrderId(sourceOrderId: string): Promise<PendingReward[]> {
const records = await this.prisma.pendingReward.findMany({
where: { sourceOrderId },
orderBy: { createdAt: 'desc' },
});
return records.map(r => this.toDomain(r));
}
async findExpiredPending(now: Date, limit = 100): Promise<PendingReward[]> {
const records = await this.prisma.pendingReward.findMany({
where: {
status: PendingRewardStatus.PENDING,
expireAt: { lt: now },
},
orderBy: { expireAt: 'asc' },
take: limit,
});
return records.map(r => this.toDomain(r));
}
async sumPendingByAccountSequence(accountSequence: string): Promise<{ usdtTotal: number; hashpowerTotal: number }> {
const result = await this.prisma.pendingReward.aggregate({
where: {
accountSequence,
status: PendingRewardStatus.PENDING,
},
_sum: {
usdtAmount: true,
hashpowerAmount: true,
},
});
return {
usdtTotal: result._sum.usdtAmount?.toNumber() ?? 0,
hashpowerTotal: result._sum.hashpowerAmount?.toNumber() ?? 0,
};
}
private toDomain(record: {
id: bigint;
accountSequence: string;
userId: bigint;
usdtAmount: Decimal;
hashpowerAmount: Decimal;
sourceOrderId: string;
allocationType: string;
expireAt: Date;
status: string;
settledAt: Date | null;
expiredAt: Date | null;
createdAt: Date;
}): PendingReward {
return PendingReward.reconstruct({
id: record.id,
accountSequence: record.accountSequence,
userId: record.userId,
usdtAmount: new Decimal(record.usdtAmount.toString()),
hashpowerAmount: new Decimal(record.hashpowerAmount.toString()),
sourceOrderId: record.sourceOrderId,
allocationType: record.allocationType,
expireAt: record.expireAt,
status: record.status,
settledAt: record.settledAt,
expiredAt: record.expiredAt,
createdAt: record.createdAt,
});
}
}