// Source: rwadurian/backend/services/identity-service/src/infrastructure/kafka/dead-letter.service.ts
// (84 lines, 2.3 KiB, TypeScript)

import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '../persistence/prisma/prisma.service';
import { DomainEventMessage } from './event-publisher.service';
/**
 * Dead letter store for domain event messages.
 *
 * Records a failed message together with the error that caused the failure,
 * and exposes helpers to list unprocessed records, mark a record as
 * processed, bump its retry counter and compute summary counts. All access
 * goes through `prisma.deadLetterEvent`; none of the methods catch errors, so
 * a rejection from the underlying Prisma call propagates to the caller.
 */
@Injectable()
export class DeadLetterService {
  private readonly logger = new Logger(DeadLetterService.name);

  constructor(private readonly prisma: PrismaService) {}

  /**
   * Creates a dead letter record for a message and logs a warning.
   *
   * @param topic - Topic name stored with the record.
   * @param message - The failed message; its ids, types and payload are copied.
   * @param error - The failure; `message` and `stack` are stored.
   * @param retryCount - Retry count stored with the record.
   */
  async saveFailedEvent(
    topic: string,
    message: DomainEventMessage,
    error: Error,
    retryCount: number,
  ): Promise<void> {
    const record = {
      topic,
      eventId: message.eventId,
      eventType: message.eventType,
      aggregateId: message.aggregateId,
      aggregateType: message.aggregateType,
      payload: message.payload,
      errorMessage: error.message,
      errorStack: error.stack,
      retryCount,
      createdAt: new Date(),
    };

    await this.prisma.deadLetterEvent.create({ data: record });

    // The warning is only emitted once the record has been written.
    this.logger.warn(`Event saved to dead letter queue: ${message.eventId}`);
  }

  /**
   * Lists records that have no `processedAt` value, ordered by `createdAt`
   * ascending.
   *
   * @param limit - Value passed as Prisma's `take` (defaults to 100).
   * @returns The matching records.
   */
  async getFailedEvents(limit: number = 100): Promise<any[]> {
    return this.prisma.deadLetterEvent.findMany({
      take: limit,
      orderBy: { createdAt: 'asc' },
      where: { processedAt: null },
    });
  }

  /**
   * Sets `processedAt` to the current time on the given record and logs it.
   *
   * @param id - Identifier of the dead letter record.
   */
  async markAsProcessed(id: bigint): Promise<void> {
    const processedAt = new Date();

    await this.prisma.deadLetterEvent.update({
      where: { id },
      data: { processedAt },
    });

    this.logger.log(`Dead letter event marked as processed: ${id}`);
  }

  /**
   * Adds one to the stored `retryCount` of the given record.
   *
   * @param id - Identifier of the dead letter record.
   */
  async incrementRetryCount(id: bigint): Promise<void> {
    const bumpRetry = { retryCount: { increment: 1 } };

    await this.prisma.deadLetterEvent.update({ where: { id }, data: bumpRetry });
  }

  /**
   * Computes summary counts over the dead letter records.
   *
   * @returns `total` (all records), `pending` (no `processedAt`), `processed`
   * (`processedAt` set) and `byTopic`, which maps each topic to its number of
   * pending records — processed records are not included in `byTopic`.
   */
  async getStatistics(): Promise<{
    total: number;
    pending: number;
    processed: number;
    byTopic: Record<string, number>;
  }> {
    const pendingOnly = { processedAt: null };

    const totalQuery = this.prisma.deadLetterEvent.count();
    const pendingQuery = this.prisma.deadLetterEvent.count({ where: pendingOnly });
    const processedQuery = this.prisma.deadLetterEvent.count({
      where: { processedAt: { not: null } },
    });
    const perTopicQuery = this.prisma.deadLetterEvent.groupBy({
      by: ['topic'],
      _count: true,
      where: pendingOnly,
    });

    // The four queries are awaited together rather than one after another.
    const [total, pending, processed, perTopicRows] = await Promise.all([
      totalQuery,
      pendingQuery,
      processedQuery,
      perTopicQuery,
    ]);

    // Fold the grouped rows into a plain topic -> count dictionary.
    const byTopic = perTopicRows.reduce((counts, row) => {
      counts[row.topic] = row._count;
      return counts;
    }, {} as Record<string, number>);

    return { total, pending, processed, byTopic };
  }
}