fix: 修复 mining-blockchain-service Kafka consumer group 冲突

- mining-blockchain-service 使用独立的 consumer group ID,避免与 blockchain-service 冲突
- withdrawal-event-consumer: blockchain-service-withdrawal-events -> mining-blockchain-service-withdrawal-events
- mpc-event-consumer: blockchain-service-mpc-events -> mining-blockchain-service-mpc-events
- deposit-ack-consumer: blockchain-service-deposit-acks -> mining-blockchain-service-deposit-acks
- 更新 docker-compose.yml 和 kafka.config.ts 的默认配置

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-28 22:05:55 -08:00
parent 76d566d145
commit edc0ea46c9
7 changed files with 41 additions and 12 deletions

View File

@ -171,6 +171,9 @@ export class UserApplicationService {
// 7. 保存账户 // 7. 保存账户
await this.userRepository.save(account); await this.userRepository.save(account);
// 7.1 更新事件中的 userIdsave 后才有真实 userId
account.updateEventsUserId();
// 8. 生成 Token // 8. 生成 Token
const tokens = await this.tokenService.generateTokenPair({ const tokens = await this.tokenService.generateTokenPair({
userId: account.userId.toString(), userId: account.userId.toString(),
@ -352,7 +355,10 @@ export class UserApplicationService {
}); });
this.logger.log(`[REGISTER] Tokens generated`); this.logger.log(`[REGISTER] Tokens generated`);
// 13. 发布领域事件 // 13. 更新事件中的 userIdsave 后才有真实 userId
account.updateEventsUserId();
// 14. 发布领域事件
this.logger.log( this.logger.log(
`[REGISTER] Publishing ${account.domainEvents.length} domain events...`, `[REGISTER] Publishing ${account.domainEvents.length} domain events...`,
); );
@ -523,7 +529,10 @@ export class UserApplicationService {
}); });
this.logger.log(`[REGISTER_NO_SMS] Tokens generated`); this.logger.log(`[REGISTER_NO_SMS] Tokens generated`);
// 11. 发布领域事件 // 11. 更新事件中的 userIdsave 后才有真实 userId
account.updateEventsUserId();
// 12. 发布领域事件
this.logger.log( this.logger.log(
`[REGISTER_NO_SMS] Publishing ${account.domainEvents.length} domain events...`, `[REGISTER_NO_SMS] Publishing ${account.domainEvents.length} domain events...`,
); );
@ -864,6 +873,10 @@ export class UserApplicationService {
await this.userRepository.save(account); await this.userRepository.save(account);
await this.redisService.delete(`sms:register:${phoneNumber.value}`); await this.redisService.delete(`sms:register:${phoneNumber.value}`);
// 更新事件中的 userIdsave 后才有真实 userId
account.updateEventsUserId();
await this.eventPublisher.publishAll(account.domainEvents); await this.eventPublisher.publishAll(account.domainEvents);
account.clearDomainEvents(); account.clearDomainEvents();

View File

@ -498,6 +498,22 @@ export class UserAccount {
this._domainEvents = []; this._domainEvents = [];
} }
/**
* userId
*
* [2026-01-29] aggregate userId 0
* repository.save() userId
*/
updateEventsUserId(): void {
const realUserId = this._userId.toString();
// 只更新 userId 为 '0' 的事件(即新创建时的临时值)
for (const event of this._domainEvents) {
if ('payload' in event && (event as any).payload?.userId === '0') {
(event as any).payload.userId = realUserId;
}
}
}
/** /**
* *
* *

View File

@ -29,8 +29,8 @@ services:
REDIS_DB: 8 REDIS_DB: 8
# Kafka (shared) # Kafka (shared)
KAFKA_BROKERS: rwa-kafka:29092 KAFKA_BROKERS: rwa-kafka:29092
KAFKA_CLIENT_ID: blockchain-service KAFKA_CLIENT_ID: mining-blockchain-service
KAFKA_GROUP_ID: blockchain-service-group KAFKA_GROUP_ID: mining-blockchain-service-group
# Blockchain RPC # Blockchain RPC
KAVA_RPC_URL: https://evm.kava.io KAVA_RPC_URL: https://evm.kava.io
BSC_RPC_URL: https://bsc-dataseed.binance.org BSC_RPC_URL: https://bsc-dataseed.binance.org

View File

@ -2,6 +2,6 @@ import { registerAs } from '@nestjs/config';
export default registerAs('kafka', () => ({ export default registerAs('kafka', () => ({
brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','), brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
clientId: process.env.KAFKA_CLIENT_ID || 'blockchain-service', clientId: process.env.KAFKA_CLIENT_ID || 'mining-blockchain-service',
groupId: process.env.KAFKA_GROUP_ID || 'blockchain-service-group', groupId: process.env.KAFKA_GROUP_ID || 'mining-blockchain-service-group',
})); }));

View File

@ -39,8 +39,8 @@ export class DepositAckConsumerService implements OnModuleInit, OnModuleDestroy
const brokersEnv = this.configService.get<string>('KAFKA_BROKERS'); const brokersEnv = this.configService.get<string>('KAFKA_BROKERS');
const brokersConfig = this.configService.get<string[]>('kafka.brokers'); const brokersConfig = this.configService.get<string[]>('kafka.brokers');
const brokers: string[] = brokersEnv?.split(',') || brokersConfig || ['localhost:9092']; const brokers: string[] = brokersEnv?.split(',') || brokersConfig || ['localhost:9092'];
const clientId = this.configService.get<string>('kafka.clientId') || 'blockchain-service'; const clientId = this.configService.get<string>('kafka.clientId') || 'mining-blockchain-service';
const groupId = 'blockchain-service-deposit-acks'; const groupId = 'mining-blockchain-service-deposit-acks';
this.logger.log(`[INIT] Deposit ACK Consumer initializing...`); this.logger.log(`[INIT] Deposit ACK Consumer initializing...`);
this.logger.log(`[INIT] ClientId: ${clientId}`); this.logger.log(`[INIT] ClientId: ${clientId}`);

View File

@ -82,8 +82,8 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy {
async onModuleInit() { async onModuleInit() {
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092'];
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'blockchain-service'; const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'mining-blockchain-service';
const groupId = 'blockchain-service-mpc-events'; const groupId = 'mining-blockchain-service-mpc-events';
this.logger.log(`[INIT] MPC Event Consumer for blockchain-service initializing...`); this.logger.log(`[INIT] MPC Event Consumer for blockchain-service initializing...`);
this.logger.log(`[INIT] ClientId: ${clientId}`); this.logger.log(`[INIT] ClientId: ${clientId}`);

View File

@ -54,8 +54,8 @@ export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDes
async onModuleInit() { async onModuleInit() {
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092'];
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'blockchain-service'; const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'mining-blockchain-service';
const groupId = 'blockchain-service-withdrawal-events'; const groupId = 'mining-blockchain-service-withdrawal-events';
this.logger.log(`[INIT] Withdrawal Event Consumer initializing...`); this.logger.log(`[INIT] Withdrawal Event Consumer initializing...`);
this.logger.log(`[INIT] ClientId: ${clientId}`); this.logger.log(`[INIT] ClientId: ${clientId}`);