feat(snapshot): 新增 MPC PostgreSQL 备份目标

MPC 系统使用独立的 PostgreSQL 实例 (mpc-postgres),之前不在备份范围内。
新增 MPC_POSTGRES handler,通过 MPC_PG_* 环境变量连接,snapshot-service
加入 mpc-network 实现跨 compose 网络访问。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-24 03:12:16 -08:00
parent 26e55a649f
commit b6fb421316
5 changed files with 177 additions and 7 deletions

View File

@ -15,11 +15,16 @@ services:
- NODE_ENV=production
- APP_PORT=3099
- DATABASE_URL=file:/app/data/snapshot.db
# PostgreSQL (备份目标)
# PostgreSQL (备份目标 - 主业务库)
- PG_HOST=postgres
- PG_PORT=5432
- PG_USER=${POSTGRES_USER:-rwa_user}
- PG_PASSWORD=${POSTGRES_PASSWORD}
# MPC PostgreSQL (备份目标 - MPC 独立库)
- MPC_PG_HOST=mpc-postgres
- MPC_PG_PORT=5432
- MPC_PG_USER=${MPC_POSTGRES_USER:-mpc_user}
- MPC_PG_PASSWORD=${MPC_POSTGRES_PASSWORD}
# Redis (备份目标)
- REDIS_HOST=redis
- REDIS_PORT=6379
@ -34,7 +39,7 @@ services:
# 备份配置
- SNAPSHOT_TEMP_DIR=/tmp/snapshots
- SNAPSHOT_RETENTION_HOURS=72
- AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS
- AVAILABLE_TARGETS=POSTGRES,MPC_POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS
volumes:
- snapshot_data:/app/data
- /tmp/rwa-snapshots:/tmp/snapshots
@ -56,6 +61,12 @@ services:
start_period: 30s
networks:
- rwa-network
- mpc-network
volumes:
snapshot_data:
networks:
mpc-network:
external: true
name: mpc-system_mpc-network

View File

@ -3,12 +3,18 @@ APP_PORT=3099
DATABASE_URL="file:./data/snapshot.db"
# Docker 部署时 docker-compose 覆盖为绝对路径: file:/app/data/snapshot.db
# PostgreSQL (被备份目标)
# PostgreSQL (被备份目标 - 主业务库)
PG_HOST=localhost
PG_PORT=5432
PG_USER=rwa_user
PG_PASSWORD=your_password
# MPC PostgreSQL (被备份目标 - MPC 独立库)
MPC_PG_HOST=localhost
MPC_PG_PORT=5433
MPC_PG_USER=mpc_user
MPC_PG_PASSWORD=your_mpc_password
# Redis (被备份目标)
REDIS_HOST=localhost
REDIS_PORT=6379
@ -28,4 +34,4 @@ SNAPSHOT_TEMP_DIR=./data/snapshots
SNAPSHOT_RETENTION_HOURS=72
# 可用备份目标(逗号分隔,部署时按系统配置)
AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS
AVAILABLE_TARGETS=POSTGRES,MPC_POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS

View File

@ -1,5 +1,6 @@
export enum BackupTarget {
POSTGRES = 'POSTGRES',
MPC_POSTGRES = 'MPC_POSTGRES',
REDIS = 'REDIS',
KAFKA = 'KAFKA',
ZOOKEEPER = 'ZOOKEEPER',
@ -10,6 +11,7 @@ export enum BackupTarget {
/** 备份执行顺序 */
export const BACKUP_TARGET_ORDER: BackupTarget[] = [
BackupTarget.POSTGRES,
BackupTarget.MPC_POSTGRES,
BackupTarget.REDIS,
BackupTarget.KAFKA,
BackupTarget.ZOOKEEPER,
@ -20,6 +22,7 @@ export const BACKUP_TARGET_ORDER: BackupTarget[] = [
/** 备份目标中文名 */
export const BACKUP_TARGET_LABELS: Record<BackupTarget, string> = {
[BackupTarget.POSTGRES]: 'PostgreSQL',
[BackupTarget.MPC_POSTGRES]: 'MPC PostgreSQL',
[BackupTarget.REDIS]: 'Redis',
[BackupTarget.KAFKA]: 'Kafka',
[BackupTarget.ZOOKEEPER]: 'ZooKeeper',

View File

@ -0,0 +1,146 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { spawn } from 'child_process';
import * as fs from 'fs';
import * as path from 'path';
import { BackupTarget } from '@/domain/enums';
import {
BackupHandler,
BackupResult,
ProgressCallback,
} from '@/domain/interfaces/backup-handler.interface';
@Injectable()
export class MpcPostgresBackupHandler implements BackupHandler {
readonly target = BackupTarget.MPC_POSTGRES;
private readonly logger = new Logger(MpcPostgresBackupHandler.name);
private readonly host: string;
private readonly port: string;
private readonly user: string;
private readonly password: string;
constructor(private readonly configService: ConfigService) {
this.host = this.configService.get<string>('MPC_PG_HOST', 'localhost');
this.port = this.configService.get<string>('MPC_PG_PORT', '5432');
this.user = this.configService.get<string>('MPC_PG_USER', 'mpc_user');
this.password = this.configService.get<string>('MPC_PG_PASSWORD', '');
}
/** 查询 MPC PostgreSQL 所有非模板数据库的总大小(字节) */
private getTotalDatabaseSize(): Promise<number> {
return new Promise((resolve) => {
const psql = spawn('psql', [
'-h', this.host,
'-p', this.port,
'-U', this.user,
'-t', '-A',
'-c', 'SELECT COALESCE(sum(pg_database_size(datname)), 0) FROM pg_database WHERE datistemplate = false',
], {
env: { ...process.env, PGPASSWORD: this.password },
});
let output = '';
psql.stdout.on('data', (data: Buffer) => { output += data.toString(); });
psql.on('close', (code) => {
const size = parseInt(output.trim(), 10);
if (code === 0 && !isNaN(size) && size > 0) {
this.logger.log(`MPC PostgreSQL 总数据库大小: ${(size / 1024 / 1024).toFixed(1)} MB`);
resolve(size);
} else {
this.logger.warn(`无法获取 MPC 数据库大小 (exit=${code}), 使用默认估算 500MB`);
resolve(500 * 1024 * 1024);
}
});
psql.on('error', () => {
this.logger.warn('psql 不可用, 使用默认估算 500MB');
resolve(500 * 1024 * 1024);
});
});
}
async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const fileName = `mpc-postgres-${timestamp}.sql.gz`;
const filePath = path.join(outputDir, fileName);
fs.mkdirSync(outputDir, { recursive: true });
onProgress(0, 'MPC PostgreSQL 查询数据库大小...');
const totalSize = await this.getTotalDatabaseSize();
onProgress(0, 'MPC pg_dumpall 开始...');
return new Promise<BackupResult>((resolve, reject) => {
const dumpProc = spawn('pg_dumpall', [
'-h', this.host,
'-p', this.port,
'-U', this.user,
'--clean',
'--if-exists',
], {
env: { ...process.env, PGPASSWORD: this.password },
});
const gzipProc = spawn('gzip', ['-6']);
const outputStream = fs.createWriteStream(filePath);
let bytesRead = 0;
let lastReportedMB = 0;
dumpProc.stdout.on('data', (chunk: Buffer) => {
bytesRead += chunk.length;
const readMB = Math.floor(bytesRead / (1024 * 1024));
if (readMB > lastReportedMB) {
lastReportedMB = readMB;
const totalMB = Math.floor(totalSize / (1024 * 1024));
const percent = Math.min(99, parseFloat(((bytesRead / totalSize) * 100).toFixed(2)));
onProgress(percent, `MPC PostgreSQL 备份中... ${readMB}MB / ~${totalMB}MB`);
}
});
// pipe: pg_dumpall stdout → gzip stdin → file
dumpProc.stdout.pipe(gzipProc.stdin);
gzipProc.stdout.pipe(outputStream);
let stderrBuffer = '';
dumpProc.stderr.on('data', (data: Buffer) => {
stderrBuffer += data.toString();
});
let dumpExitCode: number | null = null;
let gzipExitCode: number | null = null;
const checkDone = () => {
if (dumpExitCode === null || gzipExitCode === null) return;
if (dumpExitCode === 0 && gzipExitCode === 0) {
const stat = fs.statSync(filePath);
this.logger.log(`MPC PostgreSQL 备份完成: ${fileName}, 压缩后: ${stat.size} bytes, 原始输出: ${bytesRead} bytes`);
onProgress(100, 'MPC PostgreSQL 备份完成');
resolve({ fileName, filePath, fileSize: stat.size });
} else {
const error = `MPC pg_dumpall 退出码: ${dumpExitCode}, gzip 退出码: ${gzipExitCode}, stderr: ${stderrBuffer.slice(-500)}`;
this.logger.error(error);
if (fs.existsSync(filePath)) fs.unlinkSync(filePath);
reject(new Error(error));
}
};
dumpProc.on('close', (code) => { dumpExitCode = code; checkDone(); });
gzipProc.on('close', (code) => { gzipExitCode = code; checkDone(); });
dumpProc.on('error', (err) => {
this.logger.error(`MPC pg_dumpall 启动失败: ${err.message}`);
if (fs.existsSync(filePath)) fs.unlinkSync(filePath);
reject(new Error(`MPC pg_dumpall 启动失败: ${err.message}`));
});
gzipProc.on('error', (err) => {
this.logger.error(`gzip 启动失败: ${err.message}`);
if (fs.existsSync(filePath)) fs.unlinkSync(filePath);
reject(new Error(`gzip 启动失败: ${err.message}`));
});
});
}
}

View File

@ -3,6 +3,7 @@ import { ConfigService } from '@nestjs/config';
import { PrismaService } from './persistence/prisma/prisma.service';
import { SnapshotRepository } from './persistence/repositories/snapshot.repository';
import { PostgresBackupHandler } from './backup/postgres-backup.handler';
import { MpcPostgresBackupHandler } from './backup/mpc-postgres-backup.handler';
import { RedisBackupHandler } from './backup/redis-backup.handler';
import { KafkaBackupHandler } from './backup/kafka-backup.handler';
import { ZookeeperBackupHandler } from './backup/zookeeper-backup.handler';
@ -11,7 +12,7 @@ import { UploadsBackupHandler } from './backup/uploads-backup.handler';
import { MinioStorageAdapter } from './storage/minio-storage.adapter';
import { LocalStorageAdapter } from './storage/local-storage.adapter';
import { SnapshotGateway } from '@/api/gateways/snapshot.gateway';
import { BACKUP_HANDLER_TOKEN } from '@/domain/interfaces/backup-handler.interface';
import { BackupHandler, BACKUP_HANDLER_TOKEN } from '@/domain/interfaces/backup-handler.interface';
import { BackupTarget } from '@/domain/enums';
@Module({
@ -23,6 +24,7 @@ import { BackupTarget } from '@/domain/enums';
SnapshotGateway,
// 各备份 handler
PostgresBackupHandler,
MpcPostgresBackupHandler,
RedisBackupHandler,
KafkaBackupHandler,
ZookeeperBackupHandler,
@ -34,6 +36,7 @@ import { BackupTarget } from '@/domain/enums';
useFactory: (
configService: ConfigService,
pg: PostgresBackupHandler,
mpcPg: MpcPostgresBackupHandler,
redis: RedisBackupHandler,
kafka: KafkaBackupHandler,
zk: ZookeeperBackupHandler,
@ -46,8 +49,8 @@ import { BackupTarget } from '@/domain/enums';
.map((t) => t.trim())
.filter(Boolean);
const allHandlers = [pg, redis, kafka, zk, minio, uploads];
const handlerMap = new Map<BackupTarget, PostgresBackupHandler | RedisBackupHandler | KafkaBackupHandler | ZookeeperBackupHandler | MinioBackupHandler | UploadsBackupHandler>();
const allHandlers = [pg, mpcPg, redis, kafka, zk, minio, uploads];
const handlerMap = new Map<BackupTarget, BackupHandler>();
for (const handler of allHandlers) {
if (availableTargets.includes(handler.target as string)) {
@ -60,6 +63,7 @@ import { BackupTarget } from '@/domain/enums';
inject: [
ConfigService,
PostgresBackupHandler,
MpcPostgresBackupHandler,
RedisBackupHandler,
KafkaBackupHandler,
ZookeeperBackupHandler,