diff --git a/backend/services/planting-service/src/application/application.module.ts b/backend/services/planting-service/src/application/application.module.ts index 1c67ae8b..4f1d17b6 100644 --- a/backend/services/planting-service/src/application/application.module.ts +++ b/backend/services/planting-service/src/application/application.module.ts @@ -5,6 +5,7 @@ import { PoolInjectionService } from './services/pool-injection.service'; import { ContractSigningService } from './services/contract-signing.service'; import { ContractSigningTimeoutJob } from './jobs/contract-signing-timeout.job'; import { ContractSigningRecoveryJob } from './jobs/contract-signing-recovery.job'; +import { ContractSignedRecoveryJob } from './jobs/contract-signed-recovery.job'; import { DomainModule } from '../domain/domain.module'; @Module({ @@ -15,6 +16,7 @@ import { DomainModule } from '../domain/domain.module'; ContractSigningService, ContractSigningTimeoutJob, ContractSigningRecoveryJob, + ContractSignedRecoveryJob, ], exports: [PlantingApplicationService, PoolInjectionService, ContractSigningService], }) diff --git a/backend/services/planting-service/src/application/jobs/contract-signed-recovery.job.ts b/backend/services/planting-service/src/application/jobs/contract-signed-recovery.job.ts new file mode 100644 index 00000000..27200dad --- /dev/null +++ b/backend/services/planting-service/src/application/jobs/contract-signed-recovery.job.ts @@ -0,0 +1,131 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; +import { EventPublisherService } from '../../infrastructure/kafka/event-publisher.service'; + +/** + * 合同签署后事件恢复定时任务 + * + * 每2~5分钟随机间隔扫描已签署但可能未完成后续流程的合同 + * 重新发布 contract.signed 事件,触发 referral-service 确认扣款和奖励分配 + * + * 场景: + * 1. contract.signed 事件发布成功但 referral-service 处理失败 + * 2. referral-service 调用 wallet-service 确认扣款失败 + * 3. referral-service 发布 planting.order.paid 事件失败 + * + * 幂等性保证: + * - wallet-service.confirmPlantingDeduction: 检查 PLANT_PAYMENT 流水 + * - wallet-service.allocateFunds: 检查 orderId + accountSequence + allocationType + */ +@Injectable() +export class ContractSignedRecoveryJob implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(ContractSignedRecoveryJob.name); + private isRunning = false; + private timeoutId: NodeJS.Timeout | null = null; + + constructor( + private readonly prisma: PrismaService, + private readonly eventPublisher: EventPublisherService, + ) {} + + onModuleInit() { + // 启动时延迟 2 分钟后开始第一次检查 + setTimeout(() => this.scheduleNext(), 2 * 60 * 1000); + this.logger.log('[CONTRACT-SIGNED-RECOVERY] Job initialized, first run in 2 minutes'); + } + + onModuleDestroy() { + if (this.timeoutId) { + clearTimeout(this.timeoutId); + this.timeoutId = null; + } + } + + /** + * 调度下一次执行(2~5分钟随机间隔) + */ + private scheduleNext(): void { + const minMs = 2 * 60 * 1000; // 2 分钟 + const maxMs = 5 * 60 * 1000; // 5 分钟 + const delayMs = Math.floor(Math.random() * (maxMs - minMs + 1)) + minMs; + + this.logger.debug(`[CONTRACT-SIGNED-RECOVERY] Next run in ${Math.round(delayMs / 1000)}s`); + + this.timeoutId = setTimeout(async () => { + await this.handleRecovery(); + this.scheduleNext(); + }, delayMs); + } + + async handleRecovery(): Promise { + if (this.isRunning) { + this.logger.debug('[CONTRACT-SIGNED-RECOVERY] Already running, skipping'); + return; + } + + this.isRunning = true; + this.logger.debug('[CONTRACT-SIGNED-RECOVERY] Starting recovery check...'); + + try { + // 查找已签署超过 2 分钟的合同任务 + // 2 分钟足够正常流程完成,超过说明可能有问题 + const twoMinutesAgo = new Date(Date.now() - 2 * 60 * 1000); + + const signedTasks = await this.prisma.contractSigningTask.findMany({ + where: { + status: 'SIGNED', + signedAt: { + lt: twoMinutesAgo, + }, + }, + orderBy: { signedAt: 'asc' }, + take: 50, // 每次最多处理 50 个 + }); + + if (signedTasks.length === 0) { + this.logger.debug('[CONTRACT-SIGNED-RECOVERY] No tasks need recovery'); + return; + } + + this.logger.log(`[CONTRACT-SIGNED-RECOVERY] Found ${signedTasks.length} signed tasks to recover`); + + let successCount = 0; + let errorCount = 0; + + for (const task of signedTasks) { + try { + // 重新发布 contract.signed 事件 + await this.eventPublisher.publishContractSigned({ + orderNo: task.orderNo, + userId: task.userId.toString(), + accountSequence: task.accountSequence, + treeCount: task.treeCount, + totalAmount: Number(task.totalAmount), + provinceCode: task.provinceCode, + cityCode: task.cityCode, + signedAt: task.signedAt?.toISOString(), + }); + + successCount++; + this.logger.log( + `[CONTRACT-SIGNED-RECOVERY] Re-published contract.signed for order ${task.orderNo}`, + ); + } catch (error) { + errorCount++; + this.logger.error( + `[CONTRACT-SIGNED-RECOVERY] Failed to re-publish for order ${task.orderNo}:`, + error, + ); + } + } + + this.logger.log( + `[CONTRACT-SIGNED-RECOVERY] Completed: success=${successCount}, errors=${errorCount}`, + ); + } catch (error) { + this.logger.error('[CONTRACT-SIGNED-RECOVERY] Recovery check failed:', error); + } finally { + this.isRunning = false; + } + } +}