/** * MPC Keygen Event Handler * * Handles keygen events from mpc-service: * - KeygenStarted: Updates status in Redis to "generating" * - KeygenCompleted: Updates status to indicate waiting for blockchain-service * - SessionFailed: Logs error and updates status to "failed" * * NOTE: Address derivation is now handled by blockchain-service. * This handler only manages status updates. The actual wallet addresses * are saved by BlockchainWalletHandler when it receives WalletAddressCreated * events from blockchain-service. */ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { RedisService } from '@/infrastructure/redis/redis.service'; import { MpcEventConsumerService, KeygenStartedPayload, KeygenCompletedPayload, SessionFailedPayload, } from '@/infrastructure/kafka/mpc-event-consumer.service'; // Redis key prefix for keygen status const KEYGEN_STATUS_PREFIX = 'keygen:status:'; const KEYGEN_STATUS_TTL = 60 * 60 * 24; // 24 hours export type KeygenStatus = 'pending' | 'generating' | 'deriving' | 'completed' | 'failed'; export interface KeygenStatusData { status: KeygenStatus; userId: string; mpcSessionId?: string; publicKey?: string; errorMessage?: string; updatedAt: string; } @Injectable() export class MpcKeygenCompletedHandler implements OnModuleInit { private readonly logger = new Logger(MpcKeygenCompletedHandler.name); constructor( private readonly redisService: RedisService, private readonly mpcEventConsumer: MpcEventConsumerService, ) {} async onModuleInit() { // Register event handlers this.mpcEventConsumer.onKeygenStarted(this.handleKeygenStarted.bind(this)); this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this)); this.mpcEventConsumer.onSessionFailed(this.handleSessionFailed.bind(this)); this.logger.log('[INIT] Registered MPC event handlers (status updates only)'); } /** * Handle keygen started event * * Update Redis status to "generating" */ private async handleKeygenStarted(payload: KeygenStartedPayload): Promise { const { userId, mpcSessionId } = payload; this.logger.log(`[STATUS] Keygen started: userId=${userId}, mpcSessionId=${mpcSessionId}`); try { const statusData: KeygenStatusData = { status: 'generating', userId, mpcSessionId, updatedAt: new Date().toISOString(), }; await this.redisService.set( `${KEYGEN_STATUS_PREFIX}${userId}`, JSON.stringify(statusData), KEYGEN_STATUS_TTL, ); this.logger.log(`[STATUS] Keygen status updated to 'generating' for user: ${userId}`); } catch (error) { this.logger.error(`[ERROR] Failed to update keygen status: ${error}`, error); } } /** * Handle keygen completed event * * From mpc-service, keygen is complete with public key. * Update status to "deriving" - blockchain-service will now derive addresses * and send WalletAddressCreated event which BlockchainWalletHandler will process. * * Uses atomic Redis update to ensure status only advances forward: * pending -> generating -> deriving -> completed */ private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise { const { publicKey, extraPayload } = payload; if (!extraPayload?.userId) { this.logger.warn('[WARN] KeygenCompleted event missing userId, skipping'); return; } const { userId, username } = extraPayload; this.logger.log(`[STATUS] Keygen completed: userId=${userId}, username=${username}`); this.logger.log(`[STATUS] Public key: ${publicKey?.substring(0, 30)}...`); try { this.logger.log(`[STATUS] Waiting for blockchain-service to derive addresses...`); // Update status to "deriving" - waiting for blockchain-service // Uses atomic operation to ensure we don't overwrite higher-priority status const statusData: KeygenStatusData = { status: 'deriving', userId, publicKey, updatedAt: new Date().toISOString(), }; const updated = await this.redisService.updateKeygenStatusAtomic( `${KEYGEN_STATUS_PREFIX}${userId}`, JSON.stringify(statusData), 'deriving', KEYGEN_STATUS_TTL, ); if (updated) { this.logger.log(`[STATUS] Keygen status updated to 'deriving' for user: ${userId}`); this.logger.log(`[STATUS] blockchain-service will derive addresses and send WalletAddressCreated event`); } else { this.logger.log(`[STATUS] Status not updated for user: ${userId} (current status has higher priority)`); } } catch (error) { this.logger.error(`[ERROR] Failed to update keygen status: ${error}`, error); } } /** * Handle session failed event * * When keygen fails: * 1. Log error * 2. Update Redis status to "failed" */ private async handleSessionFailed(payload: SessionFailedPayload): Promise { const { sessionType, errorMessage, extraPayload } = payload; // Only handle keygen failures if (sessionType !== 'keygen' && sessionType !== 'KEYGEN') { return; } const userId = extraPayload?.userId || 'unknown'; this.logger.error(`[ERROR] Keygen failed for user ${userId}: ${errorMessage}`); try { // Update Redis status to failed const statusData: KeygenStatusData = { status: 'failed', userId, errorMessage, updatedAt: new Date().toISOString(), }; await this.redisService.set( `${KEYGEN_STATUS_PREFIX}${userId}`, JSON.stringify(statusData), KEYGEN_STATUS_TTL, ); this.logger.log(`[STATUS] Keygen status updated to 'failed' for user: ${userId}`); } catch (error) { this.logger.error(`[ERROR] Failed to update keygen failed status: ${error}`, error); } } }