From 538aae4ef08310c78a67a82ecd436c2a661d4efd Mon Sep 17 00:00:00 2001 From: hailin Date: Wed, 10 Dec 2025 23:26:49 -0800 Subject: [PATCH] feat(sync): implement Outbox Pattern for reward-service to wallet-service sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add event synchronization infrastructure between reward-service and wallet-service: reward-service changes: - Add OutboxEvent model to prisma schema for reliable event publishing - Add outbox.repository.ts for outbox table CRUD operations - Add outbox-publisher.service.ts for polling and publishing events to Kafka - Add event-ack.controller.ts to receive consumer confirmations wallet-service changes: - Add ProcessedEvent model to prisma schema for idempotency checking - Add reward-event-consumer.controller.ts to consume reward.summary.updated events - Add event-ack.publisher.ts to send ACK to reward-service - Update kafka.module.ts with Kafka client configuration - Update main.ts to connect Kafka microservice on startup Event flow: reward-service -> Kafka (reward.summary.updated) -> wallet-service -> Kafka (reward.events.ack) -> reward-service 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../migration.sql | 31 ++ .../reward-service/prisma/schema.prisma | 38 +++ .../infrastructure/infrastructure.module.ts | 3 + .../kafka/event-ack.controller.ts | 84 +++++ .../src/infrastructure/kafka/kafka.module.ts | 10 +- .../kafka/outbox-publisher.service.ts | 280 ++++++++++++++++ .../repositories/outbox.repository.ts | 316 ++++++++++++++++++ .../migration.sql | 22 ++ .../wallet-service/prisma/schema.prisma | 24 ++ .../kafka/event-ack.publisher.ts | 86 +++++ .../src/infrastructure/kafka/kafka.module.ts | 31 +- .../kafka/reward-event-consumer.controller.ts | 139 ++++++++ backend/services/wallet-service/src/main.ts | 28 +- 13 files changed, 1084 insertions(+), 8 deletions(-) create mode 100644 backend/services/reward-service/prisma/migrations/20241210000002_add_outbox_events/migration.sql create mode 100644 backend/services/reward-service/src/infrastructure/kafka/event-ack.controller.ts create mode 100644 backend/services/reward-service/src/infrastructure/kafka/outbox-publisher.service.ts create mode 100644 backend/services/reward-service/src/infrastructure/persistence/repositories/outbox.repository.ts create mode 100644 backend/services/wallet-service/prisma/migrations/20241210000001_add_processed_events/migration.sql create mode 100644 backend/services/wallet-service/src/infrastructure/kafka/event-ack.publisher.ts create mode 100644 backend/services/wallet-service/src/infrastructure/kafka/reward-event-consumer.controller.ts diff --git a/backend/services/reward-service/prisma/migrations/20241210000002_add_outbox_events/migration.sql b/backend/services/reward-service/prisma/migrations/20241210000002_add_outbox_events/migration.sql new file mode 100644 index 00000000..475b72db --- /dev/null +++ b/backend/services/reward-service/prisma/migrations/20241210000002_add_outbox_events/migration.sql @@ -0,0 +1,31 @@ +-- CreateTable +CREATE TABLE "outbox_events" ( + "outbox_id" BIGSERIAL NOT NULL, + "event_type" VARCHAR(100) NOT NULL, + "topic" VARCHAR(100) NOT NULL, + "key" VARCHAR(200) NOT NULL, + "payload" JSONB NOT NULL, + "aggregate_id" VARCHAR(100) NOT NULL, + "aggregate_type" VARCHAR(50) NOT NULL, + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "retry_count" INTEGER NOT NULL DEFAULT 0, + "max_retries" INTEGER NOT NULL DEFAULT 5, + "last_error" TEXT, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "published_at" TIMESTAMP(3), + "next_retry_at" TIMESTAMP(3), + + CONSTRAINT "outbox_events_pkey" PRIMARY KEY ("outbox_id") +); + +-- CreateIndex +CREATE INDEX "idx_outbox_status_created" ON "outbox_events"("status", "created_at"); + +-- CreateIndex +CREATE INDEX "idx_outbox_status_retry" ON "outbox_events"("status", "next_retry_at"); + +-- CreateIndex +CREATE INDEX "idx_outbox_aggregate" ON "outbox_events"("aggregate_type", "aggregate_id"); + +-- CreateIndex +CREATE INDEX "idx_outbox_topic" ON "outbox_events"("topic"); diff --git a/backend/services/reward-service/prisma/schema.prisma b/backend/services/reward-service/prisma/schema.prisma index f63e14a1..3ec3573e 100644 --- a/backend/services/reward-service/prisma/schema.prisma +++ b/backend/services/reward-service/prisma/schema.prisma @@ -181,3 +181,41 @@ model RewardEvent { @@index([userId], name: "idx_reward_event_user") @@index([occurredAt], name: "idx_reward_event_occurred") } + +// ============================================ +// Outbox 事件发件箱表 (Outbox Pattern) +// 保证事件发布的可靠性: +// 1. 业务数据和 Outbox 记录在同一个事务中写入 +// 2. 后台任务轮询 Outbox 表并发布到 Kafka +// 3. 消费方确认后标记为 CONFIRMED +// ============================================ +model OutboxEvent { + id BigInt @id @default(autoincrement()) @map("outbox_id") + + // 事件信息 + eventType String @map("event_type") @db.VarChar(100) + topic String @map("topic") @db.VarChar(100) + key String @map("key") @db.VarChar(200) + payload Json @map("payload") + + // 聚合根信息 (用于幂等性检查) + aggregateId String @map("aggregate_id") @db.VarChar(100) + aggregateType String @map("aggregate_type") @db.VarChar(50) + + // 发布状态: PENDING -> SENT -> CONFIRMED 或 FAILED + status String @default("PENDING") @map("status") @db.VarChar(20) + retryCount Int @default(0) @map("retry_count") + maxRetries Int @default(5) @map("max_retries") + lastError String? @map("last_error") @db.Text + + // 时间戳 + createdAt DateTime @default(now()) @map("created_at") + publishedAt DateTime? @map("published_at") + nextRetryAt DateTime? @map("next_retry_at") + + @@map("outbox_events") + @@index([status, createdAt], name: "idx_outbox_status_created") + @@index([status, nextRetryAt], name: "idx_outbox_status_retry") + @@index([aggregateType, aggregateId], name: "idx_outbox_aggregate") + @@index([topic], name: "idx_outbox_topic") +} diff --git a/backend/services/reward-service/src/infrastructure/infrastructure.module.ts b/backend/services/reward-service/src/infrastructure/infrastructure.module.ts index a726f6d6..8619decf 100644 --- a/backend/services/reward-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/reward-service/src/infrastructure/infrastructure.module.ts @@ -3,6 +3,7 @@ import { ConfigModule } from '@nestjs/config'; import { PrismaService } from './persistence/prisma/prisma.service'; import { RewardLedgerEntryRepositoryImpl } from './persistence/repositories/reward-ledger-entry.repository.impl'; import { RewardSummaryRepositoryImpl } from './persistence/repositories/reward-summary.repository.impl'; +import { OutboxRepository } from './persistence/repositories/outbox.repository'; import { ReferralServiceClient } from './external/referral-service/referral-service.client'; import { AuthorizationServiceClient } from './external/authorization-service/authorization-service.client'; import { WalletServiceClient } from './external/wallet-service/wallet-service.client'; @@ -16,6 +17,7 @@ import { REFERRAL_SERVICE_CLIENT, AUTHORIZATION_SERVICE_CLIENT } from '../domain imports: [ConfigModule, KafkaModule, RedisModule], providers: [ PrismaService, + OutboxRepository, { provide: REWARD_LEDGER_ENTRY_REPOSITORY, useClass: RewardLedgerEntryRepositoryImpl, @@ -40,6 +42,7 @@ import { REFERRAL_SERVICE_CLIENT, AUTHORIZATION_SERVICE_CLIENT } from '../domain ], exports: [ PrismaService, + OutboxRepository, REWARD_LEDGER_ENTRY_REPOSITORY, REWARD_SUMMARY_REPOSITORY, REFERRAL_SERVICE_CLIENT, diff --git a/backend/services/reward-service/src/infrastructure/kafka/event-ack.controller.ts b/backend/services/reward-service/src/infrastructure/kafka/event-ack.controller.ts new file mode 100644 index 00000000..c547b020 --- /dev/null +++ b/backend/services/reward-service/src/infrastructure/kafka/event-ack.controller.ts @@ -0,0 +1,84 @@ +import { Controller, Logger } from '@nestjs/common'; +import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices'; +import { OutboxRepository } from '../persistence/repositories/outbox.repository'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认 Kafka 控制器 + * + * B方案核心组件:监听消费方(wallet-service)的确认事件 + * 使用 @MessagePattern 装饰器来处理 Kafka 消息 + */ +@Controller() +export class EventAckController { + private readonly logger = new Logger(EventAckController.name); + + constructor(private readonly outboxRepository: OutboxRepository) {} + + /** + * 处理事件确认消息 + * + * 消费方 (wallet-service) 成功处理事件后,会发送确认消息到此 topic + */ + @MessagePattern('reward.events.ack') + async handleEventAck( + @Payload() message: EventAckMessage, + @Ctx() context: KafkaContext, + ): Promise { + const partition = context.getPartition(); + const offset = context.getMessage().offset; + + this.logger.debug( + `[ACK] Received ack from ${message.consumerService} for event ${message.eventId} ` + + `[partition=${partition}, offset=${offset}]`, + ); + + try { + if (message.success) { + // 标记事件为已确认(使用 eventId + eventType 精确匹配) + const confirmed = await this.outboxRepository.markAsConfirmed( + message.eventId, + message.eventType, + ); + + if (confirmed) { + this.logger.log( + `[ACK] ✓ Event ${message.eventId} (${message.eventType}) confirmed by ${message.consumerService}`, + ); + } else { + this.logger.warn( + `[ACK] Event ${message.eventId} (${message.eventType}) not found or already confirmed`, + ); + } + } else { + // 消费方处理失败 + this.logger.warn( + `[ACK] ✗ Event ${message.eventId} failed in ${message.consumerService}: ${message.error}`, + ); + // 不改变状态,等待超时重发 + } + } catch (error) { + this.logger.error( + `[ACK] Error processing ack for event ${message.eventId}:`, + error, + ); + } + } +} diff --git a/backend/services/reward-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/reward-service/src/infrastructure/kafka/kafka.module.ts index 0f0442fb..e64ee880 100644 --- a/backend/services/reward-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/reward-service/src/infrastructure/kafka/kafka.module.ts @@ -4,6 +4,10 @@ import { ClientsModule, Transport } from '@nestjs/microservices'; import { EventPublisherService } from './event-publisher.service'; import { EventConsumerController } from './event-consumer.controller'; import { EventAckPublisher } from './event-ack.publisher'; +import { EventAckController } from './event-ack.controller'; +import { OutboxPublisherService } from './outbox-publisher.service'; +import { OutboxRepository } from '../persistence/repositories/outbox.repository'; +import { PrismaService } from '../persistence/prisma/prisma.service'; import { ApplicationModule } from '../../application/application.module'; @Module({ @@ -29,8 +33,8 @@ import { ApplicationModule } from '../../application/application.module'; }, ]), ], - controllers: [EventConsumerController], - providers: [EventPublisherService, EventAckPublisher], - exports: [EventPublisherService, EventAckPublisher, ClientsModule], + controllers: [EventConsumerController, EventAckController], + providers: [PrismaService, OutboxRepository, EventPublisherService, EventAckPublisher, OutboxPublisherService], + exports: [EventPublisherService, EventAckPublisher, OutboxPublisherService, ClientsModule], }) export class KafkaModule {} diff --git a/backend/services/reward-service/src/infrastructure/kafka/outbox-publisher.service.ts b/backend/services/reward-service/src/infrastructure/kafka/outbox-publisher.service.ts new file mode 100644 index 00000000..d084c5e5 --- /dev/null +++ b/backend/services/reward-service/src/infrastructure/kafka/outbox-publisher.service.ts @@ -0,0 +1,280 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy, Inject } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { ClientKafka } from '@nestjs/microservices'; +import { OutboxRepository, OutboxEvent, OutboxStatus } from '../persistence/repositories/outbox.repository'; + +/** + * Outbox Publisher Service (B方案 - 消费方确认模式) + * + * 轮询 Outbox 表并发布事件到 Kafka + * 使用消费方确认机制保证事件100%被处理 + * + * 工作流程: + * 1. 轮询 PENDING 状态的事件 + * 2. 发送到 Kafka,标记为 SENT(等待确认) + * 3. 消费方(wallet-service)处理成功后发送确认到 reward.events.ack + * 4. 收到确认后标记为 CONFIRMED + * 5. 超时未确认的事件重置为 PENDING 重发 + */ +@Injectable() +export class OutboxPublisherService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(OutboxPublisherService.name); + private isRunning = false; + private pollInterval: NodeJS.Timeout | null = null; + private timeoutCheckInterval: NodeJS.Timeout | null = null; + private cleanupInterval: NodeJS.Timeout | null = null; + private isConnected = false; + + // 配置 + private readonly pollIntervalMs: number; + private readonly batchSize: number; + private readonly cleanupIntervalMs: number; + private readonly confirmationTimeoutMinutes: number; + private readonly timeoutCheckIntervalMs: number; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + private readonly outboxRepository: OutboxRepository, + private readonly configService: ConfigService, + ) { + this.pollIntervalMs = this.configService.get('OUTBOX_POLL_INTERVAL_MS', 1000); + this.batchSize = this.configService.get('OUTBOX_BATCH_SIZE', 100); + this.cleanupIntervalMs = this.configService.get('OUTBOX_CLEANUP_INTERVAL_MS', 3600000); // 1小时 + this.confirmationTimeoutMinutes = this.configService.get('OUTBOX_CONFIRMATION_TIMEOUT_MINUTES', 5); + this.timeoutCheckIntervalMs = this.configService.get('OUTBOX_TIMEOUT_CHECK_INTERVAL_MS', 60000); // 1分钟 + + this.logger.log( + `[OUTBOX] OutboxPublisher (B方案) configured: ` + + `pollInterval=${this.pollIntervalMs}ms, batchSize=${this.batchSize}, ` + + `confirmationTimeout=${this.confirmationTimeoutMinutes}min`, + ); + } + + async onModuleInit() { + this.logger.log('[OUTBOX] Connecting to Kafka...'); + try { + await this.kafkaClient.connect(); + this.isConnected = true; + this.logger.log('[OUTBOX] Connected to Kafka'); + this.start(); + } catch (error) { + this.logger.error('[OUTBOX] Failed to connect to Kafka:', error); + this.logger.warn('[OUTBOX] OutboxPublisher will not start - events will accumulate in outbox table'); + } + } + + async onModuleDestroy() { + this.stop(); + if (this.isConnected) { + await this.kafkaClient.close(); + } + } + + /** + * 启动轮询 + */ + start(): void { + if (this.isRunning) { + this.logger.warn('[OUTBOX] Publisher already running'); + return; + } + + this.isRunning = true; + this.logger.log('[OUTBOX] Starting outbox publisher (B方案)...'); + + // 启动发布轮询 + this.pollInterval = setInterval(() => { + this.processOutbox().catch((err) => { + this.logger.error('[OUTBOX] Error processing outbox:', err); + }); + }, this.pollIntervalMs); + + // 启动超时检查任务(B方案核心) + this.timeoutCheckInterval = setInterval(() => { + this.checkConfirmationTimeouts().catch((err) => { + this.logger.error('[OUTBOX] Error checking confirmation timeouts:', err); + }); + }, this.timeoutCheckIntervalMs); + + // 启动清理任务 + this.cleanupInterval = setInterval(() => { + this.cleanup().catch((err) => { + this.logger.error('[OUTBOX] Error cleaning up outbox:', err); + }); + }, this.cleanupIntervalMs); + + this.logger.log('[OUTBOX] Outbox publisher started (B方案 - 消费方确认模式)'); + } + + /** + * 停止轮询 + */ + stop(): void { + if (!this.isRunning) return; + + this.isRunning = false; + + if (this.pollInterval) { + clearInterval(this.pollInterval); + this.pollInterval = null; + } + + if (this.timeoutCheckInterval) { + clearInterval(this.timeoutCheckInterval); + this.timeoutCheckInterval = null; + } + + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + } + + this.logger.log('[OUTBOX] Outbox publisher stopped'); + } + + /** + * 处理 Outbox 事件 + */ + async processOutbox(): Promise { + if (!this.isConnected) { + return; + } + + try { + // 1. 获取待发布事件 + const pendingEvents = await this.outboxRepository.findPendingEvents(this.batchSize); + + // 2. 获取需要重试的事件 + const retryEvents = await this.outboxRepository.findEventsForRetry(Math.floor(this.batchSize / 2)); + + const allEvents = [...pendingEvents, ...retryEvents]; + + if (allEvents.length === 0) { + return; + } + + this.logger.debug(`[OUTBOX] Processing ${allEvents.length} events (${pendingEvents.length} pending, ${retryEvents.length} retry)`); + + // 3. 逐个发布 + for (const event of allEvents) { + await this.publishEvent(event); + } + } catch (error) { + this.logger.error('[OUTBOX] Error in processOutbox:', error); + } + } + + /** + * 发布单个事件 (B方案) + * + * 使用 emit() 发送到 Kafka,成功后标记为 SENT(等待消费方确认) + * 只有收到消费方确认后才标记为 CONFIRMED + */ + private async publishEvent(event: OutboxEvent): Promise { + try { + this.logger.debug(`[OUTBOX] Publishing event ${event.id} to topic ${event.topic}`); + + // 构造 Kafka 消息,包含 outboxId 用于确认 + const payload = { + ...(event.payload as Record), + _outbox: { + id: event.id.toString(), + aggregateId: event.aggregateId, + eventType: event.eventType, + }, + }; + + const message = { + key: event.key, + value: JSON.stringify(payload), + }; + + // 发布到 Kafka + this.kafkaClient.emit(event.topic, message); + + // B方案:标记为 SENT(等待消费方确认) + await this.outboxRepository.markAsSent(event.id); + + this.logger.log( + `[OUTBOX] → Event ${event.id} sent to ${event.topic} (awaiting consumer confirmation)`, + ); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`[OUTBOX] ✗ Failed to publish event ${event.id}: ${errorMessage}`); + + // 标记为失败并安排重试 + await this.outboxRepository.markAsFailed(event.id, errorMessage); + } + } + + /** + * 检查确认超时的事件 (B方案核心) + * + * 将超时未确认的 SENT 事件重置为 PENDING 以便重发 + */ + private async checkConfirmationTimeouts(): Promise { + if (!this.isConnected) { + return; + } + + try { + const timedOutEvents = await this.outboxRepository.findSentEventsTimedOut( + this.confirmationTimeoutMinutes, + this.batchSize, + ); + + if (timedOutEvents.length === 0) { + return; + } + + this.logger.warn( + `[OUTBOX] Found ${timedOutEvents.length} events without confirmation after ${this.confirmationTimeoutMinutes} minutes`, + ); + + for (const event of timedOutEvents) { + await this.outboxRepository.resetSentToPending(event.id); + this.logger.warn( + `[OUTBOX] Event ${event.id} reset to PENDING for retry (retry #${event.retryCount + 1})`, + ); + } + } catch (error) { + this.logger.error('[OUTBOX] Error checking confirmation timeouts:', error); + } + } + + /** + * 清理旧事件 + */ + private async cleanup(): Promise { + const retentionDays = this.configService.get('OUTBOX_RETENTION_DAYS', 7); + await this.outboxRepository.cleanupOldEvents(retentionDays); + } + + /** + * 手动触发处理(用于测试或紧急情况) + */ + async triggerProcessing(): Promise { + this.logger.log('[OUTBOX] Manual processing triggered'); + await this.processOutbox(); + } + + /** + * 获取统计信息 + */ + async getStats(): Promise<{ + isRunning: boolean; + isConnected: boolean; + pending: number; + sent: number; + confirmed: number; + failed: number; + }> { + const stats = await this.outboxRepository.getStats(); + return { + isRunning: this.isRunning, + isConnected: this.isConnected, + ...stats, + }; + } +} diff --git a/backend/services/reward-service/src/infrastructure/persistence/repositories/outbox.repository.ts b/backend/services/reward-service/src/infrastructure/persistence/repositories/outbox.repository.ts new file mode 100644 index 00000000..3c4399a8 --- /dev/null +++ b/backend/services/reward-service/src/infrastructure/persistence/repositories/outbox.repository.ts @@ -0,0 +1,316 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { Prisma } from '@prisma/client'; + +export enum OutboxStatus { + PENDING = 'PENDING', // 待发送 + SENT = 'SENT', // 已发送到 Kafka,等待消费方确认 + CONFIRMED = 'CONFIRMED', // 消费方已确认处理成功 + FAILED = 'FAILED', // 发送失败,等待重试 +} + +export interface OutboxEventData { + eventType: string; + topic: string; + key: string; + payload: Record; + aggregateId: string; + aggregateType: string; +} + +export interface OutboxEvent extends OutboxEventData { + id: bigint; + status: OutboxStatus; + retryCount: number; + maxRetries: number; + lastError: string | null; + createdAt: Date; + publishedAt: Date | null; + nextRetryAt: Date | null; +} + +@Injectable() +export class OutboxRepository { + private readonly logger = new Logger(OutboxRepository.name); + + constructor(private readonly prisma: PrismaService) {} + + /** + * 在事务中保存 Outbox 事件 + */ + async saveInTransaction( + tx: Prisma.TransactionClient, + events: OutboxEventData[], + ): Promise { + if (events.length === 0) return; + + this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox (in transaction)`); + + await tx.outboxEvent.createMany({ + data: events.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: OutboxStatus.PENDING, + })), + }); + + this.logger.debug(`[OUTBOX] Saved ${events.length} events to outbox`); + } + + /** + * 直接保存 Outbox 事件(不在事务中) + */ + async saveEvents(events: OutboxEventData[]): Promise { + if (events.length === 0) return; + + this.logger.debug(`[OUTBOX] Saving ${events.length} events to outbox`); + + await this.prisma.outboxEvent.createMany({ + data: events.map((event) => ({ + eventType: event.eventType, + topic: event.topic, + key: event.key, + payload: event.payload as Prisma.JsonObject, + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + status: OutboxStatus.PENDING, + })), + }); + + this.logger.log(`[OUTBOX] ✓ Saved ${events.length} events to outbox`); + } + + /** + * 获取待发布的事件(按创建时间排序) + */ + async findPendingEvents(limit: number = 100): Promise { + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.PENDING, + }, + orderBy: { + createdAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 获取需要重试的事件 + */ + async findEventsForRetry(limit: number = 50): Promise { + const now = new Date(); + + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.FAILED, + retryCount: { + lt: 5, + }, + nextRetryAt: { + lte: now, + }, + }, + orderBy: { + nextRetryAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 标记事件为已发送(等待消费方确认) + */ + async markAsSent(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.SENT, + publishedAt: new Date(), + }, + }); + + this.logger.debug(`[OUTBOX] Marked event ${id} as SENT (awaiting consumer confirmation)`); + } + + /** + * 标记事件为已确认(消费方已成功处理) + */ + async markAsConfirmed(eventId: string, eventType?: string): Promise { + const whereClause: Prisma.OutboxEventWhereInput = { + aggregateId: eventId, + status: OutboxStatus.SENT, + }; + + if (eventType) { + whereClause.eventType = eventType; + } + + const result = await this.prisma.outboxEvent.updateMany({ + where: whereClause, + data: { + status: OutboxStatus.CONFIRMED, + }, + }); + + if (result.count > 0) { + this.logger.log(`[OUTBOX] ✓ Event ${eventId} (${eventType || 'all types'}) confirmed by consumer`); + return true; + } + + this.logger.warn(`[OUTBOX] Event ${eventId} (${eventType || 'any'}) not found or not in SENT status`); + return false; + } + + /** + * 获取已发送但未确认且超时的事件 + */ + async findSentEventsTimedOut(timeoutMinutes: number = 5, limit: number = 50): Promise { + const cutoffTime = new Date(); + cutoffTime.setMinutes(cutoffTime.getMinutes() - timeoutMinutes); + + const events = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxStatus.SENT, + publishedAt: { + lt: cutoffTime, + }, + retryCount: { + lt: 5, + }, + }, + orderBy: { + publishedAt: 'asc', + }, + take: limit, + }); + + return events.map((e) => this.mapToOutboxEvent(e)); + } + + /** + * 将超时的 SENT 事件重置为 PENDING 以便重发 + */ + async resetSentToPending(id: bigint): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }); + + if (!event) return; + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.PENDING, + retryCount: event.retryCount + 1, + lastError: 'Consumer confirmation timeout', + }, + }); + + this.logger.warn(`[OUTBOX] Event ${id} reset to PENDING for retry (confirmation timeout)`); + } + + /** + * 标记事件为失败并安排重试 + */ + async markAsFailed(id: bigint, error: string): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }); + + if (!event) return; + + const newRetryCount = event.retryCount + 1; + const isFinalFailure = newRetryCount >= event.maxRetries; + + // 指数退避:1min, 2min, 4min, 8min, 16min + const delayMinutes = Math.pow(2, newRetryCount - 1); + const nextRetryAt = new Date(); + nextRetryAt.setMinutes(nextRetryAt.getMinutes() + delayMinutes); + + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxStatus.FAILED, + retryCount: newRetryCount, + lastError: error, + nextRetryAt: isFinalFailure ? null : nextRetryAt, + }, + }); + + if (isFinalFailure) { + this.logger.error(`[OUTBOX] Event ${id} permanently failed after ${newRetryCount} retries: ${error}`); + } else { + this.logger.warn(`[OUTBOX] Event ${id} failed, retry ${newRetryCount}/${event.maxRetries}, next retry at ${nextRetryAt}`); + } + } + + /** + * 清理已确认的旧事件(保留最近7天) + */ + async cleanupOldEvents(retentionDays: number = 7): Promise { + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - retentionDays); + + const result = await this.prisma.outboxEvent.deleteMany({ + where: { + status: OutboxStatus.CONFIRMED, + publishedAt: { + lt: cutoffDate, + }, + }, + }); + + if (result.count > 0) { + this.logger.log(`[OUTBOX] Cleaned up ${result.count} old events`); + } + + return result.count; + } + + /** + * 获取 Outbox 统计信息 + */ + async getStats(): Promise<{ + pending: number; + sent: number; + confirmed: number; + failed: number; + }> { + const [pending, sent, confirmed, failed] = await Promise.all([ + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.PENDING } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.SENT } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.CONFIRMED } }), + this.prisma.outboxEvent.count({ where: { status: OutboxStatus.FAILED } }), + ]); + + return { pending, sent, confirmed, failed }; + } + + private mapToOutboxEvent(record: any): OutboxEvent { + return { + id: record.id, + eventType: record.eventType, + topic: record.topic, + key: record.key, + payload: record.payload as Record, + aggregateId: record.aggregateId, + aggregateType: record.aggregateType, + status: record.status as OutboxStatus, + retryCount: record.retryCount, + maxRetries: record.maxRetries, + lastError: record.lastError, + createdAt: record.createdAt, + publishedAt: record.publishedAt, + nextRetryAt: record.nextRetryAt, + }; + } +} diff --git a/backend/services/wallet-service/prisma/migrations/20241210000001_add_processed_events/migration.sql b/backend/services/wallet-service/prisma/migrations/20241210000001_add_processed_events/migration.sql new file mode 100644 index 00000000..941863c1 --- /dev/null +++ b/backend/services/wallet-service/prisma/migrations/20241210000001_add_processed_events/migration.sql @@ -0,0 +1,22 @@ +-- CreateTable +CREATE TABLE "processed_events" ( + "processed_id" BIGSERIAL NOT NULL, + "event_id" VARCHAR(200) NOT NULL, + "event_type" VARCHAR(100) NOT NULL, + "source_service" VARCHAR(50) NOT NULL, + "processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "processed_events_pkey" PRIMARY KEY ("processed_id") +); + +-- CreateIndex +CREATE INDEX "processed_events_event_id_idx" ON "processed_events"("event_id"); + +-- CreateIndex +CREATE INDEX "processed_events_event_type_idx" ON "processed_events"("event_type"); + +-- CreateIndex +CREATE INDEX "processed_events_processed_at_idx" ON "processed_events"("processed_at"); + +-- CreateIndex +CREATE UNIQUE INDEX "processed_events_event_id_event_type_key" ON "processed_events"("event_id", "event_type"); diff --git a/backend/services/wallet-service/prisma/schema.prisma b/backend/services/wallet-service/prisma/schema.prisma index 1f861034..76bd24bf 100644 --- a/backend/services/wallet-service/prisma/schema.prisma +++ b/backend/services/wallet-service/prisma/schema.prisma @@ -199,3 +199,27 @@ model WithdrawalOrder { @@index([txHash]) @@index([createdAt]) } + +// ============================================ +// 已处理事件表 (幂等性检查) +// 用于确保 Kafka 事件不会被重复处理 +// ============================================ +model ProcessedEvent { + id BigInt @id @default(autoincrement()) @map("processed_id") + + // 事件标识 (聚合根ID + 事件类型) + eventId String @map("event_id") @db.VarChar(200) + eventType String @map("event_type") @db.VarChar(100) + + // 来源服务 + sourceService String @map("source_service") @db.VarChar(50) + + // 处理时间 + processedAt DateTime @default(now()) @map("processed_at") + + @@unique([eventId, eventType]) + @@map("processed_events") + @@index([eventId]) + @@index([eventType]) + @@index([processedAt]) +} diff --git a/backend/services/wallet-service/src/infrastructure/kafka/event-ack.publisher.ts b/backend/services/wallet-service/src/infrastructure/kafka/event-ack.publisher.ts new file mode 100644 index 00000000..469c4576 --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/kafka/event-ack.publisher.ts @@ -0,0 +1,86 @@ +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; + +/** + * 事件确认消息结构 + */ +interface EventAckMessage { + /** 原始事件的 aggregateId */ + eventId: string; + /** 原始事件类型 */ + eventType: string; + /** 消费服务名称 */ + consumerService: string; + /** 处理结果 */ + success: boolean; + /** 错误信息(如果失败) */ + error?: string; + /** 确认时间 */ + confirmedAt: string; +} + +/** + * 事件确认发布器 + * + * B方案核心组件:消费方处理事件后发送确认 + * 发送确认消息到 reward.events.ack topic + */ +@Injectable() +export class EventAckPublisher { + private readonly logger = new Logger(EventAckPublisher.name); + private readonly serviceName = 'wallet-service'; + + constructor( + @Inject('KAFKA_SERVICE') + private readonly kafkaClient: ClientKafka, + ) {} + + /** + * 发送处理成功确认 + */ + async sendSuccess(eventId: string, eventType: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: true, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('reward.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.log(`[ACK] ✓ Sent success confirmation for event ${eventId} (${eventType})`); + } catch (error) { + this.logger.error(`[ACK] Failed to send confirmation for event ${eventId}:`, error); + } + } + + /** + * 发送处理失败确认 + */ + async sendFailure(eventId: string, eventType: string, errorMessage: string): Promise { + const ackMessage: EventAckMessage = { + eventId, + eventType, + consumerService: this.serviceName, + success: false, + error: errorMessage, + confirmedAt: new Date().toISOString(), + }; + + try { + this.kafkaClient.emit('reward.events.ack', { + key: eventId, + value: JSON.stringify(ackMessage), + }); + + this.logger.warn(`[ACK] ✗ Sent failure confirmation for event ${eventId}: ${errorMessage}`); + } catch (error) { + this.logger.error(`[ACK] Failed to send failure confirmation for event ${eventId}:`, error); + } + } +} diff --git a/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts index e12f7df7..86df97a9 100644 --- a/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts @@ -1,10 +1,37 @@ import { Module, Global } from '@nestjs/common'; +import { ConfigModule, ConfigService } from '@nestjs/config'; +import { ClientsModule, Transport } from '@nestjs/microservices'; import { EventPublisherService } from './event-publisher.service'; import { DepositEventConsumerService } from './deposit-event-consumer.service'; +import { RewardEventConsumerController } from './reward-event-consumer.controller'; +import { EventAckPublisher } from './event-ack.publisher'; +import { PrismaService } from '../persistence/prisma/prisma.service'; @Global() @Module({ - providers: [EventPublisherService, DepositEventConsumerService], - exports: [EventPublisherService, DepositEventConsumerService], + imports: [ + ClientsModule.registerAsync([ + { + name: 'KAFKA_SERVICE', + imports: [ConfigModule], + useFactory: (configService: ConfigService) => ({ + transport: Transport.KAFKA, + options: { + client: { + clientId: configService.get('KAFKA_CLIENT_ID', 'wallet-service'), + brokers: configService.get('KAFKA_BROKERS', 'localhost:9092').split(','), + }, + consumer: { + groupId: configService.get('KAFKA_GROUP_ID', 'wallet-service-group'), + }, + }, + }), + inject: [ConfigService], + }, + ]), + ], + controllers: [RewardEventConsumerController], + providers: [PrismaService, EventPublisherService, DepositEventConsumerService, EventAckPublisher], + exports: [EventPublisherService, DepositEventConsumerService, EventAckPublisher, ClientsModule], }) export class KafkaModule {} diff --git a/backend/services/wallet-service/src/infrastructure/kafka/reward-event-consumer.controller.ts b/backend/services/wallet-service/src/infrastructure/kafka/reward-event-consumer.controller.ts new file mode 100644 index 00000000..879bcf13 --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/kafka/reward-event-consumer.controller.ts @@ -0,0 +1,139 @@ +import { Controller, Logger } from '@nestjs/common'; +import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices'; +import { PrismaService } from '../persistence/prisma/prisma.service'; +import { EventAckPublisher } from './event-ack.publisher'; +import Decimal from 'decimal.js'; + +/** + * 奖励事件消息结构 + */ +interface RewardSummaryUpdatedEvent { + eventType: string; + aggregateId: string; + payload: { + accountSequence: string; + userId: string; + pendingUsdt: number; + pendingHashpower: number; + pendingExpireAt: string | null; + settleableUsdt: number; + settleableHashpower: number; + settledTotalUsdt: number; + settledTotalHashpower: number; + expiredTotalUsdt: number; + expiredTotalHashpower: number; + }; + _outbox?: { + id: string; + aggregateId: string; + eventType: string; + }; +} + +/** + * 奖励事件 Kafka 控制器 + * + * 消费 reward-service 发布的奖励汇总更新事件 + * 使用幂等性检查确保不重复处理 + */ +@Controller() +export class RewardEventConsumerController { + private readonly logger = new Logger(RewardEventConsumerController.name); + + constructor( + private readonly prisma: PrismaService, + private readonly eventAckPublisher: EventAckPublisher, + ) {} + + /** + * 处理奖励汇总更新事件 + */ + @MessagePattern('reward.summary.updated') + async handleRewardSummaryUpdated( + @Payload() message: RewardSummaryUpdatedEvent, + @Ctx() context: KafkaContext, + ): Promise { + const partition = context.getPartition(); + const offset = context.getMessage().offset; + const outboxInfo = message._outbox; + const eventId = outboxInfo?.aggregateId || message.aggregateId; + const eventType = outboxInfo?.eventType || message.eventType; + + this.logger.log( + `[REWARD-EVENT] Received reward.summary.updated for ${eventId} ` + + `[partition=${partition}, offset=${offset}]`, + ); + + try { + // 1. 幂等性检查 + const alreadyProcessed = await this.prisma.processedEvent.findUnique({ + where: { + eventId_eventType: { + eventId, + eventType, + }, + }, + }); + + if (alreadyProcessed) { + this.logger.warn(`[REWARD-EVENT] Event ${eventId} (${eventType}) already processed, skipping`); + // 仍然发送确认,避免重复发送 + if (outboxInfo) { + await this.eventAckPublisher.sendSuccess(eventId, eventType); + } + return; + } + + // 2. 更新 wallet_accounts 表的 rewards 数据 + const payload = message.payload; + const accountSequence = BigInt(payload.accountSequence); + + await this.prisma.$transaction(async (tx) => { + // 更新钱包账户的奖励数据 + await tx.walletAccount.update({ + where: { accountSequence }, + data: { + pendingUsdt: new Decimal(payload.pendingUsdt), + pendingHashpower: new Decimal(payload.pendingHashpower), + pendingExpireAt: payload.pendingExpireAt ? new Date(payload.pendingExpireAt) : null, + settleableUsdt: new Decimal(payload.settleableUsdt), + settleableHashpower: new Decimal(payload.settleableHashpower), + settledTotalUsdt: new Decimal(payload.settledTotalUsdt), + settledTotalHashpower: new Decimal(payload.settledTotalHashpower), + expiredTotalUsdt: new Decimal(payload.expiredTotalUsdt), + expiredTotalHashpower: new Decimal(payload.expiredTotalHashpower), + }, + }); + + // 记录已处理事件(幂等性) + await tx.processedEvent.create({ + data: { + eventId, + eventType, + sourceService: 'reward-service', + }, + }); + }); + + this.logger.log( + `[REWARD-EVENT] ✓ Updated rewards for accountSequence ${accountSequence}: ` + + `settleable=${payload.settleableUsdt}, pending=${payload.pendingUsdt}`, + ); + + // 3. 发送确认 + if (outboxInfo) { + await this.eventAckPublisher.sendSuccess(eventId, eventType); + } + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`[REWARD-EVENT] Error processing event ${eventId}: ${errorMessage}`); + + // 发送失败确认 + if (outboxInfo) { + await this.eventAckPublisher.sendFailure(eventId, eventType, errorMessage); + } + + throw error; + } + } +} diff --git a/backend/services/wallet-service/src/main.ts b/backend/services/wallet-service/src/main.ts index 30eaf853..8c148f86 100644 --- a/backend/services/wallet-service/src/main.ts +++ b/backend/services/wallet-service/src/main.ts @@ -1,9 +1,11 @@ import { NestFactory } from '@nestjs/core'; -import { ValidationPipe } from '@nestjs/common'; +import { ValidationPipe, Logger } from '@nestjs/common'; import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'; +import { MicroserviceOptions, Transport } from '@nestjs/microservices'; import { AppModule } from './app.module'; async function bootstrap() { + const logger = new Logger('Bootstrap'); const app = await NestFactory.create(AppModule); // Global prefix @@ -36,10 +38,30 @@ async function bootstrap() { const document = SwaggerModule.createDocument(app, config); SwaggerModule.setup('api/docs', app, document); + // Kafka 微服务 - 用于 @MessagePattern 消费消息 + const kafkaBrokers = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092']; + const kafkaGroupId = process.env.KAFKA_GROUP_ID || 'wallet-service-group'; + + app.connectMicroservice({ + transport: Transport.KAFKA, + options: { + client: { + clientId: 'wallet-service', + brokers: kafkaBrokers, + }, + consumer: { + groupId: kafkaGroupId, + }, + }, + }); + + await app.startAllMicroservices(); + logger.log('Kafka microservice started'); + const port = process.env.APP_PORT || 3002; await app.listen(port); - console.log(`Wallet Service is running on port ${port}`); - console.log(`Swagger docs: http://localhost:${port}/api/docs`); + logger.log(`Wallet Service is running on port ${port}`); + logger.log(`Swagger docs: http://localhost:${port}/api/docs`); } bootstrap();