From f65b0d14b7be62d4dd4d9bd3e2872b975c607e43 Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 18 Dec 2025 00:30:09 -0800 Subject: [PATCH] =?UTF-8?q?feat(authorization):=20=E5=AE=9E=E7=8E=B0=20Out?= =?UTF-8?q?box=20=E6=A8=A1=E5=BC=8F=E4=BA=8B=E4=BB=B6=E5=8F=91=E5=B8=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 概述 为 authorization-service 实现 Outbox 模式,确保数据库事务和 Kafka 事件发布的原子性。 ## 新增表 - OutboxEvent: 事件暂存表,用于事务性事件发布 ## 新增组件 - OutboxRepository: Outbox 事件持久化 - OutboxPublisherService: 轮询发布未处理事件到 Kafka ## 支持的事件 - authorization-events: 授权角色创建/更新事件(省公司、市公司授权) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../migration.sql | 31 ++ .../prisma/schema.prisma | 35 ++ .../src/infrastructure/kafka/kafka.module.ts | 19 +- .../kafka/outbox-publisher.service.ts | 288 +++++++++++++++ .../repositories/outbox.repository.ts | 330 ++++++++++++++++++ 5 files changed, 701 insertions(+), 2 deletions(-) create mode 100644 backend/services/authorization-service/prisma/migrations/20251217000000_add_outbox_events/migration.sql create mode 100644 backend/services/authorization-service/src/infrastructure/kafka/outbox-publisher.service.ts create mode 100644 backend/services/authorization-service/src/infrastructure/persistence/repositories/outbox.repository.ts diff --git a/backend/services/authorization-service/prisma/migrations/20251217000000_add_outbox_events/migration.sql b/backend/services/authorization-service/prisma/migrations/20251217000000_add_outbox_events/migration.sql new file mode 100644 index 00000000..80955d78 --- /dev/null +++ b/backend/services/authorization-service/prisma/migrations/20251217000000_add_outbox_events/migration.sql @@ -0,0 +1,31 @@ +-- CreateTable +CREATE TABLE "outbox_events" ( + "outbox_id" BIGSERIAL NOT NULL, + "event_type" VARCHAR(100) NOT NULL, + "topic" VARCHAR(100) NOT NULL, + "key" VARCHAR(200) NOT NULL, + "payload" JSONB NOT NULL, + "aggregate_id" VARCHAR(100) NOT NULL, + "aggregate_type" VARCHAR(50) NOT NULL, + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "retry_count" INTEGER NOT NULL DEFAULT 0, + "max_retries" INTEGER NOT NULL DEFAULT 5, + "last_error" TEXT, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "published_at" TIMESTAMP(3), + "next_retry_at" TIMESTAMP(3), + + CONSTRAINT "outbox_events_pkey" PRIMARY KEY ("outbox_id") +); + +-- CreateIndex +CREATE INDEX "outbox_events_status_created_at_idx" ON "outbox_events"("status", "created_at"); + +-- CreateIndex +CREATE INDEX "outbox_events_status_next_retry_at_idx" ON "outbox_events"("status", "next_retry_at"); + +-- CreateIndex +CREATE INDEX "outbox_events_aggregate_type_aggregate_id_idx" ON "outbox_events"("aggregate_type", "aggregate_id"); + +-- CreateIndex +CREATE INDEX "outbox_events_topic_idx" ON "outbox_events"("topic"); diff --git a/backend/services/authorization-service/prisma/schema.prisma b/backend/services/authorization-service/prisma/schema.prisma index 87461901..3f8d41de 100644 --- a/backend/services/authorization-service/prisma/schema.prisma +++ b/backend/services/authorization-service/prisma/schema.prisma @@ -482,3 +482,38 @@ model SystemAccountLedger { @@index([txHash], name: "idx_system_ledger_tx_hash") @@map("system_account_ledgers") } + +// ============================================ +// Outbox 事件表 - 保证事件可靠发送 +// 使用 Outbox Pattern 确保领域事件100%送达 +// ============================================ +model OutboxEvent { + id BigInt @id @default(autoincrement()) @map("outbox_id") + + // 事件信息 + eventType String @map("event_type") @db.VarChar(100) + topic String @map("topic") @db.VarChar(100) + key String @map("key") @db.VarChar(200) + payload Json @map("payload") + + // 聚合根信息 (用于幂等性检查) + aggregateId String @map("aggregate_id") @db.VarChar(100) + aggregateType String @map("aggregate_type") @db.VarChar(50) + + // 发布状态: PENDING, SENT, CONFIRMED, FAILED + status String @default("PENDING") @map("status") @db.VarChar(20) + retryCount Int @default(0) @map("retry_count") + maxRetries Int @default(5) @map("max_retries") + lastError String? @map("last_error") @db.Text + + // 时间戳 + createdAt DateTime @default(now()) @map("created_at") + publishedAt DateTime? @map("published_at") + nextRetryAt DateTime? @map("next_retry_at") + + @@index([status, createdAt]) + @@index([status, nextRetryAt]) + @@index([aggregateType, aggregateId]) + @@index([topic]) + @@map("outbox_events") +} diff --git a/backend/services/authorization-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/authorization-service/src/infrastructure/kafka/kafka.module.ts index 620d3d16..ecbccfee 100644 --- a/backend/services/authorization-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/authorization-service/src/infrastructure/kafka/kafka.module.ts @@ -3,6 +3,9 @@ import { ConfigModule, ConfigService } from '@nestjs/config' import { ClientsModule, Transport } from '@nestjs/microservices' import { EventPublisherService } from './event-publisher.service' import { EventAckPublisher } from './event-ack.publisher' +import { OutboxPublisherService } from './outbox-publisher.service' +import { OutboxRepository } from '../persistence/repositories/outbox.repository' +import { PrismaService } from '../persistence/prisma/prisma.service' @Global() @Module({ @@ -27,7 +30,19 @@ import { EventAckPublisher } from './event-ack.publisher' }, ]), ], - providers: [EventPublisherService, EventAckPublisher], - exports: [EventPublisherService, EventAckPublisher, ClientsModule], + providers: [ + PrismaService, + EventPublisherService, + EventAckPublisher, + OutboxRepository, + OutboxPublisherService, + ], + exports: [ + EventPublisherService, + EventAckPublisher, + OutboxRepository, + OutboxPublisherService, + ClientsModule, + ], }) export class KafkaModule {} diff --git a/backend/services/authorization-service/src/infrastructure/kafka/outbox-publisher.service.ts b/backend/services/authorization-service/src/infrastructure/kafka/outbox-publisher.service.ts new file mode 100644 index 00000000..e69a3961 --- /dev/null +++ b/backend/services/authorization-service/src/infrastructure/kafka/outbox-publisher.service.ts @@ -0,0 +1,288 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common' +import { ConfigService } from '@nestjs/config' +import { Kafka, Producer, logLevel } from 'kafkajs' +import { OutboxRepository, OutboxEvent } from '../persistence/repositories/outbox.repository' + +/** + * Outbox Publisher Service (B方案 - 消费方确认模式) + * + * 轮询 Outbox 表并发布事件到 Kafka + * 使用消费方确认机制保证事件100%被处理 + * + * 工作流程: + * 1. 轮询 PENDING 状态的事件 + * 2. 发送到 Kafka,标记为 SENT(等待确认) + * 3. 消费方处理成功后发送确认到 authorization.events.ack + * 4. 收到确认后标记为 CONFIRMED + * 5. 超时未确认的事件重置为 PENDING 重发 + */ +@Injectable() +export class OutboxPublisherService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(OutboxPublisherService.name) + private kafka: Kafka + private producer: Producer + private isRunning = false + private pollInterval: NodeJS.Timeout | null = null + private timeoutCheckInterval: NodeJS.Timeout | null = null + private cleanupInterval: NodeJS.Timeout | null = null + private isConnected = false + + // 配置 + private readonly pollIntervalMs: number + private readonly batchSize: number + private readonly cleanupIntervalMs: number + private readonly confirmationTimeoutMinutes: number + private readonly timeoutCheckIntervalMs: number + + constructor( + private readonly outboxRepository: OutboxRepository, + private readonly configService: ConfigService, + ) { + this.pollIntervalMs = this.configService.get('OUTBOX_POLL_INTERVAL_MS', 1000) + this.batchSize = this.configService.get('OUTBOX_BATCH_SIZE', 100) + this.cleanupIntervalMs = this.configService.get('OUTBOX_CLEANUP_INTERVAL_MS', 3600000) // 1小时 + this.confirmationTimeoutMinutes = this.configService.get('OUTBOX_CONFIRMATION_TIMEOUT_MINUTES', 5) + this.timeoutCheckIntervalMs = this.configService.get('OUTBOX_TIMEOUT_CHECK_INTERVAL_MS', 60000) // 1分钟 + + const brokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092').split(',') + const clientId = this.configService.get('KAFKA_CLIENT_ID', 'authorization-service') + + this.kafka = new Kafka({ + clientId: `${clientId}-outbox`, + brokers, + logLevel: logLevel.WARN, + }) + this.producer = this.kafka.producer() + + this.logger.log( + `[OUTBOX] OutboxPublisher (B方案) configured: ` + + `pollInterval=${this.pollIntervalMs}ms, batchSize=${this.batchSize}, ` + + `confirmationTimeout=${this.confirmationTimeoutMinutes}min`, + ) + } + + async onModuleInit() { + this.logger.log('[OUTBOX] Connecting to Kafka...') + try { + await this.producer.connect() + this.isConnected = true + this.logger.log('[OUTBOX] Connected to Kafka') + this.start() + } catch (error) { + this.logger.error('[OUTBOX] Failed to connect to Kafka:', error) + this.logger.warn('[OUTBOX] OutboxPublisher will not start - events will accumulate in outbox table') + } + } + + async onModuleDestroy() { + this.stop() + if (this.isConnected) { + await this.producer.disconnect() + } + } + + /** + * 启动轮询 + */ + start(): void { + if (this.isRunning) { + this.logger.warn('[OUTBOX] Publisher already running') + return + } + + this.isRunning = true + this.logger.log('[OUTBOX] Starting outbox publisher (B方案)...') + + // 启动发布轮询 + this.pollInterval = setInterval(() => { + this.processOutbox().catch((err) => { + this.logger.error('[OUTBOX] Error processing outbox:', err) + }) + }, this.pollIntervalMs) + + // 启动超时检查任务(B方案核心) + this.timeoutCheckInterval = setInterval(() => { + this.checkConfirmationTimeouts().catch((err) => { + this.logger.error('[OUTBOX] Error checking confirmation timeouts:', err) + }) + }, this.timeoutCheckIntervalMs) + + // 启动清理任务 + this.cleanupInterval = setInterval(() => { + this.cleanup().catch((err) => { + this.logger.error('[OUTBOX] Error cleaning up outbox:', err) + }) + }, this.cleanupIntervalMs) + + this.logger.log('[OUTBOX] Outbox publisher started (B方案 - 消费方确认模式)') + } + + /** + * 停止轮询 + */ + stop(): void { + if (!this.isRunning) return + + this.isRunning = false + + if (this.pollInterval) { + clearInterval(this.pollInterval) + this.pollInterval = null + } + + if (this.timeoutCheckInterval) { + clearInterval(this.timeoutCheckInterval) + this.timeoutCheckInterval = null + } + + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval) + this.cleanupInterval = null + } + + this.logger.log('[OUTBOX] Outbox publisher stopped') + } + + /** + * 处理 Outbox 事件 + */ + async processOutbox(): Promise { + if (!this.isConnected) { + return + } + + try { + // 1. 获取待发布事件 + const pendingEvents = await this.outboxRepository.findPendingEvents(this.batchSize) + + // 2. 获取需要重试的事件 + const retryEvents = await this.outboxRepository.findEventsForRetry(Math.floor(this.batchSize / 2)) + + const allEvents = [...pendingEvents, ...retryEvents] + + if (allEvents.length === 0) { + return + } + + this.logger.debug(`[OUTBOX] Processing ${allEvents.length} events (${pendingEvents.length} pending, ${retryEvents.length} retry)`) + + // 3. 逐个发布 + for (const event of allEvents) { + await this.publishEvent(event) + } + } catch (error) { + this.logger.error('[OUTBOX] Error in processOutbox:', error) + } + } + + /** + * 发布单个事件 (B方案) + */ + private async publishEvent(event: OutboxEvent): Promise { + try { + this.logger.debug(`[OUTBOX] Publishing event ${event.id} to topic ${event.topic}`) + + // 构造 Kafka 消息,包含 outboxId 用于确认 + const payload = { + ...(event.payload as Record), + _outbox: { + id: event.id.toString(), + aggregateId: event.aggregateId, + eventType: event.eventType, + }, + } + + // 发布到 Kafka + await this.producer.send({ + topic: event.topic, + messages: [ + { + key: event.key, + value: JSON.stringify(payload), + }, + ], + }) + + // B方案:标记为 SENT(等待消费方确认) + await this.outboxRepository.markAsSent(event.id) + + this.logger.log( + `[OUTBOX] → Event ${event.id} sent to ${event.topic} (awaiting consumer confirmation)`, + ) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + this.logger.error(`[OUTBOX] ✗ Failed to publish event ${event.id}: ${errorMessage}`) + + // 标记为失败并安排重试 + await this.outboxRepository.markAsFailed(event.id, errorMessage) + } + } + + /** + * 检查确认超时的事件 (B方案核心) + */ + private async checkConfirmationTimeouts(): Promise { + if (!this.isConnected) { + return + } + + try { + const timedOutEvents = await this.outboxRepository.findSentEventsTimedOut( + this.confirmationTimeoutMinutes, + this.batchSize, + ) + + if (timedOutEvents.length === 0) { + return + } + + this.logger.warn( + `[OUTBOX] Found ${timedOutEvents.length} events without confirmation after ${this.confirmationTimeoutMinutes} minutes`, + ) + + for (const event of timedOutEvents) { + await this.outboxRepository.resetSentToPending(event.id) + this.logger.warn( + `[OUTBOX] Event ${event.id} reset to PENDING for retry (retry #${event.retryCount + 1})`, + ) + } + } catch (error) { + this.logger.error('[OUTBOX] Error checking confirmation timeouts:', error) + } + } + + /** + * 清理旧事件 + */ + private async cleanup(): Promise { + const retentionDays = this.configService.get('OUTBOX_RETENTION_DAYS', 7) + await this.outboxRepository.cleanupOldEvents(retentionDays) + } + + /** + * 手动触发处理(用于测试或紧急情况) + */ + async triggerProcessing(): Promise { + this.logger.log('[OUTBOX] Manual processing triggered') + await this.processOutbox() + } + + /** + * 获取统计信息 + */ + async getStats(): Promise<{ + isRunning: boolean + isConnected: boolean + pending: number + sent: number + confirmed: number + failed: number + }> { + const stats = await this.outboxRepository.getStats() + return { + isRunning: this.isRunning, + isConnected: this.isConnected, + ...stats, + } + } +} diff --git a/backend/services/authorization-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/authorization-service/src/infrastructure/persistence/repositories/outbox.repository.ts new file mode 100644 index 00000000..8f19cfae --- /dev/null +++ b/backend/services/authorization-service/src/infrastructure/persistence/repositories/outbox.repository.ts @@ -0,0 +1,330 @@ +import { Injectable, Logger } from '@nestjs/common' +import { PrismaService } from '../prisma/prisma.service' +import { Prisma } from '@prisma/client' + +export enum OutboxStatus { + PENDING = 'PENDING', // 待发送 + SENT = 'SENT', // 已发送到 Kafka,等待消费方确认 + CONFIRMED = 'CONFIRMED', // 消费方已确认处理成功 + FAILED = 'FAILED', // 发送失败,等待重试 +} + +export interface OutboxEventData { + eventType: string + topic: string + key: string + payload: Record + aggregateId: string + aggregateType: string +} + +export interface OutboxEvent extends OutboxEventData { + id: bigint + status: OutboxStatus + retryCount: number + maxRetries: number + lastError: string | null + createdAt: Date + publishedAt: Date | null + nextRetryAt: Date | null +} + +@Injectable() +export class OutboxRepository { + private readonly logger = new Logger(OutboxRepository.name) + + constructor(private readonly prisma: PrismaService) {} + + /** + * 在事务中保存 Outbox 事件 + */ + async saveInTransaction( + tx: Prisma.TransactionClient, + events: OutboxEventData[], + ): Promise { + if (events.length === 0) return + + this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox (in transaction)`) + + await tx.outboxEvent.createMany({ + data: events.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: OutboxStatus.PENDING, + })), + }) + + this.logger.debug(`[OUTBOX] Saved ${events.length} events to outbox`) + } + + /** + * 直接保存 Outbox 事件(不在事务中) + */ + async saveEvents(events: OutboxEventData[]): Promise { + if (events.length === 0) return + + this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox`) + + await this.prisma.outboxEvent.createMany({ + data: events.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: OutboxStatus.PENDING, + })), + }) + + this.logger.log(`[OUTBOX] ✓ Saved ${events.length} events to outbox`) + } + + /** + * 获取待发布的事件(按创建时间排序) + */ + async findPendingEvents(limit: number = 100): Promise { + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.PENDING, + }, + orderBy: { + createdAt: 'asc', + }, + take: limit, + }) + + return events.map((e) => this.mapToOutboxEvent(e)) + } + + /** + * 获取需要重试的事件 + */ + async findEventsForRetry(limit: number = 50): Promise { + const now = new Date() + + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.FAILED, + retryCount: { + lt: 5, // maxRetries + }, + nextRetryAt: { + lte: now, + }, + }, + orderBy: { + nextRetryAt: 'asc', + }, + take: limit, + }) + + return events.map((e) => this.mapToOutboxEvent(e)) + } + + /** + * 标记事件为已发送(等待消费方确认) + */ + async markAsSent(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.SENT, + publishedAt: new Date(), + }, + }) + + this.logger.debug(`[OUTBOX] Marked event ${id} as SENT (awaiting consumer confirmation)`) + } + + /** + * 标记事件为已确认(消费方已成功处理) + */ + async markAsConfirmed(eventId: string, eventType?: string): Promise { + const whereClause: Prisma.OutboxEventWhereInput = { + aggregateId: eventId, + status: OutboxStatus.SENT, + } + + if (eventType) { + whereClause.eventType = eventType + } + + const result = await this.prisma.outboxEvent.updateMany({ + where: whereClause, + data: { + status: OutboxStatus.CONFIRMED, + }, + }) + + if (result.count > 0) { + this.logger.log(`[OUTBOX] ✓ Event ${eventId} (${eventType || 'all types'}) confirmed by consumer`) + return true + } + + this.logger.warn(`[OUTBOX] Event ${eventId} (${eventType || 'any'}) not found or not in SENT status`) + return false + } + + /** + * 通过 outbox ID 标记为已确认 + */ + async markAsConfirmedById(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.CONFIRMED, + }, + }) + + this.logger.log(`[OUTBOX] ✓ Event ${id} confirmed by consumer`) + } + + /** + * 获取已发送但未确认且超时的事件(用于重试) + */ + async findSentEventsTimedOut(timeoutMinutes: number = 5, limit: number = 50): Promise { + const cutoffTime = new Date() + cutoffTime.setMinutes(cutoffTime.getMinutes() - timeoutMinutes) + + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.SENT, + publishedAt: { + lt: cutoffTime, + }, + retryCount: { + lt: 5, + }, + }, + orderBy: { + publishedAt: 'asc', + }, + take: limit, + }) + + return events.map((e) => this.mapToOutboxEvent(e)) + } + + /** + * 将超时的 SENT 事件重置为 PENDING 以便重发 + */ + async resetSentToPending(id: bigint): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }) + + if (!event) return + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.PENDING, + retryCount: event.retryCount + 1, + lastError: 'Consumer confirmation timeout', + }, + }) + + this.logger.warn(`[OUTBOX] Event ${id} reset to PENDING for retry (confirmation timeout)`) + } + + /** + * 标记事件为失败并安排重试 + */ + async markAsFailed(id: bigint, error: string): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }) + + if (!event) return + + const newRetryCount = event.retryCount + 1 + const isFinalFailure = newRetryCount >= event.maxRetries + + // 指数退避:1min, 2min, 4min, 8min, 16min + const delayMinutes = Math.pow(2, newRetryCount - 1) + const nextRetryAt = new Date() + nextRetryAt.setMinutes(nextRetryAt.getMinutes() + delayMinutes) + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.FAILED, + retryCount: newRetryCount, + lastError: error, + nextRetryAt: isFinalFailure ? null : nextRetryAt, + }, + }) + + if (isFinalFailure) { + this.logger.error(`[OUTBOX] Event ${id} permanently failed after ${newRetryCount} retries: ${error}`) + } else { + this.logger.warn(`[OUTBOX] Event ${id} failed, retry ${newRetryCount}/${event.maxRetries}, next retry at ${nextRetryAt}`) + } + } + + /** + * 清理已确认的旧事件(保留最近7天) + */ + async cleanupOldEvents(retentionDays: number = 7): Promise { + const cutoffDate = new Date() + cutoffDate.setDate(cutoffDate.getDate() - retentionDays) + + const result = await this.prisma.outboxEvent.deleteMany({ + where: { + status: OutboxStatus.CONFIRMED, + publishedAt: { + lt: cutoffDate, + }, + }, + }) + + if (result.count > 0) { + this.logger.log(`[OUTBOX] Cleaned up ${result.count} old events`) + } + + return result.count + } + + /** + * 获取 Outbox 统计信息 + */ + async getStats(): Promise<{ + pending: number + sent: number + confirmed: number + failed: number + }> { + const [pending, sent, confirmed, failed] = await Promise.all([ + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.PENDING } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.SENT } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.CONFIRMED } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.FAILED } }), + ]) + + return { pending, sent, confirmed, failed } + } + + private mapToOutboxEvent(record: any): OutboxEvent { + return { + id: record.id, + eventType: record.eventType, + topic: record.topic, + key: record.key, + payload: record.payload as Record, + aggregateId: record.aggregateId, + aggregateType: record.aggregateType, + status: record.status as OutboxStatus, + retryCount: record.retryCount, + maxRetries: record.maxRetries, + lastError: record.lastError, + createdAt: record.createdAt, + publishedAt: record.publishedAt, + nextRetryAt: record.nextRetryAt, + } + } +}