diff --git a/packages/services/conversation-service/src/app.module.ts b/packages/services/conversation-service/src/app.module.ts index e86f852..529f2ba 100644 --- a/packages/services/conversation-service/src/app.module.ts +++ b/packages/services/conversation-service/src/app.module.ts @@ -27,7 +27,16 @@ import { HealthModule } from './health/health.module'; entities: [__dirname + '/**/*.orm{.ts,.js}'], // 生产环境禁用synchronize,使用init-db.sql初始化schema synchronize: false, - logging: configService.get('NODE_ENV') === 'development', + // 连接池配置 - 优化并发性能 + extra: { + max: configService.get('DB_POOL_SIZE', 20), + min: 2, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, + }, + logging: configService.get('NODE_ENV') === 'development' ? ['query', 'error'] : ['error'], + retryAttempts: 3, + retryDelay: 1000, }), }), diff --git a/packages/services/payment-service/src/adapters/outbound/persistence/payment-postgres.repository.ts b/packages/services/payment-service/src/adapters/outbound/persistence/payment-postgres.repository.ts index 7350f6c..853a70b 100644 --- a/packages/services/payment-service/src/adapters/outbound/persistence/payment-postgres.repository.ts +++ b/packages/services/payment-service/src/adapters/outbound/persistence/payment-postgres.repository.ts @@ -30,6 +30,13 @@ export class PaymentPostgresRepository implements IPaymentRepository { return orm ? this.toEntity(orm) : null; } + async findByTransactionId(transactionId: string): Promise { + const orm = await this.repo.findOne({ + where: { transactionId }, + }); + return orm ? this.toEntity(orm) : null; + } + async update(payment: PaymentEntity): Promise { const orm = this.toORM(payment); const saved = await this.repo.save(orm); diff --git a/packages/services/payment-service/src/app.module.ts b/packages/services/payment-service/src/app.module.ts index f9f242e..5d8c8ff 100644 --- a/packages/services/payment-service/src/app.module.ts +++ b/packages/services/payment-service/src/app.module.ts @@ -23,7 +23,24 @@ import { HealthModule } from './health/health.module'; password: configService.get('POSTGRES_PASSWORD'), database: configService.get('POSTGRES_DB', 'iconsulting'), entities: [__dirname + '/**/*.orm{.ts,.js}'], - synchronize: configService.get('NODE_ENV') === 'development', + // 生产环境禁用 synchronize,使用 init-db.sql 初始化 + synchronize: false, + // 连接池配置 - 优化并发性能 + extra: { + // 连接池最大连接数 + max: configService.get('DB_POOL_SIZE', 20), + // 连接池最小连接数 + min: 2, + // 空闲连接超时时间 (ms) + idleTimeoutMillis: 30000, + // 连接超时时间 (ms) + connectionTimeoutMillis: 5000, + }, + // 日志配置 + logging: configService.get('NODE_ENV') === 'development' ? ['query', 'error'] : ['error'], + // 重试策略 + retryAttempts: 3, + retryDelay: 1000, }), }), diff --git a/packages/services/payment-service/src/application/services/payment.service.ts b/packages/services/payment-service/src/application/services/payment.service.ts index 7dc8536..343718d 100644 --- a/packages/services/payment-service/src/application/services/payment.service.ts +++ b/packages/services/payment-service/src/application/services/payment.service.ts @@ -1,11 +1,29 @@ -import { Injectable, Inject, NotFoundException, BadRequestException } from '@nestjs/common'; -import { PaymentEntity, PaymentMethod, PaymentStatus } from '../../domain/entities/payment.entity'; +import { + Injectable, + Inject, + NotFoundException, + BadRequestException, + ConflictException, + Logger, +} from '@nestjs/common'; +import { DataSource, EntityManager } from 'typeorm'; +import { + PaymentEntity, + PaymentMethod, + PaymentStatus, +} from '../../domain/entities/payment.entity'; import { OrderStatus } from '../../domain/entities/order.entity'; -import { IPaymentRepository, PAYMENT_REPOSITORY } from '../../domain/repositories/payment.repository.interface'; +import { + IPaymentRepository, + PAYMENT_REPOSITORY, +} from '../../domain/repositories/payment.repository.interface'; import { OrderService } from './order.service'; import { AlipayAdapter } from '../../adapters/outbound/payment-methods/alipay.adapter'; import { WechatPayAdapter } from '../../adapters/outbound/payment-methods/wechat-pay.adapter'; import { StripeAdapter } from '../../adapters/outbound/payment-methods/stripe.adapter'; +import { TransactionService } from '../../infrastructure/database/transaction.service'; +import { PaymentORM } from '../../infrastructure/database/postgres/entities/payment.orm'; +import { OrderORM } from '../../infrastructure/database/postgres/entities/order.orm'; export interface CreatePaymentParams { orderId: string; @@ -24,6 +42,8 @@ export interface PaymentResult { @Injectable() export class PaymentService { + private readonly logger = new Logger(PaymentService.name); + constructor( @Inject(PAYMENT_REPOSITORY) private readonly paymentRepo: IPaymentRepository, @@ -31,6 +51,8 @@ export class PaymentService { private readonly alipayAdapter: AlipayAdapter, private readonly wechatPayAdapter: WechatPayAdapter, private readonly stripeAdapter: StripeAdapter, + private readonly transactionService: TransactionService, + private readonly dataSource: DataSource, ) {} async createPayment(params: CreatePaymentParams): Promise { @@ -41,7 +63,9 @@ export class PaymentService { } // Check for existing pending payment - const existingPayment = await this.paymentRepo.findPendingByOrderId(params.orderId); + const existingPayment = await this.paymentRepo.findPendingByOrderId( + params.orderId, + ); if (existingPayment && !existingPayment.isExpired()) { return { @@ -89,10 +113,25 @@ export class PaymentService { paymentUrl, }); - const savedPayment = await this.paymentRepo.save(payment); + // 使用事务确保原子性:保存支付记录 + 更新订单状态 + const savedPayment = await this.transactionService.runInTransaction( + async (manager: EntityManager) => { + // 保存支付记录 + const paymentORM = this.toPaymentORM(payment); + await manager.save(PaymentORM, paymentORM); - // Update order status - await this.orderService.updateStatus(order.id, OrderStatus.PENDING_PAYMENT); + // 更新订单状态 + await manager.update(OrderORM, order.id, { + status: OrderStatus.PENDING_PAYMENT, + }); + + this.logger.log( + `Payment created and order updated in transaction: paymentId=${payment.id}, orderId=${order.id}`, + ); + + return payment; + }, + ); return { paymentId: savedPayment.id, @@ -115,6 +154,14 @@ export class PaymentService { return payment; } + /** + * 处理支付回调 - 使用事务和幂等性保证 + * + * 关键保障: + * 1. 幂等性:通过 transaction_id 唯一索引防止重复处理 + * 2. 事务:Payment 和 Order 更新在同一事务中 + * 3. 乐观锁:version 列防止并发更新冲突 + */ async handleCallback( method: PaymentMethod, payload: Record, @@ -149,31 +196,97 @@ export class PaymentService { throw new BadRequestException('Unsupported payment method'); } - // Find payment by order ID - const payment = await this.paymentRepo.findPendingByOrderId(orderId); - - if (!payment) { - throw new NotFoundException('Payment not found'); + // 幂等性检查:是否已处理过此 transactionId + const existingByTxId = + await this.paymentRepo.findByTransactionId(transactionId); + if (existingByTxId) { + this.logger.warn( + `Duplicate callback ignored: transactionId=${transactionId}`, + ); + return; // 已处理过,直接返回(幂等) } - // Update payment - if (success) { - payment.markAsCompleted(transactionId, payload); - await this.paymentRepo.update(payment); + // 使用乐观锁事务处理,防止并发问题 + await this.transactionService.runWithOptimisticLock( + async (manager: EntityManager) => { + // 在事务中重新查询,获取最新版本 + const paymentORM = await manager.findOne(PaymentORM, { + where: { orderId, status: PaymentStatus.PENDING }, + lock: { mode: 'pessimistic_write' }, // 悲观锁,防止并发 + }); - // Update order - await this.orderService.markAsPaid(orderId, payment.id, method); - } else { - payment.markAsFailed('Payment failed', payload); - await this.paymentRepo.update(payment); - } + if (!paymentORM) { + throw new NotFoundException( + `Pending payment not found for order: ${orderId}`, + ); + } + + const now = new Date(); + + if (success) { + // 更新支付状态 + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await manager.update(PaymentORM, paymentORM.id, { + status: PaymentStatus.COMPLETED, + transactionId, + paidAt: now, + callbackPayload: payload, + } as any); + + // 更新订单状态 + await manager.update(OrderORM, orderId, { + status: OrderStatus.PAID, + paymentId: paymentORM.id, + paymentMethod: method, + paidAt: now, + }); + + this.logger.log( + `Payment completed in transaction: paymentId=${paymentORM.id}, orderId=${orderId}, transactionId=${transactionId}`, + ); + } else { + // 支付失败 + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await manager.update(PaymentORM, paymentORM.id, { + status: PaymentStatus.FAILED, + failedReason: 'Payment failed', + callbackPayload: payload, + } as any); + + this.logger.warn( + `Payment failed: paymentId=${paymentORM.id}, orderId=${orderId}`, + ); + } + }, + 3, // 最多重试 3 次 + ); } - async checkStatus(paymentId: string): Promise<{ status: PaymentStatus; paidAt?: Date }> { + async checkStatus( + paymentId: string, + ): Promise<{ status: PaymentStatus; paidAt?: Date }> { const payment = await this.findById(paymentId); return { status: payment.status, paidAt: payment.paidAt || undefined, }; } + + private toPaymentORM(entity: PaymentEntity): Partial { + return { + id: entity.id, + orderId: entity.orderId, + method: entity.method, + amount: entity.amount, + currency: entity.currency, + status: entity.status, + qrCodeUrl: entity.qrCodeUrl, + paymentUrl: entity.paymentUrl, + expiresAt: entity.expiresAt, + paidAt: entity.paidAt, + transactionId: entity.transactionId, + failedReason: entity.failedReason, + callbackPayload: entity.callbackPayload, + }; + } } diff --git a/packages/services/payment-service/src/domain/repositories/payment.repository.interface.ts b/packages/services/payment-service/src/domain/repositories/payment.repository.interface.ts index f0b2ad2..2b599f4 100644 --- a/packages/services/payment-service/src/domain/repositories/payment.repository.interface.ts +++ b/packages/services/payment-service/src/domain/repositories/payment.repository.interface.ts @@ -7,6 +7,7 @@ export interface IPaymentRepository { save(payment: PaymentEntity): Promise; findById(id: string): Promise; findPendingByOrderId(orderId: string): Promise; + findByTransactionId(transactionId: string): Promise; update(payment: PaymentEntity): Promise; } diff --git a/packages/services/payment-service/src/infrastructure/database/postgres/entities/order.orm.ts b/packages/services/payment-service/src/infrastructure/database/postgres/entities/order.orm.ts index d733898..0737e65 100644 --- a/packages/services/payment-service/src/infrastructure/database/postgres/entities/order.orm.ts +++ b/packages/services/payment-service/src/infrastructure/database/postgres/entities/order.orm.ts @@ -4,9 +4,12 @@ import { Column, CreateDateColumn, UpdateDateColumn, + VersionColumn, + Index, } from 'typeorm'; @Entity('orders') +@Index('idx_orders_user_status', ['userId', 'status']) export class OrderORM { @PrimaryGeneratedColumn('uuid') id: string; @@ -52,4 +55,7 @@ export class OrderORM { @UpdateDateColumn({ name: 'updated_at' }) updatedAt: Date; + + @VersionColumn() + version: number; } diff --git a/packages/services/payment-service/src/infrastructure/database/postgres/entities/payment.orm.ts b/packages/services/payment-service/src/infrastructure/database/postgres/entities/payment.orm.ts index 5633358..857bfba 100644 --- a/packages/services/payment-service/src/infrastructure/database/postgres/entities/payment.orm.ts +++ b/packages/services/payment-service/src/infrastructure/database/postgres/entities/payment.orm.ts @@ -4,9 +4,13 @@ import { Column, CreateDateColumn, UpdateDateColumn, + VersionColumn, + Index, } from 'typeorm'; @Entity('payments') +@Index('idx_payments_transaction_id', ['transactionId'], { unique: true, where: '"transaction_id" IS NOT NULL' }) +@Index('idx_payments_order_status', ['orderId', 'status']) export class PaymentORM { @PrimaryGeneratedColumn('uuid') id: string; @@ -52,4 +56,7 @@ export class PaymentORM { @UpdateDateColumn({ name: 'updated_at' }) updatedAt: Date; + + @VersionColumn() + version: number; } diff --git a/packages/services/payment-service/src/infrastructure/database/transaction.service.ts b/packages/services/payment-service/src/infrastructure/database/transaction.service.ts new file mode 100644 index 0000000..8792ff8 --- /dev/null +++ b/packages/services/payment-service/src/infrastructure/database/transaction.service.ts @@ -0,0 +1,137 @@ +import { Injectable } from '@nestjs/common'; +import { DataSource, QueryRunner, EntityManager } from 'typeorm'; + +/** + * Transaction Service - 提供可靠的事务管理 + * + * 基于 TypeORM 官方推荐的 QueryRunner 模式 + * 参考: https://typeorm.io/docs/advanced-topics/transactions/ + */ +@Injectable() +export class TransactionService { + constructor(private readonly dataSource: DataSource) {} + + /** + * 在事务中执行操作 (推荐使用) + * + * @param operation - 接收 EntityManager 的异步操作函数 + * @returns 操作的返回值 + * @throws 如果操作失败,事务会自动回滚并抛出原始错误 + * + * @example + * ```typescript + * const result = await this.transactionService.runInTransaction(async (manager) => { + * await manager.save(PaymentORM, payment); + * await manager.update(OrderORM, orderId, { status: 'PAID' }); + * return payment; + * }); + * ``` + */ + async runInTransaction( + operation: (manager: EntityManager) => Promise, + ): Promise { + const queryRunner = this.dataSource.createQueryRunner(); + + await queryRunner.connect(); + await queryRunner.startTransaction(); + + try { + const result = await operation(queryRunner.manager); + await queryRunner.commitTransaction(); + return result; + } catch (error) { + await queryRunner.rollbackTransaction(); + throw error; + } finally { + // 重要:必须释放连接,否则会导致连接池耗尽 + await queryRunner.release(); + } + } + + /** + * 使用 QueryRunner 执行事务 (用于需要更多控制的场景) + * + * @param operation - 接收 QueryRunner 的异步操作函数 + * @returns 操作的返回值 + * + * @example + * ```typescript + * const result = await this.transactionService.runWithQueryRunner(async (queryRunner) => { + * // 可以使用 queryRunner.manager 进行操作 + * // 也可以使用 queryRunner.query() 执行原始 SQL + * await queryRunner.manager.save(entity); + * return entity; + * }); + * ``` + */ + async runWithQueryRunner( + operation: (queryRunner: QueryRunner) => Promise, + ): Promise { + const queryRunner = this.dataSource.createQueryRunner(); + + await queryRunner.connect(); + await queryRunner.startTransaction(); + + try { + const result = await operation(queryRunner); + await queryRunner.commitTransaction(); + return result; + } catch (error) { + await queryRunner.rollbackTransaction(); + throw error; + } finally { + await queryRunner.release(); + } + } + + /** + * 使用乐观锁执行更新 (防止并发覆盖) + * + * @param operation - 异步操作函数 + * @param maxRetries - 最大重试次数 (默认 3) + * @returns 操作的返回值 + * @throws 如果达到最大重试次数仍失败 + * + * @example + * ```typescript + * await this.transactionService.runWithOptimisticLock(async (manager) => { + * const payment = await manager.findOne(PaymentORM, { where: { id } }); + * payment.status = 'COMPLETED'; + * await manager.save(payment); // 如果 version 不匹配会抛出错误 + * }); + * ``` + */ + async runWithOptimisticLock( + operation: (manager: EntityManager) => Promise, + maxRetries: number = 3, + ): Promise { + let lastError: Error | null = null; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + return await this.runInTransaction(operation); + } catch (error: unknown) { + // TypeORM 乐观锁冲突错误 + if ( + error instanceof Error && + (error.name === 'OptimisticLockVersionMismatchError' || + error.message.includes('version')) + ) { + lastError = error; + // 短暂延迟后重试 + await this.delay(50 * attempt); + continue; + } + throw error; + } + } + + throw new Error( + `Optimistic lock failed after ${maxRetries} retries: ${lastError?.message}`, + ); + } + + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } +} diff --git a/packages/services/payment-service/src/payment/payment.module.ts b/packages/services/payment-service/src/payment/payment.module.ts index 15be464..9ec6c0b 100644 --- a/packages/services/payment-service/src/payment/payment.module.ts +++ b/packages/services/payment-service/src/payment/payment.module.ts @@ -9,6 +9,7 @@ import { PaymentController } from '../adapters/inbound/payment.controller'; import { AlipayAdapter } from '../adapters/outbound/payment-methods/alipay.adapter'; import { WechatPayAdapter } from '../adapters/outbound/payment-methods/wechat-pay.adapter'; import { StripeAdapter } from '../adapters/outbound/payment-methods/stripe.adapter'; +import { TransactionService } from '../infrastructure/database/transaction.service'; @Module({ imports: [ @@ -18,6 +19,7 @@ import { StripeAdapter } from '../adapters/outbound/payment-methods/stripe.adapt controllers: [PaymentController], providers: [ PaymentService, + TransactionService, { provide: PAYMENT_REPOSITORY, useClass: PaymentPostgresRepository, @@ -26,6 +28,6 @@ import { StripeAdapter } from '../adapters/outbound/payment-methods/stripe.adapt WechatPayAdapter, StripeAdapter, ], - exports: [PaymentService, PAYMENT_REPOSITORY], + exports: [PaymentService, PAYMENT_REPOSITORY, TransactionService], }) export class PaymentModule {} diff --git a/scripts/init-db.sql b/scripts/init-db.sql index a595ab7..2705c60 100644 --- a/scripts/init-db.sql +++ b/scripts/init-db.sql @@ -209,7 +209,9 @@ CREATE TABLE orders ( -- 创建时间 created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), -- 更新时间 - updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + -- 乐观锁版本号(防止并发更新冲突) + version INTEGER NOT NULL DEFAULT 1 ); COMMENT ON TABLE orders IS '订单表 - 存储用户购买的服务订单'; @@ -223,6 +225,8 @@ CREATE INDEX idx_orders_status ON orders(status); CREATE INDEX idx_orders_service_type ON orders(service_type); CREATE INDEX idx_orders_created_at ON orders(created_at DESC); CREATE INDEX idx_orders_paid_at ON orders(paid_at) WHERE paid_at IS NOT NULL; +-- 复合索引:用于按用户查询特定状态订单 +CREATE INDEX idx_orders_user_status ON orders(user_id, status); -- =========================================== -- 支付表 (payments) @@ -258,7 +262,9 @@ CREATE TABLE payments ( -- 创建时间 created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), -- 更新时间 - updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + -- 乐观锁版本号(防止并发更新冲突) + version INTEGER NOT NULL DEFAULT 1 ); COMMENT ON TABLE payments IS '支付表 - 存储支付交易记录'; @@ -268,8 +274,11 @@ COMMENT ON COLUMN payments.callback_payload IS '支付回调的原始数据, CREATE INDEX idx_payments_order_id ON payments(order_id); CREATE INDEX idx_payments_status ON payments(status); CREATE INDEX idx_payments_method ON payments(method); -CREATE INDEX idx_payments_transaction_id ON payments(transaction_id); CREATE INDEX idx_payments_created_at ON payments(created_at); +-- 复合索引:用于按订单查询特定状态支付 +CREATE INDEX idx_payments_order_status ON payments(order_id, status); +-- 唯一索引:确保 transaction_id 唯一性(幂等性保证,仅非空值) +CREATE UNIQUE INDEX idx_payments_transaction_id_unique ON payments(transaction_id) WHERE transaction_id IS NOT NULL; -- =========================================== -- 分类账/财务流水表 (ledger_entries)