From 4b55c63e715681e0755219f6c643a2a089b23223 Mon Sep 17 00:00:00 2001 From: hailin Date: Sun, 11 Jan 2026 09:27:01 -0800 Subject: [PATCH] =?UTF-8?q?fix(contribution-service):=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?CDC=E5=90=8C=E6=AD=A5=E5=AD=97=E6=AE=B5=E6=98=A0=E5=B0=84?= =?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81=E5=AE=8C=E6=95=B4=E5=90=8C=E6=AD=A5?= =?UTF-8?q?referral=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主要更改: 1. synced_referrals表增加referrer_user_id和original_user_id字段 - 1.0的referral_relationships表只有referrer_id(user_id),没有referrer_account_sequence - 保存原始user_id以便后续解析推荐人的account_sequence 2. 修复referral-synced.handler字段映射 - 正确处理1.0的user_id、referrer_id、ancestor_path字段 - ancestor_path从BigInt[]数组转换为逗号分隔的字符串 3. 修复cdc-event-dispatcher表名注册 - 使用正确的表名: referral_relationships, planting_orders - 移除不需要的user_accounts注册 4. 更新docker-compose.2.0.yml - 添加CDC_TOPIC_REFERRALS配置 - 移除未使用的CDC_TOPIC_PAYMENTS Co-Authored-By: Claude Opus 4.5 --- .../migration.sql | 12 +++ .../contribution-service/prisma/schema.prisma | 6 ++ .../event-handlers/cdc-event-dispatcher.ts | 12 ++- .../event-handlers/referral-synced.handler.ts | 102 ++++++++++++++++-- .../synced-data.repository.interface.ts | 9 ++ .../kafka/cdc-consumer.service.ts | 8 +- .../repositories/synced-data.repository.ts | 20 ++++ backend/services/docker-compose.2.0.yml | 6 +- 8 files changed, 155 insertions(+), 20 deletions(-) create mode 100644 backend/services/contribution-service/prisma/migrations/20260111100000_add_referral_user_ids/migration.sql diff --git a/backend/services/contribution-service/prisma/migrations/20260111100000_add_referral_user_ids/migration.sql b/backend/services/contribution-service/prisma/migrations/20260111100000_add_referral_user_ids/migration.sql new file mode 100644 index 00000000..7bfbb8cf --- /dev/null +++ b/backend/services/contribution-service/prisma/migrations/20260111100000_add_referral_user_ids/migration.sql @@ -0,0 +1,12 @@ +-- Migration: 添加推荐关系的 user_id 字段 +-- 用于完整同步 1.0 referral_relationships 表数据 + +-- 添加 referrer_user_id 字段 (1.0 的 referrer_id) +ALTER TABLE "synced_referrals" ADD COLUMN "referrer_user_id" BIGINT; + +-- 添加 original_user_id 字段 (1.0 的 user_id) +ALTER TABLE "synced_referrals" ADD COLUMN "original_user_id" BIGINT; + +-- 创建索引 +CREATE INDEX "synced_referrals_referrer_user_id_idx" ON "synced_referrals"("referrer_user_id"); +CREATE INDEX "synced_referrals_original_user_id_idx" ON "synced_referrals"("original_user_id"); diff --git a/backend/services/contribution-service/prisma/schema.prisma b/backend/services/contribution-service/prisma/schema.prisma index da3fef7e..737f0e2d 100644 --- a/backend/services/contribution-service/prisma/schema.prisma +++ b/backend/services/contribution-service/prisma/schema.prisma @@ -66,9 +66,13 @@ model SyncedAdoption { model SyncedReferral { id BigInt @id @default(autoincrement()) accountSequence String @unique @map("account_sequence") @db.VarChar(20) + // 推荐人信息:优先使用 account_sequence,但也保存 user_id 以便后续解析 referrerAccountSequence String? @map("referrer_account_sequence") @db.VarChar(20) + referrerUserId BigInt? @map("referrer_user_id") // 1.0 的 referrer_id + originalUserId BigInt? @map("original_user_id") // 1.0 的 user_id // 预计算的层级路径(便于快速查询上下级) + // 1.0 存储的是 BigInt[],这里转换为逗号分隔的字符串 ancestorPath String? @map("ancestor_path") @db.Text depth Int @default(0) @@ -79,6 +83,8 @@ model SyncedReferral { createdAt DateTime @default(now()) @map("created_at") @@index([referrerAccountSequence]) + @@index([referrerUserId]) + @@index([originalUserId]) @@map("synced_referrals") } diff --git a/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts b/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts index 639d15a3..a745fe3a 100644 --- a/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts +++ b/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts @@ -21,9 +21,15 @@ export class CDCEventDispatcher implements OnModuleInit { async onModuleInit() { // 注册各表的事件处理器 - this.cdcConsumer.registerHandler('users', this.handleUserEvent.bind(this)); - this.cdcConsumer.registerHandler('referrals', this.handleReferralEvent.bind(this)); - this.cdcConsumer.registerHandler('adoptions', this.handleAdoptionEvent.bind(this)); + // 表名需要与 Debezium topic 中的表名一致 + // topic 格式: cdc..public. + // + // 注意:contribution-service 不需要直接同步用户数据 + // - 认种订单 (planting_orders) 包含 account_sequence + // - 推荐关系 (referral_relationships) 包含 account_sequence 和层级信息 + // - ContributionAccount 在认种时自动创建 + this.cdcConsumer.registerHandler('referral_relationships', this.handleReferralEvent.bind(this)); // referral-service + this.cdcConsumer.registerHandler('planting_orders', this.handleAdoptionEvent.bind(this)); // planting-service // 启动 CDC 消费者 try { diff --git a/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts index 1ed3113b..bf4344d7 100644 --- a/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts @@ -5,7 +5,20 @@ import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-o /** * 引荐关系 CDC 事件处理器 - * 处理从引荐服务同步过来的引荐关系数据 + * 处理从1.0 referral-service同步过来的referral_relationships数据 + * + * 1.0 表结构 (referral_relationships): + * - user_id: BigInt (用户ID) + * - account_sequence: String (账户序列号) + * - referrer_id: BigInt (推荐人用户ID, 注意:不是 account_sequence) + * - ancestor_path: BigInt[] (祖先路径数组,存储 user_id) + * - depth: Int (层级深度) + * + * 2.0 存储策略: + * - 保存 original_user_id (1.0 的 user_id) + * - 保存 referrer_user_id (1.0 的 referrer_id) + * - 尝试查找 referrer 的 account_sequence 并保存 + * - ancestor_path 转换为逗号分隔的字符串 */ @Injectable() export class ReferralSyncedHandler { @@ -43,33 +56,77 @@ export class ReferralSyncedHandler { private async handleCreate(data: any, sequenceNum: bigint): Promise { if (!data) return; + // 1.0 字段映射 + const accountSequence = data.account_sequence || data.accountSequence; + const originalUserId = data.user_id || data.userId; + const referrerUserId = data.referrer_id || data.referrerId; + const ancestorPathArray = data.ancestor_path || data.ancestorPath; + const depth = data.depth || 0; + + // 将 BigInt[] 转换为逗号分隔的字符串 + const ancestorPath = this.convertAncestorPath(ancestorPathArray); + + // 尝试查找推荐人的 account_sequence + let referrerAccountSequence: string | null = null; + if (referrerUserId) { + const referrer = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(BigInt(referrerUserId)); + if (referrer) { + referrerAccountSequence = referrer.accountSequence; + } else { + this.logger.debug( + `Referrer user_id ${referrerUserId} not found yet for ${accountSequence}, will resolve later`, + ); + } + } + await this.unitOfWork.executeInTransaction(async () => { await this.syncedDataRepository.upsertSyncedReferral({ - accountSequence: data.account_sequence || data.accountSequence, - referrerAccountSequence: data.referrer_account_sequence || data.referrerAccountSequence || null, - ancestorPath: data.ancestor_path || data.ancestorPath || null, - depth: data.depth || 0, + accountSequence, + referrerAccountSequence, + referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, + originalUserId: originalUserId ? BigInt(originalUserId) : null, + ancestorPath, + depth, sourceSequenceNum: sequenceNum, }); }); this.logger.log( - `Referral synced: ${data.account_sequence || data.accountSequence} -> ${data.referrer_account_sequence || data.referrerAccountSequence || 'none'}`, + `Referral synced: ${accountSequence} (user_id: ${originalUserId}) -> referrer_id: ${referrerUserId || 'none'}`, ); } private async handleUpdate(data: any, sequenceNum: bigint): Promise { if (!data) return; + const accountSequence = data.account_sequence || data.accountSequence; + const originalUserId = data.user_id || data.userId; + const referrerUserId = data.referrer_id || data.referrerId; + const ancestorPathArray = data.ancestor_path || data.ancestorPath; + const depth = data.depth || 0; + + const ancestorPath = this.convertAncestorPath(ancestorPathArray); + + // 尝试查找推荐人的 account_sequence + let referrerAccountSequence: string | null = null; + if (referrerUserId) { + const referrer = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(BigInt(referrerUserId)); + if (referrer) { + referrerAccountSequence = referrer.accountSequence; + } + } + await this.syncedDataRepository.upsertSyncedReferral({ - accountSequence: data.account_sequence || data.accountSequence, - referrerAccountSequence: data.referrer_account_sequence || data.referrerAccountSequence || null, - ancestorPath: data.ancestor_path || data.ancestorPath || null, - depth: data.depth || 0, + accountSequence, + referrerAccountSequence, + referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, + originalUserId: originalUserId ? BigInt(originalUserId) : null, + ancestorPath, + depth, sourceSequenceNum: sequenceNum, }); - this.logger.debug(`Referral updated: ${data.account_sequence || data.accountSequence}`); + this.logger.debug(`Referral updated: ${accountSequence}`); } private async handleDelete(data: any): Promise { @@ -77,4 +134,27 @@ export class ReferralSyncedHandler { // 引荐关系删除需要特殊处理 this.logger.warn(`Referral delete event received: ${data.account_sequence || data.accountSequence}`); } + + /** + * 将 BigInt[] 数组转换为逗号分隔的字符串 + * @param ancestorPath BigInt 数组或 null + * @returns 逗号分隔的字符串或 null + */ + private convertAncestorPath(ancestorPath: any): string | null { + if (!ancestorPath) return null; + + // 处理可能的数组格式 + if (Array.isArray(ancestorPath)) { + return ancestorPath.map((id) => String(id)).join(','); + } + + // 如果已经是字符串 (可能是 PostgreSQL 数组的字符串表示) + if (typeof ancestorPath === 'string') { + // PostgreSQL 数组格式: {1,2,3} 或 [1,2,3] + const cleaned = ancestorPath.replace(/[{}\[\]]/g, ''); + return cleaned || null; + } + + return null; + } } diff --git a/backend/services/contribution-service/src/domain/repositories/synced-data.repository.interface.ts b/backend/services/contribution-service/src/domain/repositories/synced-data.repository.interface.ts index eaca6d22..f429d8db 100644 --- a/backend/services/contribution-service/src/domain/repositories/synced-data.repository.interface.ts +++ b/backend/services/contribution-service/src/domain/repositories/synced-data.repository.interface.ts @@ -41,6 +41,8 @@ export interface SyncedReferral { id: bigint; accountSequence: string; referrerAccountSequence: string | null; + referrerUserId: bigint | null; // 1.0 的 referrer_id + originalUserId: bigint | null; // 1.0 的 user_id ancestorPath: string | null; depth: number; sourceSequenceNum: bigint; @@ -128,11 +130,18 @@ export interface ISyncedDataRepository { upsertSyncedReferral(data: { accountSequence: string; referrerAccountSequence?: string | null; + referrerUserId?: bigint | null; + originalUserId?: bigint | null; ancestorPath?: string | null; depth?: number; sourceSequenceNum: bigint; }): Promise; + /** + * 根据原始用户ID查找推荐关系 + */ + findSyncedReferralByOriginalUserId(originalUserId: bigint): Promise; + /** * 根据账户序列号查找推荐关系 */ diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index 416a59e5..1d9b1de6 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -79,12 +79,12 @@ export class CDCConsumerService implements OnModuleInit { await this.consumer.connect(); this.logger.log('CDC consumer connected'); - // 订阅 Debezium CDC topics (从1.0 planting-service同步) + // 订阅 Debezium CDC topics (从1.0服务同步) const topics = [ - // 认种订单表 (planting_orders) + // 认种订单表 (planting-service: planting_orders) this.configService.get('CDC_TOPIC_ADOPTIONS', 'cdc.planting.public.planting_orders'), - // 资金分配表 (fund_allocations) - this.configService.get('CDC_TOPIC_PAYMENTS', 'cdc.planting.public.fund_allocations'), + // 推荐关系表 (referral-service: referral_relationships) + this.configService.get('CDC_TOPIC_REFERRALS', 'cdc.referral.public.referral_relationships'), ]; await this.consumer.subscribe({ diff --git a/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts b/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts index e70e4ba8..84b38e89 100644 --- a/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts +++ b/backend/services/contribution-service/src/infrastructure/persistence/repositories/synced-data.repository.ts @@ -171,6 +171,8 @@ export class SyncedDataRepository implements ISyncedDataRepository { async upsertSyncedReferral(data: { accountSequence: string; referrerAccountSequence?: string | null; + referrerUserId?: bigint | null; + originalUserId?: bigint | null; ancestorPath?: string | null; depth?: number; sourceSequenceNum: bigint; @@ -180,6 +182,8 @@ export class SyncedDataRepository implements ISyncedDataRepository { create: { accountSequence: data.accountSequence, referrerAccountSequence: data.referrerAccountSequence ?? null, + referrerUserId: data.referrerUserId ?? null, + originalUserId: data.originalUserId ?? null, ancestorPath: data.ancestorPath ?? null, depth: data.depth ?? 0, sourceSequenceNum: data.sourceSequenceNum, @@ -187,6 +191,8 @@ export class SyncedDataRepository implements ISyncedDataRepository { }, update: { referrerAccountSequence: data.referrerAccountSequence ?? undefined, + referrerUserId: data.referrerUserId ?? undefined, + originalUserId: data.originalUserId ?? undefined, ancestorPath: data.ancestorPath ?? undefined, depth: data.depth ?? undefined, sourceSequenceNum: data.sourceSequenceNum, @@ -197,6 +203,18 @@ export class SyncedDataRepository implements ISyncedDataRepository { return this.toSyncedReferral(record); } + async findSyncedReferralByOriginalUserId(originalUserId: bigint): Promise { + const record = await this.client.syncedReferral.findFirst({ + where: { originalUserId }, + }); + + if (!record) { + return null; + } + + return this.toSyncedReferral(record); + } + async findSyncedReferralByAccountSequence(accountSequence: string): Promise { const record = await this.client.syncedReferral.findUnique({ where: { accountSequence }, @@ -368,6 +386,8 @@ export class SyncedDataRepository implements ISyncedDataRepository { id: record.id, accountSequence: record.accountSequence, referrerAccountSequence: record.referrerAccountSequence, + referrerUserId: record.referrerUserId, + originalUserId: record.originalUserId, ancestorPath: record.ancestorPath, depth: record.depth, sourceSequenceNum: record.sourceSequenceNum, diff --git a/backend/services/docker-compose.2.0.yml b/backend/services/docker-compose.2.0.yml index 6a78ada3..00082e53 100644 --- a/backend/services/docker-compose.2.0.yml +++ b/backend/services/docker-compose.2.0.yml @@ -37,10 +37,12 @@ services: REDIS_PORT: 6379 REDIS_PASSWORD: ${REDIS_PASSWORD:-} REDIS_DB: 10 - # Kafka - 消费 CDC 事件 (从1.0 planting-service同步认种数据) + # Kafka - 消费 CDC 事件 (从1.0服务同步数据) KAFKA_BROKERS: kafka:29092 + # 认种订单 (planting-service) CDC_TOPIC_ADOPTIONS: ${CDC_TOPIC_ADOPTIONS:-cdc.planting.public.planting_orders} - CDC_TOPIC_PAYMENTS: ${CDC_TOPIC_PAYMENTS:-cdc.planting.public.fund_allocations} + # 推荐关系 (referral-service) + CDC_TOPIC_REFERRALS: ${CDC_TOPIC_REFERRALS:-cdc.referral.public.referral_relationships} CDC_CONSUMER_GROUP: contribution-service-cdc-group ports: - "3020:3020"