diff --git a/backend/services/docker-compose.2.0-snapshot.yml b/backend/services/docker-compose.2.0-snapshot.yml
index 7889e9db..a1188db4 100644
--- a/backend/services/docker-compose.2.0-snapshot.yml
+++ b/backend/services/docker-compose.2.0-snapshot.yml
@@ -37,11 +37,12 @@ services:
       - MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
       - MINIO_BACKUP_BUCKET=snapshots-2
       # Backup configuration
-      - SNAPSHOT_TEMP_DIR=/app/data/snapshots
+      - SNAPSHOT_TEMP_DIR=/tmp/snapshots
       - SNAPSHOT_RETENTION_HOURS=72
       - AVAILABLE_TARGETS=POSTGRES,REDIS,UPLOADS
     volumes:
       - snapshot_2_data:/app/data
+      - /tmp/rwa-snapshots-2:/tmp/snapshots
       - redis_2_data:/backup-source/redis:ro
       - mining-admin-uploads:/backup-source/uploads/mining-admin:ro
       - trading-uploads:/backup-source/uploads/trading:ro
diff --git a/backend/services/docker-compose.snapshot.yml b/backend/services/docker-compose.snapshot.yml
index 10ddba07..195cd727 100644
--- a/backend/services/docker-compose.snapshot.yml
+++ b/backend/services/docker-compose.snapshot.yml
@@ -32,11 +32,12 @@ services:
       - MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
       - MINIO_BACKUP_BUCKET=snapshots
       # Backup configuration
-      - SNAPSHOT_TEMP_DIR=/app/data/snapshots
+      - SNAPSHOT_TEMP_DIR=/tmp/snapshots
       - SNAPSHOT_RETENTION_HOURS=72
       - AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS
     volumes:
       - snapshot_data:/app/data
+      - /tmp/rwa-snapshots:/tmp/snapshots
       - redis_data:/backup-source/redis:ro
       - kafka_data:/backup-source/kafka:ro
       - zookeeper_data:/backup-source/zookeeper/data:ro
diff --git a/backend/services/snapshot-service/src/application/services/snapshot-orchestrator.service.ts b/backend/services/snapshot-service/src/application/services/snapshot-orchestrator.service.ts
index 9303e03c..cd9b2841 100644
--- a/backend/services/snapshot-service/src/application/services/snapshot-orchestrator.service.ts
+++ b/backend/services/snapshot-service/src/application/services/snapshot-orchestrator.service.ts
@@ -1,4 +1,4 @@
-import { Injectable, Inject, Logger } from '@nestjs/common';
+import { Injectable, Inject, Logger, OnModuleInit } from '@nestjs/common';
 import { ConfigService } from '@nestjs/config';
 import { Cron, CronExpression } from '@nestjs/schedule';
 import * as fs from 'fs';
@@ -18,7 +18,7 @@ import {
 } from '@/domain/enums';
 
 @Injectable()
-export class SnapshotOrchestratorService {
+export class SnapshotOrchestratorService implements OnModuleInit {
   private readonly logger = new Logger(SnapshotOrchestratorService.name);
   private readonly retentionHours: number;
   private runningTaskId: string | null = null;
@@ -35,6 +35,15 @@ export class SnapshotOrchestratorService {
     this.retentionHours = this.configService.get('SNAPSHOT_RETENTION_HOURS', 72);
   }
 
+  async onModuleInit(): Promise<void> {
+    const stale = await this.repo.findByStatus(SnapshotStatus.RUNNING);
+    for (const task of stale) {
+      await this.repo.updateTaskStatus(task.id, SnapshotStatus.FAILED, 'Service restarted; task was interrupted');
+      this.localStorage.deleteTask(task.id);
+      this.logger.warn(`Stale task marked as FAILED: ${task.id}`);
+    }
+  }
+
   getAvailableTargets(): BackupTarget[] {
     return BACKUP_TARGET_ORDER.filter((t) => this.handlers.has(t));
   }
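The `onModuleInit` hook above loops task by task because each stale task also needs its local files removed via `localStorage.deleteTask`. If no per-task cleanup were needed, the same crash recovery could collapse into a single UPDATE. A minimal sketch with bare Prisma, assuming a `snapshotTask` model with `status` and `errorMessage` columns (`errorMessage` is our assumption, not taken from this patch):

```typescript
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

// Mark every task still RUNNING at startup as FAILED in one statement.
// Column names are illustrative; the real schema may differ.
async function failStaleRunningTasks(): Promise<number> {
  const { count } = await prisma.snapshotTask.updateMany({
    where: { status: 'RUNNING' },
    data: { status: 'FAILED', errorMessage: 'Service restarted; task was interrupted' },
  });
  return count;
}
```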
diff --git a/backend/services/snapshot-service/src/infrastructure/backup/postgres-backup.handler.ts b/backend/services/snapshot-service/src/infrastructure/backup/postgres-backup.handler.ts
index 524048bd..e6801078 100644
--- a/backend/services/snapshot-service/src/infrastructure/backup/postgres-backup.handler.ts
+++ b/backend/services/snapshot-service/src/infrastructure/backup/postgres-backup.handler.ts
@@ -27,6 +27,38 @@ export class PostgresBackupHandler implements BackupHandler {
     this.password = this.configService.get('PG_PASSWORD', '');
   }
 
+  /** Query the total size in bytes of all non-template PostgreSQL databases */
+  private getTotalDatabaseSize(): Promise<number> {
+    return new Promise((resolve) => {
+      const psql = spawn('psql', [
+        '-h', this.host,
+        '-p', this.port,
+        '-U', this.user,
+        '-t', '-A',
+        '-c', 'SELECT COALESCE(sum(pg_database_size(datname)), 0) FROM pg_database WHERE datistemplate = false',
+      ], {
+        env: { ...process.env, PGPASSWORD: this.password },
+      });
+
+      let output = '';
+      psql.stdout.on('data', (data: Buffer) => { output += data.toString(); });
+      psql.on('close', (code) => {
+        const size = parseInt(output.trim(), 10);
+        if (code === 0 && !isNaN(size) && size > 0) {
+          this.logger.log(`PostgreSQL total database size: ${(size / 1024 / 1024).toFixed(1)} MB`);
+          resolve(size);
+        } else {
+          this.logger.warn(`Could not determine database size (exit=${code}); falling back to 1GB estimate`);
+          resolve(1024 * 1024 * 1024);
+        }
+      });
+      psql.on('error', () => {
+        this.logger.warn('psql unavailable; falling back to 1GB estimate');
+        resolve(1024 * 1024 * 1024);
+      });
+    });
+  }
+
   async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> {
     const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
     const fileName = `postgres-${timestamp}.sql.gz`;
@@ -34,17 +66,18 @@ export class PostgresBackupHandler implements BackupHandler {
 
     fs.mkdirSync(outputDir, { recursive: true });
 
-    onProgress(0, 'pg_dump --all-databases starting...');
+    onProgress(0, 'Querying PostgreSQL database size...');
+    const totalSize = await this.getTotalDatabaseSize();
+
+    onProgress(0, 'pg_dumpall starting...');
 
     return new Promise((resolve, reject) => {
-      // pg_dumpall dumps all databases, piped through gzip
       const dumpProc = spawn('pg_dumpall', [
         '-h', this.host,
         '-p', this.port,
         '-U', this.user,
         '--clean',
         '--if-exists',
-        '-v',
       ], {
         env: { ...process.env, PGPASSWORD: this.password },
       });
@@ -52,24 +85,31 @@ export class PostgresBackupHandler implements BackupHandler {
       const gzipProc = spawn('gzip', ['-6']);
       const outputStream = fs.createWriteStream(filePath);
 
+      // Track the raw bytes emitted by pg_dumpall to compute progress.
+      // pg_database_size includes indexes; the SQL text omits them but adds statement overhead, so the two are close.
+      // Same approach as the pv tool: pg_database_size as the denominator, accepting 10-20% error.
+      let bytesRead = 0;
+      let lastReportedMB = 0;
+
+      dumpProc.stdout.on('data', (chunk: Buffer) => {
+        bytesRead += chunk.length;
+        const readMB = Math.floor(bytesRead / (1024 * 1024));
+        // Report only once per additional MB to avoid flooding
+        if (readMB > lastReportedMB) {
+          lastReportedMB = readMB;
+          const totalMB = Math.floor(totalSize / (1024 * 1024));
+          const percent = Math.min(99, Math.floor((bytesRead / totalSize) * 100));
+          onProgress(percent, `PostgreSQL backing up... ${readMB}MB / ~${totalMB}MB`);
+        }
+      });
+
       // pipe: pg_dumpall stdout → gzip stdin → file
       dumpProc.stdout.pipe(gzipProc.stdin);
       gzipProc.stdout.pipe(outputStream);
 
       let stderrBuffer = '';
-      let tableCount = 0;
       dumpProc.stderr.on('data', (data: Buffer) => {
-        const text = data.toString();
-        stderrBuffer += text;
-
-        // pg_dumpall -v prints "dumping contents of table ..." lines
-        const tableMatches = text.match(/dumping contents of table/gi);
-        if (tableMatches) {
-          tableCount += tableMatches.length;
-          // Estimate progress (assume ~100 tables)
-          const percent = Math.min(90, Math.floor((tableCount / 100) * 90));
-          onProgress(percent, `PostgreSQL backing up... ${tableCount} tables processed`);
-        }
+        stderrBuffer += data.toString();
       });
 
       let dumpExitCode: number | null = null;
@@ -80,7 +120,7 @@ export class PostgresBackupHandler implements BackupHandler {
 
       if (dumpExitCode === 0 && gzipExitCode === 0) {
         const stat = fs.statSync(filePath);
-        this.logger.log(`PostgreSQL backup complete: ${fileName}, size: ${stat.size} bytes`);
+        this.logger.log(`PostgreSQL backup complete: ${fileName}, compressed: ${stat.size} bytes, raw output: ${bytesRead} bytes`);
         onProgress(100, 'PostgreSQL backup complete');
         resolve({ fileName, filePath, fileSize: stat.size });
       } else {
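The progress reporting above replaces stderr table counting (which guessed at "~100 tables") with actual bytes out of `pg_dumpall` measured against the `pg_database_size` total. The arithmetic, extracted as a pure helper for illustration (the helper name is ours, not part of the patch):

```typescript
// Progress arithmetic as used in the handler: raw dump bytes over the
// estimated total, capped at 99% until the process actually exits.
function dumpProgress(bytesRead: number, totalSize: number): { percent: number; label: string } {
  const readMB = Math.floor(bytesRead / (1024 * 1024));
  const totalMB = Math.floor(totalSize / (1024 * 1024));
  const percent = Math.min(99, Math.floor((bytesRead / totalSize) * 100));
  return { percent, label: `PostgreSQL backing up... ${readMB}MB / ~${totalMB}MB` };
}

// 512 MB read against an estimated 2 GB total reports 25%:
// dumpProgress(512 * 2 ** 20, 2 * 2 ** 30).percent === 25
```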
diff --git a/backend/services/snapshot-service/src/infrastructure/backup/redis-backup.handler.ts b/backend/services/snapshot-service/src/infrastructure/backup/redis-backup.handler.ts
index 2c049edf..496d79f0 100644
--- a/backend/services/snapshot-service/src/infrastructure/backup/redis-backup.handler.ts
+++ b/backend/services/snapshot-service/src/infrastructure/backup/redis-backup.handler.ts
@@ -72,7 +72,34 @@ export class RedisBackupHandler implements BackupHandler {
       redis.disconnect();
     }
 
-    // 3. Copy dump.rdb (+ aof if present) from the read-only volume
+    // 3. Copy the files into a temp directory first (a concurrent Redis write would crash tar-stream with "Size mismatch")
+    const tempCopyDir = path.join(outputDir, `redis-copy-${Date.now()}`);
+    fs.mkdirSync(tempCopyDir, { recursive: true });
+
+    onProgress(50, 'Copying Redis data files to temp directory...');
+
+    const dumpSrc = path.join(this.sourceDir, 'dump.rdb');
+    const dumpDst = path.join(tempCopyDir, 'dump.rdb');
+    if (fs.existsSync(dumpSrc)) {
+      fs.copyFileSync(dumpSrc, dumpDst);
+    }
+
+    const aofSrc = path.join(this.sourceDir, 'appendonly.aof');
+    const aofDst = path.join(tempCopyDir, 'appendonly.aof');
+    if (fs.existsSync(aofSrc)) {
+      fs.copyFileSync(aofSrc, aofDst);
+    }
+
+    // AOF directory (Redis 7 multi-part AOF)
+    const aofDirSrc = path.join(this.sourceDir, 'appendonlydir');
+    const aofDirDst = path.join(tempCopyDir, 'appendonlydir');
+    if (fs.existsSync(aofDirSrc)) {
+      this.copyDirSync(aofDirSrc, aofDirDst);
+    }
+
+    onProgress(60, 'Starting archive...');
+
+    // 4. Archive the snapshot files from the temp directory
     return new Promise((resolve, reject) => {
       const output = fs.createWriteStream(filePath);
       const archive = archiver('tar', { gzip: true });
@@ -81,36 +108,48 @@
         const fileSize = archive.pointer();
         this.logger.log(`Redis backup complete: ${fileName}, size: ${fileSize} bytes`);
         onProgress(100, 'Redis backup complete');
+        // Clean up the temp copy directory
+        fs.rmSync(tempCopyDir, { recursive: true, force: true });
         resolve({ fileName, filePath, fileSize });
       });
 
       archive.on('error', (err) => {
         this.logger.error(`Redis backup archiving failed: ${err.message}`);
+        fs.rmSync(tempCopyDir, { recursive: true, force: true });
         reject(err);
       });
 
       archive.pipe(output);
 
-      const dumpPath = path.join(this.sourceDir, 'dump.rdb');
-      if (fs.existsSync(dumpPath)) {
-        archive.file(dumpPath, { name: 'dump.rdb' });
+      if (fs.existsSync(dumpDst)) {
+        archive.file(dumpDst, { name: 'dump.rdb' });
         onProgress(70, 'Archiving dump.rdb...');
       }
 
-      const aofPath = path.join(this.sourceDir, 'appendonly.aof');
-      if (fs.existsSync(aofPath)) {
-        archive.file(aofPath, { name: 'appendonly.aof' });
+      if (fs.existsSync(aofDst)) {
+        archive.file(aofDst, { name: 'appendonly.aof' });
         onProgress(85, 'Archiving appendonly.aof...');
       }
 
-      // AOF directory (Redis 7 multi-part AOF)
-      const aofDir = path.join(this.sourceDir, 'appendonlydir');
-      if (fs.existsSync(aofDir)) {
-        archive.directory(aofDir, 'appendonlydir');
+      if (fs.existsSync(aofDirDst)) {
+        archive.directory(aofDirDst, 'appendonlydir');
         onProgress(85, 'Archiving appendonlydir/...');
       }
 
       archive.finalize();
     });
   }
+
+  private copyDirSync(src: string, dst: string): void {
+    fs.mkdirSync(dst, { recursive: true });
+    for (const entry of fs.readdirSync(src, { withFileTypes: true })) {
+      const srcPath = path.join(src, entry.name);
+      const dstPath = path.join(dst, entry.name);
+      if (entry.isDirectory()) {
+        this.copyDirSync(srcPath, dstPath);
+      } else {
+        fs.copyFileSync(srcPath, dstPath);
+      }
+    }
+  }
 }
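Why the copy matters: `archiver` stats each file when it is appended and tar-stream then requires exactly that many bytes from the read stream, so a dump that Redis rewrites mid-archive aborts the whole backup with the "Size mismatch" error mentioned above. Copying first freezes the sizes. A standalone sketch of the pattern using the same `archiver` API (function and file names are illustrative):

```typescript
import * as fs from 'fs';
import * as path from 'path';
import archiver from 'archiver';

// Copy-then-archive: snapshot a live file so its size cannot change while
// tar-stream reads it, then archive the frozen copy.
async function archiveStableCopy(liveFile: string, outDir: string): Promise<string> {
  const copy = path.join(outDir, path.basename(liveFile));
  fs.copyFileSync(liveFile, copy); // size is frozen from here on

  const tarPath = path.join(outDir, 'backup.tar.gz');
  await new Promise<void>((resolve, reject) => {
    const out = fs.createWriteStream(tarPath);
    const archive = archiver('tar', { gzip: true });
    out.on('close', () => resolve());
    archive.on('error', reject);
    archive.pipe(out);
    archive.file(copy, { name: path.basename(liveFile) });
    archive.finalize();
  });
  return tarPath;
}
```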
diff --git a/backend/services/snapshot-service/src/infrastructure/persistence/repositories/snapshot.repository.ts b/backend/services/snapshot-service/src/infrastructure/persistence/repositories/snapshot.repository.ts
index b1331104..97b75dfe 100644
--- a/backend/services/snapshot-service/src/infrastructure/persistence/repositories/snapshot.repository.ts
+++ b/backend/services/snapshot-service/src/infrastructure/persistence/repositories/snapshot.repository.ts
@@ -146,6 +146,13 @@
     return this.prisma.snapshotTask.delete({ where: { id } });
   }
 
+  async findByStatus(status: SnapshotStatus) {
+    return this.prisma.snapshotTask.findMany({
+      where: { status },
+      include: { details: true },
+    });
+  }
+
   async findExpiredLocalTasks(retentionHours: number) {
     const threshold = new Date(Date.now() - retentionHours * 60 * 60 * 1000);
     return this.prisma.snapshotTask.findMany({
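`findByStatus` feeds the orchestrator's startup recovery above; its neighbor `findExpiredLocalTasks` computes its cutoff with plain clock arithmetic. For reference, with the compose files' `SNAPSHOT_RETENTION_HOURS=72` the threshold sits exactly three days back (the helper name is ours, for illustration):

```typescript
// Cutoff as computed in findExpiredLocalTasks: tasks created before the
// returned Date fall outside the retention window.
function retentionThreshold(retentionHours: number, now: Date = new Date()): Date {
  return new Date(now.getTime() - retentionHours * 60 * 60 * 1000);
}

// 72h * 60m * 60s * 1000ms = 259,200,000 ms = 3 days.
```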