diff --git a/backend/services/docker-compose.snapshot.yml b/backend/services/docker-compose.snapshot.yml index eaef2537..3c053116 100644 --- a/backend/services/docker-compose.snapshot.yml +++ b/backend/services/docker-compose.snapshot.yml @@ -15,11 +15,16 @@ services: - NODE_ENV=production - APP_PORT=3099 - DATABASE_URL=file:/app/data/snapshot.db - # PostgreSQL (备份目标) + # PostgreSQL (备份目标 - 主业务库) - PG_HOST=postgres - PG_PORT=5432 - PG_USER=${POSTGRES_USER:-rwa_user} - PG_PASSWORD=${POSTGRES_PASSWORD} + # MPC PostgreSQL (备份目标 - MPC 独立库) + - MPC_PG_HOST=mpc-postgres + - MPC_PG_PORT=5432 + - MPC_PG_USER=${MPC_POSTGRES_USER:-mpc_user} + - MPC_PG_PASSWORD=${MPC_POSTGRES_PASSWORD} # Redis (备份目标) - REDIS_HOST=redis - REDIS_PORT=6379 @@ -34,7 +39,7 @@ services: # 备份配置 - SNAPSHOT_TEMP_DIR=/tmp/snapshots - SNAPSHOT_RETENTION_HOURS=72 - - AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS + - AVAILABLE_TARGETS=POSTGRES,MPC_POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS volumes: - snapshot_data:/app/data - /tmp/rwa-snapshots:/tmp/snapshots @@ -56,6 +61,12 @@ services: start_period: 30s networks: - rwa-network + - mpc-network volumes: snapshot_data: + +networks: + mpc-network: + external: true + name: mpc-system_mpc-network diff --git a/backend/services/snapshot-service/.env.development b/backend/services/snapshot-service/.env.development index fc061d79..3a909a18 100644 --- a/backend/services/snapshot-service/.env.development +++ b/backend/services/snapshot-service/.env.development @@ -3,12 +3,18 @@ APP_PORT=3099 DATABASE_URL="file:./data/snapshot.db" # Docker 部署时 docker-compose 覆盖为绝对路径: file:/app/data/snapshot.db -# PostgreSQL (被备份目标) +# PostgreSQL (被备份目标 - 主业务库) PG_HOST=localhost PG_PORT=5432 PG_USER=rwa_user PG_PASSWORD=your_password +# MPC PostgreSQL (被备份目标 - MPC 独立库) +MPC_PG_HOST=localhost +MPC_PG_PORT=5433 +MPC_PG_USER=mpc_user +MPC_PG_PASSWORD=your_mpc_password + # Redis (被备份目标) REDIS_HOST=localhost REDIS_PORT=6379 @@ -28,4 +34,4 @@ 
SNAPSHOT_TEMP_DIR=./data/snapshots SNAPSHOT_RETENTION_HOURS=72 # 可用备份目标(逗号分隔,部署时按系统配置) -AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS +AVAILABLE_TARGETS=POSTGRES,MPC_POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS diff --git a/backend/services/snapshot-service/src/domain/enums/backup-target.enum.ts b/backend/services/snapshot-service/src/domain/enums/backup-target.enum.ts index 5e8f50e9..71586eb2 100644 --- a/backend/services/snapshot-service/src/domain/enums/backup-target.enum.ts +++ b/backend/services/snapshot-service/src/domain/enums/backup-target.enum.ts @@ -1,5 +1,6 @@ export enum BackupTarget { POSTGRES = 'POSTGRES', + MPC_POSTGRES = 'MPC_POSTGRES', REDIS = 'REDIS', KAFKA = 'KAFKA', ZOOKEEPER = 'ZOOKEEPER', @@ -10,6 +11,7 @@ export enum BackupTarget { /** 备份执行顺序 */ export const BACKUP_TARGET_ORDER: BackupTarget[] = [ BackupTarget.POSTGRES, + BackupTarget.MPC_POSTGRES, BackupTarget.REDIS, BackupTarget.KAFKA, BackupTarget.ZOOKEEPER, @@ -20,6 +22,7 @@ export const BACKUP_TARGET_ORDER: BackupTarget[] = [ /** 备份目标中文名 */ export const BACKUP_TARGET_LABELS: Record<BackupTarget, string> = { [BackupTarget.POSTGRES]: 'PostgreSQL', + [BackupTarget.MPC_POSTGRES]: 'MPC PostgreSQL', [BackupTarget.REDIS]: 'Redis', [BackupTarget.KAFKA]: 'Kafka', [BackupTarget.ZOOKEEPER]: 'ZooKeeper', diff --git a/backend/services/snapshot-service/src/infrastructure/backup/mpc-postgres-backup.handler.ts b/backend/services/snapshot-service/src/infrastructure/backup/mpc-postgres-backup.handler.ts new file mode 100644 index 00000000..f559c855 --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/backup/mpc-postgres-backup.handler.ts @@ -0,0 +1,146 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { spawn } from 'child_process'; +import * as fs from 'fs'; +import * as path from 'path'; +import { BackupTarget } from '@/domain/enums'; +import { + BackupHandler, + BackupResult, + ProgressCallback, +} from 
'@/domain/interfaces/backup-handler.interface'; + +@Injectable() +export class MpcPostgresBackupHandler implements BackupHandler { + readonly target = BackupTarget.MPC_POSTGRES; + private readonly logger = new Logger(MpcPostgresBackupHandler.name); + + private readonly host: string; + private readonly port: string; + private readonly user: string; + private readonly password: string; + + constructor(private readonly configService: ConfigService) { + this.host = this.configService.get('MPC_PG_HOST', 'localhost'); + this.port = this.configService.get('MPC_PG_PORT', '5432'); + this.user = this.configService.get('MPC_PG_USER', 'mpc_user'); + this.password = this.configService.get('MPC_PG_PASSWORD', ''); + } + + /** 查询 MPC PostgreSQL 所有非模板数据库的总大小(字节) */ + private getTotalDatabaseSize(): Promise<number> { + return new Promise((resolve) => { + const psql = spawn('psql', [ + '-h', this.host, + '-p', this.port, + '-U', this.user, + '-t', '-A', + '-c', 'SELECT COALESCE(sum(pg_database_size(datname)), 0) FROM pg_database WHERE datistemplate = false', + ], { + env: { ...process.env, PGPASSWORD: this.password }, + }); + + let output = ''; + psql.stdout.on('data', (data: Buffer) => { output += data.toString(); }); + psql.on('close', (code) => { + const size = parseInt(output.trim(), 10); + if (code === 0 && !isNaN(size) && size > 0) { + this.logger.log(`MPC PostgreSQL 总数据库大小: ${(size / 1024 / 1024).toFixed(1)} MB`); + resolve(size); + } else { + this.logger.warn(`无法获取 MPC 数据库大小 (exit=${code}), 使用默认估算 500MB`); + resolve(500 * 1024 * 1024); + } + }); + psql.on('error', () => { + this.logger.warn('psql 不可用, 使用默认估算 500MB'); + resolve(500 * 1024 * 1024); + }); + }); + } + + async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> { + const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); + const fileName = `mpc-postgres-${timestamp}.sql.gz`; + const filePath = path.join(outputDir, fileName); + + fs.mkdirSync(outputDir, { recursive: true }); + + onProgress(0, 'MPC 
PostgreSQL 查询数据库大小...'); + const totalSize = await this.getTotalDatabaseSize(); + + onProgress(0, 'MPC pg_dumpall 开始...'); + + return new Promise((resolve, reject) => { + const dumpProc = spawn('pg_dumpall', [ + '-h', this.host, + '-p', this.port, + '-U', this.user, + '--clean', + '--if-exists', + ], { + env: { ...process.env, PGPASSWORD: this.password }, + }); + + const gzipProc = spawn('gzip', ['-6']); + const outputStream = fs.createWriteStream(filePath); + + let bytesRead = 0; + let lastReportedMB = 0; + + dumpProc.stdout.on('data', (chunk: Buffer) => { + bytesRead += chunk.length; + const readMB = Math.floor(bytesRead / (1024 * 1024)); + if (readMB > lastReportedMB) { + lastReportedMB = readMB; + const totalMB = Math.floor(totalSize / (1024 * 1024)); + const percent = Math.min(99, parseFloat(((bytesRead / totalSize) * 100).toFixed(2))); + onProgress(percent, `MPC PostgreSQL 备份中... ${readMB}MB / ~${totalMB}MB`); + } + }); + + // pipe: pg_dumpall stdout → gzip stdin → file + dumpProc.stdout.pipe(gzipProc.stdin); + gzipProc.stdout.pipe(outputStream); + + let stderrBuffer = ''; + dumpProc.stderr.on('data', (data: Buffer) => { + stderrBuffer += data.toString(); + }); + + let dumpExitCode: number | null = null; + let gzipExitCode: number | null = null; + + const checkDone = () => { + if (dumpExitCode === null || gzipExitCode === null) return; + + if (dumpExitCode === 0 && gzipExitCode === 0) { + const stat = fs.statSync(filePath); + this.logger.log(`MPC PostgreSQL 备份完成: ${fileName}, 压缩后: ${stat.size} bytes, 原始输出: ${bytesRead} bytes`); + onProgress(100, 'MPC PostgreSQL 备份完成'); + resolve({ fileName, filePath, fileSize: stat.size }); + } else { + const error = `MPC pg_dumpall 退出码: ${dumpExitCode}, gzip 退出码: ${gzipExitCode}, stderr: ${stderrBuffer.slice(-500)}`; + this.logger.error(error); + if (fs.existsSync(filePath)) fs.unlinkSync(filePath); + reject(new Error(error)); + } + }; + + dumpProc.on('close', (code) => { dumpExitCode = code; checkDone(); }); + 
gzipProc.on('close', (code) => { gzipExitCode = code; checkDone(); }); + + dumpProc.on('error', (err) => { + this.logger.error(`MPC pg_dumpall 启动失败: ${err.message}`); + if (fs.existsSync(filePath)) fs.unlinkSync(filePath); + reject(new Error(`MPC pg_dumpall 启动失败: ${err.message}`)); + }); + + gzipProc.on('error', (err) => { + this.logger.error(`gzip 启动失败: ${err.message}`); + if (fs.existsSync(filePath)) fs.unlinkSync(filePath); + reject(new Error(`gzip 启动失败: ${err.message}`)); + }); + }); + } +} diff --git a/backend/services/snapshot-service/src/infrastructure/infrastructure.module.ts b/backend/services/snapshot-service/src/infrastructure/infrastructure.module.ts index e7b9c1ae..78fee174 100644 --- a/backend/services/snapshot-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/snapshot-service/src/infrastructure/infrastructure.module.ts @@ -3,6 +3,7 @@ import { ConfigService } from '@nestjs/config'; import { PrismaService } from './persistence/prisma/prisma.service'; import { SnapshotRepository } from './persistence/repositories/snapshot.repository'; import { PostgresBackupHandler } from './backup/postgres-backup.handler'; +import { MpcPostgresBackupHandler } from './backup/mpc-postgres-backup.handler'; import { RedisBackupHandler } from './backup/redis-backup.handler'; import { KafkaBackupHandler } from './backup/kafka-backup.handler'; import { ZookeeperBackupHandler } from './backup/zookeeper-backup.handler'; @@ -11,7 +12,7 @@ import { UploadsBackupHandler } from './backup/uploads-backup.handler'; import { MinioStorageAdapter } from './storage/minio-storage.adapter'; import { LocalStorageAdapter } from './storage/local-storage.adapter'; import { SnapshotGateway } from '@/api/gateways/snapshot.gateway'; -import { BACKUP_HANDLER_TOKEN } from '@/domain/interfaces/backup-handler.interface'; +import { BackupHandler, BACKUP_HANDLER_TOKEN } from '@/domain/interfaces/backup-handler.interface'; import { BackupTarget } from '@/domain/enums'; @Module({ 
@@ -23,6 +24,7 @@ import { BackupTarget } from '@/domain/enums'; SnapshotGateway, // 各备份 handler PostgresBackupHandler, + MpcPostgresBackupHandler, RedisBackupHandler, KafkaBackupHandler, ZookeeperBackupHandler, @@ -34,6 +36,7 @@ import { BackupTarget } from '@/domain/enums'; useFactory: ( configService: ConfigService, pg: PostgresBackupHandler, + mpcPg: MpcPostgresBackupHandler, redis: RedisBackupHandler, kafka: KafkaBackupHandler, zk: ZookeeperBackupHandler, @@ -46,8 +49,8 @@ import { BackupTarget } from '@/domain/enums'; .map((t) => t.trim()) .filter(Boolean); - const allHandlers = [pg, redis, kafka, zk, minio, uploads]; - const handlerMap = new Map(); + const allHandlers = [pg, mpcPg, redis, kafka, zk, minio, uploads]; + const handlerMap = new Map<BackupTarget, BackupHandler>(); for (const handler of allHandlers) { if (availableTargets.includes(handler.target as string)) { @@ -60,6 +63,7 @@ import { BackupTarget } from '@/domain/enums'; inject: [ ConfigService, PostgresBackupHandler, + MpcPostgresBackupHandler, RedisBackupHandler, KafkaBackupHandler, ZookeeperBackupHandler,