From 22523aba14bc39ba8a48b2629f4aa467e69cc1a5 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 13 Jan 2026 21:07:57 -0800 Subject: [PATCH] revert: restore blocking await for sequential CDC consumption The previous change was wrong - running sequential consumption in background defeats its purpose. The whole point is to ensure data dependency order (users -> referrals -> adoptions) before any other operations can proceed. Co-Authored-By: Claude Opus 4.5 --- .../src/infrastructure/kafka/cdc-consumer.service.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index a8846630..39291b6e 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -297,12 +297,10 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { this.sequentialMode = true; this.isRunning = true; - // 开始顺序消费(后台运行,不阻塞服务启动) - this.startSequentialConsumption().catch((error) => { - this.logger.error('Sequential consumption failed', error); - }); + // 开始顺序消费(阻塞直到完成,确保数据依赖顺序正确) + await this.startSequentialConsumption(); - this.logger.log('CDC consumer started with sequential phase consumption (running in background)'); + this.logger.log('CDC consumer started with sequential phase consumption'); } catch (error) { this.logger.error('Failed to start CDC consumer', error); // 不抛出错误,允许服务在没有 Kafka 的情况下启动(用于本地开发)