feat(planting-service): 添加合同签署后事件恢复定时任务
每2~5分钟随机间隔扫描已签署超过2分钟的合同 重新发布 contract.signed 事件,确保扣款确认和奖励分配完成 幂等性已由 wallet-service 保证 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
f9e2d8483c
commit
63ac0debf3
|
|
@ -5,6 +5,7 @@ import { PoolInjectionService } from './services/pool-injection.service';
|
|||
import { ContractSigningService } from './services/contract-signing.service';
|
||||
import { ContractSigningTimeoutJob } from './jobs/contract-signing-timeout.job';
|
||||
import { ContractSigningRecoveryJob } from './jobs/contract-signing-recovery.job';
|
||||
import { ContractSignedRecoveryJob } from './jobs/contract-signed-recovery.job';
|
||||
import { DomainModule } from '../domain/domain.module';
|
||||
|
||||
@Module({
|
||||
|
|
@ -15,6 +16,7 @@ import { DomainModule } from '../domain/domain.module';
|
|||
ContractSigningService,
|
||||
ContractSigningTimeoutJob,
|
||||
ContractSigningRecoveryJob,
|
||||
ContractSignedRecoveryJob,
|
||||
],
|
||||
exports: [PlantingApplicationService, PoolInjectionService, ContractSigningService],
|
||||
})
|
||||
|
|
|
|||
|
|
@ -0,0 +1,131 @@
|
|||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
||||
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
|
||||
import { EventPublisherService } from '../../infrastructure/kafka/event-publisher.service';
|
||||
|
||||
/**
|
||||
* 合同签署后事件恢复定时任务
|
||||
*
|
||||
* 每2~5分钟随机间隔扫描已签署但可能未完成后续流程的合同
|
||||
* 重新发布 contract.signed 事件,触发 referral-service 确认扣款和奖励分配
|
||||
*
|
||||
* 场景:
|
||||
* 1. contract.signed 事件发布成功但 referral-service 处理失败
|
||||
* 2. referral-service 调用 wallet-service 确认扣款失败
|
||||
* 3. referral-service 发布 planting.order.paid 事件失败
|
||||
*
|
||||
* 幂等性保证:
|
||||
* - wallet-service.confirmPlantingDeduction: 检查 PLANT_PAYMENT 流水
|
||||
* - wallet-service.allocateFunds: 检查 orderId + accountSequence + allocationType
|
||||
*/
|
||||
@Injectable()
|
||||
export class ContractSignedRecoveryJob implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(ContractSignedRecoveryJob.name);
|
||||
private isRunning = false;
|
||||
private timeoutId: NodeJS.Timeout | null = null;
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly eventPublisher: EventPublisherService,
|
||||
) {}
|
||||
|
||||
onModuleInit() {
|
||||
// 启动时延迟 2 分钟后开始第一次检查
|
||||
setTimeout(() => this.scheduleNext(), 2 * 60 * 1000);
|
||||
this.logger.log('[CONTRACT-SIGNED-RECOVERY] Job initialized, first run in 2 minutes');
|
||||
}
|
||||
|
||||
onModuleDestroy() {
|
||||
if (this.timeoutId) {
|
||||
clearTimeout(this.timeoutId);
|
||||
this.timeoutId = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 调度下一次执行(2~5分钟随机间隔)
|
||||
*/
|
||||
private scheduleNext(): void {
|
||||
const minMs = 2 * 60 * 1000; // 2 分钟
|
||||
const maxMs = 5 * 60 * 1000; // 5 分钟
|
||||
const delayMs = Math.floor(Math.random() * (maxMs - minMs + 1)) + minMs;
|
||||
|
||||
this.logger.debug(`[CONTRACT-SIGNED-RECOVERY] Next run in ${Math.round(delayMs / 1000)}s`);
|
||||
|
||||
this.timeoutId = setTimeout(async () => {
|
||||
await this.handleRecovery();
|
||||
this.scheduleNext();
|
||||
}, delayMs);
|
||||
}
|
||||
|
||||
async handleRecovery(): Promise<void> {
|
||||
if (this.isRunning) {
|
||||
this.logger.debug('[CONTRACT-SIGNED-RECOVERY] Already running, skipping');
|
||||
return;
|
||||
}
|
||||
|
||||
this.isRunning = true;
|
||||
this.logger.debug('[CONTRACT-SIGNED-RECOVERY] Starting recovery check...');
|
||||
|
||||
try {
|
||||
// 查找已签署超过 2 分钟的合同任务
|
||||
// 2 分钟足够正常流程完成,超过说明可能有问题
|
||||
const twoMinutesAgo = new Date(Date.now() - 2 * 60 * 1000);
|
||||
|
||||
const signedTasks = await this.prisma.contractSigningTask.findMany({
|
||||
where: {
|
||||
status: 'SIGNED',
|
||||
signedAt: {
|
||||
lt: twoMinutesAgo,
|
||||
},
|
||||
},
|
||||
orderBy: { signedAt: 'asc' },
|
||||
take: 50, // 每次最多处理 50 个
|
||||
});
|
||||
|
||||
if (signedTasks.length === 0) {
|
||||
this.logger.debug('[CONTRACT-SIGNED-RECOVERY] No tasks need recovery');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.log(`[CONTRACT-SIGNED-RECOVERY] Found ${signedTasks.length} signed tasks to recover`);
|
||||
|
||||
let successCount = 0;
|
||||
let errorCount = 0;
|
||||
|
||||
for (const task of signedTasks) {
|
||||
try {
|
||||
// 重新发布 contract.signed 事件
|
||||
await this.eventPublisher.publishContractSigned({
|
||||
orderNo: task.orderNo,
|
||||
userId: task.userId.toString(),
|
||||
accountSequence: task.accountSequence,
|
||||
treeCount: task.treeCount,
|
||||
totalAmount: Number(task.totalAmount),
|
||||
provinceCode: task.provinceCode,
|
||||
cityCode: task.cityCode,
|
||||
signedAt: task.signedAt?.toISOString(),
|
||||
});
|
||||
|
||||
successCount++;
|
||||
this.logger.log(
|
||||
`[CONTRACT-SIGNED-RECOVERY] Re-published contract.signed for order ${task.orderNo}`,
|
||||
);
|
||||
} catch (error) {
|
||||
errorCount++;
|
||||
this.logger.error(
|
||||
`[CONTRACT-SIGNED-RECOVERY] Failed to re-publish for order ${task.orderNo}:`,
|
||||
error,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`[CONTRACT-SIGNED-RECOVERY] Completed: success=${successCount}, errors=${errorCount}`,
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.error('[CONTRACT-SIGNED-RECOVERY] Recovery check failed:', error);
|
||||
} finally {
|
||||
this.isRunning = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue