From edc0ea46c9f1a87a3905907a73d0c6870d0218ed Mon Sep 17 00:00:00 2001 From: hailin Date: Wed, 28 Jan 2026 22:05:55 -0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20mining-blockchain-s?= =?UTF-8?q?ervice=20Kafka=20consumer=20group=20=E5=86=B2=E7=AA=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - mining-blockchain-service 使用独立的 consumer group ID,避免与 blockchain-service 冲突 - withdrawal-event-consumer: blockchain-service-withdrawal-events -> mining-blockchain-service-withdrawal-events - mpc-event-consumer: blockchain-service-mpc-events -> mining-blockchain-service-mpc-events - deposit-ack-consumer: blockchain-service-deposit-acks -> mining-blockchain-service-deposit-acks - 更新 docker-compose.yml 和 kafka.config.ts 的默认配置 Co-Authored-By: Claude Opus 4.5 --- .../services/user-application.service.ts | 17 +++++++++++++++-- .../user-account/user-account.aggregate.ts | 16 ++++++++++++++++ .../docker-compose.yml | 4 ++-- .../src/config/kafka.config.ts | 4 ++-- .../kafka/deposit-ack-consumer.service.ts | 4 ++-- .../kafka/mpc-event-consumer.service.ts | 4 ++-- .../kafka/withdrawal-event-consumer.service.ts | 4 ++-- 7 files changed, 41 insertions(+), 12 deletions(-) diff --git a/backend/services/identity-service/src/application/services/user-application.service.ts b/backend/services/identity-service/src/application/services/user-application.service.ts index 71fbc773..129b2f65 100644 --- a/backend/services/identity-service/src/application/services/user-application.service.ts +++ b/backend/services/identity-service/src/application/services/user-application.service.ts @@ -171,6 +171,9 @@ export class UserApplicationService { // 7. 保存账户 await this.userRepository.save(account); + // 7.1 更新事件中的 userId(save 后才有真实 userId) + account.updateEventsUserId(); + // 8. 生成 Token const tokens = await this.tokenService.generateTokenPair({ userId: account.userId.toString(), @@ -352,7 +355,10 @@ export class UserApplicationService { }); this.logger.log(`[REGISTER] Tokens generated`); - // 13. 发布领域事件 + // 13. 更新事件中的 userId(save 后才有真实 userId) + account.updateEventsUserId(); + + // 14. 发布领域事件 this.logger.log( `[REGISTER] Publishing ${account.domainEvents.length} domain events...`, ); @@ -523,7 +529,10 @@ export class UserApplicationService { }); this.logger.log(`[REGISTER_NO_SMS] Tokens generated`); - // 11. 发布领域事件 + // 11. 更新事件中的 userId(save 后才有真实 userId) + account.updateEventsUserId(); + + // 12. 发布领域事件 this.logger.log( `[REGISTER_NO_SMS] Publishing ${account.domainEvents.length} domain events...`, ); @@ -864,6 +873,10 @@ export class UserApplicationService { await this.userRepository.save(account); await this.redisService.delete(`sms:register:${phoneNumber.value}`); + + // 更新事件中的 userId(save 后才有真实 userId) + account.updateEventsUserId(); + await this.eventPublisher.publishAll(account.domainEvents); account.clearDomainEvents(); diff --git a/backend/services/identity-service/src/domain/aggregates/user-account/user-account.aggregate.ts b/backend/services/identity-service/src/domain/aggregates/user-account/user-account.aggregate.ts index 8f2c12f3..f23018b3 100644 --- a/backend/services/identity-service/src/domain/aggregates/user-account/user-account.aggregate.ts +++ b/backend/services/identity-service/src/domain/aggregates/user-account/user-account.aggregate.ts @@ -498,6 +498,22 @@ export class UserAccount { this._domainEvents = []; } + /** + * 更新领域事件中的 userId + * + * [2026-01-29] 修复:事件在 aggregate 创建时添加,此时 userId 是临时值 0 + * 在 repository.save() 后调用此方法,将事件中的 userId 更新为数据库生成的真实值 + */ + updateEventsUserId(): void { + const realUserId = this._userId.toString(); + // 只更新 userId 为 '0' 的事件(即新创建时的临时值) + for (const event of this._domainEvents) { + if ('payload' in event && (event as any).payload?.userId === '0') { + (event as any).payload.userId = realUserId; + } + } + } + /** * 创建钱包生成事件(用于重试) * diff --git a/backend/services/mining-blockchain-service/docker-compose.yml b/backend/services/mining-blockchain-service/docker-compose.yml index 27d83605..a47d4dd1 100644 --- a/backend/services/mining-blockchain-service/docker-compose.yml +++ b/backend/services/mining-blockchain-service/docker-compose.yml @@ -29,8 +29,8 @@ services: REDIS_DB: 8 # Kafka (shared) KAFKA_BROKERS: rwa-kafka:29092 - KAFKA_CLIENT_ID: blockchain-service - KAFKA_GROUP_ID: blockchain-service-group + KAFKA_CLIENT_ID: mining-blockchain-service + KAFKA_GROUP_ID: mining-blockchain-service-group # Blockchain RPC KAVA_RPC_URL: https://evm.kava.io BSC_RPC_URL: https://bsc-dataseed.binance.org diff --git a/backend/services/mining-blockchain-service/src/config/kafka.config.ts b/backend/services/mining-blockchain-service/src/config/kafka.config.ts index 6411acbc..f8511e4a 100644 --- a/backend/services/mining-blockchain-service/src/config/kafka.config.ts +++ b/backend/services/mining-blockchain-service/src/config/kafka.config.ts @@ -2,6 +2,6 @@ import { registerAs } from '@nestjs/config'; export default registerAs('kafka', () => ({ brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','), - clientId: process.env.KAFKA_CLIENT_ID || 'blockchain-service', - groupId: process.env.KAFKA_GROUP_ID || 'blockchain-service-group', + clientId: process.env.KAFKA_CLIENT_ID || 'mining-blockchain-service', + groupId: process.env.KAFKA_GROUP_ID || 'mining-blockchain-service-group', })); diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts index 16ff9aab..5943988f 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts @@ -39,8 +39,8 @@ export class DepositAckConsumerService implements OnModuleInit, OnModuleDestroy const brokersEnv = this.configService.get('KAFKA_BROKERS'); const brokersConfig = this.configService.get('kafka.brokers'); const brokers: string[] = brokersEnv?.split(',') || brokersConfig || ['localhost:9092']; - const clientId = this.configService.get('kafka.clientId') || 'blockchain-service'; - const groupId = 'blockchain-service-deposit-acks'; + const clientId = this.configService.get('kafka.clientId') || 'mining-blockchain-service'; + const groupId = 'mining-blockchain-service-deposit-acks'; this.logger.log(`[INIT] Deposit ACK Consumer initializing...`); this.logger.log(`[INIT] ClientId: ${clientId}`); diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts index cff98b82..c6d6ea6a 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -82,8 +82,8 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { async onModuleInit() { const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; - const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'blockchain-service'; - const groupId = 'blockchain-service-mpc-events'; + const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'mining-blockchain-service'; + const groupId = 'mining-blockchain-service-mpc-events'; this.logger.log(`[INIT] MPC Event Consumer for blockchain-service initializing...`); this.logger.log(`[INIT] ClientId: ${clientId}`); diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/withdrawal-event-consumer.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/withdrawal-event-consumer.service.ts index 01bc249b..c42e0503 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/withdrawal-event-consumer.service.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/withdrawal-event-consumer.service.ts @@ -54,8 +54,8 @@ export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDes async onModuleInit() { const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; - const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'blockchain-service'; - const groupId = 'blockchain-service-withdrawal-events'; + const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'mining-blockchain-service'; + const groupId = 'mining-blockchain-service-withdrawal-events'; this.logger.log(`[INIT] Withdrawal Event Consumer initializing...`); this.logger.log(`[INIT] ClientId: ${clientId}`);