iconsulting/packages/services/payment-service/src/infrastructure/database/transaction.service.ts

138 lines
4.0 KiB
TypeScript

import { Injectable } from '@nestjs/common';
import { DataSource, QueryRunner, EntityManager } from 'typeorm';
/**
* Transaction Service - 提供可靠的事务管理
*
* 基于 TypeORM 官方推荐的 QueryRunner 模式
* 参考: https://typeorm.io/docs/advanced-topics/transactions/
*/
@Injectable()
export class TransactionService {
constructor(private readonly dataSource: DataSource) {}
/**
* 在事务中执行操作 (推荐使用)
*
* @param operation - 接收 EntityManager 的异步操作函数
* @returns 操作的返回值
* @throws 如果操作失败,事务会自动回滚并抛出原始错误
*
* @example
* ```typescript
* const result = await this.transactionService.runInTransaction(async (manager) => {
* await manager.save(PaymentORM, payment);
* await manager.update(OrderORM, orderId, { status: 'PAID' });
* return payment;
* });
* ```
*/
async runInTransaction<T>(
operation: (manager: EntityManager) => Promise<T>,
): Promise<T> {
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
try {
const result = await operation(queryRunner.manager);
await queryRunner.commitTransaction();
return result;
} catch (error) {
await queryRunner.rollbackTransaction();
throw error;
} finally {
// 重要:必须释放连接,否则会导致连接池耗尽
await queryRunner.release();
}
}
/**
* 使用 QueryRunner 执行事务 (用于需要更多控制的场景)
*
* @param operation - 接收 QueryRunner 的异步操作函数
* @returns 操作的返回值
*
* @example
* ```typescript
* const result = await this.transactionService.runWithQueryRunner(async (queryRunner) => {
* // 可以使用 queryRunner.manager 进行操作
* // 也可以使用 queryRunner.query() 执行原始 SQL
* await queryRunner.manager.save(entity);
* return entity;
* });
* ```
*/
async runWithQueryRunner<T>(
operation: (queryRunner: QueryRunner) => Promise<T>,
): Promise<T> {
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
try {
const result = await operation(queryRunner);
await queryRunner.commitTransaction();
return result;
} catch (error) {
await queryRunner.rollbackTransaction();
throw error;
} finally {
await queryRunner.release();
}
}
/**
* 使用乐观锁执行更新 (防止并发覆盖)
*
* @param operation - 异步操作函数
* @param maxRetries - 最大重试次数 (默认 3)
* @returns 操作的返回值
* @throws 如果达到最大重试次数仍失败
*
* @example
* ```typescript
* await this.transactionService.runWithOptimisticLock(async (manager) => {
* const payment = await manager.findOne(PaymentORM, { where: { id } });
* payment.status = 'COMPLETED';
* await manager.save(payment); // 如果 version 不匹配会抛出错误
* });
* ```
*/
async runWithOptimisticLock<T>(
operation: (manager: EntityManager) => Promise<T>,
maxRetries: number = 3,
): Promise<T> {
let lastError: Error | null = null;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await this.runInTransaction(operation);
} catch (error: unknown) {
// TypeORM 乐观锁冲突错误
if (
error instanceof Error &&
(error.name === 'OptimisticLockVersionMismatchError' ||
error.message.includes('version'))
) {
lastError = error;
// 短暂延迟后重试
await this.delay(50 * attempt);
continue;
}
throw error;
}
}
throw new Error(
`Optimistic lock failed after ${maxRetries} retries: ${lastError?.message}`,
);
}
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}