From 8f81c46d75d8234584c4b84b79c2a6a3ff8a1212 Mon Sep 17 00:00:00 2001 From: hailin Date: Sat, 13 Dec 2025 03:32:47 -0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=9C=81=E5=B8=82=E5=9B=A2=E9=98=9F?= =?UTF-8?q?=E8=B4=A6=E6=88=B7=E5=8F=8A=E5=BE=85=E9=A2=86=E5=8F=96=E5=A5=96?= =?UTF-8?q?=E5=8A=B1=E9=80=90=E7=AC=94=E8=BF=BD=E8=B8=AA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. authorization-service: - 省团队权益分配改用系统省团队账户 (7+provinceCode) - 市团队权益分配改用系统市团队账户 (6+cityCode) - 无推荐链时不再进总部,而是进对应团队账户 2. wallet-service: - 新增 pending_rewards 表支持逐笔追踪待领取奖励 - ensureRegionAccounts 同时创建区域账户和团队账户 - 新增 getPendingRewards API 查询待领取列表 - 新增 settleUserPendingRewards 用户认种后结算 - 新增 processExpiredRewards 定时处理过期奖励 - PlantingCreatedHandler 监听认种事件触发结算 - ExpiredRewardsScheduler 每小时处理过期奖励 账户格式: - 省区域: 9+provinceCode (如 9440000) - 市区域: 8+cityCode (如 8440100) - 省团队: 7+provinceCode (如 7440000) - 市团队: 6+cityCode (如 6440100) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../authorization-application.service.ts | 26 +- .../migration.sql | 34 ++ .../wallet-service/prisma/schema.prisma | 37 ++ .../wallet-service/src/api/api.module.ts | 5 +- .../src/api/controllers/wallet.controller.ts | 18 + .../services/wallet-service/src/app.module.ts | 2 + .../src/application/event-handlers/index.ts | 1 + .../planting-created.handler.ts | 48 +++ .../schedulers/expired-rewards.scheduler.ts | 56 +++ .../src/application/schedulers/index.ts | 1 + .../services/wallet-application.service.ts | 332 ++++++++++++++---- .../src/domain/aggregates/index.ts | 1 + .../aggregates/pending-reward.aggregate.ts | 159 +++++++++ .../aggregates/wallet-account.aggregate.ts | 25 ++ .../src/domain/repositories/index.ts | 1 + .../pending-reward.repository.interface.ts | 26 ++ .../infrastructure/infrastructure.module.ts | 6 + .../src/infrastructure/kafka/index.ts | 1 + .../src/infrastructure/kafka/kafka.module.ts | 5 +- .../kafka/planting-event-consumer.service.ts | 144 ++++++++ .../persistence/repositories/index.ts | 1 + .../pending-reward.repository.impl.ts | 167 +++++++++ 22 files changed, 1016 insertions(+), 80 deletions(-) create mode 100644 backend/services/wallet-service/prisma/migrations/20241214000000_add_pending_rewards/migration.sql create mode 100644 backend/services/wallet-service/src/application/event-handlers/planting-created.handler.ts create mode 100644 backend/services/wallet-service/src/application/schedulers/expired-rewards.scheduler.ts create mode 100644 backend/services/wallet-service/src/application/schedulers/index.ts create mode 100644 backend/services/wallet-service/src/domain/aggregates/pending-reward.aggregate.ts create mode 100644 backend/services/wallet-service/src/domain/repositories/pending-reward.repository.interface.ts create mode 100644 backend/services/wallet-service/src/infrastructure/kafka/planting-event-consumer.service.ts create mode 100644 backend/services/wallet-service/src/infrastructure/persistence/repositories/pending-reward.repository.impl.ts diff --git a/backend/services/authorization-service/src/application/services/authorization-application.service.ts b/backend/services/authorization-service/src/application/services/authorization-application.service.ts index 3df3b06d..bc5832df 100644 --- a/backend/services/authorization-service/src/application/services/authorization-application.service.ts +++ b/backend/services/authorization-service/src/application/services/authorization-application.service.ts @@ -1111,7 +1111,8 @@ export class AuthorizationApplicationService { `[getProvinceTeamRewardDistribution] accountSequence=${accountSequence}, provinceCode=${provinceCode}, treeCount=${treeCount}`, ) - const HEADQUARTERS_ACCOUNT_SEQUENCE = '1' + // 系统省团队账户ID格式: 7 + 省份代码 + const systemProvinceTeamAccountSequence = `7${provinceCode.padStart(6, '0')}` // 1. 获取用户的祖先链 const ancestorAccountSequences = await this.referralServiceClient.getReferralChain(accountSequence) @@ -1119,7 +1120,7 @@ export class AuthorizationApplicationService { if (ancestorAccountSequences.length === 0) { return { distributions: [ - { accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '无推荐链,进总部社区' }, + { accountSequence: systemProvinceTeamAccountSequence, treeCount, reason: '无推荐链,进系统省团队账户' }, ], } } @@ -1133,7 +1134,7 @@ export class AuthorizationApplicationService { if (ancestorAuthProvinces.length === 0) { return { distributions: [ - { accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '推荐链上无授权省公司,进总部社区' }, + { accountSequence: systemProvinceTeamAccountSequence, treeCount, reason: '推荐链上无授权省公司,进系统省团队账户' }, ], } } @@ -1157,7 +1158,7 @@ export class AuthorizationApplicationService { if (!nearestAuthProvince) { return { distributions: [ - { accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '未找到匹配的授权省公司,进总部社区' }, + { accountSequence: systemProvinceTeamAccountSequence, treeCount, reason: '未找到匹配的授权省公司,进系统省团队账户' }, ], } } @@ -1183,8 +1184,8 @@ export class AuthorizationApplicationService { ) // 6. 查找上级(用于接收考核前的权益) - let parentAccountSequence: string = HEADQUARTERS_ACCOUNT_SEQUENCE - let parentReason = '上级为总部社区' + let parentAccountSequence: string = systemProvinceTeamAccountSequence + let parentReason = '上级为系统省团队账户' for (let i = nearestIndex + 1; i < ancestorAccountSequences.length; i++) { const ancestorSeq = ancestorAccountSequences[i] @@ -1408,7 +1409,8 @@ export class AuthorizationApplicationService { `[getCityTeamRewardDistribution] accountSequence=${accountSequence}, cityCode=${cityCode}, treeCount=${treeCount}`, ) - const HEADQUARTERS_ACCOUNT_SEQUENCE = '1' + // 系统市团队账户ID格式: 6 + 城市代码 + const systemCityTeamAccountSequence = `6${cityCode.padStart(6, '0')}` // 1. 获取用户的祖先链 const ancestorAccountSequences = await this.referralServiceClient.getReferralChain(accountSequence) @@ -1416,7 +1418,7 @@ export class AuthorizationApplicationService { if (ancestorAccountSequences.length === 0) { return { distributions: [ - { accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '无推荐链,进总部社区' }, + { accountSequence: systemCityTeamAccountSequence, treeCount, reason: '无推荐链,进系统市团队账户' }, ], } } @@ -1430,7 +1432,7 @@ export class AuthorizationApplicationService { if (ancestorAuthCities.length === 0) { return { distributions: [ - { accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '推荐链上无授权市公司,进总部社区' }, + { accountSequence: systemCityTeamAccountSequence, treeCount, reason: '推荐链上无授权市公司,进系统市团队账户' }, ], } } @@ -1454,7 +1456,7 @@ export class AuthorizationApplicationService { if (!nearestAuthCity) { return { distributions: [ - { accountSequence: HEADQUARTERS_ACCOUNT_SEQUENCE, treeCount, reason: '未找到匹配的授权市公司,进总部社区' }, + { accountSequence: systemCityTeamAccountSequence, treeCount, reason: '未找到匹配的授权市公司,进系统市团队账户' }, ], } } @@ -1480,8 +1482,8 @@ export class AuthorizationApplicationService { ) // 6. 查找上级 - let parentAccountSequence: string = HEADQUARTERS_ACCOUNT_SEQUENCE - let parentReason = '上级为总部社区' + let parentAccountSequence: string = systemCityTeamAccountSequence + let parentReason = '上级为系统市团队账户' for (let i = nearestIndex + 1; i < ancestorAccountSequences.length; i++) { const ancestorSeq = ancestorAccountSequences[i] diff --git a/backend/services/wallet-service/prisma/migrations/20241214000000_add_pending_rewards/migration.sql b/backend/services/wallet-service/prisma/migrations/20241214000000_add_pending_rewards/migration.sql new file mode 100644 index 00000000..bce23228 --- /dev/null +++ b/backend/services/wallet-service/prisma/migrations/20241214000000_add_pending_rewards/migration.sql @@ -0,0 +1,34 @@ +-- CreateTable: pending_rewards +-- 待领取奖励表,逐笔记录每笔待领取奖励,支持独立过期时间 + +CREATE TABLE "pending_rewards" ( + "pending_reward_id" BIGSERIAL NOT NULL, + "account_sequence" VARCHAR(20) NOT NULL, + "user_id" BIGINT NOT NULL, + "usdt_amount" DECIMAL(20,8) NOT NULL, + "hashpower_amount" DECIMAL(20,8) NOT NULL, + "source_order_id" VARCHAR(100) NOT NULL, + "allocation_type" VARCHAR(50) NOT NULL, + "expire_at" TIMESTAMP(3) NOT NULL, + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "settled_at" TIMESTAMP(3), + "expired_at" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "pending_rewards_pkey" PRIMARY KEY ("pending_reward_id") +); + +-- CreateIndex +CREATE INDEX "pending_rewards_account_sequence_status_idx" ON "pending_rewards"("account_sequence", "status"); + +-- CreateIndex +CREATE INDEX "pending_rewards_user_id_status_idx" ON "pending_rewards"("user_id", "status"); + +-- CreateIndex: 用于定时任务查询过期奖励 +CREATE INDEX "pending_rewards_status_expire_at_idx" ON "pending_rewards"("status", "expire_at"); + +-- CreateIndex +CREATE INDEX "pending_rewards_source_order_id_idx" ON "pending_rewards"("source_order_id"); + +-- CreateIndex +CREATE INDEX "pending_rewards_created_at_idx" ON "pending_rewards"("created_at"); diff --git a/backend/services/wallet-service/prisma/schema.prisma b/backend/services/wallet-service/prisma/schema.prisma index 646bbccf..f3dc49ae 100644 --- a/backend/services/wallet-service/prisma/schema.prisma +++ b/backend/services/wallet-service/prisma/schema.prisma @@ -200,6 +200,43 @@ model WithdrawalOrder { @@index([createdAt]) } +// ============================================ +// 待领取奖励表 (逐笔记录) +// 每笔待领取奖励独立跟踪过期时间 +// ============================================ +model PendingReward { + id BigInt @id @default(autoincrement()) @map("pending_reward_id") + accountSequence String @map("account_sequence") @db.VarChar(20) // 跨服务关联标识 + userId BigInt @map("user_id") // 保留兼容 + + // 奖励金额 + usdtAmount Decimal @map("usdt_amount") @db.Decimal(20, 8) + hashpowerAmount Decimal @map("hashpower_amount") @db.Decimal(20, 8) + + // 来源信息 + sourceOrderId String @map("source_order_id") @db.VarChar(100) // 触发奖励的认种订单 + allocationType String @map("allocation_type") @db.VarChar(50) // 分配类型 (SHARE_RIGHT, TEAM_RIGHT 等) + + // 过期时间 + expireAt DateTime @map("expire_at") + + // 状态: PENDING(待领取) / SETTLED(已结算) / EXPIRED(已过期) + status String @default("PENDING") @map("status") @db.VarChar(20) + + // 结算/过期时间 + settledAt DateTime? @map("settled_at") + expiredAt DateTime? @map("expired_at") + + createdAt DateTime @default(now()) @map("created_at") + + @@map("pending_rewards") + @@index([accountSequence, status]) + @@index([userId, status]) + @@index([status, expireAt]) // 用于定时任务查询过期奖励 + @@index([sourceOrderId]) + @@index([createdAt]) +} + // ============================================ // 已处理事件表 (幂等性检查) // 用于确保 Kafka 事件不会被重复处理 diff --git a/backend/services/wallet-service/src/api/api.module.ts b/backend/services/wallet-service/src/api/api.module.ts index edf8d0ed..132b6c16 100644 --- a/backend/services/wallet-service/src/api/api.module.ts +++ b/backend/services/wallet-service/src/api/api.module.ts @@ -10,7 +10,8 @@ import { } from './controllers'; import { InternalWalletController } from './controllers/internal-wallet.controller'; import { WalletApplicationService } from '@/application/services'; -import { DepositConfirmedHandler } from '@/application/event-handlers'; +import { DepositConfirmedHandler, PlantingCreatedHandler } from '@/application/event-handlers'; +import { ExpiredRewardsScheduler } from '@/application/schedulers'; import { JwtStrategy } from '@/shared/strategies/jwt.strategy'; @Module({ @@ -34,6 +35,8 @@ import { JwtStrategy } from '@/shared/strategies/jwt.strategy'; providers: [ WalletApplicationService, DepositConfirmedHandler, + PlantingCreatedHandler, + ExpiredRewardsScheduler, JwtStrategy, ], }) diff --git a/backend/services/wallet-service/src/api/controllers/wallet.controller.ts b/backend/services/wallet-service/src/api/controllers/wallet.controller.ts index 089b06dd..43af5c37 100644 --- a/backend/services/wallet-service/src/api/controllers/wallet.controller.ts +++ b/backend/services/wallet-service/src/api/controllers/wallet.controller.ts @@ -75,4 +75,22 @@ export class WalletController { ): Promise { return this.walletService.getWithdrawals(user.userId); } + + @Get('pending-rewards') + @ApiOperation({ summary: '查询待领取奖励列表', description: '获取用户的逐笔待领取奖励,包含过期时间' }) + @ApiResponse({ status: 200, description: '待领取奖励列表' }) + async getPendingRewards( + @CurrentUser() user: CurrentUserPayload, + ): Promise> { + return this.walletService.getPendingRewards(user.accountSequence); + } } diff --git a/backend/services/wallet-service/src/app.module.ts b/backend/services/wallet-service/src/app.module.ts index 486c0f9e..55c99171 100644 --- a/backend/services/wallet-service/src/app.module.ts +++ b/backend/services/wallet-service/src/app.module.ts @@ -1,5 +1,6 @@ import { Module } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; +import { ScheduleModule } from '@nestjs/schedule'; import { APP_FILTER, APP_INTERCEPTOR, APP_GUARD } from '@nestjs/core'; import { ApiModule } from '@/api/api.module'; import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; @@ -18,6 +19,7 @@ import { JwtAuthGuard } from '@/shared/guards/jwt-auth.guard'; // Also load from process.env (system environment variables) ignoreEnvFile: false, }), + ScheduleModule.forRoot(), InfrastructureModule, ApiModule, ], diff --git a/backend/services/wallet-service/src/application/event-handlers/index.ts b/backend/services/wallet-service/src/application/event-handlers/index.ts index 384bdf25..1bafeb97 100644 --- a/backend/services/wallet-service/src/application/event-handlers/index.ts +++ b/backend/services/wallet-service/src/application/event-handlers/index.ts @@ -1 +1,2 @@ export * from './deposit-confirmed.handler'; +export * from './planting-created.handler'; diff --git a/backend/services/wallet-service/src/application/event-handlers/planting-created.handler.ts b/backend/services/wallet-service/src/application/event-handlers/planting-created.handler.ts new file mode 100644 index 00000000..28c8aaef --- /dev/null +++ b/backend/services/wallet-service/src/application/event-handlers/planting-created.handler.ts @@ -0,0 +1,48 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { PlantingEventConsumerService, PlantingCreatedPayload } from '@/infrastructure/kafka'; +import { WalletApplicationService } from '@/application/services'; + +/** + * 处理认种创建事件 + * + * 当用户认种一棵树后: + * 1. 结算该用户所有待领取奖励(从 PENDING 变为 SETTLED) + * 2. 待领取 → 可结算,算力生效 + */ +@Injectable() +export class PlantingCreatedHandler implements OnModuleInit { + private readonly logger = new Logger(PlantingCreatedHandler.name); + + constructor( + private readonly plantingEventConsumer: PlantingEventConsumerService, + private readonly walletService: WalletApplicationService, + ) {} + + onModuleInit() { + this.plantingEventConsumer.onPlantingCreated(this.handlePlantingCreated.bind(this)); + this.logger.log('PlantingCreatedHandler registered'); + } + + private async handlePlantingCreated(payload: PlantingCreatedPayload): Promise { + const { orderNo, accountSequence, userId, treeCount } = payload; + + this.logger.log(`[PLANTING] User ${accountSequence} planted ${treeCount} tree(s), order: ${orderNo}`); + + try { + // 用户认种后,结算其所有待领取奖励 + const result = await this.walletService.settleUserPendingRewards(accountSequence); + + if (result.settledCount > 0) { + this.logger.log( + `[PLANTING] Settled ${result.settledCount} pending rewards for ${accountSequence}: ` + + `${result.totalUsdt} USDT, ${result.totalHashpower} hashpower`, + ); + } else { + this.logger.debug(`[PLANTING] No pending rewards to settle for ${accountSequence}`); + } + } catch (error) { + this.logger.error(`[PLANTING] Failed to settle pending rewards for ${accountSequence}`, error); + // 不抛出异常,避免阻塞 Kafka 消费 + } + } +} diff --git a/backend/services/wallet-service/src/application/schedulers/expired-rewards.scheduler.ts b/backend/services/wallet-service/src/application/schedulers/expired-rewards.scheduler.ts new file mode 100644 index 00000000..113a5d4e --- /dev/null +++ b/backend/services/wallet-service/src/application/schedulers/expired-rewards.scheduler.ts @@ -0,0 +1,56 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { WalletApplicationService } from '@/application/services'; + +/** + * 过期奖励处理定时任务 + * + * 每小时执行一次,处理已过期的待领取奖励: + * 1. 扫描 pending_rewards 表中 status=PENDING 且 expire_at < now 的记录 + * 2. 标记为 EXPIRED + * 3. 将过期金额转移到总部社区账户 (S0000000001) + */ +@Injectable() +export class ExpiredRewardsScheduler { + private readonly logger = new Logger(ExpiredRewardsScheduler.name); + + constructor( + private readonly walletService: WalletApplicationService, + ) {} + + /** + * 每小时执行一次过期奖励处理 + */ + @Cron(CronExpression.EVERY_HOUR) + async handleExpiredRewards(): Promise { + this.logger.log('Starting expired rewards processing job...'); + + try { + const result = await this.walletService.processExpiredRewards(100); + + if (result.processedCount > 0) { + this.logger.log( + `Expired rewards job completed: processed=${result.processedCount}, ` + + `totalExpiredUsdt=${result.totalExpiredUsdt}, ` + + `transferredToHQ=${result.transferredToHeadquarters}`, + ); + } else { + this.logger.debug('No expired rewards to process'); + } + } catch (error) { + this.logger.error('Failed to process expired rewards', error); + } + } + + /** + * 手动触发过期处理(用于测试或管理员操作) + */ + async triggerManually(batchSize = 100): Promise<{ + processedCount: number; + totalExpiredUsdt: number; + transferredToHeadquarters: number; + }> { + this.logger.log('Manually triggered expired rewards processing'); + return this.walletService.processExpiredRewards(batchSize); + } +} diff --git a/backend/services/wallet-service/src/application/schedulers/index.ts b/backend/services/wallet-service/src/application/schedulers/index.ts new file mode 100644 index 00000000..aeb300c2 --- /dev/null +++ b/backend/services/wallet-service/src/application/schedulers/index.ts @@ -0,0 +1 @@ +export * from './expired-rewards.scheduler'; diff --git a/backend/services/wallet-service/src/application/services/wallet-application.service.ts b/backend/services/wallet-service/src/application/services/wallet-application.service.ts index 5fb8f881..86be384d 100644 --- a/backend/services/wallet-service/src/application/services/wallet-application.service.ts +++ b/backend/services/wallet-service/src/application/services/wallet-application.service.ts @@ -5,8 +5,9 @@ import { IDepositOrderRepository, DEPOSIT_ORDER_REPOSITORY, ISettlementOrderRepository, SETTLEMENT_ORDER_REPOSITORY, IWithdrawalOrderRepository, WITHDRAWAL_ORDER_REPOSITORY, + IPendingRewardRepository, PENDING_REWARD_REPOSITORY, } from '@/domain/repositories'; -import { LedgerEntry, DepositOrder, SettlementOrder, WithdrawalOrder } from '@/domain/aggregates'; +import { LedgerEntry, DepositOrder, SettlementOrder, WithdrawalOrder, PendingRewardStatus } from '@/domain/aggregates'; import { UserId, Money, Hashpower, LedgerEntryType, AssetType, ChainType, SettleCurrency, } from '@/domain/value-objects'; @@ -82,6 +83,8 @@ export class WalletApplicationService { private readonly settlementRepo: ISettlementOrderRepository, @Inject(WITHDRAWAL_ORDER_REPOSITORY) private readonly withdrawalRepo: IWithdrawalOrderRepository, + @Inject(PENDING_REWARD_REPOSITORY) + private readonly pendingRewardRepo: IPendingRewardRepository, private readonly walletCacheService: WalletCacheService, private readonly eventPublisher: EventPublisherService, ) {} @@ -685,72 +688,74 @@ export class WalletApplicationService { ); } - - /** - * 分配资金到系统账户 - * 系统账户(S开头)已由 migration seed 创建,直接更新余额 - * - * 系统账户说明: - * - S0000000001: 总部社区 (user_id = -1) - * - S0000000002: 成本费账户 (user_id = -2) - * - S0000000003: 运营费账户 (user_id = -3) - * - S0000000004: RWA底池 (user_id = -4) - */ - private async allocateToSystemAccount( - allocation: FundAllocationItem, - orderId: string, - ): Promise { - this.logger.debug( - `System account allocation: ${allocation.amount} USDT to ${allocation.targetId} for ${allocation.allocationType}`, - ); - - const targetId = allocation.targetId; - if (!targetId.startsWith('S')) { - this.logger.warn(`Invalid system account format: ${targetId}`); - return; - } - - // 获取系统账户(已由 migration seed 创建) - const wallet = await this.walletRepo.findByAccountSequence(targetId); - if (!wallet) { - this.logger.error(`System account not found: ${targetId}`); - return; - } - - const amount = Money.USDT(allocation.amount); - - // 系统账户直接增加可用余额(不需要待领取/过期机制) - wallet.addAvailableBalance(amount); - await this.walletRepo.save(wallet); - - // 记录流水 - const ledgerEntry = LedgerEntry.create({ - accountSequence: wallet.accountSequence, - userId: wallet.userId, - entryType: LedgerEntryType.SYSTEM_ALLOCATION, - amount, - refOrderId: orderId, - memo: `${allocation.allocationType} - system account allocation`, - payloadJson: { - allocationType: allocation.allocationType, - metadata: allocation.metadata, - }, - }); - await this.ledgerRepo.save(ledgerEntry); - - this.logger.debug( - `Allocated ${allocation.amount} USDT to system account ${targetId} for ${allocation.allocationType}`, - ); - } + + /** + * 分配资金到系统账户 + * 系统账户(S开头)已由 migration seed 创建,直接更新余额 + * + * 系统账户说明: + * - S0000000001: 总部社区 (user_id = -1) + * - S0000000002: 成本费账户 (user_id = -2) + * - S0000000003: 运营费账户 (user_id = -3) + * - S0000000004: RWA底池 (user_id = -4) + */ + private async allocateToSystemAccount( + allocation: FundAllocationItem, + orderId: string, + ): Promise { + this.logger.debug( + `System account allocation: ${allocation.amount} USDT to ${allocation.targetId} for ${allocation.allocationType}`, + ); + + const targetId = allocation.targetId; + if (!targetId.startsWith('S')) { + this.logger.warn(`Invalid system account format: ${targetId}`); + return; + } + + // 获取系统账户(已由 migration seed 创建) + const wallet = await this.walletRepo.findByAccountSequence(targetId); + if (!wallet) { + this.logger.error(`System account not found: ${targetId}`); + return; + } + + const amount = Money.USDT(allocation.amount); + + // 系统账户直接增加可用余额(不需要待领取/过期机制) + wallet.addAvailableBalance(amount); + await this.walletRepo.save(wallet); + + // 记录流水 + const ledgerEntry = LedgerEntry.create({ + accountSequence: wallet.accountSequence, + userId: wallet.userId, + entryType: LedgerEntryType.SYSTEM_ALLOCATION, + amount, + refOrderId: orderId, + memo: `${allocation.allocationType} - system account allocation`, + payloadJson: { + allocationType: allocation.allocationType, + metadata: allocation.metadata, + }, + }); + await this.ledgerRepo.save(ledgerEntry); + + this.logger.debug( + `Allocated ${allocation.amount} USDT to system account ${targetId} for ${allocation.allocationType}`, + ); + } // =============== Region Accounts =============== /** - * 确保区域账户存在 - * 在用户选择省市确认后调用,动态创建省/市区域账户 + * 确保区域账户和团队账户存在 + * 在用户选择省市确认后调用,动态创建省/市区域账户和团队账户 * * 账户序列号规则: - * - 省区域账户: 9 + provinceCode (例: 9440000) - * - 市区域账户: 8 + cityCode (例: 8440100) + * - 省区域账户: 9 + provinceCode (例: 9440000) - 用于省区域权益 + * - 市区域账户: 8 + cityCode (例: 8440100) - 用于市区域权益 + * - 省团队账户: 7 + provinceCode (例: 7440000) - 用于省团队权益(无授权省公司时) + * - 市团队账户: 6 + cityCode (例: 6440100) - 用于市团队权益(无授权市公司时) */ async ensureRegionAccounts(params: { provinceCode: string; @@ -760,6 +765,8 @@ export class WalletApplicationService { }): Promise<{ provinceAccount: { accountSequence: string; created: boolean }; cityAccount: { accountSequence: string; created: boolean }; + provinceTeamAccount: { accountSequence: string; created: boolean }; + cityTeamAccount: { accountSequence: string; created: boolean }; }> { const { provinceCode, provinceName, cityCode, cityName } = params; @@ -771,9 +778,17 @@ export class WalletApplicationService { const cityAccountSequence = `8${cityCode}`; const cityUserId = BigInt(cityAccountSequence); - this.logger.log(`确保区域账户存在: 省=${provinceAccountSequence}, 市=${cityAccountSequence}`); + // 省团队账户: 7 + provinceCode + const provinceTeamAccountSequence = `7${provinceCode}`; + const provinceTeamUserId = BigInt(provinceTeamAccountSequence); - // 检查省账户是否已存在 + // 市团队账户: 6 + cityCode + const cityTeamAccountSequence = `6${cityCode}`; + const cityTeamUserId = BigInt(cityTeamAccountSequence); + + this.logger.log(`确保区域和团队账户存在: 省区域=${provinceAccountSequence}, 市区域=${cityAccountSequence}, 省团队=${provinceTeamAccountSequence}, 市团队=${cityTeamAccountSequence}`); + + // 检查省区域账户是否已存在 let provinceWallet = await this.walletRepo.findByAccountSequence(provinceAccountSequence); let provinceCreated = false; if (!provinceWallet) { @@ -782,7 +797,7 @@ export class WalletApplicationService { this.logger.log(`创建省区域账户: ${provinceAccountSequence} (${provinceName})`); } - // 检查市账户是否已存在 + // 检查市区域账户是否已存在 let cityWallet = await this.walletRepo.findByAccountSequence(cityAccountSequence); let cityCreated = false; if (!cityWallet) { @@ -791,6 +806,24 @@ export class WalletApplicationService { this.logger.log(`创建市区域账户: ${cityAccountSequence} (${cityName})`); } + // 检查省团队账户是否已存在 + let provinceTeamWallet = await this.walletRepo.findByAccountSequence(provinceTeamAccountSequence); + let provinceTeamCreated = false; + if (!provinceTeamWallet) { + provinceTeamWallet = await this.walletRepo.getOrCreate(provinceTeamAccountSequence, provinceTeamUserId); + provinceTeamCreated = true; + this.logger.log(`创建省团队账户: ${provinceTeamAccountSequence} (${provinceName}团队)`); + } + + // 检查市团队账户是否已存在 + let cityTeamWallet = await this.walletRepo.findByAccountSequence(cityTeamAccountSequence); + let cityTeamCreated = false; + if (!cityTeamWallet) { + cityTeamWallet = await this.walletRepo.getOrCreate(cityTeamAccountSequence, cityTeamUserId); + cityTeamCreated = true; + this.logger.log(`创建市团队账户: ${cityTeamAccountSequence} (${cityName}团队)`); + } + return { provinceAccount: { accountSequence: provinceAccountSequence, @@ -800,6 +833,14 @@ export class WalletApplicationService { accountSequence: cityAccountSequence, created: cityCreated, }, + provinceTeamAccount: { + accountSequence: provinceTeamAccountSequence, + created: provinceTeamCreated, + }, + cityTeamAccount: { + accountSequence: cityTeamAccountSequence, + created: cityTeamCreated, + }, }; } @@ -1145,4 +1186,165 @@ export class WalletApplicationService { totalPages: result.totalPages, }; } + + // =============== Pending Rewards =============== + + /** + * 查询用户的待领取奖励列表 + * 从 pending_rewards 表中读取,支持逐笔查看 + */ + async getPendingRewards(accountSequence: string): Promise> { + const rewards = await this.pendingRewardRepo.findByAccountSequence(accountSequence); + return rewards.map(r => ({ + id: r.id.toString(), + usdtAmount: r.usdtAmount.value, + hashpowerAmount: r.hashpowerAmount.value, + sourceOrderId: r.sourceOrderId, + allocationType: r.allocationType, + expireAt: r.expireAt.toISOString(), + status: r.status, + createdAt: r.createdAt.toISOString(), + })); + } + + /** + * 结算用户所有待领取奖励 + * 当用户认种后调用,将 PENDING 状态的奖励转为 SETTLED + * 同时将金额和算力转入钱包的可结算余额 + */ + async settleUserPendingRewards(accountSequence: string): Promise<{ + settledCount: number; + totalUsdt: number; + totalHashpower: number; + }> { + this.logger.log(`[settleUserPendingRewards] accountSequence=${accountSequence}`); + + // 查找所有 PENDING 状态的奖励 + const pendingRewards = await this.pendingRewardRepo.findByAccountSequence( + accountSequence, + PendingRewardStatus.PENDING, + ); + + if (pendingRewards.length === 0) { + return { settledCount: 0, totalUsdt: 0, totalHashpower: 0 }; + } + + let totalUsdt = 0; + let totalHashpower = 0; + + // 标记为已结算 + for (const reward of pendingRewards) { + reward.markAsSettled(); + totalUsdt += reward.usdtAmount.value; + totalHashpower += reward.hashpowerAmount.value; + } + + // 批量更新状态 + await this.pendingRewardRepo.updateAll(pendingRewards); + + // 更新钱包可结算余额 + const wallet = await this.walletRepo.findByAccountSequence(accountSequence); + if (wallet) { + // 将待领取转为可结算 + wallet.addSettleableReward( + Money.USDT(totalUsdt), + Hashpower.create(totalHashpower), + ); + await this.walletRepo.save(wallet); + + // 记录流水 + if (totalUsdt > 0) { + const ledgerEntry = LedgerEntry.create({ + accountSequence, + userId: wallet.userId, + entryType: LedgerEntryType.REWARD_TO_SETTLEABLE, + amount: Money.USDT(totalUsdt), + memo: `${pendingRewards.length} pending rewards settled`, + }); + await this.ledgerRepo.save(ledgerEntry); + } + + await this.walletCacheService.invalidateWallet(wallet.userId.value); + } + + this.logger.log( + `[settleUserPendingRewards] Settled ${pendingRewards.length} rewards: ${totalUsdt} USDT, ${totalHashpower} hashpower`, + ); + + return { + settledCount: pendingRewards.length, + totalUsdt, + totalHashpower, + }; + } + + /** + * 处理过期奖励 + * 定时任务调用,将已过期的 PENDING 奖励标记为 EXPIRED + * 过期金额转入总部社区账户 (S0000000001) + */ + async processExpiredRewards(batchSize = 100): Promise<{ + processedCount: number; + totalExpiredUsdt: number; + transferredToHeadquarters: number; + }> { + const now = new Date(); + this.logger.log(`[processExpiredRewards] Processing expired rewards at ${now.toISOString()}`); + + // 查找已过期的 PENDING 奖励 + const expiredRewards = await this.pendingRewardRepo.findExpiredPending(now, batchSize); + + if (expiredRewards.length === 0) { + return { processedCount: 0, totalExpiredUsdt: 0, transferredToHeadquarters: 0 }; + } + + let totalExpiredUsdt = 0; + + // 标记为已过期 + for (const reward of expiredRewards) { + reward.markAsExpired(); + totalExpiredUsdt += reward.usdtAmount.value; + } + + // 批量更新状态 + await this.pendingRewardRepo.updateAll(expiredRewards); + + // 将过期金额转入总部社区账户 + const headquartersAccountSequence = 'S0000000001'; + const hqWallet = await this.walletRepo.findByAccountSequence(headquartersAccountSequence); + + if (hqWallet && totalExpiredUsdt > 0) { + hqWallet.addAvailableBalance(Money.USDT(totalExpiredUsdt)); + await this.walletRepo.save(hqWallet); + + // 记录流水 + const ledgerEntry = LedgerEntry.create({ + accountSequence: headquartersAccountSequence, + userId: hqWallet.userId, + entryType: LedgerEntryType.SYSTEM_ALLOCATION, + amount: Money.USDT(totalExpiredUsdt), + memo: `Expired rewards from ${expiredRewards.length} pending entries`, + }); + await this.ledgerRepo.save(ledgerEntry); + + this.logger.log( + `[processExpiredRewards] Transferred ${totalExpiredUsdt} USDT to headquarters from ${expiredRewards.length} expired rewards`, + ); + } + + return { + processedCount: expiredRewards.length, + totalExpiredUsdt, + transferredToHeadquarters: totalExpiredUsdt, + }; + } } diff --git a/backend/services/wallet-service/src/domain/aggregates/index.ts b/backend/services/wallet-service/src/domain/aggregates/index.ts index 0b24b069..413e464b 100644 --- a/backend/services/wallet-service/src/domain/aggregates/index.ts +++ b/backend/services/wallet-service/src/domain/aggregates/index.ts @@ -3,3 +3,4 @@ export * from './ledger-entry.aggregate'; export * from './deposit-order.aggregate'; export * from './settlement-order.aggregate'; export * from './withdrawal-order.aggregate'; +export * from './pending-reward.aggregate'; diff --git a/backend/services/wallet-service/src/domain/aggregates/pending-reward.aggregate.ts b/backend/services/wallet-service/src/domain/aggregates/pending-reward.aggregate.ts new file mode 100644 index 00000000..72bd78c2 --- /dev/null +++ b/backend/services/wallet-service/src/domain/aggregates/pending-reward.aggregate.ts @@ -0,0 +1,159 @@ +import Decimal from 'decimal.js'; +import { UserId, Money, Hashpower } from '@/domain/value-objects'; + +export enum PendingRewardStatus { + PENDING = 'PENDING', + SETTLED = 'SETTLED', + EXPIRED = 'EXPIRED', +} + +export class PendingReward { + private readonly _id: bigint; + private readonly _accountSequence: string; + private readonly _userId: UserId; + private readonly _usdtAmount: Money; + private readonly _hashpowerAmount: Hashpower; + private readonly _sourceOrderId: string; + private readonly _allocationType: string; + private readonly _expireAt: Date; + private _status: PendingRewardStatus; + private _settledAt: Date | null; + private _expiredAt: Date | null; + private readonly _createdAt: Date; + + private constructor( + id: bigint, + accountSequence: string, + userId: UserId, + usdtAmount: Money, + hashpowerAmount: Hashpower, + sourceOrderId: string, + allocationType: string, + expireAt: Date, + status: PendingRewardStatus, + settledAt: Date | null, + expiredAt: Date | null, + createdAt: Date, + ) { + this._id = id; + this._accountSequence = accountSequence; + this._userId = userId; + this._usdtAmount = usdtAmount; + this._hashpowerAmount = hashpowerAmount; + this._sourceOrderId = sourceOrderId; + this._allocationType = allocationType; + this._expireAt = expireAt; + this._status = status; + this._settledAt = settledAt; + this._expiredAt = expiredAt; + this._createdAt = createdAt; + } + + // Getters + get id(): bigint { return this._id; } + get accountSequence(): string { return this._accountSequence; } + get userId(): UserId { return this._userId; } + get usdtAmount(): Money { return this._usdtAmount; } + get hashpowerAmount(): Hashpower { return this._hashpowerAmount; } + get sourceOrderId(): string { return this._sourceOrderId; } + get allocationType(): string { return this._allocationType; } + get expireAt(): Date { return this._expireAt; } + get status(): PendingRewardStatus { return this._status; } + get settledAt(): Date | null { return this._settledAt; } + get expiredAt(): Date | null { return this._expiredAt; } + get createdAt(): Date { return this._createdAt; } + + get isPending(): boolean { return this._status === PendingRewardStatus.PENDING; } + get isSettled(): boolean { return this._status === PendingRewardStatus.SETTLED; } + get isExpired(): boolean { return this._status === PendingRewardStatus.EXPIRED; } + + /** + * 创建新的待领取奖励 + */ + static create(params: { + accountSequence: string; + userId: UserId; + usdtAmount: Money; + hashpowerAmount: Hashpower; + sourceOrderId: string; + allocationType: string; + expireAt: Date; + }): PendingReward { + return new PendingReward( + BigInt(0), // Will be set by database + params.accountSequence, + params.userId, + params.usdtAmount, + params.hashpowerAmount, + params.sourceOrderId, + params.allocationType, + params.expireAt, + PendingRewardStatus.PENDING, + null, + null, + new Date(), + ); + } + + /** + * 从数据库重建 + */ + static reconstruct(params: { + id: bigint; + accountSequence: string; + userId: bigint; + usdtAmount: Decimal; + hashpowerAmount: Decimal; + sourceOrderId: string; + allocationType: string; + expireAt: Date; + status: string; + settledAt: Date | null; + expiredAt: Date | null; + createdAt: Date; + }): PendingReward { + return new PendingReward( + params.id, + params.accountSequence, + UserId.create(params.userId), + Money.USDT(params.usdtAmount), + Hashpower.create(params.hashpowerAmount), + params.sourceOrderId, + params.allocationType, + params.expireAt, + params.status as PendingRewardStatus, + params.settledAt, + params.expiredAt, + params.createdAt, + ); + } + + /** + * 标记为已结算 (用户认种后,待领取转为可结算) + */ + markAsSettled(): void { + if (this._status !== PendingRewardStatus.PENDING) { + throw new Error(`Cannot settle reward in status: ${this._status}`); + } + this._status = PendingRewardStatus.SETTLED; + this._settledAt = new Date(); + } + + /** + * 标记为已过期 (24小时未认种) + */ + markAsExpired(): void { + if (this._status !== PendingRewardStatus.PENDING) { + throw new Error(`Cannot expire reward in status: ${this._status}`); + } + this._status = PendingRewardStatus.EXPIRED; + this._expiredAt = new Date(); + } + + /** + * 检查是否已过期 + */ + isOverdue(now: Date = new Date()): boolean { + return this._status === PendingRewardStatus.PENDING && now > this._expireAt; + } +} diff --git a/backend/services/wallet-service/src/domain/aggregates/wallet-account.aggregate.ts b/backend/services/wallet-service/src/domain/aggregates/wallet-account.aggregate.ts index cc43222d..9bbad821 100644 --- a/backend/services/wallet-service/src/domain/aggregates/wallet-account.aggregate.ts +++ b/backend/services/wallet-service/src/domain/aggregates/wallet-account.aggregate.ts @@ -318,6 +318,31 @@ export class WalletAccount { })); } + /** + * 直接增加可结算余额(用于 pending_rewards 表方案) + * 当用户认种后,从 pending_rewards 表中结算的金额直接加到可结算余额 + */ + addSettleableReward(usdtAmount: Money, hashpowerAmount: Hashpower): void { + this.ensureActive(); + + this._rewards = { + ...this._rewards, + settleableUsdt: this._rewards.settleableUsdt.add(usdtAmount), + settleableHashpower: this._rewards.settleableHashpower.add(hashpowerAmount), + }; + + // 增加算力 + this._hashpower = this._hashpower.add(hashpowerAmount); + this._updatedAt = new Date(); + + this.addDomainEvent(new RewardMovedToSettleableEvent({ + userId: this._userId.toString(), + walletId: this._walletId.toString(), + usdtAmount: usdtAmount.value.toString(), + hashpowerAmount: hashpowerAmount.value.toString(), + })); + } + // 奖励过期 expirePendingRewards(): void { if (this._rewards.pendingUsdt.isZero() && this._rewards.pendingHashpower.isZero()) { diff --git a/backend/services/wallet-service/src/domain/repositories/index.ts b/backend/services/wallet-service/src/domain/repositories/index.ts index c48912a6..72226d8d 100644 --- a/backend/services/wallet-service/src/domain/repositories/index.ts +++ b/backend/services/wallet-service/src/domain/repositories/index.ts @@ -3,3 +3,4 @@ export * from './ledger-entry.repository.interface'; export * from './deposit-order.repository.interface'; export * from './settlement-order.repository.interface'; export * from './withdrawal-order.repository.interface'; +export * from './pending-reward.repository.interface'; diff --git a/backend/services/wallet-service/src/domain/repositories/pending-reward.repository.interface.ts b/backend/services/wallet-service/src/domain/repositories/pending-reward.repository.interface.ts new file mode 100644 index 00000000..430117ad --- /dev/null +++ b/backend/services/wallet-service/src/domain/repositories/pending-reward.repository.interface.ts @@ -0,0 +1,26 @@ +import { PendingReward, PendingRewardStatus } from '@/domain/aggregates'; + +export interface IPendingRewardRepository { + save(reward: PendingReward): Promise; + saveAll(rewards: PendingReward[]): Promise; + update(reward: PendingReward): Promise; + updateAll(rewards: PendingReward[]): Promise; + + findById(id: bigint): Promise; + findByAccountSequence(accountSequence: string, status?: PendingRewardStatus): Promise; + findByUserId(userId: bigint, status?: PendingRewardStatus): Promise; + findBySourceOrderId(sourceOrderId: string): Promise; + + /** + * 查找所有已过期但状态仍为 PENDING 的奖励 + * 用于定时任务批量处理过期 + */ + findExpiredPending(now: Date, limit?: number): Promise; + + /** + * 统计用户的待领取奖励总额 + */ + sumPendingByAccountSequence(accountSequence: string): Promise<{ usdtTotal: number; hashpowerTotal: number }>; +} + +export const PENDING_REWARD_REPOSITORY = Symbol('IPendingRewardRepository'); diff --git a/backend/services/wallet-service/src/infrastructure/infrastructure.module.ts b/backend/services/wallet-service/src/infrastructure/infrastructure.module.ts index daa27618..4c8f1c7c 100644 --- a/backend/services/wallet-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/wallet-service/src/infrastructure/infrastructure.module.ts @@ -6,6 +6,7 @@ import { DepositOrderRepositoryImpl, SettlementOrderRepositoryImpl, WithdrawalOrderRepositoryImpl, + PendingRewardRepositoryImpl, } from './persistence/repositories'; import { WALLET_ACCOUNT_REPOSITORY, @@ -13,6 +14,7 @@ import { DEPOSIT_ORDER_REPOSITORY, SETTLEMENT_ORDER_REPOSITORY, WITHDRAWAL_ORDER_REPOSITORY, + PENDING_REWARD_REPOSITORY, } from '@/domain/repositories'; import { RedisModule } from './redis'; import { KafkaModule } from './kafka'; @@ -38,6 +40,10 @@ const repositories = [ provide: WITHDRAWAL_ORDER_REPOSITORY, useClass: WithdrawalOrderRepositoryImpl, }, + { + provide: PENDING_REWARD_REPOSITORY, + useClass: PendingRewardRepositoryImpl, + }, ]; @Global() diff --git a/backend/services/wallet-service/src/infrastructure/kafka/index.ts b/backend/services/wallet-service/src/infrastructure/kafka/index.ts index 26c10aa3..d09e5652 100644 --- a/backend/services/wallet-service/src/infrastructure/kafka/index.ts +++ b/backend/services/wallet-service/src/infrastructure/kafka/index.ts @@ -1,3 +1,4 @@ export * from './kafka.module'; export * from './event-publisher.service'; export * from './deposit-event-consumer.service'; +export * from './planting-event-consumer.service'; diff --git a/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts index 723c0f8f..5455d093 100644 --- a/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts @@ -3,6 +3,7 @@ import { ConfigModule, ConfigService } from '@nestjs/config'; import { ClientsModule, Transport } from '@nestjs/microservices'; import { EventPublisherService } from './event-publisher.service'; import { DepositEventConsumerService } from './deposit-event-consumer.service'; +import { PlantingEventConsumerService } from './planting-event-consumer.service'; // [已屏蔽] 前端直接从 reward-service 查询,不再订阅 reward-service 消息 // import { RewardEventConsumerController } from './reward-event-consumer.controller'; // import { EventAckPublisher } from './event-ack.publisher'; @@ -34,7 +35,7 @@ import { PrismaService } from '../persistence/prisma/prisma.service'; // [已屏蔽] 前端直接从 reward-service 查询,不再订阅 reward-service 消息 // controllers: [RewardEventConsumerController], controllers: [], - providers: [PrismaService, EventPublisherService, DepositEventConsumerService], - exports: [EventPublisherService, DepositEventConsumerService, ClientsModule], + providers: [PrismaService, EventPublisherService, DepositEventConsumerService, PlantingEventConsumerService], + exports: [EventPublisherService, DepositEventConsumerService, PlantingEventConsumerService, ClientsModule], }) export class KafkaModule {} diff --git a/backend/services/wallet-service/src/infrastructure/kafka/planting-event-consumer.service.ts b/backend/services/wallet-service/src/infrastructure/kafka/planting-event-consumer.service.ts new file mode 100644 index 00000000..585b3ced --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/kafka/planting-event-consumer.service.ts @@ -0,0 +1,144 @@ +/** + * Planting Event Consumer Service for Wallet Service + * + * Consumes planting events from planting-service via Kafka. + * When a user plants a tree, settles their pending rewards. + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; + +export const PLANTING_TOPICS = { + PLANTING_EVENTS: 'planting-events', +} as const; + +export interface PlantingCreatedPayload { + orderNo: string; + accountSequence: string; + userId: string; + treeCount: number; + totalAmount: string; + createdAt: string; +} + +export type PlantingEventHandler = (payload: PlantingCreatedPayload) => Promise; + +@Injectable() +export class PlantingEventConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(PlantingEventConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isConnected = false; + + private plantingCreatedHandler?: PlantingEventHandler; + + constructor(private readonly configService: ConfigService) {} + + async onModuleInit() { + const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; + const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'wallet-service'; + const groupId = 'wallet-service-planting-events'; + + this.logger.log(`[INIT] Planting Event Consumer initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] GroupId: ${groupId}`); + this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); + this.logger.log(`[INIT] Topics: ${Object.values(PLANTING_TOPICS).join(', ')}`); + + this.kafka = new Kafka({ + clientId: `${clientId}-planting`, + brokers, + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 100, + retries: 8, + }, + }); + + this.consumer = this.kafka.consumer({ + groupId, + sessionTimeout: 30000, + heartbeatInterval: 3000, + }); + + try { + this.logger.log(`[CONNECT] Connecting Planting Event consumer...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] Planting Event consumer connected successfully`); + + await this.consumer.subscribe({ + topics: Object.values(PLANTING_TOPICS), + fromBeginning: false, + }); + this.logger.log(`[SUBSCRIBE] Subscribed to planting topics`); + + await this.startConsuming(); + } catch (error) { + this.logger.error(`[ERROR] Failed to connect Planting Event consumer`, error); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + await this.consumer.disconnect(); + this.logger.log('Planting Event consumer disconnected'); + } + } + + /** + * Register handler for planting created events + */ + onPlantingCreated(handler: PlantingEventHandler): void { + this.plantingCreatedHandler = handler; + this.logger.log(`[REGISTER] PlantingCreated handler registered`); + } + + private async startConsuming(): Promise { + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + const offset = message.offset; + this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`); + + try { + const value = message.value?.toString(); + if (!value) { + this.logger.warn(`[RECEIVE] Empty message received on ${topic}`); + return; + } + + this.logger.debug(`[RECEIVE] Raw message: ${value.substring(0, 500)}...`); + + const parsed = JSON.parse(value); + const eventType = parsed.eventType || parsed.type; + const payload = parsed.payload || parsed; + + this.logger.log(`[RECEIVE] Event type: ${eventType}`); + + // 监听认种创建事件 - 用户认种后,结算其所有待领取奖励 + if (eventType === 'planting.planting.created' || eventType === 'PlantingOrderCreated') { + this.logger.log(`[HANDLE] Processing PlantingCreated event`); + this.logger.log(`[HANDLE] orderNo: ${payload.orderNo}`); + this.logger.log(`[HANDLE] accountSequence: ${payload.accountSequence}`); + this.logger.log(`[HANDLE] userId: ${payload.userId}`); + this.logger.log(`[HANDLE] treeCount: ${payload.treeCount}`); + + if (this.plantingCreatedHandler) { + await this.plantingCreatedHandler(payload as PlantingCreatedPayload); + this.logger.log(`[HANDLE] PlantingCreated handler completed`); + } else { + this.logger.warn(`[HANDLE] No handler registered for PlantingCreated`); + } + } else { + this.logger.debug(`[RECEIVE] Ignoring event type: ${eventType}`); + } + } catch (error) { + this.logger.error(`[ERROR] Error processing planting event from ${topic}`, error); + } + }, + }); + + this.logger.log(`[START] Started consuming planting events`); + } +} diff --git a/backend/services/wallet-service/src/infrastructure/persistence/repositories/index.ts b/backend/services/wallet-service/src/infrastructure/persistence/repositories/index.ts index bf1aa3bc..0a2d7931 100644 --- a/backend/services/wallet-service/src/infrastructure/persistence/repositories/index.ts +++ b/backend/services/wallet-service/src/infrastructure/persistence/repositories/index.ts @@ -3,3 +3,4 @@ export * from './ledger-entry.repository.impl'; export * from './deposit-order.repository.impl'; export * from './settlement-order.repository.impl'; export * from './withdrawal-order.repository.impl'; +export * from './pending-reward.repository.impl'; diff --git a/backend/services/wallet-service/src/infrastructure/persistence/repositories/pending-reward.repository.impl.ts b/backend/services/wallet-service/src/infrastructure/persistence/repositories/pending-reward.repository.impl.ts new file mode 100644 index 00000000..508e5535 --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/persistence/repositories/pending-reward.repository.impl.ts @@ -0,0 +1,167 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; +import { IPendingRewardRepository } from '@/domain/repositories'; +import { PendingReward, PendingRewardStatus } from '@/domain/aggregates'; +import Decimal from 'decimal.js'; + +@Injectable() +export class PendingRewardRepositoryImpl implements IPendingRewardRepository { + constructor(private readonly prisma: PrismaService) {} + + async save(reward: PendingReward): Promise { + const created = await this.prisma.pendingReward.create({ + data: { + accountSequence: reward.accountSequence, + userId: reward.userId.value, + usdtAmount: reward.usdtAmount.toDecimal(), + hashpowerAmount: reward.hashpowerAmount.decimal, + sourceOrderId: reward.sourceOrderId, + allocationType: reward.allocationType, + expireAt: reward.expireAt, + status: reward.status, + settledAt: reward.settledAt, + expiredAt: reward.expiredAt, + }, + }); + return this.toDomain(created); + } + + async saveAll(rewards: PendingReward[]): Promise { + await this.prisma.pendingReward.createMany({ + data: rewards.map(reward => ({ + accountSequence: reward.accountSequence, + userId: reward.userId.value, + usdtAmount: reward.usdtAmount.toDecimal(), + hashpowerAmount: reward.hashpowerAmount.decimal, + sourceOrderId: reward.sourceOrderId, + allocationType: reward.allocationType, + expireAt: reward.expireAt, + status: reward.status, + settledAt: reward.settledAt, + expiredAt: reward.expiredAt, + })), + }); + } + + async update(reward: PendingReward): Promise { + await this.prisma.pendingReward.update({ + where: { id: reward.id }, + data: { + status: reward.status, + settledAt: reward.settledAt, + expiredAt: reward.expiredAt, + }, + }); + } + + async updateAll(rewards: PendingReward[]): Promise { + await this.prisma.$transaction( + rewards.map(reward => + this.prisma.pendingReward.update({ + where: { id: reward.id }, + data: { + status: reward.status, + settledAt: reward.settledAt, + expiredAt: reward.expiredAt, + }, + }), + ), + ); + } + + async findById(id: bigint): Promise { + const record = await this.prisma.pendingReward.findUnique({ + where: { id }, + }); + return record ? this.toDomain(record) : null; + } + + async findByAccountSequence(accountSequence: string, status?: PendingRewardStatus): Promise { + const records = await this.prisma.pendingReward.findMany({ + where: { + accountSequence, + ...(status && { status }), + }, + orderBy: { createdAt: 'desc' }, + }); + return records.map(r => this.toDomain(r)); + } + + async findByUserId(userId: bigint, status?: PendingRewardStatus): Promise { + const records = await this.prisma.pendingReward.findMany({ + where: { + userId, + ...(status && { status }), + }, + orderBy: { createdAt: 'desc' }, + }); + return records.map(r => this.toDomain(r)); + } + + async findBySourceOrderId(sourceOrderId: string): Promise { + const records = await this.prisma.pendingReward.findMany({ + where: { sourceOrderId }, + orderBy: { createdAt: 'desc' }, + }); + return records.map(r => this.toDomain(r)); + } + + async findExpiredPending(now: Date, limit = 100): Promise { + const records = await this.prisma.pendingReward.findMany({ + where: { + status: PendingRewardStatus.PENDING, + expireAt: { lt: now }, + }, + orderBy: { expireAt: 'asc' }, + take: limit, + }); + return records.map(r => this.toDomain(r)); + } + + async sumPendingByAccountSequence(accountSequence: string): Promise<{ usdtTotal: number; hashpowerTotal: number }> { + const result = await this.prisma.pendingReward.aggregate({ + where: { + accountSequence, + status: PendingRewardStatus.PENDING, + }, + _sum: { + usdtAmount: true, + hashpowerAmount: true, + }, + }); + return { + usdtTotal: result._sum.usdtAmount?.toNumber() ?? 0, + hashpowerTotal: result._sum.hashpowerAmount?.toNumber() ?? 0, + }; + } + + private toDomain(record: { + id: bigint; + accountSequence: string; + userId: bigint; + usdtAmount: Decimal; + hashpowerAmount: Decimal; + sourceOrderId: string; + allocationType: string; + expireAt: Date; + status: string; + settledAt: Date | null; + expiredAt: Date | null; + createdAt: Date; + }): PendingReward { + return PendingReward.reconstruct({ + id: record.id, + accountSequence: record.accountSequence, + userId: record.userId, + usdtAmount: new Decimal(record.usdtAmount.toString()), + hashpowerAmount: new Decimal(record.hashpowerAmount.toString()), + sourceOrderId: record.sourceOrderId, + allocationType: record.allocationType, + expireAt: record.expireAt, + status: record.status, + settledAt: record.settledAt, + expiredAt: record.expiredAt, + createdAt: record.createdAt, + }); + } +}