From 49b1571bba0176b1f08564d319cd8670f689d35a Mon Sep 17 00:00:00 2001 From: hailin Date: Sun, 11 Jan 2026 19:51:01 -0800 Subject: [PATCH] =?UTF-8?q?fix(cdc):=20=E4=BF=AE=E5=A4=8D=20auth-service?= =?UTF-8?q?=20=E4=B8=8E=20mining-admin-service=20=E7=9A=84=20CDC=20?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - auth-service: 将 outbox topic 从 auth.events 改为 mining-admin.auth.users - mining-admin-service: 添加 user.registered 和 user.kyc_verified 事件处理器 - 确保 auth-service 发布的事件能被 mining-admin-service 正确接收和处理 Co-Authored-By: Claude Opus 4.5 --- .../application/services/outbox.service.ts | 2 +- .../infrastructure/kafka/cdc-sync.service.ts | 43 +++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/backend/services/auth-service/src/application/services/outbox.service.ts b/backend/services/auth-service/src/application/services/outbox.service.ts index 15922fe2..852d5d43 100644 --- a/backend/services/auth-service/src/application/services/outbox.service.ts +++ b/backend/services/auth-service/src/application/services/outbox.service.ts @@ -48,7 +48,7 @@ export class OutboxService { private getTopicForEvent(event: DomainEvent): string { // 所有用户相关事件发到同一个 topic - return 'auth.events'; + return 'mining-admin.auth.users'; } private getAggregateId(event: DomainEvent): string { diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index fa24fef1..f9035ff2 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -41,6 +41,11 @@ export class CdcSyncService implements OnModuleInit { 'UserCreated', this.handleUserCreated.bind(this), ); + // auth-service 发布的 user.registered 事件 + this.cdcConsumer.registerServiceHandler( + 'user.registered', + this.handleUserRegistered.bind(this), + ); this.cdcConsumer.registerServiceHandler( 'UserUpdated', this.handleUserUpdated.bind(this), @@ -49,6 +54,11 @@ export class CdcSyncService implements OnModuleInit { 'KycStatusChanged', this.handleKycStatusChanged.bind(this), ); + // auth-service 发布的 user.kyc_verified 事件 + this.cdcConsumer.registerServiceHandler( + 'user.kyc_verified', + this.handleKycStatusChanged.bind(this), + ); // =========================================================================== // 从 contribution-service 同步算力数据 @@ -279,6 +289,39 @@ export class CdcSyncService implements OnModuleInit { } } + /** + * 处理 auth-service 发布的 user.registered 事件 + * payload: { accountSequence, phone, source, registeredAt } + */ + private async handleUserRegistered(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedUser.upsert({ + where: { accountSequence: payload.accountSequence }, + create: { + originalUserId: payload.accountSequence, // 使用 accountSequence 作为 originalUserId + accountSequence: payload.accountSequence, + phone: payload.phone, + status: 'ACTIVE', + kycStatus: 'PENDING', + realName: null, + isLegacyUser: payload.source === 'V1', + createdAt: new Date(payload.registeredAt), + }, + update: { + phone: payload.phone, + isLegacyUser: payload.source === 'V1', + }, + }); + + await this.recordProcessedEvent(event); + this.logger.log(`Synced user from auth-service: ${payload.accountSequence}`); + } catch (error) { + this.logger.error(`Failed to sync user from auth-service: ${payload.accountSequence}`, error); + } + } + private async handleUserUpdated(event: ServiceEvent): Promise { const { payload } = event;