feat(snapshot): 数据快照备份服务全量实现(纯新增,零侵入)

一套代码两处部署的在线备份编排服务,为 1.0 认种分配系统和 2.0 算力挖矿系统
分别提供 PostgreSQL / Redis / Kafka / ZooKeeper / MinIO / Uploads 的在线备份能力。
管理员在 admin-web / mining-admin-web 中选择备份目标和存储方式,点击备份后系统
串行执行各组件备份,通过 Socket.IO WebSocket 实时推送进度到前端。

## 后端 snapshot-service(NestJS 10 + Prisma 5 + SQLite)

架构: DDD 四层(api / application / domain / infrastructure)

- api 层:
  · SnapshotController — REST API(创建/查询/删除/下载)含 Range/206 断点续传
  · SnapshotGateway — Socket.IO WebSocket 实时推送 5 类事件
  · HealthController — 健康检查
  · CreateSnapshotDto — class-validator 验证
  · toSnapshotResponse — BigInt→string 序列化

- application 层:
  · SnapshotOrchestratorService — 核心编排引擎
    - startSnapshot() 异步启动,不阻塞 HTTP
    - 按 PG→Redis→Kafka→ZK→MinIO→Uploads 顺序串行执行
    - 单目标失败不中断整体任务
    - MinIO 模式: 备份完上传到 MinIO 后删除本地临时文件
    - LOCAL 模式: 保留在服务器临时目录供下载
    - @Cron(EVERY_HOUR) 自动清理过期本地备份(默认 72h)
    - runningTaskId 防止并发执行

- domain 层:
  · BackupTarget 枚举(6 种目标)+ BACKUP_TARGET_ORDER 执行顺序
  · SnapshotStatus 枚举(PENDING/RUNNING/COMPLETED/FAILED)
  · StorageType 枚举(MINIO/LOCAL)
  · BackupHandler 接口 + BACKUP_HANDLER_TOKEN

- infrastructure 层:
  · 6 个备份 Handler(均实现 BackupHandler 接口):
    - PostgresBackupHandler: pg_basebackup 通过网络流式备份,解析 stderr 进度
    - RedisBackupHandler: BGSAVE + LASTSAVE 轮询 + 打包 dump.rdb/AOF
    - KafkaBackupHandler: archiver 打包数据卷,按字节计算进度
    - ZookeeperBackupHandler: archiver 打包 data/ + log/
    - MinioBackupHandler: SDK 列举并下载所有桶(排除备份桶)后打包
    - UploadsBackupHandler: archiver 打包上传文件目录
  · 2 个存储适配器:
    - MinioStorageAdapter: fPutObject 上传 / removeObjects 批量删除
    - LocalStorageAdapter: 本地临时目录管理 + 过期清理
  · PrismaService (SQLite) + SnapshotRepository (完整 CRUD)
  · BACKUP_HANDLER_TOKEN 工厂: 根据 AVAILABLE_TARGETS 环境变量过滤可用 handler

- Prisma Schema (SQLite):
  · SnapshotTask: 主表,targets 存 JSON 字符串,totalSize 用 BigInt
  · SnapshotDetail: 明细表,每个目标一行,@@index([taskId])
  · onDelete: Cascade 级联删除

- Dockerfile: 多阶段构建,生产镜像安装 postgresql-client + mc (MinIO CLI)
  SQLite 使用 prisma db push 而非 migrate deploy

- 部署端口: 1.0 系统 = 3099,2.0 系统 = 3199

## Docker Compose overlay(纯新增,不修改现有 docker-compose)

- docker-compose.snapshot.yml (1.0):
  · 挂载 redis_data/kafka_data/zookeeper_data/zookeeper_log/admin_uploads_data 只读卷
  · AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS
  · 依赖 postgres + redis 健康检查

- docker-compose.2.0-snapshot.yml (2.0 standalone):
  · 挂载 redis_2_data/mining-admin-uploads/trading-uploads 只读卷
  · AVAILABLE_TARGETS=POSTGRES,REDIS,UPLOADS
  · 依赖 postgres-2 + redis-2 健康检查

## 前端 admin-web(Next.js 15 + SCSS)

- 新增 /snapshots 页面: 创建备份表单 + 实时进度条 + 历史列表 + 下载/删除
- 新增 useSnapshotWebSocket hook: Socket.IO 连接 + 5 类事件监听
- 新增 snapshot.api.ts: 独立 fetch(不走通用 apiClient,snapshot 服务独立端口)
- 新增 snapshot.types.ts: 共享类型定义
- 新增 page.module.scss: 表单/进度条/表格样式
- 修改 Sidebar.tsx: 添加「数据快照」菜单项
- package.json: 添加 socket.io-client 依赖

## 前端 mining-admin-web(Next.js 14 + Tailwind CSS)

- 新增 /snapshots 页面: 同 admin-web 功能,Tailwind CSS 风格
- 新增 useSnapshotWebSocket hook
- 新增 snapshot.api.ts + snapshot.types.ts
- 修改 sidebar.tsx: 添加「数据快照」菜单项 + HardDrive 图标
- package.json: 添加 socket.io-client 依赖

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-23 21:53:09 -08:00
parent e690a55c8e
commit cf07eb03be
50 changed files with 3419 additions and 0 deletions

View File

@ -0,0 +1,60 @@
# 数据快照备份服务 (2.0 算力挖矿系统 - standalone 模式)
# 使用方式: docker compose -f docker-compose.2.0.yml -f docker-compose.2.0-snapshot.yml --profile standalone up snapshot-service-2
#
# 纯新增 overlay不修改任何现有服务配置
services:
snapshot-service-2:
build:
context: ./snapshot-service
dockerfile: Dockerfile
container_name: rwa-snapshot-service-2
ports:
- "3199:3199"
environment:
- NODE_ENV=production
- APP_PORT=3199
- DATABASE_URL=file:./data/snapshot.db
# PostgreSQL-2 (备份目标)
- PG_HOST=postgres-2
- PG_PORT=5432
- PG_USER=${POSTGRES_USER:-rwa_user}
- PG_PASSWORD=${POSTGRES_PASSWORD}
# Redis-2 (备份目标)
- REDIS_HOST=redis-2
- REDIS_PORT=6379
- REDIS_PASSWORD=${REDIS_PASSWORD:-}
# MinIO (存储后端)
- MINIO_ENDPOINT=${MINIO_ENDPOINT:-192.168.1.100}
- MINIO_PORT=${MINIO_PORT:-9000}
- MINIO_USE_SSL=${MINIO_USE_SSL:-false}
- MINIO_ACCESS_KEY=${MINIO_ACCESS_KEY}
- MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
- MINIO_BACKUP_BUCKET=snapshots-2
# 备份配置
- SNAPSHOT_TEMP_DIR=/app/data/snapshots
- SNAPSHOT_RETENTION_HOURS=72
- AVAILABLE_TARGETS=POSTGRES,REDIS,UPLOADS
volumes:
- snapshot_2_data:/app/data
- redis_2_data:/backup-source/redis:ro
- mining-admin-uploads:/backup-source/uploads/mining-admin:ro
- trading-uploads:/backup-source/uploads/trading:ro
profiles:
- standalone
depends_on:
postgres-2:
condition: service_healthy
redis-2:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3199/api/v1/health"]
interval: 30s
timeout: 3s
retries: 3
start_period: 30s
networks:
- rwa-2-network
volumes:
snapshot_2_data:

View File

@ -0,0 +1,60 @@
# 数据快照备份服务 (1.0 认种分配系统)
# 使用方式: docker compose -f docker-compose.yml -f docker-compose.snapshot.yml up snapshot-service
#
# 纯新增 overlay不修改任何现有服务配置
services:
snapshot-service:
build:
context: ./snapshot-service
dockerfile: Dockerfile
container_name: rwa-snapshot-service
ports:
- "3099:3099"
environment:
- NODE_ENV=production
- APP_PORT=3099
- DATABASE_URL=file:./data/snapshot.db
# PostgreSQL (备份目标)
- PG_HOST=postgres
- PG_PORT=5432
- PG_USER=${POSTGRES_USER:-rwa_user}
- PG_PASSWORD=${POSTGRES_PASSWORD}
# Redis (备份目标)
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_PASSWORD=${REDIS_PASSWORD:-}
# MinIO (备份目标 + 存储后端)
- MINIO_ENDPOINT=${MINIO_ENDPOINT:-192.168.1.100}
- MINIO_PORT=${MINIO_PORT:-9000}
- MINIO_USE_SSL=${MINIO_USE_SSL:-false}
- MINIO_ACCESS_KEY=${MINIO_ACCESS_KEY}
- MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
- MINIO_BACKUP_BUCKET=snapshots
# 备份配置
- SNAPSHOT_TEMP_DIR=/app/data/snapshots
- SNAPSHOT_RETENTION_HOURS=72
- AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS
volumes:
- snapshot_data:/app/data
- redis_data:/backup-source/redis:ro
- kafka_data:/backup-source/kafka:ro
- zookeeper_data:/backup-source/zookeeper/data:ro
- zookeeper_log:/backup-source/zookeeper/log:ro
- admin_uploads_data:/backup-source/uploads:ro
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3099/api/v1/health"]
interval: 30s
timeout: 3s
retries: 3
start_period: 30s
networks:
- rwa-network
volumes:
snapshot_data:

View File

@ -0,0 +1,30 @@
NODE_ENV=development
APP_PORT=3099
DATABASE_URL="file:./data/snapshot.db"
# PostgreSQL (被备份目标)
PG_HOST=localhost
PG_PORT=5432
PG_USER=rwa_user
PG_PASSWORD=your_password
# Redis (被备份目标)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
# MinIO
MINIO_ENDPOINT=192.168.1.100
MINIO_PORT=9000
MINIO_USE_SSL=false
MINIO_ACCESS_KEY=admin
MINIO_SECRET_KEY=your_minio_password
MINIO_BACKUP_BUCKET=snapshots
# 备份临时目录
SNAPSHOT_TEMP_DIR=./data/snapshots
# 临时文件保留时长(小时)
SNAPSHOT_RETENTION_HOURS=72
# 可用备份目标(逗号分隔,部署时按系统配置)
AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS

View File

@ -0,0 +1,6 @@
dist/
node_modules/
data/
*.db
*.db-journal
.env

View File

@ -0,0 +1,55 @@
# Build stage
FROM node:20-alpine AS builder
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY prisma ./prisma/
RUN DATABASE_URL="file:./data/snapshot.db" npx prisma generate
COPY . .
RUN npm run build
# Production stage
FROM node:20-slim AS production
WORKDIR /app
# 安装系统工具: openssl, curl (健康检查), postgresql-client (pg_basebackup)
RUN apt-get update && apt-get install -y --no-install-recommends \
openssl \
curl \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# 安装 MinIO 客户端 (mc)
RUN curl -fsSL https://dl.min.io/client/mc/release/linux-amd64/mc -o /usr/local/bin/mc \
&& chmod +x /usr/local/bin/mc
COPY package*.json ./
RUN npm ci --only=production
COPY prisma ./prisma/
RUN DATABASE_URL="file:./data/snapshot.db" npx prisma generate
COPY --from=builder /app/dist ./dist
# 创建数据目录
RUN mkdir -p /app/data/snapshots
# 启动脚本: SQLite 用 db push 而非 migrate deploy
RUN echo '#!/bin/sh\n\
set -e\n\
echo "Initializing SQLite database..."\n\
npx prisma db push --skip-generate\n\
echo "Starting snapshot service..."\n\
exec node dist/main.js\n' > /app/start.sh && chmod +x /app/start.sh
EXPOSE 3099
HEALTHCHECK --interval=30s --timeout=3s --start-period=30s --retries=3 \
CMD curl -f http://localhost:${APP_PORT:-3099}/api/v1/health || exit 1
CMD ["/app/start.sh"]

View File

@ -0,0 +1,8 @@
{
"$schema": "https://json.schemastore.org/nest-cli",
"collection": "@nestjs/schematics",
"sourceRoot": "src",
"compilerOptions": {
"deleteOutDir": true
}
}

View File

@ -0,0 +1,43 @@
{
"name": "snapshot-service",
"version": "1.0.0",
"description": "RWADurian 数据快照备份编排服务",
"scripts": {
"build": "nest build",
"start": "nest start",
"start:dev": "nest start --watch",
"start:prod": "node dist/main",
"prisma:generate": "prisma generate",
"prisma:push": "prisma db push"
},
"dependencies": {
"@nestjs/common": "^10.0.0",
"@nestjs/core": "^10.0.0",
"@nestjs/config": "^3.1.1",
"@nestjs/platform-express": "^10.0.0",
"@nestjs/swagger": "^7.1.17",
"@nestjs/schedule": "^4.0.0",
"@nestjs/websockets": "^10.0.0",
"@nestjs/platform-socket.io": "^10.0.0",
"@prisma/client": "^5.7.0",
"class-validator": "^0.14.0",
"class-transformer": "^0.5.1",
"ioredis": "^5.3.2",
"minio": "^8.0.1",
"archiver": "^7.0.1",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.8.1",
"uuid": "^9.0.0",
"socket.io": "^4.7.4"
},
"devDependencies": {
"@nestjs/cli": "^10.0.0",
"@nestjs/schematics": "^10.0.0",
"@types/archiver": "^6.0.2",
"@types/express": "^4.17.17",
"@types/node": "^20.3.1",
"@types/uuid": "^9.0.2",
"prisma": "^5.7.0",
"typescript": "^5.1.3"
}
}

View File

@ -0,0 +1,46 @@
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "sqlite"
url = env("DATABASE_URL")
}
model SnapshotTask {
id String @id @default(uuid())
status String @default("PENDING")
storageType String
storagePath String
targets String // JSON: ["POSTGRES","REDIS"]
totalSize BigInt @default(0)
error String?
startedAt DateTime?
completedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
details SnapshotDetail[]
@@map("snapshot_tasks")
}
model SnapshotDetail {
id String @id @default(uuid())
taskId String
target String
status String @default("PENDING")
progress Int @default(0)
fileSize BigInt @default(0)
fileName String?
error String?
startedAt DateTime?
completedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
task SnapshotTask @relation(fields: [taskId], references: [id], onDelete: Cascade)
@@index([taskId])
@@map("snapshot_details")
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { SnapshotController } from './controllers/snapshot.controller';
import { HealthController } from './controllers/health.controller';
import { InfrastructureModule } from '@/infrastructure/infrastructure.module';
import { ApplicationModule } from '@/application/application.module';
@Module({
imports: [InfrastructureModule, ApplicationModule],
controllers: [SnapshotController, HealthController],
})
export class ApiModule {}

View File

@ -0,0 +1,16 @@
import { Controller, Get } from '@nestjs/common';
import { ApiTags, ApiOperation } from '@nestjs/swagger';
@ApiTags('健康检查')
@Controller('health')
export class HealthController {
@Get()
@ApiOperation({ summary: '服务健康检查' })
check() {
return {
status: 'ok',
service: 'snapshot-service',
timestamp: new Date().toISOString(),
};
}
}

View File

@ -0,0 +1,151 @@
import {
Controller,
Get,
Post,
Delete,
Param,
Body,
Query,
Res,
Req,
NotFoundException,
BadRequestException,
ConflictException,
Logger,
} from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse, ApiQuery } from '@nestjs/swagger';
import { Request, Response } from 'express';
import * as fs from 'fs';
import * as path from 'path';
import { CreateSnapshotDto } from '../dto/create-snapshot.dto';
import { toSnapshotResponse } from '../dto/snapshot.response';
import { SnapshotOrchestratorService } from '@/application/services/snapshot-orchestrator.service';
import { SnapshotRepository } from '@/infrastructure/persistence/repositories/snapshot.repository';
import { StorageType } from '@/domain/enums';
@ApiTags('快照备份')
@Controller('snapshots')
export class SnapshotController {
private readonly logger = new Logger(SnapshotController.name);
constructor(
private readonly orchestrator: SnapshotOrchestratorService,
private readonly repo: SnapshotRepository,
) {}
@Get('targets')
@ApiOperation({ summary: '获取可用备份目标列表' })
getAvailableTargets() {
return {
targets: this.orchestrator.getAvailableTargets(),
isRunning: this.orchestrator.isRunning(),
};
}
@Post()
@ApiOperation({ summary: '创建备份任务' })
@ApiResponse({ status: 201, description: '任务已创建,异步执行' })
@ApiResponse({ status: 409, description: '已有任务在执行' })
async createSnapshot(@Body() dto: CreateSnapshotDto) {
if (this.orchestrator.isRunning()) {
throw new ConflictException('已有备份任务正在执行,请等待完成');
}
const taskId = await this.orchestrator.startSnapshot(dto);
return { taskId, message: '备份任务已启动' };
}
@Get()
@ApiOperation({ summary: '备份历史列表' })
@ApiQuery({ name: 'page', required: false, type: Number })
@ApiQuery({ name: 'limit', required: false, type: Number })
async listSnapshots(
@Query('page') page: number = 1,
@Query('limit') limit: number = 20,
) {
const result = await this.repo.findAll(page, limit);
return {
tasks: result.tasks.map(toSnapshotResponse),
total: result.total,
page: result.page,
limit: result.limit,
};
}
@Get(':id')
@ApiOperation({ summary: '备份任务详情' })
async getSnapshot(@Param('id') id: string) {
const task = await this.repo.findById(id);
if (!task) throw new NotFoundException('备份任务不存在');
return toSnapshotResponse(task);
}
@Delete(':id')
@ApiOperation({ summary: '删除备份' })
async deleteSnapshot(@Param('id') id: string) {
await this.orchestrator.deleteSnapshot(id);
return { message: '备份已删除' };
}
@Get(':id/download/:target')
@ApiOperation({ summary: '下载备份文件 (仅 LOCAL 模式)' })
@ApiResponse({ status: 200, description: '文件内容' })
@ApiResponse({ status: 206, description: '部分内容 (断点续传)' })
async downloadFile(
@Param('id') id: string,
@Param('target') target: string,
@Req() req: Request,
@Res() res: Response,
) {
const task = await this.repo.findById(id);
if (!task) throw new NotFoundException('备份任务不存在');
if (task.storageType !== StorageType.LOCAL) {
throw new BadRequestException('仅本地存储模式支持下载');
}
const detail = task.details.find((d) => d.target === target);
if (!detail || !detail.fileName) {
throw new NotFoundException('备份文件不存在');
}
const tempDir = process.env.SNAPSHOT_TEMP_DIR || './data/snapshots';
const filePath = path.join(tempDir, id, detail.fileName);
if (!fs.existsSync(filePath)) {
throw new NotFoundException('备份文件已过期或被删除');
}
const stat = fs.statSync(filePath);
const fileSize = stat.size;
const range = req.headers.range;
res.setHeader('Accept-Ranges', 'bytes');
res.setHeader('Content-Type', 'application/gzip');
res.setHeader(
'Content-Disposition',
`attachment; filename="${encodeURIComponent(detail.fileName)}"`,
);
if (range) {
const parts = range.replace(/bytes=/, '').split('-');
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
if (start >= fileSize || end >= fileSize || start > end) {
res.status(416);
res.setHeader('Content-Range', `bytes */${fileSize}`);
res.end();
return;
}
const chunkSize = end - start + 1;
res.status(206);
res.setHeader('Content-Range', `bytes ${start}-${end}/${fileSize}`);
res.setHeader('Content-Length', chunkSize);
fs.createReadStream(filePath, { start, end }).pipe(res);
} else {
res.setHeader('Content-Length', fileSize);
fs.createReadStream(filePath).pipe(res);
}
}
}

View File

@ -0,0 +1,25 @@
import { IsEnum, IsArray, IsString, ArrayMinSize } from 'class-validator';
import { ApiProperty } from '@nestjs/swagger';
import { BackupTarget } from '@/domain/enums';
import { StorageType } from '@/domain/enums';
export class CreateSnapshotDto {
@ApiProperty({ enum: StorageType, description: '存储方式' })
@IsEnum(StorageType)
storageType: StorageType;
@ApiProperty({ description: '存储路径 (MinIO 桶路径或描述标签)', example: 'snapshots/2026-02-23' })
@IsString()
storagePath: string;
@ApiProperty({
enum: BackupTarget,
isArray: true,
description: '备份目标列表',
example: ['POSTGRES', 'REDIS'],
})
@IsArray()
@ArrayMinSize(1)
@IsEnum(BackupTarget, { each: true })
targets: BackupTarget[];
}

View File

@ -0,0 +1,62 @@
import { ApiProperty } from '@nestjs/swagger';
export class SnapshotDetailResponse {
@ApiProperty() id: string;
@ApiProperty() target: string;
@ApiProperty() status: string;
@ApiProperty() progress: number;
@ApiProperty() fileSize: string;
@ApiProperty() fileName: string | null;
@ApiProperty() error: string | null;
@ApiProperty() startedAt: Date | null;
@ApiProperty() completedAt: Date | null;
}
export class SnapshotTaskResponse {
@ApiProperty() id: string;
@ApiProperty() status: string;
@ApiProperty() storageType: string;
@ApiProperty() storagePath: string;
@ApiProperty() targets: string[];
@ApiProperty() totalSize: string;
@ApiProperty() error: string | null;
@ApiProperty() startedAt: Date | null;
@ApiProperty() completedAt: Date | null;
@ApiProperty() createdAt: Date;
@ApiProperty({ type: [SnapshotDetailResponse] })
details: SnapshotDetailResponse[];
}
export class SnapshotListResponse {
@ApiProperty({ type: [SnapshotTaskResponse] }) tasks: SnapshotTaskResponse[];
@ApiProperty() total: number;
@ApiProperty() page: number;
@ApiProperty() limit: number;
}
/** 将 Prisma 对象转换为响应 DTO处理 BigInt 序列化) */
export function toSnapshotResponse(task: any): SnapshotTaskResponse {
return {
id: task.id,
status: task.status,
storageType: task.storageType,
storagePath: task.storagePath,
targets: JSON.parse(task.targets),
totalSize: task.totalSize.toString(),
error: task.error,
startedAt: task.startedAt,
completedAt: task.completedAt,
createdAt: task.createdAt,
details: (task.details || []).map((d: any) => ({
id: d.id,
target: d.target,
status: d.status,
progress: d.progress,
fileSize: d.fileSize.toString(),
fileName: d.fileName,
error: d.error,
startedAt: d.startedAt,
completedAt: d.completedAt,
})),
};
}

View File

@ -0,0 +1,81 @@
import {
WebSocketGateway,
WebSocketServer,
OnGatewayInit,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger, Injectable } from '@nestjs/common';
import { BackupTarget } from '@/domain/enums';
@Injectable()
@WebSocketGateway({
namespace: '/snapshots',
cors: {
origin: '*',
credentials: true,
},
transports: ['websocket', 'polling'],
})
export class SnapshotGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
@WebSocketServer()
server: Server;
private readonly logger = new Logger(SnapshotGateway.name);
private connectedClients = 0;
afterInit() {
this.logger.log('Snapshot WebSocket Gateway initialized');
}
handleConnection(client: Socket) {
this.connectedClients++;
this.logger.log(`Client connected: ${client.id}, total: ${this.connectedClients}`);
}
handleDisconnect(client: Socket) {
this.connectedClients--;
this.logger.log(`Client disconnected: ${client.id}, total: ${this.connectedClients}`);
}
emitTaskStarted(taskId: string, targets: BackupTarget[]): void {
if (this.server && this.connectedClients > 0) {
this.server.emit('snapshot:started', { taskId, targets });
}
}
emitProgress(taskId: string, target: string, percent: number, message: string): void {
if (this.server && this.connectedClients > 0) {
this.server.emit('snapshot:progress', { taskId, target, percent, message });
}
}
emitTargetComplete(taskId: string, target: string, fileSize: number): void {
if (this.server && this.connectedClients > 0) {
this.server.emit('snapshot:target-complete', {
taskId,
target,
fileSize: fileSize.toString(),
});
}
}
emitComplete(taskId: string, totalSize: number, duration: number): void {
if (this.server && this.connectedClients > 0) {
this.server.emit('snapshot:complete', {
taskId,
totalSize: totalSize.toString(),
duration,
});
}
}
emitError(taskId: string, target: string | undefined, error: string): void {
if (this.server && this.connectedClients > 0) {
this.server.emit('snapshot:error', { taskId, target, error });
}
}
}

View File

@ -0,0 +1,22 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ScheduleModule } from '@nestjs/schedule';
import { ApiModule } from './api/api.module';
import { InfrastructureModule } from './infrastructure/infrastructure.module';
import { DomainModule } from './domain/domain.module';
import { appConfig } from './config/app.config';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
load: [appConfig],
envFilePath: ['.env', '.env.development'],
}),
ScheduleModule.forRoot(),
DomainModule,
InfrastructureModule,
ApiModule,
],
})
export class AppModule {}

View File

@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { SnapshotOrchestratorService } from './services/snapshot-orchestrator.service';
import { InfrastructureModule } from '@/infrastructure/infrastructure.module';
@Module({
imports: [InfrastructureModule],
providers: [SnapshotOrchestratorService],
exports: [SnapshotOrchestratorService],
})
export class ApplicationModule {}

View File

@ -0,0 +1,189 @@
import { Injectable, Inject, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Cron, CronExpression } from '@nestjs/schedule';
import * as fs from 'fs';
import { SnapshotRepository } from '@/infrastructure/persistence/repositories/snapshot.repository';
import { MinioStorageAdapter } from '@/infrastructure/storage/minio-storage.adapter';
import { LocalStorageAdapter } from '@/infrastructure/storage/local-storage.adapter';
import { SnapshotGateway } from '@/api/gateways/snapshot.gateway';
import {
BackupHandler,
BACKUP_HANDLER_TOKEN,
} from '@/domain/interfaces/backup-handler.interface';
import {
BackupTarget,
BACKUP_TARGET_ORDER,
SnapshotStatus,
StorageType,
} from '@/domain/enums';
@Injectable()
export class SnapshotOrchestratorService {
private readonly logger = new Logger(SnapshotOrchestratorService.name);
private readonly retentionHours: number;
private runningTaskId: string | null = null;
constructor(
private readonly repo: SnapshotRepository,
private readonly minioStorage: MinioStorageAdapter,
private readonly localStorage: LocalStorageAdapter,
private readonly gateway: SnapshotGateway,
private readonly configService: ConfigService,
@Inject(BACKUP_HANDLER_TOKEN)
private readonly handlers: Map<BackupTarget, BackupHandler>,
) {
this.retentionHours = this.configService.get<number>('SNAPSHOT_RETENTION_HOURS', 72);
}
getAvailableTargets(): BackupTarget[] {
return BACKUP_TARGET_ORDER.filter((t) => this.handlers.has(t));
}
isRunning(): boolean {
return this.runningTaskId !== null;
}
async startSnapshot(data: {
storageType: StorageType;
storagePath: string;
targets: BackupTarget[];
}): Promise<string> {
if (this.runningTaskId) {
throw new Error('已有备份任务正在执行,请等待完成');
}
// 校验 targets 是否都可用
const unavailable = data.targets.filter((t) => !this.handlers.has(t));
if (unavailable.length > 0) {
throw new Error(`以下备份目标不可用: ${unavailable.join(', ')}`);
}
// 按预定义顺序排序
const sortedTargets = BACKUP_TARGET_ORDER.filter((t) => data.targets.includes(t));
const task = await this.repo.createTask({
storageType: data.storageType,
storagePath: data.storagePath,
targets: sortedTargets,
});
this.logger.log(`备份任务已创建: ${task.id}, targets: ${sortedTargets.join(',')}`);
// 异步执行,不阻塞 HTTP 请求
this.executeSnapshot(task.id).catch((err) => {
this.logger.error(`备份任务异常: ${task.id}, ${err.message}`);
});
return task.id;
}
private async executeSnapshot(taskId: string): Promise<void> {
this.runningTaskId = taskId;
const startTime = Date.now();
try {
await this.repo.updateTaskStatus(taskId, SnapshotStatus.RUNNING);
const task = await this.repo.findById(taskId);
if (!task) throw new Error(`任务不存在: ${taskId}`);
const targets: BackupTarget[] = JSON.parse(task.targets);
const outputDir = this.localStorage.getTaskDir(taskId);
this.gateway.emitTaskStarted(taskId, targets);
// 串行执行各目标
for (const target of targets) {
await this.repo.updateDetailStatus(taskId, target, SnapshotStatus.RUNNING);
this.gateway.emitProgress(taskId, target, 0, '开始备份...');
try {
const handler = this.handlers.get(target);
if (!handler) {
throw new Error(`备份处理器不存在: ${target}`);
}
const result = await handler.execute(outputDir, (percent, msg) => {
this.gateway.emitProgress(taskId, target, percent, msg);
// 进度更新不频繁写库每10%写一次
if (percent % 10 === 0) {
this.repo.updateDetailProgress(taskId, target, percent).catch(() => {});
}
});
// 如果存储到 MinIO上传后删除本地临时文件
if (task.storageType === StorageType.MINIO) {
this.gateway.emitProgress(taskId, target, 99, '上传到 MinIO...');
await this.minioStorage.upload(
result.filePath,
`${task.storagePath}/${result.fileName}`,
);
fs.unlinkSync(result.filePath);
}
await this.repo.completeDetail(taskId, target, result);
this.gateway.emitTargetComplete(taskId, target, result.fileSize);
this.logger.log(`备份目标完成: ${target}, 大小: ${result.fileSize} bytes`);
} catch (error) {
const errMsg = error instanceof Error ? error.message : String(error);
await this.repo.failDetail(taskId, target, errMsg);
this.gateway.emitError(taskId, target, errMsg);
this.logger.error(`备份目标失败: ${target}, 错误: ${errMsg}`);
// 单个目标失败不中断整体任务
}
}
await this.repo.completeTask(taskId);
const duration = Date.now() - startTime;
const completedTask = await this.repo.findById(taskId);
this.gateway.emitComplete(
taskId,
Number(completedTask?.totalSize ?? 0),
duration,
);
this.logger.log(`备份任务完成: ${taskId}, 耗时: ${duration}ms`);
} catch (error) {
const errMsg = error instanceof Error ? error.message : String(error);
await this.repo.updateTaskStatus(taskId, SnapshotStatus.FAILED, errMsg);
this.gateway.emitError(taskId, undefined, errMsg);
this.logger.error(`备份任务失败: ${taskId}, ${errMsg}`);
} finally {
this.runningTaskId = null;
// MinIO 模式下清理空的临时目录
const task = await this.repo.findById(taskId);
if (task?.storageType === StorageType.MINIO) {
this.localStorage.deleteTask(taskId);
}
}
}
async deleteSnapshot(taskId: string): Promise<void> {
const task = await this.repo.findById(taskId);
if (!task) throw new Error(`任务不存在: ${taskId}`);
if (task.storageType === StorageType.LOCAL) {
this.localStorage.deleteTask(taskId);
} else if (task.storageType === StorageType.MINIO) {
await this.minioStorage.deleteByPrefix(`${task.storagePath}/`);
}
await this.repo.deleteTask(taskId);
this.logger.log(`备份已删除: ${taskId}`);
}
@Cron(CronExpression.EVERY_HOUR)
async cleanupExpiredSnapshots(): Promise<void> {
const deletedIds = this.localStorage.cleanupExpired(this.retentionHours);
if (deletedIds.length > 0) {
this.logger.log(`已清理 ${deletedIds.length} 个过期本地备份`);
for (const id of deletedIds) {
try {
await this.repo.deleteTask(id);
} catch {
// 任务记录可能已不存在
}
}
}
}
}

View File

@ -0,0 +1,4 @@
export const appConfig = () => ({
port: parseInt(process.env.APP_PORT || '3099', 10),
nodeEnv: process.env.NODE_ENV || 'development',
});

View File

@ -0,0 +1,4 @@
import { Module } from '@nestjs/common';
@Module({})
export class DomainModule {}

View File

@ -0,0 +1,28 @@
export enum BackupTarget {
POSTGRES = 'POSTGRES',
REDIS = 'REDIS',
KAFKA = 'KAFKA',
ZOOKEEPER = 'ZOOKEEPER',
MINIO = 'MINIO',
UPLOADS = 'UPLOADS',
}
/** 备份执行顺序 */
export const BACKUP_TARGET_ORDER: BackupTarget[] = [
BackupTarget.POSTGRES,
BackupTarget.REDIS,
BackupTarget.KAFKA,
BackupTarget.ZOOKEEPER,
BackupTarget.MINIO,
BackupTarget.UPLOADS,
];
/** 备份目标中文名 */
export const BACKUP_TARGET_LABELS: Record<BackupTarget, string> = {
[BackupTarget.POSTGRES]: 'PostgreSQL',
[BackupTarget.REDIS]: 'Redis',
[BackupTarget.KAFKA]: 'Kafka',
[BackupTarget.ZOOKEEPER]: 'ZooKeeper',
[BackupTarget.MINIO]: 'MinIO',
[BackupTarget.UPLOADS]: 'Uploads',
};

View File

@ -0,0 +1,3 @@
export * from './backup-target.enum';
export * from './snapshot-status.enum';
export * from './storage-type.enum';

View File

@ -0,0 +1,6 @@
export enum SnapshotStatus {
PENDING = 'PENDING',
RUNNING = 'RUNNING',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
}

View File

@ -0,0 +1,4 @@
export enum StorageType {
MINIO = 'MINIO',
LOCAL = 'LOCAL',
}

View File

@ -0,0 +1,16 @@
import { BackupTarget } from '../enums';
export interface BackupResult {
fileName: string;
filePath: string;
fileSize: number;
}
export type ProgressCallback = (percent: number, message: string) => void;
export interface BackupHandler {
readonly target: BackupTarget;
execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult>;
}
export const BACKUP_HANDLER_TOKEN = 'BACKUP_HANDLER';

View File

@ -0,0 +1,84 @@
import { Injectable, Logger } from '@nestjs/common';
import * as fs from 'fs';
import * as path from 'path';
import * as archiver from 'archiver';
import { BackupTarget } from '@/domain/enums';
import {
BackupHandler,
BackupResult,
ProgressCallback,
} from '@/domain/interfaces/backup-handler.interface';
@Injectable()
export class KafkaBackupHandler implements BackupHandler {
readonly target = BackupTarget.KAFKA;
private readonly logger = new Logger(KafkaBackupHandler.name);
private readonly sourceDir = '/backup-source/kafka';
async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const fileName = `kafka-${timestamp}.tar.gz`;
const filePath = path.join(outputDir, fileName);
fs.mkdirSync(outputDir, { recursive: true });
if (!fs.existsSync(this.sourceDir)) {
throw new Error(`Kafka 数据目录不存在: ${this.sourceDir}`);
}
onProgress(0, 'Kafka 数据打包开始...');
// 先计算总大小
const totalSize = this.getDirSize(this.sourceDir);
this.logger.log(`Kafka 数据目录大小: ${totalSize} bytes`);
return new Promise<BackupResult>((resolve, reject) => {
const output = fs.createWriteStream(filePath);
const archive = archiver('tar', { gzip: true });
let processedBytes = 0;
archive.on('progress', (progress) => {
processedBytes = progress.fs.processedBytes;
if (totalSize > 0) {
const percent = Math.min(99, Math.floor((processedBytes / totalSize) * 100));
onProgress(percent, `Kafka 打包中 ${percent}%`);
}
});
output.on('close', () => {
const fileSize = archive.pointer();
this.logger.log(`Kafka 备份完成: ${fileName}, 大小: ${fileSize} bytes`);
onProgress(100, 'Kafka 备份完成');
resolve({ fileName, filePath, fileSize });
});
archive.on('error', (err) => {
this.logger.error(`Kafka 备份打包失败: ${err.message}`);
reject(err);
});
archive.pipe(output);
archive.directory(this.sourceDir, 'kafka');
archive.finalize();
});
}
private getDirSize(dirPath: string): number {
let size = 0;
try {
const entries = fs.readdirSync(dirPath, { withFileTypes: true });
for (const entry of entries) {
const fullPath = path.join(dirPath, entry.name);
if (entry.isFile()) {
size += fs.statSync(fullPath).size;
} else if (entry.isDirectory()) {
size += this.getDirSize(fullPath);
}
}
} catch {
// 忽略权限错误
}
return size;
}
}

View File

@ -0,0 +1,135 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import * as Minio from 'minio';
import * as fs from 'fs';
import * as path from 'path';
import * as archiver from 'archiver';
import { BackupTarget } from '@/domain/enums';
import {
BackupHandler,
BackupResult,
ProgressCallback,
} from '@/domain/interfaces/backup-handler.interface';
@Injectable()
export class MinioBackupHandler implements BackupHandler {
readonly target = BackupTarget.MINIO;
private readonly logger = new Logger(MinioBackupHandler.name);
private readonly minioClient: Minio.Client;
constructor(private readonly configService: ConfigService) {
const endpoint = this.configService.get<string>('MINIO_ENDPOINT', 'localhost');
const port = parseInt(this.configService.get<string>('MINIO_PORT', '9000'), 10);
const useSSL = this.configService.get<string>('MINIO_USE_SSL', 'false') === 'true';
const accessKey = this.configService.get<string>('MINIO_ACCESS_KEY', 'admin');
const secretKey = this.configService.get<string>('MINIO_SECRET_KEY', '');
this.minioClient = new Minio.Client({
endPoint: endpoint,
port,
useSSL,
accessKey,
secretKey,
});
}
async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const fileName = `minio-${timestamp}.tar.gz`;
const filePath = path.join(outputDir, fileName);
const tempDownloadDir = path.join(outputDir, `minio-temp-${timestamp}`);
fs.mkdirSync(outputDir, { recursive: true });
fs.mkdirSync(tempDownloadDir, { recursive: true });
onProgress(0, 'MinIO 桶列表获取中...');
try {
// 1. 列出所有桶
const buckets = await this.minioClient.listBuckets();
const backupBucket = this.configService.get<string>('MINIO_BACKUP_BUCKET', 'snapshots');
// 排除备份桶自身
const bucketsToBackup = buckets.filter((b) => b.name !== backupBucket);
if (bucketsToBackup.length === 0) {
throw new Error('没有可备份的 MinIO 桶');
}
this.logger.log(`发现 ${bucketsToBackup.length} 个桶待备份`);
// 2. 下载所有桶的所有对象到临时目录
let totalObjects = 0;
let downloadedObjects = 0;
// 先统计对象总数
for (const bucket of bucketsToBackup) {
const objects = await this.listAllObjects(bucket.name);
totalObjects += objects.length;
}
onProgress(5, `${totalObjects} 个对象待下载...`);
for (const bucket of bucketsToBackup) {
const bucketDir = path.join(tempDownloadDir, bucket.name);
fs.mkdirSync(bucketDir, { recursive: true });
const objects = await this.listAllObjects(bucket.name);
for (const obj of objects) {
const objectDir = path.join(bucketDir, path.dirname(obj.name));
fs.mkdirSync(objectDir, { recursive: true });
const objectPath = path.join(bucketDir, obj.name);
const dataStream = await this.minioClient.getObject(bucket.name, obj.name);
const writeStream = fs.createWriteStream(objectPath);
await new Promise<void>((resolve, reject) => {
dataStream.pipe(writeStream);
writeStream.on('finish', resolve);
writeStream.on('error', reject);
dataStream.on('error', reject);
});
downloadedObjects++;
const percent = 5 + Math.floor((downloadedObjects / totalObjects) * 70);
onProgress(percent, `下载对象 ${downloadedObjects}/${totalObjects}: ${bucket.name}/${obj.name}`);
}
}
onProgress(75, 'MinIO 对象下载完成,开始打包...');
// 3. 打包为 tar.gz
const result = await new Promise<BackupResult>((resolve, reject) => {
const output = fs.createWriteStream(filePath);
const archive = archiver('tar', { gzip: true });
output.on('close', () => {
const fileSize = archive.pointer();
onProgress(100, 'MinIO 备份完成');
resolve({ fileName, filePath, fileSize });
});
archive.on('error', reject);
archive.pipe(output);
archive.directory(tempDownloadDir, 'minio');
archive.finalize();
});
this.logger.log(`MinIO 备份完成: ${fileName}, 大小: ${result.fileSize} bytes`);
return result;
} finally {
// 清理临时下载目录
fs.rmSync(tempDownloadDir, { recursive: true, force: true });
}
}
private async listAllObjects(bucket: string): Promise<Minio.BucketItem[]> {
return new Promise((resolve, reject) => {
const objects: Minio.BucketItem[] = [];
const stream = this.minioClient.listObjects(bucket, '', true);
stream.on('data', (obj) => objects.push(obj));
stream.on('end', () => resolve(objects));
stream.on('error', reject);
});
}
}

View File

@ -0,0 +1,92 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { spawn } from 'child_process';
import * as fs from 'fs';
import * as path from 'path';
import { BackupTarget } from '@/domain/enums';
import {
BackupHandler,
BackupResult,
ProgressCallback,
} from '@/domain/interfaces/backup-handler.interface';
@Injectable()
export class PostgresBackupHandler implements BackupHandler {
readonly target = BackupTarget.POSTGRES;
private readonly logger = new Logger(PostgresBackupHandler.name);
private readonly host: string;
private readonly port: string;
private readonly user: string;
private readonly password: string;
constructor(private readonly configService: ConfigService) {
this.host = this.configService.get<string>('PG_HOST', 'localhost');
this.port = this.configService.get<string>('PG_PORT', '5432');
this.user = this.configService.get<string>('PG_USER', 'rwa_user');
this.password = this.configService.get<string>('PG_PASSWORD', '');
}
async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const fileName = `postgres-${timestamp}.tar.gz`;
const filePath = path.join(outputDir, fileName);
fs.mkdirSync(outputDir, { recursive: true });
onProgress(0, 'pg_basebackup 开始...');
return new Promise<BackupResult>((resolve, reject) => {
const outputStream = fs.createWriteStream(filePath);
const proc = spawn('pg_basebackup', [
'-h', this.host,
'-p', this.port,
'-U', this.user,
'-D', '-',
'-Ft',
'-z',
'-P',
'-v',
], {
env: { ...process.env, PGPASSWORD: this.password },
});
proc.stdout.pipe(outputStream);
let stderrBuffer = '';
proc.stderr.on('data', (data: Buffer) => {
const text = data.toString();
stderrBuffer += text;
// pg_basebackup 进度格式: "12345/67890 kB (18%), 0/1 tablespace"
const match = text.match(/\((\d+)%\)/);
if (match) {
const percent = parseInt(match[1], 10);
onProgress(percent, `PostgreSQL 备份中 ${percent}%`);
}
});
proc.on('close', (code) => {
if (code === 0) {
const stat = fs.statSync(filePath);
this.logger.log(`PostgreSQL 备份完成: ${fileName}, 大小: ${stat.size} bytes`);
onProgress(100, 'PostgreSQL 备份完成');
resolve({ fileName, filePath, fileSize: stat.size });
} else {
const error = `pg_basebackup 退出码: ${code}, stderr: ${stderrBuffer.slice(-500)}`;
this.logger.error(error);
// 清理不完整的文件
if (fs.existsSync(filePath)) fs.unlinkSync(filePath);
reject(new Error(error));
}
});
proc.on('error', (err) => {
this.logger.error(`pg_basebackup 启动失败: ${err.message}`);
if (fs.existsSync(filePath)) fs.unlinkSync(filePath);
reject(new Error(`pg_basebackup 启动失败: ${err.message}`));
});
});
}
}

View File

@ -0,0 +1,116 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import Redis from 'ioredis';
import * as fs from 'fs';
import * as path from 'path';
import * as archiver from 'archiver';
import { BackupTarget } from '@/domain/enums';
import {
BackupHandler,
BackupResult,
ProgressCallback,
} from '@/domain/interfaces/backup-handler.interface';
@Injectable()
export class RedisBackupHandler implements BackupHandler {
readonly target = BackupTarget.REDIS;
private readonly logger = new Logger(RedisBackupHandler.name);
private readonly redisHost: string;
private readonly redisPort: number;
private readonly redisPassword: string;
private readonly sourceDir = '/backup-source/redis';
constructor(private readonly configService: ConfigService) {
this.redisHost = this.configService.get<string>('REDIS_HOST', 'localhost');
this.redisPort = this.configService.get<number>('REDIS_PORT', 6379);
this.redisPassword = this.configService.get<string>('REDIS_PASSWORD', '');
}
async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const fileName = `redis-${timestamp}.tar.gz`;
const filePath = path.join(outputDir, fileName);
fs.mkdirSync(outputDir, { recursive: true });
onProgress(0, 'Redis BGSAVE 触发中...');
// 1. 连接 Redis 触发 BGSAVE
const redis = new Redis({
host: this.redisHost,
port: this.redisPort,
password: this.redisPassword || undefined,
maxRetriesPerRequest: 3,
});
try {
const lastSaveBefore = await redis.lastsave();
await redis.bgsave();
this.logger.log('Redis BGSAVE 已触发');
// 2. 轮询等待 BGSAVE 完成
onProgress(10, 'Redis BGSAVE 执行中...');
let attempts = 0;
const maxAttempts = 120; // 最多等待 60 秒
while (attempts < maxAttempts) {
await new Promise((r) => setTimeout(r, 500));
const lastSaveAfter = await redis.lastsave();
if (lastSaveAfter > lastSaveBefore) {
break;
}
attempts++;
onProgress(10 + Math.min(40, Math.floor((attempts / maxAttempts) * 40)), 'Redis BGSAVE 执行中...');
}
if (attempts >= maxAttempts) {
throw new Error('Redis BGSAVE 超时 (60秒)');
}
onProgress(50, 'Redis BGSAVE 完成,开始打包文件...');
} finally {
redis.disconnect();
}
// 3. 从只读卷复制 dump.rdb+ aof 如果存在)
return new Promise<BackupResult>((resolve, reject) => {
const output = fs.createWriteStream(filePath);
const archive = archiver('tar', { gzip: true });
output.on('close', () => {
const fileSize = archive.pointer();
this.logger.log(`Redis 备份完成: ${fileName}, 大小: ${fileSize} bytes`);
onProgress(100, 'Redis 备份完成');
resolve({ fileName, filePath, fileSize });
});
archive.on('error', (err) => {
this.logger.error(`Redis 备份打包失败: ${err.message}`);
reject(err);
});
archive.pipe(output);
const dumpPath = path.join(this.sourceDir, 'dump.rdb');
if (fs.existsSync(dumpPath)) {
archive.file(dumpPath, { name: 'dump.rdb' });
onProgress(70, '打包 dump.rdb...');
}
const aofPath = path.join(this.sourceDir, 'appendonly.aof');
if (fs.existsSync(aofPath)) {
archive.file(aofPath, { name: 'appendonly.aof' });
onProgress(85, '打包 appendonly.aof...');
}
// AOF 目录Redis 7 multi-part AOF
const aofDir = path.join(this.sourceDir, 'appendonlydir');
if (fs.existsSync(aofDir)) {
archive.directory(aofDir, 'appendonlydir');
onProgress(85, '打包 appendonlydir/...');
}
archive.finalize();
});
}
}

View File

@ -0,0 +1,82 @@
import { Injectable, Logger } from '@nestjs/common';
import * as fs from 'fs';
import * as path from 'path';
import * as archiver from 'archiver';
import { BackupTarget } from '@/domain/enums';
import {
BackupHandler,
BackupResult,
ProgressCallback,
} from '@/domain/interfaces/backup-handler.interface';
@Injectable()
export class UploadsBackupHandler implements BackupHandler {
readonly target = BackupTarget.UPLOADS;
private readonly logger = new Logger(UploadsBackupHandler.name);
private readonly sourceDir = '/backup-source/uploads';
async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const fileName = `uploads-${timestamp}.tar.gz`;
const filePath = path.join(outputDir, fileName);
fs.mkdirSync(outputDir, { recursive: true });
if (!fs.existsSync(this.sourceDir)) {
throw new Error(`Uploads 目录不存在: ${this.sourceDir}`);
}
onProgress(0, 'Uploads 文件打包开始...');
// 计算总大小
const totalSize = this.getDirSize(this.sourceDir);
this.logger.log(`Uploads 目录大小: ${totalSize} bytes`);
return new Promise<BackupResult>((resolve, reject) => {
const output = fs.createWriteStream(filePath);
const archive = archiver('tar', { gzip: true });
archive.on('progress', (progress) => {
if (totalSize > 0) {
const percent = Math.min(99, Math.floor((progress.fs.processedBytes / totalSize) * 100));
onProgress(percent, `Uploads 打包中 ${percent}%`);
}
});
output.on('close', () => {
const fileSize = archive.pointer();
this.logger.log(`Uploads 备份完成: ${fileName}, 大小: ${fileSize} bytes`);
onProgress(100, 'Uploads 备份完成');
resolve({ fileName, filePath, fileSize });
});
archive.on('error', (err) => {
this.logger.error(`Uploads 备份打包失败: ${err.message}`);
reject(err);
});
archive.pipe(output);
archive.directory(this.sourceDir, 'uploads');
archive.finalize();
});
}
private getDirSize(dirPath: string): number {
let size = 0;
try {
const entries = fs.readdirSync(dirPath, { withFileTypes: true });
for (const entry of entries) {
const fullPath = path.join(dirPath, entry.name);
if (entry.isFile()) {
size += fs.statSync(fullPath).size;
} else if (entry.isDirectory()) {
size += this.getDirSize(fullPath);
}
}
} catch {
// 忽略权限错误
}
return size;
}
}

View File

@ -0,0 +1,60 @@
import { Injectable, Logger } from '@nestjs/common';
import * as fs from 'fs';
import * as path from 'path';
import * as archiver from 'archiver';
import { BackupTarget } from '@/domain/enums';
import {
BackupHandler,
BackupResult,
ProgressCallback,
} from '@/domain/interfaces/backup-handler.interface';
@Injectable()
export class ZookeeperBackupHandler implements BackupHandler {
readonly target = BackupTarget.ZOOKEEPER;
private readonly logger = new Logger(ZookeeperBackupHandler.name);
private readonly dataDir = '/backup-source/zookeeper/data';
private readonly logDir = '/backup-source/zookeeper/log';
async execute(outputDir: string, onProgress: ProgressCallback): Promise<BackupResult> {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const fileName = `zookeeper-${timestamp}.tar.gz`;
const filePath = path.join(outputDir, fileName);
fs.mkdirSync(outputDir, { recursive: true });
onProgress(0, 'ZooKeeper 数据打包开始...');
return new Promise<BackupResult>((resolve, reject) => {
const output = fs.createWriteStream(filePath);
const archive = archiver('tar', { gzip: true });
output.on('close', () => {
const fileSize = archive.pointer();
this.logger.log(`ZooKeeper 备份完成: ${fileName}, 大小: ${fileSize} bytes`);
onProgress(100, 'ZooKeeper 备份完成');
resolve({ fileName, filePath, fileSize });
});
archive.on('error', (err) => {
this.logger.error(`ZooKeeper 备份打包失败: ${err.message}`);
reject(err);
});
archive.pipe(output);
if (fs.existsSync(this.dataDir)) {
archive.directory(this.dataDir, 'data');
onProgress(30, '打包 ZooKeeper data/...');
}
if (fs.existsSync(this.logDir)) {
archive.directory(this.logDir, 'log');
onProgress(60, '打包 ZooKeeper log/...');
}
archive.finalize();
});
}
}

View File

@ -0,0 +1,80 @@
import { Module } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { PrismaService } from './persistence/prisma/prisma.service';
import { SnapshotRepository } from './persistence/repositories/snapshot.repository';
import { PostgresBackupHandler } from './backup/postgres-backup.handler';
import { RedisBackupHandler } from './backup/redis-backup.handler';
import { KafkaBackupHandler } from './backup/kafka-backup.handler';
import { ZookeeperBackupHandler } from './backup/zookeeper-backup.handler';
import { MinioBackupHandler } from './backup/minio-backup.handler';
import { UploadsBackupHandler } from './backup/uploads-backup.handler';
import { MinioStorageAdapter } from './storage/minio-storage.adapter';
import { LocalStorageAdapter } from './storage/local-storage.adapter';
import { SnapshotGateway } from '@/api/gateways/snapshot.gateway';
import { BACKUP_HANDLER_TOKEN } from '@/domain/interfaces/backup-handler.interface';
import { BackupTarget } from '@/domain/enums';
@Module({
providers: [
PrismaService,
SnapshotRepository,
MinioStorageAdapter,
LocalStorageAdapter,
SnapshotGateway,
// 各备份 handler
PostgresBackupHandler,
RedisBackupHandler,
KafkaBackupHandler,
ZookeeperBackupHandler,
MinioBackupHandler,
UploadsBackupHandler,
// 备份 handler Map 注入
{
provide: BACKUP_HANDLER_TOKEN,
useFactory: (
configService: ConfigService,
pg: PostgresBackupHandler,
redis: RedisBackupHandler,
kafka: KafkaBackupHandler,
zk: ZookeeperBackupHandler,
minio: MinioBackupHandler,
uploads: UploadsBackupHandler,
) => {
const availableTargets = configService
.get<string>('AVAILABLE_TARGETS', '')
.split(',')
.map((t) => t.trim())
.filter(Boolean);
const allHandlers = [pg, redis, kafka, zk, minio, uploads];
const handlerMap = new Map<BackupTarget, typeof pg>();
for (const handler of allHandlers) {
if (availableTargets.includes(handler.target)) {
handlerMap.set(handler.target, handler);
}
}
return handlerMap;
},
inject: [
ConfigService,
PostgresBackupHandler,
RedisBackupHandler,
KafkaBackupHandler,
ZookeeperBackupHandler,
MinioBackupHandler,
UploadsBackupHandler,
],
},
],
exports: [
PrismaService,
SnapshotRepository,
MinioStorageAdapter,
LocalStorageAdapter,
SnapshotGateway,
BACKUP_HANDLER_TOKEN,
],
})
export class InfrastructureModule {}

View File

@ -0,0 +1,16 @@
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
@Injectable()
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(PrismaService.name);
async onModuleInit() {
await this.$connect();
this.logger.log('SQLite database connected');
}
async onModuleDestroy() {
await this.$disconnect();
}
}

View File

@ -0,0 +1,159 @@
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { BackupTarget, SnapshotStatus } from '@/domain/enums';
import { BackupResult } from '@/domain/interfaces/backup-handler.interface';
@Injectable()
export class SnapshotRepository {
private readonly logger = new Logger(SnapshotRepository.name);
constructor(private readonly prisma: PrismaService) {}
async createTask(data: {
storageType: string;
storagePath: string;
targets: BackupTarget[];
}) {
return this.prisma.snapshotTask.create({
data: {
storageType: data.storageType,
storagePath: data.storagePath,
targets: JSON.stringify(data.targets),
status: SnapshotStatus.PENDING,
details: {
create: data.targets.map((target) => ({
target,
status: SnapshotStatus.PENDING,
})),
},
},
include: { details: true },
});
}
async findById(id: string) {
return this.prisma.snapshotTask.findUnique({
where: { id },
include: { details: true },
});
}
async findAll(page: number = 1, limit: number = 20) {
const skip = (page - 1) * limit;
const [tasks, total] = await Promise.all([
this.prisma.snapshotTask.findMany({
skip,
take: limit,
orderBy: { createdAt: 'desc' },
include: { details: true },
}),
this.prisma.snapshotTask.count(),
]);
return { tasks, total, page, limit };
}
async updateTaskStatus(id: string, status: SnapshotStatus, error?: string) {
const data: Record<string, unknown> = { status };
if (status === SnapshotStatus.RUNNING) {
data.startedAt = new Date();
}
if (status === SnapshotStatus.COMPLETED || status === SnapshotStatus.FAILED) {
data.completedAt = new Date();
}
if (error) {
data.error = error;
}
return this.prisma.snapshotTask.update({ where: { id }, data });
}
async updateDetailStatus(taskId: string, target: string, status: SnapshotStatus) {
const detail = await this.prisma.snapshotDetail.findFirst({
where: { taskId, target },
});
if (!detail) return;
const data: Record<string, unknown> = { status };
if (status === SnapshotStatus.RUNNING) {
data.startedAt = new Date();
}
return this.prisma.snapshotDetail.update({ where: { id: detail.id }, data });
}
async updateDetailProgress(taskId: string, target: string, progress: number) {
const detail = await this.prisma.snapshotDetail.findFirst({
where: { taskId, target },
});
if (!detail) return;
return this.prisma.snapshotDetail.update({
where: { id: detail.id },
data: { progress },
});
}
async completeDetail(taskId: string, target: string, result: BackupResult) {
const detail = await this.prisma.snapshotDetail.findFirst({
where: { taskId, target },
});
if (!detail) return;
return this.prisma.snapshotDetail.update({
where: { id: detail.id },
data: {
status: SnapshotStatus.COMPLETED,
progress: 100,
fileSize: BigInt(result.fileSize),
fileName: result.fileName,
completedAt: new Date(),
},
});
}
async failDetail(taskId: string, target: string, error: string) {
const detail = await this.prisma.snapshotDetail.findFirst({
where: { taskId, target },
});
if (!detail) return;
return this.prisma.snapshotDetail.update({
where: { id: detail.id },
data: {
status: SnapshotStatus.FAILED,
error,
completedAt: new Date(),
},
});
}
async completeTask(id: string) {
const details = await this.prisma.snapshotDetail.findMany({
where: { taskId: id },
});
const totalSize = details.reduce((sum, d) => sum + d.fileSize, BigInt(0));
const hasFailure = details.some((d) => d.status === SnapshotStatus.FAILED);
const allFailed = details.every((d) => d.status === SnapshotStatus.FAILED);
return this.prisma.snapshotTask.update({
where: { id },
data: {
status: allFailed ? SnapshotStatus.FAILED : SnapshotStatus.COMPLETED,
totalSize,
completedAt: new Date(),
error: hasFailure ? '部分备份目标失败,请查看详情' : null,
},
});
}
async deleteTask(id: string) {
return this.prisma.snapshotTask.delete({ where: { id } });
}
async findExpiredLocalTasks(retentionHours: number) {
const threshold = new Date(Date.now() - retentionHours * 60 * 60 * 1000);
return this.prisma.snapshotTask.findMany({
where: {
storageType: 'LOCAL',
createdAt: { lt: threshold },
},
include: { details: true },
});
}
}

View File

@ -0,0 +1,61 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import * as fs from 'fs';
import * as path from 'path';
@Injectable()
export class LocalStorageAdapter {
private readonly logger = new Logger(LocalStorageAdapter.name);
private readonly tempDir: string;
constructor(private readonly configService: ConfigService) {
this.tempDir = this.configService.get<string>('SNAPSHOT_TEMP_DIR', './data/snapshots');
fs.mkdirSync(this.tempDir, { recursive: true });
this.logger.log(`Local storage adapter configured: ${this.tempDir}`);
}
getTaskDir(taskId: string): string {
const taskDir = path.join(this.tempDir, taskId);
fs.mkdirSync(taskDir, { recursive: true });
return taskDir;
}
getFilePath(taskId: string, fileName: string): string {
return path.join(this.tempDir, taskId, fileName);
}
fileExists(taskId: string, fileName: string): boolean {
return fs.existsSync(this.getFilePath(taskId, fileName));
}
deleteTask(taskId: string): void {
const taskDir = path.join(this.tempDir, taskId);
if (fs.existsSync(taskDir)) {
fs.rmSync(taskDir, { recursive: true, force: true });
this.logger.log(`Deleted local snapshot: ${taskDir}`);
}
}
cleanupExpired(retentionHours: number): string[] {
const threshold = Date.now() - retentionHours * 60 * 60 * 1000;
const deletedIds: string[] = [];
if (!fs.existsSync(this.tempDir)) return deletedIds;
const entries = fs.readdirSync(this.tempDir, { withFileTypes: true });
for (const entry of entries) {
if (!entry.isDirectory()) continue;
const dirPath = path.join(this.tempDir, entry.name);
const stat = fs.statSync(dirPath);
if (stat.mtimeMs < threshold) {
fs.rmSync(dirPath, { recursive: true, force: true });
deletedIds.push(entry.name);
this.logger.log(`Cleaned up expired snapshot: ${entry.name}`);
}
}
return deletedIds;
}
}

View File

@ -0,0 +1,88 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import * as Minio from 'minio';
import * as fs from 'fs';
@Injectable()
export class MinioStorageAdapter implements OnModuleInit {
private readonly logger = new Logger(MinioStorageAdapter.name);
private readonly minioClient: Minio.Client;
private readonly bucketName: string;
constructor(private readonly configService: ConfigService) {
const endpoint = this.configService.get<string>('MINIO_ENDPOINT', 'localhost');
const port = parseInt(this.configService.get<string>('MINIO_PORT', '9000'), 10);
const useSSL = this.configService.get<string>('MINIO_USE_SSL', 'false') === 'true';
const accessKey = this.configService.get<string>('MINIO_ACCESS_KEY', 'admin');
const secretKey = this.configService.get<string>('MINIO_SECRET_KEY', '');
this.bucketName = this.configService.get<string>('MINIO_BACKUP_BUCKET', 'snapshots');
this.minioClient = new Minio.Client({
endPoint: endpoint,
port,
useSSL,
accessKey,
secretKey,
});
this.logger.log(`MinIO storage adapter configured: ${endpoint}:${port}, bucket: ${this.bucketName}`);
}
async onModuleInit() {
await this.ensureBucketExists();
}
private async ensureBucketExists(): Promise<void> {
try {
const exists = await this.minioClient.bucketExists(this.bucketName);
if (!exists) {
await this.minioClient.makeBucket(this.bucketName, 'cn-east-1');
this.logger.log(`Created backup bucket: ${this.bucketName}`);
} else {
this.logger.log(`Backup bucket exists: ${this.bucketName}`);
}
} catch (error) {
this.logger.error(`Failed to ensure backup bucket exists: ${error.message}`);
}
}
async upload(localFilePath: string, objectPath: string): Promise<string> {
const stat = fs.statSync(localFilePath);
await this.minioClient.fPutObject(
this.bucketName,
objectPath,
localFilePath,
{ 'Content-Type': 'application/gzip' },
);
this.logger.log(`Uploaded to MinIO: ${this.bucketName}/${objectPath}, size: ${stat.size} bytes`);
return `${this.bucketName}/${objectPath}`;
}
async delete(objectPath: string): Promise<void> {
try {
await this.minioClient.removeObject(this.bucketName, objectPath);
this.logger.log(`Deleted from MinIO: ${this.bucketName}/${objectPath}`);
} catch (error) {
this.logger.error(`Failed to delete from MinIO: ${error.message}`);
}
}
async deleteByPrefix(prefix: string): Promise<void> {
const objects: string[] = [];
const stream = this.minioClient.listObjects(this.bucketName, prefix, true);
await new Promise<void>((resolve, reject) => {
stream.on('data', (obj) => objects.push(obj.name));
stream.on('end', resolve);
stream.on('error', reject);
});
if (objects.length > 0) {
await this.minioClient.removeObjects(this.bucketName, objects);
this.logger.log(`Deleted ${objects.length} objects from MinIO with prefix: ${prefix}`);
}
}
}

View File

@ -0,0 +1,49 @@
import { NestFactory } from '@nestjs/core';
import { ValidationPipe, Logger } from '@nestjs/common';
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';
import { AppModule } from './app.module';
async function bootstrap() {
const logger = new Logger('Bootstrap');
const app = await NestFactory.create(AppModule);
// 全局前缀
app.setGlobalPrefix('api/v1');
// 全局验证管道
app.useGlobalPipes(
new ValidationPipe({
whitelist: true,
forbidNonWhitelisted: true,
transform: true,
transformOptions: { enableImplicitConversion: true },
}),
);
// CORS 配置
app.enableCors({
origin: '*',
methods: 'GET,HEAD,PUT,PATCH,POST,DELETE',
credentials: true,
});
// Swagger API 文档
const config = new DocumentBuilder()
.setTitle('Snapshot Service API')
.setDescription('RWA 榴莲皇后平台数据快照备份服务 API')
.setVersion('1.0.0')
.addTag('快照备份', '数据备份相关接口')
.addTag('健康检查', '服务健康检查接口')
.build();
const document = SwaggerModule.createDocument(app, config);
SwaggerModule.setup('api/docs', app, document);
const port = process.env.APP_PORT || 3099;
await app.listen(port);
logger.log(`Snapshot Service is running on port ${port}`);
logger.log(`Swagger docs: http://localhost:${port}/api/docs`);
}
bootstrap();

View File

@ -0,0 +1,24 @@
{
"compilerOptions": {
"module": "commonjs",
"declaration": true,
"removeComments": true,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"allowSyntheticDefaultImports": true,
"target": "ES2021",
"sourceMap": true,
"outDir": "./dist",
"baseUrl": "./",
"incremental": true,
"skipLibCheck": true,
"strictNullChecks": true,
"noImplicitAny": true,
"strictBindCallApply": true,
"forceConsistentCasingInFileNames": true,
"noFallthroughCasesInSwitch": true,
"paths": {
"@/*": ["src/*"]
}
}
}

View File

@ -28,6 +28,7 @@
"react-redux": "^9.2.0", "react-redux": "^9.2.0",
"recharts": "^2.15.0", "recharts": "^2.15.0",
"redux-persist": "^6.0.0", "redux-persist": "^6.0.0",
"socket.io-client": "^4.7.4",
"xlsx": "^0.18.5", "xlsx": "^0.18.5",
"zustand": "^5.0.3" "zustand": "^5.0.3"
}, },

View File

@ -0,0 +1,290 @@
.container {
max-width: 1200px;
margin: 0 auto;
}
.title {
font-size: 1.5rem;
font-weight: 600;
margin-bottom: 1.5rem;
color: #1a1a2e;
}
.card {
background: #fff;
border-radius: 12px;
padding: 1.5rem;
margin-bottom: 1.5rem;
box-shadow: 0 1px 3px rgba(0, 0, 0, 0.08);
}
.cardTitle {
font-size: 1.1rem;
font-weight: 600;
margin-bottom: 1rem;
display: flex;
align-items: center;
gap: 0.75rem;
}
.wsStatus {
font-size: 0.75rem;
font-weight: 400;
color: #52c41a;
background: #f6ffed;
padding: 2px 8px;
border-radius: 4px;
}
.formGroup {
margin-bottom: 1rem;
}
.label {
display: flex;
align-items: center;
gap: 0.5rem;
font-weight: 500;
margin-bottom: 0.5rem;
font-size: 0.9rem;
color: #333;
}
.radioGroup {
display: flex;
gap: 1.5rem;
}
.radio {
display: flex;
align-items: center;
gap: 0.35rem;
cursor: pointer;
font-size: 0.9rem;
input {
cursor: pointer;
}
}
.checkboxGroup {
display: flex;
flex-wrap: wrap;
gap: 1rem;
}
.checkbox {
display: flex;
align-items: center;
gap: 0.35rem;
cursor: pointer;
font-size: 0.9rem;
background: #f5f5f5;
padding: 6px 12px;
border-radius: 6px;
transition: background 0.2s;
&:hover {
background: #e8e8e8;
}
input {
cursor: pointer;
}
}
.selectAllBtn {
background: none;
border: none;
color: #1890ff;
cursor: pointer;
font-size: 0.8rem;
padding: 0;
&:hover {
text-decoration: underline;
}
&:disabled {
color: #ccc;
cursor: not-allowed;
}
}
.input {
width: 100%;
max-width: 400px;
padding: 8px 12px;
border: 1px solid #d9d9d9;
border-radius: 6px;
font-size: 0.9rem;
outline: none;
transition: border-color 0.2s;
&:focus {
border-color: #1890ff;
}
&:disabled {
background: #f5f5f5;
cursor: not-allowed;
}
}
.startBtn {
background: #1890ff;
color: #fff;
border: none;
padding: 10px 32px;
border-radius: 6px;
font-size: 0.95rem;
font-weight: 500;
cursor: pointer;
transition: background 0.2s;
&:hover:not(:disabled) {
background: #40a9ff;
}
&:disabled {
background: #d9d9d9;
cursor: not-allowed;
}
}
.progressList {
display: flex;
flex-direction: column;
gap: 1rem;
}
.progressItem {
background: #fafafa;
padding: 0.75rem 1rem;
border-radius: 8px;
}
.progressHeader {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 6px;
font-size: 0.9rem;
}
.progressTarget {
font-weight: 500;
}
.progressBar {
height: 8px;
background: #f0f0f0;
border-radius: 4px;
overflow: hidden;
}
.progressFill {
height: 100%;
border-radius: 4px;
transition: width 0.3s ease;
}
.progressMsg {
font-size: 0.8rem;
color: #888;
margin-top: 4px;
}
.completedInfo {
margin-top: 1rem;
padding: 0.75rem 1rem;
background: #f6ffed;
border: 1px solid #b7eb8f;
border-radius: 6px;
color: #52c41a;
font-weight: 500;
}
.tableWrapper {
overflow-x: auto;
}
.table {
width: 100%;
border-collapse: collapse;
font-size: 0.9rem;
th,
td {
padding: 10px 12px;
text-align: left;
border-bottom: 1px solid #f0f0f0;
}
th {
font-weight: 600;
color: #666;
background: #fafafa;
}
tr:hover td {
background: #fafafa;
}
}
.empty {
text-align: center !important;
color: #999;
padding: 2rem 0 !important;
}
.downloadLink {
color: #1890ff;
text-decoration: none;
margin-right: 8px;
font-size: 0.85rem;
&:hover {
text-decoration: underline;
}
}
.deleteBtn {
background: none;
border: none;
color: #ff4d4f;
cursor: pointer;
font-size: 0.85rem;
padding: 0;
&:hover {
text-decoration: underline;
}
}
.pagination {
display: flex;
justify-content: center;
align-items: center;
gap: 1rem;
margin-top: 1rem;
font-size: 0.9rem;
button {
background: #fff;
border: 1px solid #d9d9d9;
padding: 4px 12px;
border-radius: 4px;
cursor: pointer;
&:hover:not(:disabled) {
border-color: #1890ff;
color: #1890ff;
}
&:disabled {
color: #ccc;
cursor: not-allowed;
}
}
}

View File

@ -0,0 +1,356 @@
'use client';
import { useState, useEffect, useCallback } from 'react';
import { snapshotApi } from '@/infrastructure/api/snapshot.api';
import { useSnapshotWebSocket } from '@/hooks/useSnapshotWebSocket';
import type {
BackupTarget,
StorageType,
SnapshotTask,
CreateSnapshotDto,
SnapshotStatus,
} from '@/types/snapshot.types';
import { BACKUP_TARGET_LABELS } from '@/types/snapshot.types';
import styles from './page.module.scss';
function formatBytes(bytes: string | number): string {
const b = typeof bytes === 'string' ? parseInt(bytes, 10) : bytes;
if (b === 0) return '0 B';
const units = ['B', 'KB', 'MB', 'GB', 'TB'];
const i = Math.floor(Math.log(b) / Math.log(1024));
return `${(b / Math.pow(1024, i)).toFixed(2)} ${units[i]}`;
}
function formatDuration(ms: number): string {
const seconds = Math.floor(ms / 1000);
const m = Math.floor(seconds / 60);
const s = seconds % 60;
return m > 0 ? `${m}${s}` : `${s}`;
}
function formatTime(iso: string | null): string {
if (!iso) return '-';
return new Date(iso).toLocaleString('zh-CN');
}
function statusLabel(status: SnapshotStatus): string {
const map: Record<SnapshotStatus, string> = {
PENDING: '等待中',
RUNNING: '执行中',
COMPLETED: '已完成',
FAILED: '失败',
};
return map[status] || status;
}
function statusColor(status: SnapshotStatus): string {
const map: Record<SnapshotStatus, string> = {
PENDING: '#999',
RUNNING: '#1890ff',
COMPLETED: '#52c41a',
FAILED: '#ff4d4f',
};
return map[status] || '#999';
}
export default function SnapshotsPage() {
// 可用目标
const [availableTargets, setAvailableTargets] = useState<BackupTarget[]>([]);
const [isServiceRunning, setIsServiceRunning] = useState(false);
// 表单状态
const [storageType, setStorageType] = useState<StorageType>('MINIO');
const [storagePath, setStoragePath] = useState('');
const [selectedTargets, setSelectedTargets] = useState<BackupTarget[]>([]);
const [isSubmitting, setIsSubmitting] = useState(false);
// 当前执行中的任务
const [activeTaskId, setActiveTaskId] = useState<string | null>(null);
const { progresses, isConnected, taskCompleted, totalSize, duration } =
useSnapshotWebSocket(activeTaskId);
// 历史列表
const [tasks, setTasks] = useState<SnapshotTask[]>([]);
const [total, setTotal] = useState(0);
const [page, setPage] = useState(1);
// 加载可用目标
useEffect(() => {
snapshotApi
.getTargets()
.then((res) => {
setAvailableTargets(res.targets as BackupTarget[]);
setSelectedTargets(res.targets as BackupTarget[]);
setIsServiceRunning(res.isRunning);
})
.catch(console.error);
}, []);
// 加载历史列表
const loadHistory = useCallback(() => {
snapshotApi
.list(page, 10)
.then((res) => {
setTasks(res.tasks);
setTotal(res.total);
})
.catch(console.error);
}, [page]);
useEffect(() => {
loadHistory();
}, [loadHistory]);
// 任务完成后刷新列表
useEffect(() => {
if (taskCompleted) {
loadHistory();
setIsServiceRunning(false);
}
}, [taskCompleted, loadHistory]);
// 默认 MinIO 路径
useEffect(() => {
if (storageType === 'MINIO' && !storagePath) {
const date = new Date().toISOString().split('T')[0];
setStoragePath(`snapshots/${date}`);
}
}, [storageType, storagePath]);
// 开始备份
const handleStart = async () => {
if (selectedTargets.length === 0) return;
setIsSubmitting(true);
try {
const dto: CreateSnapshotDto = {
storageType,
storagePath: storageType === 'LOCAL' ? 'local' : storagePath,
targets: selectedTargets,
};
const res = await snapshotApi.create(dto);
setActiveTaskId(res.taskId);
setIsServiceRunning(true);
} catch (err: any) {
alert(err.message || '创建备份任务失败');
} finally {
setIsSubmitting(false);
}
};
// 切换目标选择
const toggleTarget = (target: BackupTarget) => {
setSelectedTargets((prev) =>
prev.includes(target) ? prev.filter((t) => t !== target) : [...prev, target],
);
};
// 全选/取消全选
const toggleAll = () => {
if (selectedTargets.length === availableTargets.length) {
setSelectedTargets([]);
} else {
setSelectedTargets([...availableTargets]);
}
};
// 删除备份
const handleDelete = async (id: string) => {
if (!confirm('确定删除该备份?文件将被永久删除。')) return;
try {
await snapshotApi.delete(id);
loadHistory();
} catch (err: any) {
alert(err.message || '删除失败');
}
};
return (
<div className={styles.container}>
<h1 className={styles.title}></h1>
{/* 创建备份表单 */}
<div className={styles.card}>
<h2 className={styles.cardTitle}></h2>
<div className={styles.formGroup}>
<label className={styles.label}></label>
<div className={styles.radioGroup}>
<label className={styles.radio}>
<input
type="radio"
checked={storageType === 'MINIO'}
onChange={() => setStorageType('MINIO')}
disabled={isServiceRunning}
/>
MinIO
</label>
<label className={styles.radio}>
<input
type="radio"
checked={storageType === 'LOCAL'}
onChange={() => setStorageType('LOCAL')}
disabled={isServiceRunning}
/>
()
</label>
</div>
</div>
{storageType === 'MINIO' && (
<div className={styles.formGroup}>
<label className={styles.label}>MinIO </label>
<input
type="text"
className={styles.input}
value={storagePath}
onChange={(e) => setStoragePath(e.target.value)}
placeholder="snapshots/2026-02-23"
disabled={isServiceRunning}
/>
</div>
)}
<div className={styles.formGroup}>
<label className={styles.label}>
<button type="button" className={styles.selectAllBtn} onClick={toggleAll} disabled={isServiceRunning}>
{selectedTargets.length === availableTargets.length ? '取消全选' : '全选'}
</button>
</label>
<div className={styles.checkboxGroup}>
{availableTargets.map((target) => (
<label key={target} className={styles.checkbox}>
<input
type="checkbox"
checked={selectedTargets.includes(target)}
onChange={() => toggleTarget(target)}
disabled={isServiceRunning}
/>
{BACKUP_TARGET_LABELS[target] || target}
</label>
))}
</div>
</div>
<button
className={styles.startBtn}
onClick={handleStart}
disabled={isSubmitting || isServiceRunning || selectedTargets.length === 0}
>
{isServiceRunning ? '备份执行中...' : '开始备份'}
</button>
</div>
{/* 实时进度 */}
{activeTaskId && progresses.size > 0 && (
<div className={styles.card}>
<h2 className={styles.cardTitle}>
{isConnected && <span className={styles.wsStatus}>WebSocket </span>}
</h2>
<div className={styles.progressList}>
{Array.from(progresses.values()).map((p) => (
<div key={p.target} className={styles.progressItem}>
<div className={styles.progressHeader}>
<span className={styles.progressTarget}>
{BACKUP_TARGET_LABELS[p.target] || p.target}
</span>
<span style={{ color: statusColor(p.status.toUpperCase() as SnapshotStatus) }}>
{p.status === 'completed' ? '100%' : p.status === 'failed' ? '失败' : `${p.percent}%`}
</span>
</div>
<div className={styles.progressBar}>
<div
className={styles.progressFill}
style={{
width: `${p.percent}%`,
backgroundColor:
p.status === 'failed' ? '#ff4d4f' : p.status === 'completed' ? '#52c41a' : '#1890ff',
}}
/>
</div>
<div className={styles.progressMsg}>{p.message}</div>
</div>
))}
</div>
{taskCompleted && (
<div className={styles.completedInfo}>
: {formatBytes(totalSize || '0')}: {formatDuration(duration || 0)}
</div>
)}
</div>
)}
{/* 历史备份列表 */}
<div className={styles.card}>
<h2 className={styles.cardTitle}></h2>
<div className={styles.tableWrapper}>
<table className={styles.table}>
<thead>
<tr>
<th></th>
<th></th>
<th></th>
<th></th>
<th></th>
<th></th>
</tr>
</thead>
<tbody>
{tasks.length === 0 && (
<tr>
<td colSpan={6} className={styles.empty}></td>
</tr>
)}
{tasks.map((task) => (
<tr key={task.id}>
<td>{formatTime(task.createdAt)}</td>
<td>{task.targets.map((t) => BACKUP_TARGET_LABELS[t] || t).join(', ')}</td>
<td>{formatBytes(task.totalSize)}</td>
<td>{task.storageType === 'MINIO' ? 'MinIO' : '本地'}</td>
<td style={{ color: statusColor(task.status) }}>{statusLabel(task.status)}</td>
<td>
{task.storageType === 'LOCAL' && task.status === 'COMPLETED' && (
<>
{task.details
.filter((d) => d.fileName)
.map((d) => (
<a
key={d.target}
href={snapshotApi.getDownloadUrl(task.id, d.target)}
className={styles.downloadLink}
download
>
{BACKUP_TARGET_LABELS[d.target]}
</a>
))}
</>
)}
<button
className={styles.deleteBtn}
onClick={() => handleDelete(task.id)}
>
</button>
</td>
</tr>
))}
</tbody>
</table>
</div>
{total > 10 && (
<div className={styles.pagination}>
<button disabled={page <= 1} onClick={() => setPage(page - 1)}></button>
<span> {page} / {Math.ceil(total / 10)} </span>
<button disabled={page >= Math.ceil(total / 10)} onClick={() => setPage(page + 1)}></button>
</div>
)}
</div>
</div>
);
}

View File

@ -39,6 +39,8 @@ const topMenuItems: MenuItem[] = [
{ key: 'pre-planting', icon: '/images/Container3.svg', label: '预种管理', path: '/pre-planting' }, { key: 'pre-planting', icon: '/images/Container3.svg', label: '预种管理', path: '/pre-planting' },
// [2026-02-19] 纯新增:树转让管理 // [2026-02-19] 纯新增:树转让管理
{ key: 'transfers', icon: '/images/Container5.svg', label: '转让管理', path: '/transfers' }, { key: 'transfers', icon: '/images/Container5.svg', label: '转让管理', path: '/transfers' },
// [2026-02-23] 纯新增:数据快照备份
{ key: 'snapshots', icon: '/images/Container6.svg', label: '数据快照', path: '/snapshots' },
{ key: 'maintenance', icon: '/images/Container6.svg', label: '系统维护', path: '/maintenance' }, { key: 'maintenance', icon: '/images/Container6.svg', label: '系统维护', path: '/maintenance' },
{ key: 'settings', icon: '/images/Container6.svg', label: '系统设置', path: '/settings' }, { key: 'settings', icon: '/images/Container6.svg', label: '系统设置', path: '/settings' },
]; ];

View File

@ -0,0 +1,117 @@
'use client';
import { useEffect, useState, useRef, useCallback } from 'react';
import { io, Socket } from 'socket.io-client';
import type { BackupTarget, SnapshotProgress } from '@/types/snapshot.types';
const SNAPSHOT_WS_URL = process.env.NEXT_PUBLIC_SNAPSHOT_API_URL || 'http://localhost:3099';
interface TargetProgress {
target: BackupTarget;
percent: number;
message: string;
status: 'pending' | 'running' | 'completed' | 'failed';
}
interface UseSnapshotWebSocketReturn {
progresses: Map<string, TargetProgress>;
isConnected: boolean;
taskCompleted: boolean;
totalSize: string | null;
duration: number | null;
error: string | null;
}
export function useSnapshotWebSocket(taskId: string | null): UseSnapshotWebSocketReturn {
const [progresses, setProgresses] = useState<Map<string, TargetProgress>>(new Map());
const [isConnected, setIsConnected] = useState(false);
const [taskCompleted, setTaskCompleted] = useState(false);
const [totalSize, setTotalSize] = useState<string | null>(null);
const [duration, setDuration] = useState<number | null>(null);
const [error, setError] = useState<string | null>(null);
const socketRef = useRef<Socket | null>(null);
useEffect(() => {
if (!taskId) return;
// 重置状态
setProgresses(new Map());
setTaskCompleted(false);
setTotalSize(null);
setDuration(null);
setError(null);
const socket = io(`${SNAPSHOT_WS_URL}/snapshots`, {
transports: ['websocket', 'polling'],
});
socketRef.current = socket;
socket.on('connect', () => setIsConnected(true));
socket.on('disconnect', () => setIsConnected(false));
socket.on('snapshot:started', (data: { taskId: string; targets: BackupTarget[] }) => {
if (data.taskId !== taskId) return;
const initial = new Map<string, TargetProgress>();
data.targets.forEach((t) => {
initial.set(t, { target: t, percent: 0, message: '等待中', status: 'pending' });
});
setProgresses(initial);
});
socket.on('snapshot:progress', (data: SnapshotProgress) => {
if (data.taskId !== taskId) return;
setProgresses((prev) => {
const next = new Map(prev);
next.set(data.target, {
target: data.target,
percent: data.percent,
message: data.message,
status: 'running',
});
return next;
});
});
socket.on('snapshot:target-complete', (data: { taskId: string; target: string; fileSize: string }) => {
if (data.taskId !== taskId) return;
setProgresses((prev) => {
const next = new Map(prev);
const existing = next.get(data.target);
if (existing) {
next.set(data.target, { ...existing, percent: 100, message: '完成', status: 'completed' });
}
return next;
});
});
socket.on('snapshot:complete', (data: { taskId: string; totalSize: string; duration: number }) => {
if (data.taskId !== taskId) return;
setTaskCompleted(true);
setTotalSize(data.totalSize);
setDuration(data.duration);
});
socket.on('snapshot:error', (data: { taskId: string; target?: string; error: string }) => {
if (data.taskId !== taskId) return;
if (data.target) {
setProgresses((prev) => {
const next = new Map(prev);
const existing = next.get(data.target!);
if (existing) {
next.set(data.target!, { ...existing, message: data.error, status: 'failed' });
}
return next;
});
} else {
setError(data.error);
}
});
return () => {
socket.disconnect();
socketRef.current = null;
};
}, [taskId]);
return { progresses, isConnected, taskCompleted, totalSize, duration, error };
}

View File

@ -0,0 +1,44 @@
import type { CreateSnapshotDto, SnapshotTask } from '@/types/snapshot.types';
const SNAPSHOT_BASE = process.env.NEXT_PUBLIC_SNAPSHOT_API_URL || 'http://localhost:3099';
/** snapshot-service 独立请求(不走通用 apiClient因为 snapshot 服务有独立端口) */
async function snapshotFetch<T>(url: string, options?: RequestInit): Promise<T> {
const token = typeof window !== 'undefined' ? localStorage.getItem('token') : null;
const res = await fetch(`${SNAPSHOT_BASE}/api/v1${url}`, {
headers: {
'Content-Type': 'application/json',
...(token ? { Authorization: `Bearer ${token}` } : {}),
},
...options,
});
if (!res.ok) {
const err = await res.json().catch(() => ({ message: res.statusText }));
throw new Error(err.message || `请求失败: ${res.status}`);
}
return res.json();
}
export const snapshotApi = {
getTargets: () =>
snapshotFetch<{ targets: string[]; isRunning: boolean }>('/snapshots/targets'),
create: (data: CreateSnapshotDto) =>
snapshotFetch<{ taskId: string; message: string }>('/snapshots', {
method: 'POST',
body: JSON.stringify(data),
}),
list: (page = 1, limit = 20) =>
snapshotFetch<{ tasks: SnapshotTask[]; total: number; page: number; limit: number }>(
`/snapshots?page=${page}&limit=${limit}`,
),
getById: (id: string) => snapshotFetch<SnapshotTask>(`/snapshots/${id}`),
delete: (id: string) =>
snapshotFetch<{ message: string }>(`/snapshots/${id}`, { method: 'DELETE' }),
getDownloadUrl: (id: string, target: string) =>
`${SNAPSHOT_BASE}/api/v1/snapshots/${id}/download/${target}`,
};

View File

@ -0,0 +1,51 @@
export type BackupTarget = 'POSTGRES' | 'REDIS' | 'KAFKA' | 'ZOOKEEPER' | 'MINIO' | 'UPLOADS';
export type StorageType = 'MINIO' | 'LOCAL';
export type SnapshotStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED';
export const BACKUP_TARGET_LABELS: Record<BackupTarget, string> = {
POSTGRES: 'PostgreSQL',
REDIS: 'Redis',
KAFKA: 'Kafka',
ZOOKEEPER: 'ZooKeeper',
MINIO: 'MinIO',
UPLOADS: 'Uploads',
};
export interface SnapshotDetail {
id: string;
target: BackupTarget;
status: SnapshotStatus;
progress: number;
fileSize: string;
fileName: string | null;
error: string | null;
startedAt: string | null;
completedAt: string | null;
}
export interface SnapshotTask {
id: string;
status: SnapshotStatus;
storageType: StorageType;
storagePath: string;
targets: BackupTarget[];
totalSize: string;
error: string | null;
startedAt: string | null;
completedAt: string | null;
createdAt: string;
details: SnapshotDetail[];
}
export interface CreateSnapshotDto {
storageType: StorageType;
storagePath: string;
targets: BackupTarget[];
}
export interface SnapshotProgress {
taskId: string;
target: BackupTarget;
percent: number;
message: string;
}

View File

@ -39,6 +39,7 @@
"react-dom": "^18.2.0", "react-dom": "^18.2.0",
"react-hook-form": "^7.50.1", "react-hook-form": "^7.50.1",
"react-redux": "^9.1.0", "react-redux": "^9.1.0",
"socket.io-client": "^4.7.4",
"tailwind-merge": "^2.2.1", "tailwind-merge": "^2.2.1",
"tailwindcss-animate": "^1.0.7", "tailwindcss-animate": "^1.0.7",
"zod": "^3.22.4", "zod": "^3.22.4",

View File

@ -0,0 +1,359 @@
'use client';
import { useState, useEffect, useCallback } from 'react';
import { snapshotApi } from '@/lib/api/snapshot.api';
import { useSnapshotWebSocket } from '@/lib/hooks/useSnapshotWebSocket';
import type {
BackupTarget,
StorageType,
SnapshotTask,
CreateSnapshotDto,
SnapshotStatus,
} from '@/types/snapshot.types';
import { BACKUP_TARGET_LABELS } from '@/types/snapshot.types';
function formatBytes(bytes: string | number): string {
const b = typeof bytes === 'string' ? parseInt(bytes, 10) : bytes;
if (b === 0) return '0 B';
const units = ['B', 'KB', 'MB', 'GB', 'TB'];
const i = Math.floor(Math.log(b) / Math.log(1024));
return `${(b / Math.pow(1024, i)).toFixed(2)} ${units[i]}`;
}
function formatDuration(ms: number): string {
const seconds = Math.floor(ms / 1000);
const m = Math.floor(seconds / 60);
const s = seconds % 60;
return m > 0 ? `${m}${s}` : `${s}`;
}
function formatTime(iso: string | null): string {
if (!iso) return '-';
return new Date(iso).toLocaleString('zh-CN');
}
const STATUS_MAP: Record<SnapshotStatus, { label: string; color: string }> = {
PENDING: { label: '等待中', color: 'text-muted-foreground' },
RUNNING: { label: '执行中', color: 'text-blue-500' },
COMPLETED: { label: '已完成', color: 'text-green-500' },
FAILED: { label: '失败', color: 'text-red-500' },
};
export default function SnapshotsPage() {
const [availableTargets, setAvailableTargets] = useState<BackupTarget[]>([]);
const [isServiceRunning, setIsServiceRunning] = useState(false);
const [storageType, setStorageType] = useState<StorageType>('MINIO');
const [storagePath, setStoragePath] = useState('');
const [selectedTargets, setSelectedTargets] = useState<BackupTarget[]>([]);
const [isSubmitting, setIsSubmitting] = useState(false);
const [activeTaskId, setActiveTaskId] = useState<string | null>(null);
const { progresses, isConnected, taskCompleted, totalSize, duration } =
useSnapshotWebSocket(activeTaskId);
const [tasks, setTasks] = useState<SnapshotTask[]>([]);
const [total, setTotal] = useState(0);
const [page, setPage] = useState(1);
useEffect(() => {
snapshotApi
.getTargets()
.then((res) => {
setAvailableTargets(res.targets as BackupTarget[]);
setSelectedTargets(res.targets as BackupTarget[]);
setIsServiceRunning(res.isRunning);
})
.catch(console.error);
}, []);
const loadHistory = useCallback(() => {
snapshotApi
.list(page, 10)
.then((res) => {
setTasks(res.tasks);
setTotal(res.total);
})
.catch(console.error);
}, [page]);
useEffect(() => { loadHistory(); }, [loadHistory]);
useEffect(() => {
if (taskCompleted) {
loadHistory();
setIsServiceRunning(false);
}
}, [taskCompleted, loadHistory]);
useEffect(() => {
if (storageType === 'MINIO' && !storagePath) {
const date = new Date().toISOString().split('T')[0];
setStoragePath(`snapshots/${date}`);
}
}, [storageType, storagePath]);
const handleStart = async () => {
if (selectedTargets.length === 0) return;
setIsSubmitting(true);
try {
const dto: CreateSnapshotDto = {
storageType,
storagePath: storageType === 'LOCAL' ? 'local' : storagePath,
targets: selectedTargets,
};
const res = await snapshotApi.create(dto);
setActiveTaskId(res.taskId);
setIsServiceRunning(true);
} catch (err: any) {
alert(err.message || '创建备份任务失败');
} finally {
setIsSubmitting(false);
}
};
const toggleTarget = (target: BackupTarget) => {
setSelectedTargets((prev) =>
prev.includes(target) ? prev.filter((t) => t !== target) : [...prev, target],
);
};
const toggleAll = () => {
setSelectedTargets(
selectedTargets.length === availableTargets.length ? [] : [...availableTargets],
);
};
const handleDelete = async (id: string) => {
if (!confirm('确定删除该备份?')) return;
try {
await snapshotApi.delete(id);
loadHistory();
} catch (err: any) {
alert(err.message || '删除失败');
}
};
return (
<div className="space-y-6">
<h1 className="text-2xl font-semibold text-foreground"></h1>
{/* 创建备份 */}
<div className="rounded-xl border bg-card p-6 shadow-sm">
<h2 className="mb-4 text-lg font-semibold"></h2>
<div className="mb-4">
<label className="mb-2 block text-sm font-medium"></label>
<div className="flex gap-6">
<label className="flex cursor-pointer items-center gap-2 text-sm">
<input
type="radio"
className="accent-primary"
checked={storageType === 'MINIO'}
onChange={() => setStorageType('MINIO')}
disabled={isServiceRunning}
/>
MinIO
</label>
<label className="flex cursor-pointer items-center gap-2 text-sm">
<input
type="radio"
className="accent-primary"
checked={storageType === 'LOCAL'}
onChange={() => setStorageType('LOCAL')}
disabled={isServiceRunning}
/>
()
</label>
</div>
</div>
{storageType === 'MINIO' && (
<div className="mb-4">
<label className="mb-2 block text-sm font-medium">MinIO </label>
<input
type="text"
className="w-full max-w-md rounded-lg border bg-background px-3 py-2 text-sm outline-none focus:border-primary"
value={storagePath}
onChange={(e) => setStoragePath(e.target.value)}
placeholder="snapshots/2026-02-23"
disabled={isServiceRunning}
/>
</div>
)}
<div className="mb-4">
<label className="mb-2 flex items-center gap-2 text-sm font-medium">
<button
type="button"
className="text-xs text-primary hover:underline disabled:text-muted-foreground"
onClick={toggleAll}
disabled={isServiceRunning}
>
{selectedTargets.length === availableTargets.length ? '取消全选' : '全选'}
</button>
</label>
<div className="flex flex-wrap gap-3">
{availableTargets.map((target) => (
<label
key={target}
className="flex cursor-pointer items-center gap-2 rounded-lg bg-muted px-3 py-2 text-sm transition-colors hover:bg-muted/80"
>
<input
type="checkbox"
className="accent-primary"
checked={selectedTargets.includes(target)}
onChange={() => toggleTarget(target)}
disabled={isServiceRunning}
/>
{BACKUP_TARGET_LABELS[target] || target}
</label>
))}
</div>
</div>
<button
className="rounded-lg bg-primary px-8 py-2.5 text-sm font-medium text-primary-foreground transition-colors hover:bg-primary/90 disabled:cursor-not-allowed disabled:opacity-50"
onClick={handleStart}
disabled={isSubmitting || isServiceRunning || selectedTargets.length === 0}
>
{isServiceRunning ? '备份执行中...' : '开始备份'}
</button>
</div>
{/* 实时进度 */}
{activeTaskId && progresses.size > 0 && (
<div className="rounded-xl border bg-card p-6 shadow-sm">
<h2 className="mb-4 flex items-center gap-3 text-lg font-semibold">
{isConnected && (
<span className="rounded bg-green-50 px-2 py-0.5 text-xs font-normal text-green-600">
WebSocket
</span>
)}
</h2>
<div className="space-y-4">
{Array.from(progresses.values()).map((p) => (
<div key={p.target} className="rounded-lg bg-muted/50 p-4">
<div className="mb-2 flex items-center justify-between text-sm">
<span className="font-medium">{BACKUP_TARGET_LABELS[p.target] || p.target}</span>
<span
className={
p.status === 'completed'
? 'text-green-500'
: p.status === 'failed'
? 'text-red-500'
: 'text-blue-500'
}
>
{p.status === 'completed' ? '100%' : p.status === 'failed' ? '失败' : `${p.percent}%`}
</span>
</div>
<div className="h-2 overflow-hidden rounded-full bg-muted">
<div
className="h-full rounded-full transition-all duration-300"
style={{
width: `${p.percent}%`,
backgroundColor:
p.status === 'failed' ? '#ef4444' : p.status === 'completed' ? '#22c55e' : '#3b82f6',
}}
/>
</div>
<p className="mt-1 text-xs text-muted-foreground">{p.message}</p>
</div>
))}
</div>
{taskCompleted && (
<div className="mt-4 rounded-lg border border-green-200 bg-green-50 p-4 text-sm font-medium text-green-600">
: {formatBytes(totalSize || '0')}: {formatDuration(duration || 0)}
</div>
)}
</div>
)}
{/* 历史列表 */}
<div className="rounded-xl border bg-card p-6 shadow-sm">
<h2 className="mb-4 text-lg font-semibold"></h2>
<div className="overflow-x-auto">
<table className="w-full text-sm">
<thead>
<tr className="border-b text-left text-muted-foreground">
<th className="px-3 py-3 font-medium"></th>
<th className="px-3 py-3 font-medium"></th>
<th className="px-3 py-3 font-medium"></th>
<th className="px-3 py-3 font-medium"></th>
<th className="px-3 py-3 font-medium"></th>
<th className="px-3 py-3 font-medium"></th>
</tr>
</thead>
<tbody>
{tasks.length === 0 && (
<tr>
<td colSpan={6} className="py-8 text-center text-muted-foreground"></td>
</tr>
)}
{tasks.map((task) => {
const st = STATUS_MAP[task.status];
return (
<tr key={task.id} className="border-b last:border-0 hover:bg-muted/30">
<td className="px-3 py-3">{formatTime(task.createdAt)}</td>
<td className="px-3 py-3">
{task.targets.map((t) => BACKUP_TARGET_LABELS[t] || t).join(', ')}
</td>
<td className="px-3 py-3">{formatBytes(task.totalSize)}</td>
<td className="px-3 py-3">{task.storageType === 'MINIO' ? 'MinIO' : '本地'}</td>
<td className={`px-3 py-3 ${st.color}`}>{st.label}</td>
<td className="px-3 py-3">
{task.storageType === 'LOCAL' && task.status === 'COMPLETED' &&
task.details
.filter((d) => d.fileName)
.map((d) => (
<a
key={d.target}
href={snapshotApi.getDownloadUrl(task.id, d.target)}
className="mr-2 text-primary hover:underline"
download
>
{BACKUP_TARGET_LABELS[d.target]}
</a>
))}
<button
className="text-red-500 hover:underline"
onClick={() => handleDelete(task.id)}
>
</button>
</td>
</tr>
);
})}
</tbody>
</table>
</div>
{total > 10 && (
<div className="mt-4 flex items-center justify-center gap-4 text-sm">
<button
className="rounded border px-3 py-1 hover:border-primary hover:text-primary disabled:cursor-not-allowed disabled:text-muted-foreground"
disabled={page <= 1}
onClick={() => setPage(page - 1)}
>
</button>
<span className="text-muted-foreground">
{page} / {Math.ceil(total / 10)}
</span>
<button
className="rounded border px-3 py-1 hover:border-primary hover:text-primary disabled:cursor-not-allowed disabled:text-muted-foreground"
disabled={page >= Math.ceil(total / 10)}
onClick={() => setPage(page + 1)}
>
</button>
</div>
)}
</div>
</div>
);
}

View File

@ -19,6 +19,7 @@ import {
HandCoins, HandCoins,
FileSpreadsheet, FileSpreadsheet,
SendHorizontal, SendHorizontal,
HardDrive,
} from 'lucide-react'; } from 'lucide-react';
import { Button } from '@/components/ui/button'; import { Button } from '@/components/ui/button';
@ -35,6 +36,7 @@ const menuItems = [
{ name: '系统账户', href: '/system-accounts', icon: Building2 }, { name: '系统账户', href: '/system-accounts', icon: Building2 },
{ name: '报表统计', href: '/reports', icon: FileBarChart }, { name: '报表统计', href: '/reports', icon: FileBarChart },
{ name: '审计日志', href: '/audit-logs', icon: ClipboardList }, { name: '审计日志', href: '/audit-logs', icon: ClipboardList },
{ name: '数据快照', href: '/snapshots', icon: HardDrive },
]; ];
export function Sidebar() { export function Sidebar() {

View File

@ -0,0 +1,43 @@
import type { CreateSnapshotDto, SnapshotTask } from '@/types/snapshot.types';
const SNAPSHOT_BASE = process.env.NEXT_PUBLIC_SNAPSHOT_API_URL || 'http://localhost:3199';
async function snapshotFetch<T>(url: string, options?: RequestInit): Promise<T> {
const token = typeof window !== 'undefined' ? localStorage.getItem('admin_token') : null;
const res = await fetch(`${SNAPSHOT_BASE}/api/v1${url}`, {
headers: {
'Content-Type': 'application/json',
...(token ? { Authorization: `Bearer ${token}` } : {}),
},
...options,
});
if (!res.ok) {
const err = await res.json().catch(() => ({ message: res.statusText }));
throw new Error(err.message || `请求失败: ${res.status}`);
}
return res.json();
}
export const snapshotApi = {
getTargets: () =>
snapshotFetch<{ targets: string[]; isRunning: boolean }>('/snapshots/targets'),
create: (data: CreateSnapshotDto) =>
snapshotFetch<{ taskId: string; message: string }>('/snapshots', {
method: 'POST',
body: JSON.stringify(data),
}),
list: (page = 1, limit = 20) =>
snapshotFetch<{ tasks: SnapshotTask[]; total: number; page: number; limit: number }>(
`/snapshots?page=${page}&limit=${limit}`,
),
getById: (id: string) => snapshotFetch<SnapshotTask>(`/snapshots/${id}`),
delete: (id: string) =>
snapshotFetch<{ message: string }>(`/snapshots/${id}`, { method: 'DELETE' }),
getDownloadUrl: (id: string, target: string) =>
`${SNAPSHOT_BASE}/api/v1/snapshots/${id}/download/${target}`,
};

View File

@ -0,0 +1,116 @@
'use client';
import { useEffect, useState, useRef } from 'react';
import { io, Socket } from 'socket.io-client';
import type { BackupTarget, SnapshotProgress } from '@/types/snapshot.types';
const SNAPSHOT_WS_URL = process.env.NEXT_PUBLIC_SNAPSHOT_API_URL || 'http://localhost:3199';
interface TargetProgress {
target: BackupTarget;
percent: number;
message: string;
status: 'pending' | 'running' | 'completed' | 'failed';
}
interface UseSnapshotWebSocketReturn {
progresses: Map<string, TargetProgress>;
isConnected: boolean;
taskCompleted: boolean;
totalSize: string | null;
duration: number | null;
error: string | null;
}
export function useSnapshotWebSocket(taskId: string | null): UseSnapshotWebSocketReturn {
const [progresses, setProgresses] = useState<Map<string, TargetProgress>>(new Map());
const [isConnected, setIsConnected] = useState(false);
const [taskCompleted, setTaskCompleted] = useState(false);
const [totalSize, setTotalSize] = useState<string | null>(null);
const [duration, setDuration] = useState<number | null>(null);
const [error, setError] = useState<string | null>(null);
const socketRef = useRef<Socket | null>(null);
useEffect(() => {
if (!taskId) return;
setProgresses(new Map());
setTaskCompleted(false);
setTotalSize(null);
setDuration(null);
setError(null);
const socket = io(`${SNAPSHOT_WS_URL}/snapshots`, {
transports: ['websocket', 'polling'],
});
socketRef.current = socket;
socket.on('connect', () => setIsConnected(true));
socket.on('disconnect', () => setIsConnected(false));
socket.on('snapshot:started', (data: { taskId: string; targets: BackupTarget[] }) => {
if (data.taskId !== taskId) return;
const initial = new Map<string, TargetProgress>();
data.targets.forEach((t) => {
initial.set(t, { target: t, percent: 0, message: '等待中', status: 'pending' });
});
setProgresses(initial);
});
socket.on('snapshot:progress', (data: SnapshotProgress) => {
if (data.taskId !== taskId) return;
setProgresses((prev) => {
const next = new Map(prev);
next.set(data.target, {
target: data.target,
percent: data.percent,
message: data.message,
status: 'running',
});
return next;
});
});
socket.on('snapshot:target-complete', (data: { taskId: string; target: string; fileSize: string }) => {
if (data.taskId !== taskId) return;
setProgresses((prev) => {
const next = new Map(prev);
const existing = next.get(data.target);
if (existing) {
next.set(data.target, { ...existing, percent: 100, message: '完成', status: 'completed' });
}
return next;
});
});
socket.on('snapshot:complete', (data: { taskId: string; totalSize: string; duration: number }) => {
if (data.taskId !== taskId) return;
setTaskCompleted(true);
setTotalSize(data.totalSize);
setDuration(data.duration);
});
socket.on('snapshot:error', (data: { taskId: string; target?: string; error: string }) => {
if (data.taskId !== taskId) return;
if (data.target) {
setProgresses((prev) => {
const next = new Map(prev);
const existing = next.get(data.target!);
if (existing) {
next.set(data.target!, { ...existing, message: data.error, status: 'failed' });
}
return next;
});
} else {
setError(data.error);
}
});
return () => {
socket.disconnect();
socketRef.current = null;
};
}, [taskId]);
return { progresses, isConnected, taskCompleted, totalSize, duration, error };
}

View File

@ -0,0 +1,51 @@
export type BackupTarget = 'POSTGRES' | 'REDIS' | 'KAFKA' | 'ZOOKEEPER' | 'MINIO' | 'UPLOADS';
export type StorageType = 'MINIO' | 'LOCAL';
export type SnapshotStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED';
export const BACKUP_TARGET_LABELS: Record<BackupTarget, string> = {
POSTGRES: 'PostgreSQL',
REDIS: 'Redis',
KAFKA: 'Kafka',
ZOOKEEPER: 'ZooKeeper',
MINIO: 'MinIO',
UPLOADS: 'Uploads',
};
export interface SnapshotDetail {
id: string;
target: BackupTarget;
status: SnapshotStatus;
progress: number;
fileSize: string;
fileName: string | null;
error: string | null;
startedAt: string | null;
completedAt: string | null;
}
export interface SnapshotTask {
id: string;
status: SnapshotStatus;
storageType: StorageType;
storagePath: string;
targets: BackupTarget[];
totalSize: string;
error: string | null;
startedAt: string | null;
completedAt: string | null;
createdAt: string;
details: SnapshotDetail[];
}
export interface CreateSnapshotDto {
storageType: StorageType;
storagePath: string;
targets: BackupTarget[];
}
export interface SnapshotProgress {
taskId: string;
target: BackupTarget;
percent: number;
message: string;
}