diff --git a/backend/services/auth-service/src/application/services/outbox.service.ts b/backend/services/auth-service/src/application/services/outbox.service.ts index 15922fe2..852d5d43 100644 --- a/backend/services/auth-service/src/application/services/outbox.service.ts +++ b/backend/services/auth-service/src/application/services/outbox.service.ts @@ -48,7 +48,7 @@ export class OutboxService { private getTopicForEvent(event: DomainEvent): string { // 所有用户相关事件发到同一个 topic - return 'auth.events'; + return 'mining-admin.auth.users'; } private getAggregateId(event: DomainEvent): string { diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index fa24fef1..f9035ff2 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -41,6 +41,11 @@ export class CdcSyncService implements OnModuleInit { 'UserCreated', this.handleUserCreated.bind(this), ); + // auth-service 发布的 user.registered 事件 + this.cdcConsumer.registerServiceHandler( + 'user.registered', + this.handleUserRegistered.bind(this), + ); this.cdcConsumer.registerServiceHandler( 'UserUpdated', this.handleUserUpdated.bind(this), @@ -49,6 +54,11 @@ export class CdcSyncService implements OnModuleInit { 'KycStatusChanged', this.handleKycStatusChanged.bind(this), ); + // auth-service 发布的 user.kyc_verified 事件 + this.cdcConsumer.registerServiceHandler( + 'user.kyc_verified', + this.handleKycStatusChanged.bind(this), + ); // =========================================================================== // 从 contribution-service 同步算力数据 @@ -279,6 +289,39 @@ export class CdcSyncService implements OnModuleInit { } } + /** + * 处理 auth-service 发布的 user.registered 事件 + * payload: { accountSequence, phone, source, registeredAt } + */ + private async handleUserRegistered(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedUser.upsert({ + where: { accountSequence: payload.accountSequence }, + create: { + originalUserId: payload.accountSequence, // 使用 accountSequence 作为 originalUserId + accountSequence: payload.accountSequence, + phone: payload.phone, + status: 'ACTIVE', + kycStatus: 'PENDING', + realName: null, + isLegacyUser: payload.source === 'V1', + createdAt: new Date(payload.registeredAt), + }, + update: { + phone: payload.phone, + isLegacyUser: payload.source === 'V1', + }, + }); + + await this.recordProcessedEvent(event); + this.logger.log(`Synced user from auth-service: ${payload.accountSequence}`); + } catch (error) { + this.logger.error(`Failed to sync user from auth-service: ${payload.accountSequence}`, error); + } + } + private async handleUserUpdated(event: ServiceEvent): Promise { const { payload } = event;