diff --git a/backend/services/wallet-service/prisma/migrations/20260107100000_add_outbox_events/migration.sql b/backend/services/wallet-service/prisma/migrations/20260107100000_add_outbox_events/migration.sql new file mode 100644 index 00000000..ef864fa3 --- /dev/null +++ b/backend/services/wallet-service/prisma/migrations/20260107100000_add_outbox_events/migration.sql @@ -0,0 +1,53 @@ +-- ============================================ +-- Outbox 事件表 (可靠事件发布) +-- [2026-01-07] 新增:实现 Outbox Pattern 确保事件发布的原子性 +-- +-- 用途: +-- 1. 业务操作和事件写入在同一个事务中完成 +-- 2. 后台任务轮询此表发布事件到 Kafka +-- 3. 保证事件不丢失,即使 Kafka 暂时不可用 +-- ============================================ + +CREATE TABLE IF NOT EXISTS "outbox_events" ( + "outbox_id" BIGSERIAL PRIMARY KEY, + + -- 事件信息 + "event_type" VARCHAR(100) NOT NULL, -- 事件类型 (如 wallet.system-withdrawal.requested) + "topic" VARCHAR(100) NOT NULL, -- Kafka topic + "key" VARCHAR(200) NOT NULL, -- Kafka message key (通常是 orderNo 或 aggregateId) + "payload" JSONB NOT NULL, -- 事件载荷 (JSON 格式) + + -- 聚合根信息 (用于幂等性检查和事件确认) + "aggregate_id" VARCHAR(100) NOT NULL, -- 聚合根ID (如 orderNo) + "aggregate_type" VARCHAR(50) NOT NULL, -- 聚合根类型 (如 SystemWithdrawalOrder) + + -- 发布状态 + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', -- PENDING, SENT, CONFIRMED, FAILED + "retry_count" INT NOT NULL DEFAULT 0, -- 重试次数 + "max_retries" INT NOT NULL DEFAULT 5, -- 最大重试次数 + "last_error" TEXT, -- 最后一次错误信息 + + -- 时间戳 + "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 创建时间 + "published_at" TIMESTAMP, -- 发布到 Kafka 的时间 + "next_retry_at" TIMESTAMP -- 下次重试时间 (失败时设置) +); + +-- 索引 +-- 1. 状态+创建时间:用于轮询待发布事件 +CREATE INDEX "idx_outbox_status_created" ON "outbox_events" ("status", "created_at"); + +-- 2. 状态+下次重试时间:用于查找需要重试的事件 +CREATE INDEX "idx_outbox_status_next_retry" ON "outbox_events" ("status", "next_retry_at"); + +-- 3. 聚合根类型+ID:用于按聚合根查找事件(确认时使用) +CREATE INDEX "idx_outbox_aggregate" ON "outbox_events" ("aggregate_type", "aggregate_id"); + +-- 4. Topic:用于按主题统计 +CREATE INDEX "idx_outbox_topic" ON "outbox_events" ("topic"); + +-- 添加注释 +COMMENT ON TABLE "outbox_events" IS 'Outbox 事件表 - 实现可靠的事件发布机制'; +COMMENT ON COLUMN "outbox_events"."status" IS 'PENDING=待发送, SENT=已发送待确认, CONFIRMED=已确认, FAILED=失败'; +COMMENT ON COLUMN "outbox_events"."retry_count" IS '当前重试次数,达到 max_retries 后停止重试'; +COMMENT ON COLUMN "outbox_events"."next_retry_at" IS '使用指数退避策略计算的下次重试时间'; diff --git a/backend/services/wallet-service/prisma/schema.prisma b/backend/services/wallet-service/prisma/schema.prisma index 848598a3..2e9d11bd 100644 --- a/backend/services/wallet-service/prisma/schema.prisma +++ b/backend/services/wallet-service/prisma/schema.prisma @@ -463,3 +463,38 @@ model OfflineSettlementDeduction { @@index([deductionOrderNo]) @@index([createdAt]) } + +// ============================================ +// Outbox 事件表 (可靠事件发布) +// [2026-01-07] 新增:实现 Outbox Pattern 确保事件发布的原子性 +// ============================================ +model OutboxEvent { + id BigInt @id @default(autoincrement()) @map("outbox_id") + + // 事件信息 + eventType String @map("event_type") @db.VarChar(100) + topic String @map("topic") @db.VarChar(100) + key String @map("key") @db.VarChar(200) + payload Json @map("payload") + + // 聚合根信息 (用于幂等性检查和事件确认) + aggregateId String @map("aggregate_id") @db.VarChar(100) + aggregateType String @map("aggregate_type") @db.VarChar(50) + + // 发布状态: PENDING(待发送), SENT(已发送待确认), CONFIRMED(已确认), FAILED(失败) + status String @default("PENDING") @map("status") @db.VarChar(20) + retryCount Int @default(0) @map("retry_count") + maxRetries Int @default(5) @map("max_retries") + lastError String? @map("last_error") @db.Text + + // 时间戳 + createdAt DateTime @default(now()) @map("created_at") + publishedAt DateTime? @map("published_at") + nextRetryAt DateTime? @map("next_retry_at") + + @@index([status, createdAt]) + @@index([status, nextRetryAt]) + @@index([aggregateType, aggregateId]) + @@index([topic]) + @@map("outbox_events") +} diff --git a/backend/services/wallet-service/src/application/services/system-withdrawal-application.service.ts b/backend/services/wallet-service/src/application/services/system-withdrawal-application.service.ts index 0cfdb6f8..551a4d79 100644 --- a/backend/services/wallet-service/src/application/services/system-withdrawal-application.service.ts +++ b/backend/services/wallet-service/src/application/services/system-withdrawal-application.service.ts @@ -5,14 +5,23 @@ * - 支持从固定系统账户(总部、运营、积分股池等)转出 * - 支持从区域账户(省区域、市区域)转出 * - 记录双边流水(系统账户转出 + 用户账户转入) + * + * [2026-01-07] 更新:使用 Outbox Pattern 确保事件发布的原子性 + * - 事件写入 outbox_events 表与业务数据在同一个事务中 + * - 后台任务轮询 outbox 表发布事件到 Kafka + * - 保证事件不丢失,即使 Kafka 暂时不可用 */ -import { Injectable, Logger, BadRequestException, Inject } from '@nestjs/common'; +import { Injectable, Logger, BadRequestException } from '@nestjs/common'; import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; -import { EventPublisherService } from '@/infrastructure/kafka'; import { IdentityClientService } from '@/infrastructure/external/identity/identity-client.service'; import { HotWalletCacheService } from '@/infrastructure/redis'; import { LedgerEntryType } from '@/domain/value-objects/ledger-entry-type.enum'; +// [2026-01-07] 新增:使用 Outbox Pattern +import { + OutboxRepository, + OutboxEventData, +} from '@/infrastructure/persistence/repositories/outbox.repository'; import Decimal from 'decimal.js'; // 系统账户名称映射 @@ -61,7 +70,8 @@ export class SystemWithdrawalApplicationService { constructor( private readonly prisma: PrismaService, - private readonly eventPublisher: EventPublisherService, + // [2026-01-07] 更新:使用 OutboxRepository 替代直接的 EventPublisher + private readonly outboxRepository: OutboxRepository, private readonly identityClient: IdentityClientService, private readonly hotWalletCacheService: HotWalletCacheService, ) {} @@ -229,6 +239,34 @@ export class SystemWithdrawalApplicationService { }, }); + // 6.6 [2026-01-07] 新增:在事务中写入 Outbox 事件 + // 使用 Outbox Pattern 确保事件发布的原子性 + const outboxEvent: OutboxEventData = { + eventType: 'wallet.system-withdrawal.requested', + topic: 'wallet.system-withdrawals', + key: orderNo, + aggregateId: orderNo, + aggregateType: 'SystemWithdrawalOrder', + payload: { + eventId: `${Date.now()}-${Math.random().toString(36).substring(2, 9)}`, + eventType: 'wallet.system-withdrawal.requested', + occurredAt: new Date().toISOString(), + payload: { + orderNo, + fromAccountSequence: command.fromAccountSequence, + fromAccountName, + toAccountSequence: command.toAccountSequence, + toAddress: toUserInfo.walletAddress, + amount: command.amount.toString(), + chainType: 'KAVA', + }, + }, + }; + + await this.outboxRepository.saveInTransaction(tx, [outboxEvent]); + + this.logger.log(`[SYSTEM_WITHDRAWAL] 转出订单和 Outbox 事件已写入事务: ${orderNo}`); + return { orderNo: order.orderNo, fromAccountSequence: command.fromAccountSequence, @@ -241,21 +279,9 @@ export class SystemWithdrawalApplicationService { }; }); - // 7. 发布事件通知 blockchain-service 执行链上转账 - await this.eventPublisher.publish({ - eventType: 'wallet.system-withdrawal.requested', - payload: { - orderNo: result.orderNo, - fromAccountSequence: result.fromAccountSequence, - fromAccountName: result.fromAccountName, - toAccountSequence: result.toAccountSequence, - toAddress: result.toAddress, - amount: command.amount.toString(), - chainType: 'KAVA', - }, - }); - - this.logger.log(`[SYSTEM_WITHDRAWAL] 转出订单创建成功: ${result.orderNo}`); + // 7. [2026-01-07] 更新:事件已在事务中写入 Outbox 表 + // 后台 OutboxPublisher 服务会自动轮询并发布到 Kafka + this.logger.log(`[SYSTEM_WITHDRAWAL] 转出订单创建成功: ${result.orderNo}, 事件将由 OutboxPublisher 发布`); return result; } diff --git a/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts index 151b8a9c..964e163e 100644 --- a/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts @@ -5,6 +5,9 @@ import { EventPublisherService } from './event-publisher.service'; import { DepositEventConsumerService } from './deposit-event-consumer.service'; import { PlantingEventConsumerService } from './planting-event-consumer.service'; import { WithdrawalEventConsumerService } from './withdrawal-event-consumer.service'; +// [2026-01-07] 新增:Outbox Pattern 实现 +import { OutboxPublisherService } from './outbox-publisher.service'; +import { OutboxRepository } from '../persistence/repositories/outbox.repository'; // [已屏蔽] 前端直接从 reward-service 查询,不再订阅 reward-service 消息 // import { RewardEventConsumerController } from './reward-event-consumer.controller'; // import { EventAckPublisher } from './event-ack.publisher'; @@ -36,7 +39,25 @@ import { PrismaService } from '../persistence/prisma/prisma.service'; // [已屏蔽] 前端直接从 reward-service 查询,不再订阅 reward-service 消息 // controllers: [RewardEventConsumerController], controllers: [], - providers: [PrismaService, EventPublisherService, DepositEventConsumerService, PlantingEventConsumerService, WithdrawalEventConsumerService], - exports: [EventPublisherService, DepositEventConsumerService, PlantingEventConsumerService, WithdrawalEventConsumerService, ClientsModule], + providers: [ + PrismaService, + EventPublisherService, + DepositEventConsumerService, + PlantingEventConsumerService, + WithdrawalEventConsumerService, + // [2026-01-07] 新增:Outbox Pattern + OutboxRepository, + OutboxPublisherService, + ], + exports: [ + EventPublisherService, + DepositEventConsumerService, + PlantingEventConsumerService, + WithdrawalEventConsumerService, + ClientsModule, + // [2026-01-07] 新增:导出 Outbox 相关服务 + OutboxRepository, + OutboxPublisherService, + ], }) export class KafkaModule {} diff --git a/backend/services/wallet-service/src/infrastructure/kafka/outbox-publisher.service.ts b/backend/services/wallet-service/src/infrastructure/kafka/outbox-publisher.service.ts new file mode 100644 index 00000000..d5fb1be7 --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/kafka/outbox-publisher.service.ts @@ -0,0 +1,352 @@ +/** + * Outbox Publisher Service + * [2026-01-07] 新增:实现 Outbox Pattern 确保事件发布的原子性和可靠性 + * + * 轮询 Outbox 表并发布事件到 Kafka + * 使用消费方确认机制保证事件100%被处理 + * + * 工作流程: + * 1. 轮询 PENDING 状态的事件 + * 2. 发送到 Kafka,标记为 SENT(等待确认) + * 3. 消费方处理成功后发送确认事件(如 blockchain.system-withdrawal.confirmed) + * 4. 收到确认后标记为 CONFIRMED + * 5. 超时未确认的事件重置为 PENDING 重发 + */ + +import { + Injectable, + Logger, + OnModuleInit, + OnModuleDestroy, +} from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Producer, logLevel } from 'kafkajs'; +import { + OutboxRepository, + OutboxEvent, +} from '../persistence/repositories/outbox.repository'; + +@Injectable() +export class OutboxPublisherService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(OutboxPublisherService.name); + private kafka: Kafka; + private producer: Producer; + private isRunning = false; + private pollInterval: NodeJS.Timeout | null = null; + private timeoutCheckInterval: NodeJS.Timeout | null = null; + private cleanupInterval: NodeJS.Timeout | null = null; + private isConnected = false; + + // 配置 + private readonly pollIntervalMs: number; + private readonly batchSize: number; + private readonly cleanupIntervalMs: number; + private readonly confirmationTimeoutMinutes: number; + private readonly timeoutCheckIntervalMs: number; + + constructor( + private readonly outboxRepository: OutboxRepository, + private readonly configService: ConfigService, + ) { + this.pollIntervalMs = this.configService.get( + 'OUTBOX_POLL_INTERVAL_MS', + 1000, + ); + this.batchSize = this.configService.get('OUTBOX_BATCH_SIZE', 100); + this.cleanupIntervalMs = this.configService.get( + 'OUTBOX_CLEANUP_INTERVAL_MS', + 3600000, + ); // 1小时 + this.confirmationTimeoutMinutes = this.configService.get( + 'OUTBOX_CONFIRMATION_TIMEOUT_MINUTES', + 5, + ); + this.timeoutCheckIntervalMs = this.configService.get( + 'OUTBOX_TIMEOUT_CHECK_INTERVAL_MS', + 60000, + ); // 1分钟 + + this.logger.log( + `[OUTBOX] OutboxPublisher configured: ` + + `pollInterval=${this.pollIntervalMs}ms, batchSize=${this.batchSize}, ` + + `confirmationTimeout=${this.confirmationTimeoutMinutes}min`, + ); + } + + async onModuleInit() { + const brokers = + this.configService.get('KAFKA_BROKERS')?.split(',') || [ + 'localhost:9092', + ]; + const clientId = + this.configService.get('KAFKA_CLIENT_ID') || 'wallet-service'; + + this.logger.log('[OUTBOX] Connecting to Kafka...'); + + this.kafka = new Kafka({ + clientId: `${clientId}-outbox`, + brokers, + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 100, + retries: 8, + }, + }); + + this.producer = this.kafka.producer(); + + try { + await this.producer.connect(); + this.isConnected = true; + this.logger.log('[OUTBOX] Connected to Kafka'); + this.start(); + } catch (error) { + this.logger.error('[OUTBOX] Failed to connect to Kafka:', error); + this.logger.warn( + '[OUTBOX] OutboxPublisher will not start - events will accumulate in outbox table', + ); + } + } + + async onModuleDestroy() { + this.stop(); + if (this.isConnected) { + await this.producer.disconnect(); + } + } + + /** + * 启动轮询 + */ + start(): void { + if (this.isRunning) { + this.logger.warn('[OUTBOX] Publisher already running'); + return; + } + + this.isRunning = true; + this.logger.log('[OUTBOX] Starting outbox publisher...'); + + // 启动发布轮询 + this.pollInterval = setInterval(() => { + this.processOutbox().catch((err) => { + this.logger.error('[OUTBOX] Error processing outbox:', err); + }); + }, this.pollIntervalMs); + + // 启动超时检查任务 + this.timeoutCheckInterval = setInterval(() => { + this.checkConfirmationTimeouts().catch((err) => { + this.logger.error('[OUTBOX] Error checking confirmation timeouts:', err); + }); + }, this.timeoutCheckIntervalMs); + + // 启动清理任务 + this.cleanupInterval = setInterval(() => { + this.cleanup().catch((err) => { + this.logger.error('[OUTBOX] Error cleaning up outbox:', err); + }); + }, this.cleanupIntervalMs); + + this.logger.log('[OUTBOX] Outbox publisher started (消费方确认模式)'); + } + + /** + * 停止轮询 + */ + stop(): void { + if (!this.isRunning) return; + + this.isRunning = false; + + if (this.pollInterval) { + clearInterval(this.pollInterval); + this.pollInterval = null; + } + + if (this.timeoutCheckInterval) { + clearInterval(this.timeoutCheckInterval); + this.timeoutCheckInterval = null; + } + + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + } + + this.logger.log('[OUTBOX] Outbox publisher stopped'); + } + + /** + * 处理 Outbox 事件 + */ + async processOutbox(): Promise { + if (!this.isConnected) { + return; + } + + try { + // 1. 获取待发布事件 + const pendingEvents = + await this.outboxRepository.findPendingEvents(this.batchSize); + + // 2. 获取需要重试的事件 + const retryEvents = await this.outboxRepository.findEventsForRetry( + Math.floor(this.batchSize / 2), + ); + + const allEvents = [...pendingEvents, ...retryEvents]; + + if (allEvents.length === 0) { + return; + } + + this.logger.debug( + `[OUTBOX] Processing ${allEvents.length} events (${pendingEvents.length} pending, ${retryEvents.length} retry)`, + ); + + // 3. 逐个发布 + for (const event of allEvents) { + await this.publishEvent(event); + } + } catch (error) { + this.logger.error('[OUTBOX] Error in processOutbox:', error); + } + } + + /** + * 发布单个事件 + * + * 使用 producer.send() 发送到 Kafka,成功后标记为 SENT(等待消费方确认) + * 只有收到消费方确认后才标记为 CONFIRMED + */ + private async publishEvent(event: OutboxEvent): Promise { + try { + this.logger.debug( + `[OUTBOX] Publishing event ${event.id} to topic ${event.topic}`, + ); + + // 构造 Kafka 消息,包含 outbox 元数据用于确认 + const payload = { + ...(event.payload as Record), + _outbox: { + id: event.id.toString(), + aggregateId: event.aggregateId, + eventType: event.eventType, + }, + }; + + const message = { + key: event.key, + value: JSON.stringify(payload), + headers: { + eventType: event.eventType, + source: 'wallet-service', + outboxId: event.id.toString(), + }, + }; + + // 发布到 Kafka + await this.producer.send({ + topic: event.topic, + messages: [message], + }); + + // 标记为 SENT(等待消费方确认) + await this.outboxRepository.markAsSent(event.id); + + this.logger.log( + `[OUTBOX] → Event ${event.id} sent to ${event.topic} (awaiting consumer confirmation)`, + ); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + this.logger.error( + `[OUTBOX] ✗ Failed to publish event ${event.id}: ${errorMessage}`, + ); + + // 标记为失败并安排重试 + await this.outboxRepository.markAsFailed(event.id, errorMessage); + } + } + + /** + * 检查确认超时的事件 + * + * 将超时未确认的 SENT 事件重置为 PENDING 以便重发 + */ + private async checkConfirmationTimeouts(): Promise { + if (!this.isConnected) { + return; + } + + try { + const timedOutEvents = await this.outboxRepository.findSentEventsTimedOut( + this.confirmationTimeoutMinutes, + this.batchSize, + ); + + if (timedOutEvents.length === 0) { + return; + } + + this.logger.warn( + `[OUTBOX] Found ${timedOutEvents.length} events without confirmation after ${this.confirmationTimeoutMinutes} minutes`, + ); + + for (const event of timedOutEvents) { + await this.outboxRepository.resetSentToPending(event.id); + this.logger.warn( + `[OUTBOX] Event ${event.id} reset to PENDING for retry (retry #${event.retryCount + 1})`, + ); + } + } catch (error) { + this.logger.error('[OUTBOX] Error checking confirmation timeouts:', error); + } + } + + /** + * 清理旧事件 + */ + private async cleanup(): Promise { + const retentionDays = this.configService.get( + 'OUTBOX_RETENTION_DAYS', + 7, + ); + await this.outboxRepository.cleanupOldEvents(retentionDays); + } + + /** + * 手动触发处理(用于测试或紧急情况) + */ + async triggerProcessing(): Promise { + this.logger.log('[OUTBOX] Manual processing triggered'); + await this.processOutbox(); + } + + /** + * 获取统计信息 + */ + async getStats(): Promise<{ + isRunning: boolean; + isConnected: boolean; + pending: number; + sent: number; + confirmed: number; + failed: number; + }> { + const stats = await this.outboxRepository.getStats(); + return { + isRunning: this.isRunning, + isConnected: this.isConnected, + ...stats, + }; + } + + /** + * 确认事件(供消费者确认处理器调用) + */ + async confirmEvent(aggregateId: string, eventType?: string): Promise { + return this.outboxRepository.markAsConfirmed(aggregateId, eventType); + } +} diff --git a/backend/services/wallet-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/wallet-service/src/infrastructure/persistence/repositories/outbox.repository.ts new file mode 100644 index 00000000..cfb172fd --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/persistence/repositories/outbox.repository.ts @@ -0,0 +1,373 @@ +/** + * Outbox Repository + * [2026-01-07] 新增:实现 Outbox Pattern 确保事件发布的原子性和可靠性 + * + * 事件流程: + * 1. 业务操作和 Outbox 事件写入在同一个事务中完成 + * 2. 后台任务轮询 Outbox 表,发布事件到 Kafka + * 3. 发布成功后标记为 SENT,等待消费方确认 + * 4. 收到消费方确认后标记为 CONFIRMED + * 5. 失败的事件会按指数退避策略重试 + */ + +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { Prisma, PrismaClient } from '@prisma/client'; + +export enum OutboxStatus { + PENDING = 'PENDING', // 待发送 + SENT = 'SENT', // 已发送到 Kafka,等待消费方确认 + CONFIRMED = 'CONFIRMED', // 消费方已确认处理成功 + FAILED = 'FAILED', // 发送失败,等待重试 +} + +export interface OutboxEventData { + eventType: string; + topic: string; + key: string; + payload: Record; + aggregateId: string; + aggregateType: string; +} + +export interface OutboxEvent extends OutboxEventData { + id: bigint; + status: OutboxStatus; + retryCount: number; + maxRetries: number; + lastError: string | null; + createdAt: Date; + publishedAt: Date | null; + nextRetryAt: Date | null; +} + +// Prisma 事务客户端类型(与 Prisma $transaction 返回的类型一致) +export type PrismaTransactionClient = Omit< + PrismaClient, + '$connect' | '$disconnect' | '$on' | '$transaction' | '$use' | '$extends' +>; + +@Injectable() +export class OutboxRepository { + private readonly logger = new Logger(OutboxRepository.name); + + constructor(private readonly prisma: PrismaService) {} + + /** + * 在事务中保存 Outbox 事件 + * 这是 Outbox Pattern 的核心:业务数据和事件数据在同一个事务中写入 + */ + async saveInTransaction( + tx: PrismaTransactionClient, + events: OutboxEventData[], + ): Promise { + if (events.length === 0) return; + + this.logger.debug( + `[OUTBOX] Saving ${events.length} events to outbox (in transaction)`, + ); + + await tx.outboxEvent.createMany({ + data: events.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: OutboxStatus.PENDING, + })), + }); + + this.logger.debug(`[OUTBOX] Saved ${events.length} events to outbox`); + } + + /** + * 直接保存 Outbox 事件(不在事务中) + */ + async saveEvents(events: OutboxEventData[]): Promise { + if (events.length === 0) return; + + this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox`); + + await this.prisma.outboxEvent.createMany({ + data: events.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: OutboxStatus.PENDING, + })), + }); + + this.logger.log(`[OUTBOX] ✓ Saved ${events.length} events to outbox`); + } + + /** + * 获取待发布的事件(按创建时间排序) + */ + async findPendingEvents(limit: number = 100): Promise { + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.PENDING, + }, + orderBy: { + createdAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 获取需要重试的事件 + */ + async findEventsForRetry(limit: number = 50): Promise { + const now = new Date(); + + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.FAILED, + retryCount: { + lt: 5, // maxRetries + }, + nextRetryAt: { + lte: now, + }, + }, + orderBy: { + nextRetryAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 标记事件为已发送(等待消费方确认) + */ + async markAsSent(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.SENT, + publishedAt: new Date(), + }, + }); + + this.logger.debug( + `[OUTBOX] Marked event ${id} as SENT (awaiting consumer confirmation)`, + ); + } + + /** + * 标记事件为已确认(消费方已成功处理) + * 使用 aggregateId + eventType 组合精确匹配 + */ + async markAsConfirmed( + aggregateId: string, + eventType?: string, + ): Promise { + const whereClause: Prisma.OutboxEventWhereInput = { + aggregateId: aggregateId, + status: OutboxStatus.SENT, + }; + + // 如果提供了 eventType,则精确匹配 + if (eventType) { + whereClause.eventType = eventType; + } + + const result = await this.prisma.outboxEvent.updateMany({ + where: whereClause, + data: { + status: OutboxStatus.CONFIRMED, + }, + }); + + if (result.count > 0) { + this.logger.log( + `[OUTBOX] ✓ Event ${aggregateId} (${eventType || 'all types'}) confirmed by consumer`, + ); + return true; + } + + this.logger.warn( + `[OUTBOX] Event ${aggregateId} (${eventType || 'any'}) not found or not in SENT status`, + ); + return false; + } + + /** + * 通过 outbox ID 标记为已确认 + */ + async markAsConfirmedById(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.CONFIRMED, + }, + }); + + this.logger.log(`[OUTBOX] ✓ Event ${id} confirmed by consumer`); + } + + /** + * 获取已发送但未确认且超时的事件(用于重试) + */ + async findSentEventsTimedOut( + timeoutMinutes: number = 5, + limit: number = 50, + ): Promise { + const cutoffTime = new Date(); + cutoffTime.setMinutes(cutoffTime.getMinutes() - timeoutMinutes); + + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.SENT, + publishedAt: { + lt: cutoffTime, + }, + retryCount: { + lt: 5, // 最多重试5次 + }, + }, + orderBy: { + publishedAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 将超时的 SENT 事件重置为 PENDING 以便重发 + */ + async resetSentToPending(id: bigint): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }); + + if (!event) return; + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.PENDING, + retryCount: event.retryCount + 1, + lastError: 'Consumer confirmation timeout', + }, + }); + + this.logger.warn( + `[OUTBOX] Event ${id} reset to PENDING for retry (confirmation timeout)`, + ); + } + + /** + * 标记事件为失败并安排重试 + */ + async markAsFailed(id: bigint, error: string): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }); + + if (!event) return; + + const newRetryCount = event.retryCount + 1; + const isFinalFailure = newRetryCount >= event.maxRetries; + + // 指数退避:1min, 2min, 4min, 8min, 16min + const delayMinutes = Math.pow(2, newRetryCount - 1); + const nextRetryAt = new Date(); + nextRetryAt.setMinutes(nextRetryAt.getMinutes() + delayMinutes); + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.FAILED, + retryCount: newRetryCount, + lastError: error, + nextRetryAt: isFinalFailure ? null : nextRetryAt, + }, + }); + + if (isFinalFailure) { + this.logger.error( + `[OUTBOX] Event ${id} permanently failed after ${newRetryCount} retries: ${error}`, + ); + } else { + this.logger.warn( + `[OUTBOX] Event ${id} failed, retry ${newRetryCount}/${event.maxRetries}, next retry at ${nextRetryAt}`, + ); + } + } + + /** + * 清理已确认的旧事件(保留最近7天) + */ + async cleanupOldEvents(retentionDays: number = 7): Promise { + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - retentionDays); + + const result = await this.prisma.outboxEvent.deleteMany({ + where: { + status: OutboxStatus.CONFIRMED, + publishedAt: { + lt: cutoffDate, + }, + }, + }); + + if (result.count > 0) { + this.logger.log(`[OUTBOX] Cleaned up ${result.count} old events`); + } + + return result.count; + } + + /** + * 获取 Outbox 统计信息 + */ + async getStats(): Promise<{ + pending: number; + sent: number; + confirmed: number; + failed: number; + }> { + const [pending, sent, confirmed, failed] = await Promise.all([ + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.PENDING } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.SENT } }), + this.prisma.outboxEvent.count({ + where: { status: OutboxStatus.CONFIRMED }, + }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.FAILED } }), + ]); + + return { pending, sent, confirmed, failed }; + } + + private mapToOutboxEvent(record: any): OutboxEvent { + return { + id: record.id, + eventType: record.eventType, + topic: record.topic, + key: record.key, + payload: record.payload as Record, + aggregateId: record.aggregateId, + aggregateType: record.aggregateType, + status: record.status as OutboxStatus, + retryCount: record.retryCount, + maxRetries: record.maxRetries, + lastError: record.lastError, + createdAt: record.createdAt, + publishedAt: record.publishedAt, + nextRetryAt: record.nextRetryAt, + }; + } +}