feat(payment): add transaction reliability and idempotency support

- Add TransactionService for atomic database operations with optimistic lock retry
- Implement pessimistic locking in payment callback handling to prevent race conditions
- Add idempotency check via transactionId unique index to prevent duplicate processing
- Add version columns to PaymentORM and OrderORM for optimistic locking
- Add composite indexes for performance (order_status, transaction_id)
- Optimize connection pool settings for both payment and conversation services
- Update init-db.sql with version columns and new indexes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-25 07:00:01 -08:00
parent 43a48f0c6a
commit 65c0bdd17c
10 changed files with 337 additions and 29 deletions

View File

@ -27,7 +27,16 @@ import { HealthModule } from './health/health.module';
entities: [__dirname + '/**/*.orm{.ts,.js}'], entities: [__dirname + '/**/*.orm{.ts,.js}'],
// 生产环境禁用synchronize使用init-db.sql初始化schema // 生产环境禁用synchronize使用init-db.sql初始化schema
synchronize: false, synchronize: false,
logging: configService.get('NODE_ENV') === 'development', // 连接池配置 - 优化并发性能
extra: {
max: configService.get<number>('DB_POOL_SIZE', 20),
min: 2,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
},
logging: configService.get('NODE_ENV') === 'development' ? ['query', 'error'] : ['error'],
retryAttempts: 3,
retryDelay: 1000,
}), }),
}), }),

View File

@ -30,6 +30,13 @@ export class PaymentPostgresRepository implements IPaymentRepository {
return orm ? this.toEntity(orm) : null; return orm ? this.toEntity(orm) : null;
} }
async findByTransactionId(transactionId: string): Promise<PaymentEntity | null> {
const orm = await this.repo.findOne({
where: { transactionId },
});
return orm ? this.toEntity(orm) : null;
}
async update(payment: PaymentEntity): Promise<PaymentEntity> { async update(payment: PaymentEntity): Promise<PaymentEntity> {
const orm = this.toORM(payment); const orm = this.toORM(payment);
const saved = await this.repo.save(orm); const saved = await this.repo.save(orm);

View File

@ -23,7 +23,24 @@ import { HealthModule } from './health/health.module';
password: configService.get('POSTGRES_PASSWORD'), password: configService.get('POSTGRES_PASSWORD'),
database: configService.get('POSTGRES_DB', 'iconsulting'), database: configService.get('POSTGRES_DB', 'iconsulting'),
entities: [__dirname + '/**/*.orm{.ts,.js}'], entities: [__dirname + '/**/*.orm{.ts,.js}'],
synchronize: configService.get('NODE_ENV') === 'development', // 生产环境禁用 synchronize使用 init-db.sql 初始化
synchronize: false,
// 连接池配置 - 优化并发性能
extra: {
// 连接池最大连接数
max: configService.get<number>('DB_POOL_SIZE', 20),
// 连接池最小连接数
min: 2,
// 空闲连接超时时间 (ms)
idleTimeoutMillis: 30000,
// 连接超时时间 (ms)
connectionTimeoutMillis: 5000,
},
// 日志配置
logging: configService.get('NODE_ENV') === 'development' ? ['query', 'error'] : ['error'],
// 重试策略
retryAttempts: 3,
retryDelay: 1000,
}), }),
}), }),

View File

@ -1,11 +1,29 @@
import { Injectable, Inject, NotFoundException, BadRequestException } from '@nestjs/common'; import {
import { PaymentEntity, PaymentMethod, PaymentStatus } from '../../domain/entities/payment.entity'; Injectable,
Inject,
NotFoundException,
BadRequestException,
ConflictException,
Logger,
} from '@nestjs/common';
import { DataSource, EntityManager } from 'typeorm';
import {
PaymentEntity,
PaymentMethod,
PaymentStatus,
} from '../../domain/entities/payment.entity';
import { OrderStatus } from '../../domain/entities/order.entity'; import { OrderStatus } from '../../domain/entities/order.entity';
import { IPaymentRepository, PAYMENT_REPOSITORY } from '../../domain/repositories/payment.repository.interface'; import {
IPaymentRepository,
PAYMENT_REPOSITORY,
} from '../../domain/repositories/payment.repository.interface';
import { OrderService } from './order.service'; import { OrderService } from './order.service';
import { AlipayAdapter } from '../../adapters/outbound/payment-methods/alipay.adapter'; import { AlipayAdapter } from '../../adapters/outbound/payment-methods/alipay.adapter';
import { WechatPayAdapter } from '../../adapters/outbound/payment-methods/wechat-pay.adapter'; import { WechatPayAdapter } from '../../adapters/outbound/payment-methods/wechat-pay.adapter';
import { StripeAdapter } from '../../adapters/outbound/payment-methods/stripe.adapter'; import { StripeAdapter } from '../../adapters/outbound/payment-methods/stripe.adapter';
import { TransactionService } from '../../infrastructure/database/transaction.service';
import { PaymentORM } from '../../infrastructure/database/postgres/entities/payment.orm';
import { OrderORM } from '../../infrastructure/database/postgres/entities/order.orm';
export interface CreatePaymentParams { export interface CreatePaymentParams {
orderId: string; orderId: string;
@ -24,6 +42,8 @@ export interface PaymentResult {
@Injectable() @Injectable()
export class PaymentService { export class PaymentService {
private readonly logger = new Logger(PaymentService.name);
constructor( constructor(
@Inject(PAYMENT_REPOSITORY) @Inject(PAYMENT_REPOSITORY)
private readonly paymentRepo: IPaymentRepository, private readonly paymentRepo: IPaymentRepository,
@ -31,6 +51,8 @@ export class PaymentService {
private readonly alipayAdapter: AlipayAdapter, private readonly alipayAdapter: AlipayAdapter,
private readonly wechatPayAdapter: WechatPayAdapter, private readonly wechatPayAdapter: WechatPayAdapter,
private readonly stripeAdapter: StripeAdapter, private readonly stripeAdapter: StripeAdapter,
private readonly transactionService: TransactionService,
private readonly dataSource: DataSource,
) {} ) {}
async createPayment(params: CreatePaymentParams): Promise<PaymentResult> { async createPayment(params: CreatePaymentParams): Promise<PaymentResult> {
@ -41,7 +63,9 @@ export class PaymentService {
} }
// Check for existing pending payment // Check for existing pending payment
const existingPayment = await this.paymentRepo.findPendingByOrderId(params.orderId); const existingPayment = await this.paymentRepo.findPendingByOrderId(
params.orderId,
);
if (existingPayment && !existingPayment.isExpired()) { if (existingPayment && !existingPayment.isExpired()) {
return { return {
@ -89,10 +113,25 @@ export class PaymentService {
paymentUrl, paymentUrl,
}); });
const savedPayment = await this.paymentRepo.save(payment); // 使用事务确保原子性:保存支付记录 + 更新订单状态
const savedPayment = await this.transactionService.runInTransaction(
async (manager: EntityManager) => {
// 保存支付记录
const paymentORM = this.toPaymentORM(payment);
await manager.save(PaymentORM, paymentORM);
// Update order status // 更新订单状态
await this.orderService.updateStatus(order.id, OrderStatus.PENDING_PAYMENT); await manager.update(OrderORM, order.id, {
status: OrderStatus.PENDING_PAYMENT,
});
this.logger.log(
`Payment created and order updated in transaction: paymentId=${payment.id}, orderId=${order.id}`,
);
return payment;
},
);
return { return {
paymentId: savedPayment.id, paymentId: savedPayment.id,
@ -115,6 +154,14 @@ export class PaymentService {
return payment; return payment;
} }
/**
* - 使
*
*
* 1. transaction_id
* 2. Payment Order
* 3. version
*/
async handleCallback( async handleCallback(
method: PaymentMethod, method: PaymentMethod,
payload: Record<string, unknown>, payload: Record<string, unknown>,
@ -149,31 +196,97 @@ export class PaymentService {
throw new BadRequestException('Unsupported payment method'); throw new BadRequestException('Unsupported payment method');
} }
// Find payment by order ID // 幂等性检查:是否已处理过此 transactionId
const payment = await this.paymentRepo.findPendingByOrderId(orderId); const existingByTxId =
await this.paymentRepo.findByTransactionId(transactionId);
if (!payment) { if (existingByTxId) {
throw new NotFoundException('Payment not found'); this.logger.warn(
`Duplicate callback ignored: transactionId=${transactionId}`,
);
return; // 已处理过,直接返回(幂等)
} }
// Update payment // 使用乐观锁事务处理,防止并发问题
if (success) { await this.transactionService.runWithOptimisticLock(
payment.markAsCompleted(transactionId, payload); async (manager: EntityManager) => {
await this.paymentRepo.update(payment); // 在事务中重新查询,获取最新版本
const paymentORM = await manager.findOne(PaymentORM, {
where: { orderId, status: PaymentStatus.PENDING },
lock: { mode: 'pessimistic_write' }, // 悲观锁,防止并发
});
// Update order if (!paymentORM) {
await this.orderService.markAsPaid(orderId, payment.id, method); throw new NotFoundException(
} else { `Pending payment not found for order: ${orderId}`,
payment.markAsFailed('Payment failed', payload); );
await this.paymentRepo.update(payment); }
}
const now = new Date();
if (success) {
// 更新支付状态
// eslint-disable-next-line @typescript-eslint/no-explicit-any
await manager.update(PaymentORM, paymentORM.id, {
status: PaymentStatus.COMPLETED,
transactionId,
paidAt: now,
callbackPayload: payload,
} as any);
// 更新订单状态
await manager.update(OrderORM, orderId, {
status: OrderStatus.PAID,
paymentId: paymentORM.id,
paymentMethod: method,
paidAt: now,
});
this.logger.log(
`Payment completed in transaction: paymentId=${paymentORM.id}, orderId=${orderId}, transactionId=${transactionId}`,
);
} else {
// 支付失败
// eslint-disable-next-line @typescript-eslint/no-explicit-any
await manager.update(PaymentORM, paymentORM.id, {
status: PaymentStatus.FAILED,
failedReason: 'Payment failed',
callbackPayload: payload,
} as any);
this.logger.warn(
`Payment failed: paymentId=${paymentORM.id}, orderId=${orderId}`,
);
}
},
3, // 最多重试 3 次
);
} }
async checkStatus(paymentId: string): Promise<{ status: PaymentStatus; paidAt?: Date }> { async checkStatus(
paymentId: string,
): Promise<{ status: PaymentStatus; paidAt?: Date }> {
const payment = await this.findById(paymentId); const payment = await this.findById(paymentId);
return { return {
status: payment.status, status: payment.status,
paidAt: payment.paidAt || undefined, paidAt: payment.paidAt || undefined,
}; };
} }
private toPaymentORM(entity: PaymentEntity): Partial<PaymentORM> {
return {
id: entity.id,
orderId: entity.orderId,
method: entity.method,
amount: entity.amount,
currency: entity.currency,
status: entity.status,
qrCodeUrl: entity.qrCodeUrl,
paymentUrl: entity.paymentUrl,
expiresAt: entity.expiresAt,
paidAt: entity.paidAt,
transactionId: entity.transactionId,
failedReason: entity.failedReason,
callbackPayload: entity.callbackPayload,
};
}
} }

View File

@ -7,6 +7,7 @@ export interface IPaymentRepository {
save(payment: PaymentEntity): Promise<PaymentEntity>; save(payment: PaymentEntity): Promise<PaymentEntity>;
findById(id: string): Promise<PaymentEntity | null>; findById(id: string): Promise<PaymentEntity | null>;
findPendingByOrderId(orderId: string): Promise<PaymentEntity | null>; findPendingByOrderId(orderId: string): Promise<PaymentEntity | null>;
findByTransactionId(transactionId: string): Promise<PaymentEntity | null>;
update(payment: PaymentEntity): Promise<PaymentEntity>; update(payment: PaymentEntity): Promise<PaymentEntity>;
} }

View File

@ -4,9 +4,12 @@ import {
Column, Column,
CreateDateColumn, CreateDateColumn,
UpdateDateColumn, UpdateDateColumn,
VersionColumn,
Index,
} from 'typeorm'; } from 'typeorm';
@Entity('orders') @Entity('orders')
@Index('idx_orders_user_status', ['userId', 'status'])
export class OrderORM { export class OrderORM {
@PrimaryGeneratedColumn('uuid') @PrimaryGeneratedColumn('uuid')
id: string; id: string;
@ -52,4 +55,7 @@ export class OrderORM {
@UpdateDateColumn({ name: 'updated_at' }) @UpdateDateColumn({ name: 'updated_at' })
updatedAt: Date; updatedAt: Date;
@VersionColumn()
version: number;
} }

View File

@ -4,9 +4,13 @@ import {
Column, Column,
CreateDateColumn, CreateDateColumn,
UpdateDateColumn, UpdateDateColumn,
VersionColumn,
Index,
} from 'typeorm'; } from 'typeorm';
@Entity('payments') @Entity('payments')
@Index('idx_payments_transaction_id', ['transactionId'], { unique: true, where: '"transaction_id" IS NOT NULL' })
@Index('idx_payments_order_status', ['orderId', 'status'])
export class PaymentORM { export class PaymentORM {
@PrimaryGeneratedColumn('uuid') @PrimaryGeneratedColumn('uuid')
id: string; id: string;
@ -52,4 +56,7 @@ export class PaymentORM {
@UpdateDateColumn({ name: 'updated_at' }) @UpdateDateColumn({ name: 'updated_at' })
updatedAt: Date; updatedAt: Date;
@VersionColumn()
version: number;
} }

View File

@ -0,0 +1,137 @@
import { Injectable } from '@nestjs/common';
import { DataSource, QueryRunner, EntityManager } from 'typeorm';
/**
* Transaction Service -
*
* TypeORM QueryRunner
* 参考: https://typeorm.io/docs/advanced-topics/transactions/
*/
@Injectable()
export class TransactionService {
constructor(private readonly dataSource: DataSource) {}
/**
* (使)
*
* @param operation - EntityManager
* @returns
* @throws
*
* @example
* ```typescript
* const result = await this.transactionService.runInTransaction(async (manager) => {
* await manager.save(PaymentORM, payment);
* await manager.update(OrderORM, orderId, { status: 'PAID' });
* return payment;
* });
* ```
*/
async runInTransaction<T>(
operation: (manager: EntityManager) => Promise<T>,
): Promise<T> {
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
try {
const result = await operation(queryRunner.manager);
await queryRunner.commitTransaction();
return result;
} catch (error) {
await queryRunner.rollbackTransaction();
throw error;
} finally {
// 重要:必须释放连接,否则会导致连接池耗尽
await queryRunner.release();
}
}
/**
* 使 QueryRunner ()
*
* @param operation - QueryRunner
* @returns
*
* @example
* ```typescript
* const result = await this.transactionService.runWithQueryRunner(async (queryRunner) => {
* // 可以使用 queryRunner.manager 进行操作
* // 也可以使用 queryRunner.query() 执行原始 SQL
* await queryRunner.manager.save(entity);
* return entity;
* });
* ```
*/
async runWithQueryRunner<T>(
operation: (queryRunner: QueryRunner) => Promise<T>,
): Promise<T> {
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
try {
const result = await operation(queryRunner);
await queryRunner.commitTransaction();
return result;
} catch (error) {
await queryRunner.rollbackTransaction();
throw error;
} finally {
await queryRunner.release();
}
}
/**
* 使 ()
*
* @param operation -
* @param maxRetries - ( 3)
* @returns
* @throws
*
* @example
* ```typescript
* await this.transactionService.runWithOptimisticLock(async (manager) => {
* const payment = await manager.findOne(PaymentORM, { where: { id } });
* payment.status = 'COMPLETED';
* await manager.save(payment); // 如果 version 不匹配会抛出错误
* });
* ```
*/
async runWithOptimisticLock<T>(
operation: (manager: EntityManager) => Promise<T>,
maxRetries: number = 3,
): Promise<T> {
let lastError: Error | null = null;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await this.runInTransaction(operation);
} catch (error: unknown) {
// TypeORM 乐观锁冲突错误
if (
error instanceof Error &&
(error.name === 'OptimisticLockVersionMismatchError' ||
error.message.includes('version'))
) {
lastError = error;
// 短暂延迟后重试
await this.delay(50 * attempt);
continue;
}
throw error;
}
}
throw new Error(
`Optimistic lock failed after ${maxRetries} retries: ${lastError?.message}`,
);
}
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}

View File

@ -9,6 +9,7 @@ import { PaymentController } from '../adapters/inbound/payment.controller';
import { AlipayAdapter } from '../adapters/outbound/payment-methods/alipay.adapter'; import { AlipayAdapter } from '../adapters/outbound/payment-methods/alipay.adapter';
import { WechatPayAdapter } from '../adapters/outbound/payment-methods/wechat-pay.adapter'; import { WechatPayAdapter } from '../adapters/outbound/payment-methods/wechat-pay.adapter';
import { StripeAdapter } from '../adapters/outbound/payment-methods/stripe.adapter'; import { StripeAdapter } from '../adapters/outbound/payment-methods/stripe.adapter';
import { TransactionService } from '../infrastructure/database/transaction.service';
@Module({ @Module({
imports: [ imports: [
@ -18,6 +19,7 @@ import { StripeAdapter } from '../adapters/outbound/payment-methods/stripe.adapt
controllers: [PaymentController], controllers: [PaymentController],
providers: [ providers: [
PaymentService, PaymentService,
TransactionService,
{ {
provide: PAYMENT_REPOSITORY, provide: PAYMENT_REPOSITORY,
useClass: PaymentPostgresRepository, useClass: PaymentPostgresRepository,
@ -26,6 +28,6 @@ import { StripeAdapter } from '../adapters/outbound/payment-methods/stripe.adapt
WechatPayAdapter, WechatPayAdapter,
StripeAdapter, StripeAdapter,
], ],
exports: [PaymentService, PAYMENT_REPOSITORY], exports: [PaymentService, PAYMENT_REPOSITORY, TransactionService],
}) })
export class PaymentModule {} export class PaymentModule {}

View File

@ -209,7 +209,9 @@ CREATE TABLE orders (
-- 创建时间 -- 创建时间
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- 更新时间 -- 更新时间
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- 乐观锁版本号(防止并发更新冲突)
version INTEGER NOT NULL DEFAULT 1
); );
COMMENT ON TABLE orders IS '订单表 - 存储用户购买的服务订单'; COMMENT ON TABLE orders IS '订单表 - 存储用户购买的服务订单';
@ -223,6 +225,8 @@ CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_orders_service_type ON orders(service_type); CREATE INDEX idx_orders_service_type ON orders(service_type);
CREATE INDEX idx_orders_created_at ON orders(created_at DESC); CREATE INDEX idx_orders_created_at ON orders(created_at DESC);
CREATE INDEX idx_orders_paid_at ON orders(paid_at) WHERE paid_at IS NOT NULL; CREATE INDEX idx_orders_paid_at ON orders(paid_at) WHERE paid_at IS NOT NULL;
-- 复合索引:用于按用户查询特定状态订单
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- =========================================== -- ===========================================
-- 支付表 (payments) -- 支付表 (payments)
@ -258,7 +262,9 @@ CREATE TABLE payments (
-- 创建时间 -- 创建时间
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- 更新时间 -- 更新时间
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- 乐观锁版本号(防止并发更新冲突)
version INTEGER NOT NULL DEFAULT 1
); );
COMMENT ON TABLE payments IS '支付表 - 存储支付交易记录'; COMMENT ON TABLE payments IS '支付表 - 存储支付交易记录';
@ -268,8 +274,11 @@ COMMENT ON COLUMN payments.callback_payload IS '支付回调的原始数据,
CREATE INDEX idx_payments_order_id ON payments(order_id); CREATE INDEX idx_payments_order_id ON payments(order_id);
CREATE INDEX idx_payments_status ON payments(status); CREATE INDEX idx_payments_status ON payments(status);
CREATE INDEX idx_payments_method ON payments(method); CREATE INDEX idx_payments_method ON payments(method);
CREATE INDEX idx_payments_transaction_id ON payments(transaction_id);
CREATE INDEX idx_payments_created_at ON payments(created_at); CREATE INDEX idx_payments_created_at ON payments(created_at);
-- 复合索引:用于按订单查询特定状态支付
CREATE INDEX idx_payments_order_status ON payments(order_id, status);
-- 唯一索引:确保 transaction_id 唯一性(幂等性保证,仅非空值)
CREATE UNIQUE INDEX idx_payments_transaction_id_unique ON payments(transaction_id) WHERE transaction_id IS NOT NULL;
-- =========================================== -- ===========================================
-- 分类账/财务流水表 (ledger_entries) -- 分类账/财务流水表 (ledger_entries)