diff --git a/backend/services/referral-service/prisma/migrations/20251210093201_add_processed_events_table/migration.sql b/backend/services/referral-service/prisma/migrations/20251210093201_add_processed_events_table/migration.sql new file mode 100644 index 00000000..ba0a49e3 --- /dev/null +++ b/backend/services/referral-service/prisma/migrations/20251210093201_add_processed_events_table/migration.sql @@ -0,0 +1,18 @@ +-- CreateTable +CREATE TABLE "processed_events" ( + "id" BIGSERIAL NOT NULL, + "event_id" VARCHAR(100) NOT NULL, + "event_type" VARCHAR(50) NOT NULL, + "processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "processed_events_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "processed_events_event_id_key" ON "processed_events"("event_id"); + +-- CreateIndex +CREATE INDEX "idx_processed_event_id" ON "processed_events"("event_id"); + +-- CreateIndex +CREATE INDEX "idx_processed_at" ON "processed_events"("processed_at"); diff --git a/backend/services/referral-service/prisma/schema.prisma b/backend/services/referral-service/prisma/schema.prisma index 1ac3e7c9..40ffc1f5 100644 --- a/backend/services/referral-service/prisma/schema.prisma +++ b/backend/services/referral-service/prisma/schema.prisma @@ -159,6 +159,21 @@ model TeamProvinceCityDetail { @@index([cityCode], name: "idx_detail_city") } +// ============================================ +// 已处理事件表 (幂等性检查) +// 用于防止重复处理Kafka消息 +// ============================================ +model ProcessedEvent { + id BigInt @id @default(autoincrement()) + eventId String @unique @map("event_id") @db.VarChar(100) // 事件唯一标识 (outbox.aggregateId) + eventType String @map("event_type") @db.VarChar(50) + processedAt DateTime @default(now()) @map("processed_at") + + @@map("processed_events") + @@index([eventId], name: "idx_processed_event_id") + @@index([processedAt], name: "idx_processed_at") +} + // ============================================ // 推荐事件表 (行为表,append-only,用于审计和事件溯源) // ============================================ diff --git a/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts b/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts index a2bd4ab8..aab33c8e 100644 --- a/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts +++ b/backend/services/referral-service/src/application/event-handlers/planting-created.handler.ts @@ -1,5 +1,5 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; -import { KafkaService } from '../../infrastructure'; +import { KafkaService, PrismaService } from '../../infrastructure'; import { EventAckPublisher } from '../../infrastructure/kafka/event-ack.publisher'; import { TeamStatisticsService } from '../services'; import { UpdateTeamStatisticsCommand } from '../commands'; @@ -31,6 +31,7 @@ export class PlantingCreatedHandler implements OnModuleInit { private readonly kafkaService: KafkaService, private readonly teamStatisticsService: TeamStatisticsService, private readonly eventAckPublisher: EventAckPublisher, + private readonly prisma: PrismaService, ) {} async onModuleInit() { @@ -53,6 +54,22 @@ export class PlantingCreatedHandler implements OnModuleInit { const outboxInfo = event._outbox; const eventId = outboxInfo?.aggregateId || 'unknown'; + // 幂等性检查:检查是否已经处理过该事件 + if (eventId !== 'unknown') { + const existingEvent = await this.prisma.processedEvent.findUnique({ + where: { eventId }, + }); + + if (existingEvent) { + this.logger.log(`Event ${eventId} already processed, skipping`); + // 仍然发送成功确认,避免发送方继续重试 + if (outboxInfo) { + await this.eventAckPublisher.sendSuccess(eventId, outboxInfo.eventType); + } + return; + } + } + try { const command = new UpdateTeamStatisticsCommand( BigInt(event.data.userId), @@ -62,6 +79,17 @@ export class PlantingCreatedHandler implements OnModuleInit { ); await this.teamStatisticsService.handlePlantingEvent(command); + + // 记录已处理的事件 + if (eventId !== 'unknown') { + await this.prisma.processedEvent.create({ + data: { + eventId, + eventType: outboxInfo?.eventType || 'planting.created', + }, + }); + } + this.logger.log( `Updated team statistics for user ${event.data.userId}, count: ${event.data.treeCount}`, );