feat(mining-wallet): add UserWalletCreated/Updated events for CDC sync

- Publish UserWalletCreated when a new wallet is created
- Publish UserWalletUpdated when wallet balance changes
- Events sent to cdc.mining-wallet.outbox topic for mining-admin-service

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-14 06:13:34 -08:00
parent 839feab97d
commit f3d4799efc
1 changed files with 66 additions and 2 deletions

View File

@ -51,6 +51,7 @@ export class ContributionWalletService {
},
});
const isNewWallet = !wallet;
if (!wallet) {
wallet = await tx.userWallet.create({
data: {
@ -60,13 +61,34 @@ export class ContributionWalletService {
frozenBalance: new Decimal(0),
},
});
// 发布 UserWalletCreated 事件
await tx.outboxEvent.create({
data: {
aggregateType: 'UserWallet',
aggregateId: wallet.id,
eventType: 'UserWalletCreated',
topic: 'cdc.mining-wallet.outbox',
key: input.accountSequence,
payload: {
id: wallet.id,
accountSequence: wallet.accountSequence,
walletType: wallet.walletType,
balance: '0',
frozenBalance: '0',
totalInflow: 0,
totalOutflow: 0,
isActive: true,
},
},
});
}
const balanceBefore = new Decimal(wallet.balance.toString());
const balanceAfter = balanceBefore.plus(input.amount);
// 2. 更新钱包余额
await tx.userWallet.update({
const updatedWallet = await tx.userWallet.update({
where: { id: wallet.id },
data: {
balance: balanceAfter,
@ -74,6 +96,27 @@ export class ContributionWalletService {
},
});
// 发布 UserWalletUpdated 事件(用于 mining-admin-service 同步)
await tx.outboxEvent.create({
data: {
aggregateType: 'UserWallet',
aggregateId: wallet.id,
eventType: 'UserWalletUpdated',
topic: 'cdc.mining-wallet.outbox',
key: input.accountSequence,
payload: {
id: wallet.id,
accountSequence: wallet.accountSequence,
walletType: wallet.walletType,
balance: balanceAfter.toString(),
frozenBalance: updatedWallet.frozenBalance.toString(),
totalInflow: updatedWallet.totalInflow.toString(),
totalOutflow: updatedWallet.totalOutflow.toString(),
isActive: true,
},
},
});
// 3. 创建交易记录(分类账)
const transaction = await tx.userWalletTransaction.create({
data: {
@ -247,7 +290,7 @@ export class ContributionWalletService {
}
// 更新钱包余额
await tx.userWallet.update({
const updatedWallet = await tx.userWallet.update({
where: { id: wallet.id },
data: {
balance: balanceAfter,
@ -255,6 +298,27 @@ export class ContributionWalletService {
},
});
// 发布 UserWalletUpdated 事件(用于 mining-admin-service 同步)
await tx.outboxEvent.create({
data: {
aggregateType: 'UserWallet',
aggregateId: wallet.id,
eventType: 'UserWalletUpdated',
topic: 'cdc.mining-wallet.outbox',
key: accountSequence,
payload: {
id: wallet.id,
accountSequence: wallet.accountSequence,
walletType: wallet.walletType,
balance: balanceAfter.toString(),
frozenBalance: updatedWallet.frozenBalance.toString(),
totalInflow: updatedWallet.totalInflow.toString(),
totalOutflow: updatedWallet.totalOutflow.toString(),
isActive: true,
},
},
});
// 创建过期交易记录
await tx.userWalletTransaction.create({
data: {