/**
 * Contract batch download task processor.
 * [2026-02-05] Added: scheduled processing of batch download tasks.
 * Rollback: delete this file and remove its reference from app.module.ts.
 */
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { ConfigService } from '@nestjs/config';
import * as fs from 'fs/promises';
import * as path from 'path';
import * as archiver from 'archiver';
import { createWriteStream, existsSync, mkdirSync } from 'fs';
import { PrismaService } from '../persistence/prisma/prisma.service';
import { ContractService, ContractDto } from '../../application/services/contract.service';

/**
 * Filter criteria stored on a batch download task.
 */
interface BatchDownloadFilters {
  signedAfter?: string;
  signedBefore?: string;
  provinceCode?: string;
  cityCode?: string;
}

/**
 * Extracts a printable message from a caught value, which may be anything.
 */
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Formats a date-like string as `YYYYMMDD` (UTC), or returns `fallback` when
 * the value cannot be parsed. `Date#toISOString` throws on an invalid date, so
 * the validity check must come first.
 */
function toCompactDate(value: string, fallback: string): string {
  const parsed = new Date(value);
  if (Number.isNaN(parsed.getTime())) {
    return fallback;
  }
  return parsed.toISOString().slice(0, 10).replace(/-/g, '');
}

/**
 * Contract batch download job.
 *
 * Once a minute it picks the oldest PENDING task, downloads the PDF of every
 * matching signed contract into a temporary directory, packs them into a ZIP
 * under `<UPLOAD_DIR>/contracts`, and records the outcome on the task row.
 */
@Injectable()
export class ContractBatchDownloadJob implements OnModuleInit {
  private readonly logger = new Logger(ContractBatchDownloadJob.name);
  /** In-process guard so that runs of this instance never overlap. */
  private isRunning = false;
  private readonly downloadDir: string;
  private readonly baseUrl: string;

  constructor(
    private readonly prisma: PrismaService,
    private readonly contractService: ContractService,
    private readonly configService: ConfigService,
  ) {
    // `||` (not `??`) on purpose: an empty env value should also fall back.
    this.downloadDir = this.configService.get<string>('UPLOAD_DIR') || './uploads';
    this.baseUrl = this.configService.get<string>('BASE_URL') || 'http://localhost:3005';
  }

  /**
   * Ensures the directory that receives the generated ZIP files exists.
   */
  onModuleInit(): void {
    this.logger.log('ContractBatchDownloadJob initialized');

    const contractsDir = path.join(this.downloadDir, 'contracts');
    if (!existsSync(contractsDir)) {
      mkdirSync(contractsDir, { recursive: true });
      this.logger.log(`Created contracts download directory: ${contractsDir}`);
    }
  }

  /**
   * Checks for the oldest PENDING task and processes it.
   *
   * Runs at second 0 of every minute. At most one task is handled per run, and
   * the run is skipped entirely while a previous one is still in progress.
   * Failures are recorded on the task (status FAILED) and never propagate.
   */
  @Cron('0 * * * * *')
  async processPendingTasks(): Promise<void> {
    if (this.isRunning) {
      this.logger.debug('Batch download job is already running, skipping...');
      return;
    }

    this.isRunning = true;
    try {
      const pendingTask = await this.prisma.contractBatchDownloadTask.findFirst({
        where: { status: 'PENDING' },
        orderBy: { createdAt: 'asc' },
      });

      if (!pendingTask) {
        return;
      }

      this.logger.log(`开始处理批量下载任务: ${pendingTask.taskNo}`);

      // Mark the task as in progress before doing any work.
      await this.prisma.contractBatchDownloadTask.update({
        where: { id: pendingTask.id },
        data: {
          status: 'PROCESSING',
          startedAt: new Date(),
        },
      });

      try {
        await this.processTask(
          pendingTask.id,
          pendingTask.taskNo,
          pendingTask.filters as BatchDownloadFilters,
        );
      } catch (error: unknown) {
        this.logger.error(`任务处理失败: ${pendingTask.taskNo}`, error);
        await this.prisma.contractBatchDownloadTask.update({
          where: { id: pendingTask.id },
          data: {
            status: 'FAILED',
            errors: {
              message: toErrorMessage(error),
              stack: error instanceof Error ? error.stack : undefined,
            },
            completedAt: new Date(),
          },
        });
      }
    } catch (error: unknown) {
      this.logger.error('批量下载任务检查失败', error);
    } finally {
      this.isRunning = false;
    }
  }

  /**
   * Processes a single batch download task end to end.
   *
   * @param taskId  Primary key of the task row to update.
   * @param taskNo  Task number; used as the temporary directory name.
   * @param filters Filter criteria for the contract query; may be null.
   * @throws Any error from listing contracts, building the ZIP, or updating the
   *         task. Individual PDF download failures do not throw; they are
   *         counted and collected into the task's `errors` field.
   */
  private async processTask(
    taskId: bigint,
    taskNo: string,
    filters: BatchDownloadFilters | null,
  ): Promise<void> {
    const errors: Array<{ orderNo: string; error: string }> = [];
    let downloadedCount = 0;
    let failedCount = 0;

    // 1. Fetch the matching contracts (signed ones only).
    this.logger.log(`获取合同列表, 筛选条件: ${JSON.stringify(filters)}`);

    const contractsResult = await this.contractService.getContracts({
      signedAfter: filters?.signedAfter,
      signedBefore: filters?.signedBefore,
      provinceCode: filters?.provinceCode,
      cityCode: filters?.cityCode,
      status: 'SIGNED',
      // Hard cap of 10,000 contracts per task.
      // NOTE(review): anything beyond the cap is presumably dropped silently - confirm.
      pageSize: 10000,
      orderBy: 'signedAt',
      orderDir: 'asc',
    });

    const contracts = contractsResult.items;
    const totalContracts = contracts.length;

    this.logger.log(`共找到 ${totalContracts} 份已签署合同`);

    if (totalContracts === 0) {
      // Nothing to download: complete the task immediately.
      await this.prisma.contractBatchDownloadTask.update({
        where: { id: taskId },
        data: {
          status: 'COMPLETED',
          totalContracts: 0,
          downloadedCount: 0,
          failedCount: 0,
          progress: 100,
          completedAt: new Date(),
        },
      });
      return;
    }

    await this.prisma.contractBatchDownloadTask.update({
      where: { id: taskId },
      data: { totalContracts },
    });

    // 2. Temporary working directory; removed in `finally` even when a later
    //    step throws, so failed tasks do not leak downloaded PDFs on disk.
    const tempDir = path.join(this.downloadDir, 'temp', taskNo);
    let zipFileName: string;
    let zipSize: number;

    try {
      await fs.mkdir(tempDir, { recursive: true });

      // 3. Download the contract PDFs one by one.
      let lastReportedProgress = -1;
      for (const [index, contract] of contracts.entries()) {
        try {
          const pdfBuffer = await this.contractService.downloadContractPdf(contract.orderNo);

          // Files are grouped as <province>/<city>/ inside the archive.
          const safeProvince = this.sanitizeFileName(contract.provinceName || '未知省份');
          const safeCity = this.sanitizeFileName(contract.cityName || '未知城市');
          const subDir = path.join(tempDir, safeProvince, safeCity);
          await fs.mkdir(subDir, { recursive: true });

          // Every free-text component is sanitized so that none of them can
          // inject a path separator into the file name.
          const safeContractNo = this.sanitizeFileName(String(contract.contractNo));
          const safeRealName = this.sanitizeFileName(contract.userRealName || '未知');
          const fileName = `${safeContractNo}_${safeRealName}_${contract.treeCount}棵.pdf`;
          const filePath = path.join(subDir, fileName);

          await fs.writeFile(filePath, pdfBuffer);
          downloadedCount++;

          this.logger.debug(`下载成功: ${contract.orderNo} -> ${fileName}`);
        } catch (error: unknown) {
          const message = toErrorMessage(error);
          failedCount++;
          errors.push({ orderNo: contract.orderNo, error: message });
          this.logger.warn(`下载失败: ${contract.orderNo} - ${message}`);
        }

        // Persist progress once per distinct multiple of 10 and on the last
        // item. Many consecutive items share the same floored percentage when
        // the batch is large, hence the `lastReportedProgress` check.
        const progress = Math.floor(((index + 1) / totalContracts) * 100);
        const isLast = index === totalContracts - 1;
        if ((progress % 10 === 0 && progress !== lastReportedProgress) || isLast) {
          lastReportedProgress = progress;
          await this.prisma.contractBatchDownloadTask.update({
            where: { id: taskId },
            data: {
              downloadedCount,
              failedCount,
              progress,
              lastProcessedOrderNo: contract.orderNo,
              errors: errors.length > 0 ? errors : undefined,
            },
          });

          this.logger.log(`进度: ${progress}% (${downloadedCount}/${totalContracts})`);
        }
      }

      // 4. Pack everything into a ZIP.
      this.logger.log('开始打包 ZIP...');
      zipFileName = this.generateZipFileName(filters, downloadedCount);
      const zipPath = path.join(this.downloadDir, 'contracts', zipFileName);

      await this.createZipArchive(tempDir, zipPath);

      zipSize = (await fs.stat(zipPath)).size;
      this.logger.log(`ZIP 打包完成: ${zipFileName}, 大小: ${zipSize} bytes`);
    } finally {
      // 5. Remove the temporary files (best effort; never throws).
      await this.cleanupTempDir(tempDir);
    }

    // 6. Mark the task as completed; the result expires after 7 days.
    const resultFileUrl = `${this.baseUrl}/uploads/contracts/${zipFileName}`;
    await this.prisma.contractBatchDownloadTask.update({
      where: { id: taskId },
      data: {
        status: 'COMPLETED',
        downloadedCount,
        failedCount,
        progress: 100,
        resultFileUrl,
        resultFileSize: BigInt(zipSize),
        errors: errors.length > 0 ? errors : undefined,
        completedAt: new Date(),
        expiresAt: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000),
      },
    });

    this.logger.log(`任务完成: ${taskNo}, 成功: ${downloadedCount}, 失败: ${failedCount}`);
  }

  /**
   * Builds the ZIP file name: `contracts_<today>[_<start>-<end>]_<count>份.zip`.
   *
   * The range part is present only when a signed-date filter is set. A missing
   * or unparsable bound is rendered as `all` (start) or `now` (end) instead of
   * throwing.
   */
  private generateZipFileName(filters: BatchDownloadFilters | null, count: number): string {
    const dateStr = new Date().toISOString().slice(0, 10).replace(/-/g, '');
    let rangeStr = '';

    if (filters?.signedAfter || filters?.signedBefore) {
      const start = filters.signedAfter ? toCompactDate(filters.signedAfter, 'all') : 'all';
      const end = filters.signedBefore ? toCompactDate(filters.signedBefore, 'now') : 'now';
      rangeStr = `_${start}-${end}`;
    }

    return `contracts_${dateStr}${rangeStr}_${count}份.zip`;
  }

  /**
   * Packs the contents of `sourceDir` into a ZIP file at `zipPath`.
   *
   * Resolves once the output stream has been closed.
   *
   * @throws When either the archiver or the output stream reports an error; a
   *         partially written ZIP is removed before the error is rethrown.
   */
  private async createZipArchive(sourceDir: string, zipPath: string): Promise<void> {
    const output = createWriteStream(zipPath);
    const archive = archiver('zip', {
      zlib: { level: 6 }, // compression level
    });

    try {
      await new Promise<void>((resolve, reject) => {
        output.on('close', () => {
          this.logger.log(`ZIP 文件大小: ${archive.pointer()} bytes`);
          resolve();
        });
        // Without this listener a failing write stream (disk full, permission
        // denied, ...) would leave the promise pending forever.
        output.on('error', reject);

        archive.on('error', reject);
        // Warnings (e.g. a file that vanished) do not abort the archive, but
        // they should be visible in the logs.
        archive.on('warning', (err: Error) => {
          this.logger.warn(`ZIP warning: ${err.message}`);
        });

        archive.pipe(output);

        // Add every file under the directory, at the root of the archive.
        archive.directory(sourceDir, false);

        archive.finalize().catch(reject);
      });
    } catch (error: unknown) {
      archive.abort();
      output.destroy();
      // Best effort: the original failure is the error worth reporting.
      await fs.rm(zipPath, { force: true }).catch(() => undefined);
      throw error;
    }
  }

  /**
   * Removes the temporary directory. Best effort: a failure is logged and
   * never thrown.
   */
  private async cleanupTempDir(tempDir: string): Promise<void> {
    try {
      await fs.rm(tempDir, { recursive: true, force: true });
      this.logger.debug(`清理临时目录: ${tempDir}`);
    } catch (error: unknown) {
      this.logger.warn(`清理临时目录失败: ${tempDir}`, error);
    }
  }

  /**
   * Replaces characters that are illegal in file names with `_`.
   *
   * Returns `未知` when nothing usable is left, including names made only of
   * dots (`.`, `..`), which would otherwise act as path navigation.
   */
  private sanitizeFileName(name: string): string {
    const cleaned = name.replace(/[\/\\:*?"<>|]/g, '_').trim();
    if (cleaned === '' || /^\.+$/.test(cleaned)) {
      return '未知';
    }
    return cleaned;
  }

  /**
   * Manually triggers one processing pass (intended for API callers).
   *
   * @returns `processed: false` when a run is already in progress; otherwise
   *          `processed: true` after the pass finishes, whether or not a
   *          pending task was actually found.
   */
  async triggerProcessing(): Promise<{ processed: boolean; taskNo?: string }> {
    if (this.isRunning) {
      return { processed: false };
    }

    await this.processPendingTasks();
    return { processed: true };
  }

  /**
   * Reports whether a processing pass is currently running in this instance.
   */
  getProcessingStatus(): { isRunning: boolean } {
    return { isRunning: this.isRunning };
  }
}