fix(snapshot): switch PG progress to byte-based percentage + prevent Redis backup crashes + clean up stale tasks on startup

- postgres-backup: use pg_database_size as the denominator and count stdout bytes to compute progress (same approach as pv)
- redis-backup: after BGSAVE, copyFileSync the data files to a temp directory before archiving, preventing tar-stream "Size mismatch" crashes
- orchestrator: clean up leftover RUNNING tasks in onModuleInit, marking them FAILED and deleting their temp files
- docker-compose: mount the temp directory from the host's /tmp so leftover files are easy to clean up manually

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
hailin 2026-02-24 01:30:52 -08:00
parent 38efa891b8
commit f14f685ea9
6 changed files with 128 additions and 31 deletions

View File

@@ -37,11 +37,12 @@ services:
       - MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
       - MINIO_BACKUP_BUCKET=snapshots-2
       # 备份配置
-      - SNAPSHOT_TEMP_DIR=/app/data/snapshots
+      - SNAPSHOT_TEMP_DIR=/tmp/snapshots
       - SNAPSHOT_RETENTION_HOURS=72
       - AVAILABLE_TARGETS=POSTGRES,REDIS,UPLOADS
     volumes:
       - snapshot_2_data:/app/data
+      - /tmp/rwa-snapshots-2:/tmp/snapshots
       - redis_2_data:/backup-source/redis:ro
       - mining-admin-uploads:/backup-source/uploads/mining-admin:ro
       - trading-uploads:/backup-source/uploads/trading:ro

View File

@@ -32,11 +32,12 @@ services:
       - MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
       - MINIO_BACKUP_BUCKET=snapshots
       # 备份配置
-      - SNAPSHOT_TEMP_DIR=/app/data/snapshots
+      - SNAPSHOT_TEMP_DIR=/tmp/snapshots
       - SNAPSHOT_RETENTION_HOURS=72
       - AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS
     volumes:
       - snapshot_data:/app/data
+      - /tmp/rwa-snapshots:/tmp/snapshots
       - redis_data:/backup-source/redis:ro
       - kafka_data:/backup-source/kafka:ro
       - zookeeper_data:/backup-source/zookeeper/data:ro

View File

@@ -1,4 +1,4 @@
-import { Injectable, Inject, Logger } from '@nestjs/common';
+import { Injectable, Inject, Logger, OnModuleInit } from '@nestjs/common';
 import { ConfigService } from '@nestjs/config';
 import { Cron, CronExpression } from '@nestjs/schedule';
 import * as fs from 'fs';
@@ -18,7 +18,7 @@ import {
 } from '@/domain/enums';

 @Injectable()
-export class SnapshotOrchestratorService {
+export class SnapshotOrchestratorService implements OnModuleInit {
   private readonly logger = new Logger(SnapshotOrchestratorService.name);
   private readonly retentionHours: number;
   private runningTaskId: string | null = null;
@@ -35,6 +35,15 @@ export class SnapshotOrchestratorService {
     this.retentionHours = this.configService.get<number>('SNAPSHOT_RETENTION_HOURS', 72);
   }

+  async onModuleInit(): Promise<void> {
+    const stale = await this.repo.findByStatus(SnapshotStatus.RUNNING);
+    for (const task of stale) {
+      await this.repo.updateTaskStatus(task.id, SnapshotStatus.FAILED, '服务重启,任务中断');
+      this.localStorage.deleteTask(task.id);
+      this.logger.warn(`遗留任务已标记失败: ${task.id}`);
+    }
+  }
+
   getAvailableTargets(): BackupTarget[] {
     return BACKUP_TARGET_ORDER.filter((t) => this.handlers.has(t));
   }
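Note: the startup cleanup above relies on repo.findByStatus (added in the repository diff below), repo.updateTaskStatus and localStorage.deleteTask. A minimal Jest sketch of the intended behaviour, assuming a hypothetical spec file path, that SnapshotStatus is exported from '@/domain/enums', and the mock shapes shown here; the real constructor wiring is bypassed, so the private fields are assigned directly:

// snapshot-orchestrator.service.spec.ts (hypothetical path)
import { SnapshotOrchestratorService } from './snapshot-orchestrator.service';
import { SnapshotStatus } from '@/domain/enums'; // assumed export location

it('marks stale RUNNING tasks as FAILED on startup', async () => {
  const repo = {
    findByStatus: jest.fn().mockResolvedValue([{ id: 'stale-1' }]),
    updateTaskStatus: jest.fn().mockResolvedValue(undefined),
  };
  const localStorage = { deleteTask: jest.fn() };
  const logger = { warn: jest.fn() };

  // Bypass the constructor so the full dependency list is not needed here.
  const service = Object.assign(
    Object.create(SnapshotOrchestratorService.prototype),
    { repo, localStorage, logger },
  ) as SnapshotOrchestratorService;

  await service.onModuleInit();

  expect(repo.findByStatus).toHaveBeenCalledWith(SnapshotStatus.RUNNING);
  expect(repo.updateTaskStatus).toHaveBeenCalledWith('stale-1', SnapshotStatus.FAILED, expect.any(String));
  expect(localStorage.deleteTask).toHaveBeenCalledWith('stale-1');
});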

View File

@@ -27,6 +27,38 @@ export class PostgresBackupHandler implements BackupHandler {
     this.password = this.configService.get<string>('PG_PASSWORD', '');
   }

+  /** 查询 PostgreSQL 所有非模板数据库的总大小(字节) */
+  private getTotalDatabaseSize(): Promise<number> {
+    return new Promise((resolve) => {
+      const psql = spawn('psql', [
+        '-h', this.host,
+        '-p', this.port,
+        '-U', this.user,
+        '-t', '-A',
+        '-c', 'SELECT COALESCE(sum(pg_database_size(datname)), 0) FROM pg_database WHERE datistemplate = false',
+      ], {
+        env: { ...process.env, PGPASSWORD: this.password },
+      });
+
+      let output = '';
+      psql.stdout.on('data', (data: Buffer) => { output += data.toString(); });
+
+      psql.on('close', (code) => {
+        const size = parseInt(output.trim(), 10);
+        if (code === 0 && !isNaN(size) && size > 0) {
+          this.logger.log(`PostgreSQL 总数据库大小: ${(size / 1024 / 1024).toFixed(1)} MB`);
+          resolve(size);
+        } else {
+          this.logger.warn(`无法获取数据库大小 (exit=${code}), 使用默认估算 1GB`);
+          resolve(1024 * 1024 * 1024);
+        }
+      });
+
+      psql.on('error', () => {
+        this.logger.warn('psql 不可用, 使用默认估算 1GB');
+        resolve(1024 * 1024 * 1024);
+      });
+    });
+  }
+
   async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> {
     const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
     const fileName = `postgres-${timestamp}.sql.gz`;
@@ -34,17 +66,18 @@ export class PostgresBackupHandler implements BackupHandler {
     fs.mkdirSync(outputDir, { recursive: true });

-    onProgress(0, 'pg_dump --all-databases 开始...');
+    onProgress(0, 'PostgreSQL 查询数据库大小...');
+    const totalSize = await this.getTotalDatabaseSize();
+
+    onProgress(0, 'pg_dumpall 开始...');

     return new Promise<BackupResult>((resolve, reject) => {
-      // pg_dumpall 输出所有数据库,通过 gzip 压缩
       const dumpProc = spawn('pg_dumpall', [
         '-h', this.host,
         '-p', this.port,
         '-U', this.user,
         '--clean',
         '--if-exists',
-        '-v',
       ], {
         env: { ...process.env, PGPASSWORD: this.password },
       });
@@ -52,24 +85,31 @@ export class PostgresBackupHandler implements BackupHandler {
       const gzipProc = spawn('gzip', ['-6']);
       const outputStream = fs.createWriteStream(filePath);

+      // 统计 pg_dumpall 输出的原始字节数,用于计算进度
+      // pg_database_size 含索引,SQL 文本不含索引但有语句膨胀,两者近似
+      // 与 pv 工具做法一致:用 pg_database_size 做分母,接受 10-20% 误差
+      let bytesRead = 0;
+      let lastReportedMB = 0;
+      dumpProc.stdout.on('data', (chunk: Buffer) => {
+        bytesRead += chunk.length;
+        const readMB = Math.floor(bytesRead / (1024 * 1024));
+        // 每增加 1MB 才上报,避免过于频繁
+        if (readMB > lastReportedMB) {
+          lastReportedMB = readMB;
+          const totalMB = Math.floor(totalSize / (1024 * 1024));
+          const percent = Math.min(99, Math.floor((bytesRead / totalSize) * 100));
+          onProgress(percent, `PostgreSQL 备份中... ${readMB}MB / ~${totalMB}MB`);
+        }
+      });
+
       // pipe: pg_dumpall stdout → gzip stdin → file
       dumpProc.stdout.pipe(gzipProc.stdin);
       gzipProc.stdout.pipe(outputStream);

       let stderrBuffer = '';
-      let tableCount = 0;
       dumpProc.stderr.on('data', (data: Buffer) => {
-        const text = data.toString();
-        stderrBuffer += text;
-        // pg_dumpall 的 -v 输出 "dumping contents of table ..." 行
-        const tableMatches = text.match(/dumping contents of table/gi);
-        if (tableMatches) {
-          tableCount += tableMatches.length;
-          // 估算进度(假设 ~100 张表)
-          const percent = Math.min(90, Math.floor((tableCount / 100) * 90));
-          onProgress(percent, `PostgreSQL 备份中... 已处理 ${tableCount} 张表`);
-        }
+        stderrBuffer += data.toString();
       });

       let dumpExitCode: number | null = null;
@@ -80,7 +120,7 @@

       if (dumpExitCode === 0 && gzipExitCode === 0) {
         const stat = fs.statSync(filePath);
-        this.logger.log(`PostgreSQL 备份完成: ${fileName}, 大小: ${stat.size} bytes`);
+        this.logger.log(`PostgreSQL 备份完成: ${fileName}, 压缩后: ${stat.size} bytes, 原始输出: ${bytesRead} bytes`);
         onProgress(100, 'PostgreSQL 备份完成');
         resolve({ fileName, filePath, fileSize: stat.size });
       } else {
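For reference, the progress math above can be read in isolation. A standalone sketch (not part of the handler) with a worked example; because pg_database_size includes indexes while the SQL dump does not, the dump usually ends short of the estimate, which is why the percentage is capped at 99 until both processes exit:

// Standalone sketch of the byte-based progress formula used above.
function dumpProgress(bytesRead: number, totalSize: number): { percent: number; label: string } {
  const readMB = Math.floor(bytesRead / (1024 * 1024));
  const totalMB = Math.floor(totalSize / (1024 * 1024));
  // Capped at 99: the dump may finish before the pg_database_size estimate is reached.
  const percent = Math.min(99, Math.floor((bytesRead / totalSize) * 100));
  return { percent, label: `PostgreSQL 备份中... ${readMB}MB / ~${totalMB}MB` };
}

// Example: 300 MB of SQL emitted so far against a reported ~1200 MB of databases.
console.log(dumpProgress(300 * 1024 * 1024, 1200 * 1024 * 1024));
// → { percent: 25, label: 'PostgreSQL 备份中... 300MB / ~1200MB' }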

View File

@@ -72,7 +72,34 @@ export class RedisBackupHandler implements BackupHandler {
       redis.disconnect();
     }

-    // 3. 从只读卷复制 dump.rdb(+ aof 如果存在)
+    // 3. 先复制文件到临时目录(避免 Redis 写入导致 tar-stream Size mismatch 崩溃)
+    const tempCopyDir = path.join(outputDir, `redis-copy-${Date.now()}`);
+    fs.mkdirSync(tempCopyDir, { recursive: true });
+    onProgress(50, '复制 Redis 数据文件到临时目录...');
+
+    const dumpSrc = path.join(this.sourceDir, 'dump.rdb');
+    const dumpDst = path.join(tempCopyDir, 'dump.rdb');
+    if (fs.existsSync(dumpSrc)) {
+      fs.copyFileSync(dumpSrc, dumpDst);
+    }
+
+    const aofSrc = path.join(this.sourceDir, 'appendonly.aof');
+    const aofDst = path.join(tempCopyDir, 'appendonly.aof');
+    if (fs.existsSync(aofSrc)) {
+      fs.copyFileSync(aofSrc, aofDst);
+    }
+
+    // AOF 目录(Redis 7 multi-part AOF)
+    const aofDirSrc = path.join(this.sourceDir, 'appendonlydir');
+    const aofDirDst = path.join(tempCopyDir, 'appendonlydir');
+    if (fs.existsSync(aofDirSrc)) {
+      this.copyDirSync(aofDirSrc, aofDirDst);
+    }
+
+    onProgress(60, '开始打包...');
+
+    // 4. 打包临时目录的快照文件
     return new Promise<BackupResult>((resolve, reject) => {
       const output = fs.createWriteStream(filePath);
       const archive = archiver('tar', { gzip: true });
@@ -81,36 +108,48 @@ export class RedisBackupHandler implements BackupHandler {
         const fileSize = archive.pointer();
         this.logger.log(`Redis 备份完成: ${fileName}, 大小: ${fileSize} bytes`);
         onProgress(100, 'Redis 备份完成');
+        // 清理临时复制目录
+        fs.rmSync(tempCopyDir, { recursive: true, force: true });
         resolve({ fileName, filePath, fileSize });
       });

       archive.on('error', (err) => {
         this.logger.error(`Redis 备份打包失败: ${err.message}`);
+        fs.rmSync(tempCopyDir, { recursive: true, force: true });
         reject(err);
       });

       archive.pipe(output);

-      const dumpPath = path.join(this.sourceDir, 'dump.rdb');
-      if (fs.existsSync(dumpPath)) {
-        archive.file(dumpPath, { name: 'dump.rdb' });
+      if (fs.existsSync(dumpDst)) {
+        archive.file(dumpDst, { name: 'dump.rdb' });
         onProgress(70, '打包 dump.rdb...');
       }

-      const aofPath = path.join(this.sourceDir, 'appendonly.aof');
-      if (fs.existsSync(aofPath)) {
-        archive.file(aofPath, { name: 'appendonly.aof' });
+      if (fs.existsSync(aofDst)) {
+        archive.file(aofDst, { name: 'appendonly.aof' });
         onProgress(85, '打包 appendonly.aof...');
       }

-      // AOF 目录(Redis 7 multi-part AOF)
-      const aofDir = path.join(this.sourceDir, 'appendonlydir');
-      if (fs.existsSync(aofDir)) {
-        archive.directory(aofDir, 'appendonlydir');
+      if (fs.existsSync(aofDirDst)) {
+        archive.directory(aofDirDst, 'appendonlydir');
         onProgress(85, '打包 appendonlydir/...');
       }

       archive.finalize();
     });
   }
+
+  private copyDirSync(src: string, dst: string): void {
+    fs.mkdirSync(dst, { recursive: true });
+    for (const entry of fs.readdirSync(src, { withFileTypes: true })) {
+      const srcPath = path.join(src, entry.name);
+      const dstPath = path.join(dst, entry.name);
+      if (entry.isDirectory()) {
+        this.copyDirSync(srcPath, dstPath);
+      } else {
+        fs.copyFileSync(srcPath, dstPath);
+      }
+    }
+  }
 }
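The copy-before-archive step exists because the tar entry header carries the file size up front; if Redis rewrites dump.rdb or the AOF while archiver (via tar-stream) is streaming the entry, the byte count no longer matches the declared size and the archive fails with a size mismatch. A minimal standalone sketch of the same pattern, with illustrative paths and the common default-import style for archiver assumed:

// Standalone sketch: snapshot a possibly-live file, then archive the stable copy.
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import archiver from 'archiver'; // import style assumed

async function archiveStableCopy(liveFile: string, outTarGz: string): Promise<void> {
  const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'redis-copy-'));
  const copy = path.join(tmpDir, path.basename(liveFile));
  fs.copyFileSync(liveFile, copy); // the copy's size is now frozen

  await new Promise<void>((resolve, reject) => {
    const output = fs.createWriteStream(outTarGz);
    const archive = archiver('tar', { gzip: true });
    output.on('close', () => resolve());
    archive.on('error', reject);
    archive.pipe(output);
    archive.file(copy, { name: path.basename(liveFile) });
    archive.finalize();
  });

  fs.rmSync(tmpDir, { recursive: true, force: true });
}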

View File

@@ -146,6 +146,13 @@ export class SnapshotRepository {
     return this.prisma.snapshotTask.delete({ where: { id } });
   }

+  async findByStatus(status: SnapshotStatus) {
+    return this.prisma.snapshotTask.findMany({
+      where: { status },
+      include: { details: true },
+    });
+  }
+
   async findExpiredLocalTasks(retentionHours: number) {
     const threshold = new Date(Date.now() - retentionHours * 60 * 60 * 1000);
     return this.prisma.snapshotTask.findMany({