rwadurian/backend/services/identity-service/src/infrastructure/kafka/outbox-publisher.service.ts

343 lines
9.1 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import {
Injectable,
Logger,
OnModuleInit,
OnModuleDestroy,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Producer, logLevel } from 'kafkajs';
import {
OutboxRepository,
OutboxEvent,
} from '../persistence/repositories/outbox.repository';
/**
 * Outbox Publisher Service (Plan B - consumer acknowledgement mode).
 *
 * Polls the outbox table and publishes the events it finds to Kafka. The
 * consumer-acknowledgement mechanism is what guarantees that every event is
 * eventually processed.
 *
 * Workflow:
 * 1. Poll events in PENDING status.
 * 2. Send them to Kafka, mark them as SENT and wait for confirmation.
 * 3. After processing an event successfully, the consumer sends an
 *    acknowledgement to identity.events.ack.
 * 4. Once the acknowledgement arrives the event is marked CONFIRMED
 *    (that step is not performed by this class).
 * 5. Events that are not confirmed in time are reset to PENDING and re-sent.
 *
 * Delivery is therefore at-least-once: consumers can receive the same event
 * more than once and should use the `_outbox.id` field embedded in every
 * message to de-duplicate.
 */
@Injectable()
export class OutboxPublisherService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(OutboxPublisherService.name);
  private kafka: Kafka;
  private producer: Producer;
  /** True while the three background timers are installed. */
  private isRunning = false;
  private pollInterval: NodeJS.Timeout | null = null;
  private timeoutCheckInterval: NodeJS.Timeout | null = null;
  private cleanupInterval: NodeJS.Timeout | null = null;
  /** True between a successful producer.connect() and the start of shutdown. */
  private isConnected = false;
  /**
   * The batch currently being published, if any. Used to keep polls from
   * overlapping and to let shutdown wait for in-flight work.
   */
  private activeRun: Promise<void> | null = null;

  // Configuration
  private readonly pollIntervalMs: number;
  private readonly batchSize: number;
  private readonly cleanupIntervalMs: number;
  private readonly confirmationTimeoutMinutes: number;
  private readonly timeoutCheckIntervalMs: number;

  constructor(
    private readonly outboxRepository: OutboxRepository,
    private readonly configService: ConfigService,
  ) {
    this.pollIntervalMs = this.readNumber('OUTBOX_POLL_INTERVAL_MS', 1000);
    this.batchSize = this.readNumber('OUTBOX_BATCH_SIZE', 100);
    // 1 hour
    this.cleanupIntervalMs = this.readNumber(
      'OUTBOX_CLEANUP_INTERVAL_MS',
      3600000,
    );
    this.confirmationTimeoutMinutes = this.readNumber(
      'OUTBOX_CONFIRMATION_TIMEOUT_MINUTES',
      5,
    );
    // 1 minute
    this.timeoutCheckIntervalMs = this.readNumber(
      'OUTBOX_TIMEOUT_CHECK_INTERVAL_MS',
      60000,
    );

    // Tolerate whitespace around the commas ("a:9092, b:9092").
    const brokers = this.configService
      .get<string>('KAFKA_BROKERS', 'localhost:9092')
      .split(',')
      .map((broker) => broker.trim());
    const clientId = this.configService.get<string>(
      'KAFKA_CLIENT_ID',
      'identity-service',
    );

    this.kafka = new Kafka({
      clientId: `${clientId}-outbox`,
      brokers,
      logLevel: logLevel.WARN,
    });
    this.producer = this.kafka.producer();

    this.logger.log(
      `[OUTBOX] OutboxPublisher (B方案) configured: ` +
        `pollInterval=${this.pollIntervalMs}ms, batchSize=${this.batchSize}, ` +
        `confirmationTimeout=${this.confirmationTimeoutMinutes}min`,
    );
  }

  /**
   * Connects the Kafka producer and, on success, starts the background
   * timers. A connection failure is logged and swallowed so the application
   * can still boot; in that case nothing is published and events stay in the
   * outbox table.
   */
  async onModuleInit() {
    this.logger.log('[OUTBOX] Connecting to Kafka...');
    try {
      await this.producer.connect();
      this.isConnected = true;
      this.logger.log('[OUTBOX] Connected to Kafka');
      this.start();
    } catch (error) {
      this.logger.error('[OUTBOX] Failed to connect to Kafka:', error);
      this.logger.warn(
        '[OUTBOX] OutboxPublisher will not start - events will accumulate in outbox table',
      );
    }
  }

  /**
   * Stops the timers, lets the batch that is currently being published wind
   * down, and then disconnects the producer.
   *
   * Errors from `producer.disconnect()` propagate to the caller.
   */
  async onModuleDestroy() {
    this.stop();
    if (!this.isConnected) {
      return;
    }
    // Flip the flag first: an in-flight batch checks it before every event and
    // stops at the next boundary instead of sending on a closed producer
    // (which would mark perfectly healthy events as failed).
    this.isConnected = false;
    if (this.activeRun) {
      // Never rejects: the batch runner catches and logs its own errors.
      await this.activeRun;
    }
    await this.producer.disconnect();
  }

  /**
   * Starts the polling timers (publish, confirmation-timeout check, cleanup).
   * Calling it while already running only logs a warning.
   */
  start(): void {
    if (this.isRunning) {
      this.logger.warn('[OUTBOX] Publisher already running');
      return;
    }
    this.isRunning = true;
    this.logger.log('[OUTBOX] Starting outbox publisher (B方案)...');

    // Publish loop
    this.pollInterval = setInterval(() => {
      this.processOutbox().catch((err) => {
        this.logger.error('[OUTBOX] Error processing outbox:', err);
      });
    }, this.pollIntervalMs);

    // Confirmation-timeout check (the core of Plan B)
    this.timeoutCheckInterval = setInterval(() => {
      this.checkConfirmationTimeouts().catch((err) => {
        this.logger.error(
          '[OUTBOX] Error checking confirmation timeouts:',
          err,
        );
      });
    }, this.timeoutCheckIntervalMs);

    // Cleanup of old events
    this.cleanupInterval = setInterval(() => {
      this.cleanup().catch((err) => {
        this.logger.error('[OUTBOX] Error cleaning up outbox:', err);
      });
    }, this.cleanupIntervalMs);

    this.logger.log(
      '[OUTBOX] Outbox publisher started (B方案 - 消费方确认模式)',
    );
  }

  /**
   * Stops the polling timers. Does nothing when the publisher is not running.
   * A batch that is already executing is not interrupted by this call.
   */
  stop(): void {
    if (!this.isRunning) return;
    this.isRunning = false;

    if (this.pollInterval) {
      clearInterval(this.pollInterval);
      this.pollInterval = null;
    }
    if (this.timeoutCheckInterval) {
      clearInterval(this.timeoutCheckInterval);
      this.timeoutCheckInterval = null;
    }
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }
    this.logger.log('[OUTBOX] Outbox publisher stopped');
  }

  /**
   * Publishes one batch of outbox events.
   *
   * Returns immediately when the producer is not connected. At most one batch
   * runs at a time: if a batch is already in progress (for example because it
   * takes longer than the poll interval), the call joins that batch instead of
   * starting a second one, which would load and publish the same PENDING rows
   * again.
   *
   * The returned promise does not reject; failures are logged.
   */
  async processOutbox(): Promise<void> {
    if (!this.isConnected) {
      return;
    }
    if (this.activeRun) {
      return this.activeRun;
    }
    const run = this.runBatch().finally(() => {
      this.activeRun = null;
    });
    this.activeRun = run;
    return run;
  }

  /**
   * Loads pending and retry-eligible events and publishes them one by one.
   * All errors are caught and logged, so the returned promise never rejects.
   */
  private async runBatch(): Promise<void> {
    try {
      // 1. Events waiting for their first publication
      const pendingEvents = await this.outboxRepository.findPendingEvents(
        this.batchSize,
      );
      // 2. Events that are due for a retry (half a batch at most)
      const retryEvents = await this.outboxRepository.findEventsForRetry(
        Math.floor(this.batchSize / 2),
      );
      const allEvents = [...pendingEvents, ...retryEvents];
      if (allEvents.length === 0) {
        return;
      }
      this.logger.debug(
        `[OUTBOX] Processing ${allEvents.length} events (${pendingEvents.length} pending, ${retryEvents.length} retry)`,
      );
      // 3. Publish sequentially
      for (const event of allEvents) {
        if (!this.isConnected) {
          // Shutdown started mid-batch; untouched events keep their current
          // status and are picked up again after the next start.
          this.logger.warn(
            '[OUTBOX] Producer is disconnecting - remaining events left for the next run',
          );
          break;
        }
        await this.publishEvent(event);
      }
    } catch (error) {
      this.logger.error('[OUTBOX] Error in processOutbox:', error);
    }
  }

  /**
   * Publishes a single event (Plan B).
   *
   * Sends the event with `producer.send()` and, on success, marks it as SENT.
   * It is only marked CONFIRMED later, once the consumer acknowledges it.
   * When sending or marking fails the event is marked as failed so that it
   * can be retried. An error thrown by `markAsFailed` itself propagates to
   * the caller, which abandons the rest of the batch.
   *
   * @param event the outbox row to publish
   */
  private async publishEvent(event: OutboxEvent): Promise<void> {
    try {
      this.logger.debug(
        `[OUTBOX] Publishing event ${event.id} to topic ${event.topic}`,
      );
      // Build the Kafka message; the outbox id travels with it so that the
      // consumer can acknowledge this exact row.
      const payload = {
        ...(event.payload as Record<string, unknown>),
        _outbox: {
          id: event.id.toString(),
          aggregateId: event.aggregateId,
          eventType: event.eventType,
        },
      };
      // Publish to Kafka
      await this.producer.send({
        topic: event.topic,
        messages: [
          {
            key: event.key,
            value: JSON.stringify(payload),
          },
        ],
      });
      // Plan B: mark as SENT and wait for the consumer's confirmation
      await this.outboxRepository.markAsSent(event.id);
      this.logger.log(
        `[OUTBOX] → Event ${event.id} sent to ${event.topic} (awaiting consumer confirmation)`,
      );
    } catch (error) {
      const errorMessage =
        error instanceof Error ? error.message : String(error);
      this.logger.error(
        `[OUTBOX] ✗ Failed to publish event ${event.id}: ${errorMessage}`,
      );
      // Mark as failed and schedule a retry
      await this.outboxRepository.markAsFailed(event.id, errorMessage);
    }
  }

  /**
   * Checks for events whose confirmation timed out (the core of Plan B).
   *
   * SENT events that were not confirmed within the configured number of
   * minutes are reset to PENDING so that the publish loop sends them again.
   * Does nothing while the producer is not connected. Errors are logged, not
   * thrown.
   */
  private async checkConfirmationTimeouts(): Promise<void> {
    if (!this.isConnected) {
      return;
    }
    try {
      const timedOutEvents = await this.outboxRepository.findSentEventsTimedOut(
        this.confirmationTimeoutMinutes,
        this.batchSize,
      );
      if (timedOutEvents.length === 0) {
        return;
      }
      this.logger.warn(
        `[OUTBOX] Found ${timedOutEvents.length} events without confirmation after ${this.confirmationTimeoutMinutes} minutes`,
      );
      for (const event of timedOutEvents) {
        await this.outboxRepository.resetSentToPending(event.id);
        this.logger.warn(
          `[OUTBOX] Event ${event.id} reset to PENDING for retry (retry #${event.retryCount + 1})`,
        );
      }
    } catch (error) {
      this.logger.error(
        '[OUTBOX] Error checking confirmation timeouts:',
        error,
      );
    }
  }

  /**
   * Removes old events. The retention period is read from
   * OUTBOX_RETENTION_DAYS (default 7) on every invocation.
   */
  private async cleanup(): Promise<void> {
    const retentionDays = this.readNumber('OUTBOX_RETENTION_DAYS', 7);
    await this.outboxRepository.cleanupOldEvents(retentionDays);
  }

  /**
   * Triggers processing manually (for tests or emergencies). When a batch is
   * already in progress the call waits for that batch instead of starting
   * another one.
   */
  async triggerProcessing(): Promise<void> {
    this.logger.log('[OUTBOX] Manual processing triggered');
    await this.processOutbox();
  }

  /**
   * Returns the publisher's state flags together with the counters reported
   * by the repository.
   */
  async getStats(): Promise<{
    isRunning: boolean;
    isConnected: boolean;
    pending: number;
    sent: number;
    confirmed: number;
    failed: number;
  }> {
    const stats = await this.outboxRepository.getStats();
    return {
      isRunning: this.isRunning,
      isConnected: this.isConnected,
      ...stats,
    };
  }

  /**
   * Reads a numeric configuration value.
   *
   * The type argument of `ConfigService.get<number>()` is only a compile-time
   * hint; values that come from environment variables are strings at runtime.
   * This helper coerces them and falls back to the default when the value is
   * missing, blank or not a finite number.
   *
   * @param key configuration key
   * @param fallback value used when the key is unset or unusable
   * @returns the configured number, or `fallback`
   */
  private readNumber(key: string, fallback: number): number {
    const raw = this.configService.get<unknown>(key);
    if (raw === undefined || raw === null) {
      return fallback;
    }
    if (typeof raw === 'string' && raw.trim() === '') {
      return fallback;
    }
    const parsed = typeof raw === 'number' ? raw : Number(raw);
    if (!Number.isFinite(parsed)) {
      this.logger.warn(
        `[OUTBOX] Invalid numeric value for ${key}; using default ${fallback}`,
      );
      return fallback;
    }
    return parsed;
  }
}