fix(wallet-service): 添加钱包乐观锁防止并发修改

- WalletAccount aggregate 添加 version 字段
- WalletAccountRepositoryImpl 使用 updateMany + version 检查实现乐观锁
- requestWithdrawal 添加重试机制处理乐观锁冲突

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-25 20:44:55 -08:00
parent 1e15b820b4
commit 305bdf63af
3 changed files with 92 additions and 7 deletions

View File

@ -1349,6 +1349,40 @@ export class WalletApplicationService {
fee: number; fee: number;
netAmount: number; netAmount: number;
status: string; status: string;
}> {
const MAX_RETRIES = 3;
let retries = 0;
while (retries < MAX_RETRIES) {
try {
return await this.executeRequestWithdrawal(command);
} catch (error) {
if (this.isOptimisticLockError(error)) {
retries++;
this.logger.warn(`[requestWithdrawal] Optimistic lock conflict for ${command.userId}, retry ${retries}/${MAX_RETRIES}`);
if (retries >= MAX_RETRIES) {
this.logger.error(`[requestWithdrawal] Max retries exceeded for ${command.userId}`);
throw error;
}
await this.sleep(50 * retries);
} else {
throw error;
}
}
}
throw new Error('Unexpected: exited retry loop without result');
}
/**
* Execute the withdrawal request logic
*/
private async executeRequestWithdrawal(command: RequestWithdrawalCommand): Promise<{
orderNo: string;
amount: number;
fee: number;
netAmount: number;
status: string;
}> { }> {
const userId = BigInt(command.userId); const userId = BigInt(command.userId);
const amount = Money.USDT(command.amount); const amount = Money.USDT(command.amount);

View File

@ -40,6 +40,7 @@ export class WalletAccount {
private _rewards: WalletRewards; private _rewards: WalletRewards;
private _status: WalletStatus; private _status: WalletStatus;
private _hasPlanted: boolean; // 是否已认种过 private _hasPlanted: boolean; // 是否已认种过
private _version: number; // 乐观锁版本号
private readonly _createdAt: Date; private readonly _createdAt: Date;
private _updatedAt: Date; private _updatedAt: Date;
private _domainEvents: DomainEvent[] = []; private _domainEvents: DomainEvent[] = [];
@ -53,6 +54,7 @@ export class WalletAccount {
rewards: WalletRewards, rewards: WalletRewards,
status: WalletStatus, status: WalletStatus,
hasPlanted: boolean, hasPlanted: boolean,
version: number,
createdAt: Date, createdAt: Date,
updatedAt: Date, updatedAt: Date,
) { ) {
@ -64,6 +66,7 @@ export class WalletAccount {
this._rewards = rewards; this._rewards = rewards;
this._status = status; this._status = status;
this._hasPlanted = hasPlanted; this._hasPlanted = hasPlanted;
this._version = version;
this._createdAt = createdAt; this._createdAt = createdAt;
this._updatedAt = updatedAt; this._updatedAt = updatedAt;
} }
@ -76,6 +79,7 @@ export class WalletAccount {
get hashpower(): Hashpower { return this._hashpower; } get hashpower(): Hashpower { return this._hashpower; }
get rewards(): WalletRewards { return this._rewards; } get rewards(): WalletRewards { return this._rewards; }
get status(): WalletStatus { return this._status; } get status(): WalletStatus { return this._status; }
get version(): number { return this._version; }
get createdAt(): Date { return this._createdAt; } get createdAt(): Date { return this._createdAt; }
get updatedAt(): Date { return this._updatedAt; } get updatedAt(): Date { return this._updatedAt; }
get hasPlanted(): boolean { return this._hasPlanted; } get hasPlanted(): boolean { return this._hasPlanted; }
@ -109,6 +113,7 @@ export class WalletAccount {
}, },
WalletStatus.ACTIVE, WalletStatus.ACTIVE,
false, // hasPlanted false, // hasPlanted
0, // version (new wallet starts at 0)
now, now,
now, now,
); );
@ -140,6 +145,7 @@ export class WalletAccount {
expiredTotalHashpower: Decimal; expiredTotalHashpower: Decimal;
status: string; status: string;
hasPlanted: boolean; hasPlanted: boolean;
version: number;
createdAt: Date; createdAt: Date;
updatedAt: Date; updatedAt: Date;
}): WalletAccount { }): WalletAccount {
@ -168,6 +174,7 @@ export class WalletAccount {
}, },
params.status as WalletStatus, params.status as WalletStatus,
params.hasPlanted, params.hasPlanted,
params.version,
params.createdAt, params.createdAt,
params.updatedAt, params.updatedAt,
); );

View File

@ -1,12 +1,15 @@
import { Injectable } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { IWalletAccountRepository } from '@/domain/repositories'; import { IWalletAccountRepository } from '@/domain/repositories';
import { WalletAccount } from '@/domain/aggregates'; import { WalletAccount } from '@/domain/aggregates';
import { UserId, WalletStatus } from '@/domain/value-objects'; import { UserId, WalletStatus } from '@/domain/value-objects';
import { OptimisticLockError } from '@/shared/exceptions/domain.exception';
import Decimal from 'decimal.js'; import Decimal from 'decimal.js';
@Injectable() @Injectable()
export class WalletAccountRepositoryImpl implements IWalletAccountRepository { export class WalletAccountRepositoryImpl implements IWalletAccountRepository {
private readonly logger = new Logger(WalletAccountRepositoryImpl.name);
constructor(private readonly prisma: PrismaService) {} constructor(private readonly prisma: PrismaService) {}
async save(wallet: WalletAccount): Promise<WalletAccount> { async save(wallet: WalletAccount): Promise<WalletAccount> {
@ -38,15 +41,54 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository {
}; };
if (wallet.walletId.value === BigInt(0)) { if (wallet.walletId.value === BigInt(0)) {
// Create new // Create new wallet with version = 0
const created = await this.prisma.walletAccount.create({ data }); const created = await this.prisma.walletAccount.create({
data: {
...data,
version: 0,
},
});
return this.toDomain(created); return this.toDomain(created);
} else { } else {
// Update existing // Update existing with optimistic lock
const updated = await this.prisma.walletAccount.update({ const currentVersion = wallet.version;
where: { id: wallet.walletId.value }, const newVersion = currentVersion + 1;
data,
const result = await this.prisma.walletAccount.updateMany({
where: {
id: wallet.walletId.value,
version: currentVersion, // Optimistic lock: only update if version matches
},
data: {
...data,
version: newVersion,
updatedAt: new Date(),
},
}); });
if (result.count === 0) {
// Version mismatch - concurrent modification detected
this.logger.warn(
`[OPTIMISTIC_LOCK] Conflict detected for wallet ${wallet.accountSequence} (id=${wallet.walletId.value}, version=${currentVersion})`,
);
throw new OptimisticLockError(
`Wallet ${wallet.accountSequence} was modified by another transaction (version=${currentVersion})`,
);
}
this.logger.debug(
`[SAVE] Wallet ${wallet.accountSequence} updated (version: ${currentVersion} -> ${newVersion})`,
);
// Fetch the updated record to return
const updated = await this.prisma.walletAccount.findUnique({
where: { id: wallet.walletId.value },
});
if (!updated) {
throw new Error(`Wallet ${wallet.walletId.value} not found after update`);
}
return this.toDomain(updated); return this.toDomain(updated);
} }
} }
@ -120,6 +162,7 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository {
expiredTotalHashpower: Decimal; expiredTotalHashpower: Decimal;
status: string; status: string;
hasPlanted: boolean; hasPlanted: boolean;
version: number;
createdAt: Date; createdAt: Date;
updatedAt: Date; updatedAt: Date;
}): WalletAccount { }): WalletAccount {
@ -149,6 +192,7 @@ export class WalletAccountRepositoryImpl implements IWalletAccountRepository {
expiredTotalHashpower: new Decimal(record.expiredTotalHashpower.toString()), expiredTotalHashpower: new Decimal(record.expiredTotalHashpower.toString()),
status: record.status, status: record.status,
hasPlanted: record.hasPlanted, hasPlanted: record.hasPlanted,
version: record.version,
createdAt: record.createdAt, createdAt: record.createdAt,
updatedAt: record.updatedAt, updatedAt: record.updatedAt,
}); });