From 30b04c63769179b8416905c625231d115af086e7 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 12 Jan 2026 02:48:15 -0800 Subject: [PATCH] =?UTF-8?q?feat(sync):=20=E5=AE=8C=E5=96=84=20CDC=20?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5=20-=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=8E=A8=E8=8D=90=E5=85=B3=E7=B3=BB=E3=80=81=E8=AE=A4=E7=A7=8D?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E5=92=8C=E6=98=B5=E7=A7=B0=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - auth-service: - SyncedLegacyUser 表添加 nickname 字段 - LegacyUserMigratedEvent 添加 nickname 参数 - CDC consumer 同步 nickname 字段 - SyncedLegacyUserData 接口添加 nickname - contribution-service: - 新增 ReferralSyncedEvent 事件类 - 新增 AdoptionSyncedEvent 事件类 - admin.controller 添加 publish-all APIs: - POST /admin/referrals/publish-all - POST /admin/adoptions/publish-all - mining-admin-service: - SyncedUser 表添加 nickname 字段 - 新增 SyncedReferral 表 (推荐关系) - 新增 SyncedAdoption 表 (认种记录) - handleReferralSynced 处理器 - handleAdoptionSynced 处理器 - handleLegacyUserMigrated 处理 nickname - deploy-mining.sh: - full_reset 更新为 14 步 - Step 13: 发布推荐关系 - Step 14: 发布认种记录 解决 mining-admin-web 缺少昵称、推荐人、认种数据的问题 Co-Authored-By: Claude Opus 4.5 --- .claude/settings.local.json | 12 +- .../auth-service/prisma/schema.prisma | 1 + .../services/admin-sync.service.ts | 2 + .../src/application/services/auth.service.ts | 6 +- .../events/legacy-user-migrated.event.ts | 2 + ...synced-legacy-user.repository.interface.ts | 1 + .../messaging/cdc/legacy-user-cdc.consumer.ts | 4 + .../synced-legacy-user.repository.ts | 1 + .../src/api/controllers/admin.controller.ts | 142 +++++++++++++++++- .../domain/events/adoption-synced.event.ts | 29 ++++ .../src/domain/events/index.ts | 2 + .../domain/events/referral-synced.event.ts | 29 ++++ backend/services/deploy-mining.sh | 54 +++++-- .../mining-admin-service/prisma/schema.prisma | 44 ++++++ .../infrastructure/kafka/cdc-sync.service.ts | 80 +++++++++- 15 files changed, 392 insertions(+), 17 deletions(-) create mode 100644 backend/services/contribution-service/src/domain/events/adoption-synced.event.ts create mode 100644 backend/services/contribution-service/src/domain/events/referral-synced.event.ts diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 49f5251a..bb2ca883 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -739,7 +739,17 @@ "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker restart rwa-postgres && sleep 10 && docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -t -c \"\"\nSELECT ''synced_users'' as tbl, COUNT\\(*\\) FROM synced_users\nUNION ALL SELECT ''synced_contribution_accounts'', COUNT\\(*\\) FROM synced_contribution_accounts\nUNION ALL SELECT ''synced_mining_accounts'', COUNT\\(*\\) FROM synced_mining_accounts\nUNION ALL SELECT ''synced_trading_accounts'', COUNT\\(*\\) FROM synced_trading_accounts;\n\"\"\")", "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d postgres -c \"\"SELECT datname, count\\(*\\) FROM pg_stat_activity GROUP BY datname ORDER BY count DESC;\"\"\")", "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d postgres -c \"\"SHOW max_connections;\"\" && docker exec rwa-postgres psql -U rwa_user -d postgres -c \"\"SELECT count\\(*\\) as current_connections FROM pg_stat_activity;\"\"\")", - "Bash(git commit -m \"$\\(cat <<''EOF''\nfix\\(postgres\\): 增加数据库最大连接数到 300\n\n- max_connections: 100 -> 300\n- max_replication_slots: 10 -> 20 \n- max_wal_senders: 10 -> 20\n\n支持更多服务和 Debezium connectors 同时连接\n\nCo-Authored-By: Claude Opus 4.5 \nEOF\n\\)\")" + "Bash(git commit -m \"$\\(cat <<''EOF''\nfix\\(postgres\\): 增加数据库最大连接数到 300\n\n- max_connections: 100 -> 300\n- max_replication_slots: 10 -> 20 \n- max_wal_senders: 10 -> 20\n\n支持更多服务和 Debezium connectors 同时连接\n\nCo-Authored-By: Claude Opus 4.5 \nEOF\n\\)\")", + "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -c ''SELECT * FROM synced_users LIMIT 2;''\")", + "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -c ''SELECT * FROM synced_contribution_accounts LIMIT 2;''\")", + "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_contribution -c ''SELECT account_sequence, has_adopted, direct_referral_adopted_count, unlocked_level_depth FROM contribution_accounts LIMIT 5;''\")", + "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_contribution -c ''SELECT account_sequence, adopter_count FROM synced_users LIMIT 5;''\")", + "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_contribution -c ''\\\\d synced_users''\")", + "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_contribution -c ''SELECT * FROM synced_adoptions LIMIT 3;''\")", + "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_contribution -c ''SELECT * FROM synced_referrals LIMIT 3;''\")", + "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -c ''\\\\d synced_users''\")", + "Bash(ssh -o ProxyCommand=\"ssh -W %h:%p ceshi@103.39.231.231\" ceshi@192.168.1.111 \"docker exec rwa-postgres psql -U rwa_user -d rwa_mining_admin -c \"\"SELECT table_name FROM information_schema.tables WHERE table_schema=''public'' ORDER BY table_name;\"\"\")", + "Bash(dir /b \"c:\\\\Users\\\\dong\\\\Desktop\\\\rwadurian\\\\backend\\\\services\\\\contribution-service\\\\src\\\\domain\\\\events\")" ], "deny": [], "ask": [] diff --git a/backend/services/auth-service/prisma/schema.prisma b/backend/services/auth-service/prisma/schema.prisma index 0a194f5f..aef229da 100644 --- a/backend/services/auth-service/prisma/schema.prisma +++ b/backend/services/auth-service/prisma/schema.prisma @@ -86,6 +86,7 @@ model SyncedLegacyUser { accountSequence String @unique @map("account_sequence") phone String? // 系统账户可能没有手机号 passwordHash String? @map("password_hash") // 系统账户可能没有密码 + nickname String? // 昵称 (from identity-service) status String legacyCreatedAt DateTime @map("legacy_created_at") diff --git a/backend/services/auth-service/src/application/services/admin-sync.service.ts b/backend/services/auth-service/src/application/services/admin-sync.service.ts index 7e5e083a..4db1b134 100644 --- a/backend/services/auth-service/src/application/services/admin-sync.service.ts +++ b/backend/services/auth-service/src/application/services/admin-sync.service.ts @@ -96,6 +96,7 @@ export class AdminSyncService { select: { accountSequence: true, phone: true, + nickname: true, legacyCreatedAt: true, }, }); @@ -108,6 +109,7 @@ export class AdminSyncService { const event = new LegacyUserMigratedEvent( user.accountSequence, user.phone || '', + user.nickname || '', user.legacyCreatedAt || new Date(), ); await this.outboxService.publish(event); diff --git a/backend/services/auth-service/src/application/services/auth.service.ts b/backend/services/auth-service/src/application/services/auth.service.ts index 4e3d0290..10d4e1f6 100644 --- a/backend/services/auth-service/src/application/services/auth.service.ts +++ b/backend/services/auth-service/src/application/services/auth.service.ts @@ -221,7 +221,7 @@ export class AuthService { * 迁移 V1 用户并登录 */ private async migrateAndLogin( - legacyUser: { accountSequence: AccountSequence; phone: Phone; passwordHash: string }, + legacyUser: { accountSequence: AccountSequence; phone: Phone; passwordHash: string; nickname: string | null }, password: string, deviceInfo?: string, ipAddress?: string, @@ -254,6 +254,7 @@ export class AuthService { new LegacyUserMigratedEvent( legacyUser.accountSequence.value, legacyUser.phone.value, + legacyUser.nickname || '', new Date(), ), ); @@ -265,7 +266,7 @@ export class AuthService { * 执行用户迁移(通用) */ private async performMigration( - legacyUser: { accountSequence: AccountSequence; phone: Phone; passwordHash: string }, + legacyUser: { accountSequence: AccountSequence; phone: Phone; passwordHash: string; nickname: string | null }, deviceInfo?: string, ipAddress?: string, ): Promise { @@ -286,6 +287,7 @@ export class AuthService { new LegacyUserMigratedEvent( legacyUser.accountSequence.value, legacyUser.phone.value, + legacyUser.nickname || '', new Date(), ), ); diff --git a/backend/services/auth-service/src/domain/events/legacy-user-migrated.event.ts b/backend/services/auth-service/src/domain/events/legacy-user-migrated.event.ts index b13a93d4..223bcfba 100644 --- a/backend/services/auth-service/src/domain/events/legacy-user-migrated.event.ts +++ b/backend/services/auth-service/src/domain/events/legacy-user-migrated.event.ts @@ -7,6 +7,7 @@ export class LegacyUserMigratedEvent { constructor( public readonly accountSequence: string, public readonly phone: string, + public readonly nickname: string, public readonly migratedAt: Date, ) {} @@ -15,6 +16,7 @@ export class LegacyUserMigratedEvent { eventType: LegacyUserMigratedEvent.EVENT_TYPE, accountSequence: this.accountSequence, phone: this.phone, + nickname: this.nickname, migratedAt: this.migratedAt.toISOString(), }; } diff --git a/backend/services/auth-service/src/domain/repositories/synced-legacy-user.repository.interface.ts b/backend/services/auth-service/src/domain/repositories/synced-legacy-user.repository.interface.ts index f4fa246e..dcac4f8d 100644 --- a/backend/services/auth-service/src/domain/repositories/synced-legacy-user.repository.interface.ts +++ b/backend/services/auth-service/src/domain/repositories/synced-legacy-user.repository.interface.ts @@ -11,6 +11,7 @@ export interface SyncedLegacyUserData { accountSequence: AccountSequence; phone: Phone; passwordHash: string; + nickname: string | null; status: string; legacyCreatedAt: Date; migratedToV2: boolean; diff --git a/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts b/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts index 4a5dc495..26065aaa 100644 --- a/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts +++ b/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts @@ -15,6 +15,7 @@ interface UnwrappedCdcUser { phone_number: string; password_hash: string; account_sequence: string; + nickname: string; // 昵称 status: string; registered_at: number; // timestamp in milliseconds @@ -139,6 +140,7 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy { update: { phone: user.phone_number ?? undefined, passwordHash: user.password_hash ?? undefined, + nickname: user.nickname ?? undefined, accountSequence: user.account_sequence, status: user.status, sourceSequenceNum: sequenceNum, @@ -148,6 +150,7 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy { legacyId: BigInt(user.user_id), phone: user.phone_number ?? null, passwordHash: user.password_hash ?? null, + nickname: user.nickname ?? null, accountSequence: user.account_sequence, status: user.status, legacyCreatedAt: new Date(user.registered_at), @@ -161,6 +164,7 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy { const event = new LegacyUserMigratedEvent( user.account_sequence, user.phone_number || '', + user.nickname || '', new Date(user.registered_at), ); await this.outboxService.publish(event); diff --git a/backend/services/auth-service/src/infrastructure/persistence/repositories/synced-legacy-user.repository.ts b/backend/services/auth-service/src/infrastructure/persistence/repositories/synced-legacy-user.repository.ts index 0c77c555..2191b2dd 100644 --- a/backend/services/auth-service/src/infrastructure/persistence/repositories/synced-legacy-user.repository.ts +++ b/backend/services/auth-service/src/infrastructure/persistence/repositories/synced-legacy-user.repository.ts @@ -55,6 +55,7 @@ export class PrismaSyncedLegacyUserRepository implements SyncedLegacyUserReposit accountSequence: AccountSequence.create(user.accountSequence), phone: Phone.create(user.phone), passwordHash: user.passwordHash, + nickname: user.nickname, status: user.status, legacyCreatedAt: user.legacyCreatedAt, migratedToV2: user.migratedToV2, diff --git a/backend/services/contribution-service/src/api/controllers/admin.controller.ts b/backend/services/contribution-service/src/api/controllers/admin.controller.ts index 9a459431..2d4b7577 100644 --- a/backend/services/contribution-service/src/api/controllers/admin.controller.ts +++ b/backend/services/contribution-service/src/api/controllers/admin.controller.ts @@ -3,7 +3,11 @@ import { ApiTags, ApiOperation } from '@nestjs/swagger'; import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; import { OutboxRepository } from '../../infrastructure/persistence/repositories/outbox.repository'; import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; -import { ContributionAccountSyncedEvent } from '../../domain/events'; +import { + ContributionAccountSyncedEvent, + ReferralSyncedEvent, + AdoptionSyncedEvent, +} from '../../domain/events'; import { Public } from '../../shared/guards/jwt-auth.guard'; @ApiTags('Admin') @@ -130,4 +134,140 @@ export class AdminController { message: `Published ${publishedCount} events, ${failedCount} failed out of ${accounts.length} total`, }; } + + @Post('referrals/publish-all') + @Public() + @ApiOperation({ summary: '发布所有推荐关系事件到 outbox,用于同步到 mining-admin-service' }) + async publishAllReferrals(): Promise<{ + success: boolean; + publishedCount: number; + failedCount: number; + message: string; + }> { + const referrals = await this.prisma.syncedReferral.findMany({ + select: { + accountSequence: true, + referrerAccountSequence: true, + referrerUserId: true, + originalUserId: true, + ancestorPath: true, + depth: true, + }, + }); + + let publishedCount = 0; + let failedCount = 0; + + const batchSize = 100; + for (let i = 0; i < referrals.length; i += batchSize) { + const batch = referrals.slice(i, i + batchSize); + + try { + await this.unitOfWork.executeInTransaction(async () => { + const events = batch.map((ref) => { + const event = new ReferralSyncedEvent( + ref.accountSequence, + ref.referrerAccountSequence, + ref.referrerUserId, + ref.originalUserId, + ref.ancestorPath, + ref.depth, + ); + + return { + aggregateType: ReferralSyncedEvent.AGGREGATE_TYPE, + aggregateId: ref.accountSequence, + eventType: ReferralSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }; + }); + + await this.outboxRepository.saveMany(events); + }); + + publishedCount += batch.length; + this.logger.debug(`Published referral batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`); + } catch (error) { + failedCount += batch.length; + this.logger.error(`Failed to publish referral batch ${Math.floor(i / batchSize) + 1}`, error); + } + } + + this.logger.log(`Published ${publishedCount} referral events, ${failedCount} failed`); + + return { + success: failedCount === 0, + publishedCount, + failedCount, + message: `Published ${publishedCount} events, ${failedCount} failed out of ${referrals.length} total`, + }; + } + + @Post('adoptions/publish-all') + @Public() + @ApiOperation({ summary: '发布所有认种记录事件到 outbox,用于同步到 mining-admin-service' }) + async publishAllAdoptions(): Promise<{ + success: boolean; + publishedCount: number; + failedCount: number; + message: string; + }> { + const adoptions = await this.prisma.syncedAdoption.findMany({ + select: { + originalAdoptionId: true, + accountSequence: true, + treeCount: true, + adoptionDate: true, + status: true, + contributionPerTree: true, + }, + }); + + let publishedCount = 0; + let failedCount = 0; + + const batchSize = 100; + for (let i = 0; i < adoptions.length; i += batchSize) { + const batch = adoptions.slice(i, i + batchSize); + + try { + await this.unitOfWork.executeInTransaction(async () => { + const events = batch.map((adoption) => { + const event = new AdoptionSyncedEvent( + adoption.originalAdoptionId, + adoption.accountSequence, + adoption.treeCount, + adoption.adoptionDate, + adoption.status, + adoption.contributionPerTree.toString(), + ); + + return { + aggregateType: AdoptionSyncedEvent.AGGREGATE_TYPE, + aggregateId: adoption.originalAdoptionId.toString(), + eventType: AdoptionSyncedEvent.EVENT_TYPE, + payload: event.toPayload(), + }; + }); + + await this.outboxRepository.saveMany(events); + }); + + publishedCount += batch.length; + this.logger.debug(`Published adoption batch ${Math.floor(i / batchSize) + 1}: ${batch.length} events`); + } catch (error) { + failedCount += batch.length; + this.logger.error(`Failed to publish adoption batch ${Math.floor(i / batchSize) + 1}`, error); + } + } + + this.logger.log(`Published ${publishedCount} adoption events, ${failedCount} failed`); + + return { + success: failedCount === 0, + publishedCount, + failedCount, + message: `Published ${publishedCount} events, ${failedCount} failed out of ${adoptions.length} total`, + }; + } } diff --git a/backend/services/contribution-service/src/domain/events/adoption-synced.event.ts b/backend/services/contribution-service/src/domain/events/adoption-synced.event.ts new file mode 100644 index 00000000..875996d3 --- /dev/null +++ b/backend/services/contribution-service/src/domain/events/adoption-synced.event.ts @@ -0,0 +1,29 @@ +/** + * 认种记录同步事件 + * 用于同步到 mining-admin-service + */ +export class AdoptionSyncedEvent { + static readonly EVENT_TYPE = 'AdoptionSynced'; + static readonly AGGREGATE_TYPE = 'Adoption'; + + constructor( + public readonly originalAdoptionId: bigint, + public readonly accountSequence: string, + public readonly treeCount: number, + public readonly adoptionDate: Date, + public readonly status: string | null, + public readonly contributionPerTree: string, + ) {} + + toPayload(): Record { + return { + eventType: AdoptionSyncedEvent.EVENT_TYPE, + originalAdoptionId: this.originalAdoptionId.toString(), + accountSequence: this.accountSequence, + treeCount: this.treeCount, + adoptionDate: this.adoptionDate.toISOString(), + status: this.status, + contributionPerTree: this.contributionPerTree, + }; + } +} diff --git a/backend/services/contribution-service/src/domain/events/index.ts b/backend/services/contribution-service/src/domain/events/index.ts index 255b50d3..828d8ff6 100644 --- a/backend/services/contribution-service/src/domain/events/index.ts +++ b/backend/services/contribution-service/src/domain/events/index.ts @@ -1,3 +1,5 @@ export * from './contribution-calculated.event'; export * from './daily-snapshot-created.event'; export * from './contribution-account-synced.event'; +export * from './referral-synced.event'; +export * from './adoption-synced.event'; diff --git a/backend/services/contribution-service/src/domain/events/referral-synced.event.ts b/backend/services/contribution-service/src/domain/events/referral-synced.event.ts new file mode 100644 index 00000000..9eeadba0 --- /dev/null +++ b/backend/services/contribution-service/src/domain/events/referral-synced.event.ts @@ -0,0 +1,29 @@ +/** + * 推荐关系同步事件 + * 用于同步到 mining-admin-service + */ +export class ReferralSyncedEvent { + static readonly EVENT_TYPE = 'ReferralSynced'; + static readonly AGGREGATE_TYPE = 'Referral'; + + constructor( + public readonly accountSequence: string, + public readonly referrerAccountSequence: string | null, + public readonly referrerUserId: bigint | null, + public readonly originalUserId: bigint | null, + public readonly ancestorPath: string | null, + public readonly depth: number, + ) {} + + toPayload(): Record { + return { + eventType: ReferralSyncedEvent.EVENT_TYPE, + accountSequence: this.accountSequence, + referrerAccountSequence: this.referrerAccountSequence, + referrerUserId: this.referrerUserId?.toString() ?? null, + originalUserId: this.originalUserId?.toString() ?? null, + ancestorPath: this.ancestorPath, + depth: this.depth, + }; + } +} diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 9266e40e..3c860a85 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -881,16 +881,16 @@ full_reset() { fi echo "" - log_step "Step 1/12: Stopping 2.0 services..." + log_step "Step 1/14: Stopping 2.0 services..." for service in "${MINING_SERVICES[@]}"; do service_stop "$service" done - log_step "Step 2/12: Waiting for Kafka consumers to become inactive..." + log_step "Step 2/14: Waiting for Kafka consumers to become inactive..." log_info "Waiting 15 seconds for consumer group session timeout..." sleep 15 - log_step "Step 3/12: Resetting CDC consumer offsets..." + log_step "Step 3/14: Resetting CDC consumer offsets..." # Reset offsets BEFORE migrations (which may start containers) for group in "${CDC_CONSUMER_GROUPS[@]}"; do log_info "Resetting consumer group: $group" @@ -927,17 +927,17 @@ full_reset() { fi done - log_step "Step 4/12: Dropping 2.0 databases..." + log_step "Step 4/14: Dropping 2.0 databases..." db_drop - log_step "Step 5/12: Creating 2.0 databases..." + log_step "Step 5/14: Creating 2.0 databases..." db_create - log_step "Step 6/12: Running migrations..." + log_step "Step 6/14: Running migrations..." db_migrate # Stop any containers that were started during migration - log_step "Step 7/12: Stopping containers and resetting CDC offsets again..." + log_step "Step 7/14: Stopping containers and resetting CDC offsets again..." log_info "Migration may have started CDC consumers, stopping them now..." for service in "${MINING_SERVICES[@]}"; do docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true @@ -981,22 +981,22 @@ full_reset() { fi done - log_step "Step 8/12: Registering Debezium outbox connectors..." + log_step "Step 8/14: Registering Debezium outbox connectors..." # Register outbox connectors for 2.0 service events # These connectors capture events from each service's outbox table and send to Kafka # mining-admin-service consumes these events to aggregate data from all 2.0 services register_outbox_connectors || log_warn "Some connectors may not be registered" - log_step "Step 9/12: Starting 2.0 services..." + log_step "Step 9/14: Starting 2.0 services..." for service in "${MINING_SERVICES[@]}"; do service_start "$service" done - log_step "Step 10/12: Waiting for services to be ready..." + log_step "Step 10/14: Waiting for services to be ready..." log_info "Waiting 20 seconds for all services to start and sync from 1.0 CDC..." sleep 20 - log_step "Step 11/12: Publishing legacy users to mining-admin-service..." + log_step "Step 11/14: Publishing legacy users to mining-admin-service..." # 调用 auth-service API 发布所有旧用户事件到 outbox # 这样 mining-admin-service 才能通过 Debezium 收到用户数据 local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all" @@ -1012,7 +1012,7 @@ full_reset() { log_info "You may need to manually call: curl -X POST $publish_url" fi - log_step "Step 12/12: Publishing contribution data to mining-admin-service..." + log_step "Step 12/14: Publishing contribution data to mining-admin-service..." # 调用 contribution-service API 发布所有算力账户事件到 outbox local contrib_publish_url="http://localhost:3020/api/v2/admin/contribution-accounts/publish-all" local contrib_result @@ -1027,6 +1027,36 @@ full_reset() { log_info "You may need to manually call: curl -X POST $contrib_publish_url" fi + log_step "Step 13/14: Publishing referral relationships to mining-admin-service..." + # 调用 contribution-service API 发布所有推荐关系事件到 outbox + local referral_publish_url="http://localhost:3020/api/v2/admin/referrals/publish-all" + local referral_result + referral_result=$(curl -s -X POST "$referral_publish_url" 2>/dev/null || echo '{"error": "curl failed"}') + + if echo "$referral_result" | grep -q '"success":true'; then + local referral_count + referral_count=$(echo "$referral_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*') + log_success "Published $referral_count referral events to outbox" + else + log_warn "Failed to publish referral data: $referral_result" + log_info "You may need to manually call: curl -X POST $referral_publish_url" + fi + + log_step "Step 14/14: Publishing adoption records to mining-admin-service..." + # 调用 contribution-service API 发布所有认种记录事件到 outbox + local adoption_publish_url="http://localhost:3020/api/v2/admin/adoptions/publish-all" + local adoption_result + adoption_result=$(curl -s -X POST "$adoption_publish_url" 2>/dev/null || echo '{"error": "curl failed"}') + + if echo "$adoption_result" | grep -q '"success":true'; then + local adoption_count + adoption_count=$(echo "$adoption_result" | grep -o '"publishedCount":[0-9]*' | grep -o '[0-9]*') + log_success "Published $adoption_count adoption events to outbox" + else + log_warn "Failed to publish adoption data: $adoption_result" + log_info "You may need to manually call: curl -X POST $adoption_publish_url" + fi + # 等待 mining-admin-service 消费 outbox 事件 log_info "Waiting 15 seconds for mining-admin-service to sync all data..." sleep 15 diff --git a/backend/services/mining-admin-service/prisma/schema.prisma b/backend/services/mining-admin-service/prisma/schema.prisma index b2fea37c..c5aa58fc 100644 --- a/backend/services/mining-admin-service/prisma/schema.prisma +++ b/backend/services/mining-admin-service/prisma/schema.prisma @@ -154,6 +154,7 @@ model SyncedUser { originalUserId String @unique // auth-service 中的原始 ID accountSequence String @unique // 账户序列号 phone String + nickname String? // 昵称 (from identity-service) status String // ACTIVE, DISABLED, DELETED kycStatus String // PENDING, SUBMITTED, VERIFIED, REJECTED realName String? @@ -166,6 +167,7 @@ model SyncedUser { contributionAccount SyncedContributionAccount? miningAccount SyncedMiningAccount? tradingAccount SyncedTradingAccount? + referral SyncedReferral? @@index([phone]) @@index([status]) @@ -197,6 +199,48 @@ model SyncedContributionAccount { @@map("synced_contribution_accounts") } +// ============================================================================= +// CDC 同步表 - 推荐关系 (from contribution-service) +// ============================================================================= + +model SyncedReferral { + id String @id @default(uuid()) + accountSequence String @unique + referrerAccountSequence String? // 推荐人账户序列号 + referrerUserId BigInt? // 1.0 的 referrer_id + originalUserId BigInt? // 1.0 的 user_id + ancestorPath String? @db.Text // 祖先路径(逗号分隔的 user_id) + depth Int @default(0) + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + user SyncedUser @relation(fields: [accountSequence], references: [accountSequence]) + + @@index([referrerAccountSequence]) + @@index([depth]) + @@map("synced_referrals") +} + +// ============================================================================= +// CDC 同步表 - 认种记录 (from contribution-service) +// ============================================================================= + +model SyncedAdoption { + id String @id @default(uuid()) + originalAdoptionId BigInt @unique + accountSequence String + treeCount Int + adoptionDate DateTime @db.Date + status String? // 认种状态 + contributionPerTree Decimal @db.Decimal(20, 10) + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([accountSequence]) + @@index([adoptionDate]) + @@map("synced_adoptions") +} + // ============================================================================= // CDC 同步表 - 挖矿账户 (from mining-service) // ============================================================================= diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index 916f2778..3bb80913 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -91,6 +91,16 @@ export class CdcSyncService implements OnModuleInit { 'SystemContributionUpdated', this.handleSystemContributionUpdated.bind(this), ); + // ReferralSynced 事件 - 同步推荐关系 + this.cdcConsumer.registerServiceHandler( + 'ReferralSynced', + this.handleReferralSynced.bind(this), + ); + // AdoptionSynced 事件 - 同步认种记录 + this.cdcConsumer.registerServiceHandler( + 'AdoptionSynced', + this.handleAdoptionSynced.bind(this), + ); // =========================================================================== // 从 mining-service 同步挖矿数据 (通过 Debezium CDC 监听 outbox_events 表) @@ -339,7 +349,7 @@ export class CdcSyncService implements OnModuleInit { /** * 处理 auth-service 发布的 user.legacy.migrated 事件 - * payload: { accountSequence, phone, migratedAt } + * payload: { accountSequence, phone, nickname, migratedAt } */ private async handleLegacyUserMigrated(event: ServiceEvent): Promise { const { payload } = event; @@ -351,6 +361,7 @@ export class CdcSyncService implements OnModuleInit { originalUserId: payload.accountSequence, accountSequence: payload.accountSequence, phone: payload.phone, + nickname: payload.nickname || null, status: 'ACTIVE', kycStatus: 'PENDING', realName: null, @@ -359,6 +370,7 @@ export class CdcSyncService implements OnModuleInit { }, update: { phone: payload.phone, + nickname: payload.nickname || null, isLegacyUser: true, }, }); @@ -539,6 +551,72 @@ export class CdcSyncService implements OnModuleInit { } } + /** + * 处理 ReferralSynced 事件 - 同步推荐关系 + */ + private async handleReferralSynced(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedReferral.upsert({ + where: { accountSequence: payload.accountSequence }, + create: { + accountSequence: payload.accountSequence, + referrerAccountSequence: payload.referrerAccountSequence, + referrerUserId: payload.referrerUserId ? BigInt(payload.referrerUserId) : null, + originalUserId: payload.originalUserId ? BigInt(payload.originalUserId) : null, + ancestorPath: payload.ancestorPath, + depth: payload.depth || 0, + }, + update: { + referrerAccountSequence: payload.referrerAccountSequence, + referrerUserId: payload.referrerUserId ? BigInt(payload.referrerUserId) : null, + originalUserId: payload.originalUserId ? BigInt(payload.originalUserId) : null, + ancestorPath: payload.ancestorPath, + depth: payload.depth || 0, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug(`Synced referral: ${payload.accountSequence}`); + } catch (error) { + this.logger.error(`Failed to sync referral: ${payload.accountSequence}`, error); + } + } + + /** + * 处理 AdoptionSynced 事件 - 同步认种记录 + */ + private async handleAdoptionSynced(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedAdoption.upsert({ + where: { originalAdoptionId: BigInt(payload.originalAdoptionId) }, + create: { + originalAdoptionId: BigInt(payload.originalAdoptionId), + accountSequence: payload.accountSequence, + treeCount: payload.treeCount, + adoptionDate: new Date(payload.adoptionDate), + status: payload.status, + contributionPerTree: payload.contributionPerTree, + }, + update: { + accountSequence: payload.accountSequence, + treeCount: payload.treeCount, + adoptionDate: new Date(payload.adoptionDate), + status: payload.status, + contributionPerTree: payload.contributionPerTree, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug(`Synced adoption: ${payload.originalAdoptionId}`); + } catch (error) { + this.logger.error(`Failed to sync adoption: ${payload.originalAdoptionId}`, error); + } + } + // =========================================================================== // 挖矿账户事件处理 // ===========================================================================