feat(pre-planting): 添加算力补偿调度器,修复 transfer_order_no schema 一致性

问题:CDC 后置回调失败(如迁移未就绪)后,pre_planting_synced_orders 记录
status=PAID 但 contributionDistributed=false,没有机制重新触发算力计算。

修复:
1. 新增 PrePlantingContributionScheduler(每 5 分钟):
   - 扫描未分配算力的 PAID 预种订单
   - 调用 processUndistributedOrders() 补偿分配
   - Redis 分布式锁防并发
2. 注册到 PrePlantingCdcModule 的 providers

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-28 03:08:06 -08:00
parent b747555927
commit a7f2008bc2
2 changed files with 51 additions and 0 deletions

View File

@ -0,0 +1,49 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { RedisService } from '@/infrastructure/redis/redis.service';
import { PrePlantingContributionService } from '../services/pre-planting-contribution.service';
/**
*
*
* [2026-02-28] CDC
*
* === ===
* 5 pre_planting_synced_orders status=PAID
* contributionDistributed=false
*
* === ===
* - CDC DB
* -
* - calculateForPrePlantingOrder existsBySourceAdoptionId
*/
@Injectable()
export class PrePlantingContributionScheduler {
private readonly logger = new Logger(PrePlantingContributionScheduler.name);
private readonly LOCK_KEY = 'pre-planting:contribution:distribute:lock';
constructor(
private readonly contributionService: PrePlantingContributionService,
private readonly redis: RedisService,
) {}
/**
* 5
*/
@Cron('0 */5 * * * *')
async processUndistributedOrders(): Promise<void> {
const lockValue = await this.redis.acquireLock(`${this.LOCK_KEY}`, 240);
if (!lockValue) return;
try {
const count = await this.contributionService.processUndistributedOrders(50);
if (count > 0) {
this.logger.log(`[PRE-PLANTING-CONTRIBUTION] Distributed ${count} undistributed orders`);
}
} catch (error) {
this.logger.error('[PRE-PLANTING-CONTRIBUTION] Failed to process undistributed orders', error);
} finally {
await this.redis.releaseLock(`${this.LOCK_KEY}`, lockValue);
}
}
}

View File

@ -18,6 +18,7 @@ import { PrePlantingContributionService } from './application/services/pre-plant
// Schedulers
import { PrePlantingFreezeScheduler } from './application/schedulers/pre-planting-freeze.scheduler';
import { PrePlantingContributionScheduler } from './application/schedulers/pre-planting-contribution.scheduler';
// 现有 Application Services直接提供不 import ApplicationModule 避免引入现有 CDCEventDispatcher
// 这些服务是无状态的,仅依赖 InfrastructureModule 的 providers重复实例化无副作用。
@ -83,6 +84,7 @@ import { BonusClaimService } from '../application/services/bonus-claim.service';
// Schedulers
PrePlantingFreezeScheduler,
PrePlantingContributionScheduler,
],
})
export class PrePlantingCdcModule {}