// File viewer metadata (extraction residue): 281 lines, 8.5 KiB, TypeScript
import { Injectable, Logger, OnModuleInit, OnModuleDestroy, Inject } from '@nestjs/common';
|
||
import { ConfigService } from '@nestjs/config';
|
||
import { ClientKafka } from '@nestjs/microservices';
|
||
import { OutboxRepository, OutboxEvent, OutboxStatus } from '../persistence/repositories/outbox.repository';
|
||
|
||
/**
 * Outbox Publisher Service (Plan B - consumer acknowledgement mode).
 *
 * Polls the outbox table and publishes events to Kafka. A consumer
 * acknowledgement mechanism is used so that every event ends up processed.
 *
 * Workflow:
 * 1. Poll events in PENDING status.
 * 2. Send them to Kafka and mark them SENT (awaiting confirmation).
 * 3. The consumer (wallet-service) sends an acknowledgement to
 *    reward.events.ack after it has processed the event successfully.
 * 4. Once the acknowledgement is received the event is marked CONFIRMED.
 * 5. Events whose confirmation times out are reset to PENDING and re-sent.
 */
@Injectable()
export class OutboxPublisherService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(OutboxPublisherService.name);
  private isRunning = false;
  /** True while a processOutbox() run is in flight; prevents overlapping polls. */
  private isProcessing = false;
  private pollInterval: NodeJS.Timeout | null = null;
  private timeoutCheckInterval: NodeJS.Timeout | null = null;
  private cleanupInterval: NodeJS.Timeout | null = null;
  private isConnected = false;

  // Configuration
  private readonly pollIntervalMs: number;
  private readonly batchSize: number;
  private readonly cleanupIntervalMs: number;
  private readonly confirmationTimeoutMinutes: number;
  private readonly timeoutCheckIntervalMs: number;

  constructor(
    @Inject('KAFKA_SERVICE')
    private readonly kafkaClient: ClientKafka,
    private readonly outboxRepository: OutboxRepository,
    private readonly configService: ConfigService,
  ) {
    this.pollIntervalMs = this.readNumber('OUTBOX_POLL_INTERVAL_MS', 1000);
    this.batchSize = this.readNumber('OUTBOX_BATCH_SIZE', 100);
    this.cleanupIntervalMs = this.readNumber('OUTBOX_CLEANUP_INTERVAL_MS', 3600000); // 1 hour
    this.confirmationTimeoutMinutes = this.readNumber('OUTBOX_CONFIRMATION_TIMEOUT_MINUTES', 5);
    this.timeoutCheckIntervalMs = this.readNumber('OUTBOX_TIMEOUT_CHECK_INTERVAL_MS', 60000); // 1 minute

    this.logger.log(
      `[OUTBOX] OutboxPublisher (B方案) configured: ` +
        `pollInterval=${this.pollIntervalMs}ms, batchSize=${this.batchSize}, ` +
        `confirmationTimeout=${this.confirmationTimeoutMinutes}min`,
    );
  }

  /**
   * Reads a numeric configuration value.
   *
   * Values that arrive as strings are converted with Number(). An empty
   * string or a value that does not convert to a finite number yields
   * `fallback`, so timers and batch limits always receive a real number.
   *
   * @param key Configuration key to read.
   * @param fallback Value used when the key is unset or not numeric.
   * @returns The configured number, or `fallback`.
   */
  private readNumber(key: string, fallback: number): number {
    const raw = this.configService.get<number | string>(key, fallback);
    const parsed = typeof raw === 'string' && raw.trim() !== '' ? Number(raw) : raw;
    return typeof parsed === 'number' && Number.isFinite(parsed) ? parsed : fallback;
  }

  /**
   * Connects the Kafka client and starts the background timers.
   *
   * A connection failure is logged and swallowed: the publisher does not
   * start and no reconnect is attempted from here.
   */
  async onModuleInit(): Promise<void> {
    this.logger.log('[OUTBOX] Connecting to Kafka...');
    try {
      await this.kafkaClient.connect();
      this.isConnected = true;
      this.logger.log('[OUTBOX] Connected to Kafka');
      this.start();
    } catch (error) {
      this.logger.error('[OUTBOX] Failed to connect to Kafka:', error);
      this.logger.warn('[OUTBOX] OutboxPublisher will not start - events will accumulate in outbox table');
    }
  }

  /** Stops the timers and closes the Kafka client if it was connected. */
  async onModuleDestroy(): Promise<void> {
    this.stop();
    if (this.isConnected) {
      // Clear the flag before closing so an in-flight batch stops publishing.
      this.isConnected = false;
      await this.kafkaClient.close();
    }
  }

  /**
   * Starts the polling, confirmation-timeout and cleanup timers.
   * Logs a warning and does nothing when already running.
   */
  start(): void {
    if (this.isRunning) {
      this.logger.warn('[OUTBOX] Publisher already running');
      return;
    }

    this.isRunning = true;
    this.logger.log('[OUTBOX] Starting outbox publisher (B方案)...');

    // Publishing poll
    this.pollInterval = setInterval(() => {
      this.processOutbox().catch((err) => {
        this.logger.error('[OUTBOX] Error processing outbox:', err);
      });
    }, this.pollIntervalMs);

    // Confirmation-timeout check (core of Plan B)
    this.timeoutCheckInterval = setInterval(() => {
      this.checkConfirmationTimeouts().catch((err) => {
        this.logger.error('[OUTBOX] Error checking confirmation timeouts:', err);
      });
    }, this.timeoutCheckIntervalMs);

    // Cleanup of old events
    this.cleanupInterval = setInterval(() => {
      this.cleanup().catch((err) => {
        this.logger.error('[OUTBOX] Error cleaning up outbox:', err);
      });
    }, this.cleanupIntervalMs);

    this.logger.log('[OUTBOX] Outbox publisher started (B方案 - 消费方确认模式)');
  }

  /** Stops all timers. Does nothing when the publisher is not running. */
  stop(): void {
    if (!this.isRunning) return;

    this.isRunning = false;

    if (this.pollInterval) {
      clearInterval(this.pollInterval);
      this.pollInterval = null;
    }

    if (this.timeoutCheckInterval) {
      clearInterval(this.timeoutCheckInterval);
      this.timeoutCheckInterval = null;
    }

    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }

    this.logger.log('[OUTBOX] Outbox publisher stopped');
  }

  /**
   * Fetches pending and retryable events and publishes them one by one.
   *
   * Returns immediately when Kafka is not connected or when a previous run is
   * still in flight. Without the in-flight check, a batch that takes longer
   * than the poll interval would overlap with the next tick, which would read
   * the same rows and publish them again.
   * Errors are logged here and not rethrown.
   */
  async processOutbox(): Promise<void> {
    if (!this.isConnected) {
      return;
    }
    if (this.isProcessing) {
      this.logger.debug('[OUTBOX] Previous processing run still in progress, skipping');
      return;
    }

    this.isProcessing = true;
    try {
      // 1. Events waiting to be published
      const pendingEvents = await this.outboxRepository.findPendingEvents(this.batchSize);

      // 2. Events that need a retry; the limit is at least 1 so retries are
      //    still fetched when batchSize is 1
      const retryLimit = Math.max(1, Math.floor(this.batchSize / 2));
      const retryEvents = await this.outboxRepository.findEventsForRetry(retryLimit);

      const allEvents = [...pendingEvents, ...retryEvents];

      if (allEvents.length === 0) {
        return;
      }

      this.logger.debug(`[OUTBOX] Processing ${allEvents.length} events (${pendingEvents.length} pending, ${retryEvents.length} retry)`);

      // 3. Publish sequentially
      for (const event of allEvents) {
        // Stop early once the module is shutting down; the remaining events
        // are left untouched for a later run.
        if (!this.isConnected) {
          break;
        }
        await this.publishEvent(event);
      }
    } catch (error) {
      this.logger.error('[OUTBOX] Error in processOutbox:', error);
    } finally {
      this.isProcessing = false;
    }
  }

  /**
   * Publishes a single event (Plan B).
   *
   * The event is emitted to Kafka and marked SENT (awaiting consumer
   * confirmation) only after the emit has completed. If anything in that
   * sequence fails, the event is marked as failed with the error message.
   * An error thrown by markAsFailed itself propagates to the caller.
   *
   * @param event Outbox row to publish.
   */
  private async publishEvent(event: OutboxEvent): Promise<void> {
    try {
      this.logger.debug(`[OUTBOX] Publishing event ${event.id} to topic ${event.topic}`);

      // Build the Kafka message; the _outbox envelope carries the outbox id
      // used for confirmation.
      const payload = {
        ...(event.payload as Record<string, unknown>),
        _outbox: {
          id: event.id.toString(),
          aggregateId: event.aggregateId,
          eventType: event.eventType,
        },
      };

      const message = {
        key: event.key,
        value: JSON.stringify(payload),
      };

      // emit() returns an Observable. Wait for it to complete so a produce
      // error lands in the catch block instead of being silently dropped.
      await new Promise<void>((resolve, reject) => {
        this.kafkaClient.emit(event.topic, message).subscribe({
          complete: resolve,
          error: reject,
        });
      });

      // Plan B: mark as SENT (awaiting consumer confirmation)
      await this.outboxRepository.markAsSent(event.id);

      this.logger.log(
        `[OUTBOX] → Event ${event.id} sent to ${event.topic} (awaiting consumer confirmation)`,
      );
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      this.logger.error(`[OUTBOX] ✗ Failed to publish event ${event.id}: ${errorMessage}`);

      // Mark as failed so the event is scheduled for retry
      await this.outboxRepository.markAsFailed(event.id, errorMessage);
    }
  }

  /**
   * Checks for events whose confirmation has timed out (core of Plan B).
   *
   * SENT events without a confirmation after the configured number of minutes
   * are reset to PENDING so they get re-sent. Errors are logged, not rethrown.
   */
  private async checkConfirmationTimeouts(): Promise<void> {
    if (!this.isConnected) {
      return;
    }

    try {
      const timedOutEvents = await this.outboxRepository.findSentEventsTimedOut(
        this.confirmationTimeoutMinutes,
        this.batchSize,
      );

      if (timedOutEvents.length === 0) {
        return;
      }

      this.logger.warn(
        `[OUTBOX] Found ${timedOutEvents.length} events without confirmation after ${this.confirmationTimeoutMinutes} minutes`,
      );

      for (const event of timedOutEvents) {
        await this.outboxRepository.resetSentToPending(event.id);
        this.logger.warn(
          `[OUTBOX] Event ${event.id} reset to PENDING for retry (retry #${event.retryCount + 1})`,
        );
      }
    } catch (error) {
      this.logger.error('[OUTBOX] Error checking confirmation timeouts:', error);
    }
  }

  /**
   * Cleans up old events, using OUTBOX_RETENTION_DAYS (default 7) as the
   * retention period passed to the repository.
   */
  private async cleanup(): Promise<void> {
    const retentionDays = this.readNumber('OUTBOX_RETENTION_DAYS', 7);
    await this.outboxRepository.cleanupOldEvents(retentionDays);
  }

  /**
   * Manually triggers a processing run (for tests or emergencies).
   * Has no effect while another run is already in progress.
   */
  async triggerProcessing(): Promise<void> {
    this.logger.log('[OUTBOX] Manual processing triggered');
    await this.processOutbox();
  }

  /**
   * Returns publisher state together with the repository's counters.
   *
   * @returns The running/connected flags merged with the result of
   *          outboxRepository.getStats().
   */
  async getStats(): Promise<{
    isRunning: boolean;
    isConnected: boolean;
    pending: number;
    sent: number;
    confirmed: number;
    failed: number;
  }> {
    const stats = await this.outboxRepository.getStats();
    return {
      isRunning: this.isRunning,
      isConnected: this.isConnected,
      ...stats,
    };
  }
}
|