feat(wallet-service): 实现 Unit of Work 模式保证 settleToBalance 事务原子性

- 新增 UnitOfWork 接口和实现,使用 Prisma Interactive Transaction
- 修改 IWalletAccountRepository 和 ILedgerEntryRepository 接口支持可选事务参数
- 修改仓库实现,支持在事务中执行数据库操作
- 修改 settleToBalance 方法使用 UnitOfWork,确保钱包更新和流水记录原子性
- 注册 UnitOfWorkService 到 InfrastructureModule

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-06 07:50:02 -08:00
parent 7dc25b75d2
commit bf1c8d2228
9 changed files with 160 additions and 47 deletions

View File

@ -7,7 +7,8 @@ import {
IWithdrawalOrderRepository, WITHDRAWAL_ORDER_REPOSITORY, IWithdrawalOrderRepository, WITHDRAWAL_ORDER_REPOSITORY,
IPendingRewardRepository, PENDING_REWARD_REPOSITORY, IPendingRewardRepository, PENDING_REWARD_REPOSITORY,
} from '@/domain/repositories'; } from '@/domain/repositories';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { IUnitOfWork, UNIT_OF_WORK } from '@/infrastructure/persistence/unit-of-work';
import { LedgerEntry, DepositOrder, SettlementOrder, WithdrawalOrder, PendingReward, PendingRewardStatus, WalletAccount } from '@/domain/aggregates'; import { LedgerEntry, DepositOrder, SettlementOrder, WithdrawalOrder, PendingReward, PendingRewardStatus, WalletAccount } from '@/domain/aggregates';
import { import {
UserId, Money, Hashpower, LedgerEntryType, AssetType, ChainType, SettleCurrency, UserId, Money, Hashpower, LedgerEntryType, AssetType, ChainType, SettleCurrency,
@ -96,6 +97,8 @@ export class WalletApplicationService {
private readonly prisma: PrismaService, private readonly prisma: PrismaService,
private readonly feeConfigRepo: FeeConfigRepositoryImpl, private readonly feeConfigRepo: FeeConfigRepositoryImpl,
private readonly identityClient: IdentityClientService, private readonly identityClient: IdentityClientService,
@Inject(UNIT_OF_WORK)
private readonly unitOfWork: IUnitOfWork,
) {} ) {}
// =============== Commands =============== // =============== Commands ===============
@ -922,27 +925,32 @@ export class WalletApplicationService {
// 2. 生成结算ID // 2. 生成结算ID
const settlementId = `STL_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; const settlementId = `STL_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
// 3. 执行钱包结算 // 3. 在事务中执行钱包结算和记录流水(确保原子性)
wallet.settleToBalance(usdtAmount, settlementId); const savedWallet = await this.unitOfWork.runInTransaction(async (tx) => {
await this.walletRepo.save(wallet); // 执行钱包结算
wallet.settleToBalance(usdtAmount, settlementId);
const updated = await this.walletRepo.save(wallet, { tx });
// 4. 记录账本流水(含详细来源信息) // 记录账本流水(含详细来源信息)
const ledgerEntry = LedgerEntry.create({ const ledgerEntry = LedgerEntry.create({
accountSequence: wallet.accountSequence, accountSequence: wallet.accountSequence,
userId: UserId.create(userId), userId: UserId.create(userId),
entryType: LedgerEntryType.REWARD_SETTLED, entryType: LedgerEntryType.REWARD_SETTLED,
amount: usdtAmount, amount: usdtAmount,
balanceAfter: wallet.balances.usdt.available, balanceAfter: updated.balances.usdt.available,
refOrderId: settlementId, refOrderId: settlementId,
memo: params.memo || `结算 ${params.usdtAmount} 绿积分到钱包余额`, memo: params.memo || `结算 ${params.usdtAmount} 绿积分到钱包余额`,
payloadJson: { payloadJson: {
settlementType: 'SETTLE_TO_BALANCE', settlementType: 'SETTLE_TO_BALANCE',
rewardEntryIds: params.rewardEntryIds, rewardEntryIds: params.rewardEntryIds,
rewardCount: params.rewardEntryIds.length, rewardCount: params.rewardEntryIds.length,
breakdown: params.breakdown, breakdown: params.breakdown,
}, },
});
await this.ledgerRepo.save(ledgerEntry, { tx });
return updated;
}); });
await this.ledgerRepo.save(ledgerEntry);
// 5. 使缓存失效 // 5. 使缓存失效
await this.walletCacheService.invalidateWallet(userId); await this.walletCacheService.invalidateWallet(userId);
@ -953,7 +961,7 @@ export class WalletApplicationService {
success: true, success: true,
settlementId, settlementId,
settledAmount: params.usdtAmount, settledAmount: params.usdtAmount,
balanceAfter: wallet.balances.usdt.available.value, balanceAfter: savedWallet.balances.usdt.available.value,
}; };
} catch (error) { } catch (error) {
this.logger.error(`Failed to settle to balance for ${params.accountSequence}: ${error.message}`); this.logger.error(`Failed to settle to balance for ${params.accountSequence}: ${error.message}`);

View File

@ -1,5 +1,6 @@
import { LedgerEntry } from '@/domain/aggregates'; import { LedgerEntry } from '@/domain/aggregates';
import { LedgerEntryType, AssetType } from '@/domain/value-objects'; import { LedgerEntryType, AssetType } from '@/domain/value-objects';
import { RepositorySaveOptions } from '@/infrastructure/persistence/unit-of-work';
export interface LedgerFilters { export interface LedgerFilters {
entryType?: LedgerEntryType; entryType?: LedgerEntryType;
@ -22,8 +23,8 @@ export interface PaginatedResult<T> {
} }
export interface ILedgerEntryRepository { export interface ILedgerEntryRepository {
save(entry: LedgerEntry): Promise<LedgerEntry>; save(entry: LedgerEntry, options?: RepositorySaveOptions): Promise<LedgerEntry>;
saveAll(entries: LedgerEntry[]): Promise<void>; saveAll(entries: LedgerEntry[], options?: RepositorySaveOptions): Promise<void>;
findByUserId(userId: bigint, filters?: LedgerFilters, pagination?: Pagination): Promise<PaginatedResult<LedgerEntry>>; findByUserId(userId: bigint, filters?: LedgerFilters, pagination?: Pagination): Promise<PaginatedResult<LedgerEntry>>;
findByAccountSequence(accountSequence: string, filters?: LedgerFilters, pagination?: Pagination): Promise<PaginatedResult<LedgerEntry>>; findByAccountSequence(accountSequence: string, filters?: LedgerFilters, pagination?: Pagination): Promise<PaginatedResult<LedgerEntry>>;
findByRefOrderId(refOrderId: string): Promise<LedgerEntry[]>; findByRefOrderId(refOrderId: string): Promise<LedgerEntry[]>;

View File

@ -1,11 +1,12 @@
import { WalletAccount } from '@/domain/aggregates'; import { WalletAccount } from '@/domain/aggregates';
import { RepositorySaveOptions } from '@/infrastructure/persistence/unit-of-work';
export interface IWalletAccountRepository { export interface IWalletAccountRepository {
save(wallet: WalletAccount): Promise<WalletAccount>; save(wallet: WalletAccount, options?: RepositorySaveOptions): Promise<WalletAccount>;
findById(walletId: bigint): Promise<WalletAccount | null>; findById(walletId: bigint, options?: RepositorySaveOptions): Promise<WalletAccount | null>;
findByUserId(userId: bigint): Promise<WalletAccount | null>; findByUserId(userId: bigint, options?: RepositorySaveOptions): Promise<WalletAccount | null>;
findByAccountSequence(accountSequence: string): Promise<WalletAccount | null>; findByAccountSequence(accountSequence: string, options?: RepositorySaveOptions): Promise<WalletAccount | null>;
getOrCreate(accountSequence: string, userId: bigint): Promise<WalletAccount>; getOrCreate(accountSequence: string, userId: bigint, options?: RepositorySaveOptions): Promise<WalletAccount>;
findByUserIds(userIds: bigint[]): Promise<Map<string, WalletAccount>>; findByUserIds(userIds: bigint[]): Promise<Map<string, WalletAccount>>;
} }

View File

@ -10,6 +10,7 @@ import {
PendingRewardRepositoryImpl, PendingRewardRepositoryImpl,
FeeConfigRepositoryImpl, FeeConfigRepositoryImpl,
} from './persistence/repositories'; } from './persistence/repositories';
import { UnitOfWorkService, UNIT_OF_WORK } from './persistence/unit-of-work';
import { import {
WALLET_ACCOUNT_REPOSITORY, WALLET_ACCOUNT_REPOSITORY,
LEDGER_ENTRY_REPOSITORY, LEDGER_ENTRY_REPOSITORY,
@ -55,10 +56,15 @@ const repositories = [
FeeConfigRepositoryImpl, FeeConfigRepositoryImpl,
]; ];
const unitOfWork = {
provide: UNIT_OF_WORK,
useClass: UnitOfWorkService,
};
@Global() @Global()
@Module({ @Module({
imports: [RedisModule, KafkaModule, IdentityModule], imports: [RedisModule, KafkaModule, IdentityModule],
providers: [PrismaService, ...repositories], providers: [PrismaService, unitOfWork, ...repositories],
exports: [PrismaService, RedisModule, KafkaModule, IdentityModule, FeeConfigRepositoryImpl, ...repositories], exports: [PrismaService, unitOfWork, RedisModule, KafkaModule, IdentityModule, FeeConfigRepositoryImpl, ...repositories],
}) })
export class InfrastructureModule {} export class InfrastructureModule {}

View File

@ -5,14 +5,20 @@ import {
ILedgerEntryRepository, LedgerFilters, Pagination, PaginatedResult, ILedgerEntryRepository, LedgerFilters, Pagination, PaginatedResult,
} from '@/domain/repositories'; } from '@/domain/repositories';
import { LedgerEntry } from '@/domain/aggregates'; import { LedgerEntry } from '@/domain/aggregates';
import { RepositorySaveOptions, PrismaTransactionClient } from '@/infrastructure/persistence/unit-of-work';
import Decimal from 'decimal.js'; import Decimal from 'decimal.js';
@Injectable() @Injectable()
export class LedgerEntryRepositoryImpl implements ILedgerEntryRepository { export class LedgerEntryRepositoryImpl implements ILedgerEntryRepository {
constructor(private readonly prisma: PrismaService) {} constructor(private readonly prisma: PrismaService) {}
async save(entry: LedgerEntry): Promise<LedgerEntry> { private getClient(options?: RepositorySaveOptions): PrismaService | PrismaTransactionClient {
const created = await this.prisma.ledgerEntry.create({ return options?.tx || this.prisma;
}
async save(entry: LedgerEntry, options?: RepositorySaveOptions): Promise<LedgerEntry> {
const client = this.getClient(options);
const created = await client.ledgerEntry.create({
data: { data: {
accountSequence: entry.accountSequence, accountSequence: entry.accountSequence,
userId: entry.userId.value, userId: entry.userId.value,
@ -29,8 +35,9 @@ export class LedgerEntryRepositoryImpl implements ILedgerEntryRepository {
return this.toDomain(created); return this.toDomain(created);
} }
async saveAll(entries: LedgerEntry[]): Promise<void> { async saveAll(entries: LedgerEntry[], options?: RepositorySaveOptions): Promise<void> {
await this.prisma.ledgerEntry.createMany({ const client = this.getClient(options);
await client.ledgerEntry.createMany({
data: entries.map(entry => ({ data: entries.map(entry => ({
accountSequence: entry.accountSequence, accountSequence: entry.accountSequence,
userId: entry.userId.value, userId: entry.userId.value,

View File

@ -4,6 +4,7 @@ import { IWalletAccountRepository } from '@/domain/repositories';
import { WalletAccount } from '@/domain/aggregates'; import { WalletAccount } from '@/domain/aggregates';
import { UserId, WalletStatus } from '@/domain/value-objects'; import { UserId, WalletStatus } from '@/domain/value-objects';
import { OptimisticLockError } from '@/shared/exceptions/domain.exception'; import { OptimisticLockError } from '@/shared/exceptions/domain.exception';
import { RepositorySaveOptions, PrismaTransactionClient } from '@/infrastructure/persistence/unit-of-work';
import Decimal from 'decimal.js'; import Decimal from 'decimal.js';
@Injectable() @Injectable()
@ -12,7 +13,12 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository {
constructor(private readonly prisma: PrismaService) {} constructor(private readonly prisma: PrismaService) {}
async save(wallet: WalletAccount): Promise<WalletAccount> { private getClient(options?: RepositorySaveOptions): PrismaService | PrismaTransactionClient {
return options?.tx || this.prisma;
}
async save(wallet: WalletAccount, options?: RepositorySaveOptions): Promise<WalletAccount> {
const client = this.getClient(options);
const data = { const data = {
accountSequence: wallet.accountSequence, accountSequence: wallet.accountSequence,
userId: wallet.userId.value, userId: wallet.userId.value,
@ -42,7 +48,7 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository {
if (wallet.walletId.value === BigInt(0)) { if (wallet.walletId.value === BigInt(0)) {
// Create new wallet with version = 0 // Create new wallet with version = 0
const created = await this.prisma.walletAccount.create({ const created = await client.walletAccount.create({
data: { data: {
...data, ...data,
version: 0, version: 0,
@ -54,7 +60,7 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository {
const currentVersion = wallet.version; const currentVersion = wallet.version;
const newVersion = currentVersion + 1; const newVersion = currentVersion + 1;
const result = await this.prisma.walletAccount.updateMany({ const result = await client.walletAccount.updateMany({
where: { where: {
id: wallet.walletId.value, id: wallet.walletId.value,
version: currentVersion, // Optimistic lock: only update if version matches version: currentVersion, // Optimistic lock: only update if version matches
@ -81,7 +87,7 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository {
); );
// Fetch the updated record to return // Fetch the updated record to return
const updated = await this.prisma.walletAccount.findUnique({ const updated = await client.walletAccount.findUnique({
where: { id: wallet.walletId.value }, where: { id: wallet.walletId.value },
}); });
@ -93,35 +99,38 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository {
} }
} }
async findById(walletId: bigint): Promise<WalletAccount | null> { async findById(walletId: bigint, options?: RepositorySaveOptions): Promise<WalletAccount | null> {
const record = await this.prisma.walletAccount.findUnique({ const client = this.getClient(options);
const record = await client.walletAccount.findUnique({
where: { id: walletId }, where: { id: walletId },
}); });
return record ? this.toDomain(record) : null; return record ? this.toDomain(record) : null;
} }
async findByUserId(userId: bigint): Promise<WalletAccount | null> { async findByUserId(userId: bigint, options?: RepositorySaveOptions): Promise<WalletAccount | null> {
const record = await this.prisma.walletAccount.findUnique({ const client = this.getClient(options);
const record = await client.walletAccount.findUnique({
where: { userId }, where: { userId },
}); });
return record ? this.toDomain(record) : null; return record ? this.toDomain(record) : null;
} }
async findByAccountSequence(accountSequence: string): Promise<WalletAccount | null> { async findByAccountSequence(accountSequence: string, options?: RepositorySaveOptions): Promise<WalletAccount | null> {
const record = await this.prisma.walletAccount.findUnique({ const client = this.getClient(options);
const record = await client.walletAccount.findUnique({
where: { accountSequence }, where: { accountSequence },
}); });
return record ? this.toDomain(record) : null; return record ? this.toDomain(record) : null;
} }
async getOrCreate(accountSequence: string, userId: bigint): Promise<WalletAccount> { async getOrCreate(accountSequence: string, userId: bigint, options?: RepositorySaveOptions): Promise<WalletAccount> {
const existing = await this.findByAccountSequence(accountSequence); const existing = await this.findByAccountSequence(accountSequence, options);
if (existing) { if (existing) {
return existing; return existing;
} }
const newWallet = WalletAccount.createNew(accountSequence, UserId.create(userId)); const newWallet = WalletAccount.createNew(accountSequence, UserId.create(userId));
return this.save(newWallet); return this.save(newWallet, options);
} }
async findByUserIds(userIds: bigint[]): Promise<Map<string, WalletAccount>> { async findByUserIds(userIds: bigint[]): Promise<Map<string, WalletAccount>> {

View File

@ -0,0 +1,2 @@
export * from './unit-of-work.interface';
export * from './unit-of-work.service';

View File

@ -0,0 +1,35 @@
import { PrismaClient } from '@prisma/client';
/**
* Prisma
*
*/
export type PrismaTransactionClient = Omit<
PrismaClient,
'$connect' | '$disconnect' | '$on' | '$transaction' | '$use' | '$extends'
>;
/**
*
* Unit of Work 使
*/
export interface RepositorySaveOptions {
tx?: PrismaTransactionClient;
}
/**
* Unit of Work
*
*/
export interface IUnitOfWork {
/**
*
* @param work
* @returns
*/
runInTransaction<T>(
work: (tx: PrismaTransactionClient) => Promise<T>,
): Promise<T>;
}
export const UNIT_OF_WORK = Symbol('IUnitOfWork');

View File

@ -0,0 +1,44 @@
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { IUnitOfWork, PrismaTransactionClient } from './unit-of-work.interface';
/**
* Unit of Work
* 使 Prisma Interactive Transaction
*/
@Injectable()
export class UnitOfWorkService implements IUnitOfWork {
private readonly logger = new Logger(UnitOfWorkService.name);
constructor(private readonly prisma: PrismaService) {}
/**
*
*
*/
async runInTransaction<T>(
work: (tx: PrismaTransactionClient) => Promise<T>,
): Promise<T> {
this.logger.debug('[UoW] Starting transaction');
try {
const result = await this.prisma.$transaction(
async (tx) => {
return await work(tx);
},
{
// 设置较长的超时时间,因为某些操作可能需要更多时间
timeout: 30000, // 30 秒
// 使用 ReadCommitted 隔离级别,平衡一致性和性能
isolationLevel: 'ReadCommitted',
},
);
this.logger.debug('[UoW] Transaction committed successfully');
return result;
} catch (error) {
this.logger.error('[UoW] Transaction rolled back due to error', error);
throw error;
}
}
}