feat(blockchain): implement Outbox Pattern for reliable event delivery
Implement Outbox Pattern with consumer ACK to ensure 100% reliable event delivery between blockchain-service and wallet-service.

blockchain-service:
- Add OutboxEvent model to Prisma schema with status tracking
- Create outbox repository interface and implementation
- Modify deposit-detection.service to write events to outbox
- Add outbox-publisher.service with cron jobs for publishing/retry
- Add deposit-ack-consumer.service to receive ACK from wallet-service
- Add publishRaw method to event-publisher.service

wallet-service:
- Modify deposit-confirmed.handler to send ACK after successful processing
- Add wallet.deposit.credited topic mapping for ACK events

Event flow:
1. Deposit detected → written to outbox (status: PENDING)
2. Outbox publisher sends to Kafka → status: SENT
3. wallet-service processes and sends ACK → status: ACKED
4. Events without ACK are retried with exponential backoff

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
f04f8ed6ce
commit
075c9aaa48
|
|
@ -0,0 +1,30 @@
|
|||
-- CreateTable
-- Outbox table for the transactional Outbox Pattern: domain events are
-- persisted here alongside the aggregate, then published to Kafka by a
-- background publisher (outbox-publisher.service).
CREATE TABLE "outbox_events" (
    "event_id" BIGSERIAL NOT NULL,
    "event_type" VARCHAR(100) NOT NULL,
    "aggregate_id" VARCHAR(100) NOT NULL,
    "aggregate_type" VARCHAR(50) NOT NULL,
    "payload" JSONB NOT NULL,
    -- Delivery state machine: PENDING -> SENT -> ACKED, or FAILED after max_retries
    "status" VARCHAR(20) NOT NULL DEFAULT 'PENDING',
    "retry_count" INTEGER NOT NULL DEFAULT 0,
    "max_retries" INTEGER NOT NULL DEFAULT 10,
    "last_error" TEXT,
    "next_retry_at" TIMESTAMP(3),
    "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
    "sent_at" TIMESTAMP(3),
    "acked_at" TIMESTAMP(3),

    CONSTRAINT "outbox_events_pkey" PRIMARY KEY ("event_id")
);

-- CreateIndex
-- Serves the publisher's polling query (status = PENDING AND next_retry_at <= now)
CREATE INDEX "idx_outbox_pending" ON "outbox_events"("status", "next_retry_at");

-- CreateIndex
-- Serves ACK matching by aggregate (markAsAckedByAggregateId)
CREATE INDEX "idx_outbox_aggregate" ON "outbox_events"("aggregate_type", "aggregate_id");

-- CreateIndex
CREATE INDEX "idx_outbox_event_type" ON "outbox_events"("event_type");

-- CreateIndex
CREATE INDEX "idx_outbox_created" ON "outbox_events"("created_at");
|
||||
|
|
@ -200,6 +200,40 @@ model RecoveryMnemonic {
|
|||
@@map("recovery_mnemonics")
|
||||
}
|
||||
|
||||
// ============================================
// Outbox events table (Outbox Pattern)
// Guarantees reliable event publication
// ============================================
model OutboxEvent {
  id BigInt @id @default(autoincrement()) @map("event_id")

  // Event identity and body
  eventType     String @map("event_type") @db.VarChar(100)
  aggregateId   String @map("aggregate_id") @db.VarChar(100)
  aggregateType String @map("aggregate_type") @db.VarChar(50)
  payload       Json   @map("payload")

  // Delivery status: PENDING -> SENT -> ACKED / FAILED
  status String @default("PENDING") @db.VarChar(20)

  // Retry bookkeeping
  retryCount  Int       @default(0) @map("retry_count")
  maxRetries  Int       @default(10) @map("max_retries")
  lastError   String?   @map("last_error") @db.Text
  nextRetryAt DateTime? @map("next_retry_at")

  // Timestamps
  createdAt DateTime  @default(now()) @map("created_at")
  sentAt    DateTime? @map("sent_at")
  ackedAt   DateTime? @map("acked_at")

  // Use `map:` (not `name:`) so the DATABASE index names match the hand-written
  // migration. `name:` only sets the Prisma Client identifier; with it, `prisma
  // migrate` would generate default-named indexes and report schema drift.
  @@index([status, nextRetryAt], map: "idx_outbox_pending")
  @@index([aggregateType, aggregateId], map: "idx_outbox_aggregate")
  @@index([eventType], map: "idx_outbox_event_type")
  @@index([createdAt], map: "idx_outbox_created")
  @@map("outbox_events")
}
|
||||
|
||||
// ============================================
|
||||
// 区块链事件日志 (Append-Only 审计)
|
||||
// ============================================
|
||||
|
|
|
|||
|
|
@ -6,8 +6,10 @@ import {
|
|||
DepositDetectionService,
|
||||
BalanceQueryService,
|
||||
MnemonicVerificationService,
|
||||
OutboxPublisherService,
|
||||
} from './services';
|
||||
import { MpcKeygenCompletedHandler, WithdrawalRequestedHandler } from './event-handlers';
|
||||
import { DepositAckConsumerService } from '@/infrastructure/kafka/deposit-ack-consumer.service';
|
||||
|
||||
@Module({
|
||||
imports: [InfrastructureModule, DomainModule],
|
||||
|
|
@ -17,6 +19,10 @@ import { MpcKeygenCompletedHandler, WithdrawalRequestedHandler } from './event-h
|
|||
DepositDetectionService,
|
||||
BalanceQueryService,
|
||||
MnemonicVerificationService,
|
||||
OutboxPublisherService,
|
||||
|
||||
// 事件消费者(依赖 OutboxPublisherService,需要在这里注册)
|
||||
DepositAckConsumerService,
|
||||
|
||||
// 事件处理器
|
||||
MpcKeygenCompletedHandler,
|
||||
|
|
@ -27,6 +33,8 @@ import { MpcKeygenCompletedHandler, WithdrawalRequestedHandler } from './event-h
|
|||
DepositDetectionService,
|
||||
BalanceQueryService,
|
||||
MnemonicVerificationService,
|
||||
OutboxPublisherService,
|
||||
DepositAckConsumerService,
|
||||
MpcKeygenCompletedHandler,
|
||||
WithdrawalRequestedHandler,
|
||||
],
|
||||
|
|
|
|||
|
|
@ -21,8 +21,13 @@ import {
|
|||
BLOCK_CHECKPOINT_REPOSITORY,
|
||||
IBlockCheckpointRepository,
|
||||
} from '@/domain/repositories/block-checkpoint.repository.interface';
|
||||
import {
|
||||
OUTBOX_EVENT_REPOSITORY,
|
||||
IOutboxEventRepository,
|
||||
} from '@/domain/repositories/outbox-event.repository.interface';
|
||||
import { DepositTransaction } from '@/domain/aggregates/deposit-transaction';
|
||||
import { ChainType, TxHash, EvmAddress, TokenAmount, BlockNumber } from '@/domain/value-objects';
|
||||
import { DepositConfirmedEvent } from '@/domain/events';
|
||||
|
||||
/**
|
||||
* 充值检测服务
|
||||
|
|
@ -45,6 +50,8 @@ export class DepositDetectionService implements OnModuleInit {
|
|||
private readonly monitoredAddressRepo: IMonitoredAddressRepository,
|
||||
@Inject(BLOCK_CHECKPOINT_REPOSITORY)
|
||||
private readonly checkpointRepo: IBlockCheckpointRepository,
|
||||
@Inject(OUTBOX_EVENT_REPOSITORY)
|
||||
private readonly outboxRepo: IOutboxEventRepository,
|
||||
) {}
|
||||
|
||||
async onModuleInit() {
|
||||
|
|
@ -195,9 +202,23 @@ export class DepositDetectionService implements OnModuleInit {
|
|||
|
||||
await this.depositRepo.save(deposit);
|
||||
|
||||
// 发布确认事件
|
||||
// 处理领域事件
|
||||
for (const event of deposit.domainEvents) {
|
||||
await this.eventPublisher.publish(event);
|
||||
if (event instanceof DepositConfirmedEvent) {
|
||||
// 重要事件写入 outbox,保证可靠投递
|
||||
await this.outboxRepo.create({
|
||||
eventType: event.eventType,
|
||||
aggregateId: deposit.id?.toString() || deposit.txHash.toString(),
|
||||
aggregateType: 'DepositTransaction',
|
||||
payload: event.toPayload(),
|
||||
});
|
||||
this.logger.log(
|
||||
`Deposit confirmed event saved to outbox: ${deposit.txHash.toShort()} (${deposit.confirmations} confirmations)`,
|
||||
);
|
||||
} else {
|
||||
// 非关键事件直接发送(如 DepositDetectedEvent)
|
||||
await this.eventPublisher.publish(event);
|
||||
}
|
||||
}
|
||||
deposit.clearDomainEvents();
|
||||
|
||||
|
|
|
|||
|
|
@ -2,3 +2,4 @@ export * from './address-derivation.service';
|
|||
export * from './deposit-detection.service';
|
||||
export * from './balance-query.service';
|
||||
export * from './mnemonic-verification.service';
|
||||
export * from './outbox-publisher.service';
|
||||
|
|
|
|||
|
|
@ -0,0 +1,148 @@
|
|||
import { Injectable, Logger, Inject, OnModuleInit } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
|
||||
import {
|
||||
OUTBOX_EVENT_REPOSITORY,
|
||||
IOutboxEventRepository,
|
||||
OutboxEventStatus,
|
||||
} from '@/domain/repositories/outbox-event.repository.interface';
|
||||
import {
|
||||
DEPOSIT_TRANSACTION_REPOSITORY,
|
||||
IDepositTransactionRepository,
|
||||
} from '@/domain/repositories/deposit-transaction.repository.interface';
|
||||
|
||||
/**
|
||||
* Outbox 发布服务
|
||||
*
|
||||
* 定时扫描 outbox_events 表,将待发送的事件发布到 Kafka。
|
||||
* 实现发件箱模式(Outbox Pattern),保证事件的可靠投递。
|
||||
*/
|
||||
@Injectable()
|
||||
export class OutboxPublisherService implements OnModuleInit {
|
||||
private readonly logger = new Logger(OutboxPublisherService.name);
|
||||
|
||||
// 发送超时时间(秒)- 超过此时间未收到 ACK 则重发
|
||||
private readonly SENT_TIMEOUT_SECONDS = 300; // 5 分钟
|
||||
|
||||
// 清理已确认事件的天数
|
||||
private readonly CLEANUP_DAYS = 7;
|
||||
|
||||
constructor(
|
||||
private readonly eventPublisher: EventPublisherService,
|
||||
@Inject(OUTBOX_EVENT_REPOSITORY)
|
||||
private readonly outboxRepo: IOutboxEventRepository,
|
||||
@Inject(DEPOSIT_TRANSACTION_REPOSITORY)
|
||||
private readonly depositRepo: IDepositTransactionRepository,
|
||||
) {}
|
||||
|
||||
async onModuleInit() {
|
||||
this.logger.log('OutboxPublisherService initialized');
|
||||
}
|
||||
|
||||
/**
|
||||
* 定时发布待发送事件(每5秒)
|
||||
*/
|
||||
@Cron(CronExpression.EVERY_5_SECONDS)
|
||||
async publishPendingEvents(): Promise<void> {
|
||||
try {
|
||||
const pendingEvents = await this.outboxRepo.findPendingEvents(50);
|
||||
|
||||
if (pendingEvents.length === 0) return;
|
||||
|
||||
this.logger.debug(`Found ${pendingEvents.length} pending events to publish`);
|
||||
|
||||
for (const event of pendingEvents) {
|
||||
try {
|
||||
// 发送到 Kafka
|
||||
await this.eventPublisher.publishRaw({
|
||||
eventId: `outbox-${event.id}`,
|
||||
eventType: event.eventType,
|
||||
occurredAt: event.createdAt,
|
||||
payload: event.payload,
|
||||
});
|
||||
|
||||
// 标记为已发送
|
||||
await this.outboxRepo.markAsSent(event.id);
|
||||
|
||||
this.logger.log(
|
||||
`Published event ${event.id}: ${event.eventType} for ${event.aggregateType}:${event.aggregateId}`,
|
||||
);
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
|
||||
this.logger.error(`Failed to publish event ${event.id}: ${errorMessage}`);
|
||||
|
||||
// 记录失败,安排重试
|
||||
await this.outboxRepo.recordFailure(event.id, errorMessage);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Error in publishPendingEvents:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 定时检查超时未确认的事件(每分钟)
|
||||
* 将 SENT 状态但超时的事件重置为 PENDING
|
||||
*/
|
||||
@Cron(CronExpression.EVERY_MINUTE)
|
||||
async checkUnackedEvents(): Promise<void> {
|
||||
try {
|
||||
const unackedEvents = await this.outboxRepo.findUnackedEvents(
|
||||
this.SENT_TIMEOUT_SECONDS,
|
||||
50,
|
||||
);
|
||||
|
||||
if (unackedEvents.length === 0) return;
|
||||
|
||||
this.logger.warn(`Found ${unackedEvents.length} unacked events, will retry`);
|
||||
|
||||
for (const event of unackedEvents) {
|
||||
// 记录超时失败,触发重试逻辑
|
||||
await this.outboxRepo.recordFailure(event.id, 'ACK timeout');
|
||||
this.logger.warn(
|
||||
`Event ${event.id} ACK timeout, scheduled for retry (attempt ${event.retryCount + 1})`,
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Error in checkUnackedEvents:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 定时清理已确认的旧事件(每天凌晨3点)
|
||||
*/
|
||||
@Cron('0 3 * * *')
|
||||
async cleanupOldEvents(): Promise<void> {
|
||||
try {
|
||||
const count = await this.outboxRepo.cleanupAckedEvents(this.CLEANUP_DAYS);
|
||||
if (count > 0) {
|
||||
this.logger.log(`Cleaned up ${count} old ACKED events`);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Error in cleanupOldEvents:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 ACK 确认
|
||||
* 当收到 wallet-service 的确认事件时调用
|
||||
*/
|
||||
async handleAck(aggregateType: string, aggregateId: string, eventType: string): Promise<void> {
|
||||
try {
|
||||
await this.outboxRepo.markAsAckedByAggregateId(aggregateType, aggregateId, eventType);
|
||||
|
||||
// 同时更新 deposit_transactions 表的 notified_at
|
||||
if (aggregateType === 'DepositTransaction') {
|
||||
const depositId = BigInt(aggregateId);
|
||||
const deposit = await this.depositRepo.findById(depositId);
|
||||
if (deposit) {
|
||||
deposit.markAsNotified();
|
||||
await this.depositRepo.save(deposit);
|
||||
this.logger.log(`Deposit ${aggregateId} marked as notified`);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Error handling ACK for ${aggregateType}:${aggregateId}:`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2,3 +2,4 @@ export * from './deposit-transaction.repository.interface';
|
|||
export * from './monitored-address.repository.interface';
|
||||
export * from './block-checkpoint.repository.interface';
|
||||
export * from './transaction-request.repository.interface';
|
||||
export * from './outbox-event.repository.interface';
|
||||
|
|
|
|||
|
|
@ -0,0 +1,99 @@
|
|||
/**
 * Outbox Event Repository Interface
 *
 * Outbox Pattern - guarantees reliable event publication to Kafka.
 */

export const OUTBOX_EVENT_REPOSITORY = Symbol('OUTBOX_EVENT_REPOSITORY');

// Delivery lifecycle of an outbox event.
export enum OutboxEventStatus {
  PENDING = 'PENDING', // waiting to be sent
  SENT = 'SENT', // sent to Kafka, awaiting consumer ACK
  ACKED = 'ACKED', // acknowledged by the consumer
  FAILED = 'FAILED', // send failed (exceeded the maximum retry count)
}

// Fields required to create a new outbox event.
export interface OutboxEventData {
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  payload: Record<string, unknown>;
}

// A persisted outbox event row.
export interface OutboxEvent {
  id: bigint;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  payload: Record<string, unknown>;
  status: OutboxEventStatus;
  retryCount: number;
  maxRetries: number;
  lastError: string | null;
  nextRetryAt: Date | null;
  createdAt: Date;
  sentAt: Date | null;
  ackedAt: Date | null;
}

export interface IOutboxEventRepository {
  /**
   * Create a new outbox event.
   */
  create(data: OutboxEventData): Promise<OutboxEvent>;

  /**
   * Create multiple outbox events in one batch.
   */
  createMany(data: OutboxEventData[]): Promise<void>;

  /**
   * Find an event by ID.
   */
  findById(id: bigint): Promise<OutboxEvent | null>;

  /**
   * Find events ready to send (PENDING and past their retry time).
   */
  findPendingEvents(limit?: number): Promise<OutboxEvent[]>;

  /**
   * Find sent-but-unconfirmed events (SENT and past the timeout).
   */
  findUnackedEvents(timeoutSeconds: number, limit?: number): Promise<OutboxEvent[]>;

  /**
   * Mark an event as sent.
   */
  markAsSent(id: bigint): Promise<void>;

  /**
   * Mark an event as acknowledged.
   */
  markAsAcked(id: bigint): Promise<void>;

  /**
   * Mark events as acknowledged by aggregate ID (used when an ACK is received).
   */
  markAsAckedByAggregateId(aggregateType: string, aggregateId: string, eventType: string): Promise<void>;

  /**
   * Record a send failure and increment the retry count.
   */
  recordFailure(id: bigint, error: string): Promise<void>;

  /**
   * Mark an event as permanently failed (exceeded max retries).
   */
  markAsFailed(id: bigint, error: string): Promise<void>;

  /**
   * Reset an event to PENDING (for manual retry).
   */
  resetToPending(id: bigint): Promise<void>;

  /**
   * Delete acknowledged events older than the given number of days.
   */
  cleanupAckedEvents(olderThanDays: number): Promise<number>;
}
|
||||
|
|
@ -9,12 +9,14 @@ import {
|
|||
MONITORED_ADDRESS_REPOSITORY,
|
||||
BLOCK_CHECKPOINT_REPOSITORY,
|
||||
TRANSACTION_REQUEST_REPOSITORY,
|
||||
OUTBOX_EVENT_REPOSITORY,
|
||||
} from '@/domain/repositories';
|
||||
import {
|
||||
DepositTransactionRepositoryImpl,
|
||||
MonitoredAddressRepositoryImpl,
|
||||
BlockCheckpointRepositoryImpl,
|
||||
TransactionRequestRepositoryImpl,
|
||||
OutboxEventRepositoryImpl,
|
||||
} from './persistence/repositories';
|
||||
|
||||
@Global()
|
||||
|
|
@ -55,6 +57,10 @@ import {
|
|||
provide: TRANSACTION_REQUEST_REPOSITORY,
|
||||
useClass: TransactionRequestRepositoryImpl,
|
||||
},
|
||||
{
|
||||
provide: OUTBOX_EVENT_REPOSITORY,
|
||||
useClass: OutboxEventRepositoryImpl,
|
||||
},
|
||||
],
|
||||
exports: [
|
||||
PrismaService,
|
||||
|
|
@ -72,6 +78,7 @@ import {
|
|||
MONITORED_ADDRESS_REPOSITORY,
|
||||
BLOCK_CHECKPOINT_REPOSITORY,
|
||||
TRANSACTION_REQUEST_REPOSITORY,
|
||||
OUTBOX_EVENT_REPOSITORY,
|
||||
],
|
||||
})
|
||||
export class InfrastructureModule {}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,147 @@
|
|||
/**
 * Deposit ACK Consumer Service
 *
 * Listens for deposit-confirmation ACK events sent by wallet-service.
 * After wallet-service successfully processes a deposit, it emits an ACK event.
 */
|
||||
|
||||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
|
||||
import { OutboxPublisherService } from '@/application/services/outbox-publisher.service';
|
||||
|
||||
export const ACK_TOPICS = {
|
||||
WALLET_ACKS: 'wallet.acks',
|
||||
} as const;
|
||||
|
||||
export interface DepositCreditedPayload {
|
||||
depositId: string;
|
||||
txHash: string;
|
||||
userId: string;
|
||||
accountSequence: string;
|
||||
amount: string;
|
||||
creditedAt: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class DepositAckConsumerService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(DepositAckConsumerService.name);
|
||||
private kafka: Kafka;
|
||||
private consumer: Consumer;
|
||||
private isConnected = false;
|
||||
|
||||
constructor(
|
||||
private readonly configService: ConfigService,
|
||||
private readonly outboxPublisher: OutboxPublisherService,
|
||||
) {}
|
||||
|
||||
async onModuleInit() {
|
||||
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') ||
|
||||
this.configService.get<string[]>('kafka.brokers') ||
|
||||
['localhost:9092'];
|
||||
const clientId = this.configService.get<string>('kafka.clientId') || 'blockchain-service';
|
||||
const groupId = 'blockchain-service-deposit-acks';
|
||||
|
||||
this.logger.log(`[INIT] Deposit ACK Consumer initializing...`);
|
||||
this.logger.log(`[INIT] ClientId: ${clientId}`);
|
||||
this.logger.log(`[INIT] GroupId: ${groupId}`);
|
||||
this.logger.log(`[INIT] Brokers: ${brokers}`);
|
||||
this.logger.log(`[INIT] Topics: ${Object.values(ACK_TOPICS).join(', ')}`);
|
||||
|
||||
this.kafka = new Kafka({
|
||||
clientId,
|
||||
brokers: Array.isArray(brokers) ? brokers : brokers.split(','),
|
||||
logLevel: logLevel.WARN,
|
||||
retry: {
|
||||
initialRetryTime: 100,
|
||||
retries: 8,
|
||||
},
|
||||
});
|
||||
|
||||
this.consumer = this.kafka.consumer({
|
||||
groupId,
|
||||
sessionTimeout: 30000,
|
||||
heartbeatInterval: 3000,
|
||||
});
|
||||
|
||||
try {
|
||||
this.logger.log(`[CONNECT] Connecting Deposit ACK consumer...`);
|
||||
await this.consumer.connect();
|
||||
this.isConnected = true;
|
||||
this.logger.log(`[CONNECT] Deposit ACK consumer connected successfully`);
|
||||
|
||||
await this.consumer.subscribe({
|
||||
topics: Object.values(ACK_TOPICS),
|
||||
fromBeginning: false,
|
||||
});
|
||||
this.logger.log(`[SUBSCRIBE] Subscribed to ACK topics`);
|
||||
|
||||
await this.startConsuming();
|
||||
} catch (error) {
|
||||
this.logger.error(`[ERROR] Failed to connect Deposit ACK consumer`, error);
|
||||
}
|
||||
}
|
||||
|
||||
async onModuleDestroy() {
|
||||
if (this.isConnected) {
|
||||
await this.consumer.disconnect();
|
||||
this.logger.log('Deposit ACK consumer disconnected');
|
||||
}
|
||||
}
|
||||
|
||||
private async startConsuming(): Promise<void> {
|
||||
await this.consumer.run({
|
||||
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
|
||||
const offset = message.offset;
|
||||
this.logger.log(`[RECEIVE] ACK message received: topic=${topic}, partition=${partition}, offset=${offset}`);
|
||||
|
||||
try {
|
||||
const value = message.value?.toString();
|
||||
if (!value) {
|
||||
this.logger.warn(`[RECEIVE] Empty ACK message received on ${topic}`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug(`[RECEIVE] Raw ACK message: ${value.substring(0, 500)}`);
|
||||
|
||||
const parsed = JSON.parse(value);
|
||||
const eventType = parsed.eventType;
|
||||
const payload = parsed.payload || parsed;
|
||||
|
||||
this.logger.log(`[RECEIVE] ACK event type: ${eventType}`);
|
||||
|
||||
if (eventType === 'wallet.deposit.credited') {
|
||||
await this.handleDepositCredited(payload as DepositCreditedPayload);
|
||||
} else {
|
||||
this.logger.debug(`[RECEIVE] Unknown ACK event type: ${eventType}`);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`[ERROR] Error processing ACK message from ${topic}`, error);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log(`[START] Started consuming ACK events`);
|
||||
}
|
||||
|
||||
private async handleDepositCredited(payload: DepositCreditedPayload): Promise<void> {
|
||||
this.logger.log(`[ACK] Processing deposit credited ACK`);
|
||||
this.logger.log(`[ACK] depositId: ${payload.depositId}`);
|
||||
this.logger.log(`[ACK] txHash: ${payload.txHash}`);
|
||||
this.logger.log(`[ACK] userId: ${payload.userId}`);
|
||||
this.logger.log(`[ACK] amount: ${payload.amount}`);
|
||||
|
||||
try {
|
||||
// 通知 OutboxPublisher 处理 ACK
|
||||
await this.outboxPublisher.handleAck(
|
||||
'DepositTransaction',
|
||||
payload.depositId,
|
||||
'blockchain.deposit.confirmed',
|
||||
);
|
||||
|
||||
this.logger.log(`[ACK] Deposit ${payload.depositId} ACK processed successfully`);
|
||||
} catch (error) {
|
||||
this.logger.error(`[ACK] Error processing deposit ACK for ${payload.depositId}:`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -64,6 +64,38 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布原始事件数据(用于 Outbox 模式)
|
||||
*/
|
||||
async publishRaw(event: {
|
||||
eventId: string;
|
||||
eventType: string;
|
||||
occurredAt: Date;
|
||||
payload: Record<string, unknown>;
|
||||
}): Promise<void> {
|
||||
const topic = this.getTopicForEvent(event.eventType);
|
||||
const message = {
|
||||
key: event.eventId,
|
||||
value: JSON.stringify({
|
||||
eventId: event.eventId,
|
||||
eventType: event.eventType,
|
||||
occurredAt: event.occurredAt.toISOString(),
|
||||
payload: event.payload,
|
||||
}),
|
||||
headers: {
|
||||
eventType: event.eventType,
|
||||
source: 'blockchain-service',
|
||||
},
|
||||
};
|
||||
|
||||
await this.producer.send({
|
||||
topic,
|
||||
messages: [message],
|
||||
});
|
||||
|
||||
this.logger.debug(`Published raw event: ${event.eventType} to topic: ${topic}`);
|
||||
}
|
||||
|
||||
private getTopicForEvent(eventType: string): string {
|
||||
// 事件类型到 topic 的映射
|
||||
const topicMap: Record<string, string> = {
|
||||
|
|
|
|||
|
|
@ -2,3 +2,4 @@ export * from './event-publisher.service';
|
|||
export * from './event-consumer.controller';
|
||||
export * from './mpc-event-consumer.service';
|
||||
export * from './withdrawal-event-consumer.service';
|
||||
export * from './deposit-ack-consumer.service';
|
||||
|
|
|
|||
|
|
@ -2,3 +2,4 @@ export * from './deposit-transaction.repository.impl';
|
|||
export * from './monitored-address.repository.impl';
|
||||
export * from './block-checkpoint.repository.impl';
|
||||
export * from './transaction-request.repository.impl';
|
||||
export * from './outbox-event.repository.impl';
|
||||
|
|
|
|||
|
|
@ -0,0 +1,218 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { PrismaService } from '../prisma/prisma.service';
|
||||
import {
|
||||
IOutboxEventRepository,
|
||||
OutboxEvent,
|
||||
OutboxEventData,
|
||||
OutboxEventStatus,
|
||||
} from '@/domain/repositories/outbox-event.repository.interface';
|
||||
|
||||
@Injectable()
|
||||
export class OutboxEventRepositoryImpl implements IOutboxEventRepository {
|
||||
private readonly logger = new Logger(OutboxEventRepositoryImpl.name);
|
||||
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async create(data: OutboxEventData): Promise<OutboxEvent> {
|
||||
const record = await this.prisma.outboxEvent.create({
|
||||
data: {
|
||||
eventType: data.eventType,
|
||||
aggregateId: data.aggregateId,
|
||||
aggregateType: data.aggregateType,
|
||||
payload: data.payload,
|
||||
status: OutboxEventStatus.PENDING,
|
||||
retryCount: 0,
|
||||
maxRetries: 10,
|
||||
},
|
||||
});
|
||||
return this.mapToOutboxEvent(record);
|
||||
}
|
||||
|
||||
async createMany(data: OutboxEventData[]): Promise<void> {
|
||||
await this.prisma.outboxEvent.createMany({
|
||||
data: data.map((d) => ({
|
||||
eventType: d.eventType,
|
||||
aggregateId: d.aggregateId,
|
||||
aggregateType: d.aggregateType,
|
||||
payload: d.payload,
|
||||
status: OutboxEventStatus.PENDING,
|
||||
retryCount: 0,
|
||||
maxRetries: 10,
|
||||
})),
|
||||
});
|
||||
}
|
||||
|
||||
async findById(id: bigint): Promise<OutboxEvent | null> {
|
||||
const record = await this.prisma.outboxEvent.findUnique({
|
||||
where: { id },
|
||||
});
|
||||
return record ? this.mapToOutboxEvent(record) : null;
|
||||
}
|
||||
|
||||
async findPendingEvents(limit: number = 100): Promise<OutboxEvent[]> {
|
||||
const now = new Date();
|
||||
const records = await this.prisma.outboxEvent.findMany({
|
||||
where: {
|
||||
status: OutboxEventStatus.PENDING,
|
||||
OR: [
|
||||
{ nextRetryAt: null },
|
||||
{ nextRetryAt: { lte: now } },
|
||||
],
|
||||
},
|
||||
orderBy: { createdAt: 'asc' },
|
||||
take: limit,
|
||||
});
|
||||
return records.map((r) => this.mapToOutboxEvent(r));
|
||||
}
|
||||
|
||||
async findUnackedEvents(timeoutSeconds: number, limit: number = 100): Promise<OutboxEvent[]> {
|
||||
const cutoff = new Date(Date.now() - timeoutSeconds * 1000);
|
||||
const records = await this.prisma.outboxEvent.findMany({
|
||||
where: {
|
||||
status: OutboxEventStatus.SENT,
|
||||
sentAt: { lte: cutoff },
|
||||
},
|
||||
orderBy: { sentAt: 'asc' },
|
||||
take: limit,
|
||||
});
|
||||
return records.map((r) => this.mapToOutboxEvent(r));
|
||||
}
|
||||
|
||||
async markAsSent(id: bigint): Promise<void> {
|
||||
await this.prisma.outboxEvent.update({
|
||||
where: { id },
|
||||
data: {
|
||||
status: OutboxEventStatus.SENT,
|
||||
sentAt: new Date(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async markAsAcked(id: bigint): Promise<void> {
|
||||
await this.prisma.outboxEvent.update({
|
||||
where: { id },
|
||||
data: {
|
||||
status: OutboxEventStatus.ACKED,
|
||||
ackedAt: new Date(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async markAsAckedByAggregateId(
|
||||
aggregateType: string,
|
||||
aggregateId: string,
|
||||
eventType: string,
|
||||
): Promise<void> {
|
||||
const result = await this.prisma.outboxEvent.updateMany({
|
||||
where: {
|
||||
aggregateType,
|
||||
aggregateId,
|
||||
eventType,
|
||||
status: OutboxEventStatus.SENT,
|
||||
},
|
||||
data: {
|
||||
status: OutboxEventStatus.ACKED,
|
||||
ackedAt: new Date(),
|
||||
},
|
||||
});
|
||||
this.logger.debug(
|
||||
`Marked ${result.count} events as ACKED for ${aggregateType}:${aggregateId}:${eventType}`,
|
||||
);
|
||||
}
|
||||
|
||||
async recordFailure(id: bigint, error: string): Promise<void> {
|
||||
const event = await this.prisma.outboxEvent.findUnique({
|
||||
where: { id },
|
||||
});
|
||||
|
||||
if (!event) return;
|
||||
|
||||
const newRetryCount = event.retryCount + 1;
|
||||
// 指数退避: 2^retryCount 秒,最大 5 分钟
|
||||
const backoffSeconds = Math.min(Math.pow(2, newRetryCount), 300);
|
||||
const nextRetryAt = new Date(Date.now() + backoffSeconds * 1000);
|
||||
|
||||
if (newRetryCount >= event.maxRetries) {
|
||||
await this.markAsFailed(id, error);
|
||||
} else {
|
||||
await this.prisma.outboxEvent.update({
|
||||
where: { id },
|
||||
data: {
|
||||
status: OutboxEventStatus.PENDING,
|
||||
retryCount: newRetryCount,
|
||||
lastError: error,
|
||||
nextRetryAt,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async markAsFailed(id: bigint, error: string): Promise<void> {
|
||||
await this.prisma.outboxEvent.update({
|
||||
where: { id },
|
||||
data: {
|
||||
status: OutboxEventStatus.FAILED,
|
||||
lastError: error,
|
||||
},
|
||||
});
|
||||
this.logger.warn(`Event ${id} marked as FAILED: ${error}`);
|
||||
}
|
||||
|
||||
async resetToPending(id: bigint): Promise<void> {
|
||||
await this.prisma.outboxEvent.update({
|
||||
where: { id },
|
||||
data: {
|
||||
status: OutboxEventStatus.PENDING,
|
||||
retryCount: 0,
|
||||
lastError: null,
|
||||
nextRetryAt: null,
|
||||
sentAt: null,
|
||||
ackedAt: null,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async cleanupAckedEvents(olderThanDays: number): Promise<number> {
|
||||
const cutoff = new Date(Date.now() - olderThanDays * 24 * 60 * 60 * 1000);
|
||||
const result = await this.prisma.outboxEvent.deleteMany({
|
||||
where: {
|
||||
status: OutboxEventStatus.ACKED,
|
||||
ackedAt: { lte: cutoff },
|
||||
},
|
||||
});
|
||||
this.logger.log(`Cleaned up ${result.count} old ACKED events`);
|
||||
return result.count;
|
||||
}
|
||||
|
||||
private mapToOutboxEvent(record: {
|
||||
id: bigint;
|
||||
eventType: string;
|
||||
aggregateId: string;
|
||||
aggregateType: string;
|
||||
payload: unknown;
|
||||
status: string;
|
||||
retryCount: number;
|
||||
maxRetries: number;
|
||||
lastError: string | null;
|
||||
nextRetryAt: Date | null;
|
||||
createdAt: Date;
|
||||
sentAt: Date | null;
|
||||
ackedAt: Date | null;
|
||||
}): OutboxEvent {
|
||||
return {
|
||||
id: record.id,
|
||||
eventType: record.eventType,
|
||||
aggregateId: record.aggregateId,
|
||||
aggregateType: record.aggregateType,
|
||||
payload: record.payload as Record<string, unknown>,
|
||||
status: record.status as OutboxEventStatus,
|
||||
retryCount: record.retryCount,
|
||||
maxRetries: record.maxRetries,
|
||||
lastError: record.lastError,
|
||||
nextRetryAt: record.nextRetryAt,
|
||||
createdAt: record.createdAt,
|
||||
sentAt: record.sentAt,
|
||||
ackedAt: record.ackedAt,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -12,6 +12,7 @@ import {
|
|||
DepositEventConsumerService,
|
||||
DepositConfirmedPayload,
|
||||
} from '@/infrastructure/kafka/deposit-event-consumer.service';
|
||||
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
|
||||
import { ChainType } from '@/domain/value-objects';
|
||||
|
||||
@Injectable()
|
||||
|
|
@ -21,6 +22,7 @@ export class DepositConfirmedHandler implements OnModuleInit {
|
|||
constructor(
|
||||
private readonly depositEventConsumer: DepositEventConsumerService,
|
||||
private readonly walletApplicationService: WalletApplicationService,
|
||||
private readonly eventPublisher: EventPublisherService,
|
||||
) {}
|
||||
|
||||
onModuleInit() {
|
||||
|
|
@ -52,12 +54,17 @@ export class DepositConfirmedHandler implements OnModuleInit {
|
|||
`userId=${payload.userId}, ` +
|
||||
`accountSequence=${payload.accountSequence}`,
|
||||
);
|
||||
|
||||
// 发送 ACK 确认事件给 blockchain-service
|
||||
await this.sendDepositCreditedAck(payload);
|
||||
} catch (error) {
|
||||
// Check if it's a duplicate transaction error (already processed)
|
||||
if (error.message?.includes('Duplicate transaction')) {
|
||||
this.logger.warn(
|
||||
`Deposit already processed (duplicate): txHash=${payload.txHash}`,
|
||||
);
|
||||
// 重复交易也发送 ACK,确保 blockchain-service 知道已处理
|
||||
await this.sendDepositCreditedAck(payload);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -69,6 +76,37 @@ export class DepositConfirmedHandler implements OnModuleInit {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Send a deposit-credited confirmation event back to blockchain-service.
 * This is the ACK leg of the Outbox Pattern, letting blockchain-service know
 * the event was processed successfully.
 */
private async sendDepositCreditedAck(payload: DepositConfirmedPayload): Promise<void> {
  try {
    await this.eventPublisher.publish({
      eventType: 'wallet.deposit.credited',
      payload: {
        depositId: payload.depositId,
        txHash: payload.txHash,
        userId: payload.userId,
        accountSequence: payload.accountSequence,
        amount: payload.amountFormatted,
        creditedAt: new Date().toISOString(),
      },
    });

    this.logger.log(
      `[ACK] Sent deposit credited ACK: depositId=${payload.depositId}, txHash=${payload.txHash}`,
    );
  } catch (error) {
    // An ACK failure must not break the main flow — warn only;
    // blockchain-service's outbox mechanism will handle the retry.
    this.logger.warn(
      `[ACK] Failed to send deposit credited ACK: depositId=${payload.depositId}`,
      error,
    );
  }
}
|
||||
|
||||
private mapChainType(chainType: string): ChainType {
|
||||
const normalized = chainType.toUpperCase();
|
||||
switch (normalized) {
|
||||
|
|
|
|||
|
|
@ -106,6 +106,8 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy {
|
|||
'wallet.withdrawal.requested': 'wallet.withdrawals',
|
||||
'wallet.withdrawal.completed': 'wallet.withdrawals',
|
||||
'wallet.withdrawal.failed': 'wallet.withdrawals',
|
||||
// ACK events - confirmation messages
|
||||
'wallet.deposit.credited': 'wallet.acks',
|
||||
};
|
||||
return topicMap[eventType] || 'wallet.events';
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue