/**
 * Blockchain Event Consumer Service
 *
 * Consumes wallet address creation events from blockchain-service via Kafka
 * and forwards each well-formed event to the handler registered through
 * `onWalletAddressCreated`, so that user wallet addresses can be updated when
 * blockchain-service derives addresses from MPC public keys.
 */

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';

// Blockchain Event Topics (events from blockchain-service)
export const BLOCKCHAIN_TOPICS = {
  WALLET_ADDRESS_CREATED: 'blockchain.wallets',
} as const;

/** Payload carried by a wallet-address-created event. */
export interface WalletAddressCreatedPayload {
  userId: string;
  publicKey: string;
  addresses: {
    chainType: string;
    address: string;
  }[];
}

/**
 * Async callback invoked with a decoded event payload.
 *
 * The type parameter defaults to `WalletAddressCreatedPayload` so the alias
 * can be used both bare and with an explicit payload type.
 */
export type BlockchainEventHandler<T = WalletAddressCreatedPayload> = (payload: T) => Promise<void>;

/** Brokers used when `KAFKA_BROKERS` is unset or contains no usable entry. */
const DEFAULT_BROKERS: readonly string[] = ['localhost:9092'];

/**
 * Turns a comma-separated broker string into a clean list.
 *
 * Entries are trimmed and empty entries dropped, so values such as
 * `"a:9092, b:9092"`, `"a:9092,"` or `""` never yield blank broker names.
 * Falls back to `DEFAULT_BROKERS` when nothing usable remains.
 */
function parseBrokerList(raw: string | undefined): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  return brokers.length > 0 ? brokers : [...DEFAULT_BROKERS];
}

/** Narrows an unknown value to a plain (non-null, non-array) object. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

/** Runtime check that a decoded value matches `WalletAddressCreatedPayload`. */
function isWalletAddressCreatedPayload(value: unknown): value is WalletAddressCreatedPayload {
  if (!isRecord(value)) {
    return false;
  }
  const { userId, publicKey, addresses } = value;
  return (
    typeof userId === 'string' &&
    typeof publicKey === 'string' &&
    Array.isArray(addresses) &&
    addresses.every(
      (entry) => isRecord(entry) && typeof entry.chainType === 'string' && typeof entry.address === 'string',
    )
  );
}

/** Renders a caught value as text suitable for the logger's stack argument. */
function describeError(error: unknown): string {
  if (error instanceof Error) {
    return error.stack ?? error.message;
  }
  return String(error);
}

@Injectable()
export class BlockchainEventConsumerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(BlockchainEventConsumerService.name);
  // Both are created in onModuleInit, hence optional rather than definitely assigned.
  private kafka?: Kafka;
  private consumer?: Consumer;
  private isConnected = false;
  private walletAddressCreatedHandler?: BlockchainEventHandler<WalletAddressCreatedPayload>;

  constructor(private readonly configService: ConfigService) {}

  /**
   * Creates the Kafka client and consumer, connects, subscribes to every topic
   * in `BLOCKCHAIN_TOPICS` and starts the consume loop.
   *
   * Connection, subscription and start-up failures are logged and swallowed,
   * so a Kafka outage does not prevent the module from initialising.
   */
  async onModuleInit(): Promise<void> {
    const brokers = parseBrokerList(this.configService.get<string>('KAFKA_BROKERS'));
    // `||` on purpose: an empty client id is treated the same as a missing one.
    const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'identity-service';
    const groupId = 'identity-service-blockchain-events';
    const topics = Object.values(BLOCKCHAIN_TOPICS);

    this.logger.log(`[INIT] Blockchain Event Consumer initializing...`);
    this.logger.log(`[INIT] ClientId: ${clientId}`);
    this.logger.log(`[INIT] GroupId: ${groupId}`);
    this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
    this.logger.log(`[INIT] Topics to subscribe: ${topics.join(', ')}`);

    const kafka = new Kafka({
      clientId,
      brokers,
      logLevel: logLevel.WARN,
      retry: {
        initialRetryTime: 100,
        retries: 8,
      },
    });
    const consumer = kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
    });
    this.kafka = kafka;
    this.consumer = consumer;

    try {
      this.logger.log(`[CONNECT] Connecting Blockchain Event consumer...`);
      await consumer.connect();
      this.isConnected = true;
      this.logger.log(`[CONNECT] Blockchain Event Kafka consumer connected successfully`);

      // Subscribe to blockchain topics
      await consumer.subscribe({ topics, fromBeginning: false });
      this.logger.log(`[SUBSCRIBE] Subscribed to blockchain topics: ${topics.join(', ')}`);

      // Start consuming
      await this.startConsuming();
    } catch (error: unknown) {
      this.logger.error(`[ERROR] Failed to connect Blockchain Event Kafka consumer`, describeError(error));
    }
  }

  /** Disconnects the consumer if a connection was established. */
  async onModuleDestroy(): Promise<void> {
    if (!this.isConnected || !this.consumer) {
      return;
    }
    await this.consumer.disconnect();
    this.isConnected = false;
    this.logger.log('Blockchain Event Kafka consumer disconnected');
  }

  /**
   * Register handler for wallet address created events.
   *
   * Only one handler is kept; a later registration replaces the earlier one.
   */
  onWalletAddressCreated(handler: BlockchainEventHandler<WalletAddressCreatedPayload>): void {
    this.walletAddressCreatedHandler = handler;
    this.logger.log(`[REGISTER] WalletAddressCreated handler registered`);
  }

  /**
   * Starts the kafkajs run loop, routing every message to `handleMessage`.
   *
   * @throws Error when called before the consumer has been created.
   */
  private async startConsuming(): Promise<void> {
    const consumer = this.consumer;
    if (!consumer) {
      throw new Error('Blockchain Event Kafka consumer has not been created');
    }

    await consumer.run({
      eachMessage: (messagePayload: EachMessagePayload) => this.handleMessage(messagePayload),
    });

    this.logger.log(`[START] Started consuming blockchain events`);
  }

  /**
   * Decodes one Kafka message and dispatches it to the registered handler.
   *
   * The message body is JSON, either the payload itself or an envelope of the
   * form `{ eventType, payload }`. Empty, non-object and malformed messages
   * are logged and skipped.
   *
   * Best effort by design: every error (JSON parsing or handler failure) is
   * logged and swallowed so one bad message does not stop the consume loop.
   * The callback therefore never rejects, so the message is not redelivered
   * by this consumer.
   */
  private async handleMessage({ topic, partition, message }: EachMessagePayload): Promise<void> {
    const offset = message.offset;
    this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`);

    try {
      const value = message.value?.toString();
      if (!value) {
        this.logger.warn(`[RECEIVE] Empty message received on ${topic}`);
        return;
      }

      this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`);

      const parsed: unknown = JSON.parse(value);
      if (!isRecord(parsed)) {
        // Valid JSON that is a scalar, null or array cannot carry an event.
        this.logger.warn(`[RECEIVE] Ignoring non-object message on ${topic}`);
        return;
      }

      // Accept both the enveloped form and a bare payload.
      const payload = isRecord(parsed.payload) ? parsed.payload : parsed;
      const eventType =
        typeof parsed.eventType === 'string' && parsed.eventType.length > 0 ? parsed.eventType : 'unknown';

      this.logger.log(`[RECEIVE] Parsed event: eventType=${eventType}`);
      this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`);

      // Handle WalletAddressCreated events
      const isWalletAddressCreated =
        eventType === 'blockchain.wallet.address.created' || topic === BLOCKCHAIN_TOPICS.WALLET_ADDRESS_CREATED;
      if (!isWalletAddressCreated) {
        this.logger.warn(`[RECEIVE] Unknown event type: ${eventType}`);
        return;
      }

      this.logger.log(`[HANDLE] Processing WalletAddressCreated event`);

      if (!isWalletAddressCreatedPayload(payload)) {
        // Never hand the handler data that violates its declared contract.
        this.logger.warn(`[HANDLE] Ignoring malformed WalletAddressCreated payload`);
        return;
      }

      this.logger.log(`[HANDLE] userId: ${payload.userId}`);
      this.logger.log(`[HANDLE] publicKey: ${payload.publicKey.substring(0, 30)}...`);
      this.logger.log(`[HANDLE] addresses count: ${payload.addresses.length}`);

      if (this.walletAddressCreatedHandler) {
        await this.walletAddressCreatedHandler(payload);
        this.logger.log(`[HANDLE] WalletAddressCreated handler completed successfully`);
      } else {
        this.logger.warn(`[HANDLE] No handler registered for WalletAddressCreated`);
      }
    } catch (error: unknown) {
      this.logger.error(`[ERROR] Error processing blockchain event from ${topic}`, describeError(error));
    }
  }
}