fix(referral): add idempotency check for Kafka event processing

- Add processed_events table to track handled events
- Check eventId before processing planting.created events
- Skip duplicate events and still send ACK to stop retries

This prevents duplicate processing (double-counted team statistics) when
Kafka events are redelivered due to ACK failures or consumer timeouts.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-10 09:38:13 -08:00
parent 1b2211d641
commit 70d1a8bfb8
3 changed files with 62 additions and 1 deletions

View File

@ -0,0 +1,18 @@
-- CreateTable
-- Records each Kafka event that has been handled, so redelivered messages
-- (at-least-once delivery) can be detected and skipped by the consumer.
CREATE TABLE "processed_events" (
"id" BIGSERIAL NOT NULL,
"event_id" VARCHAR(100) NOT NULL,
"event_type" VARCHAR(50) NOT NULL,
"processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "processed_events_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
-- Enforces at most one row per event id; a duplicate insert fails fast.
CREATE UNIQUE INDEX "processed_events_event_id_key" ON "processed_events"("event_id");
-- CreateIndex
-- NOTE(review): redundant — the unique index above already serves equality
-- lookups on event_id, so this second index only adds write overhead.
-- Drop it in a follow-up migration; do not edit this applied migration in place.
CREATE INDEX "idx_processed_event_id" ON "processed_events"("event_id");
-- CreateIndex
-- Supports time-based retention/cleanup queries over processed_at.
CREATE INDEX "idx_processed_at" ON "processed_events"("processed_at");

View File

@ -159,6 +159,21 @@ model TeamProvinceCityDetail {
@@index([cityCode], name: "idx_detail_city")
}
// ============================================
// Processed events table (idempotency check)
// Prevents duplicate handling of redelivered Kafka messages:
// the consumer looks up event_id before processing and records it after.
// ============================================
model ProcessedEvent {
id BigInt @id @default(autoincrement())
eventId String @unique @map("event_id") @db.VarChar(100) // unique event key (outbox.aggregateId)
eventType String @map("event_type") @db.VarChar(50)
processedAt DateTime @default(now()) @map("processed_at") // when the event was handled; used for retention cleanup
@@map("processed_events")
// Removed @@index([eventId]): the @unique above already creates a unique
// index that serves equality lookups, so a second index on the same column
// was pure write overhead.
@@index([processedAt], name: "idx_processed_at")
}
// ============================================
// Referral events table (append-only behavior log, for auditing and event sourcing)
// ============================================

View File

@ -1,5 +1,5 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { KafkaService } from '../../infrastructure';
import { KafkaService, PrismaService } from '../../infrastructure';
import { EventAckPublisher } from '../../infrastructure/kafka/event-ack.publisher';
import { TeamStatisticsService } from '../services';
import { UpdateTeamStatisticsCommand } from '../commands';
@ -31,6 +31,7 @@ export class PlantingCreatedHandler implements OnModuleInit {
private readonly kafkaService: KafkaService,
private readonly teamStatisticsService: TeamStatisticsService,
private readonly eventAckPublisher: EventAckPublisher,
private readonly prisma: PrismaService,
) {}
async onModuleInit() {
@ -53,6 +54,22 @@ export class PlantingCreatedHandler implements OnModuleInit {
const outboxInfo = event._outbox;
const eventId = outboxInfo?.aggregateId || 'unknown';
// 幂等性检查:检查是否已经处理过该事件
if (eventId !== 'unknown') {
const existingEvent = await this.prisma.processedEvent.findUnique({
where: { eventId },
});
if (existingEvent) {
this.logger.log(`Event ${eventId} already processed, skipping`);
// 仍然发送成功确认,避免发送方继续重试
if (outboxInfo) {
await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType);
}
return;
}
}
try {
const command = new UpdateTeamStatisticsCommand(
BigInt(event.data.userId),
@ -62,6 +79,17 @@ export class PlantingCreatedHandler implements OnModuleInit {
);
await this.teamStatisticsService.handlePlantingEvent(command);
// 记录已处理的事件
if (eventId !== 'unknown') {
await this.prisma.processedEvent.create({
data: {
eventId,
eventType: outboxInfo?.eventType || 'planting.created',
},
});
}
this.logger.log(
`Updated team statistics for user ${event.data.userId}, count: ${event.data.treeCount}`,
);