fix(outbox): 实现指数退避重试策略,最大延迟3小时
修复Outbox事件发布的重试机制: 1. 更新Prisma Schema (mining-service, trading-service): - 添加OutboxStatus枚举 (PENDING, PUBLISHED, FAILED) - 添加topic、key、status、retryCount、maxRetries、lastError等字段 - 添加publishedAt、nextRetryAt时间戳 - 优化索引 (status, nextRetryAt, createdAt) 2. 更新OutboxRepository (mining-service, trading-service): - findPendingEvents(): 查询待处理且到达重试时间的事件 - markAsPublished(): 标记事件已发布 - markAsFailed(): 实现指数退避算法 (30s基础, 最大3小时) - deletePublished(): 清理已发布的旧事件 3. 更新OutboxScheduler (auth/mining/trading-service): - 使用指数退避: 30s, 60s, 120s, 240s, ... 最大10800s (3小时) - 记录重试次数和错误信息 - 达到最大重试次数后标记为FAILED 指数退避公式: delay = min(30s * 2^(retryCount-1), 3h) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
28ad8c2e2f
commit
ee5f841034
|
|
@ -8,6 +8,7 @@ import { OutboxStatus } from '@prisma/client';
|
||||||
/**
|
/**
|
||||||
* Outbox 事件发布调度器
|
* Outbox 事件发布调度器
|
||||||
* 定期将 Outbox 中的事件发布到 Kafka
|
* 定期将 Outbox 中的事件发布到 Kafka
|
||||||
|
* 使用指数退避重试策略,最大延迟3小时
|
||||||
*/
|
*/
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class OutboxScheduler implements OnModuleInit {
|
export class OutboxScheduler implements OnModuleInit {
|
||||||
|
|
@ -82,20 +83,25 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
error instanceof Error ? error.message : 'Unknown error';
|
error instanceof Error ? error.message : 'Unknown error';
|
||||||
this.logger.error(`Failed to publish event ${event.id}`, error);
|
this.logger.error(`Failed to publish event ${event.id}`, error);
|
||||||
|
|
||||||
// 更新重试信息
|
// 更新重试信息(指数退避,最大3小时)
|
||||||
const newRetryCount = event.retryCount + 1;
|
const newRetryCount = event.retryCount + 1;
|
||||||
const shouldFail = newRetryCount >= event.maxRetries;
|
const shouldFail = newRetryCount >= event.maxRetries;
|
||||||
|
|
||||||
|
// 指数退避: 30s, 60s, 120s, 240s, 480s, 960s, 1920s, 3840s, 7680s, 10800s (最大3小时)
|
||||||
|
const baseDelayMs = 30000; // 30 seconds
|
||||||
|
const maxDelayMs = 3 * 60 * 60 * 1000; // 3 hours
|
||||||
|
const delayMs = Math.min(
|
||||||
|
baseDelayMs * Math.pow(2, newRetryCount - 1),
|
||||||
|
maxDelayMs,
|
||||||
|
);
|
||||||
|
|
||||||
await this.prisma.outboxEvent.update({
|
await this.prisma.outboxEvent.update({
|
||||||
where: { id: event.id },
|
where: { id: event.id },
|
||||||
data: {
|
data: {
|
||||||
retryCount: newRetryCount,
|
retryCount: newRetryCount,
|
||||||
lastError: errorMessage,
|
lastError: errorMessage,
|
||||||
status: shouldFail ? OutboxStatus.FAILED : OutboxStatus.PENDING,
|
status: shouldFail ? OutboxStatus.FAILED : OutboxStatus.PENDING,
|
||||||
// 指数退避: 30s, 60s, 120s
|
nextRetryAt: shouldFail ? null : new Date(Date.now() + delayMs),
|
||||||
nextRetryAt: shouldFail
|
|
||||||
? null
|
|
||||||
: new Date(Date.now() + 30000 * Math.pow(2, newRetryCount - 1)),
|
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -183,16 +183,30 @@ model PriceSnapshot {
|
||||||
|
|
||||||
// ==================== Outbox ====================
|
// ==================== Outbox ====================
|
||||||
|
|
||||||
model OutboxEvent {
|
enum OutboxStatus {
|
||||||
id String @id @default(uuid())
|
PENDING
|
||||||
aggregateType String
|
PUBLISHED
|
||||||
aggregateId String
|
FAILED
|
||||||
eventType String
|
}
|
||||||
payload Json
|
|
||||||
createdAt DateTime @default(now())
|
|
||||||
processedAt DateTime?
|
|
||||||
|
|
||||||
@@index([processedAt])
|
model OutboxEvent {
|
||||||
|
id String @id @default(uuid())
|
||||||
|
aggregateType String @map("aggregate_type")
|
||||||
|
aggregateId String @map("aggregate_id")
|
||||||
|
eventType String @map("event_type")
|
||||||
|
payload Json
|
||||||
|
topic String @default("mining.events")
|
||||||
|
key String?
|
||||||
|
status OutboxStatus @default(PENDING)
|
||||||
|
retryCount Int @default(0) @map("retry_count")
|
||||||
|
maxRetries Int @default(5) @map("max_retries")
|
||||||
|
lastError String? @map("last_error")
|
||||||
|
publishedAt DateTime? @map("published_at")
|
||||||
|
nextRetryAt DateTime? @map("next_retry_at")
|
||||||
|
createdAt DateTime @default(now()) @map("created_at")
|
||||||
|
|
||||||
|
@@index([status])
|
||||||
|
@@index([nextRetryAt])
|
||||||
@@index([createdAt])
|
@@index([createdAt])
|
||||||
@@map("outbox_events")
|
@@map("outbox_events")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import { RedisService } from '../../infrastructure/redis/redis.service';
|
||||||
/**
|
/**
|
||||||
* Outbox 事件发布调度器
|
* Outbox 事件发布调度器
|
||||||
* 定期将 Outbox 中的事件发布到 Kafka
|
* 定期将 Outbox 中的事件发布到 Kafka
|
||||||
|
* 使用指数退避重试策略,最大延迟3小时
|
||||||
*/
|
*/
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class OutboxScheduler implements OnModuleInit {
|
export class OutboxScheduler implements OnModuleInit {
|
||||||
|
|
@ -25,6 +26,7 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 每30秒发布 Outbox 中的待处理事件
|
* 每30秒发布 Outbox 中的待处理事件
|
||||||
|
* 使用指数退避重试策略
|
||||||
*/
|
*/
|
||||||
@Cron('*/30 * * * * *')
|
@Cron('*/30 * * * * *')
|
||||||
async publishOutboxEvents(): Promise<void> {
|
async publishOutboxEvents(): Promise<void> {
|
||||||
|
|
@ -37,18 +39,19 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const events = await this.outboxRepository.findUnprocessed(100);
|
const events = await this.outboxRepository.findPendingEvents(100);
|
||||||
|
|
||||||
if (events.length === 0) {
|
if (events.length === 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const processedIds: string[] = [];
|
let successCount = 0;
|
||||||
|
let failCount = 0;
|
||||||
|
|
||||||
for (const event of events) {
|
for (const event of events) {
|
||||||
try {
|
try {
|
||||||
await this.kafkaProducer.emit(`mining.${event.eventType}`, {
|
await this.kafkaProducer.emit(event.topic, {
|
||||||
key: event.aggregateId,
|
key: event.key || event.aggregateId,
|
||||||
value: {
|
value: {
|
||||||
eventId: event.id,
|
eventId: event.id,
|
||||||
aggregateType: event.aggregateType,
|
aggregateType: event.aggregateType,
|
||||||
|
|
@ -58,16 +61,33 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
createdAt: event.createdAt.toISOString(),
|
createdAt: event.createdAt.toISOString(),
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
processedIds.push(event.id);
|
|
||||||
|
// 标记为已发布
|
||||||
|
await this.outboxRepository.markAsPublished(event.id);
|
||||||
|
successCount++;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error(`Failed to publish event ${event.id}`, error);
|
failCount++;
|
||||||
// 继续处理下一个事件
|
const errorMessage =
|
||||||
|
error instanceof Error ? error.message : 'Unknown error';
|
||||||
|
this.logger.error(
|
||||||
|
`Failed to publish event ${event.id} (retry ${event.retryCount}/${event.maxRetries})`,
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
|
||||||
|
// 标记失败并设置指数退避重试时间
|
||||||
|
await this.outboxRepository.markAsFailed(
|
||||||
|
event.id,
|
||||||
|
errorMessage,
|
||||||
|
event.retryCount,
|
||||||
|
event.maxRetries,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (processedIds.length > 0) {
|
if (successCount > 0 || failCount > 0) {
|
||||||
await this.outboxRepository.markAsProcessed(processedIds);
|
this.logger.log(
|
||||||
this.logger.debug(`Published ${processedIds.length} outbox events`);
|
`Outbox: published ${successCount}, failed ${failCount}`,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to process outbox events', error);
|
this.logger.error('Failed to process outbox events', error);
|
||||||
|
|
@ -77,10 +97,10 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 每天凌晨3点清理已处理的事件(保留7天)
|
* 每天凌晨3点清理已发布的事件(保留7天)
|
||||||
*/
|
*/
|
||||||
@Cron('0 3 * * *')
|
@Cron('0 3 * * *')
|
||||||
async cleanupProcessedEvents(): Promise<void> {
|
async cleanupPublishedEvents(): Promise<void> {
|
||||||
const lockValue = await this.redis.acquireLock(
|
const lockValue = await this.redis.acquireLock(
|
||||||
`${this.LOCK_KEY}:cleanup`,
|
`${this.LOCK_KEY}:cleanup`,
|
||||||
300,
|
300,
|
||||||
|
|
@ -93,12 +113,12 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
const sevenDaysAgo = new Date();
|
const sevenDaysAgo = new Date();
|
||||||
sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
|
sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
|
||||||
|
|
||||||
const deleted = await this.outboxRepository.deleteProcessed(sevenDaysAgo);
|
const deleted = await this.outboxRepository.deletePublished(sevenDaysAgo);
|
||||||
if (deleted > 0) {
|
if (deleted > 0) {
|
||||||
this.logger.log(`Cleaned up ${deleted} processed outbox events`);
|
this.logger.log(`Cleaned up ${deleted} published outbox events`);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to cleanup processed events', error);
|
this.logger.error('Failed to cleanup published events', error);
|
||||||
} finally {
|
} finally {
|
||||||
await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
|
await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
import { PrismaService } from '../prisma/prisma.service';
|
import { PrismaService } from '../prisma/prisma.service';
|
||||||
import { OutboxEvent } from '@prisma/client';
|
import { OutboxEvent, OutboxStatus } from '@prisma/client';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class OutboxRepository {
|
export class OutboxRepository {
|
||||||
|
|
@ -14,6 +14,8 @@ export class OutboxRepository {
|
||||||
aggregateId: string;
|
aggregateId: string;
|
||||||
eventType: string;
|
eventType: string;
|
||||||
payload: any;
|
payload: any;
|
||||||
|
topic?: string;
|
||||||
|
key?: string;
|
||||||
}): Promise<OutboxEvent> {
|
}): Promise<OutboxEvent> {
|
||||||
return this.prisma.outboxEvent.create({
|
return this.prisma.outboxEvent.create({
|
||||||
data: {
|
data: {
|
||||||
|
|
@ -21,38 +23,97 @@ export class OutboxRepository {
|
||||||
aggregateId: data.aggregateId,
|
aggregateId: data.aggregateId,
|
||||||
eventType: data.eventType,
|
eventType: data.eventType,
|
||||||
payload: data.payload,
|
payload: data.payload,
|
||||||
|
topic: data.topic || `mining.${data.eventType}`,
|
||||||
|
key: data.key || data.aggregateId,
|
||||||
|
status: OutboxStatus.PENDING,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取未处理的事件
|
* 获取待处理的事件(PENDING 状态且到达重试时间)
|
||||||
*/
|
*/
|
||||||
async findUnprocessed(limit: number = 100): Promise<OutboxEvent[]> {
|
async findPendingEvents(limit: number = 100): Promise<OutboxEvent[]> {
|
||||||
return this.prisma.outboxEvent.findMany({
|
return this.prisma.outboxEvent.findMany({
|
||||||
where: { processedAt: null },
|
where: {
|
||||||
|
status: OutboxStatus.PENDING,
|
||||||
|
OR: [{ nextRetryAt: null }, { nextRetryAt: { lte: new Date() } }],
|
||||||
|
},
|
||||||
orderBy: { createdAt: 'asc' },
|
orderBy: { createdAt: 'asc' },
|
||||||
take: limit,
|
take: limit,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 标记事件为已处理
|
* 标记事件为已发布
|
||||||
*/
|
*/
|
||||||
async markAsProcessed(ids: string[]): Promise<void> {
|
async markAsPublished(id: string): Promise<void> {
|
||||||
await this.prisma.outboxEvent.updateMany({
|
await this.prisma.outboxEvent.update({
|
||||||
where: { id: { in: ids } },
|
where: { id },
|
||||||
data: { processedAt: new Date() },
|
data: {
|
||||||
|
status: OutboxStatus.PUBLISHED,
|
||||||
|
publishedAt: new Date(),
|
||||||
|
},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 删除已处理的旧事件
|
* 标记事件发布失败,计算下次重试时间(指数退避,最大3小时)
|
||||||
*/
|
*/
|
||||||
async deleteProcessed(before: Date): Promise<number> {
|
async markAsFailed(
|
||||||
|
id: string,
|
||||||
|
error: string,
|
||||||
|
currentRetryCount: number,
|
||||||
|
maxRetries: number,
|
||||||
|
): Promise<void> {
|
||||||
|
const newRetryCount = currentRetryCount + 1;
|
||||||
|
const shouldFail = newRetryCount >= maxRetries;
|
||||||
|
|
||||||
|
// 指数退避: 30s, 60s, 120s, 240s, 480s, 960s, 1920s, 3840s, 7680s, 10800s (最大3小时)
|
||||||
|
const baseDelayMs = 30000; // 30 seconds
|
||||||
|
const maxDelayMs = 3 * 60 * 60 * 1000; // 3 hours
|
||||||
|
const delayMs = Math.min(
|
||||||
|
baseDelayMs * Math.pow(2, newRetryCount - 1),
|
||||||
|
maxDelayMs,
|
||||||
|
);
|
||||||
|
|
||||||
|
await this.prisma.outboxEvent.update({
|
||||||
|
where: { id },
|
||||||
|
data: {
|
||||||
|
retryCount: newRetryCount,
|
||||||
|
lastError: error,
|
||||||
|
status: shouldFail ? OutboxStatus.FAILED : OutboxStatus.PENDING,
|
||||||
|
nextRetryAt: shouldFail ? null : new Date(Date.now() + delayMs),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除已发布的旧事件
|
||||||
|
*/
|
||||||
|
async deletePublished(before: Date): Promise<number> {
|
||||||
const result = await this.prisma.outboxEvent.deleteMany({
|
const result = await this.prisma.outboxEvent.deleteMany({
|
||||||
where: {
|
where: {
|
||||||
processedAt: { not: null, lt: before },
|
status: OutboxStatus.PUBLISHED,
|
||||||
|
publishedAt: { lt: before },
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return result.count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重置长时间失败的事件(可选:给予重试机会)
|
||||||
|
*/
|
||||||
|
async resetFailedEvents(olderThan: Date): Promise<number> {
|
||||||
|
const result = await this.prisma.outboxEvent.updateMany({
|
||||||
|
where: {
|
||||||
|
status: OutboxStatus.FAILED,
|
||||||
|
createdAt: { gt: olderThan },
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
status: OutboxStatus.PENDING,
|
||||||
|
retryCount: 0,
|
||||||
|
nextRetryAt: new Date(),
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
return result.count;
|
return result.count;
|
||||||
|
|
|
||||||
|
|
@ -200,15 +200,30 @@ model TransferRecord {
|
||||||
|
|
||||||
// ==================== Outbox ====================
|
// ==================== Outbox ====================
|
||||||
|
|
||||||
model OutboxEvent {
|
enum OutboxStatus {
|
||||||
id String @id @default(uuid())
|
PENDING
|
||||||
aggregateType String
|
PUBLISHED
|
||||||
aggregateId String
|
FAILED
|
||||||
eventType String
|
}
|
||||||
payload Json
|
|
||||||
createdAt DateTime @default(now())
|
|
||||||
processedAt DateTime?
|
|
||||||
|
|
||||||
@@index([processedAt])
|
model OutboxEvent {
|
||||||
|
id String @id @default(uuid())
|
||||||
|
aggregateType String @map("aggregate_type")
|
||||||
|
aggregateId String @map("aggregate_id")
|
||||||
|
eventType String @map("event_type")
|
||||||
|
payload Json
|
||||||
|
topic String @default("trading.events")
|
||||||
|
key String?
|
||||||
|
status OutboxStatus @default(PENDING)
|
||||||
|
retryCount Int @default(0) @map("retry_count")
|
||||||
|
maxRetries Int @default(10) @map("max_retries")
|
||||||
|
lastError String? @map("last_error")
|
||||||
|
publishedAt DateTime? @map("published_at")
|
||||||
|
nextRetryAt DateTime? @map("next_retry_at")
|
||||||
|
createdAt DateTime @default(now()) @map("created_at")
|
||||||
|
|
||||||
|
@@index([status])
|
||||||
|
@@index([nextRetryAt])
|
||||||
|
@@index([createdAt])
|
||||||
@@map("outbox_events")
|
@@map("outbox_events")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import { RedisService } from '../../infrastructure/redis/redis.service';
|
||||||
/**
|
/**
|
||||||
* Outbox 事件发布调度器
|
* Outbox 事件发布调度器
|
||||||
* 定期将 Outbox 中的事件发布到 Kafka
|
* 定期将 Outbox 中的事件发布到 Kafka
|
||||||
|
* 使用指数退避重试策略,最大延迟3小时
|
||||||
*/
|
*/
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class OutboxScheduler implements OnModuleInit {
|
export class OutboxScheduler implements OnModuleInit {
|
||||||
|
|
@ -25,6 +26,7 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 每30秒发布 Outbox 中的待处理事件
|
* 每30秒发布 Outbox 中的待处理事件
|
||||||
|
* 使用指数退避重试策略
|
||||||
*/
|
*/
|
||||||
@Cron('*/30 * * * * *')
|
@Cron('*/30 * * * * *')
|
||||||
async publishOutboxEvents(): Promise<void> {
|
async publishOutboxEvents(): Promise<void> {
|
||||||
|
|
@ -37,18 +39,19 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const events = await this.outboxRepository.findUnprocessed(100);
|
const events = await this.outboxRepository.findPendingEvents(100);
|
||||||
|
|
||||||
if (events.length === 0) {
|
if (events.length === 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const processedIds: string[] = [];
|
let successCount = 0;
|
||||||
|
let failCount = 0;
|
||||||
|
|
||||||
for (const event of events) {
|
for (const event of events) {
|
||||||
try {
|
try {
|
||||||
await this.kafkaProducer.emit(`trading.${event.eventType}`, {
|
await this.kafkaProducer.emit(event.topic, {
|
||||||
key: event.aggregateId,
|
key: event.key || event.aggregateId,
|
||||||
value: {
|
value: {
|
||||||
eventId: event.id,
|
eventId: event.id,
|
||||||
aggregateType: event.aggregateType,
|
aggregateType: event.aggregateType,
|
||||||
|
|
@ -58,16 +61,33 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
createdAt: event.createdAt.toISOString(),
|
createdAt: event.createdAt.toISOString(),
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
processedIds.push(event.id);
|
|
||||||
|
// 标记为已发布
|
||||||
|
await this.outboxRepository.markAsPublished(event.id);
|
||||||
|
successCount++;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error(`Failed to publish event ${event.id}`, error);
|
failCount++;
|
||||||
// 继续处理下一个事件
|
const errorMessage =
|
||||||
|
error instanceof Error ? error.message : 'Unknown error';
|
||||||
|
this.logger.error(
|
||||||
|
`Failed to publish event ${event.id} (retry ${event.retryCount}/${event.maxRetries})`,
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
|
||||||
|
// 标记失败并设置指数退避重试时间
|
||||||
|
await this.outboxRepository.markAsFailed(
|
||||||
|
event.id,
|
||||||
|
errorMessage,
|
||||||
|
event.retryCount,
|
||||||
|
event.maxRetries,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (processedIds.length > 0) {
|
if (successCount > 0 || failCount > 0) {
|
||||||
await this.outboxRepository.markAsProcessed(processedIds);
|
this.logger.log(
|
||||||
this.logger.debug(`Published ${processedIds.length} outbox events`);
|
`Outbox: published ${successCount}, failed ${failCount}`,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to process outbox events', error);
|
this.logger.error('Failed to process outbox events', error);
|
||||||
|
|
@ -77,10 +97,10 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 每天凌晨3点清理已处理的事件(保留7天)
|
* 每天凌晨3点清理已发布的事件(保留7天)
|
||||||
*/
|
*/
|
||||||
@Cron('0 3 * * *')
|
@Cron('0 3 * * *')
|
||||||
async cleanupProcessedEvents(): Promise<void> {
|
async cleanupPublishedEvents(): Promise<void> {
|
||||||
const lockValue = await this.redis.acquireLock(
|
const lockValue = await this.redis.acquireLock(
|
||||||
`${this.LOCK_KEY}:cleanup`,
|
`${this.LOCK_KEY}:cleanup`,
|
||||||
300,
|
300,
|
||||||
|
|
@ -93,12 +113,12 @@ export class OutboxScheduler implements OnModuleInit {
|
||||||
const sevenDaysAgo = new Date();
|
const sevenDaysAgo = new Date();
|
||||||
sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
|
sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
|
||||||
|
|
||||||
const deleted = await this.outboxRepository.deleteProcessed(sevenDaysAgo);
|
const deleted = await this.outboxRepository.deletePublished(sevenDaysAgo);
|
||||||
if (deleted > 0) {
|
if (deleted > 0) {
|
||||||
this.logger.log(`Cleaned up ${deleted} processed outbox events`);
|
this.logger.log(`Cleaned up ${deleted} published outbox events`);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to cleanup processed events', error);
|
this.logger.error('Failed to cleanup published events', error);
|
||||||
} finally {
|
} finally {
|
||||||
await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
|
await this.redis.releaseLock(`${this.LOCK_KEY}:cleanup`, lockValue);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
import { PrismaService } from '../prisma/prisma.service';
|
import { PrismaService } from '../prisma/prisma.service';
|
||||||
import { OutboxEvent } from '@prisma/client';
|
import { OutboxEvent, OutboxStatus } from '@prisma/client';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class OutboxRepository {
|
export class OutboxRepository {
|
||||||
|
|
@ -14,6 +14,8 @@ export class OutboxRepository {
|
||||||
aggregateId: string;
|
aggregateId: string;
|
||||||
eventType: string;
|
eventType: string;
|
||||||
payload: any;
|
payload: any;
|
||||||
|
topic?: string;
|
||||||
|
key?: string;
|
||||||
}): Promise<OutboxEvent> {
|
}): Promise<OutboxEvent> {
|
||||||
return this.prisma.outboxEvent.create({
|
return this.prisma.outboxEvent.create({
|
||||||
data: {
|
data: {
|
||||||
|
|
@ -21,38 +23,97 @@ export class OutboxRepository {
|
||||||
aggregateId: data.aggregateId,
|
aggregateId: data.aggregateId,
|
||||||
eventType: data.eventType,
|
eventType: data.eventType,
|
||||||
payload: data.payload,
|
payload: data.payload,
|
||||||
|
topic: data.topic || `trading.${data.eventType}`,
|
||||||
|
key: data.key || data.aggregateId,
|
||||||
|
status: OutboxStatus.PENDING,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取未处理的事件
|
* 获取待处理的事件(PENDING 状态且到达重试时间)
|
||||||
*/
|
*/
|
||||||
async findUnprocessed(limit: number = 100): Promise<OutboxEvent[]> {
|
async findPendingEvents(limit: number = 100): Promise<OutboxEvent[]> {
|
||||||
return this.prisma.outboxEvent.findMany({
|
return this.prisma.outboxEvent.findMany({
|
||||||
where: { processedAt: null },
|
where: {
|
||||||
|
status: OutboxStatus.PENDING,
|
||||||
|
OR: [{ nextRetryAt: null }, { nextRetryAt: { lte: new Date() } }],
|
||||||
|
},
|
||||||
orderBy: { createdAt: 'asc' },
|
orderBy: { createdAt: 'asc' },
|
||||||
take: limit,
|
take: limit,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 标记事件为已处理
|
* 标记事件为已发布
|
||||||
*/
|
*/
|
||||||
async markAsProcessed(ids: string[]): Promise<void> {
|
async markAsPublished(id: string): Promise<void> {
|
||||||
await this.prisma.outboxEvent.updateMany({
|
await this.prisma.outboxEvent.update({
|
||||||
where: { id: { in: ids } },
|
where: { id },
|
||||||
data: { processedAt: new Date() },
|
data: {
|
||||||
|
status: OutboxStatus.PUBLISHED,
|
||||||
|
publishedAt: new Date(),
|
||||||
|
},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 删除已处理的旧事件
|
* 标记事件发布失败,计算下次重试时间(指数退避,最大3小时)
|
||||||
*/
|
*/
|
||||||
async deleteProcessed(before: Date): Promise<number> {
|
async markAsFailed(
|
||||||
|
id: string,
|
||||||
|
error: string,
|
||||||
|
currentRetryCount: number,
|
||||||
|
maxRetries: number,
|
||||||
|
): Promise<void> {
|
||||||
|
const newRetryCount = currentRetryCount + 1;
|
||||||
|
const shouldFail = newRetryCount >= maxRetries;
|
||||||
|
|
||||||
|
// 指数退避: 30s, 60s, 120s, 240s, 480s, 960s, 1920s, 3840s, 7680s, 10800s (最大3小时)
|
||||||
|
const baseDelayMs = 30000; // 30 seconds
|
||||||
|
const maxDelayMs = 3 * 60 * 60 * 1000; // 3 hours
|
||||||
|
const delayMs = Math.min(
|
||||||
|
baseDelayMs * Math.pow(2, newRetryCount - 1),
|
||||||
|
maxDelayMs,
|
||||||
|
);
|
||||||
|
|
||||||
|
await this.prisma.outboxEvent.update({
|
||||||
|
where: { id },
|
||||||
|
data: {
|
||||||
|
retryCount: newRetryCount,
|
||||||
|
lastError: error,
|
||||||
|
status: shouldFail ? OutboxStatus.FAILED : OutboxStatus.PENDING,
|
||||||
|
nextRetryAt: shouldFail ? null : new Date(Date.now() + delayMs),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除已发布的旧事件
|
||||||
|
*/
|
||||||
|
async deletePublished(before: Date): Promise<number> {
|
||||||
const result = await this.prisma.outboxEvent.deleteMany({
|
const result = await this.prisma.outboxEvent.deleteMany({
|
||||||
where: {
|
where: {
|
||||||
processedAt: { not: null, lt: before },
|
status: OutboxStatus.PUBLISHED,
|
||||||
|
publishedAt: { lt: before },
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return result.count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重置长时间失败的事件(可选:给予重试机会)
|
||||||
|
*/
|
||||||
|
async resetFailedEvents(olderThan: Date): Promise<number> {
|
||||||
|
const result = await this.prisma.outboxEvent.updateMany({
|
||||||
|
where: {
|
||||||
|
status: OutboxStatus.FAILED,
|
||||||
|
createdAt: { gt: olderThan },
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
status: OutboxStatus.PENDING,
|
||||||
|
retryCount: 0,
|
||||||
|
nextRetryAt: new Date(),
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
return result.count;
|
return result.count;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue