From ee5f841034e24a7c1e5d8d0999f8b592736394f3 Mon Sep 17 00:00:00 2001 From: hailin Date: Sat, 10 Jan 2026 21:08:57 -0800 Subject: [PATCH] =?UTF-8?q?fix(outbox):=20=E5=AE=9E=E7=8E=B0=E6=8C=87?= =?UTF-8?q?=E6=95=B0=E9=80=80=E9=81=BF=E9=87=8D=E8=AF=95=E7=AD=96=E7=95=A5?= =?UTF-8?q?=EF=BC=8C=E6=9C=80=E5=A4=A7=E5=BB=B6=E8=BF=9F3=E5=B0=8F?= =?UTF-8?q?=E6=97=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复Outbox事件发布的重试机制: 1. 更新Prisma Schema (mining-service, trading-service): - 添加OutboxStatus枚举 (PENDING, PUBLISHED, FAILED) - 添加topic、key、status、retryCount、maxRetries、lastError等字段 - 添加publishedAt、nextRetryAt时间戳 - 优化索引 (status, nextRetryAt, createdAt) 2. 更新OutboxRepository (mining-service, trading-service): - findPendingEvents(): 查询待处理且到达重试时间的事件 - markAsPublished(): 标记事件已发布 - markAsFailed(): 实现指数退避算法 (30s基础, 最大3小时) - deletePublished(): 清理已发布的旧事件 3. 更新OutboxScheduler (auth/mining/trading-service): - 使用指数退避: 30s, 60s, 120s, 240s, ... 最大10800s (3小时) - 记录重试次数和错误信息 - 达到最大重试次数后标记为FAILED 指数退避公式: delay = min(30s * 2^(retryCount-1), 3h) Co-Authored-By: Claude Opus 4.5 --- .../schedulers/outbox.scheduler.ts | 16 ++-- .../mining-service/prisma/schema.prisma | 32 +++++-- .../schedulers/outbox.scheduler.ts | 50 +++++++---- .../repositories/outbox.repository.ts | 85 ++++++++++++++++--- .../trading-service/prisma/schema.prisma | 33 +++++-- .../schedulers/outbox.scheduler.ts | 50 +++++++---- .../repositories/outbox.repository.ts | 85 ++++++++++++++++--- 7 files changed, 274 insertions(+), 77 deletions(-) diff --git a/backend/services/auth-service/src/application/schedulers/outbox.scheduler.ts b/backend/services/auth-service/src/application/schedulers/outbox.scheduler.ts index 88d84702..930fab23 100644 --- a/backend/services/auth-service/src/application/schedulers/outbox.scheduler.ts +++ b/backend/services/auth-service/src/application/schedulers/outbox.scheduler.ts @@ -8,6 +8,7 @@ import { OutboxStatus } from '@prisma/client'; /** * Outbox 事件发布调度器 * 定期将 Outbox 中的事件发布到 Kafka + * 使用指数退避重试策略,最大延迟3小时 */ @Injectable() export class OutboxScheduler implements OnModuleInit { @@ -82,20 +83,25 @@ export class OutboxScheduler implements OnModuleInit { error instanceof Error ? error.message : 'Unknown error'; this.logger.error(`Failed to publish event ${event.id}`, error); - // 更新重试信息 + // 更新重试信息(指数退避,最大3小时) const newRetryCount = event.retryCount + 1; const shouldFail = newRetryCount >= event.maxRetries; + // 指数退避: 30s, 60s, 120s, 240s, 480s, 960s, 1920s, 3840s, 7680s, 10800s (最大3小时) + const baseDelayMs = 30000; // 30 seconds + const maxDelayMs = 3 * 60 * 60 * 1000; // 3 hours + const delayMs = Math.min( + baseDelayMs * Math.pow(2, newRetryCount - 1), + maxDelayMs, + ); + await this.prisma.outboxEvent.update({ where: { id: event.id }, data: { retryCount: newRetryCount, lastError: errorMessage, status: shouldFail ? OutboxStatus.FAILED : OutboxStatus.PENDING, - // 指数退避: 30s, 60s, 120s - nextRetryAt: shouldFail - ? null - : new Date(Date.now() + 30000 * Math.pow(2, newRetryCount - 1)), + nextRetryAt: shouldFail ? null : new Date(Date.now() + delayMs), }, }); } diff --git a/backend/services/mining-service/prisma/schema.prisma b/backend/services/mining-service/prisma/schema.prisma index 5b38ef42..e8bd1c42 100644 --- a/backend/services/mining-service/prisma/schema.prisma +++ b/backend/services/mining-service/prisma/schema.prisma @@ -183,16 +183,30 @@ model PriceSnapshot { // ==================== Outbox ==================== -model OutboxEvent { - id String @id @default(uuid()) - aggregateType String - aggregateId String - eventType String - payload Json - createdAt DateTime @default(now()) - processedAt DateTime? +enum OutboxStatus { + PENDING + PUBLISHED + FAILED +} - @@index([processedAt]) +model OutboxEvent { + id String @id @default(uuid()) + aggregateType String @map("aggregate_type") + aggregateId String @map("aggregate_id") + eventType String @map("event_type") + payload Json + topic String @default("mining.events") + key String? + status OutboxStatus @default(PENDING) + retryCount Int @default(0) @map("retry_count") + maxRetries Int @default(5) @map("max_retries") + lastError String? @map("last_error") + publishedAt DateTime? @map("published_at") + nextRetryAt DateTime? @map("next_retry_at") + createdAt DateTime @default(now()) @map("created_at") + + @@index([status]) + @@index([nextRetryAt]) @@index([createdAt]) @@map("outbox_events") } diff --git a/backend/services/mining-service/src/application/schedulers/outbox.scheduler.ts b/backend/services/mining-service/src/application/schedulers/outbox.scheduler.ts index 68e67501..1c75ab5e 100644 --- a/backend/services/mining-service/src/application/schedulers/outbox.scheduler.ts +++ b/backend/services/mining-service/src/application/schedulers/outbox.scheduler.ts @@ -7,6 +7,7 @@ import { RedisService } from '../../infrastructure/redis/redis.service'; /** * Outbox 事件发布调度器 * 定期将 Outbox 中的事件发布到 Kafka + * 使用指数退避重试策略,最大延迟3小时 */ @Injectable() export class OutboxScheduler implements OnModuleInit { @@ -25,6 +26,7 @@ export class OutboxScheduler implements OnModuleInit { /** * 每30秒发布 Outbox 中的待处理事件 + * 使用指数退避重试策略 */ @Cron('*/30 * * * * *') async publishOutboxEvents(): Promise { @@ -37,18 +39,19 @@ export class OutboxScheduler implements OnModuleInit { } try { - const events = await this.outboxRepository.findUnprocessed(100); + const events = await this.outboxRepository.findPendingEvents(100); if (events.length === 0) { return; } - const processedIds: string[] = []; + let successCount = 0; + let failCount = 0; for (const event of events) { try { - await this.kafkaProducer.emit(`mining.${event.eventType}`, { - key: event.aggregateId, + await this.kafkaProducer.emit(event.topic, { + key: event.key || event.aggregateId, value: { eventId: event.id, aggregateType: event.aggregateType, @@ -58,16 +61,33 @@ export class OutboxScheduler implements OnModuleInit { createdAt: event.createdAt.toISOString(), }, }); - processedIds.push(event.id); + + // 标记为已发布 + await this.outboxRepository.markAsPublished(event.id); + successCount++; } catch (error) { - this.logger.error(`Failed to publish event ${event.id}`, error); - // 继续处理下一个事件 + failCount++; + const errorMessage = + error instanceof Error ? error.message : 'Unknown error'; + this.logger.error( + `Failed to publish event ${event.id} (retry ${event.retryCount}/${event.maxRetries})`, + error, + ); + + // 标记失败并设置指数退避重试时间 + await this.outboxRepository.markAsFailed( + event.id, + errorMessage, + event.retryCount, + event.maxRetries, + ); } } - if (processedIds.length > 0) { - await this.outboxRepository.markAsProcessed(processedIds); - this.logger.debug(`Published ${processedIds.length} outbox events`); + if (successCount > 0 || failCount > 0) { + this.logger.log( + `Outbox: published ${successCount}, failed ${failCount}`, + ); } } catch (error) { this.logger.error('Failed to process outbox events', error); @@ -77,10 +97,10 @@ export class OutboxScheduler implements OnModuleInit { } /** - * 每天凌晨3点清理已处理的事件(保留7天) + * 每天凌晨3点清理已发布的事件(保留7天) */ @Cron('0 3 * * *') - async cleanupProcessedEvents(): Promise { + async cleanupPublishedEvents(): Promise { const lockValue = await this.redis.acquireLock( `${this.LOCK_KEY}:cleanup`, 300, @@ -93,12 +113,12 @@ export class OutboxScheduler implements OnModuleInit { const sevenDaysAgo = new Date(); sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7); - const deleted = await this.outboxRepository.deleteProcessed(sevenDaysAgo); + const deleted = await this.outboxRepository.deletePublished(sevenDaysAgo); if (deleted > 0) { - this.logger.log(`Cleaned up ${deleted} processed outbox events`); + this.logger.log(`Cleaned up ${deleted} published outbox events`); } } catch (error) { - this.logger.error('Failed to cleanup processed events', error); + this.logger.error('Failed to cleanup published events', error); } finally { await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue); } diff --git a/backend/services/mining-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/mining-service/src/infrastructure/persistence/repositories/outbox.repository.ts index e5cd88c3..91ad9eeb 100644 --- a/backend/services/mining-service/src/infrastructure/persistence/repositories/outbox.repository.ts +++ b/backend/services/mining-service/src/infrastructure/persistence/repositories/outbox.repository.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { PrismaService } from '../prisma/prisma.service'; -import { OutboxEvent } from '@prisma/client'; +import { OutboxEvent, OutboxStatus } from '@prisma/client'; @Injectable() export class OutboxRepository { @@ -14,6 +14,8 @@ export class OutboxRepository { aggregateId: string; eventType: string; payload: any; + topic?: string; + key?: string; }): Promise { return this.prisma.outboxEvent.create({ data: { @@ -21,38 +23,97 @@ export class OutboxRepository { aggregateId: data.aggregateId, eventType: data.eventType, payload: data.payload, + topic: data.topic || `mining.${data.eventType}`, + key: data.key || data.aggregateId, + status: OutboxStatus.PENDING, }, }); } /** - * 获取未处理的事件 + * 获取待处理的事件(PENDING 状态且到达重试时间) */ - async findUnprocessed(limit: number = 100): Promise { + async findPendingEvents(limit: number = 100): Promise { return this.prisma.outboxEvent.findMany({ - where: { processedAt: null }, + where: { + status: OutboxStatus.PENDING, + OR: [{ nextRetryAt: null }, { nextRetryAt: { lte: new Date() } }], + }, orderBy: { createdAt: 'asc' }, take: limit, }); } /** - * 标记事件为已处理 + * 标记事件为已发布 */ - async markAsProcessed(ids: string[]): Promise { - await this.prisma.outboxEvent.updateMany({ - where: { id: { in: ids } }, - data: { processedAt: new Date() }, + async markAsPublished(id: string): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.PUBLISHED, + publishedAt: new Date(), + }, }); } /** - * 删除已处理的旧事件 + * 标记事件发布失败,计算下次重试时间(指数退避,最大3小时) */ - async deleteProcessed(before: Date): Promise { + async markAsFailed( + id: string, + error: string, + currentRetryCount: number, + maxRetries: number, + ): Promise { + const newRetryCount = currentRetryCount + 1; + const shouldFail = newRetryCount >= maxRetries; + + // 指数退避: 30s, 60s, 120s, 240s, 480s, 960s, 1920s, 3840s, 7680s, 10800s (最大3小时) + const baseDelayMs = 30000; // 30 seconds + const maxDelayMs = 3 * 60 * 60 * 1000; // 3 hours + const delayMs = Math.min( + baseDelayMs * Math.pow(2, newRetryCount - 1), + maxDelayMs, + ); + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + retryCount: newRetryCount, + lastError: error, + status: shouldFail ? OutboxStatus.FAILED : OutboxStatus.PENDING, + nextRetryAt: shouldFail ? null : new Date(Date.now() + delayMs), + }, + }); + } + + /** + * 删除已发布的旧事件 + */ + async deletePublished(before: Date): Promise { const result = await this.prisma.outboxEvent.deleteMany({ where: { - processedAt: { not: null, lt: before }, + status: OutboxStatus.PUBLISHED, + publishedAt: { lt: before }, + }, + }); + return result.count; + } + + /** + * 重置长时间失败的事件(可选:给予重试机会) + */ + async resetFailedEvents(olderThan: Date): Promise { + const result = await this.prisma.outboxEvent.updateMany({ + where: { + status: OutboxStatus.FAILED, + createdAt: { gt: olderThan }, + }, + data: { + status: OutboxStatus.PENDING, + retryCount: 0, + nextRetryAt: new Date(), }, }); return result.count; diff --git a/backend/services/trading-service/prisma/schema.prisma b/backend/services/trading-service/prisma/schema.prisma index 71b01e00..5670cb8b 100644 --- a/backend/services/trading-service/prisma/schema.prisma +++ b/backend/services/trading-service/prisma/schema.prisma @@ -200,15 +200,30 @@ model TransferRecord { // ==================== Outbox ==================== -model OutboxEvent { - id String @id @default(uuid()) - aggregateType String - aggregateId String - eventType String - payload Json - createdAt DateTime @default(now()) - processedAt DateTime? +enum OutboxStatus { + PENDING + PUBLISHED + FAILED +} - @@index([processedAt]) +model OutboxEvent { + id String @id @default(uuid()) + aggregateType String @map("aggregate_type") + aggregateId String @map("aggregate_id") + eventType String @map("event_type") + payload Json + topic String @default("trading.events") + key String? + status OutboxStatus @default(PENDING) + retryCount Int @default(0) @map("retry_count") + maxRetries Int @default(10) @map("max_retries") + lastError String? @map("last_error") + publishedAt DateTime? @map("published_at") + nextRetryAt DateTime? @map("next_retry_at") + createdAt DateTime @default(now()) @map("created_at") + + @@index([status]) + @@index([nextRetryAt]) + @@index([createdAt]) @@map("outbox_events") } diff --git a/backend/services/trading-service/src/application/schedulers/outbox.scheduler.ts b/backend/services/trading-service/src/application/schedulers/outbox.scheduler.ts index 7cb0448f..5365506e 100644 --- a/backend/services/trading-service/src/application/schedulers/outbox.scheduler.ts +++ b/backend/services/trading-service/src/application/schedulers/outbox.scheduler.ts @@ -7,6 +7,7 @@ import { RedisService } from '../../infrastructure/redis/redis.service'; /** * Outbox 事件发布调度器 * 定期将 Outbox 中的事件发布到 Kafka + * 使用指数退避重试策略,最大延迟3小时 */ @Injectable() export class OutboxScheduler implements OnModuleInit { @@ -25,6 +26,7 @@ export class OutboxScheduler implements OnModuleInit { /** * 每30秒发布 Outbox 中的待处理事件 + * 使用指数退避重试策略 */ @Cron('*/30 * * * * *') async publishOutboxEvents(): Promise { @@ -37,18 +39,19 @@ export class OutboxScheduler implements OnModuleInit { } try { - const events = await this.outboxRepository.findUnprocessed(100); + const events = await this.outboxRepository.findPendingEvents(100); if (events.length === 0) { return; } - const processedIds: string[] = []; + let successCount = 0; + let failCount = 0; for (const event of events) { try { - await this.kafkaProducer.emit(`trading.${event.eventType}`, { - key: event.aggregateId, + await this.kafkaProducer.emit(event.topic, { + key: event.key || event.aggregateId, value: { eventId: event.id, aggregateType: event.aggregateType, @@ -58,16 +61,33 @@ export class OutboxScheduler implements OnModuleInit { createdAt: event.createdAt.toISOString(), }, }); - processedIds.push(event.id); + + // 标记为已发布 + await this.outboxRepository.markAsPublished(event.id); + successCount++; } catch (error) { - this.logger.error(`Failed to publish event ${event.id}`, error); - // 继续处理下一个事件 + failCount++; + const errorMessage = + error instanceof Error ? error.message : 'Unknown error'; + this.logger.error( + `Failed to publish event ${event.id} (retry ${event.retryCount}/${event.maxRetries})`, + error, + ); + + // 标记失败并设置指数退避重试时间 + await this.outboxRepository.markAsFailed( + event.id, + errorMessage, + event.retryCount, + event.maxRetries, + ); } } - if (processedIds.length > 0) { - await this.outboxRepository.markAsProcessed(processedIds); - this.logger.debug(`Published ${processedIds.length} outbox events`); + if (successCount > 0 || failCount > 0) { + this.logger.log( + `Outbox: published ${successCount}, failed ${failCount}`, + ); } } catch (error) { this.logger.error('Failed to process outbox events', error); @@ -77,10 +97,10 @@ export class OutboxScheduler implements OnModuleInit { } /** - * 每天凌晨3点清理已处理的事件(保留7天) + * 每天凌晨3点清理已发布的事件(保留7天) */ @Cron('0 3 * * *') - async cleanupProcessedEvents(): Promise { + async cleanupPublishedEvents(): Promise { const lockValue = await this.redis.acquireLock( `${this.LOCK_KEY}:cleanup`, 300, @@ -93,12 +113,12 @@ export class OutboxScheduler implements OnModuleInit { const sevenDaysAgo = new Date(); sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7); - const deleted = await this.outboxRepository.deleteProcessed(sevenDaysAgo); + const deleted = await this.outboxRepository.deletePublished(sevenDaysAgo); if (deleted > 0) { - this.logger.log(`Cleaned up ${deleted} processed outbox events`); + this.logger.log(`Cleaned up ${deleted} published outbox events`); } } catch (error) { - this.logger.error('Failed to cleanup processed events', error); + this.logger.error('Failed to cleanup published events', error); } finally { await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue); } diff --git a/backend/services/trading-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/trading-service/src/infrastructure/persistence/repositories/outbox.repository.ts index e5cd88c3..dbdaa6ec 100644 --- a/backend/services/trading-service/src/infrastructure/persistence/repositories/outbox.repository.ts +++ b/backend/services/trading-service/src/infrastructure/persistence/repositories/outbox.repository.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { PrismaService } from '../prisma/prisma.service'; -import { OutboxEvent } from '@prisma/client'; +import { OutboxEvent, OutboxStatus } from '@prisma/client'; @Injectable() export class OutboxRepository { @@ -14,6 +14,8 @@ export class OutboxRepository { aggregateId: string; eventType: string; payload: any; + topic?: string; + key?: string; }): Promise { return this.prisma.outboxEvent.create({ data: { @@ -21,38 +23,97 @@ export class OutboxRepository { aggregateId: data.aggregateId, eventType: data.eventType, payload: data.payload, + topic: data.topic || `trading.${data.eventType}`, + key: data.key || data.aggregateId, + status: OutboxStatus.PENDING, }, }); } /** - * 获取未处理的事件 + * 获取待处理的事件(PENDING 状态且到达重试时间) */ - async findUnprocessed(limit: number = 100): Promise { + async findPendingEvents(limit: number = 100): Promise { return this.prisma.outboxEvent.findMany({ - where: { processedAt: null }, + where: { + status: OutboxStatus.PENDING, + OR: [{ nextRetryAt: null }, { nextRetryAt: { lte: new Date() } }], + }, orderBy: { createdAt: 'asc' }, take: limit, }); } /** - * 标记事件为已处理 + * 标记事件为已发布 */ - async markAsProcessed(ids: string[]): Promise { - await this.prisma.outboxEvent.updateMany({ - where: { id: { in: ids } }, - data: { processedAt: new Date() }, + async markAsPublished(id: string): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.PUBLISHED, + publishedAt: new Date(), + }, }); } /** - * 删除已处理的旧事件 + * 标记事件发布失败,计算下次重试时间(指数退避,最大3小时) */ - async deleteProcessed(before: Date): Promise { + async markAsFailed( + id: string, + error: string, + currentRetryCount: number, + maxRetries: number, + ): Promise { + const newRetryCount = currentRetryCount + 1; + const shouldFail = newRetryCount >= maxRetries; + + // 指数退避: 30s, 60s, 120s, 240s, 480s, 960s, 1920s, 3840s, 7680s, 10800s (最大3小时) + const baseDelayMs = 30000; // 30 seconds + const maxDelayMs = 3 * 60 * 60 * 1000; // 3 hours + const delayMs = Math.min( + baseDelayMs * Math.pow(2, newRetryCount - 1), + maxDelayMs, + ); + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + retryCount: newRetryCount, + lastError: error, + status: shouldFail ? OutboxStatus.FAILED : OutboxStatus.PENDING, + nextRetryAt: shouldFail ? null : new Date(Date.now() + delayMs), + }, + }); + } + + /** + * 删除已发布的旧事件 + */ + async deletePublished(before: Date): Promise { const result = await this.prisma.outboxEvent.deleteMany({ where: { - processedAt: { not: null, lt: before }, + status: OutboxStatus.PUBLISHED, + publishedAt: { lt: before }, + }, + }); + return result.count; + } + + /** + * 重置长时间失败的事件(可选:给予重试机会) + */ + async resetFailedEvents(olderThan: Date): Promise { + const result = await this.prisma.outboxEvent.updateMany({ + where: { + status: OutboxStatus.FAILED, + createdAt: { gt: olderThan }, + }, + data: { + status: OutboxStatus.PENDING, + retryCount: 0, + nextRetryAt: new Date(), }, }); return result.count;