rwadurian/backend/services/identity-service/src/application/event-handlers/blockchain-wallet.handler.ts

203 lines
6.4 KiB
TypeScript

/**
* Blockchain Wallet Event Handler
*
* Handles wallet address events from blockchain-service:
* - WalletAddressCreated: Saves derived wallet addresses to user account
*
* This handler receives properly derived addresses from blockchain-service:
* - KAVA: EVM format (0x...) - KAVA is EVM-compatible
* - DST: Cosmos bech32 format (dst1...)
* - BSC: EVM format (0x...)
*/
import { Injectable, Inject, Logger, OnModuleInit } from '@nestjs/common';
import {
UserAccountRepository,
USER_ACCOUNT_REPOSITORY,
} from '@/domain/repositories/user-account.repository.interface';
import { WalletAddress } from '@/domain/entities/wallet-address.entity';
import { ChainType, UserId } from '@/domain/value-objects';
import { RedisService } from '@/infrastructure/redis/redis.service';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import {
BlockchainEventConsumerService,
WalletAddressCreatedPayload,
} from '@/infrastructure/kafka/blockchain-event-consumer.service';
// Redis key prefix for keygen status
const KEYGEN_STATUS_PREFIX = 'keygen:status:';
const KEYGEN_STATUS_TTL = 60 * 60 * 24; // 24 hours
// Status data for wallet completion (extended from MpcKeygenCompletedHandler)
interface WalletCompletedStatusData {
status: 'completed';
userId: string;
publicKey?: string;
walletAddresses?: { chainType: string; address: string }[];
mnemonic?: string; // 恢复助记词 (明文,仅首次)
updatedAt: string;
}
@Injectable()
export class BlockchainWalletHandler implements OnModuleInit {
private readonly logger = new Logger(BlockchainWalletHandler.name);
constructor(
@Inject(USER_ACCOUNT_REPOSITORY)
private readonly userRepository: UserAccountRepository,
private readonly redisService: RedisService,
private readonly prisma: PrismaService,
private readonly blockchainEventConsumer: BlockchainEventConsumerService,
) {}
async onModuleInit() {
// Register event handler
this.blockchainEventConsumer.onWalletAddressCreated(
this.handleWalletAddressCreated.bind(this),
);
this.logger.log(
'[INIT] Registered BlockchainWalletHandler for WalletAddressCreated events',
);
}
/**
* Handle WalletAddressCreated event from blockchain-service
*
* This event contains properly derived addresses:
* - KAVA: 0x... (EVM) - KAVA is EVM-compatible
* - DST: dst1... (Cosmos bech32)
* - BSC: 0x... (EVM)
*/
private async handleWalletAddressCreated(
payload: WalletAddressCreatedPayload,
): Promise<void> {
const {
userId,
publicKey,
addresses,
mnemonic,
encryptedMnemonic,
mnemonicHash,
} = payload;
this.logger.log(
`[HANDLE] Processing WalletAddressCreated: userId=${userId}`,
);
this.logger.log(`[HANDLE] Public key: ${publicKey?.substring(0, 30)}...`);
this.logger.log(`[HANDLE] Addresses: ${JSON.stringify(addresses)}`);
this.logger.log(`[HANDLE] Has mnemonic: ${!!mnemonic}`);
if (!userId) {
this.logger.error(
'[ERROR] WalletAddressCreated event missing userId, skipping',
);
return;
}
if (!addresses || addresses.length === 0) {
this.logger.error(
'[ERROR] WalletAddressCreated event missing addresses, skipping',
);
return;
}
try {
// 1. Find user account
const account = await this.userRepository.findById(UserId.create(userId));
if (!account) {
this.logger.error(`[ERROR] User not found: ${userId}`);
return;
}
// 2. Create wallet addresses for each chain (with publicKey)
const wallets: WalletAddress[] = addresses.map((addr) => {
const chainType = this.parseChainType(addr.chainType);
this.logger.log(
`[WALLET] Creating wallet: ${addr.chainType} -> ${addr.address} (publicKey: ${publicKey?.slice(0, 16)}...)`,
);
return WalletAddress.create({
userId: account.userId,
chainType,
address: addr.address,
publicKey, // 传入公钥,用于关联助记词
});
});
// 3. Save wallet addresses to user account
await this.userRepository.saveWallets(account.userId, wallets);
this.logger.log(
`[WALLET] Saved ${wallets.length} wallet addresses for user: ${userId}`,
);
// 4. Recovery mnemonic is now stored in blockchain-service (DDD: domain separation)
// Note: blockchain-service stores mnemonic with accountSequence association
if (mnemonic) {
this.logger.log(
`[MNEMONIC] Recovery mnemonic received for user: ${userId} (stored in blockchain-service)`,
);
}
// 5. Update Redis status to completed (include mnemonic for first-time retrieval)
// Uses atomic operation to ensure proper state transition
const statusData: WalletCompletedStatusData = {
status: 'completed',
userId,
publicKey,
walletAddresses: addresses,
mnemonic, // 首次返回明文助记词
updatedAt: new Date().toISOString(),
};
const updated = await this.redisService.updateKeygenStatusAtomic(
`${KEYGEN_STATUS_PREFIX}${userId}`,
JSON.stringify(statusData),
'completed',
KEYGEN_STATUS_TTL,
);
if (updated) {
this.logger.log(
`[STATUS] Keygen status updated to 'completed' for user: ${userId}`,
);
} else {
this.logger.log(
`[STATUS] Status already 'completed' for user: ${userId} (idempotent - event redelivered)`,
);
}
// Log all addresses
addresses.forEach((addr) => {
this.logger.log(`[COMPLETE] ${addr.chainType}: ${addr.address}`);
});
} catch (error) {
this.logger.error(
`[ERROR] Failed to process WalletAddressCreated: ${error}`,
error,
);
// Re-throw to trigger Kafka retry mechanism
// This ensures messages are not marked as consumed until successfully processed
throw error;
}
}
/**
* Parse chain type string to ChainType value object
*/
private parseChainType(chainType: string): ChainType {
const normalizedType = chainType.toUpperCase();
switch (normalizedType) {
case 'KAVA':
return ChainType.KAVA;
case 'DST':
return ChainType.DST;
case 'BSC':
return ChainType.BSC;
default:
this.logger.warn(
`[WARN] Unknown chain type: ${chainType}, defaulting to BSC`,
);
return ChainType.BSC;
}
}
}