diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index 26b55ffb..af4caa03 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -353,6 +353,12 @@ export class CdcSyncService implements OnModuleInit { this.withIdempotency(this.walletHandlers.handleFeeConfigUpdated.bind(this.walletHandlers)), ); + // CONTRIBUTION_CREDITED 事件 - 贡献值入账时更新用户钱包 + this.cdcConsumer.registerServiceHandler( + 'CONTRIBUTION_CREDITED', + this.withIdempotency(this.handleContributionCredited.bind(this)), + ); + this.logger.log('CDC sync handlers registered with idempotency protection'); } @@ -813,4 +819,60 @@ export class CdcSyncService implements OnModuleInit { this.logger.debug('Synced circulation pool'); } + // =========================================================================== + // 钱包事件处理 (mining-wallet-service) + // =========================================================================== + + /** + * 处理 CONTRIBUTION_CREDITED 事件 + * mining-wallet-service 在为用户入账贡献值时发布 + * payload: { accountSequence, walletType, amount, balanceAfter, transactionId, ... } + */ + private async handleContributionCredited(event: ServiceEvent, tx: TransactionClient): Promise { + const { payload } = event; + const walletType = payload.walletType || 'CONTRIBUTION'; + + // 先查找是否已存在 + const existing = await tx.syncedUserWallet.findUnique({ + where: { + accountSequence_walletType: { + accountSequence: payload.accountSequence, + walletType, + }, + }, + }); + + if (existing) { + // 更新余额(使用最新的 balanceAfter) + await tx.syncedUserWallet.update({ + where: { id: existing.id }, + data: { + balance: payload.balanceAfter, + totalInflow: { + increment: parseFloat(payload.amount) || 0, + }, + }, + }); + } else { + // 创建新钱包记录 + // originalId 使用 accountSequence + walletType 的组合生成一个稳定的 ID + const originalId = `wallet-${payload.accountSequence}-${walletType}`; + + await tx.syncedUserWallet.create({ + data: { + originalId, + accountSequence: payload.accountSequence, + walletType, + balance: payload.balanceAfter || 0, + frozenBalance: 0, + totalInflow: parseFloat(payload.amount) || 0, + totalOutflow: 0, + isActive: true, + }, + }); + } + + this.logger.debug(`Synced user wallet from CONTRIBUTION_CREDITED: ${payload.accountSequence}, balance: ${payload.balanceAfter}`); + } + }