From f3d4799efc9a72b1ec9491a7b8c07d867286810b Mon Sep 17 00:00:00 2001 From: hailin Date: Wed, 14 Jan 2026 06:13:34 -0800 Subject: [PATCH] feat(mining-wallet): add UserWalletCreated/Updated events for CDC sync - Publish UserWalletCreated when a new wallet is created - Publish UserWalletUpdated when wallet balance changes - Events sent to cdc.mining-wallet.outbox topic for mining-admin-service Co-Authored-By: Claude Opus 4.5 --- .../services/contribution-wallet.service.ts | 68 ++++++++++++++++++- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/backend/services/mining-wallet-service/src/application/services/contribution-wallet.service.ts b/backend/services/mining-wallet-service/src/application/services/contribution-wallet.service.ts index 9cd8fa00..b99b7db0 100644 --- a/backend/services/mining-wallet-service/src/application/services/contribution-wallet.service.ts +++ b/backend/services/mining-wallet-service/src/application/services/contribution-wallet.service.ts @@ -51,6 +51,7 @@ export class ContributionWalletService { }, }); + const isNewWallet = !wallet; if (!wallet) { wallet = await tx.userWallet.create({ data: { @@ -60,13 +61,34 @@ export class ContributionWalletService { frozenBalance: new Decimal(0), }, }); + + // 发布 UserWalletCreated 事件 + await tx.outboxEvent.create({ + data: { + aggregateType: 'UserWallet', + aggregateId: wallet.id, + eventType: 'UserWalletCreated', + topic: 'cdc.mining-wallet.outbox', + key: input.accountSequence, + payload: { + id: wallet.id, + accountSequence: wallet.accountSequence, + walletType: wallet.walletType, + balance: '0', + frozenBalance: '0', + totalInflow: 0, + totalOutflow: 0, + isActive: true, + }, + }, + }); } const balanceBefore = new Decimal(wallet.balance.toString()); const balanceAfter = balanceBefore.plus(input.amount); // 2. 更新钱包余额 - await tx.userWallet.update({ + const updatedWallet = await tx.userWallet.update({ where: { id: wallet.id }, data: { balance: balanceAfter, @@ -74,6 +96,27 @@ export class ContributionWalletService { }, }); + // 发布 UserWalletUpdated 事件(用于 mining-admin-service 同步) + await tx.outboxEvent.create({ + data: { + aggregateType: 'UserWallet', + aggregateId: wallet.id, + eventType: 'UserWalletUpdated', + topic: 'cdc.mining-wallet.outbox', + key: input.accountSequence, + payload: { + id: wallet.id, + accountSequence: wallet.accountSequence, + walletType: wallet.walletType, + balance: balanceAfter.toString(), + frozenBalance: updatedWallet.frozenBalance.toString(), + totalInflow: updatedWallet.totalInflow.toString(), + totalOutflow: updatedWallet.totalOutflow.toString(), + isActive: true, + }, + }, + }); + // 3. 创建交易记录(分类账) const transaction = await tx.userWalletTransaction.create({ data: { @@ -247,7 +290,7 @@ export class ContributionWalletService { } // 更新钱包余额 - await tx.userWallet.update({ + const updatedWallet = await tx.userWallet.update({ where: { id: wallet.id }, data: { balance: balanceAfter, @@ -255,6 +298,27 @@ export class ContributionWalletService { }, }); + // 发布 UserWalletUpdated 事件(用于 mining-admin-service 同步) + await tx.outboxEvent.create({ + data: { + aggregateType: 'UserWallet', + aggregateId: wallet.id, + eventType: 'UserWalletUpdated', + topic: 'cdc.mining-wallet.outbox', + key: accountSequence, + payload: { + id: wallet.id, + accountSequence: wallet.accountSequence, + walletType: wallet.walletType, + balance: balanceAfter.toString(), + frozenBalance: updatedWallet.frozenBalance.toString(), + totalInflow: updatedWallet.totalInflow.toString(), + totalOutflow: updatedWallet.totalOutflow.toString(), + isActive: true, + }, + }, + }); + // 创建过期交易记录 await tx.userWalletTransaction.create({ data: {