diff --git a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts index ea92ea14..a25d7f0d 100644 --- a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts @@ -6,9 +6,9 @@ import { ContributionCalculationService } from '../services/contribution-calcula import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; /** - * 认种 CDC 事件处理器 - * 处理从种植服务同步过来的认种数据 - * 认种是触发算力计算的核心事件 + * 认种订单 CDC 事件处理器 + * 处理从1.0 planting-service同步过来的planting_orders数据 + * 认种订单是触发算力计算的核心事件 */ @Injectable() export class AdoptionSyncedHandler { @@ -47,15 +47,21 @@ export class AdoptionSyncedHandler { private async handleCreate(data: any, sequenceNum: bigint): Promise { if (!data) return; + // planting_orders表字段: order_id, account_sequence, tree_count, created_at, status + const orderId = data.order_id || data.id; + const accountSequence = data.account_sequence || data.accountSequence; + const treeCount = data.tree_count || data.treeCount; + const createdAt = data.created_at || data.createdAt || data.paid_at || data.paidAt; + await this.unitOfWork.executeInTransaction(async () => { - // 保存同步的认种数据 + // 保存同步的认种订单数据 const adoption = await this.syncedDataRepository.upsertSyncedAdoption({ - originalAdoptionId: BigInt(data.id), - accountSequence: data.account_sequence || data.accountSequence, - treeCount: data.tree_count || data.treeCount, - adoptionDate: new Date(data.adoption_date || data.adoptionDate || data.created_at || data.createdAt), + originalAdoptionId: BigInt(orderId), + accountSequence: accountSequence, + treeCount: treeCount, + adoptionDate: new Date(createdAt), status: data.status ?? null, - contributionPerTree: new Decimal(data.contribution_per_tree || data.contributionPerTree || '1'), + contributionPerTree: new Decimal('1'), // 每棵树1算力 sourceSequenceNum: sequenceNum, }); @@ -64,14 +70,15 @@ export class AdoptionSyncedHandler { }); this.logger.log( - `Adoption synced and contribution calculated: ${data.id}, account: ${data.account_sequence || data.accountSequence}`, + `Planting order synced and contribution calculated: ${orderId}, account: ${accountSequence}`, ); } private async handleUpdate(after: any, before: any, sequenceNum: bigint): Promise { if (!after) return; - const originalAdoptionId = BigInt(after.id); + const orderId = after.order_id || after.id; + const originalAdoptionId = BigInt(orderId); // 检查是否已经处理过 const existingAdoption = await this.syncedDataRepository.findSyncedAdoptionByOriginalId(originalAdoptionId); @@ -81,21 +88,25 @@ export class AdoptionSyncedHandler { const newTreeCount = after.tree_count || after.treeCount; if (existingAdoption.treeCount !== newTreeCount) { this.logger.warn( - `Adoption tree count changed after processing: ${originalAdoptionId}. This requires special handling.`, + `Planting order tree count changed after processing: ${originalAdoptionId}. This requires special handling.`, ); // TODO: 实现树数量变化的处理逻辑 } return; } + const accountSequence = after.account_sequence || after.accountSequence; + const treeCount = after.tree_count || after.treeCount; + const createdAt = after.created_at || after.createdAt || after.paid_at || after.paidAt; + await this.unitOfWork.executeInTransaction(async () => { const adoption = await this.syncedDataRepository.upsertSyncedAdoption({ originalAdoptionId: originalAdoptionId, - accountSequence: after.account_sequence || after.accountSequence, - treeCount: after.tree_count || after.treeCount, - adoptionDate: new Date(after.adoption_date || after.adoptionDate || after.created_at || after.createdAt), + accountSequence: accountSequence, + treeCount: treeCount, + adoptionDate: new Date(createdAt), status: after.status ?? null, - contributionPerTree: new Decimal(after.contribution_per_tree || after.contributionPerTree || '1'), + contributionPerTree: new Decimal('1'), sourceSequenceNum: sequenceNum, }); @@ -104,7 +115,7 @@ export class AdoptionSyncedHandler { } }); - this.logger.debug(`Adoption updated: ${originalAdoptionId}`); + this.logger.debug(`Planting order updated: ${originalAdoptionId}`); } private async handleDelete(data: any): Promise { diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index 79ad6af4..416a59e5 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -79,14 +79,12 @@ export class CDCConsumerService implements OnModuleInit { await this.consumer.connect(); this.logger.log('CDC consumer connected'); - // 订阅 Debezium CDC topics + // 订阅 Debezium CDC topics (从1.0 planting-service同步) const topics = [ - // 用户表 - this.configService.get('CDC_TOPIC_USERS', 'dbserver1.public.users'), - // 认种表 - this.configService.get('CDC_TOPIC_ADOPTIONS', 'dbserver1.public.adoptions'), - // 引荐表 - this.configService.get('CDC_TOPIC_REFERRALS', 'dbserver1.public.referrals'), + // 认种订单表 (planting_orders) + this.configService.get('CDC_TOPIC_ADOPTIONS', 'cdc.planting.public.planting_orders'), + // 资金分配表 (fund_allocations) + this.configService.get('CDC_TOPIC_PAYMENTS', 'cdc.planting.public.fund_allocations'), ]; await this.consumer.subscribe({ diff --git a/backend/services/docker-compose.2.0.yml b/backend/services/docker-compose.2.0.yml index bbb3548b..6a78ada3 100644 --- a/backend/services/docker-compose.2.0.yml +++ b/backend/services/docker-compose.2.0.yml @@ -37,15 +37,15 @@ services: REDIS_PORT: 6379 REDIS_PASSWORD: ${REDIS_PASSWORD:-} REDIS_DB: 10 - # Kafka - 消费 CDC 事件 + # Kafka - 消费 CDC 事件 (从1.0 planting-service同步认种数据) KAFKA_BROKERS: kafka:29092 - CDC_TOPIC_ADOPTIONS: ${CDC_TOPIC_ADOPTIONS:-dbserver1.public.adoptions} - CDC_TOPIC_PAYMENTS: ${CDC_TOPIC_PAYMENTS:-dbserver1.public.payment_records} + CDC_TOPIC_ADOPTIONS: ${CDC_TOPIC_ADOPTIONS:-cdc.planting.public.planting_orders} + CDC_TOPIC_PAYMENTS: ${CDC_TOPIC_PAYMENTS:-cdc.planting.public.fund_allocations} CDC_CONSUMER_GROUP: contribution-service-cdc-group ports: - "3020:3020" healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:3020/api/v1/health"] + test: ["CMD", "curl", "-f", "http://localhost:3020/api/v2/health"] interval: 30s timeout: 10s retries: 3