feat(authorization): 实现 Outbox 模式事件发布

## 概述
为 authorization-service 实现 Outbox 模式,确保数据库事务和 Kafka 事件发布的原子性。

## 新增表
- OutboxEvent: 事件暂存表,用于事务性事件发布

## 新增组件
- OutboxRepository: Outbox 事件持久化
- OutboxPublisherService: 轮询发布未处理事件到 Kafka

## 支持的事件
- authorization-events: 授权角色创建/更新事件(省公司、市公司授权)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-18 00:30:09 -08:00
parent 05a9ca31f6
commit f65b0d14b7
5 changed files with 701 additions and 2 deletions

View File

@ -0,0 +1,31 @@
-- CreateTable
CREATE TABLE "outbox_events" (
"outbox_id" BIGSERIAL NOT NULL,
"event_type" VARCHAR(100) NOT NULL,
"topic" VARCHAR(100) NOT NULL,
"key" VARCHAR(200) NOT NULL,
"payload" JSONB NOT NULL,
"aggregate_id" VARCHAR(100) NOT NULL,
"aggregate_type" VARCHAR(50) NOT NULL,
"status" VARCHAR(20) NOT NULL DEFAULT 'PENDING',
"retry_count" INTEGER NOT NULL DEFAULT 0,
"max_retries" INTEGER NOT NULL DEFAULT 5,
"last_error" TEXT,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"published_at" TIMESTAMP(3),
"next_retry_at" TIMESTAMP(3),
CONSTRAINT "outbox_events_pkey" PRIMARY KEY ("outbox_id")
);
-- CreateIndex
CREATE INDEX "outbox_events_status_created_at_idx" ON "outbox_events"("status", "created_at");
-- CreateIndex
CREATE INDEX "outbox_events_status_next_retry_at_idx" ON "outbox_events"("status", "next_retry_at");
-- CreateIndex
CREATE INDEX "outbox_events_aggregate_type_aggregate_id_idx" ON "outbox_events"("aggregate_type", "aggregate_id");
-- CreateIndex
CREATE INDEX "outbox_events_topic_idx" ON "outbox_events"("topic");

View File

@ -482,3 +482,38 @@ model SystemAccountLedger {
@@index([txHash], name: "idx_system_ledger_tx_hash")
@@map("system_account_ledgers")
}
// ============================================
// Outbox 事件表 - 保证事件可靠发送
// 使用 Outbox Pattern 确保领域事件100%送达
// ============================================
model OutboxEvent {
id BigInt @id @default(autoincrement()) @map("outbox_id")
// 事件信息
eventType String @map("event_type") @db.VarChar(100)
topic String @map("topic") @db.VarChar(100)
key String @map("key") @db.VarChar(200)
payload Json @map("payload")
// 聚合根信息 (用于幂等性检查)
aggregateId String @map("aggregate_id") @db.VarChar(100)
aggregateType String @map("aggregate_type") @db.VarChar(50)
// 发布状态: PENDING, SENT, CONFIRMED, FAILED
status String @default("PENDING") @map("status") @db.VarChar(20)
retryCount Int @default(0) @map("retry_count")
maxRetries Int @default(5) @map("max_retries")
lastError String? @map("last_error") @db.Text
// 时间戳
createdAt DateTime @default(now()) @map("created_at")
publishedAt DateTime? @map("published_at")
nextRetryAt DateTime? @map("next_retry_at")
@@index([status, createdAt])
@@index([status, nextRetryAt])
@@index([aggregateType, aggregateId])
@@index([topic])
@@map("outbox_events")
}

View File

@ -3,6 +3,9 @@ import { ConfigModule, ConfigService } from '@nestjs/config'
import { ClientsModule, Transport } from '@nestjs/microservices'
import { EventPublisherService } from './event-publisher.service'
import { EventAckPublisher } from './event-ack.publisher'
import { OutboxPublisherService } from './outbox-publisher.service'
import { OutboxRepository } from '../persistence/repositories/outbox.repository'
import { PrismaService } from '../persistence/prisma/prisma.service'
@Global()
@Module({
@ -27,7 +30,19 @@ import { EventAckPublisher } from './event-ack.publisher'
},
]),
],
providers: [EventPublisherService, EventAckPublisher],
exports: [EventPublisherService, EventAckPublisher, ClientsModule],
providers: [
PrismaService,
EventPublisherService,
EventAckPublisher,
OutboxRepository,
OutboxPublisherService,
],
exports: [
EventPublisherService,
EventAckPublisher,
OutboxRepository,
OutboxPublisherService,
ClientsModule,
],
})
export class KafkaModule {}

View File

@ -0,0 +1,288 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'
import { ConfigService } from '@nestjs/config'
import { Kafka, Producer, logLevel } from 'kafkajs'
import { OutboxRepository, OutboxEvent } from '../persistence/repositories/outbox.repository'
/**
* Outbox Publisher Service (B方案 - )
*
* Outbox Kafka
* 使100%
*
* :
* 1. PENDING
* 2. Kafka SENT
* 3. authorization.events.ack
* 4. CONFIRMED
* 5. PENDING
*/
@Injectable()
export class OutboxPublisherService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(OutboxPublisherService.name)
private kafka: Kafka
private producer: Producer
private isRunning = false
private pollInterval: NodeJS.Timeout | null = null
private timeoutCheckInterval: NodeJS.Timeout | null = null
private cleanupInterval: NodeJS.Timeout | null = null
private isConnected = false
// 配置
private readonly pollIntervalMs: number
private readonly batchSize: number
private readonly cleanupIntervalMs: number
private readonly confirmationTimeoutMinutes: number
private readonly timeoutCheckIntervalMs: number
constructor(
private readonly outboxRepository: OutboxRepository,
private readonly configService: ConfigService,
) {
this.pollIntervalMs = this.configService.get<number>('OUTBOX_POLL_INTERVAL_MS', 1000)
this.batchSize = this.configService.get<number>('OUTBOX_BATCH_SIZE', 100)
this.cleanupIntervalMs = this.configService.get<number>('OUTBOX_CLEANUP_INTERVAL_MS', 3600000) // 1小时
this.confirmationTimeoutMinutes = this.configService.get<number>('OUTBOX_CONFIRMATION_TIMEOUT_MINUTES', 5)
this.timeoutCheckIntervalMs = this.configService.get<number>('OUTBOX_TIMEOUT_CHECK_INTERVAL_MS', 60000) // 1分钟
const brokers = this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(',')
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID', 'authorization-service')
this.kafka = new Kafka({
clientId: `${clientId}-outbox`,
brokers,
logLevel: logLevel.WARN,
})
this.producer = this.kafka.producer()
this.logger.log(
`[OUTBOX] OutboxPublisher (B方案) configured: ` +
`pollInterval=${this.pollIntervalMs}ms, batchSize=${this.batchSize}, ` +
`confirmationTimeout=${this.confirmationTimeoutMinutes}min`,
)
}
async onModuleInit() {
this.logger.log('[OUTBOX] Connecting to Kafka...')
try {
await this.producer.connect()
this.isConnected = true
this.logger.log('[OUTBOX] Connected to Kafka')
this.start()
} catch (error) {
this.logger.error('[OUTBOX] Failed to connect to Kafka:', error)
this.logger.warn('[OUTBOX] OutboxPublisher will not start - events will accumulate in outbox table')
}
}
async onModuleDestroy() {
this.stop()
if (this.isConnected) {
await this.producer.disconnect()
}
}
/**
*
*/
start(): void {
if (this.isRunning) {
this.logger.warn('[OUTBOX] Publisher already running')
return
}
this.isRunning = true
this.logger.log('[OUTBOX] Starting outbox publisher (B方案)...')
// 启动发布轮询
this.pollInterval = setInterval(() => {
this.processOutbox().catch((err) => {
this.logger.error('[OUTBOX] Error processing outbox:', err)
})
}, this.pollIntervalMs)
// 启动超时检查任务B方案核心
this.timeoutCheckInterval = setInterval(() => {
this.checkConfirmationTimeouts().catch((err) => {
this.logger.error('[OUTBOX] Error checking confirmation timeouts:', err)
})
}, this.timeoutCheckIntervalMs)
// 启动清理任务
this.cleanupInterval = setInterval(() => {
this.cleanup().catch((err) => {
this.logger.error('[OUTBOX] Error cleaning up outbox:', err)
})
}, this.cleanupIntervalMs)
this.logger.log('[OUTBOX] Outbox publisher started (B方案 - 消费方确认模式)')
}
/**
*
*/
stop(): void {
if (!this.isRunning) return
this.isRunning = false
if (this.pollInterval) {
clearInterval(this.pollInterval)
this.pollInterval = null
}
if (this.timeoutCheckInterval) {
clearInterval(this.timeoutCheckInterval)
this.timeoutCheckInterval = null
}
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval)
this.cleanupInterval = null
}
this.logger.log('[OUTBOX] Outbox publisher stopped')
}
/**
* Outbox
*/
async processOutbox(): Promise<void> {
if (!this.isConnected) {
return
}
try {
// 1. 获取待发布事件
const pendingEvents = await this.outboxRepository.findPendingEvents(this.batchSize)
// 2. 获取需要重试的事件
const retryEvents = await this.outboxRepository.findEventsForRetry(Math.floor(this.batchSize / 2))
const allEvents = [...pendingEvents, ...retryEvents]
if (allEvents.length === 0) {
return
}
this.logger.debug(`[OUTBOX] Processing ${allEvents.length} events (${pendingEvents.length} pending, ${retryEvents.length} retry)`)
// 3. 逐个发布
for (const event of allEvents) {
await this.publishEvent(event)
}
} catch (error) {
this.logger.error('[OUTBOX] Error in processOutbox:', error)
}
}
/**
* (B方案)
*/
private async publishEvent(event: OutboxEvent): Promise<void> {
try {
this.logger.debug(`[OUTBOX] Publishing event ${event.id} to topic ${event.topic}`)
// 构造 Kafka 消息,包含 outboxId 用于确认
const payload = {
...(event.payload as Record<string, unknown>),
_outbox: {
id: event.id.toString(),
aggregateId: event.aggregateId,
eventType: event.eventType,
},
}
// 发布到 Kafka
await this.producer.send({
topic: event.topic,
messages: [
{
key: event.key,
value: JSON.stringify(payload),
},
],
})
// B方案标记为 SENT等待消费方确认
await this.outboxRepository.markAsSent(event.id)
this.logger.log(
`[OUTBOX] → Event ${event.id} sent to ${event.topic} (awaiting consumer confirmation)`,
)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
this.logger.error(`[OUTBOX] ✗ Failed to publish event ${event.id}: ${errorMessage}`)
// 标记为失败并安排重试
await this.outboxRepository.markAsFailed(event.id, errorMessage)
}
}
/**
* (B方案核心)
*/
private async checkConfirmationTimeouts(): Promise<void> {
if (!this.isConnected) {
return
}
try {
const timedOutEvents = await this.outboxRepository.findSentEventsTimedOut(
this.confirmationTimeoutMinutes,
this.batchSize,
)
if (timedOutEvents.length === 0) {
return
}
this.logger.warn(
`[OUTBOX] Found ${timedOutEvents.length} events without confirmation after ${this.confirmationTimeoutMinutes} minutes`,
)
for (const event of timedOutEvents) {
await this.outboxRepository.resetSentToPending(event.id)
this.logger.warn(
`[OUTBOX] Event ${event.id} reset to PENDING for retry (retry #${event.retryCount + 1})`,
)
}
} catch (error) {
this.logger.error('[OUTBOX] Error checking confirmation timeouts:', error)
}
}
/**
*
*/
private async cleanup(): Promise<void> {
const retentionDays = this.configService.get<number>('OUTBOX_RETENTION_DAYS', 7)
await this.outboxRepository.cleanupOldEvents(retentionDays)
}
/**
*
*/
async triggerProcessing(): Promise<void> {
this.logger.log('[OUTBOX] Manual processing triggered')
await this.processOutbox()
}
/**
*
*/
async getStats(): Promise<{
isRunning: boolean
isConnected: boolean
pending: number
sent: number
confirmed: number
failed: number
}> {
const stats = await this.outboxRepository.getStats()
return {
isRunning: this.isRunning,
isConnected: this.isConnected,
...stats,
}
}
}

View File

@ -0,0 +1,330 @@
import { Injectable, Logger } from '@nestjs/common'
import { PrismaService } from '../prisma/prisma.service'
import { Prisma } from '@prisma/client'
export enum OutboxStatus {
PENDING = 'PENDING', // 待发送
SENT = 'SENT', // 已发送到 Kafka等待消费方确认
CONFIRMED = 'CONFIRMED', // 消费方已确认处理成功
FAILED = 'FAILED', // 发送失败,等待重试
}
export interface OutboxEventData {
eventType: string
topic: string
key: string
payload: Record<string, unknown>
aggregateId: string
aggregateType: string
}
export interface OutboxEvent extends OutboxEventData {
id: bigint
status: OutboxStatus
retryCount: number
maxRetries: number
lastError: string | null
createdAt: Date
publishedAt: Date | null
nextRetryAt: Date | null
}
@Injectable()
export class OutboxRepository {
private readonly logger = new Logger(OutboxRepository.name)
constructor(private readonly prisma: PrismaService) {}
/**
* Outbox
*/
async saveInTransaction(
tx: Prisma.TransactionClient,
events: OutboxEventData[],
): Promise<void> {
if (events.length === 0) return
this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox (in transaction)`)
await tx.outboxEvent.createMany({
data: events.map((event) => ({
eventType: event.eventType,
topic: event.topic,
key: event.key,
payload: event.payload as Prisma.JsonObject,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
status: OutboxStatus.PENDING,
})),
})
this.logger.debug(`[OUTBOX] Saved ${events.length} events to outbox`)
}
/**
* Outbox
*/
async saveEvents(events: OutboxEventData[]): Promise<void> {
if (events.length === 0) return
this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox`)
await this.prisma.outboxEvent.createMany({
data: events.map((event) => ({
eventType: event.eventType,
topic: event.topic,
key: event.key,
payload: event.payload as Prisma.JsonObject,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
status: OutboxStatus.PENDING,
})),
})
this.logger.log(`[OUTBOX] ✓ Saved ${events.length} events to outbox`)
}
/**
*
*/
async findPendingEvents(limit: number = 100): Promise<OutboxEvent[]> {
const events = await this.prisma.outboxEvent.findMany({
where: {
status: OutboxStatus.PENDING,
},
orderBy: {
createdAt: 'asc',
},
take: limit,
})
return events.map((e) => this.mapToOutboxEvent(e))
}
/**
*
*/
async findEventsForRetry(limit: number = 50): Promise<OutboxEvent[]> {
const now = new Date()
const events = await this.prisma.outboxEvent.findMany({
where: {
status: OutboxStatus.FAILED,
retryCount: {
lt: 5, // maxRetries
},
nextRetryAt: {
lte: now,
},
},
orderBy: {
nextRetryAt: 'asc',
},
take: limit,
})
return events.map((e) => this.mapToOutboxEvent(e))
}
/**
*
*/
async markAsSent(id: bigint): Promise<void> {
await this.prisma.outboxEvent.update({
where: { id },
data: {
status: OutboxStatus.SENT,
publishedAt: new Date(),
},
})
this.logger.debug(`[OUTBOX] Marked event ${id} as SENT (awaiting consumer confirmation)`)
}
/**
*
*/
async markAsConfirmed(eventId: string, eventType?: string): Promise<boolean> {
const whereClause: Prisma.OutboxEventWhereInput = {
aggregateId: eventId,
status: OutboxStatus.SENT,
}
if (eventType) {
whereClause.eventType = eventType
}
const result = await this.prisma.outboxEvent.updateMany({
where: whereClause,
data: {
status: OutboxStatus.CONFIRMED,
},
})
if (result.count > 0) {
this.logger.log(`[OUTBOX] ✓ Event ${eventId} (${eventType || 'all types'}) confirmed by consumer`)
return true
}
this.logger.warn(`[OUTBOX] Event ${eventId} (${eventType || 'any'}) not found or not in SENT status`)
return false
}
/**
* outbox ID
*/
async markAsConfirmedById(id: bigint): Promise<void> {
await this.prisma.outboxEvent.update({
where: { id },
data: {
status: OutboxStatus.CONFIRMED,
},
})
this.logger.log(`[OUTBOX] ✓ Event ${id} confirmed by consumer`)
}
/**
*
*/
async findSentEventsTimedOut(timeoutMinutes: number = 5, limit: number = 50): Promise<OutboxEvent[]> {
const cutoffTime = new Date()
cutoffTime.setMinutes(cutoffTime.getMinutes() - timeoutMinutes)
const events = await this.prisma.outboxEvent.findMany({
where: {
status: OutboxStatus.SENT,
publishedAt: {
lt: cutoffTime,
},
retryCount: {
lt: 5,
},
},
orderBy: {
publishedAt: 'asc',
},
take: limit,
})
return events.map((e) => this.mapToOutboxEvent(e))
}
/**
* SENT PENDING 便
*/
async resetSentToPending(id: bigint): Promise<void> {
const event = await this.prisma.outboxEvent.findUnique({
where: { id },
})
if (!event) return
await this.prisma.outboxEvent.update({
where: { id },
data: {
status: OutboxStatus.PENDING,
retryCount: event.retryCount + 1,
lastError: 'Consumer confirmation timeout',
},
})
this.logger.warn(`[OUTBOX] Event ${id} reset to PENDING for retry (confirmation timeout)`)
}
/**
*
*/
async markAsFailed(id: bigint, error: string): Promise<void> {
const event = await this.prisma.outboxEvent.findUnique({
where: { id },
})
if (!event) return
const newRetryCount = event.retryCount + 1
const isFinalFailure = newRetryCount >= event.maxRetries
// 指数退避1min, 2min, 4min, 8min, 16min
const delayMinutes = Math.pow(2, newRetryCount - 1)
const nextRetryAt = new Date()
nextRetryAt.setMinutes(nextRetryAt.getMinutes() + delayMinutes)
await this.prisma.outboxEvent.update({
where: { id },
data: {
status: OutboxStatus.FAILED,
retryCount: newRetryCount,
lastError: error,
nextRetryAt: isFinalFailure ? null : nextRetryAt,
},
})
if (isFinalFailure) {
this.logger.error(`[OUTBOX] Event ${id} permanently failed after ${newRetryCount} retries: ${error}`)
} else {
this.logger.warn(`[OUTBOX] Event ${id} failed, retry ${newRetryCount}/${event.maxRetries}, next retry at ${nextRetryAt}`)
}
}
/**
* 7
*/
async cleanupOldEvents(retentionDays: number = 7): Promise<number> {
const cutoffDate = new Date()
cutoffDate.setDate(cutoffDate.getDate() - retentionDays)
const result = await this.prisma.outboxEvent.deleteMany({
where: {
status: OutboxStatus.CONFIRMED,
publishedAt: {
lt: cutoffDate,
},
},
})
if (result.count > 0) {
this.logger.log(`[OUTBOX] Cleaned up ${result.count} old events`)
}
return result.count
}
/**
* Outbox
*/
async getStats(): Promise<{
pending: number
sent: number
confirmed: number
failed: number
}> {
const [pending, sent, confirmed, failed] = await Promise.all([
this.prisma.outboxEvent.count({ where: { status: OutboxStatus.PENDING } }),
this.prisma.outboxEvent.count({ where: { status: OutboxStatus.SENT } }),
this.prisma.outboxEvent.count({ where: { status: OutboxStatus.CONFIRMED } }),
this.prisma.outboxEvent.count({ where: { status: OutboxStatus.FAILED } }),
])
return { pending, sent, confirmed, failed }
}
private mapToOutboxEvent(record: any): OutboxEvent {
return {
id: record.id,
eventType: record.eventType,
topic: record.topic,
key: record.key,
payload: record.payload as Record<string, unknown>,
aggregateId: record.aggregateId,
aggregateType: record.aggregateType,
status: record.status as OutboxStatus,
retryCount: record.retryCount,
maxRetries: record.maxRetries,
lastError: record.lastError,
createdAt: record.createdAt,
publishedAt: record.publishedAt,
nextRetryAt: record.nextRetryAt,
}
}
}