fix(cdc): 修复 auth-service 与 mining-admin-service 的 CDC 事件同步
- auth-service: 将 outbox topic 从 auth.events 改为 mining-admin.auth.users - mining-admin-service: 添加 user.registered 和 user.kyc_verified 事件处理器 - 确保 auth-service 发布的事件能被 mining-admin-service 正确接收和处理 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
e83b3d420c
commit
49b1571bba
|
|
@ -48,7 +48,7 @@ export class OutboxService {
|
|||
|
||||
private getTopicForEvent(event: DomainEvent): string {
|
||||
// 所有用户相关事件发到同一个 topic
|
||||
return 'auth.events';
|
||||
return 'mining-admin.auth.users';
|
||||
}
|
||||
|
||||
private getAggregateId(event: DomainEvent): string {
|
||||
|
|
|
|||
|
|
@ -41,6 +41,11 @@ export class CdcSyncService implements OnModuleInit {
|
|||
'UserCreated',
|
||||
this.handleUserCreated.bind(this),
|
||||
);
|
||||
// auth-service 发布的 user.registered 事件
|
||||
this.cdcConsumer.registerServiceHandler(
|
||||
'user.registered',
|
||||
this.handleUserRegistered.bind(this),
|
||||
);
|
||||
this.cdcConsumer.registerServiceHandler(
|
||||
'UserUpdated',
|
||||
this.handleUserUpdated.bind(this),
|
||||
|
|
@ -49,6 +54,11 @@ export class CdcSyncService implements OnModuleInit {
|
|||
'KycStatusChanged',
|
||||
this.handleKycStatusChanged.bind(this),
|
||||
);
|
||||
// auth-service 发布的 user.kyc_verified 事件
|
||||
this.cdcConsumer.registerServiceHandler(
|
||||
'user.kyc_verified',
|
||||
this.handleKycStatusChanged.bind(this),
|
||||
);
|
||||
|
||||
// ===========================================================================
|
||||
// 从 contribution-service 同步算力数据
|
||||
|
|
@ -279,6 +289,39 @@ export class CdcSyncService implements OnModuleInit {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 auth-service 发布的 user.registered 事件
|
||||
* payload: { accountSequence, phone, source, registeredAt }
|
||||
*/
|
||||
private async handleUserRegistered(event: ServiceEvent): Promise<void> {
|
||||
const { payload } = event;
|
||||
|
||||
try {
|
||||
await this.prisma.syncedUser.upsert({
|
||||
where: { accountSequence: payload.accountSequence },
|
||||
create: {
|
||||
originalUserId: payload.accountSequence, // 使用 accountSequence 作为 originalUserId
|
||||
accountSequence: payload.accountSequence,
|
||||
phone: payload.phone,
|
||||
status: 'ACTIVE',
|
||||
kycStatus: 'PENDING',
|
||||
realName: null,
|
||||
isLegacyUser: payload.source === 'V1',
|
||||
createdAt: new Date(payload.registeredAt),
|
||||
},
|
||||
update: {
|
||||
phone: payload.phone,
|
||||
isLegacyUser: payload.source === 'V1',
|
||||
},
|
||||
});
|
||||
|
||||
await this.recordProcessedEvent(event);
|
||||
this.logger.log(`Synced user from auth-service: ${payload.accountSequence}`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to sync user from auth-service: ${payload.accountSequence}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
private async handleUserUpdated(event: ServiceEvent): Promise<void> {
|
||||
const { payload } = event;
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue