fix(snapshot): PostgreSQL 备份从 pg_basebackup 改为 pg_dumpall

pg_basebackup -D - -Ft -z 在 PG15 中不支持同时 WAL streaming
改用 pg_dumpall | gzip 逻辑备份,更轻量且不需要 replication 权限

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-23 23:18:52 -08:00
parent a4689d5e8b
commit 470dc1ccd0
1 changed files with 41 additions and 23 deletions

View File

@ -29,63 +29,81 @@ export class PostgresBackupHandler implements BackupHandler {
async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> { async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const fileName = `postgres-${timestamp}.tar.gz`; const fileName = `postgres-${timestamp}.sql.gz`;
const filePath = path.join(outputDir, fileName); const filePath = path.join(outputDir, fileName);
fs.mkdirSync(outputDir, { recursive: true }); fs.mkdirSync(outputDir, { recursive: true });
onProgress(0, 'pg_basebackup 开始...'); onProgress(0, 'pg_dump --all-databases 开始...');
return new Promise<BackupResult>((resolve, reject) => { return new Promise<BackupResult>((resolve, reject) => {
const outputStream = fs.createWriteStream(filePath); // pg_dumpall 输出所有数据库,通过 gzip 压缩
const dumpProc = spawn('pg_dumpall', [
const proc = spawn('pg_basebackup', [
'-h', this.host, '-h', this.host,
'-p', this.port, '-p', this.port,
'-U', this.user, '-U', this.user,
'-D', '-', '--clean',
'-Ft', '--if-exists',
'-z',
'-P',
'-v', '-v',
], { ], {
env: { ...process.env, PGPASSWORD: this.password }, env: { ...process.env, PGPASSWORD: this.password },
}); });
proc.stdout.pipe(outputStream); const gzipProc = spawn('gzip', ['-6']);
const outputStream = fs.createWriteStream(filePath);
// pipe: pg_dumpall stdout → gzip stdin → file
dumpProc.stdout.pipe(gzipProc.stdin);
gzipProc.stdout.pipe(outputStream);
let stderrBuffer = ''; let stderrBuffer = '';
proc.stderr.on('data', (data: Buffer) => { let tableCount = 0;
dumpProc.stderr.on('data', (data: Buffer) => {
const text = data.toString(); const text = data.toString();
stderrBuffer += text; stderrBuffer += text;
// pg_basebackup 进度格式: "12345/67890 kB (18%), 0/1 tablespace" // pg_dumpall 的 -v 输出 "dumping contents of table ..." 行
const match = text.match(/\((\d+)%\)/); const tableMatches = text.match(/dumping contents of table/gi);
if (match) { if (tableMatches) {
const percent = parseInt(match[1], 10); tableCount += tableMatches.length;
onProgress(percent, `PostgreSQL 备份中 ${percent}%`); // 估算进度(假设 ~100 张表)
const percent = Math.min(90, Math.floor((tableCount / 100) * 90));
onProgress(percent, `PostgreSQL 备份中... 已处理 ${tableCount} 张表`);
} }
}); });
proc.on('close', (code) => { let dumpExitCode: number | null = null;
if (code === 0) { let gzipExitCode: number | null = null;
const checkDone = () => {
if (dumpExitCode === null || gzipExitCode === null) return;
if (dumpExitCode === 0 && gzipExitCode === 0) {
const stat = fs.statSync(filePath); const stat = fs.statSync(filePath);
this.logger.log(`PostgreSQL 备份完成: ${fileName}, 大小: ${stat.size} bytes`); this.logger.log(`PostgreSQL 备份完成: ${fileName}, 大小: ${stat.size} bytes`);
onProgress(100, 'PostgreSQL 备份完成'); onProgress(100, 'PostgreSQL 备份完成');
resolve({ fileName, filePath, fileSize: stat.size }); resolve({ fileName, filePath, fileSize: stat.size });
} else { } else {
const error = `pg_basebackup 退出码: ${code}, stderr: ${stderrBuffer.slice(-500)}`; const error = `pg_dumpall 退出码: ${dumpExitCode}, gzip 退出码: ${gzipExitCode}, stderr: ${stderrBuffer.slice(-500)}`;
this.logger.error(error); this.logger.error(error);
// 清理不完整的文件
if (fs.existsSync(filePath)) fs.unlinkSync(filePath); if (fs.existsSync(filePath)) fs.unlinkSync(filePath);
reject(new Error(error)); reject(new Error(error));
} }
};
dumpProc.on('close', (code) => { dumpExitCode = code; checkDone(); });
gzipProc.on('close', (code) => { gzipExitCode = code; checkDone(); });
dumpProc.on('error', (err) => {
this.logger.error(`pg_dumpall 启动失败: ${err.message}`);
if (fs.existsSync(filePath)) fs.unlinkSync(filePath);
reject(new Error(`pg_dumpall 启动失败: ${err.message}`));
}); });
proc.on('error', (err) => { gzipProc.on('error', (err) => {
this.logger.error(`pg_basebackup 启动失败: ${err.message}`); this.logger.error(`gzip 启动失败: ${err.message}`);
if (fs.existsSync(filePath)) fs.unlinkSync(filePath); if (fs.existsSync(filePath)) fs.unlinkSync(filePath);
reject(new Error(`pg_basebackup 启动失败: ${err.message}`)); reject(new Error(`gzip 启动失败: ${err.message}`));
}); });
}); });
} }