rwadurian/backend/services/mining-blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.servic...

152 lines
5.1 KiB
TypeScript

/**
* Deposit ACK Consumer Service
*
* 监听 wallet-service 发送的充值确认事件。
* 当 wallet-service 成功处理充值后,会发送 ACK 事件。
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
import { OutboxPublisherService } from '@/application/services/outbox-publisher.service';
/**
 * Kafka topics carrying ACK events consumed by this service.
 *
 * DepositAckConsumerService subscribes to every value of this map
 * (via `Object.values(ACK_TOPICS)`), so adding an entry here adds a subscription.
 */
export const ACK_TOPICS = {
// ACK topic; per the file header, these events are published by wallet-service.
WALLET_ACKS: 'mining_wallet.acks',
} as const;
/**
 * Payload of a `mining_wallet.deposit.credited` ACK event.
 *
 * The consumer in this file takes it from the `payload` property of the parsed
 * message, or from the message root when no `payload` property is present.
 * The shape is asserted with a cast, not validated at runtime.
 */
export interface DepositCreditedPayload {
// Deposit identifier; passed as the second argument to OutboxPublisherService.handleAck.
depositId: string;
// Transaction hash of the deposit; only logged by the consumer in this file.
txHash: string;
// User the deposit belongs to; only logged by the consumer in this file.
userId: string;
// Not read anywhere in this file. NOTE(review): semantics are defined by the producer — confirm.
accountSequence: string;
// Credited amount as a string; only logged here. Presumably a decimal string to avoid
// floating-point precision loss — TODO confirm against wallet-service.
amount: string;
// Not read anywhere in this file. Presumably a timestamp string; format not visible here — TODO confirm.
creditedAt: string;
}
/**
 * Kafka consumer for deposit ACK events.
 *
 * Subscribes to every topic in `ACK_TOPICS` and, for each
 * `mining_wallet.deposit.credited` event, notifies `OutboxPublisherService`
 * that the corresponding deposit event was acknowledged.
 *
 * Failures are logged and never rethrown: a failed connection leaves the
 * service running without a consumer, and a failed message does not stop the
 * consumer loop.
 */
@Injectable()
export class DepositAckConsumerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(DepositAckConsumerService.name);
  // Both are created in onModuleInit, so they are undefined until the module starts.
  private kafka?: Kafka;
  private consumer?: Consumer;
  private isConnected = false;

  constructor(
    private readonly configService: ConfigService,
    private readonly outboxPublisher: OutboxPublisherService,
  ) {}

  /**
   * Creates the Kafka client and consumer, connects, subscribes to the ACK
   * topics and starts the consume loop.
   *
   * Any error during connect/subscribe/run is logged and swallowed so that
   * module initialization itself does not fail.
   */
  async onModuleInit(): Promise<void> {
    const brokers = this.resolveBrokers();
    const clientId = this.configService.get<string>('kafka.clientId') || 'mining-blockchain-service';
    const groupId = 'mining-blockchain-service-deposit-acks';

    this.logger.log(`[INIT] Deposit ACK Consumer initializing...`);
    this.logger.log(`[INIT] ClientId: ${clientId}`);
    this.logger.log(`[INIT] GroupId: ${groupId}`);
    this.logger.log(`[INIT] Brokers: ${brokers.join(',')}`);
    this.logger.log(`[INIT] Topics: ${Object.values(ACK_TOPICS).join(', ')}`);

    // Retry policy: exponential backoff (x2) starting at 1 s, each wait capped at
    // 5 minutes, at most 15 retries.
    // NOTE(review): the previous comment claimed ~2.5 hours in total; the nominal sum
    // of these waits is closer to ~40 minutes (before jitter) — confirm the intent.
    const kafka = new Kafka({
      clientId,
      brokers,
      logLevel: logLevel.WARN,
      retry: {
        initialRetryTime: 1000, // 1 second
        maxRetryTime: 300000, // cap of 5 minutes per wait
        retries: 15, // at most 15 retries
        multiplier: 2, // exponential backoff factor
        restartOnFailure: async () => true,
      },
    });
    this.kafka = kafka;

    const consumer = kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
    });
    this.consumer = consumer;

    try {
      this.logger.log(`[CONNECT] Connecting Deposit ACK consumer...`);
      await consumer.connect();
      this.isConnected = true;
      this.logger.log(`[CONNECT] Deposit ACK consumer connected successfully`);

      await consumer.subscribe({
        topics: Object.values(ACK_TOPICS),
        fromBeginning: false,
      });
      this.logger.log(`[SUBSCRIBE] Subscribed to ACK topics`);

      await this.startConsuming();
    } catch (error) {
      // Deliberately best-effort: the service keeps running without ACK consumption.
      this.logger.error(`[ERROR] Failed to connect Deposit ACK consumer`, error);
    }
  }

  /**
   * Disconnects the consumer if it was connected.
   *
   * A disconnect failure is logged instead of propagated so it cannot abort
   * the rest of the application shutdown.
   */
  async onModuleDestroy(): Promise<void> {
    if (!this.isConnected || !this.consumer) {
      return;
    }
    try {
      await this.consumer.disconnect();
      this.logger.log('Deposit ACK consumer disconnected');
    } catch (error) {
      this.logger.error('[ERROR] Failed to disconnect Deposit ACK consumer', error);
    } finally {
      this.isConnected = false;
    }
  }

  /**
   * Resolves the broker list, in priority order: the `KAFKA_BROKERS`
   * comma-separated string, the `kafka.brokers` config array, then
   * `localhost:9092`.
   *
   * Entries are trimmed and blank entries dropped, so an empty or
   * whitespace-only `KAFKA_BROKERS` (or an empty config array) falls through
   * to the next source instead of producing an unusable broker list.
   */
  private resolveBrokers(): string[] {
    const clean = (entries: readonly string[]): string[] =>
      entries.map((entry) => entry.trim()).filter((entry) => entry.length > 0);

    const brokersEnv = this.configService.get<string>('KAFKA_BROKERS');
    const fromEnv = typeof brokersEnv === 'string' ? clean(brokersEnv.split(',')) : [];
    if (fromEnv.length > 0) {
      return fromEnv;
    }

    const brokersConfig = this.configService.get<string[]>('kafka.brokers');
    const fromConfig = Array.isArray(brokersConfig)
      ? clean(brokersConfig.filter((entry): entry is string => typeof entry === 'string'))
      : [];
    if (fromConfig.length > 0) {
      return fromConfig;
    }

    return ['localhost:9092'];
  }

  /**
   * Starts the consume loop, routing every message to `handleMessage`.
   *
   * @throws Error if called before the consumer has been created.
   */
  private async startConsuming(): Promise<void> {
    const consumer = this.consumer;
    if (!consumer) {
      throw new Error('Deposit ACK consumer has not been initialized');
    }
    await consumer.run({
      eachMessage: async (messagePayload: EachMessagePayload) => this.handleMessage(messagePayload),
    });
    this.logger.log(`[START] Started consuming ACK events`);
  }

  /**
   * Parses one Kafka message and dispatches it by `eventType`.
   *
   * Empty messages, non-object JSON and credited events without a usable
   * `depositId` are skipped with a warning. Any thrown error (including a JSON
   * syntax error) is logged and swallowed so one bad message cannot stop the
   * consumer.
   */
  private async handleMessage({ topic, partition, message }: EachMessagePayload): Promise<void> {
    const offset = message.offset;
    this.logger.log(`[RECEIVE] ACK message received: topic=${topic}, partition=${partition}, offset=${offset}`);

    try {
      const value = message.value?.toString();
      if (!value) {
        this.logger.warn(`[RECEIVE] Empty ACK message received on ${topic}`);
        return;
      }
      this.logger.debug(`[RECEIVE] Raw ACK message: ${value.substring(0, 500)}`);

      const parsed: unknown = JSON.parse(value);
      if (!this.isRecord(parsed)) {
        this.logger.warn(`[RECEIVE] Ignoring non-object ACK message on ${topic} (offset=${offset})`);
        return;
      }

      const eventType = parsed.eventType;
      // Some producers wrap the data in `payload`; otherwise the root object is the payload.
      const payload: unknown = parsed.payload || parsed;
      this.logger.log(`[RECEIVE] ACK event type: ${eventType}`);

      if (eventType !== 'mining_wallet.deposit.credited') {
        this.logger.debug(`[RECEIVE] Unknown ACK event type: ${eventType}`);
        return;
      }

      // Without a depositId there is nothing to acknowledge; never forward `undefined` as an id.
      if (!this.hasDepositId(payload)) {
        this.logger.warn(
          `[RECEIVE] Ignoring deposit credited ACK without a valid depositId on ${topic} (offset=${offset})`,
        );
        return;
      }

      // Only depositId is validated; the remaining fields are used for logging only.
      await this.handleDepositCredited(payload as DepositCreditedPayload);
    } catch (error) {
      this.logger.error(`[ERROR] Error processing ACK message from ${topic}`, error);
    }
  }

  /** Returns true when `value` is a non-null object. */
  private isRecord(value: unknown): value is Record<string, unknown> {
    return typeof value === 'object' && value !== null;
  }

  /** Returns true when `value` carries a non-empty string `depositId`. */
  private hasDepositId(value: unknown): value is Pick<DepositCreditedPayload, 'depositId'> {
    const candidate = value as { depositId?: unknown } | null | undefined;
    const depositId = candidate?.depositId;
    return typeof depositId === 'string' && depositId.length > 0;
  }

  /**
   * Forwards a deposit-credited ACK to the outbox publisher.
   *
   * Errors from `handleAck` are logged and not rethrown.
   * NOTE(review): a failed ACK is therefore not retried by this handler —
   * confirm that the outbox has its own timeout/retry path.
   *
   * @param payload The credited-deposit payload; `depositId` identifies the acknowledged deposit.
   */
  private async handleDepositCredited(payload: DepositCreditedPayload): Promise<void> {
    this.logger.log(`[ACK] Processing deposit credited ACK`);
    this.logger.log(`[ACK] depositId: ${payload.depositId}`);
    this.logger.log(`[ACK] txHash: ${payload.txHash}`);
    this.logger.log(`[ACK] userId: ${payload.userId}`);
    this.logger.log(`[ACK] amount: ${payload.amount}`);

    try {
      // Tell the OutboxPublisher that this deposit's confirmed event was acknowledged.
      await this.outboxPublisher.handleAck(
        'DepositTransaction',
        payload.depositId,
        'mining_blockchain.deposit.confirmed',
      );
      this.logger.log(`[ACK] Deposit ${payload.depositId} ACK processed successfully`);
    } catch (error) {
      this.logger.error(`[ACK] Error processing deposit ACK for ${payload.depositId}:`, error);
    }
  }
}