import { Injectable, Logger } from '@nestjs/common'; import { EventPublisherService } from './event-publisher.service'; import { DeadLetterService } from './dead-letter.service'; @Injectable() export class EventRetryService { private readonly logger = new Logger(EventRetryService.name); private readonly maxRetries = 3; private isRunning = false; constructor( private readonly eventPublisher: EventPublisherService, private readonly deadLetterService: DeadLetterService, ) {} // 可以通过 API 手动触发或由外部调度器调用 async retryFailedEvents(): Promise { if (this.isRunning) { this.logger.debug('Retry job already running, skipping'); return; } this.isRunning = true; this.logger.log('Starting failed events retry job'); try { const failedEvents = await this.deadLetterService.getFailedEvents(50); let successCount = 0; let failCount = 0; for (const event of failedEvents) { if (event.retryCount >= this.maxRetries) { this.logger.warn( `Event ${event.eventId} exceeded max retries (${this.maxRetries}), skipping`, ); continue; } try { await this.eventPublisher.publish(event.topic, { eventId: event.eventId, occurredAt: event.createdAt.toISOString(), aggregateId: event.aggregateId, aggregateType: event.aggregateType, eventType: event.eventType, payload: event.payload, }); await this.deadLetterService.markAsProcessed(event.id); successCount++; this.logger.log(`Successfully retried event: ${event.eventId}`); } catch (error) { failCount++; await this.deadLetterService.incrementRetryCount(event.id); this.logger.error(`Failed to retry event: ${event.eventId}`, error); } } this.logger.log( `Finished retry job: ${successCount} succeeded, ${failCount} failed`, ); } finally { this.isRunning = false; } } async manualRetry(eventId: string): Promise { const events = await this.deadLetterService.getFailedEvents(1000); const event = events.find((e) => e.eventId === eventId); if (!event) { this.logger.warn(`Event not found: ${eventId}`); return false; } try { await this.eventPublisher.publish(event.topic, { eventId: event.eventId, occurredAt: event.createdAt.toISOString(), aggregateId: event.aggregateId, aggregateType: event.aggregateType, eventType: event.eventType, payload: event.payload, }); await this.deadLetterService.markAsProcessed(event.id); this.logger.log(`Manually retried event: ${eventId}`); return true; } catch (error) { this.logger.error(`Failed to manually retry event: ${eventId}`, error); return false; } } }