From 075c9aaa48ff6b48df9fb3a03e703df68ad3177e Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 9 Dec 2025 21:47:31 -0800 Subject: [PATCH] feat(blockchain): implement Outbox Pattern for reliable event delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement Outbox Pattern with consumer ACK to ensure 100% reliable event delivery between blockchain-service and wallet-service: blockchain-service: - Add OutboxEvent model to Prisma schema with status tracking - Create outbox repository interface and implementation - Modify deposit-detection.service to write events to outbox - Add outbox-publisher.service with cron jobs for publishing/retry - Add deposit-ack-consumer.service to receive ACK from wallet-service - Add publishRaw method to event-publisher.service wallet-service: - Modify deposit-confirmed.handler to send ACK after successful processing - Add wallet.deposit.credited topic mapping for ACK events Event flow: 1. Deposit detected → written to outbox (status: PENDING) 2. Outbox publisher sends to Kafka → status: SENT 3. wallet-service processes and sends ACK → status: ACKED 4. Events without ACK are retried with exponential backoff 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../migration.sql | 30 +++ .../blockchain-service/prisma/schema.prisma | 34 +++ .../src/application/application.module.ts | 8 + .../services/deposit-detection.service.ts | 25 +- .../src/application/services/index.ts | 1 + .../services/outbox-publisher.service.ts | 148 ++++++++++++ .../src/domain/repositories/index.ts | 1 + .../outbox-event.repository.interface.ts | 99 ++++++++ .../infrastructure/infrastructure.module.ts | 7 + .../kafka/deposit-ack-consumer.service.ts | 147 ++++++++++++ .../kafka/event-publisher.service.ts | 32 +++ .../src/infrastructure/kafka/index.ts | 1 + .../persistence/repositories/index.ts | 1 + .../outbox-event.repository.impl.ts | 218 ++++++++++++++++++ .../deposit-confirmed.handler.ts | 38 +++ .../kafka/event-publisher.service.ts | 2 + 16 files changed, 790 insertions(+), 2 deletions(-) create mode 100644 backend/services/blockchain-service/prisma/migrations/20241209000000_add_outbox_events/migration.sql create mode 100644 backend/services/blockchain-service/src/application/services/outbox-publisher.service.ts create mode 100644 backend/services/blockchain-service/src/domain/repositories/outbox-event.repository.interface.ts create mode 100644 backend/services/blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts create mode 100644 backend/services/blockchain-service/src/infrastructure/persistence/repositories/outbox-event.repository.impl.ts diff --git a/backend/services/blockchain-service/prisma/migrations/20241209000000_add_outbox_events/migration.sql b/backend/services/blockchain-service/prisma/migrations/20241209000000_add_outbox_events/migration.sql new file mode 100644 index 00000000..4268fe9a --- /dev/null +++ b/backend/services/blockchain-service/prisma/migrations/20241209000000_add_outbox_events/migration.sql @@ -0,0 +1,30 @@ +-- CreateTable +CREATE TABLE "outbox_events" ( + "event_id" BIGSERIAL NOT NULL, + "event_type" VARCHAR(100) NOT NULL, + "aggregate_id" VARCHAR(100) NOT NULL, + "aggregate_type" VARCHAR(50) NOT NULL, + "payload" JSONB NOT NULL, + "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING', + "retry_count" INTEGER NOT NULL DEFAULT 0, + "max_retries" INTEGER NOT NULL DEFAULT 10, + "last_error" TEXT, + "next_retry_at" TIMESTAMP(3), + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "sent_at" TIMESTAMP(3), + "acked_at" TIMESTAMP(3), + + CONSTRAINT "outbox_events_pkey" PRIMARY KEY ("event_id") +); + +-- CreateIndex +CREATE INDEX "idx_outbox_pending" ON "outbox_events"("status", "next_retry_at"); + +-- CreateIndex +CREATE INDEX "idx_outbox_aggregate" ON "outbox_events"("aggregate_type", "aggregate_id"); + +-- CreateIndex +CREATE INDEX "idx_outbox_event_type" ON "outbox_events"("event_type"); + +-- CreateIndex +CREATE INDEX "idx_outbox_created" ON "outbox_events"("created_at"); diff --git a/backend/services/blockchain-service/prisma/schema.prisma b/backend/services/blockchain-service/prisma/schema.prisma index ee4e2a16..2f01e786 100644 --- a/backend/services/blockchain-service/prisma/schema.prisma +++ b/backend/services/blockchain-service/prisma/schema.prisma @@ -200,6 +200,40 @@ model RecoveryMnemonic { @@map("recovery_mnemonics") } +// ============================================ +// Outbox 事件表 (发件箱模式) +// 保证事件发布的可靠性 +// ============================================ +model OutboxEvent { + id BigInt @id @default(autoincrement()) @map("event_id") + + // 事件信息 + eventType String @map("event_type") @db.VarChar(100) + aggregateId String @map("aggregate_id") @db.VarChar(100) + aggregateType String @map("aggregate_type") @db.VarChar(50) + payload Json @map("payload") + + // 发送状态: PENDING -> SENT -> ACKED / FAILED + status String @default("PENDING") @db.VarChar(20) + + // 重试信息 + retryCount Int @default(0) @map("retry_count") + maxRetries Int @default(10) @map("max_retries") + lastError String? @map("last_error") @db.Text + nextRetryAt DateTime? @map("next_retry_at") + + // 时间戳 + createdAt DateTime @default(now()) @map("created_at") + sentAt DateTime? @map("sent_at") + ackedAt DateTime? @map("acked_at") + + @@index([status, nextRetryAt], name: "idx_outbox_pending") + @@index([aggregateType, aggregateId], name: "idx_outbox_aggregate") + @@index([eventType], name: "idx_outbox_event_type") + @@index([createdAt], name: "idx_outbox_created") + @@map("outbox_events") +} + // ============================================ // 区块链事件日志 (Append-Only 审计) // ============================================ diff --git a/backend/services/blockchain-service/src/application/application.module.ts b/backend/services/blockchain-service/src/application/application.module.ts index cfc09b94..6feab0a3 100644 --- a/backend/services/blockchain-service/src/application/application.module.ts +++ b/backend/services/blockchain-service/src/application/application.module.ts @@ -6,8 +6,10 @@ import { DepositDetectionService, BalanceQueryService, MnemonicVerificationService, + OutboxPublisherService, } from './services'; import { MpcKeygenCompletedHandler, WithdrawalRequestedHandler } from './event-handlers'; +import { DepositAckConsumerService } from '@/infrastructure/kafka/deposit-ack-consumer.service'; @Module({ imports: [InfrastructureModule, DomainModule], @@ -17,6 +19,10 @@ import { MpcKeygenCompletedHandler, WithdrawalRequestedHandler } from './event-h DepositDetectionService, BalanceQueryService, MnemonicVerificationService, + OutboxPublisherService, + + // 事件消费者(依赖 OutboxPublisherService,需要在这里注册) + DepositAckConsumerService, // 事件处理器 MpcKeygenCompletedHandler, @@ -27,6 +33,8 @@ import { MpcKeygenCompletedHandler, WithdrawalRequestedHandler } from './event-h DepositDetectionService, BalanceQueryService, MnemonicVerificationService, + OutboxPublisherService, + DepositAckConsumerService, MpcKeygenCompletedHandler, WithdrawalRequestedHandler, ], diff --git a/backend/services/blockchain-service/src/application/services/deposit-detection.service.ts b/backend/services/blockchain-service/src/application/services/deposit-detection.service.ts index 17ef02a1..2b5b4152 100644 --- a/backend/services/blockchain-service/src/application/services/deposit-detection.service.ts +++ b/backend/services/blockchain-service/src/application/services/deposit-detection.service.ts @@ -21,8 +21,13 @@ import { BLOCK_CHECKPOINT_REPOSITORY, IBlockCheckpointRepository, } from '@/domain/repositories/block-checkpoint.repository.interface'; +import { + OUTBOX_EVENT_REPOSITORY, + IOutboxEventRepository, +} from '@/domain/repositories/outbox-event.repository.interface'; import { DepositTransaction } from '@/domain/aggregates/deposit-transaction'; import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber } from '@/domain/value-objects'; +import { DepositConfirmedEvent } from '@/domain/events'; /** * 充值检测服务 @@ -45,6 +50,8 @@ export class DepositDetectionService implements OnModuleInit { private readonly monitoredAddressRepo: IMonitoredAddressRepository, @Inject(BLOCK_CHECKPOINT_REPOSITORY) private readonly checkpointRepo: IBlockCheckpointRepository, + @Inject(OUTBOX_EVENT_REPOSITORY) + private readonly outboxRepo: IOutboxEventRepository, ) {} async onModuleInit() { @@ -195,9 +202,23 @@ export class DepositDetectionService implements OnModuleInit { await this.depositRepo.save(deposit); - // 发布确认事件 + // 处理领域事件 for (const event of deposit.domainEvents) { - await this.eventPublisher.publish(event); + if (event instanceof DepositConfirmedEvent) { + // 重要事件写入 outbox,保证可靠投递 + await this.outboxRepo.create({ + eventType: event.eventType, + aggregateId: deposit.id?.toString() || deposit.txHash.toString(), + aggregateType: 'DepositTransaction', + payload: event.toPayload(), + }); + this.logger.log( + `Deposit confirmed event saved to outbox: ${deposit.txHash.toShort()} (${deposit.confirmations} confirmations)`, + ); + } else { + // 非关键事件直接发送(如 DepositDetectedEvent) + await this.eventPublisher.publish(event); + } } deposit.clearDomainEvents(); diff --git a/backend/services/blockchain-service/src/application/services/index.ts b/backend/services/blockchain-service/src/application/services/index.ts index 843861a1..31abfbaa 100644 --- a/backend/services/blockchain-service/src/application/services/index.ts +++ b/backend/services/blockchain-service/src/application/services/index.ts @@ -2,3 +2,4 @@ export * from './address-derivation.service'; export * from './deposit-detection.service'; export * from './balance-query.service'; export * from './mnemonic-verification.service'; +export * from './outbox-publisher.service'; diff --git a/backend/services/blockchain-service/src/application/services/outbox-publisher.service.ts b/backend/services/blockchain-service/src/application/services/outbox-publisher.service.ts new file mode 100644 index 00000000..17decb6c --- /dev/null +++ b/backend/services/blockchain-service/src/application/services/outbox-publisher.service.ts @@ -0,0 +1,148 @@ +import { Injectable, Logger, Inject, OnModuleInit } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service'; +import { + OUTBOX_EVENT_REPOSITORY, + IOutboxEventRepository, + OutboxEventStatus, +} from '@/domain/repositories/outbox-event.repository.interface'; +import { + DEPOSIT_TRANSACTION_REPOSITORY, + IDepositTransactionRepository, +} from '@/domain/repositories/deposit-transaction.repository.interface'; + +/** + * Outbox 发布服务 + * + * 定时扫描 outbox_events 表,将待发送的事件发布到 Kafka。 + * 实现发件箱模式(Outbox Pattern),保证事件的可靠投递。 + */ +@Injectable() +export class OutboxPublisherService implements OnModuleInit { + private readonly logger = new Logger(OutboxPublisherService.name); + + // 发送超时时间(秒)- 超过此时间未收到 ACK 则重发 + private readonly SENT_TIMEOUT_SECONDS = 300; // 5 分钟 + + // 清理已确认事件的天数 + private readonly CLEANUP_DAYS = 7; + + constructor( + private readonly eventPublisher: EventPublisherService, + @Inject(OUTBOX_EVENT_REPOSITORY) + private readonly outboxRepo: IOutboxEventRepository, + @Inject(DEPOSIT_TRANSACTION_REPOSITORY) + private readonly depositRepo: IDepositTransactionRepository, + ) {} + + async onModuleInit() { + this.logger.log('OutboxPublisherService initialized'); + } + + /** + * 定时发布待发送事件(每5秒) + */ + @Cron(CronExpression.EVERY_5_SECONDS) + async publishPendingEvents(): Promise { + try { + const pendingEvents = await this.outboxRepo.findPendingEvents(50); + + if (pendingEvents.length === 0) return; + + this.logger.debug(`Found ${pendingEvents.length} pending events to publish`); + + for (const event of pendingEvents) { + try { + // 发送到 Kafka + await this.eventPublisher.publishRaw({ + eventId: `outbox-${event.id}`, + eventType: event.eventType, + occurredAt: event.createdAt, + payload: event.payload, + }); + + // 标记为已发送 + await this.outboxRepo.markAsSent(event.id); + + this.logger.log( + `Published event ${event.id}: ${event.eventType} for ${event.aggregateType}:${event.aggregateId}`, + ); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error'; + this.logger.error(`Failed to publish event ${event.id}: ${errorMessage}`); + + // 记录失败,安排重试 + await this.outboxRepo.recordFailure(event.id, errorMessage); + } + } + } catch (error) { + this.logger.error('Error in publishPendingEvents:', error); + } + } + + /** + * 定时检查超时未确认的事件(每分钟) + * 将 SENT 状态但超时的事件重置为 PENDING + */ + @Cron(CronExpression.EVERY_MINUTE) + async checkUnackedEvents(): Promise { + try { + const unackedEvents = await this.outboxRepo.findUnackedEvents( + this.SENT_TIMEOUT_SECONDS, + 50, + ); + + if (unackedEvents.length === 0) return; + + this.logger.warn(`Found ${unackedEvents.length} unacked events, will retry`); + + for (const event of unackedEvents) { + // 记录超时失败,触发重试逻辑 + await this.outboxRepo.recordFailure(event.id, 'ACK timeout'); + this.logger.warn( + `Event ${event.id} ACK timeout, scheduled for retry (attempt ${event.retryCount + 1})`, + ); + } + } catch (error) { + this.logger.error('Error in checkUnackedEvents:', error); + } + } + + /** + * 定时清理已确认的旧事件(每天凌晨3点) + */ + @Cron('0 3 * * *') + async cleanupOldEvents(): Promise { + try { + const count = await this.outboxRepo.cleanupAckedEvents(this.CLEANUP_DAYS); + if (count > 0) { + this.logger.log(`Cleaned up ${count} old ACKED events`); + } + } catch (error) { + this.logger.error('Error in cleanupOldEvents:', error); + } + } + + /** + * 处理 ACK 确认 + * 当收到 wallet-service 的确认事件时调用 + */ + async handleAck(aggregateType: string, aggregateId: string, eventType: string): Promise { + try { + await this.outboxRepo.markAsAckedByAggregateId(aggregateType, aggregateId, eventType); + + // 同时更新 deposit_transactions 表的 notified_at + if (aggregateType === 'DepositTransaction') { + const depositId = BigInt(aggregateId); + const deposit = await this.depositRepo.findById(depositId); + if (deposit) { + deposit.markAsNotified(); + await this.depositRepo.save(deposit); + this.logger.log(`Deposit ${aggregateId} marked as notified`); + } + } + } catch (error) { + this.logger.error(`Error handling ACK for ${aggregateType}:${aggregateId}:`, error); + } + } +} diff --git a/backend/services/blockchain-service/src/domain/repositories/index.ts b/backend/services/blockchain-service/src/domain/repositories/index.ts index 0266c009..17e87bdc 100644 --- a/backend/services/blockchain-service/src/domain/repositories/index.ts +++ b/backend/services/blockchain-service/src/domain/repositories/index.ts @@ -2,3 +2,4 @@ export * from './deposit-transaction.repository.interface'; export * from './monitored-address.repository.interface'; export * from './block-checkpoint.repository.interface'; export * from './transaction-request.repository.interface'; +export * from './outbox-event.repository.interface'; diff --git a/backend/services/blockchain-service/src/domain/repositories/outbox-event.repository.interface.ts b/backend/services/blockchain-service/src/domain/repositories/outbox-event.repository.interface.ts new file mode 100644 index 00000000..0494ae5e --- /dev/null +++ b/backend/services/blockchain-service/src/domain/repositories/outbox-event.repository.interface.ts @@ -0,0 +1,99 @@ +/** + * Outbox Event Repository Interface + * + * 发件箱模式 - 保证事件发布的可靠性 + */ + +export const OUTBOX_EVENT_REPOSITORY = Symbol('OUTBOX_EVENT_REPOSITORY'); + +export enum OutboxEventStatus { + PENDING = 'PENDING', // 待发送 + SENT = 'SENT', // 已发送,等待确认 + ACKED = 'ACKED', // 已确认 + FAILED = 'FAILED', // 发送失败(超过最大重试次数) +} + +export interface OutboxEventData { + eventType: string; + aggregateId: string; + aggregateType: string; + payload: Record; +} + +export interface OutboxEvent { + id: bigint; + eventType: string; + aggregateId: string; + aggregateType: string; + payload: Record; + status: OutboxEventStatus; + retryCount: number; + maxRetries: number; + lastError: string | null; + nextRetryAt: Date | null; + createdAt: Date; + sentAt: Date | null; + ackedAt: Date | null; +} + +export interface IOutboxEventRepository { + /** + * 创建新的 outbox 事件 + */ + create(data: OutboxEventData): Promise; + + /** + * 批量创建 outbox 事件 + */ + createMany(data: OutboxEventData[]): Promise; + + /** + * 根据 ID 查找 + */ + findById(id: bigint): Promise; + + /** + * 查找待发送的事件(PENDING 状态且到达重试时间) + */ + findPendingEvents(limit?: number): Promise; + + /** + * 查找已发送但未确认的事件(SENT 状态且超时) + */ + findUnackedEvents(timeoutSeconds: number, limit?: number): Promise; + + /** + * 标记为已发送 + */ + markAsSent(id: bigint): Promise; + + /** + * 标记为已确认 + */ + markAsAcked(id: bigint): Promise; + + /** + * 根据聚合ID标记为已确认(用于接收 ACK 时) + */ + markAsAckedByAggregateId(aggregateType: string, aggregateId: string, eventType: string): Promise; + + /** + * 记录发送失败,增加重试计数 + */ + recordFailure(id: bigint, error: string): Promise; + + /** + * 标记为最终失败(超过最大重试次数) + */ + markAsFailed(id: bigint, error: string): Promise; + + /** + * 重置为待发送状态(用于手动重试) + */ + resetToPending(id: bigint): Promise; + + /** + * 清理已确认的旧事件 + */ + cleanupAckedEvents(olderThanDays: number): Promise; +} diff --git a/backend/services/blockchain-service/src/infrastructure/infrastructure.module.ts b/backend/services/blockchain-service/src/infrastructure/infrastructure.module.ts index e8b60d53..50695888 100644 --- a/backend/services/blockchain-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/blockchain-service/src/infrastructure/infrastructure.module.ts @@ -9,12 +9,14 @@ import { MONITORED_ADDRESS_REPOSITORY, BLOCK_CHECKPOINT_REPOSITORY, TRANSACTION_REQUEST_REPOSITORY, + OUTBOX_EVENT_REPOSITORY, } from '@/domain/repositories'; import { DepositTransactionRepositoryImpl, MonitoredAddressRepositoryImpl, BlockCheckpointRepositoryImpl, TransactionRequestRepositoryImpl, + OutboxEventRepositoryImpl, } from './persistence/repositories'; @Global() @@ -55,6 +57,10 @@ import { provide: TRANSACTION_REQUEST_REPOSITORY, useClass: TransactionRequestRepositoryImpl, }, + { + provide: OUTBOX_EVENT_REPOSITORY, + useClass: OutboxEventRepositoryImpl, + }, ], exports: [ PrismaService, @@ -72,6 +78,7 @@ import { MONITORED_ADDRESS_REPOSITORY, BLOCK_CHECKPOINT_REPOSITORY, TRANSACTION_REQUEST_REPOSITORY, + OUTBOX_EVENT_REPOSITORY, ], }) export class InfrastructureModule {} diff --git a/backend/services/blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts b/backend/services/blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts new file mode 100644 index 00000000..2ea3f306 --- /dev/null +++ b/backend/services/blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts @@ -0,0 +1,147 @@ +/** + * Deposit ACK Consumer Service + * + * 监听 wallet-service 发送的充值确认事件。 + * 当 wallet-service 成功处理充值后,会发送 ACK 事件。 + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; +import { OutboxPublisherService } from '@/application/services/outbox-publisher.service'; + +export const ACK_TOPICS = { + WALLET_ACKS: 'wallet.acks', +} as const; + +export interface DepositCreditedPayload { + depositId: string; + txHash: string; + userId: string; + accountSequence: string; + amount: string; + creditedAt: string; +} + +@Injectable() +export class DepositAckConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(DepositAckConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isConnected = false; + + constructor( + private readonly configService: ConfigService, + private readonly outboxPublisher: OutboxPublisherService, + ) {} + + async onModuleInit() { + const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || + this.configService.get('kafka.brokers') || + ['localhost:9092']; + const clientId = this.configService.get('kafka.clientId') || 'blockchain-service'; + const groupId = 'blockchain-service-deposit-acks'; + + this.logger.log(`[INIT] Deposit ACK Consumer initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] GroupId: ${groupId}`); + this.logger.log(`[INIT] Brokers: ${brokers}`); + this.logger.log(`[INIT] Topics: ${Object.values(ACK_TOPICS).join(', ')}`); + + this.kafka = new Kafka({ + clientId, + brokers: Array.isArray(brokers) ? brokers : brokers.split(','), + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 100, + retries: 8, + }, + }); + + this.consumer = this.kafka.consumer({ + groupId, + sessionTimeout: 30000, + heartbeatInterval: 3000, + }); + + try { + this.logger.log(`[CONNECT] Connecting Deposit ACK consumer...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] Deposit ACK consumer connected successfully`); + + await this.consumer.subscribe({ + topics: Object.values(ACK_TOPICS), + fromBeginning: false, + }); + this.logger.log(`[SUBSCRIBE] Subscribed to ACK topics`); + + await this.startConsuming(); + } catch (error) { + this.logger.error(`[ERROR] Failed to connect Deposit ACK consumer`, error); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + await this.consumer.disconnect(); + this.logger.log('Deposit ACK consumer disconnected'); + } + } + + private async startConsuming(): Promise { + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + const offset = message.offset; + this.logger.log(`[RECEIVE] ACK message received: topic=${topic}, partition=${partition}, offset=${offset}`); + + try { + const value = message.value?.toString(); + if (!value) { + this.logger.warn(`[RECEIVE] Empty ACK message received on ${topic}`); + return; + } + + this.logger.debug(`[RECEIVE] Raw ACK message: ${value.substring(0, 500)}`); + + const parsed = JSON.parse(value); + const eventType = parsed.eventType; + const payload = parsed.payload || parsed; + + this.logger.log(`[RECEIVE] ACK event type: ${eventType}`); + + if (eventType === 'wallet.deposit.credited') { + await this.handleDepositCredited(payload as DepositCreditedPayload); + } else { + this.logger.debug(`[RECEIVE] Unknown ACK event type: ${eventType}`); + } + } catch (error) { + this.logger.error(`[ERROR] Error processing ACK message from ${topic}`, error); + } + }, + }); + + this.logger.log(`[START] Started consuming ACK events`); + } + + private async handleDepositCredited(payload: DepositCreditedPayload): Promise { + this.logger.log(`[ACK] Processing deposit credited ACK`); + this.logger.log(`[ACK] depositId: ${payload.depositId}`); + this.logger.log(`[ACK] txHash: ${payload.txHash}`); + this.logger.log(`[ACK] userId: ${payload.userId}`); + this.logger.log(`[ACK] amount: ${payload.amount}`); + + try { + // 通知 OutboxPublisher 处理 ACK + await this.outboxPublisher.handleAck( + 'DepositTransaction', + payload.depositId, + 'blockchain.deposit.confirmed', + ); + + this.logger.log(`[ACK] Deposit ${payload.depositId} ACK processed successfully`); + } catch (error) { + this.logger.error(`[ACK] Error processing deposit ACK for ${payload.depositId}:`, error); + } + } +} diff --git a/backend/services/blockchain-service/src/infrastructure/kafka/event-publisher.service.ts b/backend/services/blockchain-service/src/infrastructure/kafka/event-publisher.service.ts index 9ccdbc1b..94500a30 100644 --- a/backend/services/blockchain-service/src/infrastructure/kafka/event-publisher.service.ts +++ b/backend/services/blockchain-service/src/infrastructure/kafka/event-publisher.service.ts @@ -64,6 +64,38 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { } } + /** + * 发布原始事件数据(用于 Outbox 模式) + */ + async publishRaw(event: { + eventId: string; + eventType: string; + occurredAt: Date; + payload: Record; + }): Promise { + const topic = this.getTopicForEvent(event.eventType); + const message = { + key: event.eventId, + value: JSON.stringify({ + eventId: event.eventId, + eventType: event.eventType, + occurredAt: event.occurredAt.toISOString(), + payload: event.payload, + }), + headers: { + eventType: event.eventType, + source: 'blockchain-service', + }, + }; + + await this.producer.send({ + topic, + messages: [message], + }); + + this.logger.debug(`Published raw event: ${event.eventType} to topic: ${topic}`); + } + private getTopicForEvent(eventType: string): string { // 事件类型到 topic 的映射 const topicMap: Record = { diff --git a/backend/services/blockchain-service/src/infrastructure/kafka/index.ts b/backend/services/blockchain-service/src/infrastructure/kafka/index.ts index 460bc092..e78978c3 100644 --- a/backend/services/blockchain-service/src/infrastructure/kafka/index.ts +++ b/backend/services/blockchain-service/src/infrastructure/kafka/index.ts @@ -2,3 +2,4 @@ export * from './event-publisher.service'; export * from './event-consumer.controller'; export * from './mpc-event-consumer.service'; export * from './withdrawal-event-consumer.service'; +export * from './deposit-ack-consumer.service'; diff --git a/backend/services/blockchain-service/src/infrastructure/persistence/repositories/index.ts b/backend/services/blockchain-service/src/infrastructure/persistence/repositories/index.ts index f2122871..8fc5c7a3 100644 --- a/backend/services/blockchain-service/src/infrastructure/persistence/repositories/index.ts +++ b/backend/services/blockchain-service/src/infrastructure/persistence/repositories/index.ts @@ -2,3 +2,4 @@ export * from './deposit-transaction.repository.impl'; export * from './monitored-address.repository.impl'; export * from './block-checkpoint.repository.impl'; export * from './transaction-request.repository.impl'; +export * from './outbox-event.repository.impl'; diff --git a/backend/services/blockchain-service/src/infrastructure/persistence/repositories/outbox-event.repository.impl.ts b/backend/services/blockchain-service/src/infrastructure/persistence/repositories/outbox-event.repository.impl.ts new file mode 100644 index 00000000..9d9e377d --- /dev/null +++ b/backend/services/blockchain-service/src/infrastructure/persistence/repositories/outbox-event.repository.impl.ts @@ -0,0 +1,218 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { + IOutboxEventRepository, + OutboxEvent, + OutboxEventData, + OutboxEventStatus, +} from '@/domain/repositories/outbox-event.repository.interface'; + +@Injectable() +export class OutboxEventRepositoryImpl implements IOutboxEventRepository { + private readonly logger = new Logger(OutboxEventRepositoryImpl.name); + + constructor(private readonly prisma: PrismaService) {} + + async create(data: OutboxEventData): Promise { + const record = await this.prisma.outboxEvent.create({ + data: { + eventType: data.eventType, + aggregateId: data.aggregateId, + aggregateType: data.aggregateType, + payload: data.payload, + status: OutboxEventStatus.PENDING, + retryCount: 0, + maxRetries: 10, + }, + }); + return this.mapToOutboxEvent(record); + } + + async createMany(data: OutboxEventData[]): Promise { + await this.prisma.outboxEvent.createMany({ + data: data.map((d) => ({ + eventType: d.eventType, + aggregateId: d.aggregateId, + aggregateType: d.aggregateType, + payload: d.payload, + status: OutboxEventStatus.PENDING, + retryCount: 0, + maxRetries: 10, + })), + }); + } + + async findById(id: bigint): Promise { + const record = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }); + return record ? this.mapToOutboxEvent(record) : null; + } + + async findPendingEvents(limit: number = 100): Promise { + const now = new Date(); + const records = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxEventStatus.PENDING, + OR: [ + { nextRetryAt: null }, + { nextRetryAt: { lte: now } }, + ], + }, + orderBy: { createdAt: 'asc' }, + take: limit, + }); + return records.map((r) => this.mapToOutboxEvent(r)); + } + + async findUnackedEvents(timeoutSeconds: number, limit: number = 100): Promise { + const cutoff = new Date(Date.now() - timeoutSeconds * 1000); + const records = await this.prisma.outboxEvent.findMany({ + where: { + status: OutboxEventStatus.SENT, + sentAt: { lte: cutoff }, + }, + orderBy: { sentAt: 'asc' }, + take: limit, + }); + return records.map((r) => this.mapToOutboxEvent(r)); + } + + async markAsSent(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxEventStatus.SENT, + sentAt: new Date(), + }, + }); + } + + async markAsAcked(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxEventStatus.ACKED, + ackedAt: new Date(), + }, + }); + } + + async markAsAckedByAggregateId( + aggregateType: string, + aggregateId: string, + eventType: string, + ): Promise { + const result = await this.prisma.outboxEvent.updateMany({ + where: { + aggregateType, + aggregateId, + eventType, + status: OutboxEventStatus.SENT, + }, + data: { + status: OutboxEventStatus.ACKED, + ackedAt: new Date(), + }, + }); + this.logger.debug( + `Marked ${result.count} events as ACKED for ${aggregateType}:${aggregateId}:${eventType}`, + ); + } + + async recordFailure(id: bigint, error: string): Promise { + const event = await this.prisma.outboxEvent.findUnique({ + where: { id }, + }); + + if (!event) return; + + const newRetryCount = event.retryCount + 1; + // 指数退避: 2^retryCount 秒,最大 5 分钟 + const backoffSeconds = Math.min(Math.pow(2, newRetryCount), 300); + const nextRetryAt = new Date(Date.now() + backoffSeconds * 1000); + + if (newRetryCount >= event.maxRetries) { + await this.markAsFailed(id, error); + } else { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxEventStatus.PENDING, + retryCount: newRetryCount, + lastError: error, + nextRetryAt, + }, + }); + } + } + + async markAsFailed(id: bigint, error: string): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxEventStatus.FAILED, + lastError: error, + }, + }); + this.logger.warn(`Event ${id} marked as FAILED: ${error}`); + } + + async resetToPending(id: bigint): Promise { + await this.prisma.outboxEvent.update({ + where: { id }, + data: { + status: OutboxEventStatus.PENDING, + retryCount: 0, + lastError: null, + nextRetryAt: null, + sentAt: null, + ackedAt: null, + }, + }); + } + + async cleanupAckedEvents(olderThanDays: number): Promise { + const cutoff = new Date(Date.now() - olderThanDays * 24 * 60 * 60 * 1000); + const result = await this.prisma.outboxEvent.deleteMany({ + where: { + status: OutboxEventStatus.ACKED, + ackedAt: { lte: cutoff }, + }, + }); + this.logger.log(`Cleaned up ${result.count} old ACKED events`); + return result.count; + } + + private mapToOutboxEvent(record: { + id: bigint; + eventType: string; + aggregateId: string; + aggregateType: string; + payload: unknown; + status: string; + retryCount: number; + maxRetries: number; + lastError: string | null; + nextRetryAt: Date | null; + createdAt: Date; + sentAt: Date | null; + ackedAt: Date | null; + }): OutboxEvent { + return { + id: record.id, + eventType: record.eventType, + aggregateId: record.aggregateId, + aggregateType: record.aggregateType, + payload: record.payload as Record, + status: record.status as OutboxEventStatus, + retryCount: record.retryCount, + maxRetries: record.maxRetries, + lastError: record.lastError, + nextRetryAt: record.nextRetryAt, + createdAt: record.createdAt, + sentAt: record.sentAt, + ackedAt: record.ackedAt, + }; + } +} diff --git a/backend/services/wallet-service/src/application/event-handlers/deposit-confirmed.handler.ts b/backend/services/wallet-service/src/application/event-handlers/deposit-confirmed.handler.ts index 3289a4f5..59f6d335 100644 --- a/backend/services/wallet-service/src/application/event-handlers/deposit-confirmed.handler.ts +++ b/backend/services/wallet-service/src/application/event-handlers/deposit-confirmed.handler.ts @@ -12,6 +12,7 @@ import { DepositEventConsumerService, DepositConfirmedPayload, } from '@/infrastructure/kafka/deposit-event-consumer.service'; +import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service'; import { ChainType } from '@/domain/value-objects'; @Injectable() @@ -21,6 +22,7 @@ export class DepositConfirmedHandler implements OnModuleInit { constructor( private readonly depositEventConsumer: DepositEventConsumerService, private readonly walletApplicationService: WalletApplicationService, + private readonly eventPublisher: EventPublisherService, ) {} onModuleInit() { @@ -52,12 +54,17 @@ export class DepositConfirmedHandler implements OnModuleInit { `userId=${payload.userId}, ` + `accountSequence=${payload.accountSequence}`, ); + + // 发送 ACK 确认事件给 blockchain-service + await this.sendDepositCreditedAck(payload); } catch (error) { // Check if it's a duplicate transaction error (already processed) if (error.message?.includes('Duplicate transaction')) { this.logger.warn( `Deposit already processed (duplicate): txHash=${payload.txHash}`, ); + // 重复交易也发送 ACK,确保 blockchain-service 知道已处理 + await this.sendDepositCreditedAck(payload); return; } @@ -69,6 +76,37 @@ export class DepositConfirmedHandler implements OnModuleInit { } } + /** + * 发送充值入账确认事件给 blockchain-service + * 这是 Outbox Pattern 的 ACK 机制,确保 blockchain-service 知道事件已被成功处理 + */ + private async sendDepositCreditedAck(payload: DepositConfirmedPayload): Promise { + try { + await this.eventPublisher.publish({ + eventType: 'wallet.deposit.credited', + payload: { + depositId: payload.depositId, + txHash: payload.txHash, + userId: payload.userId, + accountSequence: payload.accountSequence, + amount: payload.amountFormatted, + creditedAt: new Date().toISOString(), + }, + }); + + this.logger.log( + `[ACK] Sent deposit credited ACK: depositId=${payload.depositId}, txHash=${payload.txHash}`, + ); + } catch (error) { + // ACK 发送失败不应该影响主流程,只记录警告 + // blockchain-service 的 outbox 机制会处理重试 + this.logger.warn( + `[ACK] Failed to send deposit credited ACK: depositId=${payload.depositId}`, + error, + ); + } + } + private mapChainType(chainType: string): ChainType { const normalized = chainType.toUpperCase(); switch (normalized) { diff --git a/backend/services/wallet-service/src/infrastructure/kafka/event-publisher.service.ts b/backend/services/wallet-service/src/infrastructure/kafka/event-publisher.service.ts index 0b5a612f..85d22ab5 100644 --- a/backend/services/wallet-service/src/infrastructure/kafka/event-publisher.service.ts +++ b/backend/services/wallet-service/src/infrastructure/kafka/event-publisher.service.ts @@ -106,6 +106,8 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { 'wallet.withdrawal.requested': 'wallet.withdrawals', 'wallet.withdrawal.completed': 'wallet.withdrawals', 'wallet.withdrawal.failed': 'wallet.withdrawals', + // ACK events - 确认消息 + 'wallet.deposit.credited': 'wallet.acks', }; return topicMap[eventType] || 'wallet.events'; }