From 582beb4f81fe7d4c03d3edf57a8c0cb19922d7f2 Mon Sep 17 00:00:00 2001 From: hailin Date: Sun, 11 Jan 2026 20:17:46 -0800 Subject: [PATCH] =?UTF-8?q?feat(cdc):=20=E6=B7=BB=E5=8A=A0=20legacy=20?= =?UTF-8?q?=E7=94=A8=E6=88=B7=E6=89=B9=E9=87=8F=E5=90=8C=E6=AD=A5=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit auth-service: - 添加 AdminController 和 AdminSyncService - POST /admin/legacy-users/publish-all: 为所有 legacy 用户发布事件 - GET /admin/users/sync: 获取所有用户数据供同步 mining-admin-service: - 添加 user.legacy.migrated 事件处理器 - 添加 sync-users 和 sync-contribution-accounts API Co-Authored-By: Claude Opus 4.5 --- .../auth-service/src/api/api.module.ts | 2 + .../src/api/controllers/admin.controller.ts | 21 +++ .../auth-service/src/api/controllers/index.ts | 1 + .../src/application/application.module.ts | 3 + .../services/admin-sync.service.ts | 131 ++++++++++++++++++ .../src/application/services/index.ts | 1 + .../controllers/initialization.controller.ts | 12 ++ .../infrastructure/kafka/cdc-sync.service.ts | 38 +++++ 8 files changed, 209 insertions(+) create mode 100644 backend/services/auth-service/src/api/controllers/admin.controller.ts create mode 100644 backend/services/auth-service/src/application/services/admin-sync.service.ts diff --git a/backend/services/auth-service/src/api/api.module.ts b/backend/services/auth-service/src/api/api.module.ts index 498a2d91..5313ad41 100644 --- a/backend/services/auth-service/src/api/api.module.ts +++ b/backend/services/auth-service/src/api/api.module.ts @@ -8,6 +8,7 @@ import { KycController, UserController, HealthController, + AdminController, } from './controllers'; import { ApplicationModule } from '@/application'; import { JwtAuthGuard } from '@/shared/guards/jwt-auth.guard'; @@ -33,6 +34,7 @@ import { JwtAuthGuard } from '@/shared/guards/jwt-auth.guard'; KycController, UserController, HealthController, + AdminController, ], providers: [JwtAuthGuard], }) diff --git a/backend/services/auth-service/src/api/controllers/admin.controller.ts b/backend/services/auth-service/src/api/controllers/admin.controller.ts new file mode 100644 index 00000000..ea3eeab2 --- /dev/null +++ b/backend/services/auth-service/src/api/controllers/admin.controller.ts @@ -0,0 +1,21 @@ +import { Controller, Get, Post, UseGuards } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; +import { AdminSyncService } from '@/application/services/admin-sync.service'; + +@ApiTags('Admin') +@Controller('admin') +export class AdminController { + constructor(private readonly adminSyncService: AdminSyncService) {} + + @Get('users/sync') + @ApiOperation({ summary: '获取所有用户数据供 mining-admin-service 同步' }) + async getAllUsersForSync() { + return this.adminSyncService.getAllUsersForSync(); + } + + @Post('legacy-users/publish-all') + @ApiOperation({ summary: '为所有 synced_legacy_users 发布事件到 Kafka' }) + async publishAllLegacyUsers() { + return this.adminSyncService.publishAllLegacyUsers(); + } +} diff --git a/backend/services/auth-service/src/api/controllers/index.ts b/backend/services/auth-service/src/api/controllers/index.ts index 1b08a770..11d91974 100644 --- a/backend/services/auth-service/src/api/controllers/index.ts +++ b/backend/services/auth-service/src/api/controllers/index.ts @@ -4,3 +4,4 @@ export * from './password.controller'; export * from './kyc.controller'; export * from './user.controller'; export * from './health.controller'; +export * from './admin.controller'; diff --git a/backend/services/auth-service/src/application/application.module.ts b/backend/services/auth-service/src/application/application.module.ts index 9a7a1c48..809479e4 100644 --- a/backend/services/auth-service/src/application/application.module.ts +++ b/backend/services/auth-service/src/application/application.module.ts @@ -9,6 +9,7 @@ import { KycService, UserService, OutboxService, + AdminSyncService, } from './services'; import { OutboxScheduler } from './schedulers'; import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; @@ -35,6 +36,7 @@ import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; KycService, UserService, OutboxService, + AdminSyncService, OutboxScheduler, ], exports: [ @@ -43,6 +45,7 @@ import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; SmsService, KycService, UserService, + AdminSyncService, ], }) export class ApplicationModule {} diff --git a/backend/services/auth-service/src/application/services/admin-sync.service.ts b/backend/services/auth-service/src/application/services/admin-sync.service.ts new file mode 100644 index 00000000..7e5e083a --- /dev/null +++ b/backend/services/auth-service/src/application/services/admin-sync.service.ts @@ -0,0 +1,131 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; +import { OutboxService } from './outbox.service'; +import { LegacyUserMigratedEvent } from '@/domain'; + +@Injectable() +export class AdminSyncService { + private readonly logger = new Logger(AdminSyncService.name); + + constructor( + private readonly prisma: PrismaService, + private readonly outboxService: OutboxService, + ) {} + + /** + * 获取所有用户数据供 mining-admin-service 同步 + */ + async getAllUsersForSync(): Promise<{ + users: Array<{ + id: string; + accountSequence: string; + phone: string; + status: string; + kycStatus: string; + realName: string | null; + isLegacyUser: boolean; + createdAt: string; + }>; + }> { + // 获取已注册到 2.0 的用户 + const users = await this.prisma.user.findMany({ + select: { + id: true, + accountSequence: true, + phone: true, + status: true, + kycStatus: true, + realName: true, + createdAt: true, + }, + }); + + // 获取从 1.0 同步的但尚未迁移的用户 + const legacyUsers = await this.prisma.syncedLegacyUser.findMany({ + where: { + accountSequence: { + notIn: users.map((u) => u.accountSequence), + }, + }, + select: { + legacyId: true, + accountSequence: true, + phone: true, + status: true, + legacyCreatedAt: true, + }, + }); + + const allUsers = [ + ...users.map((u) => ({ + id: String(u.id), + accountSequence: u.accountSequence, + phone: u.phone, + status: u.status, + kycStatus: u.kycStatus, + realName: u.realName, + isLegacyUser: false, + createdAt: u.createdAt.toISOString(), + })), + ...legacyUsers.map((u) => ({ + id: String(u.legacyId), + accountSequence: u.accountSequence, + phone: u.phone || '', + status: u.status, + kycStatus: 'PENDING', + realName: null, + isLegacyUser: true, + createdAt: u.legacyCreatedAt?.toISOString() || new Date().toISOString(), + })), + ]; + + return { users: allUsers }; + } + + /** + * 为所有 synced_legacy_users 发布 user.legacy.migrated 事件 + * 用于初始同步到 mining-admin-service + */ + async publishAllLegacyUsers(): Promise<{ + success: boolean; + publishedCount: number; + failedCount: number; + message: string; + }> { + const legacyUsers = await this.prisma.syncedLegacyUser.findMany({ + select: { + accountSequence: true, + phone: true, + legacyCreatedAt: true, + }, + }); + + let publishedCount = 0; + let failedCount = 0; + + for (const user of legacyUsers) { + try { + const event = new LegacyUserMigratedEvent( + user.accountSequence, + user.phone || '', + user.legacyCreatedAt || new Date(), + ); + await this.outboxService.publish(event); + publishedCount++; + this.logger.debug(`Published event for legacy user: ${user.accountSequence}`); + } catch (error) { + failedCount++; + this.logger.error(`Failed to publish event for legacy user: ${user.accountSequence}`, error); + } + } + + this.logger.log(`Published ${publishedCount} legacy user events, ${failedCount} failed`); + + return { + success: failedCount === 0, + publishedCount, + failedCount, + message: `Published ${publishedCount} events, ${failedCount} failed out of ${legacyUsers.length} total`, + }; + } +} diff --git a/backend/services/auth-service/src/application/services/index.ts b/backend/services/auth-service/src/application/services/index.ts index f64212c1..1631bc62 100644 --- a/backend/services/auth-service/src/application/services/index.ts +++ b/backend/services/auth-service/src/application/services/index.ts @@ -4,3 +4,4 @@ export * from './sms.service'; export * from './kyc.service'; export * from './user.service'; export * from './outbox.service'; +export * from './admin-sync.service'; diff --git a/backend/services/mining-admin-service/src/api/controllers/initialization.controller.ts b/backend/services/mining-admin-service/src/api/controllers/initialization.controller.ts index 904cb94b..db1e33ed 100644 --- a/backend/services/mining-admin-service/src/api/controllers/initialization.controller.ts +++ b/backend/services/mining-admin-service/src/api/controllers/initialization.controller.ts @@ -32,4 +32,16 @@ export class InitializationController { async activateMining(@Req() req: any) { return this.initService.activateMining(req.admin.id); } + + @Post('sync-users') + @ApiOperation({ summary: '同步所有用户数据(从auth-service初始同步)' }) + async syncUsers(@Req() req: any) { + return this.initService.syncAllUsers(req.admin.id); + } + + @Post('sync-contribution-accounts') + @ApiOperation({ summary: '同步所有算力账户(从contribution-service初始同步)' }) + async syncContributionAccounts(@Req() req: any) { + return this.initService.syncAllContributionAccounts(req.admin.id); + } } diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index f9035ff2..ec65c160 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -59,6 +59,11 @@ export class CdcSyncService implements OnModuleInit { 'user.kyc_verified', this.handleKycStatusChanged.bind(this), ); + // auth-service 发布的 user.legacy.migrated 事件 (1.0用户首次登录2.0时) + this.cdcConsumer.registerServiceHandler( + 'user.legacy.migrated', + this.handleLegacyUserMigrated.bind(this), + ); // =========================================================================== // 从 contribution-service 同步算力数据 @@ -322,6 +327,39 @@ export class CdcSyncService implements OnModuleInit { } } + /** + * 处理 auth-service 发布的 user.legacy.migrated 事件 + * payload: { accountSequence, phone, migratedAt } + */ + private async handleLegacyUserMigrated(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedUser.upsert({ + where: { accountSequence: payload.accountSequence }, + create: { + originalUserId: payload.accountSequence, + accountSequence: payload.accountSequence, + phone: payload.phone, + status: 'ACTIVE', + kycStatus: 'PENDING', + realName: null, + isLegacyUser: true, + createdAt: new Date(payload.migratedAt), + }, + update: { + phone: payload.phone, + isLegacyUser: true, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.log(`Synced legacy migrated user: ${payload.accountSequence}`); + } catch (error) { + this.logger.error(`Failed to sync legacy migrated user: ${payload.accountSequence}`, error); + } + } + private async handleUserUpdated(event: ServiceEvent): Promise { const { payload } = event;