diff --git a/backend/services/identity-service/src/application/event-handlers/blockchain-wallet.handler.ts b/backend/services/identity-service/src/application/event-handlers/blockchain-wallet.handler.ts index 1b12764c..50635210 100644 --- a/backend/services/identity-service/src/application/event-handlers/blockchain-wallet.handler.ts +++ b/backend/services/identity-service/src/application/event-handlers/blockchain-wallet.handler.ts @@ -139,6 +139,9 @@ export class BlockchainWalletHandler implements OnModuleInit { }); } catch (error) { this.logger.error(`[ERROR] Failed to process WalletAddressCreated: ${error}`, error); + // Re-throw to trigger Kafka retry mechanism + // This ensures messages are not marked as consumed until successfully processed + throw error; } } diff --git a/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts b/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts index 8e5ed9e1..f52cfa52 100644 --- a/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts +++ b/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts @@ -79,6 +79,8 @@ export class MpcKeygenCompletedHandler implements OnModuleInit { this.logger.log(`[STATUS] Keygen status updated to 'generating' for user: ${userId}`); } catch (error) { this.logger.error(`[ERROR] Failed to update keygen status: ${error}`, error); + // Re-throw to trigger Kafka retry mechanism + throw error; } } @@ -131,6 +133,8 @@ export class MpcKeygenCompletedHandler implements OnModuleInit { } } catch (error) { this.logger.error(`[ERROR] Failed to update keygen status: ${error}`, error); + // Re-throw to trigger Kafka retry mechanism + throw error; } } @@ -170,6 +174,8 @@ export class MpcKeygenCompletedHandler implements OnModuleInit { this.logger.log(`[STATUS] Keygen status updated to 'failed' for user: ${userId}`); } catch (error) { this.logger.error(`[ERROR] Failed to update keygen failed status: ${error}`, error); + // Re-throw to trigger Kafka retry mechanism + throw error; } } } diff --git a/backend/services/identity-service/src/infrastructure/kafka/blockchain-event-consumer.service.ts b/backend/services/identity-service/src/infrastructure/kafka/blockchain-event-consumer.service.ts index 8c2b7297..63786202 100644 --- a/backend/services/identity-service/src/infrastructure/kafka/blockchain-event-consumer.service.ts +++ b/backend/services/identity-service/src/infrastructure/kafka/blockchain-event-consumer.service.ts @@ -139,6 +139,9 @@ export class BlockchainEventConsumerService implements OnModuleInit, OnModuleDes } } catch (error) { this.logger.error(`[ERROR] Error processing blockchain event from ${topic}`, error); + // Re-throw to trigger Kafka retry mechanism + // This ensures messages are not marked as consumed until successfully processed + throw error; } }, }); diff --git a/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts index 91055f97..f3b32448 100644 --- a/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ b/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -233,6 +233,9 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { } } catch (error) { this.logger.error(`[ERROR] Error processing MPC event from ${topic}`, error); + // Re-throw to trigger Kafka retry mechanism + // This ensures messages are not marked as consumed until successfully processed + throw error; } }, }); diff --git a/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts b/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts index 30f50d79..e71d1300 100644 --- a/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts +++ b/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts @@ -95,23 +95,40 @@ export class UserAccountRepositoryImpl implements UserAccountRepository { } async saveWallets(userId: UserId, wallets: WalletAddress[]): Promise { - await this.prisma.walletAddress.createMany({ - data: wallets.map((w) => { + // 使用事务保护,确保所有钱包地址要么全部保存成功,要么全部回滚 + await this.prisma.$transaction(async (tx) => { + for (const w of wallets) { const sig = fromMpcSignatureString(w.mpcSignature); - return { - userId: BigInt(userId.value), - chainType: w.chainType, - address: w.address, - publicKey: w.publicKey, - addressDigest: w.addressDigest, - mpcSignatureR: sig.r, - mpcSignatureS: sig.s, - mpcSignatureV: sig.v, - status: w.status, - boundAt: w.boundAt, - }; - }), - skipDuplicates: true, + + // 幂等性检查:如果该链类型的地址已存在,跳过 + const existing = await tx.walletAddress.findFirst({ + where: { + userId: BigInt(userId.value), + chainType: w.chainType, + }, + }); + + if (existing) { + // 已存在,跳过(幂等性) + continue; + } + + // 创建新的钱包地址 + await tx.walletAddress.create({ + data: { + userId: BigInt(userId.value), + chainType: w.chainType, + address: w.address, + publicKey: w.publicKey, + addressDigest: w.addressDigest, + mpcSignatureR: sig.r, + mpcSignatureS: sig.s, + mpcSignatureV: sig.v, + status: w.status, + boundAt: w.boundAt, + }, + }); + } }); }