diff --git a/backend/services/wallet-service/src/application/services/wallet-application.service.ts b/backend/services/wallet-service/src/application/services/wallet-application.service.ts index 854d823c..5a6f074e 100644 --- a/backend/services/wallet-service/src/application/services/wallet-application.service.ts +++ b/backend/services/wallet-service/src/application/services/wallet-application.service.ts @@ -7,7 +7,8 @@ import { IWithdrawalOrderRepository, WITHDRAWAL_ORDER_REPOSITORY, IPendingRewardRepository, PENDING_REWARD_REPOSITORY, } from '@/domain/repositories'; -import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; +import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; +import { IUnitOfWork, UNIT_OF_WORK } from '@/infrastructure/persistence/unit-of-work'; import { LedgerEntry, DepositOrder, SettlementOrder, WithdrawalOrder, PendingReward, PendingRewardStatus, WalletAccount } from '@/domain/aggregates'; import { UserId, Money, Hashpower, LedgerEntryType, AssetType, ChainType, SettleCurrency, @@ -96,6 +97,8 @@ export class WalletApplicationService { private readonly prisma: PrismaService, private readonly feeConfigRepo: FeeConfigRepositoryImpl, private readonly identityClient: IdentityClientService, + @Inject(UNIT_OF_WORK) + private readonly unitOfWork: IUnitOfWork, ) {} // =============== Commands =============== @@ -922,27 +925,32 @@ export class WalletApplicationService { // 2. 生成结算ID const settlementId = `STL_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; - // 3. 执行钱包结算 - wallet.settleToBalance(usdtAmount, settlementId); - await this.walletRepo.save(wallet); + // 3. 在事务中执行钱包结算和记录流水(确保原子性) + const savedWallet = await this.unitOfWork.runInTransaction(async (tx) => { + // 执行钱包结算 + wallet.settleToBalance(usdtAmount, settlementId); + const updated = await this.walletRepo.save(wallet, { tx }); - // 4. 记录账本流水(含详细来源信息) - const ledgerEntry = LedgerEntry.create({ - accountSequence: wallet.accountSequence, - userId: UserId.create(userId), - entryType: LedgerEntryType.REWARD_SETTLED, - amount: usdtAmount, - balanceAfter: wallet.balances.usdt.available, - refOrderId: settlementId, - memo: params.memo || `结算 ${params.usdtAmount} 绿积分到钱包余额`, - payloadJson: { - settlementType: 'SETTLE_TO_BALANCE', - rewardEntryIds: params.rewardEntryIds, - rewardCount: params.rewardEntryIds.length, - breakdown: params.breakdown, - }, + // 记录账本流水(含详细来源信息) + const ledgerEntry = LedgerEntry.create({ + accountSequence: wallet.accountSequence, + userId: UserId.create(userId), + entryType: LedgerEntryType.REWARD_SETTLED, + amount: usdtAmount, + balanceAfter: updated.balances.usdt.available, + refOrderId: settlementId, + memo: params.memo || `结算 ${params.usdtAmount} 绿积分到钱包余额`, + payloadJson: { + settlementType: 'SETTLE_TO_BALANCE', + rewardEntryIds: params.rewardEntryIds, + rewardCount: params.rewardEntryIds.length, + breakdown: params.breakdown, + }, + }); + await this.ledgerRepo.save(ledgerEntry, { tx }); + + return updated; }); - await this.ledgerRepo.save(ledgerEntry); // 5. 使缓存失效 await this.walletCacheService.invalidateWallet(userId); @@ -953,7 +961,7 @@ export class WalletApplicationService { success: true, settlementId, settledAmount: params.usdtAmount, - balanceAfter: wallet.balances.usdt.available.value, + balanceAfter: savedWallet.balances.usdt.available.value, }; } catch (error) { this.logger.error(`Failed to settle to balance for ${params.accountSequence}: ${error.message}`); diff --git a/backend/services/wallet-service/src/domain/repositories/ledger-entry.repository.interface.ts b/backend/services/wallet-service/src/domain/repositories/ledger-entry.repository.interface.ts index 2eaed8e1..0ee36f19 100644 --- a/backend/services/wallet-service/src/domain/repositories/ledger-entry.repository.interface.ts +++ b/backend/services/wallet-service/src/domain/repositories/ledger-entry.repository.interface.ts @@ -1,5 +1,6 @@ import { LedgerEntry } from '@/domain/aggregates'; import { LedgerEntryType, AssetType } from '@/domain/value-objects'; +import { RepositorySaveOptions } from '@/infrastructure/persistence/unit-of-work'; export interface LedgerFilters { entryType?: LedgerEntryType; @@ -22,8 +23,8 @@ export interface PaginatedResult { } export interface ILedgerEntryRepository { - save(entry: LedgerEntry): Promise; - saveAll(entries: LedgerEntry[]): Promise; + save(entry: LedgerEntry, options?: RepositorySaveOptions): Promise; + saveAll(entries: LedgerEntry[], options?: RepositorySaveOptions): Promise; findByUserId(userId: bigint, filters?: LedgerFilters, pagination?: Pagination): Promise>; findByAccountSequence(accountSequence: string, filters?: LedgerFilters, pagination?: Pagination): Promise>; findByRefOrderId(refOrderId: string): Promise; diff --git a/backend/services/wallet-service/src/domain/repositories/wallet-account.repository.interface.ts b/backend/services/wallet-service/src/domain/repositories/wallet-account.repository.interface.ts index 73e30902..360fdac2 100644 --- a/backend/services/wallet-service/src/domain/repositories/wallet-account.repository.interface.ts +++ b/backend/services/wallet-service/src/domain/repositories/wallet-account.repository.interface.ts @@ -1,11 +1,12 @@ import { WalletAccount } from '@/domain/aggregates'; +import { RepositorySaveOptions } from '@/infrastructure/persistence/unit-of-work'; export interface IWalletAccountRepository { - save(wallet: WalletAccount): Promise; - findById(walletId: bigint): Promise; - findByUserId(userId: bigint): Promise; - findByAccountSequence(accountSequence: string): Promise; - getOrCreate(accountSequence: string, userId: bigint): Promise; + save(wallet: WalletAccount, options?: RepositorySaveOptions): Promise; + findById(walletId: bigint, options?: RepositorySaveOptions): Promise; + findByUserId(userId: bigint, options?: RepositorySaveOptions): Promise; + findByAccountSequence(accountSequence: string, options?: RepositorySaveOptions): Promise; + getOrCreate(accountSequence: string, userId: bigint, options?: RepositorySaveOptions): Promise; findByUserIds(userIds: bigint[]): Promise>; } diff --git a/backend/services/wallet-service/src/infrastructure/infrastructure.module.ts b/backend/services/wallet-service/src/infrastructure/infrastructure.module.ts index be5ac54f..5b961f76 100644 --- a/backend/services/wallet-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/wallet-service/src/infrastructure/infrastructure.module.ts @@ -10,6 +10,7 @@ import { PendingRewardRepositoryImpl, FeeConfigRepositoryImpl, } from './persistence/repositories'; +import { UnitOfWorkService, UNIT_OF_WORK } from './persistence/unit-of-work'; import { WALLET_ACCOUNT_REPOSITORY, LEDGER_ENTRY_REPOSITORY, @@ -55,10 +56,15 @@ const repositories = [ FeeConfigRepositoryImpl, ]; +const unitOfWork = { + provide: UNIT_OF_WORK, + useClass: UnitOfWorkService, +}; + @Global() @Module({ imports: [RedisModule, KafkaModule, IdentityModule], - providers: [PrismaService, ...repositories], - exports: [PrismaService, RedisModule, KafkaModule, IdentityModule, FeeConfigRepositoryImpl, ...repositories], + providers: [PrismaService, unitOfWork, ...repositories], + exports: [PrismaService, unitOfWork, RedisModule, KafkaModule, IdentityModule, FeeConfigRepositoryImpl, ...repositories], }) export class InfrastructureModule {} diff --git a/backend/services/wallet-service/src/infrastructure/persistence/repositories/ledger-entry.repository.impl.ts b/backend/services/wallet-service/src/infrastructure/persistence/repositories/ledger-entry.repository.impl.ts index bcd6f481..a73d7de0 100644 --- a/backend/services/wallet-service/src/infrastructure/persistence/repositories/ledger-entry.repository.impl.ts +++ b/backend/services/wallet-service/src/infrastructure/persistence/repositories/ledger-entry.repository.impl.ts @@ -5,14 +5,20 @@ import { ILedgerEntryRepository, LedgerFilters, Pagination, PaginatedResult, } from '@/domain/repositories'; import { LedgerEntry } from '@/domain/aggregates'; +import { RepositorySaveOptions, PrismaTransactionClient } from '@/infrastructure/persistence/unit-of-work'; import Decimal from 'decimal.js'; @Injectable() export class LedgerEntryRepositoryImpl implements ILedgerEntryRepository { constructor(private readonly prisma: PrismaService) {} - async save(entry: LedgerEntry): Promise { - const created = await this.prisma.ledgerEntry.create({ + private getClient(options?: RepositorySaveOptions): PrismaService | PrismaTransactionClient { + return options?.tx || this.prisma; + } + + async save(entry: LedgerEntry, options?: RepositorySaveOptions): Promise { + const client = this.getClient(options); + const created = await client.ledgerEntry.create({ data: { accountSequence: entry.accountSequence, userId: entry.userId.value, @@ -29,8 +35,9 @@ export class LedgerEntryRepositoryImpl implements ILedgerEntryRepository { return this.toDomain(created); } - async saveAll(entries: LedgerEntry[]): Promise { - await this.prisma.ledgerEntry.createMany({ + async saveAll(entries: LedgerEntry[], options?: RepositorySaveOptions): Promise { + const client = this.getClient(options); + await client.ledgerEntry.createMany({ data: entries.map(entry => ({ accountSequence: entry.accountSequence, userId: entry.userId.value, diff --git a/backend/services/wallet-service/src/infrastructure/persistence/repositories/wallet-account.repository.impl.ts b/backend/services/wallet-service/src/infrastructure/persistence/repositories/wallet-account.repository.impl.ts index e6850c9b..5c072f14 100644 --- a/backend/services/wallet-service/src/infrastructure/persistence/repositories/wallet-account.repository.impl.ts +++ b/backend/services/wallet-service/src/infrastructure/persistence/repositories/wallet-account.repository.impl.ts @@ -4,6 +4,7 @@ import { IWalletAccountRepository } from '@/domain/repositories'; import { WalletAccount } from '@/domain/aggregates'; import { UserId, WalletStatus } from '@/domain/value-objects'; import { OptimisticLockError } from '@/shared/exceptions/domain.exception'; +import { RepositorySaveOptions, PrismaTransactionClient } from '@/infrastructure/persistence/unit-of-work'; import Decimal from 'decimal.js'; @Injectable() @@ -12,7 +13,12 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository { constructor(private readonly prisma: PrismaService) {} - async save(wallet: WalletAccount): Promise { + private getClient(options?: RepositorySaveOptions): PrismaService | PrismaTransactionClient { + return options?.tx || this.prisma; + } + + async save(wallet: WalletAccount, options?: RepositorySaveOptions): Promise { + const client = this.getClient(options); const data = { accountSequence: wallet.accountSequence, userId: wallet.userId.value, @@ -42,7 +48,7 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository { if (wallet.walletId.value === BigInt(0)) { // Create new wallet with version = 0 - const created = await this.prisma.walletAccount.create({ + const created = await client.walletAccount.create({ data: { ...data, version: 0, @@ -54,7 +60,7 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository { const currentVersion = wallet.version; const newVersion = currentVersion + 1; - const result = await this.prisma.walletAccount.updateMany({ + const result = await client.walletAccount.updateMany({ where: { id: wallet.walletId.value, version: currentVersion, // Optimistic lock: only update if version matches @@ -81,7 +87,7 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository { ); // Fetch the updated record to return - const updated = await this.prisma.walletAccount.findUnique({ + const updated = await client.walletAccount.findUnique({ where: { id: wallet.walletId.value }, }); @@ -93,35 +99,38 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository { } } - async findById(walletId: bigint): Promise { - const record = await this.prisma.walletAccount.findUnique({ + async findById(walletId: bigint, options?: RepositorySaveOptions): Promise { + const client = this.getClient(options); + const record = await client.walletAccount.findUnique({ where: { id: walletId }, }); return record ? this.toDomain(record) : null; } - async findByUserId(userId: bigint): Promise { - const record = await this.prisma.walletAccount.findUnique({ + async findByUserId(userId: bigint, options?: RepositorySaveOptions): Promise { + const client = this.getClient(options); + const record = await client.walletAccount.findUnique({ where: { userId }, }); return record ? this.toDomain(record) : null; } - async findByAccountSequence(accountSequence: string): Promise { - const record = await this.prisma.walletAccount.findUnique({ + async findByAccountSequence(accountSequence: string, options?: RepositorySaveOptions): Promise { + const client = this.getClient(options); + const record = await client.walletAccount.findUnique({ where: { accountSequence }, }); return record ? this.toDomain(record) : null; } - async getOrCreate(accountSequence: string, userId: bigint): Promise { - const existing = await this.findByAccountSequence(accountSequence); + async getOrCreate(accountSequence: string, userId: bigint, options?: RepositorySaveOptions): Promise { + const existing = await this.findByAccountSequence(accountSequence, options); if (existing) { return existing; } const newWallet = WalletAccount.createNew(accountSequence, UserId.create(userId)); - return this.save(newWallet); + return this.save(newWallet, options); } async findByUserIds(userIds: bigint[]): Promise> { diff --git a/backend/services/wallet-service/src/infrastructure/persistence/unit-of-work/index.ts b/backend/services/wallet-service/src/infrastructure/persistence/unit-of-work/index.ts new file mode 100644 index 00000000..fde6a236 --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/persistence/unit-of-work/index.ts @@ -0,0 +1,2 @@ +export * from './unit-of-work.interface'; +export * from './unit-of-work.service'; diff --git a/backend/services/wallet-service/src/infrastructure/persistence/unit-of-work/unit-of-work.interface.ts b/backend/services/wallet-service/src/infrastructure/persistence/unit-of-work/unit-of-work.interface.ts new file mode 100644 index 00000000..344893bd --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/persistence/unit-of-work/unit-of-work.interface.ts @@ -0,0 +1,35 @@ +import { PrismaClient } from '@prisma/client'; + +/** + * Prisma 事务客户端类型 + * 用于在事务中传递给仓库 + */ +export type PrismaTransactionClient = Omit< + PrismaClient, + '$connect' | '$disconnect' | '$on' | '$transaction' | '$use' | '$extends' +>; + +/** + * 仓库保存选项 + * 允许传入事务客户端以在 Unit of Work 中使用 + */ +export interface RepositorySaveOptions { + tx?: PrismaTransactionClient; +} + +/** + * Unit of Work 接口 + * 用于管理跨多个仓库的事务 + */ +export interface IUnitOfWork { + /** + * 在事务中执行工作 + * @param work 要执行的工作函数,接收事务客户端 + * @returns 工作函数的返回值 + */ + runInTransaction( + work: (tx: PrismaTransactionClient) => Promise, + ): Promise; +} + +export const UNIT_OF_WORK = Symbol('IUnitOfWork'); diff --git a/backend/services/wallet-service/src/infrastructure/persistence/unit-of-work/unit-of-work.service.ts b/backend/services/wallet-service/src/infrastructure/persistence/unit-of-work/unit-of-work.service.ts new file mode 100644 index 00000000..a0d7dfa4 --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/persistence/unit-of-work/unit-of-work.service.ts @@ -0,0 +1,44 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { IUnitOfWork, PrismaTransactionClient } from './unit-of-work.interface'; + +/** + * Unit of Work 实现 + * 使用 Prisma Interactive Transaction 确保多个仓库操作的原子性 + */ +@Injectable() +export class UnitOfWorkService implements IUnitOfWork { + private readonly logger = new Logger(UnitOfWorkService.name); + + constructor(private readonly prisma: PrismaService) {} + + /** + * 在事务中执行工作 + * 事务中的所有操作要么全部成功,要么全部回滚 + */ + async runInTransaction( + work: (tx: PrismaTransactionClient) => Promise, + ): Promise { + this.logger.debug('[UoW] Starting transaction'); + + try { + const result = await this.prisma.$transaction( + async (tx) => { + return await work(tx); + }, + { + // 设置较长的超时时间,因为某些操作可能需要更多时间 + timeout: 30000, // 30 秒 + // 使用 ReadCommitted 隔离级别,平衡一致性和性能 + isolationLevel: 'ReadCommitted', + }, + ); + + this.logger.debug('[UoW] Transaction committed successfully'); + return result; + } catch (error) { + this.logger.error('[UoW] Transaction rolled back due to error', error); + throw error; + } + } +}