diff --git a/backend/services/contribution-service/src/pre-planting/application/schedulers/pre-planting-contribution.scheduler.ts b/backend/services/contribution-service/src/pre-planting/application/schedulers/pre-planting-contribution.scheduler.ts new file mode 100644 index 00000000..3a3ba20b --- /dev/null +++ b/backend/services/contribution-service/src/pre-planting/application/schedulers/pre-planting-contribution.scheduler.ts @@ -0,0 +1,49 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { RedisService } from '@/infrastructure/redis/redis.service'; +import { PrePlantingContributionService } from '../services/pre-planting-contribution.service'; + +/** + * 预种算力分配补偿调度器 + * + * [2026-02-28] 新增:处理 CDC 后置回调失败后未分配的预种订单 + * + * === 职责 === + * 每 5 分钟扫描一次 pre_planting_synced_orders 表中 status=PAID 且 + * contributionDistributed=false 的订单,重新触发算力计算。 + * + * === 设计原则 === + * - CDC 后置回调失败(如 DB 迁移未就绪、网络抖动)时,订单已写入追踪表但未分配算力 + * - 本调度器作为补偿机制,确保最终一致性 + * - calculateForPrePlantingOrder 内部已做幂等检查(existsBySourceAdoptionId),重复调用安全 + */ +@Injectable() +export class PrePlantingContributionScheduler { + private readonly logger = new Logger(PrePlantingContributionScheduler.name); + private readonly LOCK_KEY = 'pre-planting:contribution:distribute:lock'; + + constructor( + private readonly contributionService: PrePlantingContributionService, + private readonly redis: RedisService, + ) {} + + /** + * 每 5 分钟处理一批未分配的预种订单 + */ + @Cron('0 */5 * * * *') + async processUndistributedOrders(): Promise { + const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}`, 240); + if (!lockValue) return; + + try { + const count = await this.contributionService.processUndistributedOrders(50); + if (count > 0) { + this.logger.log(`[PRE-PLANTING-CONTRIBUTION] Distributed ${count} undistributed orders`); + } + } catch (error) { + this.logger.error('[PRE-PLANTING-CONTRIBUTION] Failed to process undistributed orders', error); + } finally { + await this.redis.releaseLock(`${this.LOCK_KEY}`, lockValue); + } + } +} diff --git a/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts b/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts index eed86485..8506dd2a 100644 --- a/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts +++ b/backend/services/contribution-service/src/pre-planting/pre-planting-cdc.module.ts @@ -18,6 +18,7 @@ import { PrePlantingContributionService } from './application/services/pre-plant // Schedulers import { PrePlantingFreezeScheduler } from './application/schedulers/pre-planting-freeze.scheduler'; +import { PrePlantingContributionScheduler } from './application/schedulers/pre-planting-contribution.scheduler'; // 现有 Application Services(直接提供,不 import ApplicationModule 避免引入现有 CDCEventDispatcher) // 这些服务是无状态的,仅依赖 InfrastructureModule 的 providers,重复实例化无副作用。 @@ -83,6 +84,7 @@ import { BonusClaimService } from '../application/services/bonus-claim.service'; // Schedulers PrePlantingFreezeScheduler, + PrePlantingContributionScheduler, ], }) export class PrePlantingCdcModule {}