rwadurian/backend/services/identity-service/src/infrastructure/kafka/event-retry.service.ts

95 lines
2.9 KiB
TypeScript

import { Injectable, Logger } from '@nestjs/common';
import { EventPublisherService } from './event-publisher.service';
import { DeadLetterService } from './dead-letter.service';
@Injectable()
export class EventRetryService {
private readonly logger = new Logger(EventRetryService.name);
private readonly maxRetries = 3;
private isRunning = false;
constructor(
private readonly eventPublisher: EventPublisherService,
private readonly deadLetterService: DeadLetterService,
) {}
// 可以通过 API 手动触发或由外部调度器调用
async retryFailedEvents(): Promise<void> {
if (this.isRunning) {
this.logger.debug('Retry job already running, skipping');
return;
}
this.isRunning = true;
this.logger.log('Starting failed events retry job');
try {
const failedEvents = await this.deadLetterService.getFailedEvents(50);
let successCount = 0;
let failCount = 0;
for (const event of failedEvents) {
if (event.retryCount >= this.maxRetries) {
this.logger.warn(
`Event ${event.eventId} exceeded max retries (${this.maxRetries}), skipping`,
);
continue;
}
try {
await this.eventPublisher.publish(event.topic, {
eventId: event.eventId,
occurredAt: event.createdAt.toISOString(),
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
eventType: event.eventType,
payload: event.payload,
});
await this.deadLetterService.markAsProcessed(event.id);
successCount++;
this.logger.log(`Successfully retried event: ${event.eventId}`);
} catch (error) {
failCount++;
await this.deadLetterService.incrementRetryCount(event.id);
this.logger.error(`Failed to retry event: ${event.eventId}`, error);
}
}
this.logger.log(
`Finished retry job: ${successCount} succeeded, ${failCount} failed`,
);
} finally {
this.isRunning = false;
}
}
async manualRetry(eventId: string): Promise<boolean> {
const events = await this.deadLetterService.getFailedEvents(1000);
const event = events.find((e) => e.eventId === eventId);
if (!event) {
this.logger.warn(`Event not found: ${eventId}`);
return false;
}
try {
await this.eventPublisher.publish(event.topic, {
eventId: event.eventId,
occurredAt: event.createdAt.toISOString(),
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
eventType: event.eventType,
payload: event.payload,
});
await this.deadLetterService.markAsProcessed(event.id);
this.logger.log(`Manually retried event: ${eventId}`);
return true;
} catch (error) {
this.logger.error(`Failed to manually retry event: ${eventId}`, error);
return false;
}
}
}