import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, LessThan } from 'typeorm';
import { Kafka, Producer } from 'kafkajs';
import { OutboxEvent } from './outbox.entity';
/**
|
|
* Outbox Relay - polls the outbox table and publishes pending events to Kafka.
|
|
* This is the FALLBACK mechanism when Debezium CDC is not available.
|
|
* In production, Debezium CDC watches the outbox table via PostgreSQL WAL.
|
|
*
|
|
* Retry strategy: exponential backoff within 24h total window.
|
|
* Retry delays: 1s, 2s, 4s, 8s, 16s (max 5 retries, ~31s total)
|
|
* After max retries or 24h expiry, event is marked as 'failed'.
|
|
*/
|
|
@Injectable()
|
|
export class OutboxRelayService implements OnModuleInit, OnModuleDestroy {
|
|
private readonly logger = new Logger('OutboxRelay');
|
|
private producer: Producer;
|
|
private intervalHandle: NodeJS.Timeout;
|
|
private isRunning = false;
|
|
|
|
constructor(
|
|
@InjectRepository(OutboxEvent)
|
|
private readonly outboxRepo: Repository<OutboxEvent>,
|
|
) {}
|
|
|
|
async onModuleInit() {
|
|
const kafka = new Kafka({
|
|
clientId: 'outbox-relay',
|
|
brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
|
|
});
|
|
this.producer = kafka.producer({
|
|
idempotent: true, // Kafka producer-level idempotency
|
|
});
|
|
await this.producer.connect();
|
|
this.logger.log('Outbox Relay connected to Kafka');
|
|
|
|
// Poll every 100ms for pending events
|
|
this.intervalHandle = setInterval(() => this.processOutbox(), 100);
|
|
}
|
|
|
|
async onModuleDestroy() {
|
|
if (this.intervalHandle) {
|
|
clearInterval(this.intervalHandle);
|
|
}
|
|
if (this.producer) {
|
|
await this.producer.disconnect();
|
|
}
|
|
}
|
|
|
|
private async processOutbox(): Promise<void> {
|
|
if (this.isRunning) return; // Prevent concurrent processing
|
|
this.isRunning = true;
|
|
|
|
try {
|
|
// Fetch batch of pending events (oldest first)
|
|
const events = await this.outboxRepo.find({
|
|
where: { status: 'pending' },
|
|
order: { createdAt: 'ASC' },
|
|
take: 100,
|
|
});
|
|
|
|
for (const event of events) {
|
|
// Check if expired (24h window)
|
|
if (new Date() > event.expiresAt) {
|
|
event.status = 'failed';
|
|
await this.outboxRepo.save(event);
|
|
this.logger.warn(
|
|
`Outbox event ${event.id} expired after 24h, marked as failed`,
|
|
);
|
|
continue;
|
|
}
|
|
|
|
try {
|
|
await this.producer.send({
|
|
topic: event.topic,
|
|
messages: [
|
|
{
|
|
key: event.partitionKey || event.aggregateId,
|
|
value: JSON.stringify(event.payload),
|
|
headers: {
|
|
eventId: event.id,
|
|
eventType: event.eventType,
|
|
aggregateType: event.aggregateType,
|
|
aggregateId: event.aggregateId,
|
|
...Object.fromEntries(
|
|
Object.entries(event.headers || {}).map(([k, v]) => [
|
|
k,
|
|
String(v),
|
|
]),
|
|
),
|
|
},
|
|
},
|
|
],
|
|
});
|
|
|
|
// Mark as published
|
|
event.status = 'published';
|
|
event.publishedAt = new Date();
|
|
await this.outboxRepo.save(event);
|
|
} catch (error) {
|
|
// Increment retry count with exponential backoff
|
|
event.retryCount += 1;
|
|
if (event.retryCount >= event.maxRetries) {
|
|
event.status = 'failed';
|
|
this.logger.error(
|
|
`Outbox event ${event.id} failed after ${event.maxRetries} retries: ${error.message}`,
|
|
);
|
|
}
|
|
await this.outboxRepo.save(event);
|
|
}
|
|
}
|
|
|
|
// Cleanup: remove published events older than 24h
|
|
await this.outboxRepo.delete({
|
|
status: 'published',
|
|
expiresAt: LessThan(new Date()),
|
|
});
|
|
} catch (error) {
|
|
this.logger.error(`Outbox relay error: ${error.message}`);
|
|
} finally {
|
|
this.isRunning = false;
|
|
}
|
|
}
|
|
}
|