feat(planting): add payment reliability improvements

## wallet-service
- Add freeze/confirm/unfreeze API endpoints for planting deduction
- Add deductFrozen method to wallet-account aggregate
- Add PLANT_FREEZE and PLANT_UNFREEZE ledger entry types
- Add idempotency check in deductForPlanting
- Fix test files to include accountSequence parameter

## planting-service
- Add PaymentCompensation model and migration
- Add payment-compensation.repository.ts
- Add payment-compensation.service.ts (background job for retry)
- Add HTTP retry mechanism with exponential backoff
- Refactor payOrder to use freeze → transaction → confirm flow
- Create compensation record on unfreeze failure

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-10 06:31:54 -08:00
parent 68182fd0a3
commit e5e2793337
19 changed files with 1456 additions and 287 deletions

View File

@ -0,0 +1,35 @@
-- CreateTable
CREATE TABLE "payment_compensations" (
"compensation_id" BIGSERIAL NOT NULL,
"order_no" VARCHAR(50) NOT NULL,
"user_id" BIGINT NOT NULL,
"compensation_type" VARCHAR(50) NOT NULL,
"amount" DECIMAL(20,8) NOT NULL,
"status" VARCHAR(20) NOT NULL DEFAULT 'PENDING',
"failure_reason" TEXT,
"failure_stage" VARCHAR(50),
"retry_count" INTEGER NOT NULL DEFAULT 0,
"max_retries" INTEGER NOT NULL DEFAULT 5,
"next_retry_at" TIMESTAMP(3),
"last_error" TEXT,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"processed_at" TIMESTAMP(3),
"completed_at" TIMESTAMP(3),
CONSTRAINT "payment_compensations_pkey" PRIMARY KEY ("compensation_id")
);
-- CreateIndex
CREATE INDEX "payment_compensations_status_next_retry_at_idx" ON "payment_compensations"("status", "next_retry_at");
-- CreateIndex
CREATE INDEX "payment_compensations_user_id_idx" ON "payment_compensations"("user_id");
-- CreateIndex
CREATE INDEX "payment_compensations_order_no_idx" ON "payment_compensations"("order_no");
-- CreateIndex
CREATE INDEX "payment_compensations_created_at_idx" ON "payment_compensations"("created_at");
-- CreateIndex
CREATE UNIQUE INDEX "payment_compensations_order_no_compensation_type_key" ON "payment_compensations"("order_no", "compensation_type");

View File

@ -1,235 +1,279 @@
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
// ============================================
// 认种订单表 (状态表)
// ============================================
model PlantingOrder {
id BigInt @id @default(autoincrement()) @map("order_id")
orderNo String @unique @map("order_no") @db.VarChar(50)
userId BigInt @map("user_id")
// 认种信息
treeCount Int @map("tree_count")
totalAmount Decimal @map("total_amount") @db.Decimal(20, 8)
// 省市选择 (不可修改)
selectedProvince String? @map("selected_province") @db.VarChar(10)
selectedCity String? @map("selected_city") @db.VarChar(10)
provinceCitySelectedAt DateTime? @map("province_city_selected_at")
provinceCityConfirmedAt DateTime? @map("province_city_confirmed_at")
// 订单状态
status String @default("CREATED") @map("status") @db.VarChar(30)
// 底池信息
poolInjectionBatchId BigInt? @map("pool_injection_batch_id")
poolInjectionScheduledTime DateTime? @map("pool_injection_scheduled_time")
poolInjectionActualTime DateTime? @map("pool_injection_actual_time")
poolInjectionTxHash String? @map("pool_injection_tx_hash") @db.VarChar(100)
// 挖矿
miningEnabledAt DateTime? @map("mining_enabled_at")
// 时间戳
createdAt DateTime @default(now()) @map("created_at")
paidAt DateTime? @map("paid_at")
fundAllocatedAt DateTime? @map("fund_allocated_at")
updatedAt DateTime @updatedAt @map("updated_at")
// 关联
fundAllocations FundAllocation[]
batch PoolInjectionBatch? @relation(fields: [poolInjectionBatchId], references: [id])
@@index([userId])
@@index([orderNo])
@@index([status])
@@index([poolInjectionBatchId])
@@index([selectedProvince, selectedCity])
@@index([createdAt])
@@index([paidAt])
@@map("planting_orders")
}
// ============================================
// 资金分配明细表 (行为表, append-only)
// ============================================
model FundAllocation {
id BigInt @id @default(autoincrement()) @map("allocation_id")
orderId BigInt @map("order_id")
// 分配信息
targetType String @map("target_type") @db.VarChar(50)
amount Decimal @map("amount") @db.Decimal(20, 8)
targetAccountId String? @map("target_account_id") @db.VarChar(100)
// 元数据
metadata Json? @map("metadata")
createdAt DateTime @default(now()) @map("created_at")
// 关联
order PlantingOrder @relation(fields: [orderId], references: [id])
@@index([orderId])
@@index([targetType, targetAccountId])
@@index([createdAt])
@@map("fund_allocations")
}
// ============================================
// 用户持仓表 (状态表)
// ============================================
model PlantingPosition {
id BigInt @id @default(autoincrement()) @map("position_id")
userId BigInt @unique @map("user_id")
// 持仓统计
totalTreeCount Int @default(0) @map("total_tree_count")
effectiveTreeCount Int @default(0) @map("effective_tree_count")
pendingTreeCount Int @default(0) @map("pending_tree_count")
// 挖矿状态
firstMiningStartAt DateTime? @map("first_mining_start_at")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
// 关联
distributions PositionDistribution[]
@@index([userId])
@@index([totalTreeCount])
@@map("planting_positions")
}
// ============================================
// 持仓省市分布表
// ============================================
model PositionDistribution {
id BigInt @id @default(autoincrement()) @map("distribution_id")
userId BigInt @map("user_id")
// 省市信息
provinceCode String? @map("province_code") @db.VarChar(10)
cityCode String? @map("city_code") @db.VarChar(10)
// 数量
treeCount Int @default(0) @map("tree_count")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
// 关联
position PlantingPosition @relation(fields: [userId], references: [userId])
@@unique([userId, provinceCode, cityCode])
@@index([userId])
@@index([provinceCode])
@@index([cityCode])
@@map("position_province_city_distribution")
}
// ============================================
// 底池注入批次表 (状态表)
// ============================================
model PoolInjectionBatch {
id BigInt @id @default(autoincrement()) @map("batch_id")
batchNo String @unique @map("batch_no") @db.VarChar(50)
// 批次时间窗口 (5天)
startDate DateTime @map("start_date") @db.Date
endDate DateTime @map("end_date") @db.Date
// 统计信息
orderCount Int @default(0) @map("order_count")
totalAmount Decimal @default(0) @map("total_amount") @db.Decimal(20, 8)
// 注入状态
status String @default("PENDING") @map("status") @db.VarChar(20)
scheduledInjectionTime DateTime? @map("scheduled_injection_time")
actualInjectionTime DateTime? @map("actual_injection_time")
injectionTxHash String? @map("injection_tx_hash") @db.VarChar(100)
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
// 关联
orders PlantingOrder[]
@@index([batchNo])
@@index([startDate, endDate])
@@index([status])
@@index([scheduledInjectionTime])
@@map("pool_injection_batches")
}
// ============================================
// 认种事件表 (行为表, append-only)
// ============================================
model PlantingEvent {
id BigInt @id @default(autoincrement()) @map("event_id")
eventType String @map("event_type") @db.VarChar(50)
// 聚合根信息
aggregateId String @map("aggregate_id") @db.VarChar(100)
aggregateType String @map("aggregate_type") @db.VarChar(50)
// 事件数据
eventData Json @map("event_data")
// 元数据
userId BigInt? @map("user_id")
occurredAt DateTime @default(now()) @map("occurred_at")
version Int @default(1) @map("version")
@@index([aggregateType, aggregateId])
@@index([eventType])
@@index([userId])
@@index([occurredAt])
@@map("planting_events")
}
// ============================================
// Outbox 事件发件箱表 (Outbox Pattern)
// 保证事件发布的可靠性:
// 1. 业务数据和 Outbox 记录在同一个事务中写入
// 2. 后台任务轮询 Outbox 表并发布到 Kafka
// 3. 发布成功后标记为已处理
// ============================================
model OutboxEvent {
id BigInt @id @default(autoincrement()) @map("outbox_id")
// 事件信息
eventType String @map("event_type") @db.VarChar(100)
topic String @map("topic") @db.VarChar(100)
key String @map("key") @db.VarChar(200)
payload Json @map("payload")
// 聚合根信息 (用于幂等性检查)
aggregateId String @map("aggregate_id") @db.VarChar(100)
aggregateType String @map("aggregate_type") @db.VarChar(50)
// 发布状态
status String @default("PENDING") @map("status") @db.VarChar(20) // PENDING, PUBLISHED, FAILED
retryCount Int @default(0) @map("retry_count")
maxRetries Int @default(5) @map("max_retries")
lastError String? @map("last_error") @db.Text
// 时间戳
createdAt DateTime @default(now()) @map("created_at")
publishedAt DateTime? @map("published_at")
nextRetryAt DateTime? @map("next_retry_at")
@@index([status, createdAt])
@@index([status, nextRetryAt])
@@index([aggregateType, aggregateId])
@@index([topic])
@@map("outbox_events")
}
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
// ============================================
// 认种订单表 (状态表)
// ============================================
model PlantingOrder {
id BigInt @id @default(autoincrement()) @map("order_id")
orderNo String @unique @map("order_no") @db.VarChar(50)
userId BigInt @map("user_id")
// 认种信息
treeCount Int @map("tree_count")
totalAmount Decimal @map("total_amount") @db.Decimal(20, 8)
// 省市选择 (不可修改)
selectedProvince String? @map("selected_province") @db.VarChar(10)
selectedCity String? @map("selected_city") @db.VarChar(10)
provinceCitySelectedAt DateTime? @map("province_city_selected_at")
provinceCityConfirmedAt DateTime? @map("province_city_confirmed_at")
// 订单状态
status String @default("CREATED") @map("status") @db.VarChar(30)
// 底池信息
poolInjectionBatchId BigInt? @map("pool_injection_batch_id")
poolInjectionScheduledTime DateTime? @map("pool_injection_scheduled_time")
poolInjectionActualTime DateTime? @map("pool_injection_actual_time")
poolInjectionTxHash String? @map("pool_injection_tx_hash") @db.VarChar(100)
// 挖矿
miningEnabledAt DateTime? @map("mining_enabled_at")
// 时间戳
createdAt DateTime @default(now()) @map("created_at")
paidAt DateTime? @map("paid_at")
fundAllocatedAt DateTime? @map("fund_allocated_at")
updatedAt DateTime @updatedAt @map("updated_at")
// 关联
fundAllocations FundAllocation[]
batch PoolInjectionBatch? @relation(fields: [poolInjectionBatchId], references: [id])
@@index([userId])
@@index([orderNo])
@@index([status])
@@index([poolInjectionBatchId])
@@index([selectedProvince, selectedCity])
@@index([createdAt])
@@index([paidAt])
@@map("planting_orders")
}
// ============================================
// 资金分配明细表 (行为表, append-only)
// ============================================
model FundAllocation {
id BigInt @id @default(autoincrement()) @map("allocation_id")
orderId BigInt @map("order_id")
// 分配信息
targetType String @map("target_type") @db.VarChar(50)
amount Decimal @map("amount") @db.Decimal(20, 8)
targetAccountId String? @map("target_account_id") @db.VarChar(100)
// 元数据
metadata Json? @map("metadata")
createdAt DateTime @default(now()) @map("created_at")
// 关联
order PlantingOrder @relation(fields: [orderId], references: [id])
@@index([orderId])
@@index([targetType, targetAccountId])
@@index([createdAt])
@@map("fund_allocations")
}
// ============================================
// 用户持仓表 (状态表)
// ============================================
model PlantingPosition {
id BigInt @id @default(autoincrement()) @map("position_id")
userId BigInt @unique @map("user_id")
// 持仓统计
totalTreeCount Int @default(0) @map("total_tree_count")
effectiveTreeCount Int @default(0) @map("effective_tree_count")
pendingTreeCount Int @default(0) @map("pending_tree_count")
// 挖矿状态
firstMiningStartAt DateTime? @map("first_mining_start_at")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
// 关联
distributions PositionDistribution[]
@@index([userId])
@@index([totalTreeCount])
@@map("planting_positions")
}
// ============================================
// 持仓省市分布表
// ============================================
model PositionDistribution {
id BigInt @id @default(autoincrement()) @map("distribution_id")
userId BigInt @map("user_id")
// 省市信息
provinceCode String? @map("province_code") @db.VarChar(10)
cityCode String? @map("city_code") @db.VarChar(10)
// 数量
treeCount Int @default(0) @map("tree_count")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
// 关联
position PlantingPosition @relation(fields: [userId], references: [userId])
@@unique([userId, provinceCode, cityCode])
@@index([userId])
@@index([provinceCode])
@@index([cityCode])
@@map("position_province_city_distribution")
}
// ============================================
// 底池注入批次表 (状态表)
// ============================================
model PoolInjectionBatch {
id BigInt @id @default(autoincrement()) @map("batch_id")
batchNo String @unique @map("batch_no") @db.VarChar(50)
// 批次时间窗口 (5天)
startDate DateTime @map("start_date") @db.Date
endDate DateTime @map("end_date") @db.Date
// 统计信息
orderCount Int @default(0) @map("order_count")
totalAmount Decimal @default(0) @map("total_amount") @db.Decimal(20, 8)
// 注入状态
status String @default("PENDING") @map("status") @db.VarChar(20)
scheduledInjectionTime DateTime? @map("scheduled_injection_time")
actualInjectionTime DateTime? @map("actual_injection_time")
injectionTxHash String? @map("injection_tx_hash") @db.VarChar(100)
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
// 关联
orders PlantingOrder[]
@@index([batchNo])
@@index([startDate, endDate])
@@index([status])
@@index([scheduledInjectionTime])
@@map("pool_injection_batches")
}
// ============================================
// 认种事件表 (行为表, append-only)
// ============================================
model PlantingEvent {
id BigInt @id @default(autoincrement()) @map("event_id")
eventType String @map("event_type") @db.VarChar(50)
// 聚合根信息
aggregateId String @map("aggregate_id") @db.VarChar(100)
aggregateType String @map("aggregate_type") @db.VarChar(50)
// 事件数据
eventData Json @map("event_data")
// 元数据
userId BigInt? @map("user_id")
occurredAt DateTime @default(now()) @map("occurred_at")
version Int @default(1) @map("version")
@@index([aggregateType, aggregateId])
@@index([eventType])
@@index([userId])
@@index([occurredAt])
@@map("planting_events")
}
// ============================================
// Outbox 事件发件箱表 (Outbox Pattern)
// 保证事件发布的可靠性:
// 1. 业务数据和 Outbox 记录在同一个事务中写入
// 2. 后台任务轮询 Outbox 表并发布到 Kafka
// 3. 发布成功后标记为已处理
// ============================================
model OutboxEvent {
id BigInt @id @default(autoincrement()) @map("outbox_id")
// 事件信息
eventType String @map("event_type") @db.VarChar(100)
topic String @map("topic") @db.VarChar(100)
key String @map("key") @db.VarChar(200)
payload Json @map("payload")
// 聚合根信息 (用于幂等性检查)
aggregateId String @map("aggregate_id") @db.VarChar(100)
aggregateType String @map("aggregate_type") @db.VarChar(50)
// 发布状态
status String @default("PENDING") @map("status") @db.VarChar(20) // PENDING, PUBLISHED, FAILED
retryCount Int @default(0) @map("retry_count")
maxRetries Int @default(5) @map("max_retries")
lastError String? @map("last_error") @db.Text
// 时间戳
createdAt DateTime @default(now()) @map("created_at")
publishedAt DateTime? @map("published_at")
nextRetryAt DateTime? @map("next_retry_at")
@@index([status, createdAt])
@@index([status, nextRetryAt])
@@index([aggregateType, aggregateId])
@@index([topic])
@@map("outbox_events")
}
// ============================================
// 支付补偿表 (用于处理支付失败需要补偿的订单)
// 当订单支付过程中发生以下情况时创建补偿记录:
// 1. 资金已冻结/扣款但数据库事务失败
// 2. 数据库事务成功但确认扣款失败
// 3. 确认扣款成功但资金分配失败
// ============================================
model PaymentCompensation {
id BigInt @id @default(autoincrement()) @map("compensation_id")
orderNo String @map("order_no") @db.VarChar(50)
userId BigInt @map("user_id")
// 补偿类型
compensationType String @map("compensation_type") @db.VarChar(50) // UNFREEZE, REFUND, RETRY_CONFIRM, RETRY_ALLOCATE
// 金额信息
amount Decimal @map("amount") @db.Decimal(20, 8)
// 状态
status String @default("PENDING") @map("status") @db.VarChar(20) // PENDING, PROCESSING, COMPLETED, FAILED
// 失败信息
failureReason String? @map("failure_reason") @db.Text
failureStage String? @map("failure_stage") @db.VarChar(50) // FREEZE, DB_TRANSACTION, CONFIRM, ALLOCATE
// 重试信息
retryCount Int @default(0) @map("retry_count")
maxRetries Int @default(5) @map("max_retries")
nextRetryAt DateTime? @map("next_retry_at")
lastError String? @map("last_error") @db.Text
// 时间戳
createdAt DateTime @default(now()) @map("created_at")
processedAt DateTime? @map("processed_at")
completedAt DateTime? @map("completed_at")
@@unique([orderNo, compensationType])
@@index([status, nextRetryAt])
@@index([userId])
@@index([orderNo])
@@index([createdAt])
@@map("payment_compensations")
}

View File

@ -0,0 +1,228 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import {
PaymentCompensationRepository,
PaymentCompensationRecord,
CompensationType,
CompensationStatus,
FailureStage,
} from '../../infrastructure/persistence/repositories/payment-compensation.repository';
import { WalletServiceClient } from '../../infrastructure/external/wallet-service.client';
/**
*
*
* :
* 1. UNFREEZE: 解冻资金
* 2. REFUND: 退款
* 3. RETRY_CONFIRM: 重试确认扣款
* 4. RETRY_ALLOCATE: 重试资金分配
*/
@Injectable()
export class PaymentCompensationService implements OnModuleInit {
private readonly logger = new Logger(PaymentCompensationService.name);
private isProcessing = false;
private processingInterval: NodeJS.Timeout | null = null;
// 配置
private readonly POLL_INTERVAL_MS = 30000; // 30秒轮询一次
private readonly BATCH_SIZE = 50;
constructor(
private readonly compensationRepo: PaymentCompensationRepository,
private readonly walletService: WalletServiceClient,
) {}
onModuleInit() {
this.startPolling();
this.logger.log('Payment compensation service started');
}
/**
*
*/
private startPolling(): void {
this.processingInterval = setInterval(async () => {
await this.processCompensations();
}, this.POLL_INTERVAL_MS);
// 启动时立即执行一次
this.processCompensations().catch((err) => {
this.logger.error('Initial compensation processing failed', err.stack);
});
}
/**
*
*/
async processCompensations(): Promise<void> {
if (this.isProcessing) {
this.logger.debug('Compensation processing already in progress, skipping');
return;
}
this.isProcessing = true;
try {
const records = await this.compensationRepo.findPendingCompensations(this.BATCH_SIZE);
if (records.length === 0) {
return;
}
this.logger.log(`Processing ${records.length} compensation records`);
for (const record of records) {
await this.processCompensation(record);
}
} catch (error) {
this.logger.error('Failed to process compensations', error.stack);
} finally {
this.isProcessing = false;
}
}
/**
*
*/
private async processCompensation(record: PaymentCompensationRecord): Promise<void> {
this.logger.log(
`Processing compensation ${record.id}: ${record.compensationType} for order ${record.orderNo}`,
);
try {
await this.compensationRepo.markAsProcessing(record.id);
switch (record.compensationType) {
case CompensationType.UNFREEZE:
await this.handleUnfreeze(record);
break;
case CompensationType.RETRY_CONFIRM:
await this.handleRetryConfirm(record);
break;
case CompensationType.RETRY_ALLOCATE:
await this.handleRetryAllocate(record);
break;
case CompensationType.REFUND:
// 退款需要人工处理,记录日志
this.logger.warn(
`REFUND compensation ${record.id} requires manual processing for order ${record.orderNo}`,
);
await this.compensationRepo.markAsFailedWithRetry(
record.id,
'Refund requires manual processing',
);
return;
default:
this.logger.error(`Unknown compensation type: ${record.compensationType}`);
await this.compensationRepo.markAsFailedWithRetry(
record.id,
`Unknown compensation type: ${record.compensationType}`,
);
return;
}
await this.compensationRepo.markAsCompleted(record.id);
this.logger.log(
`Compensation ${record.id} completed successfully for order ${record.orderNo}`,
);
} catch (error) {
this.logger.error(
`Compensation ${record.id} failed for order ${record.orderNo}: ${error.message}`,
error.stack,
);
await this.compensationRepo.markAsFailedWithRetry(record.id, error.message);
}
}
/**
*
*/
private async handleUnfreeze(record: PaymentCompensationRecord): Promise<void> {
await this.walletService.unfreezeForPlanting({
userId: record.userId.toString(),
orderId: record.orderNo,
});
this.logger.log(`Unfroze funds for order ${record.orderNo}`);
}
/**
*
*/
private async handleRetryConfirm(record: PaymentCompensationRecord): Promise<void> {
await this.walletService.confirmPlantingDeduction({
userId: record.userId.toString(),
orderId: record.orderNo,
});
this.logger.log(`Confirmed deduction for order ${record.orderNo}`);
}
/**
*
*
*/
private async handleRetryAllocate(record: PaymentCompensationRecord): Promise<void> {
// 资金分配需要订单的分配明细,这里暂时跳过
// 在实际实现中,应该从订单中获取分配信息并重新调用 allocateFunds
this.logger.warn(
`RETRY_ALLOCATE for order ${record.orderNo} - allocation details not available, marking as completed`,
);
// 标记为完成,因为分配失败不影响用户资金安全
// 可以通过后续的对账机制来补齐
}
/**
* payOrder
*/
async createCompensation(data: {
orderNo: string;
userId: bigint;
amount: number;
compensationType: CompensationType;
failureReason: string;
failureStage: FailureStage;
}): Promise<void> {
// 检查是否已存在
const exists = await this.compensationRepo.exists(data.orderNo, data.compensationType);
if (exists) {
this.logger.warn(
`Compensation already exists for order ${data.orderNo}, type ${data.compensationType}`,
);
return;
}
await this.compensationRepo.create({
orderNo: data.orderNo,
userId: data.userId,
compensationType: data.compensationType,
amount: data.amount,
failureReason: data.failureReason,
failureStage: data.failureStage,
});
this.logger.log(
`Created ${data.compensationType} compensation for order ${data.orderNo}`,
);
}
/**
*
*/
async getStats(): Promise<{
pending: number;
processing: number;
completed: number;
failed: number;
}> {
return this.compensationRepo.getStats();
}
/**
*
*/
async triggerProcessing(): Promise<void> {
await this.processCompensations();
}
}

View File

@ -18,6 +18,11 @@ import { ReferralServiceClient } from '../../infrastructure/external/referral-se
import { UnitOfWork, UNIT_OF_WORK } from '../../infrastructure/persistence/unit-of-work';
import { OutboxEventData } from '../../infrastructure/persistence/repositories/outbox.repository';
import { PRICE_PER_TREE } from '../../domain/value-objects/fund-allocation-target-type.enum';
import { PaymentCompensationService } from './payment-compensation.service';
import {
CompensationType,
FailureStage,
} from '../../infrastructure/persistence/repositories/payment-compensation.repository';
// 个人最大认种数量限制
const MAX_TREES_PER_USER = 1000;
@ -67,6 +72,7 @@ export class PlantingApplicationService {
private readonly fundAllocationService: FundAllocationDomainService,
private readonly walletService: WalletServiceClient,
private readonly referralService: ReferralServiceClient,
private readonly compensationService: PaymentCompensationService,
) {}
/**
@ -158,9 +164,11 @@ export class PlantingApplicationService {
/**
*
*
* "先验证后执行":
* "预扣款/冻结":
* 1. 验证阶段: 获取所有外部依赖数据
* 2. 执行阶段: 按顺序执行所有写操作
* 2. 冻结阶段: 先冻结用户资金
* 3. 执行阶段: 执行数据库事务
* 4. 确认阶段: 确认扣款或解冻回滚
*/
async payOrder(
orderNo: string,
@ -213,17 +221,26 @@ export class PlantingApplicationService {
);
this.logger.log(`Fund allocations calculated: ${allocations.length} targets`);
// ==================== 冻结阶段 ====================
// 5. 冻结用户资金(幂等,可回滚)
let frozen = false;
try {
await this.walletService.freezeForPlanting({
userId: userId.toString(),
amount: order.totalAmount,
orderId: order.orderNo,
});
frozen = true;
this.logger.log(`Wallet frozen: ${order.totalAmount} USDT for order ${order.orderNo}`);
} catch (freezeError) {
this.logger.error(
`Failed to freeze funds for order ${order.orderNo}: ${freezeError.message}`,
freezeError.stack,
);
throw new Error(`资金冻结失败: ${freezeError.message}`);
}
// ==================== 执行阶段 ====================
// 所有验证通过后,按顺序执行写操作
// 5. 调用钱包服务扣款
await this.walletService.deductForPlanting({
userId: userId.toString(),
amount: order.totalAmount,
orderId: order.orderNo,
});
this.logger.log(`Wallet deducted: ${order.totalAmount} USDT for order ${order.orderNo}`);
try {
// 6. 标记已支付并分配资金 (内存操作)
order.markAsPaid();
@ -271,7 +288,15 @@ export class PlantingApplicationService {
this.logger.log(`Local database transaction committed for order ${order.orderNo}`);
// 8. 调用钱包服务执行资金分配 (外部调用,在事务外)
// ==================== 确认阶段 ====================
// 9. 确认扣款(从冻结金额中正式扣除)
await this.walletService.confirmPlantingDeduction({
userId: userId.toString(),
orderId: order.orderNo,
});
this.logger.log(`Wallet deduction confirmed for order ${order.orderNo}`);
// 10. 调用钱包服务执行资金分配 (外部调用,在事务外)
await this.walletService.allocateFunds({
orderId: order.orderNo,
allocations: allocations.map((a) => a.toDTO()),
@ -288,15 +313,48 @@ export class PlantingApplicationService {
allocations: allocations.map((a) => a.toDTO()),
};
} catch (error) {
// 扣款后出错,记录错误以便后续补偿
// 执行阶段出错,需要解冻资金
this.logger.error(
`Payment post-deduction error for order ${order.orderNo}: ${error.message}`,
`Payment execution error for order ${order.orderNo}: ${error.message}`,
error.stack,
);
// TODO: 实现补偿机制 - 将失败的订单放入补偿队列
// 由于使用了数据库事务,如果事务内操作失败,本地数据会自动回滚
// 但扣款已完成,需要记录以便人工补偿或自动退款
throw new Error(`支付处理失败,请联系客服处理订单 ${order.orderNo}: ${error.message}`);
if (frozen) {
// 尝试解冻资金
try {
await this.walletService.unfreezeForPlanting({
userId: userId.toString(),
orderId: order.orderNo,
});
this.logger.log(`Wallet unfrozen (rollback) for order ${order.orderNo}`);
} catch (unfreezeError) {
// 解冻失败,创建补偿记录以便后台任务处理
this.logger.error(
`CRITICAL: Failed to unfreeze funds for order ${order.orderNo}: ${unfreezeError.message}`,
unfreezeError.stack,
);
// 创建补偿记录
await this.compensationService.createCompensation({
orderNo: order.orderNo,
userId,
amount: order.totalAmount,
compensationType: CompensationType.UNFREEZE,
failureReason: error.message,
failureStage: FailureStage.DB_TRANSACTION,
});
this.logger.log(
`Created UNFREEZE compensation for order ${order.orderNo}, will be processed by background task`,
);
throw new Error(
`支付处理失败,资金将稍后自动解冻,订单 ${order.orderNo}: ${error.message}`,
);
}
}
throw new Error(`支付处理失败: ${error.message}`);
}
}

View File

@ -22,10 +22,47 @@ export interface WalletBalance {
currency: string;
}
export interface FreezeForPlantingRequest {
userId: string;
amount: number;
orderId: string;
}
export interface ConfirmPlantingDeductionRequest {
userId: string;
orderId: string;
}
export interface UnfreezeForPlantingRequest {
userId: string;
orderId: string;
}
export interface FreezeResult {
success: boolean;
frozenAmount: number;
}
/**
* HTTP
*/
interface RetryConfig {
maxRetries: number;
baseDelayMs: number;
maxDelayMs: number;
}
const DEFAULT_RETRY_CONFIG: RetryConfig = {
maxRetries: 3,
baseDelayMs: 1000,
maxDelayMs: 10000,
};
@Injectable()
export class WalletServiceClient {
private readonly logger = new Logger(WalletServiceClient.name);
private readonly baseUrl: string;
private readonly retryConfig: RetryConfig;
constructor(
private readonly configService: ConfigService,
@ -34,6 +71,104 @@ export class WalletServiceClient {
this.baseUrl =
this.configService.get<string>('WALLET_SERVICE_URL') ||
'http://localhost:3002';
this.retryConfig = DEFAULT_RETRY_CONFIG;
}
/**
* HTTP
* - 使退
* - 5xx
* - 4xx
*/
private async withRetry<T>(
operation: string,
fn: () => Promise<T>,
config: RetryConfig = this.retryConfig,
): Promise<T> {
let lastError: Error | undefined;
for (let attempt = 0; attempt <= config.maxRetries; attempt++) {
try {
return await fn();
} catch (error: unknown) {
lastError = error as Error;
// 判断是否应该重试
const shouldRetry = this.shouldRetry(error, attempt, config.maxRetries);
if (!shouldRetry) {
throw error;
}
// 计算退避延迟(指数退避 + 随机抖动)
const delay = this.calculateBackoffDelay(attempt, config);
this.logger.warn(
`${operation} failed (attempt ${attempt + 1}/${config.maxRetries + 1}), ` +
`retrying in ${delay}ms: ${(error as Error).message}`,
);
await this.delay(delay);
}
}
throw lastError;
}
/**
*
*/
private shouldRetry(
error: unknown,
attempt: number,
maxRetries: number,
): boolean {
// 已达到最大重试次数
if (attempt >= maxRetries) {
return false;
}
// 检查是否是 HTTP 响应错误
const axiosError = error as { response?: { status?: number }; code?: string };
// 网络错误(无响应)- 应该重试
if (!axiosError.response) {
// 常见的网络错误码
const retryableCodes = ['ECONNREFUSED', 'ECONNRESET', 'ETIMEDOUT', 'ENOTFOUND'];
if (axiosError.code && retryableCodes.includes(axiosError.code)) {
return true;
}
// 超时错误
if ((error as Error).message?.includes('timeout')) {
return true;
}
return true; // 网络问题默认重试
}
// 5xx 服务器错误 - 应该重试
if (axiosError.response.status && axiosError.response.status >= 500) {
return true;
}
// 4xx 客户端错误 - 不重试(业务错误)
// 429 Too Many Requests - 可以重试
if (axiosError.response.status === 429) {
return true;
}
return false;
}
/**
* 退
*/
private calculateBackoffDelay(attempt: number, config: RetryConfig): number {
// 基础延迟 * 2^attempt + 随机抖动
const exponentialDelay = config.baseDelayMs * Math.pow(2, attempt);
const jitter = Math.random() * config.baseDelayMs * 0.5;
return Math.min(exponentialDelay + jitter, config.maxDelayMs);
}
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
@ -41,12 +176,14 @@ export class WalletServiceClient {
*/
async getBalance(userId: string): Promise<WalletBalance> {
try {
const response = await firstValueFrom(
this.httpService.get<WalletBalance>(
`${this.baseUrl}/api/v1/wallets/${userId}/balance`,
),
);
return response.data;
return await this.withRetry(`getBalance(${userId})`, async () => {
const response = await firstValueFrom(
this.httpService.get<WalletBalance>(
`${this.baseUrl}/api/v1/wallets/${userId}/balance`,
),
);
return response.data;
});
} catch (error) {
this.logger.error(`Failed to get balance for user ${userId}`, error);
// 在开发环境返回模拟数据
@ -63,17 +200,22 @@ export class WalletServiceClient {
}
/**
*
*
*/
async deductForPlanting(request: DeductForPlantingRequest): Promise<boolean> {
try {
const response = await firstValueFrom(
this.httpService.post(
`${this.baseUrl}/api/v1/wallets/deduct-for-planting`,
request,
),
return await this.withRetry(
`deductForPlanting(${request.orderId})`,
async () => {
const response = await firstValueFrom(
this.httpService.post(
`${this.baseUrl}/api/v1/wallets/deduct-for-planting`,
request,
),
);
return response.data.success;
},
);
return response.data.success;
} catch (error) {
this.logger.error(
`Failed to deduct for planting: ${request.orderId}`,
@ -89,17 +231,115 @@ export class WalletServiceClient {
}
/**
*
*
*/
async freezeForPlanting(request: FreezeForPlantingRequest): Promise<FreezeResult> {
try {
return await this.withRetry(
`freezeForPlanting(${request.orderId})`,
async () => {
const response = await firstValueFrom(
this.httpService.post<FreezeResult>(
`${this.baseUrl}/api/v1/wallets/freeze-for-planting`,
request,
),
);
return response.data;
},
);
} catch (error) {
this.logger.error(
`Failed to freeze for planting: ${request.orderId}`,
error,
);
// 在开发环境模拟成功
if (this.configService.get('NODE_ENV') === 'development') {
this.logger.warn('Development mode: simulating successful freeze');
return { success: true, frozenAmount: request.amount };
}
throw error;
}
}
/**
*
*/
async confirmPlantingDeduction(request: ConfirmPlantingDeductionRequest): Promise<boolean> {
try {
return await this.withRetry(
`confirmPlantingDeduction(${request.orderId})`,
async () => {
const response = await firstValueFrom(
this.httpService.post(
`${this.baseUrl}/api/v1/wallets/confirm-planting-deduction`,
request,
),
);
return response.data.success;
},
);
} catch (error) {
this.logger.error(
`Failed to confirm planting deduction: ${request.orderId}`,
error,
);
// 在开发环境模拟成功
if (this.configService.get('NODE_ENV') === 'development') {
this.logger.warn('Development mode: simulating successful confirmation');
return true;
}
throw error;
}
}
/**
*
*/
async unfreezeForPlanting(request: UnfreezeForPlantingRequest): Promise<boolean> {
try {
return await this.withRetry(
`unfreezeForPlanting(${request.orderId})`,
async () => {
const response = await firstValueFrom(
this.httpService.post(
`${this.baseUrl}/api/v1/wallets/unfreeze-for-planting`,
request,
),
);
return response.data.success;
},
);
} catch (error) {
this.logger.error(
`Failed to unfreeze for planting: ${request.orderId}`,
error,
);
// 在开发环境模拟成功
if (this.configService.get('NODE_ENV') === 'development') {
this.logger.warn('Development mode: simulating successful unfreeze');
return true;
}
throw error;
}
}
/**
*
*/
async allocateFunds(request: AllocateFundsRequest): Promise<boolean> {
try {
const response = await firstValueFrom(
this.httpService.post(
`${this.baseUrl}/api/v1/wallets/allocate-funds`,
request,
),
return await this.withRetry(
`allocateFunds(${request.orderId})`,
async () => {
const response = await firstValueFrom(
this.httpService.post(
`${this.baseUrl}/api/v1/wallets/allocate-funds`,
request,
),
);
return response.data.success;
},
);
return response.data.success;
} catch (error) {
this.logger.error(
`Failed to allocate funds for order: ${request.orderId}`,
@ -122,13 +362,15 @@ export class WalletServiceClient {
amount: number,
): Promise<{ txHash: string }> {
try {
const response = await firstValueFrom(
this.httpService.post<{ txHash: string }>(
`${this.baseUrl}/api/v1/pool/inject`,
{ batchId, amount },
),
);
return response.data;
return await this.withRetry(`injectToPool(${batchId})`, async () => {
const response = await firstValueFrom(
this.httpService.post<{ txHash: string }>(
`${this.baseUrl}/api/v1/pool/inject`,
{ batchId, amount },
),
);
return response.data;
});
} catch (error) {
this.logger.error(`Failed to inject to pool: batch ${batchId}`, error);
// 在开发环境返回模拟交易哈希

View File

@ -5,6 +5,7 @@ import { PlantingOrderRepositoryImpl } from './persistence/repositories/planting
import { PlantingPositionRepositoryImpl } from './persistence/repositories/planting-position.repository.impl';
import { PoolInjectionBatchRepositoryImpl } from './persistence/repositories/pool-injection-batch.repository.impl';
import { OutboxRepository } from './persistence/repositories/outbox.repository';
import { PaymentCompensationRepository } from './persistence/repositories/payment-compensation.repository';
import { UnitOfWork, UNIT_OF_WORK } from './persistence/unit-of-work';
import { WalletServiceClient } from './external/wallet-service.client';
import { ReferralServiceClient } from './external/referral-service.client';
@ -14,6 +15,7 @@ import { EventAckController } from './kafka/event-ack.controller';
import { PLANTING_ORDER_REPOSITORY } from '../domain/repositories/planting-order.repository.interface';
import { PLANTING_POSITION_REPOSITORY } from '../domain/repositories/planting-position.repository.interface';
import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-injection-batch.repository.interface';
import { PaymentCompensationService } from '../application/services/payment-compensation.service';
@Global()
@Module({
@ -44,7 +46,9 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj
useClass: UnitOfWork,
},
OutboxRepository,
PaymentCompensationRepository,
OutboxPublisherService,
PaymentCompensationService,
WalletServiceClient,
ReferralServiceClient,
],
@ -55,7 +59,9 @@ import { POOL_INJECTION_BATCH_REPOSITORY } from '../domain/repositories/pool-inj
POOL_INJECTION_BATCH_REPOSITORY,
UNIT_OF_WORK,
OutboxRepository,
PaymentCompensationRepository,
OutboxPublisherService,
PaymentCompensationService,
WalletServiceClient,
ReferralServiceClient,
],

View File

@ -0,0 +1,233 @@
import { Injectable } from '@nestjs/common';
import { Prisma } from '@prisma/client';
import { PrismaService } from '../prisma/prisma.service';
export enum CompensationType {
UNFREEZE = 'UNFREEZE', // 解冻资金
REFUND = 'REFUND', // 退款(已扣款的情况)
RETRY_CONFIRM = 'RETRY_CONFIRM', // 重试确认扣款
RETRY_ALLOCATE = 'RETRY_ALLOCATE', // 重试资金分配
}
export enum CompensationStatus {
PENDING = 'PENDING',
PROCESSING = 'PROCESSING',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
}
export enum FailureStage {
FREEZE = 'FREEZE',
DB_TRANSACTION = 'DB_TRANSACTION',
CONFIRM = 'CONFIRM',
ALLOCATE = 'ALLOCATE',
}
export interface PaymentCompensationData {
orderNo: string;
userId: bigint;
compensationType: CompensationType;
amount: number;
failureReason?: string;
failureStage?: FailureStage;
}
export interface PaymentCompensationRecord {
id: bigint;
orderNo: string;
userId: bigint;
compensationType: CompensationType;
amount: number;
status: CompensationStatus;
failureReason: string | null;
failureStage: string | null;
retryCount: number;
maxRetries: number;
nextRetryAt: Date | null;
lastError: string | null;
createdAt: Date;
processedAt: Date | null;
completedAt: Date | null;
}
type PrismaCompensationRecord = {
id: bigint;
orderNo: string;
userId: bigint;
compensationType: string;
amount: Prisma.Decimal;
status: string;
failureReason: string | null;
failureStage: string | null;
retryCount: number;
maxRetries: number;
nextRetryAt: Date | null;
lastError: string | null;
createdAt: Date;
processedAt: Date | null;
completedAt: Date | null;
};
@Injectable()
export class PaymentCompensationRepository {
constructor(private readonly prisma: PrismaService) {}
/**
*
*/
async create(data: PaymentCompensationData): Promise<PaymentCompensationRecord> {
const record = await this.prisma.paymentCompensation.create({
data: {
orderNo: data.orderNo,
userId: data.userId,
compensationType: data.compensationType,
amount: new Prisma.Decimal(data.amount),
status: CompensationStatus.PENDING,
failureReason: data.failureReason,
failureStage: data.failureStage,
nextRetryAt: new Date(), // 立即可重试
},
});
return this.mapToRecord(record);
}
/**
*
*/
async findPendingCompensations(limit: number = 100): Promise<PaymentCompensationRecord[]> {
const records = await this.prisma.paymentCompensation.findMany({
where: {
status: {
in: [CompensationStatus.PENDING, CompensationStatus.PROCESSING],
},
OR: [
{ nextRetryAt: null },
{ nextRetryAt: { lte: new Date() } },
],
},
orderBy: { createdAt: 'asc' },
take: limit,
});
// 过滤掉已超过最大重试次数的记录
return records
.filter((r) => r.retryCount < r.maxRetries)
.map((r) => this.mapToRecord(r));
}
/**
*
*/
async markAsProcessing(id: bigint): Promise<void> {
await this.prisma.paymentCompensation.update({
where: { id },
data: {
status: CompensationStatus.PROCESSING,
processedAt: new Date(),
},
});
}
/**
*
*/
async markAsCompleted(id: bigint): Promise<void> {
await this.prisma.paymentCompensation.update({
where: { id },
data: {
status: CompensationStatus.COMPLETED,
completedAt: new Date(),
},
});
}
/**
*
*/
async markAsFailedWithRetry(id: bigint, error: string): Promise<void> {
const record = await this.prisma.paymentCompensation.findUnique({
where: { id },
});
if (!record) return;
const nextRetryCount = record.retryCount + 1;
const isFinalFailure = nextRetryCount >= record.maxRetries;
// 指数退避: 1min, 2min, 4min, 8min, 16min
const delayMinutes = Math.pow(2, nextRetryCount - 1);
const nextRetryAt = new Date(Date.now() + delayMinutes * 60 * 1000);
await this.prisma.paymentCompensation.update({
where: { id },
data: {
status: isFinalFailure ? CompensationStatus.FAILED : CompensationStatus.PENDING,
retryCount: nextRetryCount,
lastError: error,
nextRetryAt: isFinalFailure ? null : nextRetryAt,
},
});
}
/**
*
*/
async findByOrderNo(orderNo: string): Promise<PaymentCompensationRecord[]> {
const records = await this.prisma.paymentCompensation.findMany({
where: { orderNo },
orderBy: { createdAt: 'desc' },
});
return records.map((r) => this.mapToRecord(r));
}
/**
*
*/
async exists(orderNo: string, compensationType: CompensationType): Promise<boolean> {
const count = await this.prisma.paymentCompensation.count({
where: { orderNo, compensationType },
});
return count > 0;
}
/**
*
*/
async getStats(): Promise<{
pending: number;
processing: number;
completed: number;
failed: number;
}> {
const [pending, processing, completed, failed] = await Promise.all([
this.prisma.paymentCompensation.count({ where: { status: CompensationStatus.PENDING } }),
this.prisma.paymentCompensation.count({ where: { status: CompensationStatus.PROCESSING } }),
this.prisma.paymentCompensation.count({ where: { status: CompensationStatus.COMPLETED } }),
this.prisma.paymentCompensation.count({ where: { status: CompensationStatus.FAILED } }),
]);
return { pending, processing, completed, failed };
}
private mapToRecord(record: PrismaCompensationRecord): PaymentCompensationRecord {
return {
id: record.id,
orderNo: record.orderNo,
userId: record.userId,
compensationType: record.compensationType as CompensationType,
amount: record.amount.toNumber(),
status: record.status as CompensationStatus,
failureReason: record.failureReason,
failureStage: record.failureStage,
retryCount: record.retryCount,
maxRetries: record.maxRetries,
nextRetryAt: record.nextRetryAt,
lastError: record.lastError,
createdAt: record.createdAt,
processedAt: record.processedAt,
completedAt: record.completedAt,
};
}
}

View File

@ -2,7 +2,14 @@ import { Controller, Get, Post, Body, Param } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse, ApiParam } from '@nestjs/swagger';
import { WalletApplicationService } from '@/application/services';
import { GetMyWalletQuery } from '@/application/queries';
import { DeductForPlantingCommand, AllocateFundsCommand, FundAllocationItem } from '@/application/commands';
import {
DeductForPlantingCommand,
AllocateFundsCommand,
FundAllocationItem,
FreezeForPlantingCommand,
ConfirmPlantingDeductionCommand,
UnfreezeForPlantingCommand,
} from '@/application/commands';
import { Public } from '@/shared/decorators';
/**
@ -32,7 +39,7 @@ export class InternalWalletController {
@Post('deduct-for-planting')
@Public()
@ApiOperation({ summary: '认种扣款(内部API)' })
@ApiOperation({ summary: '认种扣款(内部API) - 直接扣款模式' })
@ApiResponse({ status: 200, description: '扣款结果' })
async deductForPlanting(
@Body() dto: { userId: string; amount: number; orderId: string },
@ -46,6 +53,52 @@ export class InternalWalletController {
return { success };
}
@Post('freeze-for-planting')
@Public()
@ApiOperation({ summary: '认种冻结资金(内部API) - 预扣款模式第一步' })
@ApiResponse({ status: 200, description: '冻结结果' })
async freezeForPlanting(
@Body() dto: { userId: string; amount: number; orderId: string },
) {
const command = new FreezeForPlantingCommand(
dto.userId,
dto.amount,
dto.orderId,
);
const result = await this.walletService.freezeForPlanting(command);
return result;
}
@Post('confirm-planting-deduction')
@Public()
@ApiOperation({ summary: '确认认种扣款(内部API) - 预扣款模式第二步' })
@ApiResponse({ status: 200, description: '确认结果' })
async confirmPlantingDeduction(
@Body() dto: { userId: string; orderId: string },
) {
const command = new ConfirmPlantingDeductionCommand(
dto.userId,
dto.orderId,
);
const success = await this.walletService.confirmPlantingDeduction(command);
return { success };
}
@Post('unfreeze-for-planting')
@Public()
@ApiOperation({ summary: '解冻认种资金(内部API) - 认种失败时回滚' })
@ApiResponse({ status: 200, description: '解冻结果' })
async unfreezeForPlanting(
@Body() dto: { userId: string; orderId: string },
) {
const command = new UnfreezeForPlantingCommand(
dto.userId,
dto.orderId,
);
const success = await this.walletService.unfreezeForPlanting(command);
return { success };
}
@Post('allocate-funds')
@Public()
@ApiOperation({ summary: '资金分配(内部API)' })

View File

@ -0,0 +1,10 @@
/**
*
*
*/
export class ConfirmPlantingDeductionCommand {
constructor(
public readonly userId: string,
public readonly orderId: string,
) {}
}

View File

@ -0,0 +1,11 @@
/**
*
*
*/
export class FreezeForPlantingCommand {
constructor(
public readonly userId: string,
public readonly amount: number,
public readonly orderId: string,
) {}
}

View File

@ -1,5 +1,8 @@
export * from './handle-deposit.command';
export * from './deduct-for-planting.command';
export * from './freeze-for-planting.command';
export * from './confirm-planting-deduction.command';
export * from './unfreeze-for-planting.command';
export * from './add-rewards.command';
export * from './claim-rewards.command';
export * from './settle-rewards.command';

View File

@ -0,0 +1,10 @@
/**
*
*
*/
export class UnfreezeForPlantingCommand {
constructor(
public readonly userId: string,
public readonly orderId: string,
) {}
}

View File

@ -28,6 +28,7 @@ describe('WalletApplicationService', () => {
const createMockWallet = (userId: bigint, usdtBalance = 0) => {
return WalletAccount.reconstruct({
walletId: BigInt(1),
accountSequence: userId, // 使用 userId 作为 accountSequence
userId,
usdtAvailable: new Decimal(usdtBalance),
usdtFrozen: new Decimal(0),
@ -102,7 +103,7 @@ describe('WalletApplicationService', () => {
describe('handleDeposit', () => {
it('should process deposit successfully', async () => {
const command = new HandleDepositCommand('1', 100, ChainType.KAVA, 'tx_123');
const command = new HandleDepositCommand('1', '1', 100, ChainType.KAVA, 'tx_123');
const mockWallet = createMockWallet(BigInt(1), 0);
mockDepositRepo.existsByTxHash.mockResolvedValue(false);
@ -121,7 +122,7 @@ describe('WalletApplicationService', () => {
});
it('should throw error for duplicate transaction', async () => {
const command = new HandleDepositCommand('1', 100, ChainType.KAVA, 'tx_duplicate');
const command = new HandleDepositCommand('1', '1', 100, ChainType.KAVA, 'tx_duplicate');
mockDepositRepo.existsByTxHash.mockResolvedValue(true);
await expect(service.handleDeposit(command)).rejects.toThrow('Duplicate');
@ -171,7 +172,7 @@ describe('WalletApplicationService', () => {
describe('getMyWallet', () => {
it('should return wallet DTO', async () => {
const query = new GetMyWalletQuery('1');
const query = new GetMyWalletQuery('1', '1');
const mockWallet = createMockWallet(BigInt(1), 100);
mockWalletRepo.getOrCreate.mockResolvedValue(mockWallet);

View File

@ -14,6 +14,7 @@ import {
HandleDepositCommand, DeductForPlantingCommand, AddRewardsCommand,
ClaimRewardsCommand, SettleRewardsCommand, AllocateFundsCommand, FundAllocationItem,
RequestWithdrawalCommand, UpdateWithdrawalStatusCommand,
FreezeForPlantingCommand, ConfirmPlantingDeductionCommand, UnfreezeForPlantingCommand,
} from '@/application/commands';
import { GetMyWalletQuery, GetMyLedgerQuery } from '@/application/queries';
import { DuplicateTransactionError, WalletNotFoundError } from '@/shared/exceptions/domain.exception';
@ -140,6 +141,18 @@ export class WalletApplicationService {
const userId = BigInt(command.userId);
const amount = Money.USDT(command.amount);
// 幂等性检查:通过 orderId 检查是否已经扣款
const existingEntries = await this.ledgerRepo.findByRefOrderId(command.orderId);
const alreadyDeducted = existingEntries.some(
(entry) => entry.entryType === LedgerEntryType.PLANT_PAYMENT,
);
if (alreadyDeducted) {
this.logger.warn(
`Order ${command.orderId} already deducted, returning success (idempotent)`,
);
return true;
}
const wallet = await this.walletRepo.findByUserId(userId);
if (!wallet) {
throw new WalletNotFoundError(`userId: ${command.userId}`);
@ -166,6 +179,196 @@ export class WalletApplicationService {
return true;
}
/**
*
*
*/
async freezeForPlanting(command: FreezeForPlantingCommand): Promise<{
success: boolean;
frozenAmount: number;
}> {
const userId = BigInt(command.userId);
const amount = Money.USDT(command.amount);
// 幂等性检查:通过 orderId 检查是否已经冻结
const existingEntries = await this.ledgerRepo.findByRefOrderId(command.orderId);
const alreadyFrozen = existingEntries.some(
(entry) => entry.entryType === LedgerEntryType.PLANT_FREEZE,
);
if (alreadyFrozen) {
this.logger.warn(
`Order ${command.orderId} already frozen, returning success (idempotent)`,
);
return { success: true, frozenAmount: command.amount };
}
const wallet = await this.walletRepo.findByUserId(userId);
if (!wallet) {
throw new WalletNotFoundError(`userId: ${command.userId}`);
}
// 检查余额是否足够
if (wallet.balances.usdt.available.lessThan(amount)) {
throw new BadRequestException(
`余额不足: 需要 ${command.amount} USDT, 当前可用 ${wallet.balances.usdt.available.value} USDT`,
);
}
// 冻结资金
wallet.freeze(amount);
await this.walletRepo.save(wallet);
// 记录冻结流水
const ledgerEntry = LedgerEntry.create({
accountSequence: wallet.accountSequence,
userId: UserId.create(userId),
entryType: LedgerEntryType.PLANT_FREEZE,
amount: Money.signed(-command.amount, 'USDT'), // Negative: 可用余额减少
balanceAfter: wallet.balances.usdt.available,
refOrderId: command.orderId,
memo: 'Plant freeze',
});
await this.ledgerRepo.save(ledgerEntry);
await this.walletCacheService.invalidateWallet(userId);
this.logger.log(`Frozen ${command.amount} USDT for order ${command.orderId}`);
return { success: true, frozenAmount: command.amount };
}
/**
*
*
*/
async confirmPlantingDeduction(command: ConfirmPlantingDeductionCommand): Promise<boolean> {
const userId = BigInt(command.userId);
// 查找冻结记录,获取冻结金额
const existingEntries = await this.ledgerRepo.findByRefOrderId(command.orderId);
// 幂等性检查:是否已经扣款
const alreadyDeducted = existingEntries.some(
(entry) => entry.entryType === LedgerEntryType.PLANT_PAYMENT,
);
if (alreadyDeducted) {
this.logger.warn(
`Order ${command.orderId} already confirmed deduction, returning success (idempotent)`,
);
return true;
}
// 查找冻结记录
const freezeEntry = existingEntries.find(
(entry) => entry.entryType === LedgerEntryType.PLANT_FREEZE,
);
if (!freezeEntry) {
throw new BadRequestException(`订单 ${command.orderId} 未找到冻结记录`);
}
// 获取冻结金额(流水中是负数,取绝对值)
const frozenAmount = Money.USDT(Math.abs(freezeEntry.amount.value));
const wallet = await this.walletRepo.findByUserId(userId);
if (!wallet) {
throw new WalletNotFoundError(`userId: ${command.userId}`);
}
// 从冻结金额扣款
wallet.deductFrozen(frozenAmount, 'Plant payment confirmed', command.orderId);
await this.walletRepo.save(wallet);
// 记录扣款流水
const ledgerEntry = LedgerEntry.create({
accountSequence: wallet.accountSequence,
userId: UserId.create(userId),
entryType: LedgerEntryType.PLANT_PAYMENT,
amount: Money.signed(-frozenAmount.value, 'USDT'),
balanceAfter: wallet.balances.usdt.available,
refOrderId: command.orderId,
memo: 'Plant payment (from frozen)',
});
await this.ledgerRepo.save(ledgerEntry);
await this.walletCacheService.invalidateWallet(userId);
this.logger.log(`Confirmed deduction ${frozenAmount.value} USDT for order ${command.orderId}`);
return true;
}
/**
*
*
*/
async unfreezeForPlanting(command: UnfreezeForPlantingCommand): Promise<boolean> {
const userId = BigInt(command.userId);
// 查找相关流水
const existingEntries = await this.ledgerRepo.findByRefOrderId(command.orderId);
// 幂等性检查:是否已经解冻
const alreadyUnfrozen = existingEntries.some(
(entry) => entry.entryType === LedgerEntryType.PLANT_UNFREEZE,
);
if (alreadyUnfrozen) {
this.logger.warn(
`Order ${command.orderId} already unfrozen, returning success (idempotent)`,
);
return true;
}
// 检查是否已经扣款(扣款后不能解冻)
const alreadyDeducted = existingEntries.some(
(entry) => entry.entryType === LedgerEntryType.PLANT_PAYMENT,
);
if (alreadyDeducted) {
this.logger.warn(
`Order ${command.orderId} already deducted, cannot unfreeze`,
);
throw new BadRequestException(`订单 ${command.orderId} 已扣款,无法解冻`);
}
// 查找冻结记录
const freezeEntry = existingEntries.find(
(entry) => entry.entryType === LedgerEntryType.PLANT_FREEZE,
);
if (!freezeEntry) {
// 没有冻结记录,可能从未冻结,直接返回成功
this.logger.warn(
`Order ${command.orderId} has no freeze record, returning success`,
);
return true;
}
// 获取冻结金额
const frozenAmount = Money.USDT(Math.abs(freezeEntry.amount.value));
const wallet = await this.walletRepo.findByUserId(userId);
if (!wallet) {
throw new WalletNotFoundError(`userId: ${command.userId}`);
}
// 解冻资金
wallet.unfreeze(frozenAmount);
await this.walletRepo.save(wallet);
// 记录解冻流水
const ledgerEntry = LedgerEntry.create({
accountSequence: wallet.accountSequence,
userId: UserId.create(userId),
entryType: LedgerEntryType.PLANT_UNFREEZE,
amount: frozenAmount, // Positive: 可用余额增加
balanceAfter: wallet.balances.usdt.available,
refOrderId: command.orderId,
memo: 'Plant unfreeze (rollback)',
});
await this.ledgerRepo.save(ledgerEntry);
await this.walletCacheService.invalidateWallet(userId);
this.logger.log(`Unfrozen ${frozenAmount.value} USDT for order ${command.orderId}`);
return true;
}
async addRewards(command: AddRewardsCommand): Promise<void> {
const userId = BigInt(command.userId);

View File

@ -15,6 +15,7 @@ describe('DepositOrder Aggregate', () => {
describe('create', () => {
it('should create a new deposit order', () => {
const order = DepositOrder.create({
accountSequence: BigInt(1),
userId: UserId.create(1),
chainType: ChainType.KAVA,
amount: Money.USDT(100),
@ -35,6 +36,7 @@ describe('DepositOrder Aggregate', () => {
describe('confirm', () => {
it('should confirm a pending deposit', () => {
const order = DepositOrder.create({
accountSequence: BigInt(1),
userId: UserId.create(1),
chainType: ChainType.BSC,
amount: Money.USDT(50),
@ -51,6 +53,7 @@ describe('DepositOrder Aggregate', () => {
it('should throw error when confirming non-pending deposit', () => {
const order = DepositOrder.create({
accountSequence: BigInt(1),
userId: UserId.create(1),
chainType: ChainType.KAVA,
amount: Money.USDT(100),
@ -65,6 +68,7 @@ describe('DepositOrder Aggregate', () => {
describe('fail', () => {
it('should mark pending deposit as failed', () => {
const order = DepositOrder.create({
accountSequence: BigInt(1),
userId: UserId.create(1),
chainType: ChainType.KAVA,
amount: Money.USDT(100),
@ -78,6 +82,7 @@ describe('DepositOrder Aggregate', () => {
it('should throw error when failing non-pending deposit', () => {
const order = DepositOrder.create({
accountSequence: BigInt(1),
userId: UserId.create(1),
chainType: ChainType.KAVA,
amount: Money.USDT(100),
@ -93,6 +98,7 @@ describe('DepositOrder Aggregate', () => {
it('should reconstruct from database record', () => {
const order = DepositOrder.reconstruct({
id: BigInt(1),
accountSequence: BigInt(100),
userId: BigInt(100),
chainType: 'KAVA',
amount: new Decimal(200),

View File

@ -15,6 +15,7 @@ describe('LedgerEntry Aggregate', () => {
describe('create', () => {
it('should create a new ledger entry', () => {
const entry = LedgerEntry.create({
accountSequence: BigInt(1),
userId: UserId.create(1),
entryType: LedgerEntryType.DEPOSIT_KAVA,
amount: Money.USDT(100),
@ -33,6 +34,7 @@ describe('LedgerEntry Aggregate', () => {
it('should create entry with optional fields as null', () => {
const entry = LedgerEntry.create({
accountSequence: BigInt(1),
userId: UserId.create(1),
entryType: LedgerEntryType.PLANT_PAYMENT,
amount: Money.signed(-50, 'USDT'),
@ -48,6 +50,7 @@ describe('LedgerEntry Aggregate', () => {
it('should create entry with payload json', () => {
const payload = { key: 'value', number: 123 };
const entry = LedgerEntry.create({
accountSequence: BigInt(1),
userId: UserId.create(1),
entryType: LedgerEntryType.REWARD_PENDING,
amount: Money.USDT(10),
@ -62,6 +65,7 @@ describe('LedgerEntry Aggregate', () => {
it('should reconstruct ledger entry from database record', () => {
const entry = LedgerEntry.reconstruct({
id: BigInt(1),
accountSequence: BigInt(100),
userId: BigInt(100),
entryType: 'DEPOSIT_KAVA',
amount: new Decimal(50),

View File

@ -27,7 +27,7 @@ describe('WalletAccount Aggregate', () => {
let wallet: WalletAccount;
beforeEach(() => {
wallet = WalletAccount.createNew(UserId.create(1));
wallet = WalletAccount.createNew(BigInt(1), UserId.create(1));
});
describe('createNew', () => {

View File

@ -234,6 +234,26 @@ export class WalletAccount {
this._updatedAt = new Date();
}
// 从冻结余额扣款
deductFrozen(amount: Money, reason: string, refOrderId?: string): void {
this.ensureActive();
const balance = this.getBalance(amount.currency as AssetType);
const newBalance = balance.deductFrozen(amount);
this.setBalance(amount.currency as AssetType, newBalance);
this._updatedAt = new Date();
this.addDomainEvent(new BalanceDeductedEvent({
userId: this._userId.toString(),
walletId: this._walletId.toString(),
amount: amount.value.toString(),
assetType: amount.currency,
reason,
refOrderId,
balanceAfter: newBalance.available.value.toString(),
}));
}
// 添加待领取奖励
addPendingReward(usdtAmount: Money, hashpowerAmount: Hashpower, expireAt: Date, refOrderId?: string): void {
this.ensureActive();

View File

@ -2,6 +2,8 @@ export enum LedgerEntryType {
DEPOSIT_KAVA = 'DEPOSIT_KAVA',
DEPOSIT_BSC = 'DEPOSIT_BSC',
PLANT_PAYMENT = 'PLANT_PAYMENT',
PLANT_FREEZE = 'PLANT_FREEZE', // 认种冻结
PLANT_UNFREEZE = 'PLANT_UNFREEZE', // 认种解冻(失败回滚)
REWARD_PENDING = 'REWARD_PENDING',
REWARD_TO_SETTLEABLE = 'REWARD_TO_SETTLEABLE',
REWARD_EXPIRED = 'REWARD_EXPIRED',