diff --git a/backend/services/docker-compose.2.0-snapshot.yml b/backend/services/docker-compose.2.0-snapshot.yml new file mode 100644 index 00000000..28808574 --- /dev/null +++ b/backend/services/docker-compose.2.0-snapshot.yml @@ -0,0 +1,60 @@ +# 数据快照备份服务 (2.0 算力挖矿系统 - standalone 模式) +# 使用方式: docker compose -f docker-compose.2.0.yml -f docker-compose.2.0-snapshot.yml --profile standalone up snapshot-service-2 +# +# 纯新增 overlay,不修改任何现有服务配置 + +services: + snapshot-service-2: + build: + context: ./snapshot-service + dockerfile: Dockerfile + container_name: rwa-snapshot-service-2 + ports: + - "3199:3199" + environment: + - NODE_ENV=production + - APP_PORT=3199 + - DATABASE_URL=file:./data/snapshot.db + # PostgreSQL-2 (备份目标) + - PG_HOST=postgres-2 + - PG_PORT=5432 + - PG_USER=${POSTGRES_USER:-rwa_user} + - PG_PASSWORD=${POSTGRES_PASSWORD} + # Redis-2 (备份目标) + - REDIS_HOST=redis-2 + - REDIS_PORT=6379 + - REDIS_PASSWORD=${REDIS_PASSWORD:-} + # MinIO (存储后端) + - MINIO_ENDPOINT=${MINIO_ENDPOINT:-192.168.1.100} + - MINIO_PORT=${MINIO_PORT:-9000} + - MINIO_USE_SSL=${MINIO_USE_SSL:-false} + - MINIO_ACCESS_KEY=${MINIO_ACCESS_KEY} + - MINIO_SECRET_KEY=${MINIO_SECRET_KEY} + - MINIO_BACKUP_BUCKET=snapshots-2 + # 备份配置 + - SNAPSHOT_TEMP_DIR=/app/data/snapshots + - SNAPSHOT_RETENTION_HOURS=72 + - AVAILABLE_TARGETS=POSTGRES,REDIS,UPLOADS + volumes: + - snapshot_2_data:/app/data + - redis_2_data:/backup-source/redis:ro + - mining-admin-uploads:/backup-source/uploads/mining-admin:ro + - trading-uploads:/backup-source/uploads/trading:ro + profiles: + - standalone + depends_on: + postgres-2: + condition: service_healthy + redis-2: + condition: service_healthy + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3199/api/v1/health"] + interval: 30s + timeout: 3s + retries: 3 + start_period: 30s + networks: + - rwa-2-network + +volumes: + snapshot_2_data: diff --git a/backend/services/docker-compose.snapshot.yml b/backend/services/docker-compose.snapshot.yml new file mode 100644 index 00000000..10ddba07 --- /dev/null +++ b/backend/services/docker-compose.snapshot.yml @@ -0,0 +1,60 @@ +# 数据快照备份服务 (1.0 认种分配系统) +# 使用方式: docker compose -f docker-compose.yml -f docker-compose.snapshot.yml up snapshot-service +# +# 纯新增 overlay,不修改任何现有服务配置 + +services: + snapshot-service: + build: + context: ./snapshot-service + dockerfile: Dockerfile + container_name: rwa-snapshot-service + ports: + - "3099:3099" + environment: + - NODE_ENV=production + - APP_PORT=3099 + - DATABASE_URL=file:./data/snapshot.db + # PostgreSQL (备份目标) + - PG_HOST=postgres + - PG_PORT=5432 + - PG_USER=${POSTGRES_USER:-rwa_user} + - PG_PASSWORD=${POSTGRES_PASSWORD} + # Redis (备份目标) + - REDIS_HOST=redis + - REDIS_PORT=6379 + - REDIS_PASSWORD=${REDIS_PASSWORD:-} + # MinIO (备份目标 + 存储后端) + - MINIO_ENDPOINT=${MINIO_ENDPOINT:-192.168.1.100} + - MINIO_PORT=${MINIO_PORT:-9000} + - MINIO_USE_SSL=${MINIO_USE_SSL:-false} + - MINIO_ACCESS_KEY=${MINIO_ACCESS_KEY} + - MINIO_SECRET_KEY=${MINIO_SECRET_KEY} + - MINIO_BACKUP_BUCKET=snapshots + # 备份配置 + - SNAPSHOT_TEMP_DIR=/app/data/snapshots + - SNAPSHOT_RETENTION_HOURS=72 + - AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS + volumes: + - snapshot_data:/app/data + - redis_data:/backup-source/redis:ro + - kafka_data:/backup-source/kafka:ro + - zookeeper_data:/backup-source/zookeeper/data:ro + - zookeeper_log:/backup-source/zookeeper/log:ro + - admin_uploads_data:/backup-source/uploads:ro + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3099/api/v1/health"] + interval: 30s + timeout: 3s + retries: 3 + start_period: 30s + networks: + - rwa-network + +volumes: + snapshot_data: diff --git a/backend/services/snapshot-service/.env.development b/backend/services/snapshot-service/.env.development new file mode 100644 index 00000000..8dbf26ad --- /dev/null +++ b/backend/services/snapshot-service/.env.development @@ -0,0 +1,30 @@ +NODE_ENV=development +APP_PORT=3099 +DATABASE_URL="file:./data/snapshot.db" + +# PostgreSQL (被备份目标) +PG_HOST=localhost +PG_PORT=5432 +PG_USER=rwa_user +PG_PASSWORD=your_password + +# Redis (被备份目标) +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_PASSWORD= + +# MinIO +MINIO_ENDPOINT=192.168.1.100 +MINIO_PORT=9000 +MINIO_USE_SSL=false +MINIO_ACCESS_KEY=admin +MINIO_SECRET_KEY=your_minio_password +MINIO_BACKUP_BUCKET=snapshots + +# 备份临时目录 +SNAPSHOT_TEMP_DIR=./data/snapshots +# 临时文件保留时长(小时) +SNAPSHOT_RETENTION_HOURS=72 + +# 可用备份目标(逗号分隔,部署时按系统配置) +AVAILABLE_TARGETS=POSTGRES,REDIS,KAFKA,ZOOKEEPER,MINIO,UPLOADS diff --git a/backend/services/snapshot-service/.gitignore b/backend/services/snapshot-service/.gitignore new file mode 100644 index 00000000..cca6970b --- /dev/null +++ b/backend/services/snapshot-service/.gitignore @@ -0,0 +1,6 @@ +dist/ +node_modules/ +data/ +*.db +*.db-journal +.env diff --git a/backend/services/snapshot-service/Dockerfile b/backend/services/snapshot-service/Dockerfile new file mode 100644 index 00000000..f2a4c2ca --- /dev/null +++ b/backend/services/snapshot-service/Dockerfile @@ -0,0 +1,55 @@ +# Build stage +FROM node:20-alpine AS builder + +WORKDIR /app + +COPY package*.json ./ +RUN npm ci + +COPY prisma ./prisma/ +RUN DATABASE_URL="file:./data/snapshot.db" npx prisma generate + +COPY . . +RUN npm run build + +# Production stage +FROM node:20-slim AS production + +WORKDIR /app + +# 安装系统工具: openssl, curl (健康检查), postgresql-client (pg_basebackup) +RUN apt-get update && apt-get install -y --no-install-recommends \ + openssl \ + curl \ + postgresql-client \ + && rm -rf /var/lib/apt/lists/* + +# 安装 MinIO 客户端 (mc) +RUN curl -fsSL https://dl.min.io/client/mc/release/linux-amd64/mc -o /usr/local/bin/mc \ + && chmod +x /usr/local/bin/mc + +COPY package*.json ./ +RUN npm ci --only=production + +COPY prisma ./prisma/ +RUN DATABASE_URL="file:./data/snapshot.db" npx prisma generate + +COPY --from=builder /app/dist ./dist + +# 创建数据目录 +RUN mkdir -p /app/data/snapshots + +# 启动脚本: SQLite 用 db push 而非 migrate deploy +RUN echo '#!/bin/sh\n\ +set -e\n\ +echo "Initializing SQLite database..."\n\ +npx prisma db push --skip-generate\n\ +echo "Starting snapshot service..."\n\ +exec node dist/main.js\n' > /app/start.sh && chmod +x /app/start.sh + +EXPOSE 3099 + +HEALTHCHECK --interval=30s --timeout=3s --start-period=30s --retries=3 \ + CMD curl -f http://localhost:${APP_PORT:-3099}/api/v1/health || exit 1 + +CMD ["/app/start.sh"] diff --git a/backend/services/snapshot-service/nest-cli.json b/backend/services/snapshot-service/nest-cli.json new file mode 100644 index 00000000..f9aa683b --- /dev/null +++ b/backend/services/snapshot-service/nest-cli.json @@ -0,0 +1,8 @@ +{ + "$schema": "https://json.schemastore.org/nest-cli", + "collection": "@nestjs/schematics", + "sourceRoot": "src", + "compilerOptions": { + "deleteOutDir": true + } +} diff --git a/backend/services/snapshot-service/package.json b/backend/services/snapshot-service/package.json new file mode 100644 index 00000000..0c889ba4 --- /dev/null +++ b/backend/services/snapshot-service/package.json @@ -0,0 +1,43 @@ +{ + "name": "snapshot-service", + "version": "1.0.0", + "description": "RWADurian 数据快照备份编排服务", + "scripts": { + "build": "nest build", + "start": "nest start", + "start:dev": "nest start --watch", + "start:prod": "node dist/main", + "prisma:generate": "prisma generate", + "prisma:push": "prisma db push" + }, + "dependencies": { + "@nestjs/common": "^10.0.0", + "@nestjs/core": "^10.0.0", + "@nestjs/config": "^3.1.1", + "@nestjs/platform-express": "^10.0.0", + "@nestjs/swagger": "^7.1.17", + "@nestjs/schedule": "^4.0.0", + "@nestjs/websockets": "^10.0.0", + "@nestjs/platform-socket.io": "^10.0.0", + "@prisma/client": "^5.7.0", + "class-validator": "^0.14.0", + "class-transformer": "^0.5.1", + "ioredis": "^5.3.2", + "minio": "^8.0.1", + "archiver": "^7.0.1", + "reflect-metadata": "^0.1.13", + "rxjs": "^7.8.1", + "uuid": "^9.0.0", + "socket.io": "^4.7.4" + }, + "devDependencies": { + "@nestjs/cli": "^10.0.0", + "@nestjs/schematics": "^10.0.0", + "@types/archiver": "^6.0.2", + "@types/express": "^4.17.17", + "@types/node": "^20.3.1", + "@types/uuid": "^9.0.2", + "prisma": "^5.7.0", + "typescript": "^5.1.3" + } +} diff --git a/backend/services/snapshot-service/prisma/schema.prisma b/backend/services/snapshot-service/prisma/schema.prisma new file mode 100644 index 00000000..f1f550b1 --- /dev/null +++ b/backend/services/snapshot-service/prisma/schema.prisma @@ -0,0 +1,46 @@ +generator client { + provider = "prisma-client-js" +} + +datasource db { + provider = "sqlite" + url = env("DATABASE_URL") +} + +model SnapshotTask { + id String @id @default(uuid()) + status String @default("PENDING") + storageType String + storagePath String + targets String // JSON: ["POSTGRES","REDIS"] + totalSize BigInt @default(0) + error String? + startedAt DateTime? + completedAt DateTime? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + details SnapshotDetail[] + + @@map("snapshot_tasks") +} + +model SnapshotDetail { + id String @id @default(uuid()) + taskId String + target String + status String @default("PENDING") + progress Int @default(0) + fileSize BigInt @default(0) + fileName String? + error String? + startedAt DateTime? + completedAt DateTime? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + task SnapshotTask @relation(fields: [taskId], references: [id], onDelete: Cascade) + + @@index([taskId]) + @@map("snapshot_details") +} diff --git a/backend/services/snapshot-service/src/api/api.module.ts b/backend/services/snapshot-service/src/api/api.module.ts new file mode 100644 index 00000000..3d7b2395 --- /dev/null +++ b/backend/services/snapshot-service/src/api/api.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; +import { SnapshotController } from './controllers/snapshot.controller'; +import { HealthController } from './controllers/health.controller'; +import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; +import { ApplicationModule } from '@/application/application.module'; + +@Module({ + imports: [InfrastructureModule, ApplicationModule], + controllers: [SnapshotController, HealthController], +}) +export class ApiModule {} diff --git a/backend/services/snapshot-service/src/api/controllers/health.controller.ts b/backend/services/snapshot-service/src/api/controllers/health.controller.ts new file mode 100644 index 00000000..f2859c56 --- /dev/null +++ b/backend/services/snapshot-service/src/api/controllers/health.controller.ts @@ -0,0 +1,16 @@ +import { Controller, Get } from '@nestjs/common'; +import { ApiTags, ApiOperation } from '@nestjs/swagger'; + +@ApiTags('健康检查') +@Controller('health') +export class HealthController { + @Get() + @ApiOperation({ summary: '服务健康检查' }) + check() { + return { + status: 'ok', + service: 'snapshot-service', + timestamp: new Date().toISOString(), + }; + } +} diff --git a/backend/services/snapshot-service/src/api/controllers/snapshot.controller.ts b/backend/services/snapshot-service/src/api/controllers/snapshot.controller.ts new file mode 100644 index 00000000..8b8f75d7 --- /dev/null +++ b/backend/services/snapshot-service/src/api/controllers/snapshot.controller.ts @@ -0,0 +1,151 @@ +import { + Controller, + Get, + Post, + Delete, + Param, + Body, + Query, + Res, + Req, + NotFoundException, + BadRequestException, + ConflictException, + Logger, +} from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiResponse, ApiQuery } from '@nestjs/swagger'; +import { Request, Response } from 'express'; +import * as fs from 'fs'; +import * as path from 'path'; +import { CreateSnapshotDto } from '../dto/create-snapshot.dto'; +import { toSnapshotResponse } from '../dto/snapshot.response'; +import { SnapshotOrchestratorService } from '@/application/services/snapshot-orchestrator.service'; +import { SnapshotRepository } from '@/infrastructure/persistence/repositories/snapshot.repository'; +import { StorageType } from '@/domain/enums'; + +@ApiTags('快照备份') +@Controller('snapshots') +export class SnapshotController { + private readonly logger = new Logger(SnapshotController.name); + + constructor( + private readonly orchestrator: SnapshotOrchestratorService, + private readonly repo: SnapshotRepository, + ) {} + + @Get('targets') + @ApiOperation({ summary: '获取可用备份目标列表' }) + getAvailableTargets() { + return { + targets: this.orchestrator.getAvailableTargets(), + isRunning: this.orchestrator.isRunning(), + }; + } + + @Post() + @ApiOperation({ summary: '创建备份任务' }) + @ApiResponse({ status: 201, description: '任务已创建,异步执行' }) + @ApiResponse({ status: 409, description: '已有任务在执行' }) + async createSnapshot(@Body() dto: CreateSnapshotDto) { + if (this.orchestrator.isRunning()) { + throw new ConflictException('已有备份任务正在执行,请等待完成'); + } + + const taskId = await this.orchestrator.startSnapshot(dto); + return { taskId, message: '备份任务已启动' }; + } + + @Get() + @ApiOperation({ summary: '备份历史列表' }) + @ApiQuery({ name: 'page', required: false, type: Number }) + @ApiQuery({ name: 'limit', required: false, type: Number }) + async listSnapshots( + @Query('page') page: number = 1, + @Query('limit') limit: number = 20, + ) { + const result = await this.repo.findAll(page, limit); + return { + tasks: result.tasks.map(toSnapshotResponse), + total: result.total, + page: result.page, + limit: result.limit, + }; + } + + @Get(':id') + @ApiOperation({ summary: '备份任务详情' }) + async getSnapshot(@Param('id') id: string) { + const task = await this.repo.findById(id); + if (!task) throw new NotFoundException('备份任务不存在'); + return toSnapshotResponse(task); + } + + @Delete(':id') + @ApiOperation({ summary: '删除备份' }) + async deleteSnapshot(@Param('id') id: string) { + await this.orchestrator.deleteSnapshot(id); + return { message: '备份已删除' }; + } + + @Get(':id/download/:target') + @ApiOperation({ summary: '下载备份文件 (仅 LOCAL 模式)' }) + @ApiResponse({ status: 200, description: '文件内容' }) + @ApiResponse({ status: 206, description: '部分内容 (断点续传)' }) + async downloadFile( + @Param('id') id: string, + @Param('target') target: string, + @Req() req: Request, + @Res() res: Response, + ) { + const task = await this.repo.findById(id); + if (!task) throw new NotFoundException('备份任务不存在'); + if (task.storageType !== StorageType.LOCAL) { + throw new BadRequestException('仅本地存储模式支持下载'); + } + + const detail = task.details.find((d) => d.target === target); + if (!detail || !detail.fileName) { + throw new NotFoundException('备份文件不存在'); + } + + const tempDir = process.env.SNAPSHOT_TEMP_DIR || './data/snapshots'; + const filePath = path.join(tempDir, id, detail.fileName); + + if (!fs.existsSync(filePath)) { + throw new NotFoundException('备份文件已过期或被删除'); + } + + const stat = fs.statSync(filePath); + const fileSize = stat.size; + const range = req.headers.range; + + res.setHeader('Accept-Ranges', 'bytes'); + res.setHeader('Content-Type', 'application/gzip'); + res.setHeader( + 'Content-Disposition', + `attachment; filename="${encodeURIComponent(detail.fileName)}"`, + ); + + if (range) { + const parts = range.replace(/bytes=/, '').split('-'); + const start = parseInt(parts[0], 10); + const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1; + + if (start >= fileSize || end >= fileSize || start > end) { + res.status(416); + res.setHeader('Content-Range', `bytes */${fileSize}`); + res.end(); + return; + } + + const chunkSize = end - start + 1; + res.status(206); + res.setHeader('Content-Range', `bytes ${start}-${end}/${fileSize}`); + res.setHeader('Content-Length', chunkSize); + fs.createReadStream(filePath, { start, end }).pipe(res); + } else { + res.setHeader('Content-Length', fileSize); + fs.createReadStream(filePath).pipe(res); + } + } +} diff --git a/backend/services/snapshot-service/src/api/dto/create-snapshot.dto.ts b/backend/services/snapshot-service/src/api/dto/create-snapshot.dto.ts new file mode 100644 index 00000000..e4f4beee --- /dev/null +++ b/backend/services/snapshot-service/src/api/dto/create-snapshot.dto.ts @@ -0,0 +1,25 @@ +import { IsEnum, IsArray, IsString, ArrayMinSize } from 'class-validator'; +import { ApiProperty } from '@nestjs/swagger'; +import { BackupTarget } from '@/domain/enums'; +import { StorageType } from '@/domain/enums'; + +export class CreateSnapshotDto { + @ApiProperty({ enum: StorageType, description: '存储方式' }) + @IsEnum(StorageType) + storageType: StorageType; + + @ApiProperty({ description: '存储路径 (MinIO 桶路径或描述标签)', example: 'snapshots/2026-02-23' }) + @IsString() + storagePath: string; + + @ApiProperty({ + enum: BackupTarget, + isArray: true, + description: '备份目标列表', + example: ['POSTGRES', 'REDIS'], + }) + @IsArray() + @ArrayMinSize(1) + @IsEnum(BackupTarget, { each: true }) + targets: BackupTarget[]; +} diff --git a/backend/services/snapshot-service/src/api/dto/snapshot.response.ts b/backend/services/snapshot-service/src/api/dto/snapshot.response.ts new file mode 100644 index 00000000..21bd2ac6 --- /dev/null +++ b/backend/services/snapshot-service/src/api/dto/snapshot.response.ts @@ -0,0 +1,62 @@ +import { ApiProperty } from '@nestjs/swagger'; + +export class SnapshotDetailResponse { + @ApiProperty() id: string; + @ApiProperty() target: string; + @ApiProperty() status: string; + @ApiProperty() progress: number; + @ApiProperty() fileSize: string; + @ApiProperty() fileName: string | null; + @ApiProperty() error: string | null; + @ApiProperty() startedAt: Date | null; + @ApiProperty() completedAt: Date | null; +} + +export class SnapshotTaskResponse { + @ApiProperty() id: string; + @ApiProperty() status: string; + @ApiProperty() storageType: string; + @ApiProperty() storagePath: string; + @ApiProperty() targets: string[]; + @ApiProperty() totalSize: string; + @ApiProperty() error: string | null; + @ApiProperty() startedAt: Date | null; + @ApiProperty() completedAt: Date | null; + @ApiProperty() createdAt: Date; + @ApiProperty({ type: [SnapshotDetailResponse] }) + details: SnapshotDetailResponse[]; +} + +export class SnapshotListResponse { + @ApiProperty({ type: [SnapshotTaskResponse] }) tasks: SnapshotTaskResponse[]; + @ApiProperty() total: number; + @ApiProperty() page: number; + @ApiProperty() limit: number; +} + +/** 将 Prisma 对象转换为响应 DTO(处理 BigInt 序列化) */ +export function toSnapshotResponse(task: any): SnapshotTaskResponse { + return { + id: task.id, + status: task.status, + storageType: task.storageType, + storagePath: task.storagePath, + targets: JSON.parse(task.targets), + totalSize: task.totalSize.toString(), + error: task.error, + startedAt: task.startedAt, + completedAt: task.completedAt, + createdAt: task.createdAt, + details: (task.details || []).map((d: any) => ({ + id: d.id, + target: d.target, + status: d.status, + progress: d.progress, + fileSize: d.fileSize.toString(), + fileName: d.fileName, + error: d.error, + startedAt: d.startedAt, + completedAt: d.completedAt, + })), + }; +} diff --git a/backend/services/snapshot-service/src/api/gateways/snapshot.gateway.ts b/backend/services/snapshot-service/src/api/gateways/snapshot.gateway.ts new file mode 100644 index 00000000..c70655db --- /dev/null +++ b/backend/services/snapshot-service/src/api/gateways/snapshot.gateway.ts @@ -0,0 +1,81 @@ +import { + WebSocketGateway, + WebSocketServer, + OnGatewayInit, + OnGatewayConnection, + OnGatewayDisconnect, +} from '@nestjs/websockets'; +import { Server, Socket } from 'socket.io'; +import { Logger, Injectable } from '@nestjs/common'; +import { BackupTarget } from '@/domain/enums'; + +@Injectable() +@WebSocketGateway({ + namespace: '/snapshots', + cors: { + origin: '*', + credentials: true, + }, + transports: ['websocket', 'polling'], +}) +export class SnapshotGateway + implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect +{ + @WebSocketServer() + server: Server; + + private readonly logger = new Logger(SnapshotGateway.name); + private connectedClients = 0; + + afterInit() { + this.logger.log('Snapshot WebSocket Gateway initialized'); + } + + handleConnection(client: Socket) { + this.connectedClients++; + this.logger.log(`Client connected: ${client.id}, total: ${this.connectedClients}`); + } + + handleDisconnect(client: Socket) { + this.connectedClients--; + this.logger.log(`Client disconnected: ${client.id}, total: ${this.connectedClients}`); + } + + emitTaskStarted(taskId: string, targets: BackupTarget[]): void { + if (this.server && this.connectedClients > 0) { + this.server.emit('snapshot:started', { taskId, targets }); + } + } + + emitProgress(taskId: string, target: string, percent: number, message: string): void { + if (this.server && this.connectedClients > 0) { + this.server.emit('snapshot:progress', { taskId, target, percent, message }); + } + } + + emitTargetComplete(taskId: string, target: string, fileSize: number): void { + if (this.server && this.connectedClients > 0) { + this.server.emit('snapshot:target-complete', { + taskId, + target, + fileSize: fileSize.toString(), + }); + } + } + + emitComplete(taskId: string, totalSize: number, duration: number): void { + if (this.server && this.connectedClients > 0) { + this.server.emit('snapshot:complete', { + taskId, + totalSize: totalSize.toString(), + duration, + }); + } + } + + emitError(taskId: string, target: string | undefined, error: string): void { + if (this.server && this.connectedClients > 0) { + this.server.emit('snapshot:error', { taskId, target, error }); + } + } +} diff --git a/backend/services/snapshot-service/src/app.module.ts b/backend/services/snapshot-service/src/app.module.ts new file mode 100644 index 00000000..b7dbf220 --- /dev/null +++ b/backend/services/snapshot-service/src/app.module.ts @@ -0,0 +1,22 @@ +import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { ScheduleModule } from '@nestjs/schedule'; +import { ApiModule } from './api/api.module'; +import { InfrastructureModule } from './infrastructure/infrastructure.module'; +import { DomainModule } from './domain/domain.module'; +import { appConfig } from './config/app.config'; + +@Module({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + load: [appConfig], + envFilePath: ['.env', '.env.development'], + }), + ScheduleModule.forRoot(), + DomainModule, + InfrastructureModule, + ApiModule, + ], +}) +export class AppModule {} diff --git a/backend/services/snapshot-service/src/application/application.module.ts b/backend/services/snapshot-service/src/application/application.module.ts new file mode 100644 index 00000000..d73431fe --- /dev/null +++ b/backend/services/snapshot-service/src/application/application.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { SnapshotOrchestratorService } from './services/snapshot-orchestrator.service'; +import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; + +@Module({ + imports: [InfrastructureModule], + providers: [SnapshotOrchestratorService], + exports: [SnapshotOrchestratorService], +}) +export class ApplicationModule {} diff --git a/backend/services/snapshot-service/src/application/services/snapshot-orchestrator.service.ts b/backend/services/snapshot-service/src/application/services/snapshot-orchestrator.service.ts new file mode 100644 index 00000000..9303e03c --- /dev/null +++ b/backend/services/snapshot-service/src/application/services/snapshot-orchestrator.service.ts @@ -0,0 +1,189 @@ +import { Injectable, Inject, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import * as fs from 'fs'; +import { SnapshotRepository } from '@/infrastructure/persistence/repositories/snapshot.repository'; +import { MinioStorageAdapter } from '@/infrastructure/storage/minio-storage.adapter'; +import { LocalStorageAdapter } from '@/infrastructure/storage/local-storage.adapter'; +import { SnapshotGateway } from '@/api/gateways/snapshot.gateway'; +import { + BackupHandler, + BACKUP_HANDLER_TOKEN, +} from '@/domain/interfaces/backup-handler.interface'; +import { + BackupTarget, + BACKUP_TARGET_ORDER, + SnapshotStatus, + StorageType, +} from '@/domain/enums'; + +@Injectable() +export class SnapshotOrchestratorService { + private readonly logger = new Logger(SnapshotOrchestratorService.name); + private readonly retentionHours: number; + private runningTaskId: string | null = null; + + constructor( + private readonly repo: SnapshotRepository, + private readonly minioStorage: MinioStorageAdapter, + private readonly localStorage: LocalStorageAdapter, + private readonly gateway: SnapshotGateway, + private readonly configService: ConfigService, + @Inject(BACKUP_HANDLER_TOKEN) + private readonly handlers: Map, + ) { + this.retentionHours = this.configService.get('SNAPSHOT_RETENTION_HOURS', 72); + } + + getAvailableTargets(): BackupTarget[] { + return BACKUP_TARGET_ORDER.filter((t) => this.handlers.has(t)); + } + + isRunning(): boolean { + return this.runningTaskId !== null; + } + + async startSnapshot(data: { + storageType: StorageType; + storagePath: string; + targets: BackupTarget[]; + }): Promise { + if (this.runningTaskId) { + throw new Error('已有备份任务正在执行,请等待完成'); + } + + // 校验 targets 是否都可用 + const unavailable = data.targets.filter((t) => !this.handlers.has(t)); + if (unavailable.length > 0) { + throw new Error(`以下备份目标不可用: ${unavailable.join(', ')}`); + } + + // 按预定义顺序排序 + const sortedTargets = BACKUP_TARGET_ORDER.filter((t) => data.targets.includes(t)); + + const task = await this.repo.createTask({ + storageType: data.storageType, + storagePath: data.storagePath, + targets: sortedTargets, + }); + + this.logger.log(`备份任务已创建: ${task.id}, targets: ${sortedTargets.join(',')}`); + + // 异步执行,不阻塞 HTTP 请求 + this.executeSnapshot(task.id).catch((err) => { + this.logger.error(`备份任务异常: ${task.id}, ${err.message}`); + }); + + return task.id; + } + + private async executeSnapshot(taskId: string): Promise { + this.runningTaskId = taskId; + const startTime = Date.now(); + + try { + await this.repo.updateTaskStatus(taskId, SnapshotStatus.RUNNING); + + const task = await this.repo.findById(taskId); + if (!task) throw new Error(`任务不存在: ${taskId}`); + + const targets: BackupTarget[] = JSON.parse(task.targets); + const outputDir = this.localStorage.getTaskDir(taskId); + + this.gateway.emitTaskStarted(taskId, targets); + + // 串行执行各目标 + for (const target of targets) { + await this.repo.updateDetailStatus(taskId, target, SnapshotStatus.RUNNING); + this.gateway.emitProgress(taskId, target, 0, '开始备份...'); + + try { + const handler = this.handlers.get(target); + if (!handler) { + throw new Error(`备份处理器不存在: ${target}`); + } + + const result = await handler.execute(outputDir, (percent, msg) => { + this.gateway.emitProgress(taskId, target, percent, msg); + // 进度更新不频繁写库,每10%写一次 + if (percent % 10 === 0) { + this.repo.updateDetailProgress(taskId, target, percent).catch(() => {}); + } + }); + + // 如果存储到 MinIO,上传后删除本地临时文件 + if (task.storageType === StorageType.MINIO) { + this.gateway.emitProgress(taskId, target, 99, '上传到 MinIO...'); + await this.minioStorage.upload( + result.filePath, + `${task.storagePath}/${result.fileName}`, + ); + fs.unlinkSync(result.filePath); + } + + await this.repo.completeDetail(taskId, target, result); + this.gateway.emitTargetComplete(taskId, target, result.fileSize); + this.logger.log(`备份目标完成: ${target}, 大小: ${result.fileSize} bytes`); + } catch (error) { + const errMsg = error instanceof Error ? error.message : String(error); + await this.repo.failDetail(taskId, target, errMsg); + this.gateway.emitError(taskId, target, errMsg); + this.logger.error(`备份目标失败: ${target}, 错误: ${errMsg}`); + // 单个目标失败不中断整体任务 + } + } + + await this.repo.completeTask(taskId); + const duration = Date.now() - startTime; + const completedTask = await this.repo.findById(taskId); + this.gateway.emitComplete( + taskId, + Number(completedTask?.totalSize ?? 0), + duration, + ); + this.logger.log(`备份任务完成: ${taskId}, 耗时: ${duration}ms`); + } catch (error) { + const errMsg = error instanceof Error ? error.message : String(error); + await this.repo.updateTaskStatus(taskId, SnapshotStatus.FAILED, errMsg); + this.gateway.emitError(taskId, undefined, errMsg); + this.logger.error(`备份任务失败: ${taskId}, ${errMsg}`); + } finally { + this.runningTaskId = null; + + // MinIO 模式下清理空的临时目录 + const task = await this.repo.findById(taskId); + if (task?.storageType === StorageType.MINIO) { + this.localStorage.deleteTask(taskId); + } + } + } + + async deleteSnapshot(taskId: string): Promise { + const task = await this.repo.findById(taskId); + if (!task) throw new Error(`任务不存在: ${taskId}`); + + if (task.storageType === StorageType.LOCAL) { + this.localStorage.deleteTask(taskId); + } else if (task.storageType === StorageType.MINIO) { + await this.minioStorage.deleteByPrefix(`${task.storagePath}/`); + } + + await this.repo.deleteTask(taskId); + this.logger.log(`备份已删除: ${taskId}`); + } + + @Cron(CronExpression.EVERY_HOUR) + async cleanupExpiredSnapshots(): Promise { + const deletedIds = this.localStorage.cleanupExpired(this.retentionHours); + if (deletedIds.length > 0) { + this.logger.log(`已清理 ${deletedIds.length} 个过期本地备份`); + for (const id of deletedIds) { + try { + await this.repo.deleteTask(id); + } catch { + // 任务记录可能已不存在 + } + } + } + } +} diff --git a/backend/services/snapshot-service/src/config/app.config.ts b/backend/services/snapshot-service/src/config/app.config.ts new file mode 100644 index 00000000..2e4f2aee --- /dev/null +++ b/backend/services/snapshot-service/src/config/app.config.ts @@ -0,0 +1,4 @@ +export const appConfig = () => ({ + port: parseInt(process.env.APP_PORT || '3099', 10), + nodeEnv: process.env.NODE_ENV || 'development', +}); diff --git a/backend/services/snapshot-service/src/domain/domain.module.ts b/backend/services/snapshot-service/src/domain/domain.module.ts new file mode 100644 index 00000000..47683c2a --- /dev/null +++ b/backend/services/snapshot-service/src/domain/domain.module.ts @@ -0,0 +1,4 @@ +import { Module } from '@nestjs/common'; + +@Module({}) +export class DomainModule {} diff --git a/backend/services/snapshot-service/src/domain/enums/backup-target.enum.ts b/backend/services/snapshot-service/src/domain/enums/backup-target.enum.ts new file mode 100644 index 00000000..5e8f50e9 --- /dev/null +++ b/backend/services/snapshot-service/src/domain/enums/backup-target.enum.ts @@ -0,0 +1,28 @@ +export enum BackupTarget { + POSTGRES = 'POSTGRES', + REDIS = 'REDIS', + KAFKA = 'KAFKA', + ZOOKEEPER = 'ZOOKEEPER', + MINIO = 'MINIO', + UPLOADS = 'UPLOADS', +} + +/** 备份执行顺序 */ +export const BACKUP_TARGET_ORDER: BackupTarget[] = [ + BackupTarget.POSTGRES, + BackupTarget.REDIS, + BackupTarget.KAFKA, + BackupTarget.ZOOKEEPER, + BackupTarget.MINIO, + BackupTarget.UPLOADS, +]; + +/** 备份目标中文名 */ +export const BACKUP_TARGET_LABELS: Record = { + [BackupTarget.POSTGRES]: 'PostgreSQL', + [BackupTarget.REDIS]: 'Redis', + [BackupTarget.KAFKA]: 'Kafka', + [BackupTarget.ZOOKEEPER]: 'ZooKeeper', + [BackupTarget.MINIO]: 'MinIO', + [BackupTarget.UPLOADS]: 'Uploads', +}; diff --git a/backend/services/snapshot-service/src/domain/enums/index.ts b/backend/services/snapshot-service/src/domain/enums/index.ts new file mode 100644 index 00000000..2a965573 --- /dev/null +++ b/backend/services/snapshot-service/src/domain/enums/index.ts @@ -0,0 +1,3 @@ +export * from './backup-target.enum'; +export * from './snapshot-status.enum'; +export * from './storage-type.enum'; diff --git a/backend/services/snapshot-service/src/domain/enums/snapshot-status.enum.ts b/backend/services/snapshot-service/src/domain/enums/snapshot-status.enum.ts new file mode 100644 index 00000000..2ab8dd9c --- /dev/null +++ b/backend/services/snapshot-service/src/domain/enums/snapshot-status.enum.ts @@ -0,0 +1,6 @@ +export enum SnapshotStatus { + PENDING = 'PENDING', + RUNNING = 'RUNNING', + COMPLETED = 'COMPLETED', + FAILED = 'FAILED', +} diff --git a/backend/services/snapshot-service/src/domain/enums/storage-type.enum.ts b/backend/services/snapshot-service/src/domain/enums/storage-type.enum.ts new file mode 100644 index 00000000..e3d4411a --- /dev/null +++ b/backend/services/snapshot-service/src/domain/enums/storage-type.enum.ts @@ -0,0 +1,4 @@ +export enum StorageType { + MINIO = 'MINIO', + LOCAL = 'LOCAL', +} diff --git a/backend/services/snapshot-service/src/domain/interfaces/backup-handler.interface.ts b/backend/services/snapshot-service/src/domain/interfaces/backup-handler.interface.ts new file mode 100644 index 00000000..cac951a6 --- /dev/null +++ b/backend/services/snapshot-service/src/domain/interfaces/backup-handler.interface.ts @@ -0,0 +1,16 @@ +import { BackupTarget } from '../enums'; + +export interface BackupResult { + fileName: string; + filePath: string; + fileSize: number; +} + +export type ProgressCallback = (percent: number, message: string) => void; + +export interface BackupHandler { + readonly target: BackupTarget; + execute(outputDir: string, onProgress: ProgressCallback): Promise; +} + +export const BACKUP_HANDLER_TOKEN = 'BACKUP_HANDLER'; diff --git a/backend/services/snapshot-service/src/infrastructure/backup/kafka-backup.handler.ts b/backend/services/snapshot-service/src/infrastructure/backup/kafka-backup.handler.ts new file mode 100644 index 00000000..3695ae45 --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/backup/kafka-backup.handler.ts @@ -0,0 +1,84 @@ +import { Injectable, Logger } from '@nestjs/common'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as archiver from 'archiver'; +import { BackupTarget } from '@/domain/enums'; +import { + BackupHandler, + BackupResult, + ProgressCallback, +} from '@/domain/interfaces/backup-handler.interface'; + +@Injectable() +export class KafkaBackupHandler implements BackupHandler { + readonly target = BackupTarget.KAFKA; + private readonly logger = new Logger(KafkaBackupHandler.name); + + private readonly sourceDir = '/backup-source/kafka'; + + async execute(outputDir: string, onProgress: ProgressCallback): Promise { + const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); + const fileName = `kafka-${timestamp}.tar.gz`; + const filePath = path.join(outputDir, fileName); + + fs.mkdirSync(outputDir, { recursive: true }); + + if (!fs.existsSync(this.sourceDir)) { + throw new Error(`Kafka 数据目录不存在: ${this.sourceDir}`); + } + + onProgress(0, 'Kafka 数据打包开始...'); + + // 先计算总大小 + const totalSize = this.getDirSize(this.sourceDir); + this.logger.log(`Kafka 数据目录大小: ${totalSize} bytes`); + + return new Promise((resolve, reject) => { + const output = fs.createWriteStream(filePath); + const archive = archiver('tar', { gzip: true }); + + let processedBytes = 0; + archive.on('progress', (progress) => { + processedBytes = progress.fs.processedBytes; + if (totalSize > 0) { + const percent = Math.min(99, Math.floor((processedBytes / totalSize) * 100)); + onProgress(percent, `Kafka 打包中 ${percent}%`); + } + }); + + output.on('close', () => { + const fileSize = archive.pointer(); + this.logger.log(`Kafka 备份完成: ${fileName}, 大小: ${fileSize} bytes`); + onProgress(100, 'Kafka 备份完成'); + resolve({ fileName, filePath, fileSize }); + }); + + archive.on('error', (err) => { + this.logger.error(`Kafka 备份打包失败: ${err.message}`); + reject(err); + }); + + archive.pipe(output); + archive.directory(this.sourceDir, 'kafka'); + archive.finalize(); + }); + } + + private getDirSize(dirPath: string): number { + let size = 0; + try { + const entries = fs.readdirSync(dirPath, { withFileTypes: true }); + for (const entry of entries) { + const fullPath = path.join(dirPath, entry.name); + if (entry.isFile()) { + size += fs.statSync(fullPath).size; + } else if (entry.isDirectory()) { + size += this.getDirSize(fullPath); + } + } + } catch { + // 忽略权限错误 + } + return size; + } +} diff --git a/backend/services/snapshot-service/src/infrastructure/backup/minio-backup.handler.ts b/backend/services/snapshot-service/src/infrastructure/backup/minio-backup.handler.ts new file mode 100644 index 00000000..23873dcd --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/backup/minio-backup.handler.ts @@ -0,0 +1,135 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import * as Minio from 'minio'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as archiver from 'archiver'; +import { BackupTarget } from '@/domain/enums'; +import { + BackupHandler, + BackupResult, + ProgressCallback, +} from '@/domain/interfaces/backup-handler.interface'; + +@Injectable() +export class MinioBackupHandler implements BackupHandler { + readonly target = BackupTarget.MINIO; + private readonly logger = new Logger(MinioBackupHandler.name); + private readonly minioClient: Minio.Client; + + constructor(private readonly configService: ConfigService) { + const endpoint = this.configService.get('MINIO_ENDPOINT', 'localhost'); + const port = parseInt(this.configService.get('MINIO_PORT', '9000'), 10); + const useSSL = this.configService.get('MINIO_USE_SSL', 'false') === 'true'; + const accessKey = this.configService.get('MINIO_ACCESS_KEY', 'admin'); + const secretKey = this.configService.get('MINIO_SECRET_KEY', ''); + + this.minioClient = new Minio.Client({ + endPoint: endpoint, + port, + useSSL, + accessKey, + secretKey, + }); + } + + async execute(outputDir: string, onProgress: ProgressCallback): Promise { + const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); + const fileName = `minio-${timestamp}.tar.gz`; + const filePath = path.join(outputDir, fileName); + const tempDownloadDir = path.join(outputDir, `minio-temp-${timestamp}`); + + fs.mkdirSync(outputDir, { recursive: true }); + fs.mkdirSync(tempDownloadDir, { recursive: true }); + + onProgress(0, 'MinIO 桶列表获取中...'); + + try { + // 1. 列出所有桶 + const buckets = await this.minioClient.listBuckets(); + const backupBucket = this.configService.get('MINIO_BACKUP_BUCKET', 'snapshots'); + // 排除备份桶自身 + const bucketsToBackup = buckets.filter((b) => b.name !== backupBucket); + + if (bucketsToBackup.length === 0) { + throw new Error('没有可备份的 MinIO 桶'); + } + + this.logger.log(`发现 ${bucketsToBackup.length} 个桶待备份`); + + // 2. 下载所有桶的所有对象到临时目录 + let totalObjects = 0; + let downloadedObjects = 0; + + // 先统计对象总数 + for (const bucket of bucketsToBackup) { + const objects = await this.listAllObjects(bucket.name); + totalObjects += objects.length; + } + + onProgress(5, `共 ${totalObjects} 个对象待下载...`); + + for (const bucket of bucketsToBackup) { + const bucketDir = path.join(tempDownloadDir, bucket.name); + fs.mkdirSync(bucketDir, { recursive: true }); + + const objects = await this.listAllObjects(bucket.name); + + for (const obj of objects) { + const objectDir = path.join(bucketDir, path.dirname(obj.name)); + fs.mkdirSync(objectDir, { recursive: true }); + + const objectPath = path.join(bucketDir, obj.name); + const dataStream = await this.minioClient.getObject(bucket.name, obj.name); + const writeStream = fs.createWriteStream(objectPath); + + await new Promise((resolve, reject) => { + dataStream.pipe(writeStream); + writeStream.on('finish', resolve); + writeStream.on('error', reject); + dataStream.on('error', reject); + }); + + downloadedObjects++; + const percent = 5 + Math.floor((downloadedObjects / totalObjects) * 70); + onProgress(percent, `下载对象 ${downloadedObjects}/${totalObjects}: ${bucket.name}/${obj.name}`); + } + } + + onProgress(75, 'MinIO 对象下载完成,开始打包...'); + + // 3. 打包为 tar.gz + const result = await new Promise((resolve, reject) => { + const output = fs.createWriteStream(filePath); + const archive = archiver('tar', { gzip: true }); + + output.on('close', () => { + const fileSize = archive.pointer(); + onProgress(100, 'MinIO 备份完成'); + resolve({ fileName, filePath, fileSize }); + }); + + archive.on('error', reject); + archive.pipe(output); + archive.directory(tempDownloadDir, 'minio'); + archive.finalize(); + }); + + this.logger.log(`MinIO 备份完成: ${fileName}, 大小: ${result.fileSize} bytes`); + return result; + } finally { + // 清理临时下载目录 + fs.rmSync(tempDownloadDir, { recursive: true, force: true }); + } + } + + private async listAllObjects(bucket: string): Promise { + return new Promise((resolve, reject) => { + const objects: Minio.BucketItem[] = []; + const stream = this.minioClient.listObjects(bucket, '', true); + stream.on('data', (obj) => objects.push(obj)); + stream.on('end', () => resolve(objects)); + stream.on('error', reject); + }); + } +} diff --git a/backend/services/snapshot-service/src/infrastructure/backup/postgres-backup.handler.ts b/backend/services/snapshot-service/src/infrastructure/backup/postgres-backup.handler.ts new file mode 100644 index 00000000..b6772d3d --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/backup/postgres-backup.handler.ts @@ -0,0 +1,92 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { spawn } from 'child_process'; +import * as fs from 'fs'; +import * as path from 'path'; +import { BackupTarget } from '@/domain/enums'; +import { + BackupHandler, + BackupResult, + ProgressCallback, +} from '@/domain/interfaces/backup-handler.interface'; + +@Injectable() +export class PostgresBackupHandler implements BackupHandler { + readonly target = BackupTarget.POSTGRES; + private readonly logger = new Logger(PostgresBackupHandler.name); + + private readonly host: string; + private readonly port: string; + private readonly user: string; + private readonly password: string; + + constructor(private readonly configService: ConfigService) { + this.host = this.configService.get('PG_HOST', 'localhost'); + this.port = this.configService.get('PG_PORT', '5432'); + this.user = this.configService.get('PG_USER', 'rwa_user'); + this.password = this.configService.get('PG_PASSWORD', ''); + } + + async execute(outputDir: string, onProgress: ProgressCallback): Promise { + const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); + const fileName = `postgres-${timestamp}.tar.gz`; + const filePath = path.join(outputDir, fileName); + + fs.mkdirSync(outputDir, { recursive: true }); + + onProgress(0, 'pg_basebackup 开始...'); + + return new Promise((resolve, reject) => { + const outputStream = fs.createWriteStream(filePath); + + const proc = spawn('pg_basebackup', [ + '-h', this.host, + '-p', this.port, + '-U', this.user, + '-D', '-', + '-Ft', + '-z', + '-P', + '-v', + ], { + env: { ...process.env, PGPASSWORD: this.password }, + }); + + proc.stdout.pipe(outputStream); + + let stderrBuffer = ''; + proc.stderr.on('data', (data: Buffer) => { + const text = data.toString(); + stderrBuffer += text; + + // pg_basebackup 进度格式: "12345/67890 kB (18%), 0/1 tablespace" + const match = text.match(/\((\d+)%\)/); + if (match) { + const percent = parseInt(match[1], 10); + onProgress(percent, `PostgreSQL 备份中 ${percent}%`); + } + }); + + proc.on('close', (code) => { + if (code === 0) { + const stat = fs.statSync(filePath); + this.logger.log(`PostgreSQL 备份完成: ${fileName}, 大小: ${stat.size} bytes`); + onProgress(100, 'PostgreSQL 备份完成'); + resolve({ fileName, filePath, fileSize: stat.size }); + } else { + const error = `pg_basebackup 退出码: ${code}, stderr: ${stderrBuffer.slice(-500)}`; + this.logger.error(error); + // 清理不完整的文件 + if (fs.existsSync(filePath)) fs.unlinkSync(filePath); + reject(new Error(error)); + } + }); + + proc.on('error', (err) => { + this.logger.error(`pg_basebackup 启动失败: ${err.message}`); + if (fs.existsSync(filePath)) fs.unlinkSync(filePath); + reject(new Error(`pg_basebackup 启动失败: ${err.message}`)); + }); + }); + } +} diff --git a/backend/services/snapshot-service/src/infrastructure/backup/redis-backup.handler.ts b/backend/services/snapshot-service/src/infrastructure/backup/redis-backup.handler.ts new file mode 100644 index 00000000..2c049edf --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/backup/redis-backup.handler.ts @@ -0,0 +1,116 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import Redis from 'ioredis'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as archiver from 'archiver'; +import { BackupTarget } from '@/domain/enums'; +import { + BackupHandler, + BackupResult, + ProgressCallback, +} from '@/domain/interfaces/backup-handler.interface'; + +@Injectable() +export class RedisBackupHandler implements BackupHandler { + readonly target = BackupTarget.REDIS; + private readonly logger = new Logger(RedisBackupHandler.name); + + private readonly redisHost: string; + private readonly redisPort: number; + private readonly redisPassword: string; + private readonly sourceDir = '/backup-source/redis'; + + constructor(private readonly configService: ConfigService) { + this.redisHost = this.configService.get('REDIS_HOST', 'localhost'); + this.redisPort = this.configService.get('REDIS_PORT', 6379); + this.redisPassword = this.configService.get('REDIS_PASSWORD', ''); + } + + async execute(outputDir: string, onProgress: ProgressCallback): Promise { + const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); + const fileName = `redis-${timestamp}.tar.gz`; + const filePath = path.join(outputDir, fileName); + + fs.mkdirSync(outputDir, { recursive: true }); + + onProgress(0, 'Redis BGSAVE 触发中...'); + + // 1. 连接 Redis 触发 BGSAVE + const redis = new Redis({ + host: this.redisHost, + port: this.redisPort, + password: this.redisPassword || undefined, + maxRetriesPerRequest: 3, + }); + + try { + const lastSaveBefore = await redis.lastsave(); + await redis.bgsave(); + this.logger.log('Redis BGSAVE 已触发'); + + // 2. 轮询等待 BGSAVE 完成 + onProgress(10, 'Redis BGSAVE 执行中...'); + let attempts = 0; + const maxAttempts = 120; // 最多等待 60 秒 + while (attempts < maxAttempts) { + await new Promise((r) => setTimeout(r, 500)); + const lastSaveAfter = await redis.lastsave(); + if (lastSaveAfter > lastSaveBefore) { + break; + } + attempts++; + onProgress(10 + Math.min(40, Math.floor((attempts / maxAttempts) * 40)), 'Redis BGSAVE 执行中...'); + } + + if (attempts >= maxAttempts) { + throw new Error('Redis BGSAVE 超时 (60秒)'); + } + + onProgress(50, 'Redis BGSAVE 完成,开始打包文件...'); + } finally { + redis.disconnect(); + } + + // 3. 从只读卷复制 dump.rdb(+ aof 如果存在) + return new Promise((resolve, reject) => { + const output = fs.createWriteStream(filePath); + const archive = archiver('tar', { gzip: true }); + + output.on('close', () => { + const fileSize = archive.pointer(); + this.logger.log(`Redis 备份完成: ${fileName}, 大小: ${fileSize} bytes`); + onProgress(100, 'Redis 备份完成'); + resolve({ fileName, filePath, fileSize }); + }); + + archive.on('error', (err) => { + this.logger.error(`Redis 备份打包失败: ${err.message}`); + reject(err); + }); + + archive.pipe(output); + + const dumpPath = path.join(this.sourceDir, 'dump.rdb'); + if (fs.existsSync(dumpPath)) { + archive.file(dumpPath, { name: 'dump.rdb' }); + onProgress(70, '打包 dump.rdb...'); + } + + const aofPath = path.join(this.sourceDir, 'appendonly.aof'); + if (fs.existsSync(aofPath)) { + archive.file(aofPath, { name: 'appendonly.aof' }); + onProgress(85, '打包 appendonly.aof...'); + } + + // AOF 目录(Redis 7 multi-part AOF) + const aofDir = path.join(this.sourceDir, 'appendonlydir'); + if (fs.existsSync(aofDir)) { + archive.directory(aofDir, 'appendonlydir'); + onProgress(85, '打包 appendonlydir/...'); + } + + archive.finalize(); + }); + } +} diff --git a/backend/services/snapshot-service/src/infrastructure/backup/uploads-backup.handler.ts b/backend/services/snapshot-service/src/infrastructure/backup/uploads-backup.handler.ts new file mode 100644 index 00000000..e6e90c94 --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/backup/uploads-backup.handler.ts @@ -0,0 +1,82 @@ +import { Injectable, Logger } from '@nestjs/common'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as archiver from 'archiver'; +import { BackupTarget } from '@/domain/enums'; +import { + BackupHandler, + BackupResult, + ProgressCallback, +} from '@/domain/interfaces/backup-handler.interface'; + +@Injectable() +export class UploadsBackupHandler implements BackupHandler { + readonly target = BackupTarget.UPLOADS; + private readonly logger = new Logger(UploadsBackupHandler.name); + + private readonly sourceDir = '/backup-source/uploads'; + + async execute(outputDir: string, onProgress: ProgressCallback): Promise { + const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); + const fileName = `uploads-${timestamp}.tar.gz`; + const filePath = path.join(outputDir, fileName); + + fs.mkdirSync(outputDir, { recursive: true }); + + if (!fs.existsSync(this.sourceDir)) { + throw new Error(`Uploads 目录不存在: ${this.sourceDir}`); + } + + onProgress(0, 'Uploads 文件打包开始...'); + + // 计算总大小 + const totalSize = this.getDirSize(this.sourceDir); + this.logger.log(`Uploads 目录大小: ${totalSize} bytes`); + + return new Promise((resolve, reject) => { + const output = fs.createWriteStream(filePath); + const archive = archiver('tar', { gzip: true }); + + archive.on('progress', (progress) => { + if (totalSize > 0) { + const percent = Math.min(99, Math.floor((progress.fs.processedBytes / totalSize) * 100)); + onProgress(percent, `Uploads 打包中 ${percent}%`); + } + }); + + output.on('close', () => { + const fileSize = archive.pointer(); + this.logger.log(`Uploads 备份完成: ${fileName}, 大小: ${fileSize} bytes`); + onProgress(100, 'Uploads 备份完成'); + resolve({ fileName, filePath, fileSize }); + }); + + archive.on('error', (err) => { + this.logger.error(`Uploads 备份打包失败: ${err.message}`); + reject(err); + }); + + archive.pipe(output); + archive.directory(this.sourceDir, 'uploads'); + archive.finalize(); + }); + } + + private getDirSize(dirPath: string): number { + let size = 0; + try { + const entries = fs.readdirSync(dirPath, { withFileTypes: true }); + for (const entry of entries) { + const fullPath = path.join(dirPath, entry.name); + if (entry.isFile()) { + size += fs.statSync(fullPath).size; + } else if (entry.isDirectory()) { + size += this.getDirSize(fullPath); + } + } + } catch { + // 忽略权限错误 + } + return size; + } +} diff --git a/backend/services/snapshot-service/src/infrastructure/backup/zookeeper-backup.handler.ts b/backend/services/snapshot-service/src/infrastructure/backup/zookeeper-backup.handler.ts new file mode 100644 index 00000000..5724a3e0 --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/backup/zookeeper-backup.handler.ts @@ -0,0 +1,60 @@ +import { Injectable, Logger } from '@nestjs/common'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as archiver from 'archiver'; +import { BackupTarget } from '@/domain/enums'; +import { + BackupHandler, + BackupResult, + ProgressCallback, +} from '@/domain/interfaces/backup-handler.interface'; + +@Injectable() +export class ZookeeperBackupHandler implements BackupHandler { + readonly target = BackupTarget.ZOOKEEPER; + private readonly logger = new Logger(ZookeeperBackupHandler.name); + + private readonly dataDir = '/backup-source/zookeeper/data'; + private readonly logDir = '/backup-source/zookeeper/log'; + + async execute(outputDir: string, onProgress: ProgressCallback): Promise { + const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); + const fileName = `zookeeper-${timestamp}.tar.gz`; + const filePath = path.join(outputDir, fileName); + + fs.mkdirSync(outputDir, { recursive: true }); + + onProgress(0, 'ZooKeeper 数据打包开始...'); + + return new Promise((resolve, reject) => { + const output = fs.createWriteStream(filePath); + const archive = archiver('tar', { gzip: true }); + + output.on('close', () => { + const fileSize = archive.pointer(); + this.logger.log(`ZooKeeper 备份完成: ${fileName}, 大小: ${fileSize} bytes`); + onProgress(100, 'ZooKeeper 备份完成'); + resolve({ fileName, filePath, fileSize }); + }); + + archive.on('error', (err) => { + this.logger.error(`ZooKeeper 备份打包失败: ${err.message}`); + reject(err); + }); + + archive.pipe(output); + + if (fs.existsSync(this.dataDir)) { + archive.directory(this.dataDir, 'data'); + onProgress(30, '打包 ZooKeeper data/...'); + } + + if (fs.existsSync(this.logDir)) { + archive.directory(this.logDir, 'log'); + onProgress(60, '打包 ZooKeeper log/...'); + } + + archive.finalize(); + }); + } +} diff --git a/backend/services/snapshot-service/src/infrastructure/infrastructure.module.ts b/backend/services/snapshot-service/src/infrastructure/infrastructure.module.ts new file mode 100644 index 00000000..57acb441 --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/infrastructure.module.ts @@ -0,0 +1,80 @@ +import { Module } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { PrismaService } from './persistence/prisma/prisma.service'; +import { SnapshotRepository } from './persistence/repositories/snapshot.repository'; +import { PostgresBackupHandler } from './backup/postgres-backup.handler'; +import { RedisBackupHandler } from './backup/redis-backup.handler'; +import { KafkaBackupHandler } from './backup/kafka-backup.handler'; +import { ZookeeperBackupHandler } from './backup/zookeeper-backup.handler'; +import { MinioBackupHandler } from './backup/minio-backup.handler'; +import { UploadsBackupHandler } from './backup/uploads-backup.handler'; +import { MinioStorageAdapter } from './storage/minio-storage.adapter'; +import { LocalStorageAdapter } from './storage/local-storage.adapter'; +import { SnapshotGateway } from '@/api/gateways/snapshot.gateway'; +import { BACKUP_HANDLER_TOKEN } from '@/domain/interfaces/backup-handler.interface'; +import { BackupTarget } from '@/domain/enums'; + +@Module({ + providers: [ + PrismaService, + SnapshotRepository, + MinioStorageAdapter, + LocalStorageAdapter, + SnapshotGateway, + // 各备份 handler + PostgresBackupHandler, + RedisBackupHandler, + KafkaBackupHandler, + ZookeeperBackupHandler, + MinioBackupHandler, + UploadsBackupHandler, + // 备份 handler Map 注入 + { + provide: BACKUP_HANDLER_TOKEN, + useFactory: ( + configService: ConfigService, + pg: PostgresBackupHandler, + redis: RedisBackupHandler, + kafka: KafkaBackupHandler, + zk: ZookeeperBackupHandler, + minio: MinioBackupHandler, + uploads: UploadsBackupHandler, + ) => { + const availableTargets = configService + .get('AVAILABLE_TARGETS', '') + .split(',') + .map((t) => t.trim()) + .filter(Boolean); + + const allHandlers = [pg, redis, kafka, zk, minio, uploads]; + const handlerMap = new Map(); + + for (const handler of allHandlers) { + if (availableTargets.includes(handler.target)) { + handlerMap.set(handler.target, handler); + } + } + + return handlerMap; + }, + inject: [ + ConfigService, + PostgresBackupHandler, + RedisBackupHandler, + KafkaBackupHandler, + ZookeeperBackupHandler, + MinioBackupHandler, + UploadsBackupHandler, + ], + }, + ], + exports: [ + PrismaService, + SnapshotRepository, + MinioStorageAdapter, + LocalStorageAdapter, + SnapshotGateway, + BACKUP_HANDLER_TOKEN, + ], +}) +export class InfrastructureModule {} diff --git a/backend/services/snapshot-service/src/infrastructure/persistence/prisma/prisma.service.ts b/backend/services/snapshot-service/src/infrastructure/persistence/prisma/prisma.service.ts new file mode 100644 index 00000000..9c325898 --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/persistence/prisma/prisma.service.ts @@ -0,0 +1,16 @@ +import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common'; +import { PrismaClient } from '@prisma/client'; + +@Injectable() +export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(PrismaService.name); + + async onModuleInit() { + await this.$connect(); + this.logger.log('SQLite database connected'); + } + + async onModuleDestroy() { + await this.$disconnect(); + } +} diff --git a/backend/services/snapshot-service/src/infrastructure/persistence/repositories/snapshot.repository.ts b/backend/services/snapshot-service/src/infrastructure/persistence/repositories/snapshot.repository.ts new file mode 100644 index 00000000..b1331104 --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/persistence/repositories/snapshot.repository.ts @@ -0,0 +1,159 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { BackupTarget, SnapshotStatus } from '@/domain/enums'; +import { BackupResult } from '@/domain/interfaces/backup-handler.interface'; + +@Injectable() +export class SnapshotRepository { + private readonly logger = new Logger(SnapshotRepository.name); + + constructor(private readonly prisma: PrismaService) {} + + async createTask(data: { + storageType: string; + storagePath: string; + targets: BackupTarget[]; + }) { + return this.prisma.snapshotTask.create({ + data: { + storageType: data.storageType, + storagePath: data.storagePath, + targets: JSON.stringify(data.targets), + status: SnapshotStatus.PENDING, + details: { + create: data.targets.map((target) => ({ + target, + status: SnapshotStatus.PENDING, + })), + }, + }, + include: { details: true }, + }); + } + + async findById(id: string) { + return this.prisma.snapshotTask.findUnique({ + where: { id }, + include: { details: true }, + }); + } + + async findAll(page: number = 1, limit: number = 20) { + const skip = (page - 1) * limit; + const [tasks, total] = await Promise.all([ + this.prisma.snapshotTask.findMany({ + skip, + take: limit, + orderBy: { createdAt: 'desc' }, + include: { details: true }, + }), + this.prisma.snapshotTask.count(), + ]); + return { tasks, total, page, limit }; + } + + async updateTaskStatus(id: string, status: SnapshotStatus, error?: string) { + const data: Record = { status }; + if (status === SnapshotStatus.RUNNING) { + data.startedAt = new Date(); + } + if (status === SnapshotStatus.COMPLETED || status === SnapshotStatus.FAILED) { + data.completedAt = new Date(); + } + if (error) { + data.error = error; + } + return this.prisma.snapshotTask.update({ where: { id }, data }); + } + + async updateDetailStatus(taskId: string, target: string, status: SnapshotStatus) { + const detail = await this.prisma.snapshotDetail.findFirst({ + where: { taskId, target }, + }); + if (!detail) return; + + const data: Record = { status }; + if (status === SnapshotStatus.RUNNING) { + data.startedAt = new Date(); + } + return this.prisma.snapshotDetail.update({ where: { id: detail.id }, data }); + } + + async updateDetailProgress(taskId: string, target: string, progress: number) { + const detail = await this.prisma.snapshotDetail.findFirst({ + where: { taskId, target }, + }); + if (!detail) return; + return this.prisma.snapshotDetail.update({ + where: { id: detail.id }, + data: { progress }, + }); + } + + async completeDetail(taskId: string, target: string, result: BackupResult) { + const detail = await this.prisma.snapshotDetail.findFirst({ + where: { taskId, target }, + }); + if (!detail) return; + return this.prisma.snapshotDetail.update({ + where: { id: detail.id }, + data: { + status: SnapshotStatus.COMPLETED, + progress: 100, + fileSize: BigInt(result.fileSize), + fileName: result.fileName, + completedAt: new Date(), + }, + }); + } + + async failDetail(taskId: string, target: string, error: string) { + const detail = await this.prisma.snapshotDetail.findFirst({ + where: { taskId, target }, + }); + if (!detail) return; + return this.prisma.snapshotDetail.update({ + where: { id: detail.id }, + data: { + status: SnapshotStatus.FAILED, + error, + completedAt: new Date(), + }, + }); + } + + async completeTask(id: string) { + const details = await this.prisma.snapshotDetail.findMany({ + where: { taskId: id }, + }); + + const totalSize = details.reduce((sum, d) => sum + d.fileSize, BigInt(0)); + const hasFailure = details.some((d) => d.status === SnapshotStatus.FAILED); + const allFailed = details.every((d) => d.status === SnapshotStatus.FAILED); + + return this.prisma.snapshotTask.update({ + where: { id }, + data: { + status: allFailed ? SnapshotStatus.FAILED : SnapshotStatus.COMPLETED, + totalSize, + completedAt: new Date(), + error: hasFailure ? '部分备份目标失败,请查看详情' : null, + }, + }); + } + + async deleteTask(id: string) { + return this.prisma.snapshotTask.delete({ where: { id } }); + } + + async findExpiredLocalTasks(retentionHours: number) { + const threshold = new Date(Date.now() - retentionHours * 60 * 60 * 1000); + return this.prisma.snapshotTask.findMany({ + where: { + storageType: 'LOCAL', + createdAt: { lt: threshold }, + }, + include: { details: true }, + }); + } +} diff --git a/backend/services/snapshot-service/src/infrastructure/storage/local-storage.adapter.ts b/backend/services/snapshot-service/src/infrastructure/storage/local-storage.adapter.ts new file mode 100644 index 00000000..6b56a55c --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/storage/local-storage.adapter.ts @@ -0,0 +1,61 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import * as fs from 'fs'; +import * as path from 'path'; + +@Injectable() +export class LocalStorageAdapter { + private readonly logger = new Logger(LocalStorageAdapter.name); + private readonly tempDir: string; + + constructor(private readonly configService: ConfigService) { + this.tempDir = this.configService.get('SNAPSHOT_TEMP_DIR', './data/snapshots'); + fs.mkdirSync(this.tempDir, { recursive: true }); + this.logger.log(`Local storage adapter configured: ${this.tempDir}`); + } + + getTaskDir(taskId: string): string { + const taskDir = path.join(this.tempDir, taskId); + fs.mkdirSync(taskDir, { recursive: true }); + return taskDir; + } + + getFilePath(taskId: string, fileName: string): string { + return path.join(this.tempDir, taskId, fileName); + } + + fileExists(taskId: string, fileName: string): boolean { + return fs.existsSync(this.getFilePath(taskId, fileName)); + } + + deleteTask(taskId: string): void { + const taskDir = path.join(this.tempDir, taskId); + if (fs.existsSync(taskDir)) { + fs.rmSync(taskDir, { recursive: true, force: true }); + this.logger.log(`Deleted local snapshot: ${taskDir}`); + } + } + + cleanupExpired(retentionHours: number): string[] { + const threshold = Date.now() - retentionHours * 60 * 60 * 1000; + const deletedIds: string[] = []; + + if (!fs.existsSync(this.tempDir)) return deletedIds; + + const entries = fs.readdirSync(this.tempDir, { withFileTypes: true }); + for (const entry of entries) { + if (!entry.isDirectory()) continue; + + const dirPath = path.join(this.tempDir, entry.name); + const stat = fs.statSync(dirPath); + + if (stat.mtimeMs < threshold) { + fs.rmSync(dirPath, { recursive: true, force: true }); + deletedIds.push(entry.name); + this.logger.log(`Cleaned up expired snapshot: ${entry.name}`); + } + } + + return deletedIds; + } +} diff --git a/backend/services/snapshot-service/src/infrastructure/storage/minio-storage.adapter.ts b/backend/services/snapshot-service/src/infrastructure/storage/minio-storage.adapter.ts new file mode 100644 index 00000000..0ee94812 --- /dev/null +++ b/backend/services/snapshot-service/src/infrastructure/storage/minio-storage.adapter.ts @@ -0,0 +1,88 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import * as Minio from 'minio'; +import * as fs from 'fs'; + +@Injectable() +export class MinioStorageAdapter implements OnModuleInit { + private readonly logger = new Logger(MinioStorageAdapter.name); + private readonly minioClient: Minio.Client; + private readonly bucketName: string; + + constructor(private readonly configService: ConfigService) { + const endpoint = this.configService.get('MINIO_ENDPOINT', 'localhost'); + const port = parseInt(this.configService.get('MINIO_PORT', '9000'), 10); + const useSSL = this.configService.get('MINIO_USE_SSL', 'false') === 'true'; + const accessKey = this.configService.get('MINIO_ACCESS_KEY', 'admin'); + const secretKey = this.configService.get('MINIO_SECRET_KEY', ''); + + this.bucketName = this.configService.get('MINIO_BACKUP_BUCKET', 'snapshots'); + + this.minioClient = new Minio.Client({ + endPoint: endpoint, + port, + useSSL, + accessKey, + secretKey, + }); + + this.logger.log(`MinIO storage adapter configured: ${endpoint}:${port}, bucket: ${this.bucketName}`); + } + + async onModuleInit() { + await this.ensureBucketExists(); + } + + private async ensureBucketExists(): Promise { + try { + const exists = await this.minioClient.bucketExists(this.bucketName); + if (!exists) { + await this.minioClient.makeBucket(this.bucketName, 'cn-east-1'); + this.logger.log(`Created backup bucket: ${this.bucketName}`); + } else { + this.logger.log(`Backup bucket exists: ${this.bucketName}`); + } + } catch (error) { + this.logger.error(`Failed to ensure backup bucket exists: ${error.message}`); + } + } + + async upload(localFilePath: string, objectPath: string): Promise { + const stat = fs.statSync(localFilePath); + + await this.minioClient.fPutObject( + this.bucketName, + objectPath, + localFilePath, + { 'Content-Type': 'application/gzip' }, + ); + + this.logger.log(`Uploaded to MinIO: ${this.bucketName}/${objectPath}, size: ${stat.size} bytes`); + return `${this.bucketName}/${objectPath}`; + } + + async delete(objectPath: string): Promise { + try { + await this.minioClient.removeObject(this.bucketName, objectPath); + this.logger.log(`Deleted from MinIO: ${this.bucketName}/${objectPath}`); + } catch (error) { + this.logger.error(`Failed to delete from MinIO: ${error.message}`); + } + } + + async deleteByPrefix(prefix: string): Promise { + const objects: string[] = []; + const stream = this.minioClient.listObjects(this.bucketName, prefix, true); + + await new Promise((resolve, reject) => { + stream.on('data', (obj) => objects.push(obj.name)); + stream.on('end', resolve); + stream.on('error', reject); + }); + + if (objects.length > 0) { + await this.minioClient.removeObjects(this.bucketName, objects); + this.logger.log(`Deleted ${objects.length} objects from MinIO with prefix: ${prefix}`); + } + } +} diff --git a/backend/services/snapshot-service/src/main.ts b/backend/services/snapshot-service/src/main.ts new file mode 100644 index 00000000..e2c35ece --- /dev/null +++ b/backend/services/snapshot-service/src/main.ts @@ -0,0 +1,49 @@ +import { NestFactory } from '@nestjs/core'; +import { ValidationPipe, Logger } from '@nestjs/common'; +import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'; +import { AppModule } from './app.module'; + +async function bootstrap() { + const logger = new Logger('Bootstrap'); + const app = await NestFactory.create(AppModule); + + // 全局前缀 + app.setGlobalPrefix('api/v1'); + + // 全局验证管道 + app.useGlobalPipes( + new ValidationPipe({ + whitelist: true, + forbidNonWhitelisted: true, + transform: true, + transformOptions: { enableImplicitConversion: true }, + }), + ); + + // CORS 配置 + app.enableCors({ + origin: '*', + methods: 'GET,HEAD,PUT,PATCH,POST,DELETE', + credentials: true, + }); + + // Swagger API 文档 + const config = new DocumentBuilder() + .setTitle('Snapshot Service API') + .setDescription('RWA 榴莲皇后平台数据快照备份服务 API') + .setVersion('1.0.0') + .addTag('快照备份', '数据备份相关接口') + .addTag('健康检查', '服务健康检查接口') + .build(); + + const document = SwaggerModule.createDocument(app, config); + SwaggerModule.setup('api/docs', app, document); + + const port = process.env.APP_PORT || 3099; + await app.listen(port); + + logger.log(`Snapshot Service is running on port ${port}`); + logger.log(`Swagger docs: http://localhost:${port}/api/docs`); +} + +bootstrap(); diff --git a/backend/services/snapshot-service/tsconfig.json b/backend/services/snapshot-service/tsconfig.json new file mode 100644 index 00000000..bd3c3946 --- /dev/null +++ b/backend/services/snapshot-service/tsconfig.json @@ -0,0 +1,24 @@ +{ + "compilerOptions": { + "module": "commonjs", + "declaration": true, + "removeComments": true, + "emitDecoratorMetadata": true, + "experimentalDecorators": true, + "allowSyntheticDefaultImports": true, + "target": "ES2021", + "sourceMap": true, + "outDir": "./dist", + "baseUrl": "./", + "incremental": true, + "skipLibCheck": true, + "strictNullChecks": true, + "noImplicitAny": true, + "strictBindCallApply": true, + "forceConsistentCasingInFileNames": true, + "noFallthroughCasesInSwitch": true, + "paths": { + "@/*": ["src/*"] + } + } +} diff --git a/frontend/admin-web/package.json b/frontend/admin-web/package.json index 0de95462..6998d710 100644 --- a/frontend/admin-web/package.json +++ b/frontend/admin-web/package.json @@ -28,6 +28,7 @@ "react-redux": "^9.2.0", "recharts": "^2.15.0", "redux-persist": "^6.0.0", + "socket.io-client": "^4.7.4", "xlsx": "^0.18.5", "zustand": "^5.0.3" }, diff --git a/frontend/admin-web/src/app/(dashboard)/snapshots/page.module.scss b/frontend/admin-web/src/app/(dashboard)/snapshots/page.module.scss new file mode 100644 index 00000000..1efec265 --- /dev/null +++ b/frontend/admin-web/src/app/(dashboard)/snapshots/page.module.scss @@ -0,0 +1,290 @@ +.container { + max-width: 1200px; + margin: 0 auto; +} + +.title { + font-size: 1.5rem; + font-weight: 600; + margin-bottom: 1.5rem; + color: #1a1a2e; +} + +.card { + background: #fff; + border-radius: 12px; + padding: 1.5rem; + margin-bottom: 1.5rem; + box-shadow: 0 1px 3px rgba(0, 0, 0, 0.08); +} + +.cardTitle { + font-size: 1.1rem; + font-weight: 600; + margin-bottom: 1rem; + display: flex; + align-items: center; + gap: 0.75rem; +} + +.wsStatus { + font-size: 0.75rem; + font-weight: 400; + color: #52c41a; + background: #f6ffed; + padding: 2px 8px; + border-radius: 4px; +} + +.formGroup { + margin-bottom: 1rem; +} + +.label { + display: flex; + align-items: center; + gap: 0.5rem; + font-weight: 500; + margin-bottom: 0.5rem; + font-size: 0.9rem; + color: #333; +} + +.radioGroup { + display: flex; + gap: 1.5rem; +} + +.radio { + display: flex; + align-items: center; + gap: 0.35rem; + cursor: pointer; + font-size: 0.9rem; + + input { + cursor: pointer; + } +} + +.checkboxGroup { + display: flex; + flex-wrap: wrap; + gap: 1rem; +} + +.checkbox { + display: flex; + align-items: center; + gap: 0.35rem; + cursor: pointer; + font-size: 0.9rem; + background: #f5f5f5; + padding: 6px 12px; + border-radius: 6px; + transition: background 0.2s; + + &:hover { + background: #e8e8e8; + } + + input { + cursor: pointer; + } +} + +.selectAllBtn { + background: none; + border: none; + color: #1890ff; + cursor: pointer; + font-size: 0.8rem; + padding: 0; + + &:hover { + text-decoration: underline; + } + + &:disabled { + color: #ccc; + cursor: not-allowed; + } +} + +.input { + width: 100%; + max-width: 400px; + padding: 8px 12px; + border: 1px solid #d9d9d9; + border-radius: 6px; + font-size: 0.9rem; + outline: none; + transition: border-color 0.2s; + + &:focus { + border-color: #1890ff; + } + + &:disabled { + background: #f5f5f5; + cursor: not-allowed; + } +} + +.startBtn { + background: #1890ff; + color: #fff; + border: none; + padding: 10px 32px; + border-radius: 6px; + font-size: 0.95rem; + font-weight: 500; + cursor: pointer; + transition: background 0.2s; + + &:hover:not(:disabled) { + background: #40a9ff; + } + + &:disabled { + background: #d9d9d9; + cursor: not-allowed; + } +} + +.progressList { + display: flex; + flex-direction: column; + gap: 1rem; +} + +.progressItem { + background: #fafafa; + padding: 0.75rem 1rem; + border-radius: 8px; +} + +.progressHeader { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 6px; + font-size: 0.9rem; +} + +.progressTarget { + font-weight: 500; +} + +.progressBar { + height: 8px; + background: #f0f0f0; + border-radius: 4px; + overflow: hidden; +} + +.progressFill { + height: 100%; + border-radius: 4px; + transition: width 0.3s ease; +} + +.progressMsg { + font-size: 0.8rem; + color: #888; + margin-top: 4px; +} + +.completedInfo { + margin-top: 1rem; + padding: 0.75rem 1rem; + background: #f6ffed; + border: 1px solid #b7eb8f; + border-radius: 6px; + color: #52c41a; + font-weight: 500; +} + +.tableWrapper { + overflow-x: auto; +} + +.table { + width: 100%; + border-collapse: collapse; + font-size: 0.9rem; + + th, + td { + padding: 10px 12px; + text-align: left; + border-bottom: 1px solid #f0f0f0; + } + + th { + font-weight: 600; + color: #666; + background: #fafafa; + } + + tr:hover td { + background: #fafafa; + } +} + +.empty { + text-align: center !important; + color: #999; + padding: 2rem 0 !important; +} + +.downloadLink { + color: #1890ff; + text-decoration: none; + margin-right: 8px; + font-size: 0.85rem; + + &:hover { + text-decoration: underline; + } +} + +.deleteBtn { + background: none; + border: none; + color: #ff4d4f; + cursor: pointer; + font-size: 0.85rem; + padding: 0; + + &:hover { + text-decoration: underline; + } +} + +.pagination { + display: flex; + justify-content: center; + align-items: center; + gap: 1rem; + margin-top: 1rem; + font-size: 0.9rem; + + button { + background: #fff; + border: 1px solid #d9d9d9; + padding: 4px 12px; + border-radius: 4px; + cursor: pointer; + + &:hover:not(:disabled) { + border-color: #1890ff; + color: #1890ff; + } + + &:disabled { + color: #ccc; + cursor: not-allowed; + } + } +} diff --git a/frontend/admin-web/src/app/(dashboard)/snapshots/page.tsx b/frontend/admin-web/src/app/(dashboard)/snapshots/page.tsx new file mode 100644 index 00000000..a05a2438 --- /dev/null +++ b/frontend/admin-web/src/app/(dashboard)/snapshots/page.tsx @@ -0,0 +1,356 @@ +'use client'; + +import { useState, useEffect, useCallback } from 'react'; +import { snapshotApi } from '@/infrastructure/api/snapshot.api'; +import { useSnapshotWebSocket } from '@/hooks/useSnapshotWebSocket'; +import type { + BackupTarget, + StorageType, + SnapshotTask, + CreateSnapshotDto, + SnapshotStatus, +} from '@/types/snapshot.types'; +import { BACKUP_TARGET_LABELS } from '@/types/snapshot.types'; +import styles from './page.module.scss'; + +function formatBytes(bytes: string | number): string { + const b = typeof bytes === 'string' ? parseInt(bytes, 10) : bytes; + if (b === 0) return '0 B'; + const units = ['B', 'KB', 'MB', 'GB', 'TB']; + const i = Math.floor(Math.log(b) / Math.log(1024)); + return `${(b / Math.pow(1024, i)).toFixed(2)} ${units[i]}`; +} + +function formatDuration(ms: number): string { + const seconds = Math.floor(ms / 1000); + const m = Math.floor(seconds / 60); + const s = seconds % 60; + return m > 0 ? `${m}分${s}秒` : `${s}秒`; +} + +function formatTime(iso: string | null): string { + if (!iso) return '-'; + return new Date(iso).toLocaleString('zh-CN'); +} + +function statusLabel(status: SnapshotStatus): string { + const map: Record = { + PENDING: '等待中', + RUNNING: '执行中', + COMPLETED: '已完成', + FAILED: '失败', + }; + return map[status] || status; +} + +function statusColor(status: SnapshotStatus): string { + const map: Record = { + PENDING: '#999', + RUNNING: '#1890ff', + COMPLETED: '#52c41a', + FAILED: '#ff4d4f', + }; + return map[status] || '#999'; +} + +export default function SnapshotsPage() { + // 可用目标 + const [availableTargets, setAvailableTargets] = useState([]); + const [isServiceRunning, setIsServiceRunning] = useState(false); + + // 表单状态 + const [storageType, setStorageType] = useState('MINIO'); + const [storagePath, setStoragePath] = useState(''); + const [selectedTargets, setSelectedTargets] = useState([]); + const [isSubmitting, setIsSubmitting] = useState(false); + + // 当前执行中的任务 + const [activeTaskId, setActiveTaskId] = useState(null); + const { progresses, isConnected, taskCompleted, totalSize, duration } = + useSnapshotWebSocket(activeTaskId); + + // 历史列表 + const [tasks, setTasks] = useState([]); + const [total, setTotal] = useState(0); + const [page, setPage] = useState(1); + + // 加载可用目标 + useEffect(() => { + snapshotApi + .getTargets() + .then((res) => { + setAvailableTargets(res.targets as BackupTarget[]); + setSelectedTargets(res.targets as BackupTarget[]); + setIsServiceRunning(res.isRunning); + }) + .catch(console.error); + }, []); + + // 加载历史列表 + const loadHistory = useCallback(() => { + snapshotApi + .list(page, 10) + .then((res) => { + setTasks(res.tasks); + setTotal(res.total); + }) + .catch(console.error); + }, [page]); + + useEffect(() => { + loadHistory(); + }, [loadHistory]); + + // 任务完成后刷新列表 + useEffect(() => { + if (taskCompleted) { + loadHistory(); + setIsServiceRunning(false); + } + }, [taskCompleted, loadHistory]); + + // 默认 MinIO 路径 + useEffect(() => { + if (storageType === 'MINIO' && !storagePath) { + const date = new Date().toISOString().split('T')[0]; + setStoragePath(`snapshots/${date}`); + } + }, [storageType, storagePath]); + + // 开始备份 + const handleStart = async () => { + if (selectedTargets.length === 0) return; + setIsSubmitting(true); + + try { + const dto: CreateSnapshotDto = { + storageType, + storagePath: storageType === 'LOCAL' ? 'local' : storagePath, + targets: selectedTargets, + }; + const res = await snapshotApi.create(dto); + setActiveTaskId(res.taskId); + setIsServiceRunning(true); + } catch (err: any) { + alert(err.message || '创建备份任务失败'); + } finally { + setIsSubmitting(false); + } + }; + + // 切换目标选择 + const toggleTarget = (target: BackupTarget) => { + setSelectedTargets((prev) => + prev.includes(target) ? prev.filter((t) => t !== target) : [...prev, target], + ); + }; + + // 全选/取消全选 + const toggleAll = () => { + if (selectedTargets.length === availableTargets.length) { + setSelectedTargets([]); + } else { + setSelectedTargets([...availableTargets]); + } + }; + + // 删除备份 + const handleDelete = async (id: string) => { + if (!confirm('确定删除该备份?文件将被永久删除。')) return; + try { + await snapshotApi.delete(id); + loadHistory(); + } catch (err: any) { + alert(err.message || '删除失败'); + } + }; + + return ( +
+

数据快照

+ + {/* 创建备份表单 */} +
+

创建备份

+ +
+ +
+ + +
+
+ + {storageType === 'MINIO' && ( +
+ + setStoragePath(e.target.value)} + placeholder="snapshots/2026-02-23" + disabled={isServiceRunning} + /> +
+ )} + +
+ +
+ {availableTargets.map((target) => ( + + ))} +
+
+ + +
+ + {/* 实时进度 */} + {activeTaskId && progresses.size > 0 && ( +
+

+ 备份进度 + {isConnected && WebSocket 已连接} +

+ +
+ {Array.from(progresses.values()).map((p) => ( +
+
+ + {BACKUP_TARGET_LABELS[p.target] || p.target} + + + {p.status === 'completed' ? '100%' : p.status === 'failed' ? '失败' : `${p.percent}%`} + +
+
+
+
+
{p.message}
+
+ ))} +
+ + {taskCompleted && ( +
+ 备份完成!总大小: {formatBytes(totalSize || '0')},耗时: {formatDuration(duration || 0)} +
+ )} +
+ )} + + {/* 历史备份列表 */} +
+

历史备份

+ +
+ + + + + + + + + + + + + {tasks.length === 0 && ( + + + + )} + {tasks.map((task) => ( + + + + + + + + + ))} + +
时间备份目标大小存储方式状态操作
暂无备份记录
{formatTime(task.createdAt)}{task.targets.map((t) => BACKUP_TARGET_LABELS[t] || t).join(', ')}{formatBytes(task.totalSize)}{task.storageType === 'MINIO' ? 'MinIO' : '本地'}{statusLabel(task.status)} + {task.storageType === 'LOCAL' && task.status === 'COMPLETED' && ( + <> + {task.details + .filter((d) => d.fileName) + .map((d) => ( + + {BACKUP_TARGET_LABELS[d.target]} + + ))} + + )} + +
+
+ + {total > 10 && ( +
+ + 第 {page} 页 / 共 {Math.ceil(total / 10)} 页 + +
+ )} +
+
+ ); +} diff --git a/frontend/admin-web/src/components/layout/Sidebar/Sidebar.tsx b/frontend/admin-web/src/components/layout/Sidebar/Sidebar.tsx index 05ab1bf2..8d2ac766 100644 --- a/frontend/admin-web/src/components/layout/Sidebar/Sidebar.tsx +++ b/frontend/admin-web/src/components/layout/Sidebar/Sidebar.tsx @@ -39,6 +39,8 @@ const topMenuItems: MenuItem[] = [ { key: 'pre-planting', icon: '/images/Container3.svg', label: '预种管理', path: '/pre-planting' }, // [2026-02-19] 纯新增:树转让管理 { key: 'transfers', icon: '/images/Container5.svg', label: '转让管理', path: '/transfers' }, + // [2026-02-23] 纯新增:数据快照备份 + { key: 'snapshots', icon: '/images/Container6.svg', label: '数据快照', path: '/snapshots' }, { key: 'maintenance', icon: '/images/Container6.svg', label: '系统维护', path: '/maintenance' }, { key: 'settings', icon: '/images/Container6.svg', label: '系统设置', path: '/settings' }, ]; diff --git a/frontend/admin-web/src/hooks/useSnapshotWebSocket.ts b/frontend/admin-web/src/hooks/useSnapshotWebSocket.ts new file mode 100644 index 00000000..aad2947c --- /dev/null +++ b/frontend/admin-web/src/hooks/useSnapshotWebSocket.ts @@ -0,0 +1,117 @@ +'use client'; + +import { useEffect, useState, useRef, useCallback } from 'react'; +import { io, Socket } from 'socket.io-client'; +import type { BackupTarget, SnapshotProgress } from '@/types/snapshot.types'; + +const SNAPSHOT_WS_URL = process.env.NEXT_PUBLIC_SNAPSHOT_API_URL || 'http://localhost:3099'; + +interface TargetProgress { + target: BackupTarget; + percent: number; + message: string; + status: 'pending' | 'running' | 'completed' | 'failed'; +} + +interface UseSnapshotWebSocketReturn { + progresses: Map; + isConnected: boolean; + taskCompleted: boolean; + totalSize: string | null; + duration: number | null; + error: string | null; +} + +export function useSnapshotWebSocket(taskId: string | null): UseSnapshotWebSocketReturn { + const [progresses, setProgresses] = useState>(new Map()); + const [isConnected, setIsConnected] = useState(false); + const [taskCompleted, setTaskCompleted] = useState(false); + const [totalSize, setTotalSize] = useState(null); + const [duration, setDuration] = useState(null); + const [error, setError] = useState(null); + const socketRef = useRef(null); + + useEffect(() => { + if (!taskId) return; + + // 重置状态 + setProgresses(new Map()); + setTaskCompleted(false); + setTotalSize(null); + setDuration(null); + setError(null); + + const socket = io(`${SNAPSHOT_WS_URL}/snapshots`, { + transports: ['websocket', 'polling'], + }); + socketRef.current = socket; + + socket.on('connect', () => setIsConnected(true)); + socket.on('disconnect', () => setIsConnected(false)); + + socket.on('snapshot:started', (data: { taskId: string; targets: BackupTarget[] }) => { + if (data.taskId !== taskId) return; + const initial = new Map(); + data.targets.forEach((t) => { + initial.set(t, { target: t, percent: 0, message: '等待中', status: 'pending' }); + }); + setProgresses(initial); + }); + + socket.on('snapshot:progress', (data: SnapshotProgress) => { + if (data.taskId !== taskId) return; + setProgresses((prev) => { + const next = new Map(prev); + next.set(data.target, { + target: data.target, + percent: data.percent, + message: data.message, + status: 'running', + }); + return next; + }); + }); + + socket.on('snapshot:target-complete', (data: { taskId: string; target: string; fileSize: string }) => { + if (data.taskId !== taskId) return; + setProgresses((prev) => { + const next = new Map(prev); + const existing = next.get(data.target); + if (existing) { + next.set(data.target, { ...existing, percent: 100, message: '完成', status: 'completed' }); + } + return next; + }); + }); + + socket.on('snapshot:complete', (data: { taskId: string; totalSize: string; duration: number }) => { + if (data.taskId !== taskId) return; + setTaskCompleted(true); + setTotalSize(data.totalSize); + setDuration(data.duration); + }); + + socket.on('snapshot:error', (data: { taskId: string; target?: string; error: string }) => { + if (data.taskId !== taskId) return; + if (data.target) { + setProgresses((prev) => { + const next = new Map(prev); + const existing = next.get(data.target!); + if (existing) { + next.set(data.target!, { ...existing, message: data.error, status: 'failed' }); + } + return next; + }); + } else { + setError(data.error); + } + }); + + return () => { + socket.disconnect(); + socketRef.current = null; + }; + }, [taskId]); + + return { progresses, isConnected, taskCompleted, totalSize, duration, error }; +} diff --git a/frontend/admin-web/src/infrastructure/api/snapshot.api.ts b/frontend/admin-web/src/infrastructure/api/snapshot.api.ts new file mode 100644 index 00000000..4ed7403b --- /dev/null +++ b/frontend/admin-web/src/infrastructure/api/snapshot.api.ts @@ -0,0 +1,44 @@ +import type { CreateSnapshotDto, SnapshotTask } from '@/types/snapshot.types'; + +const SNAPSHOT_BASE = process.env.NEXT_PUBLIC_SNAPSHOT_API_URL || 'http://localhost:3099'; + +/** snapshot-service 独立请求(不走通用 apiClient,因为 snapshot 服务有独立端口) */ +async function snapshotFetch(url: string, options?: RequestInit): Promise { + const token = typeof window !== 'undefined' ? localStorage.getItem('token') : null; + const res = await fetch(`${SNAPSHOT_BASE}/api/v1${url}`, { + headers: { + 'Content-Type': 'application/json', + ...(token ? { Authorization: `Bearer ${token}` } : {}), + }, + ...options, + }); + if (!res.ok) { + const err = await res.json().catch(() => ({ message: res.statusText })); + throw new Error(err.message || `请求失败: ${res.status}`); + } + return res.json(); +} + +export const snapshotApi = { + getTargets: () => + snapshotFetch<{ targets: string[]; isRunning: boolean }>('/snapshots/targets'), + + create: (data: CreateSnapshotDto) => + snapshotFetch<{ taskId: string; message: string }>('/snapshots', { + method: 'POST', + body: JSON.stringify(data), + }), + + list: (page = 1, limit = 20) => + snapshotFetch<{ tasks: SnapshotTask[]; total: number; page: number; limit: number }>( + `/snapshots?page=${page}&limit=${limit}`, + ), + + getById: (id: string) => snapshotFetch(`/snapshots/${id}`), + + delete: (id: string) => + snapshotFetch<{ message: string }>(`/snapshots/${id}`, { method: 'DELETE' }), + + getDownloadUrl: (id: string, target: string) => + `${SNAPSHOT_BASE}/api/v1/snapshots/${id}/download/${target}`, +}; diff --git a/frontend/admin-web/src/types/snapshot.types.ts b/frontend/admin-web/src/types/snapshot.types.ts new file mode 100644 index 00000000..fe3a0a6f --- /dev/null +++ b/frontend/admin-web/src/types/snapshot.types.ts @@ -0,0 +1,51 @@ +export type BackupTarget = 'POSTGRES' | 'REDIS' | 'KAFKA' | 'ZOOKEEPER' | 'MINIO' | 'UPLOADS'; +export type StorageType = 'MINIO' | 'LOCAL'; +export type SnapshotStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED'; + +export const BACKUP_TARGET_LABELS: Record = { + POSTGRES: 'PostgreSQL', + REDIS: 'Redis', + KAFKA: 'Kafka', + ZOOKEEPER: 'ZooKeeper', + MINIO: 'MinIO', + UPLOADS: 'Uploads', +}; + +export interface SnapshotDetail { + id: string; + target: BackupTarget; + status: SnapshotStatus; + progress: number; + fileSize: string; + fileName: string | null; + error: string | null; + startedAt: string | null; + completedAt: string | null; +} + +export interface SnapshotTask { + id: string; + status: SnapshotStatus; + storageType: StorageType; + storagePath: string; + targets: BackupTarget[]; + totalSize: string; + error: string | null; + startedAt: string | null; + completedAt: string | null; + createdAt: string; + details: SnapshotDetail[]; +} + +export interface CreateSnapshotDto { + storageType: StorageType; + storagePath: string; + targets: BackupTarget[]; +} + +export interface SnapshotProgress { + taskId: string; + target: BackupTarget; + percent: number; + message: string; +} diff --git a/frontend/mining-admin-web/package.json b/frontend/mining-admin-web/package.json index fd2c5901..6903fe2f 100644 --- a/frontend/mining-admin-web/package.json +++ b/frontend/mining-admin-web/package.json @@ -39,6 +39,7 @@ "react-dom": "^18.2.0", "react-hook-form": "^7.50.1", "react-redux": "^9.1.0", + "socket.io-client": "^4.7.4", "tailwind-merge": "^2.2.1", "tailwindcss-animate": "^1.0.7", "zod": "^3.22.4", diff --git a/frontend/mining-admin-web/src/app/(dashboard)/snapshots/page.tsx b/frontend/mining-admin-web/src/app/(dashboard)/snapshots/page.tsx new file mode 100644 index 00000000..d567e92b --- /dev/null +++ b/frontend/mining-admin-web/src/app/(dashboard)/snapshots/page.tsx @@ -0,0 +1,359 @@ +'use client'; + +import { useState, useEffect, useCallback } from 'react'; +import { snapshotApi } from '@/lib/api/snapshot.api'; +import { useSnapshotWebSocket } from '@/lib/hooks/useSnapshotWebSocket'; +import type { + BackupTarget, + StorageType, + SnapshotTask, + CreateSnapshotDto, + SnapshotStatus, +} from '@/types/snapshot.types'; +import { BACKUP_TARGET_LABELS } from '@/types/snapshot.types'; + +function formatBytes(bytes: string | number): string { + const b = typeof bytes === 'string' ? parseInt(bytes, 10) : bytes; + if (b === 0) return '0 B'; + const units = ['B', 'KB', 'MB', 'GB', 'TB']; + const i = Math.floor(Math.log(b) / Math.log(1024)); + return `${(b / Math.pow(1024, i)).toFixed(2)} ${units[i]}`; +} + +function formatDuration(ms: number): string { + const seconds = Math.floor(ms / 1000); + const m = Math.floor(seconds / 60); + const s = seconds % 60; + return m > 0 ? `${m}分${s}秒` : `${s}秒`; +} + +function formatTime(iso: string | null): string { + if (!iso) return '-'; + return new Date(iso).toLocaleString('zh-CN'); +} + +const STATUS_MAP: Record = { + PENDING: { label: '等待中', color: 'text-muted-foreground' }, + RUNNING: { label: '执行中', color: 'text-blue-500' }, + COMPLETED: { label: '已完成', color: 'text-green-500' }, + FAILED: { label: '失败', color: 'text-red-500' }, +}; + +export default function SnapshotsPage() { + const [availableTargets, setAvailableTargets] = useState([]); + const [isServiceRunning, setIsServiceRunning] = useState(false); + const [storageType, setStorageType] = useState('MINIO'); + const [storagePath, setStoragePath] = useState(''); + const [selectedTargets, setSelectedTargets] = useState([]); + const [isSubmitting, setIsSubmitting] = useState(false); + const [activeTaskId, setActiveTaskId] = useState(null); + const { progresses, isConnected, taskCompleted, totalSize, duration } = + useSnapshotWebSocket(activeTaskId); + const [tasks, setTasks] = useState([]); + const [total, setTotal] = useState(0); + const [page, setPage] = useState(1); + + useEffect(() => { + snapshotApi + .getTargets() + .then((res) => { + setAvailableTargets(res.targets as BackupTarget[]); + setSelectedTargets(res.targets as BackupTarget[]); + setIsServiceRunning(res.isRunning); + }) + .catch(console.error); + }, []); + + const loadHistory = useCallback(() => { + snapshotApi + .list(page, 10) + .then((res) => { + setTasks(res.tasks); + setTotal(res.total); + }) + .catch(console.error); + }, [page]); + + useEffect(() => { loadHistory(); }, [loadHistory]); + + useEffect(() => { + if (taskCompleted) { + loadHistory(); + setIsServiceRunning(false); + } + }, [taskCompleted, loadHistory]); + + useEffect(() => { + if (storageType === 'MINIO' && !storagePath) { + const date = new Date().toISOString().split('T')[0]; + setStoragePath(`snapshots/${date}`); + } + }, [storageType, storagePath]); + + const handleStart = async () => { + if (selectedTargets.length === 0) return; + setIsSubmitting(true); + try { + const dto: CreateSnapshotDto = { + storageType, + storagePath: storageType === 'LOCAL' ? 'local' : storagePath, + targets: selectedTargets, + }; + const res = await snapshotApi.create(dto); + setActiveTaskId(res.taskId); + setIsServiceRunning(true); + } catch (err: any) { + alert(err.message || '创建备份任务失败'); + } finally { + setIsSubmitting(false); + } + }; + + const toggleTarget = (target: BackupTarget) => { + setSelectedTargets((prev) => + prev.includes(target) ? prev.filter((t) => t !== target) : [...prev, target], + ); + }; + + const toggleAll = () => { + setSelectedTargets( + selectedTargets.length === availableTargets.length ? [] : [...availableTargets], + ); + }; + + const handleDelete = async (id: string) => { + if (!confirm('确定删除该备份?')) return; + try { + await snapshotApi.delete(id); + loadHistory(); + } catch (err: any) { + alert(err.message || '删除失败'); + } + }; + + return ( +
+

数据快照

+ + {/* 创建备份 */} +
+

创建备份

+ +
+ +
+ + +
+
+ + {storageType === 'MINIO' && ( +
+ + setStoragePath(e.target.value)} + placeholder="snapshots/2026-02-23" + disabled={isServiceRunning} + /> +
+ )} + +
+ +
+ {availableTargets.map((target) => ( + + ))} +
+
+ + +
+ + {/* 实时进度 */} + {activeTaskId && progresses.size > 0 && ( +
+

+ 备份进度 + {isConnected && ( + + WebSocket 已连接 + + )} +

+ +
+ {Array.from(progresses.values()).map((p) => ( +
+
+ {BACKUP_TARGET_LABELS[p.target] || p.target} + + {p.status === 'completed' ? '100%' : p.status === 'failed' ? '失败' : `${p.percent}%`} + +
+
+
+
+

{p.message}

+
+ ))} +
+ + {taskCompleted && ( +
+ 备份完成!总大小: {formatBytes(totalSize || '0')},耗时: {formatDuration(duration || 0)} +
+ )} +
+ )} + + {/* 历史列表 */} +
+

历史备份

+ +
+ + + + + + + + + + + + + {tasks.length === 0 && ( + + + + )} + {tasks.map((task) => { + const st = STATUS_MAP[task.status]; + return ( + + + + + + + + + ); + })} + +
时间备份目标大小存储方式状态操作
暂无备份记录
{formatTime(task.createdAt)} + {task.targets.map((t) => BACKUP_TARGET_LABELS[t] || t).join(', ')} + {formatBytes(task.totalSize)}{task.storageType === 'MINIO' ? 'MinIO' : '本地'}{st.label} + {task.storageType === 'LOCAL' && task.status === 'COMPLETED' && + task.details + .filter((d) => d.fileName) + .map((d) => ( + + {BACKUP_TARGET_LABELS[d.target]} + + ))} + +
+
+ + {total > 10 && ( +
+ + + 第 {page} 页 / 共 {Math.ceil(total / 10)} 页 + + +
+ )} +
+
+ ); +} diff --git a/frontend/mining-admin-web/src/components/layout/sidebar.tsx b/frontend/mining-admin-web/src/components/layout/sidebar.tsx index 008007ba..e3432f51 100644 --- a/frontend/mining-admin-web/src/components/layout/sidebar.tsx +++ b/frontend/mining-admin-web/src/components/layout/sidebar.tsx @@ -19,6 +19,7 @@ import { HandCoins, FileSpreadsheet, SendHorizontal, + HardDrive, } from 'lucide-react'; import { Button } from '@/components/ui/button'; @@ -35,6 +36,7 @@ const menuItems = [ { name: '系统账户', href: '/system-accounts', icon: Building2 }, { name: '报表统计', href: '/reports', icon: FileBarChart }, { name: '审计日志', href: '/audit-logs', icon: ClipboardList }, + { name: '数据快照', href: '/snapshots', icon: HardDrive }, ]; export function Sidebar() { diff --git a/frontend/mining-admin-web/src/lib/api/snapshot.api.ts b/frontend/mining-admin-web/src/lib/api/snapshot.api.ts new file mode 100644 index 00000000..60195f9c --- /dev/null +++ b/frontend/mining-admin-web/src/lib/api/snapshot.api.ts @@ -0,0 +1,43 @@ +import type { CreateSnapshotDto, SnapshotTask } from '@/types/snapshot.types'; + +const SNAPSHOT_BASE = process.env.NEXT_PUBLIC_SNAPSHOT_API_URL || 'http://localhost:3199'; + +async function snapshotFetch(url: string, options?: RequestInit): Promise { + const token = typeof window !== 'undefined' ? localStorage.getItem('admin_token') : null; + const res = await fetch(`${SNAPSHOT_BASE}/api/v1${url}`, { + headers: { + 'Content-Type': 'application/json', + ...(token ? { Authorization: `Bearer ${token}` } : {}), + }, + ...options, + }); + if (!res.ok) { + const err = await res.json().catch(() => ({ message: res.statusText })); + throw new Error(err.message || `请求失败: ${res.status}`); + } + return res.json(); +} + +export const snapshotApi = { + getTargets: () => + snapshotFetch<{ targets: string[]; isRunning: boolean }>('/snapshots/targets'), + + create: (data: CreateSnapshotDto) => + snapshotFetch<{ taskId: string; message: string }>('/snapshots', { + method: 'POST', + body: JSON.stringify(data), + }), + + list: (page = 1, limit = 20) => + snapshotFetch<{ tasks: SnapshotTask[]; total: number; page: number; limit: number }>( + `/snapshots?page=${page}&limit=${limit}`, + ), + + getById: (id: string) => snapshotFetch(`/snapshots/${id}`), + + delete: (id: string) => + snapshotFetch<{ message: string }>(`/snapshots/${id}`, { method: 'DELETE' }), + + getDownloadUrl: (id: string, target: string) => + `${SNAPSHOT_BASE}/api/v1/snapshots/${id}/download/${target}`, +}; diff --git a/frontend/mining-admin-web/src/lib/hooks/useSnapshotWebSocket.ts b/frontend/mining-admin-web/src/lib/hooks/useSnapshotWebSocket.ts new file mode 100644 index 00000000..c431e028 --- /dev/null +++ b/frontend/mining-admin-web/src/lib/hooks/useSnapshotWebSocket.ts @@ -0,0 +1,116 @@ +'use client'; + +import { useEffect, useState, useRef } from 'react'; +import { io, Socket } from 'socket.io-client'; +import type { BackupTarget, SnapshotProgress } from '@/types/snapshot.types'; + +const SNAPSHOT_WS_URL = process.env.NEXT_PUBLIC_SNAPSHOT_API_URL || 'http://localhost:3199'; + +interface TargetProgress { + target: BackupTarget; + percent: number; + message: string; + status: 'pending' | 'running' | 'completed' | 'failed'; +} + +interface UseSnapshotWebSocketReturn { + progresses: Map; + isConnected: boolean; + taskCompleted: boolean; + totalSize: string | null; + duration: number | null; + error: string | null; +} + +export function useSnapshotWebSocket(taskId: string | null): UseSnapshotWebSocketReturn { + const [progresses, setProgresses] = useState>(new Map()); + const [isConnected, setIsConnected] = useState(false); + const [taskCompleted, setTaskCompleted] = useState(false); + const [totalSize, setTotalSize] = useState(null); + const [duration, setDuration] = useState(null); + const [error, setError] = useState(null); + const socketRef = useRef(null); + + useEffect(() => { + if (!taskId) return; + + setProgresses(new Map()); + setTaskCompleted(false); + setTotalSize(null); + setDuration(null); + setError(null); + + const socket = io(`${SNAPSHOT_WS_URL}/snapshots`, { + transports: ['websocket', 'polling'], + }); + socketRef.current = socket; + + socket.on('connect', () => setIsConnected(true)); + socket.on('disconnect', () => setIsConnected(false)); + + socket.on('snapshot:started', (data: { taskId: string; targets: BackupTarget[] }) => { + if (data.taskId !== taskId) return; + const initial = new Map(); + data.targets.forEach((t) => { + initial.set(t, { target: t, percent: 0, message: '等待中', status: 'pending' }); + }); + setProgresses(initial); + }); + + socket.on('snapshot:progress', (data: SnapshotProgress) => { + if (data.taskId !== taskId) return; + setProgresses((prev) => { + const next = new Map(prev); + next.set(data.target, { + target: data.target, + percent: data.percent, + message: data.message, + status: 'running', + }); + return next; + }); + }); + + socket.on('snapshot:target-complete', (data: { taskId: string; target: string; fileSize: string }) => { + if (data.taskId !== taskId) return; + setProgresses((prev) => { + const next = new Map(prev); + const existing = next.get(data.target); + if (existing) { + next.set(data.target, { ...existing, percent: 100, message: '完成', status: 'completed' }); + } + return next; + }); + }); + + socket.on('snapshot:complete', (data: { taskId: string; totalSize: string; duration: number }) => { + if (data.taskId !== taskId) return; + setTaskCompleted(true); + setTotalSize(data.totalSize); + setDuration(data.duration); + }); + + socket.on('snapshot:error', (data: { taskId: string; target?: string; error: string }) => { + if (data.taskId !== taskId) return; + if (data.target) { + setProgresses((prev) => { + const next = new Map(prev); + const existing = next.get(data.target!); + if (existing) { + next.set(data.target!, { ...existing, message: data.error, status: 'failed' }); + } + return next; + }); + } else { + setError(data.error); + } + }); + + return () => { + socket.disconnect(); + socketRef.current = null; + }; + }, [taskId]); + + return { progresses, isConnected, taskCompleted, totalSize, duration, error }; +} diff --git a/frontend/mining-admin-web/src/types/snapshot.types.ts b/frontend/mining-admin-web/src/types/snapshot.types.ts new file mode 100644 index 00000000..fe3a0a6f --- /dev/null +++ b/frontend/mining-admin-web/src/types/snapshot.types.ts @@ -0,0 +1,51 @@ +export type BackupTarget = 'POSTGRES' | 'REDIS' | 'KAFKA' | 'ZOOKEEPER' | 'MINIO' | 'UPLOADS'; +export type StorageType = 'MINIO' | 'LOCAL'; +export type SnapshotStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED'; + +export const BACKUP_TARGET_LABELS: Record = { + POSTGRES: 'PostgreSQL', + REDIS: 'Redis', + KAFKA: 'Kafka', + ZOOKEEPER: 'ZooKeeper', + MINIO: 'MinIO', + UPLOADS: 'Uploads', +}; + +export interface SnapshotDetail { + id: string; + target: BackupTarget; + status: SnapshotStatus; + progress: number; + fileSize: string; + fileName: string | null; + error: string | null; + startedAt: string | null; + completedAt: string | null; +} + +export interface SnapshotTask { + id: string; + status: SnapshotStatus; + storageType: StorageType; + storagePath: string; + targets: BackupTarget[]; + totalSize: string; + error: string | null; + startedAt: string | null; + completedAt: string | null; + createdAt: string; + details: SnapshotDetail[]; +} + +export interface CreateSnapshotDto { + storageType: StorageType; + storagePath: string; + targets: BackupTarget[]; +} + +export interface SnapshotProgress { + taskId: string; + target: BackupTarget; + percent: number; + message: string; +}