From 15a5fb6c14c5c7f1a4052fe9b75c9f25b75f9f93 Mon Sep 17 00:00:00 2001 From: hailin Date: Sat, 10 Jan 2026 20:36:21 -0800 Subject: [PATCH] =?UTF-8?q?feat(mining-admin-service):=20=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0CDC=E5=90=8C=E6=AD=A5=E5=92=8C=E5=AE=8C=E6=95=B4?= =?UTF-8?q?=E7=94=A8=E6=88=B7=E7=AE=A1=E7=90=86API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Prisma Schema 更新 - 添加 CDC 同步表:SyncedUser, SyncedContributionAccount, SyncedMiningAccount, SyncedTradingAccount - 添加系统数据同步表:SyncedMiningConfig, SyncedDailyMiningStat, SyncedDayKLine, SyncedCirculationPool - 添加 CDC 进度跟踪:CdcSyncProgress, ProcessedEvent ## CDC 消费者模块 - CdcConsumerService: Kafka 消费者,支持 Debezium CDC 和服务间事件 - CdcSyncService: 同步处理器,从 auth/contribution/mining/trading 服务同步数据 ## 新增 API 端点 ### 用户管理 (/api/v1/users) - GET /users - 用户列表(分页、搜索、过滤) - GET /users/:accountSequence - 用户详情 - GET /users/:accountSequence/contributions - 算力记录 - GET /users/:accountSequence/mining-records - 挖矿记录 - GET /users/:accountSequence/orders - 交易订单 ### 系统账户 (/api/v1/system-accounts) - GET /system-accounts - 系统账户列表 - GET /system-accounts/summary - 系统账户汇总 ### 仪表盘增强 (/api/v1/dashboard) - GET /dashboard - 统计数据(新增用户/算力/挖矿/交易统计) - GET /dashboard/realtime - 实时数据 - GET /dashboard/stats - 统计数据(别名) ## Docker Compose 更新 - 添加 Kafka 依赖和 CDC topic 配置 - 添加与 auth-service 的依赖关系 Co-Authored-By: Claude Opus 4.5 --- backend/services/docker-compose.2.0.yml | 17 +- .../mining-admin-service/package-lock.json | 10 + .../mining-admin-service/package.json | 1 + .../mining-admin-service/prisma/schema.prisma | 243 ++++++++- .../src/api/api.module.ts | 13 +- .../api/controllers/dashboard.controller.ts | 26 +- .../controllers/system-accounts.controller.ts | 22 + .../src/api/controllers/users.controller.ts | 105 ++++ .../src/application/application.module.ts | 20 +- .../application/services/dashboard.service.ts | 323 ++++++++++-- .../services/system-accounts.service.ts | 118 +++++ .../src/application/services/users.service.ts | 344 +++++++++++++ .../infrastructure/infrastructure.module.ts | 8 +- .../kafka/cdc-consumer.service.ts | 251 +++++++++ .../infrastructure/kafka/cdc-sync.service.ts | 486 ++++++++++++++++++ .../src/infrastructure/kafka/kafka.module.ts | 11 + 16 files changed, 1933 insertions(+), 65 deletions(-) create mode 100644 backend/services/mining-admin-service/src/api/controllers/system-accounts.controller.ts create mode 100644 backend/services/mining-admin-service/src/api/controllers/users.controller.ts create mode 100644 backend/services/mining-admin-service/src/application/services/system-accounts.service.ts create mode 100644 backend/services/mining-admin-service/src/application/services/users.service.ts create mode 100644 backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts create mode 100644 backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts create mode 100644 backend/services/mining-admin-service/src/infrastructure/kafka/kafka.module.ts diff --git a/backend/services/docker-compose.2.0.yml b/backend/services/docker-compose.2.0.yml index f0f63cd2..22e17670 100644 --- a/backend/services/docker-compose.2.0.yml +++ b/backend/services/docker-compose.2.0.yml @@ -151,10 +151,16 @@ services: condition: service_healthy redis: condition: service_healthy + kafka: + condition: service_healthy contribution-service: condition: service_healthy mining-service: condition: service_healthy + trading-service: + condition: service_healthy + auth-service: + condition: service_healthy environment: NODE_ENV: production TZ: Asia/Shanghai @@ -166,10 +172,19 @@ services: REDIS_PORT: 6379 REDIS_PASSWORD: ${REDIS_PASSWORD:-} REDIS_DB: 13 - # 2.0 内部服务调用 + # Kafka - 消费 2.0 服务间事件 + KAFKA_BROKERS: kafka:29092 + CDC_CONSUMER_GROUP: mining-admin-service-cdc-group + # CDC Topics - 从各 2.0 服务同步数据 + CDC_TOPIC_USERS: ${CDC_TOPIC_ADMIN_USERS:-mining-admin.auth.users} + CDC_TOPIC_CONTRIBUTION: ${CDC_TOPIC_ADMIN_CONTRIBUTION:-mining-admin.contribution.accounts} + CDC_TOPIC_MINING: ${CDC_TOPIC_ADMIN_MINING:-mining-admin.mining.accounts} + CDC_TOPIC_TRADING: ${CDC_TOPIC_ADMIN_TRADING:-mining-admin.trading.accounts} + # 2.0 内部服务调用(备用) CONTRIBUTION_SERVICE_URL: http://contribution-service:3020 MINING_SERVICE_URL: http://mining-service:3021 TRADING_SERVICE_URL: http://trading-service:3022 + AUTH_SERVICE_URL: http://auth-service:3024 JWT_SECRET: ${ADMIN_JWT_SECRET:-your-admin-jwt-secret-change-in-production} ports: - "3023:3023" diff --git a/backend/services/mining-admin-service/package-lock.json b/backend/services/mining-admin-service/package-lock.json index 66c68930..8c0234db 100644 --- a/backend/services/mining-admin-service/package-lock.json +++ b/backend/services/mining-admin-service/package-lock.json @@ -19,6 +19,7 @@ "decimal.js": "^10.4.3", "ioredis": "^5.3.2", "jsonwebtoken": "^9.0.2", + "kafkajs": "^2.2.4", "reflect-metadata": "^0.1.14", "rxjs": "^7.8.1", "swagger-ui-express": "^5.0.0" @@ -3802,6 +3803,15 @@ "safe-buffer": "^5.0.1" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "license": "MIT", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "4.5.4", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz", diff --git a/backend/services/mining-admin-service/package.json b/backend/services/mining-admin-service/package.json index a3fbd6ab..073fe68a 100644 --- a/backend/services/mining-admin-service/package.json +++ b/backend/services/mining-admin-service/package.json @@ -26,6 +26,7 @@ "decimal.js": "^10.4.3", "ioredis": "^5.3.2", "jsonwebtoken": "^9.0.2", + "kafkajs": "^2.2.4", "reflect-metadata": "^0.1.14", "rxjs": "^7.8.1", "swagger-ui-express": "^5.0.0" diff --git a/backend/services/mining-admin-service/prisma/schema.prisma b/backend/services/mining-admin-service/prisma/schema.prisma index b7ad3c49..7f5b7559 100644 --- a/backend/services/mining-admin-service/prisma/schema.prisma +++ b/backend/services/mining-admin-service/prisma/schema.prisma @@ -7,7 +7,9 @@ datasource db { url = env("DATABASE_URL") } -// ==================== 管理员 ==================== +// ============================================================================= +// 管理员模块 +// ============================================================================= model AdminUser { id String @id @default(uuid()) @@ -25,7 +27,9 @@ model AdminUser { @@map("admin_users") } -// ==================== 系统配置 ==================== +// ============================================================================= +// 系统配置 +// ============================================================================= model SystemConfig { id String @id @default(uuid()) @@ -41,11 +45,13 @@ model SystemConfig { @@map("system_configs") } -// ==================== 系统账户(运营/省/市)==================== +// ============================================================================= +// 系统账户(运营/省/市) +// ============================================================================= model SystemAccount { id String @id @default(uuid()) - accountType String @unique // OPERATION, PROVINCE, CITY + accountType String @unique // OPERATION, PROVINCE, CITY, HEADQUARTERS name String description String? totalContribution Decimal @db.Decimal(30, 8) @default(0) @@ -55,11 +61,13 @@ model SystemAccount { @@map("system_accounts") } -// ==================== 初始化记录 ==================== +// ============================================================================= +// 初始化记录 +// ============================================================================= model InitializationRecord { id String @id @default(uuid()) - type String // MINING_CONFIG, BLACK_HOLE, SYSTEM_ACCOUNTS + type String // MINING_CONFIG, BLACK_HOLE, SYSTEM_ACCOUNTS, ACTIVATE_MINING status String // PENDING, COMPLETED, FAILED config Json executedBy String @@ -70,7 +78,9 @@ model InitializationRecord { @@map("initialization_records") } -// ==================== 审计日志 ==================== +// ============================================================================= +// 审计日志 +// ============================================================================= model AuditLog { id String @id @default(uuid()) @@ -93,7 +103,9 @@ model AuditLog { @@map("audit_logs") } -// ==================== 报表快照 ==================== +// ============================================================================= +// 报表快照 +// ============================================================================= model DailyReport { id String @id @default(uuid()) @@ -132,3 +144,218 @@ model DailyReport { @@map("daily_reports") } + +// ============================================================================= +// CDC 同步表 - 用户数据 (from auth-service) +// ============================================================================= + +model SyncedUser { + id String @id @default(uuid()) + originalUserId String @unique // auth-service 中的原始 ID + accountSequence String @unique // 账户序列号 + phone String + status String // ACTIVE, DISABLED, DELETED + kycStatus String // PENDING, SUBMITTED, VERIFIED, REJECTED + realName String? + isLegacyUser Boolean @default(false) // 是否为 1.0 迁移用户 + createdAt DateTime + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + // 关联同步表 + contributionAccount SyncedContributionAccount? + miningAccount SyncedMiningAccount? + tradingAccount SyncedTradingAccount? + + @@index([phone]) + @@index([status]) + @@index([kycStatus]) + @@index([createdAt(sort: Desc)]) + @@map("synced_users") +} + +// ============================================================================= +// CDC 同步表 - 算力账户 (from contribution-service) +// ============================================================================= + +model SyncedContributionAccount { + id String @id @default(uuid()) + accountSequence String @unique + personalContribution Decimal @db.Decimal(30, 8) @default(0) + teamLevelContribution Decimal @db.Decimal(30, 8) @default(0) + teamBonusContribution Decimal @db.Decimal(30, 8) @default(0) + totalContribution Decimal @db.Decimal(30, 8) @default(0) + effectiveContribution Decimal @db.Decimal(30, 8) @default(0) + hasAdopted Boolean @default(false) + directReferralCount Int @default(0) + unlockedLevelDepth Int @default(0) + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + user SyncedUser @relation(fields: [accountSequence], references: [accountSequence]) + + @@map("synced_contribution_accounts") +} + +// ============================================================================= +// CDC 同步表 - 挖矿账户 (from mining-service) +// ============================================================================= + +model SyncedMiningAccount { + id String @id @default(uuid()) + accountSequence String @unique + totalMined Decimal @db.Decimal(30, 8) @default(0) + availableBalance Decimal @db.Decimal(30, 8) @default(0) + frozenBalance Decimal @db.Decimal(30, 8) @default(0) + totalContribution Decimal @db.Decimal(30, 8) @default(0) + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + user SyncedUser @relation(fields: [accountSequence], references: [accountSequence]) + + @@map("synced_mining_accounts") +} + +// ============================================================================= +// CDC 同步表 - 交易账户 (from trading-service) +// ============================================================================= + +model SyncedTradingAccount { + id String @id @default(uuid()) + accountSequence String @unique + shareBalance Decimal @db.Decimal(30, 8) @default(0) + cashBalance Decimal @db.Decimal(30, 8) @default(0) + frozenShares Decimal @db.Decimal(30, 8) @default(0) + frozenCash Decimal @db.Decimal(30, 8) @default(0) + totalBought Decimal @db.Decimal(30, 8) @default(0) + totalSold Decimal @db.Decimal(30, 8) @default(0) + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + user SyncedUser @relation(fields: [accountSequence], references: [accountSequence]) + + @@map("synced_trading_accounts") +} + +// ============================================================================= +// CDC 同步表 - 挖矿配置 (from mining-service) +// ============================================================================= + +model SyncedMiningConfig { + id String @id @default(uuid()) + totalShares Decimal @db.Decimal(30, 8) + distributionPool Decimal @db.Decimal(30, 8) + remainingDistribution Decimal @db.Decimal(30, 8) + halvingPeriodYears Int + currentEra Int @default(1) + minuteDistribution Decimal @db.Decimal(30, 8) + isActive Boolean @default(false) + activatedAt DateTime? + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@map("synced_mining_configs") +} + +// ============================================================================= +// CDC 同步表 - 每日挖矿统计 (from mining-service) +// ============================================================================= + +model SyncedDailyMiningStat { + id String @id @default(uuid()) + statDate DateTime @unique @db.Date + totalContribution Decimal @db.Decimal(30, 8) @default(0) + totalDistributed Decimal @db.Decimal(30, 8) @default(0) + totalBurned Decimal @db.Decimal(30, 8) @default(0) + participantCount Int @default(0) + avgContributionRate Decimal @db.Decimal(30, 18) @default(0) + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@map("synced_daily_mining_stats") +} + +// ============================================================================= +// CDC 同步表 - 日K线 (from trading-service) +// ============================================================================= + +model SyncedDayKLine { + id String @id @default(uuid()) + klineDate DateTime @unique @db.Date + open Decimal @db.Decimal(30, 18) + high Decimal @db.Decimal(30, 18) + low Decimal @db.Decimal(30, 18) + close Decimal @db.Decimal(30, 18) + volume Decimal @db.Decimal(30, 8) @default(0) + amount Decimal @db.Decimal(30, 8) @default(0) + tradeCount Int @default(0) + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@map("synced_day_klines") +} + +// ============================================================================= +// CDC 同步表 - 流通池 (from trading-service) +// ============================================================================= + +model SyncedCirculationPool { + id String @id @default(uuid()) + totalShares Decimal @db.Decimal(30, 8) @default(0) + totalCash Decimal @db.Decimal(30, 8) @default(0) + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@map("synced_circulation_pools") +} + +// ============================================================================= +// CDC 同步表 - 系统账户算力 (from contribution-service) +// ============================================================================= + +model SyncedSystemContribution { + id String @id @default(uuid()) + accountType String @unique // OPERATION, PROVINCE, CITY, HEADQUARTERS + name String + contributionBalance Decimal @db.Decimal(30, 8) @default(0) + contributionNeverExpires Boolean @default(false) + syncedAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@map("synced_system_contributions") +} + +// ============================================================================= +// CDC 同步进度跟踪 +// ============================================================================= + +model CdcSyncProgress { + id String @id @default(uuid()) + sourceTopic String @unique + sourceService String // auth, contribution, mining, trading + lastOffset String? // Kafka offset + lastSequenceNum BigInt @default(0) + lastSyncedAt DateTime @default(now()) + errorCount Int @default(0) + lastError String? + updatedAt DateTime @updatedAt + + @@index([sourceService]) + @@map("cdc_sync_progress") +} + +// ============================================================================= +// 已处理事件(幂等性) +// ============================================================================= + +model ProcessedEvent { + id String @id @default(uuid()) + eventId String @unique + eventType String + sourceService String + processedAt DateTime @default(now()) + + @@index([sourceService]) + @@index([processedAt]) + @@map("processed_events") +} diff --git a/backend/services/mining-admin-service/src/api/api.module.ts b/backend/services/mining-admin-service/src/api/api.module.ts index ee6b6c3a..656a5a2d 100644 --- a/backend/services/mining-admin-service/src/api/api.module.ts +++ b/backend/services/mining-admin-service/src/api/api.module.ts @@ -6,9 +6,20 @@ import { ConfigController } from './controllers/config.controller'; import { InitializationController } from './controllers/initialization.controller'; import { AuditController } from './controllers/audit.controller'; import { HealthController } from './controllers/health.controller'; +import { UsersController } from './controllers/users.controller'; +import { SystemAccountsController } from './controllers/system-accounts.controller'; @Module({ imports: [ApplicationModule], - controllers: [AuthController, DashboardController, ConfigController, InitializationController, AuditController, HealthController], + controllers: [ + AuthController, + DashboardController, + ConfigController, + InitializationController, + AuditController, + HealthController, + UsersController, + SystemAccountsController, + ], }) export class ApiModule {} diff --git a/backend/services/mining-admin-service/src/api/controllers/dashboard.controller.ts b/backend/services/mining-admin-service/src/api/controllers/dashboard.controller.ts index c0f74952..babc70e5 100644 --- a/backend/services/mining-admin-service/src/api/controllers/dashboard.controller.ts +++ b/backend/services/mining-admin-service/src/api/controllers/dashboard.controller.ts @@ -1,5 +1,10 @@ import { Controller, Get, Query } from '@nestjs/common'; -import { ApiTags, ApiOperation, ApiBearerAuth, ApiQuery } from '@nestjs/swagger'; +import { + ApiTags, + ApiOperation, + ApiBearerAuth, + ApiQuery, +} from '@nestjs/swagger'; import { DashboardService } from '../../application/services/dashboard.service'; @ApiTags('Dashboard') @@ -8,17 +13,32 @@ import { DashboardService } from '../../application/services/dashboard.service'; export class DashboardController { constructor(private readonly dashboardService: DashboardService) {} - @Get('stats') + @Get() @ApiOperation({ summary: '获取仪表盘统计数据' }) async getStats() { return this.dashboardService.getDashboardStats(); } + @Get('stats') + @ApiOperation({ summary: '获取仪表盘统计数据(别名)' }) + async getStatsAlias() { + return this.dashboardService.getDashboardStats(); + } + + @Get('realtime') + @ApiOperation({ summary: '获取实时数据' }) + async getRealtimeStats() { + return this.dashboardService.getRealtimeStats(); + } + @Get('reports') @ApiOperation({ summary: '获取每日报表' }) @ApiQuery({ name: 'page', required: false, type: Number }) @ApiQuery({ name: 'pageSize', required: false, type: Number }) - async getReports(@Query('page') page?: number, @Query('pageSize') pageSize?: number) { + async getReports( + @Query('page') page?: number, + @Query('pageSize') pageSize?: number, + ) { return this.dashboardService.getReports(page ?? 1, pageSize ?? 30); } } diff --git a/backend/services/mining-admin-service/src/api/controllers/system-accounts.controller.ts b/backend/services/mining-admin-service/src/api/controllers/system-accounts.controller.ts new file mode 100644 index 00000000..934bdda4 --- /dev/null +++ b/backend/services/mining-admin-service/src/api/controllers/system-accounts.controller.ts @@ -0,0 +1,22 @@ +import { Controller, Get } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger'; +import { SystemAccountsService } from '../../application/services/system-accounts.service'; + +@ApiTags('System Accounts') +@ApiBearerAuth() +@Controller('system-accounts') +export class SystemAccountsController { + constructor(private readonly systemAccountsService: SystemAccountsService) {} + + @Get() + @ApiOperation({ summary: '获取系统账户列表' }) + async getSystemAccounts() { + return this.systemAccountsService.getSystemAccounts(); + } + + @Get('summary') + @ApiOperation({ summary: '获取系统账户汇总' }) + async getSystemAccountsSummary() { + return this.systemAccountsService.getSystemAccountsSummary(); + } +} diff --git a/backend/services/mining-admin-service/src/api/controllers/users.controller.ts b/backend/services/mining-admin-service/src/api/controllers/users.controller.ts new file mode 100644 index 00000000..c8017d3a --- /dev/null +++ b/backend/services/mining-admin-service/src/api/controllers/users.controller.ts @@ -0,0 +1,105 @@ +import { Controller, Get, Param, Query } from '@nestjs/common'; +import { + ApiTags, + ApiOperation, + ApiBearerAuth, + ApiQuery, + ApiParam, +} from '@nestjs/swagger'; +import { UsersService } from '../../application/services/users.service'; + +@ApiTags('Users') +@ApiBearerAuth() +@Controller('users') +export class UsersController { + constructor(private readonly usersService: UsersService) {} + + @Get() + @ApiOperation({ summary: '获取用户列表' }) + @ApiQuery({ name: 'page', required: false, type: Number }) + @ApiQuery({ name: 'pageSize', required: false, type: Number }) + @ApiQuery({ name: 'search', required: false, type: String, description: '手机号或账户序列号' }) + @ApiQuery({ name: 'status', required: false, type: String, description: 'ACTIVE, DISABLED, DELETED' }) + @ApiQuery({ name: 'kycStatus', required: false, type: String, description: 'PENDING, SUBMITTED, VERIFIED, REJECTED' }) + @ApiQuery({ name: 'hasAdopted', required: false, type: Boolean }) + async getUsers( + @Query('page') page?: number, + @Query('pageSize') pageSize?: number, + @Query('search') search?: string, + @Query('status') status?: string, + @Query('kycStatus') kycStatus?: string, + @Query('hasAdopted') hasAdopted?: boolean, + ) { + return this.usersService.getUsers({ + page: page ?? 1, + pageSize: pageSize ?? 20, + search, + status, + kycStatus, + hasAdopted, + }); + } + + @Get(':accountSequence') + @ApiOperation({ summary: '获取用户详情' }) + @ApiParam({ name: 'accountSequence', type: String }) + async getUserDetail(@Param('accountSequence') accountSequence: string) { + return this.usersService.getUserDetail(accountSequence); + } + + @Get(':accountSequence/contributions') + @ApiOperation({ summary: '获取用户算力记录' }) + @ApiParam({ name: 'accountSequence', type: String }) + @ApiQuery({ name: 'page', required: false, type: Number }) + @ApiQuery({ name: 'pageSize', required: false, type: Number }) + async getUserContributions( + @Param('accountSequence') accountSequence: string, + @Query('page') page?: number, + @Query('pageSize') pageSize?: number, + ) { + return this.usersService.getUserContributions( + accountSequence, + page ?? 1, + pageSize ?? 20, + ); + } + + @Get(':accountSequence/mining-records') + @ApiOperation({ summary: '获取用户挖矿记录' }) + @ApiParam({ name: 'accountSequence', type: String }) + @ApiQuery({ name: 'page', required: false, type: Number }) + @ApiQuery({ name: 'pageSize', required: false, type: Number }) + async getUserMiningRecords( + @Param('accountSequence') accountSequence: string, + @Query('page') page?: number, + @Query('pageSize') pageSize?: number, + ) { + return this.usersService.getUserMiningRecords( + accountSequence, + page ?? 1, + pageSize ?? 20, + ); + } + + @Get(':accountSequence/orders') + @ApiOperation({ summary: '获取用户交易订单' }) + @ApiParam({ name: 'accountSequence', type: String }) + @ApiQuery({ name: 'page', required: false, type: Number }) + @ApiQuery({ name: 'pageSize', required: false, type: Number }) + @ApiQuery({ name: 'type', required: false, type: String, description: 'BUY, SELL' }) + @ApiQuery({ name: 'status', required: false, type: String, description: 'PENDING, PARTIAL, FILLED, CANCELLED' }) + async getUserOrders( + @Param('accountSequence') accountSequence: string, + @Query('page') page?: number, + @Query('pageSize') pageSize?: number, + @Query('type') type?: string, + @Query('status') status?: string, + ) { + return this.usersService.getUserOrders(accountSequence, { + page: page ?? 1, + pageSize: pageSize ?? 20, + type, + status, + }); + } +} diff --git a/backend/services/mining-admin-service/src/application/application.module.ts b/backend/services/mining-admin-service/src/application/application.module.ts index 8c3a572d..37104688 100644 --- a/backend/services/mining-admin-service/src/application/application.module.ts +++ b/backend/services/mining-admin-service/src/application/application.module.ts @@ -4,11 +4,27 @@ import { AuthService } from './services/auth.service'; import { ConfigManagementService } from './services/config.service'; import { InitializationService } from './services/initialization.service'; import { DashboardService } from './services/dashboard.service'; +import { UsersService } from './services/users.service'; +import { SystemAccountsService } from './services/system-accounts.service'; @Module({ imports: [InfrastructureModule], - providers: [AuthService, ConfigManagementService, InitializationService, DashboardService], - exports: [AuthService, ConfigManagementService, InitializationService, DashboardService], + providers: [ + AuthService, + ConfigManagementService, + InitializationService, + DashboardService, + UsersService, + SystemAccountsService, + ], + exports: [ + AuthService, + ConfigManagementService, + InitializationService, + DashboardService, + UsersService, + SystemAccountsService, + ], }) export class ApplicationModule implements OnModuleInit { constructor(private readonly authService: AuthService) {} diff --git a/backend/services/mining-admin-service/src/application/services/dashboard.service.ts b/backend/services/mining-admin-service/src/application/services/dashboard.service.ts index 37195405..199a558f 100644 --- a/backend/services/mining-admin-service/src/application/services/dashboard.service.ts +++ b/backend/services/mining-admin-service/src/application/services/dashboard.service.ts @@ -11,66 +11,228 @@ export class DashboardService { private readonly configService: ConfigService, ) {} + /** + * 获取仪表盘统计数据 + * 优先从本地同步表获取,如果没有数据则调用远程服务 + */ async getDashboardStats(): Promise { - const [contributionStats, miningStats, tradingStats, latestReport] = await Promise.all([ - this.fetchContributionStats(), - this.fetchMiningStats(), - this.fetchTradingStats(), + const [ + userStats, + contributionStats, + miningStats, + tradingStats, + latestReport, + latestKLine, + ] = await Promise.all([ + this.getUserStats(), + this.getContributionStats(), + this.getMiningStats(), + this.getTradingStats(), this.prisma.dailyReport.findFirst({ orderBy: { reportDate: 'desc' } }), + this.prisma.syncedDayKLine.findFirst({ orderBy: { klineDate: 'desc' } }), ]); return { + users: userStats, contribution: contributionStats, mining: miningStats, trading: tradingStats, - latestReport, + latestReport: latestReport + ? this.formatDailyReport(latestReport) + : null, + latestPrice: latestKLine + ? { + date: latestKLine.klineDate, + open: latestKLine.open.toString(), + high: latestKLine.high.toString(), + low: latestKLine.low.toString(), + close: latestKLine.close.toString(), + volume: latestKLine.volume.toString(), + amount: latestKLine.amount.toString(), + } + : null, timestamp: new Date().toISOString(), }; } - private async fetchContributionStats(): Promise { - try { - const url = `${this.configService.get('CONTRIBUTION_SERVICE_URL')}/api/v1/contributions/stats`; - const response = await fetch(url); - if (response.ok) { - const result = await response.json(); - return result.data; - } - } catch (error) { - this.logger.error('Failed to fetch contribution stats', error); - } - return null; + /** + * 获取实时数据 + */ + async getRealtimeStats(): Promise { + const [miningConfig, circulationPool, latestKLine] = await Promise.all([ + this.prisma.syncedMiningConfig.findFirst(), + this.prisma.syncedCirculationPool.findFirst(), + this.prisma.syncedDayKLine.findFirst({ orderBy: { klineDate: 'desc' } }), + ]); + + return { + miningActive: miningConfig?.isActive || false, + currentEra: miningConfig?.currentEra || 0, + minuteDistribution: miningConfig?.minuteDistribution?.toString() || '0', + remainingDistribution: + miningConfig?.remainingDistribution?.toString() || '0', + circulationShares: circulationPool?.totalShares?.toString() || '0', + circulationCash: circulationPool?.totalCash?.toString() || '0', + currentPrice: latestKLine?.close?.toString() || '1', + timestamp: new Date().toISOString(), + }; } - private async fetchMiningStats(): Promise { - try { - const url = `${this.configService.get('MINING_SERVICE_URL')}/api/v1/mining/stats`; - const response = await fetch(url); - if (response.ok) { - const result = await response.json(); - return result.data; - } - } catch (error) { - this.logger.error('Failed to fetch mining stats', error); - } - return null; + /** + * 获取用户统计 + */ + private async getUserStats() { + const [totalUsers, activeUsers, kycVerifiedUsers, adoptedUsers] = + await Promise.all([ + this.prisma.syncedUser.count(), + this.prisma.syncedUser.count({ where: { status: 'ACTIVE' } }), + this.prisma.syncedUser.count({ where: { kycStatus: 'VERIFIED' } }), + this.prisma.syncedContributionAccount.count({ + where: { hasAdopted: true }, + }), + ]); + + // 今日新增用户 + const today = new Date(); + today.setHours(0, 0, 0, 0); + const newUsersToday = await this.prisma.syncedUser.count({ + where: { createdAt: { gte: today } }, + }); + + return { + total: totalUsers, + active: activeUsers, + kycVerified: kycVerifiedUsers, + adopted: adoptedUsers, + newToday: newUsersToday, + }; } - private async fetchTradingStats(): Promise { - try { - const url = `${this.configService.get('TRADING_SERVICE_URL')}/api/v1/trading/stats`; - const response = await fetch(url); - if (response.ok) { - const result = await response.json(); - return result.data; - } - } catch (error) { - this.logger.error('Failed to fetch trading stats', error); - } - return null; + /** + * 获取算力统计 + */ + private async getContributionStats() { + const accounts = await this.prisma.syncedContributionAccount.aggregate({ + _sum: { + totalContribution: true, + effectiveContribution: true, + personalContribution: true, + teamLevelContribution: true, + teamBonusContribution: true, + }, + _count: true, + }); + + const systemContributions = + await this.prisma.syncedSystemContribution.aggregate({ + _sum: { contributionBalance: true }, + _count: true, + }); + + return { + totalAccounts: accounts._count, + totalContribution: accounts._sum.totalContribution?.toString() || '0', + effectiveContribution: + accounts._sum.effectiveContribution?.toString() || '0', + personalContribution: + accounts._sum.personalContribution?.toString() || '0', + teamLevelContribution: + accounts._sum.teamLevelContribution?.toString() || '0', + teamBonusContribution: + accounts._sum.teamBonusContribution?.toString() || '0', + systemAccounts: systemContributions._count, + systemContribution: + systemContributions._sum.contributionBalance?.toString() || '0', + }; } - async getReports(page: number = 1, pageSize: number = 30): Promise<{ data: any[]; total: number }> { + /** + * 获取挖矿统计 + */ + private async getMiningStats() { + const [config, accounts, latestDailyStat] = await Promise.all([ + this.prisma.syncedMiningConfig.findFirst(), + this.prisma.syncedMiningAccount.aggregate({ + _sum: { + totalMined: true, + availableBalance: true, + frozenBalance: true, + }, + _count: true, + }), + this.prisma.syncedDailyMiningStat.findFirst({ + orderBy: { statDate: 'desc' }, + }), + ]); + + return { + isActive: config?.isActive || false, + currentEra: config?.currentEra || 0, + totalShares: config?.totalShares?.toString() || '0', + distributionPool: config?.distributionPool?.toString() || '0', + remainingDistribution: config?.remainingDistribution?.toString() || '0', + minuteDistribution: config?.minuteDistribution?.toString() || '0', + activatedAt: config?.activatedAt, + totalAccounts: accounts._count, + totalMined: accounts._sum.totalMined?.toString() || '0', + totalAvailable: accounts._sum.availableBalance?.toString() || '0', + totalFrozen: accounts._sum.frozenBalance?.toString() || '0', + latestDailyStat: latestDailyStat + ? { + date: latestDailyStat.statDate, + totalDistributed: latestDailyStat.totalDistributed.toString(), + totalBurned: latestDailyStat.totalBurned.toString(), + participantCount: latestDailyStat.participantCount, + } + : null, + }; + } + + /** + * 获取交易统计 + */ + private async getTradingStats() { + const [accounts, circulationPool, latestKLine] = await Promise.all([ + this.prisma.syncedTradingAccount.aggregate({ + _sum: { + shareBalance: true, + cashBalance: true, + frozenShares: true, + frozenCash: true, + totalBought: true, + totalSold: true, + }, + _count: true, + }), + this.prisma.syncedCirculationPool.findFirst(), + this.prisma.syncedDayKLine.findFirst({ orderBy: { klineDate: 'desc' } }), + ]); + + return { + totalAccounts: accounts._count, + totalShareBalance: accounts._sum.shareBalance?.toString() || '0', + totalCashBalance: accounts._sum.cashBalance?.toString() || '0', + totalFrozenShares: accounts._sum.frozenShares?.toString() || '0', + totalFrozenCash: accounts._sum.frozenCash?.toString() || '0', + totalBought: accounts._sum.totalBought?.toString() || '0', + totalSold: accounts._sum.totalSold?.toString() || '0', + circulationPool: circulationPool + ? { + totalShares: circulationPool.totalShares.toString(), + totalCash: circulationPool.totalCash.toString(), + } + : null, + latestPrice: latestKLine?.close?.toString() || '1', + }; + } + + /** + * 获取每日报表 + */ + async getReports( + page: number = 1, + pageSize: number = 30, + ): Promise<{ data: any[]; total: number; pagination: any }> { const [reports, total] = await Promise.all([ this.prisma.dailyReport.findMany({ orderBy: { reportDate: 'desc' }, @@ -79,12 +241,29 @@ export class DashboardService { }), this.prisma.dailyReport.count(), ]); - return { data: reports, total }; + + return { + data: reports.map((r) => this.formatDailyReport(r)), + total, + pagination: { + page, + pageSize, + total, + totalPages: Math.ceil(total / pageSize), + }, + }; } - async getAuditLogs( - options: { adminId?: string; action?: string; resource?: string; page?: number; pageSize?: number }, - ): Promise<{ data: any[]; total: number }> { + /** + * 获取审计日志 + */ + async getAuditLogs(options: { + adminId?: string; + action?: string; + resource?: string; + page?: number; + pageSize?: number; + }): Promise<{ data: any[]; total: number; pagination: any }> { const where: any = {}; if (options.adminId) where.adminId = options.adminId; if (options.action) where.action = options.action; @@ -104,6 +283,56 @@ export class DashboardService { this.prisma.auditLog.count({ where }), ]); - return { data: logs, total }; + return { + data: logs, + total, + pagination: { + page, + pageSize, + total, + totalPages: Math.ceil(total / pageSize), + }, + }; + } + + // =========================================================================== + // 辅助方法 + // =========================================================================== + + private formatDailyReport(report: any) { + return { + id: report.id, + reportDate: report.reportDate, + users: { + total: report.totalUsers, + new: report.newUsers, + active: report.activeUsers, + }, + adoptions: { + total: report.totalAdoptions, + new: report.newAdoptions, + totalTrees: report.totalTrees, + }, + contribution: { + total: report.totalContribution?.toString() || '0', + growth: report.contributionGrowth?.toString() || '0', + }, + mining: { + distributed: report.totalDistributed?.toString() || '0', + burned: report.totalBurned?.toString() || '0', + }, + trading: { + volume: report.tradingVolume?.toString() || '0', + amount: report.tradingAmount?.toString() || '0', + count: report.tradeCount, + }, + price: { + open: report.openPrice?.toString() || '1', + close: report.closePrice?.toString() || '1', + high: report.highPrice?.toString() || '1', + low: report.lowPrice?.toString() || '1', + }, + createdAt: report.createdAt, + }; } } diff --git a/backend/services/mining-admin-service/src/application/services/system-accounts.service.ts b/backend/services/mining-admin-service/src/application/services/system-accounts.service.ts new file mode 100644 index 00000000..c4f886f6 --- /dev/null +++ b/backend/services/mining-admin-service/src/application/services/system-accounts.service.ts @@ -0,0 +1,118 @@ +import { Injectable } from '@nestjs/common'; +import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; + +@Injectable() +export class SystemAccountsService { + constructor(private readonly prisma: PrismaService) {} + + /** + * 获取系统账户列表 + */ + async getSystemAccounts() { + // 先从本地 SystemAccount 表获取 + const localAccounts = await this.prisma.systemAccount.findMany({ + orderBy: { accountType: 'asc' }, + }); + + // 再从 CDC 同步的 SyncedSystemContribution 获取算力数据 + const syncedContributions = + await this.prisma.syncedSystemContribution.findMany(); + + // 合并数据 + const accountsMap = new Map(); + + // 添加本地账户 + for (const account of localAccounts) { + accountsMap.set(account.accountType, { + accountType: account.accountType, + name: account.name, + description: account.description, + totalContribution: account.totalContribution.toString(), + createdAt: account.createdAt, + source: 'local', + }); + } + + // 更新或添加同步的算力数据 + for (const contrib of syncedContributions) { + const existing = accountsMap.get(contrib.accountType); + if (existing) { + existing.contributionBalance = contrib.contributionBalance.toString(); + existing.contributionNeverExpires = contrib.contributionNeverExpires; + existing.syncedAt = contrib.syncedAt; + existing.source = 'synced'; + } else { + accountsMap.set(contrib.accountType, { + accountType: contrib.accountType, + name: contrib.name, + contributionBalance: contrib.contributionBalance.toString(), + contributionNeverExpires: contrib.contributionNeverExpires, + syncedAt: contrib.syncedAt, + source: 'synced', + }); + } + } + + return { + accounts: Array.from(accountsMap.values()), + total: accountsMap.size, + }; + } + + /** + * 获取系统账户汇总 + */ + async getSystemAccountsSummary() { + const [localAccounts, syncedContributions, miningConfig, circulationPool] = + await Promise.all([ + this.prisma.systemAccount.findMany(), + this.prisma.syncedSystemContribution.findMany(), + this.prisma.syncedMiningConfig.findFirst(), + this.prisma.syncedCirculationPool.findFirst(), + ]); + + // 计算总算力 + let totalSystemContribution = 0n; + for (const account of localAccounts) { + totalSystemContribution += BigInt( + account.totalContribution.toString().replace('.', ''), + ); + } + + let totalSyncedContribution = 0n; + for (const contrib of syncedContributions) { + totalSyncedContribution += BigInt( + contrib.contributionBalance.toString().replace('.', ''), + ); + } + + return { + systemAccounts: { + count: localAccounts.length, + totalContribution: ( + Number(totalSystemContribution) / 100000000 + ).toFixed(8), + }, + syncedContributions: { + count: syncedContributions.length, + totalBalance: (Number(totalSyncedContribution) / 100000000).toFixed(8), + }, + miningConfig: miningConfig + ? { + totalShares: miningConfig.totalShares.toString(), + distributionPool: miningConfig.distributionPool.toString(), + remainingDistribution: miningConfig.remainingDistribution.toString(), + currentEra: miningConfig.currentEra, + isActive: miningConfig.isActive, + activatedAt: miningConfig.activatedAt, + } + : null, + circulationPool: circulationPool + ? { + totalShares: circulationPool.totalShares.toString(), + totalCash: circulationPool.totalCash.toString(), + } + : null, + }; + } +} diff --git a/backend/services/mining-admin-service/src/application/services/users.service.ts b/backend/services/mining-admin-service/src/application/services/users.service.ts new file mode 100644 index 00000000..34dafad3 --- /dev/null +++ b/backend/services/mining-admin-service/src/application/services/users.service.ts @@ -0,0 +1,344 @@ +import { Injectable, NotFoundException } from '@nestjs/common'; +import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service'; +import { Prisma } from '@prisma/client'; + +export interface GetUsersQuery { + page: number; + pageSize: number; + search?: string; + status?: string; + kycStatus?: string; + hasAdopted?: boolean; +} + +export interface GetOrdersQuery { + page: number; + pageSize: number; + type?: string; + status?: string; +} + +@Injectable() +export class UsersService { + constructor(private readonly prisma: PrismaService) {} + + /** + * 获取用户列表 + */ + async getUsers(query: GetUsersQuery) { + const { page, pageSize, search, status, kycStatus, hasAdopted } = query; + const skip = (page - 1) * pageSize; + + const where: Prisma.SyncedUserWhereInput = {}; + + if (search) { + where.OR = [ + { phone: { contains: search } }, + { accountSequence: { contains: search } }, + { realName: { contains: search } }, + ]; + } + + if (status) { + where.status = status; + } + + if (kycStatus) { + where.kycStatus = kycStatus; + } + + if (hasAdopted !== undefined) { + where.contributionAccount = { + hasAdopted: hasAdopted, + }; + } + + const [users, total] = await Promise.all([ + this.prisma.syncedUser.findMany({ + where, + skip, + take: pageSize, + orderBy: { createdAt: 'desc' }, + include: { + contributionAccount: true, + miningAccount: true, + tradingAccount: true, + }, + }), + this.prisma.syncedUser.count({ where }), + ]); + + return { + data: users.map((user) => this.formatUserListItem(user)), + pagination: { + page, + pageSize, + total, + totalPages: Math.ceil(total / pageSize), + }, + }; + } + + /** + * 获取用户详情 + */ + async getUserDetail(accountSequence: string) { + const user = await this.prisma.syncedUser.findUnique({ + where: { accountSequence }, + include: { + contributionAccount: true, + miningAccount: true, + tradingAccount: true, + }, + }); + + if (!user) { + throw new NotFoundException(`用户 ${accountSequence} 不存在`); + } + + return this.formatUserDetail(user); + } + + /** + * 获取用户算力记录(从同步表获取概要) + * 注:详细流水需要调用 contribution-service + */ + async getUserContributions( + accountSequence: string, + page: number, + pageSize: number, + ) { + const user = await this.prisma.syncedUser.findUnique({ + where: { accountSequence }, + include: { contributionAccount: true }, + }); + + if (!user) { + throw new NotFoundException(`用户 ${accountSequence} 不存在`); + } + + // 返回算力账户概要 + const contribution = user.contributionAccount; + if (!contribution) { + return { + summary: { + accountSequence, + personalContribution: '0', + teamLevelContribution: '0', + teamBonusContribution: '0', + totalContribution: '0', + effectiveContribution: '0', + hasAdopted: false, + }, + records: [], + pagination: { page, pageSize, total: 0, totalPages: 0 }, + }; + } + + return { + summary: { + accountSequence, + personalContribution: contribution.personalContribution.toString(), + teamLevelContribution: contribution.teamLevelContribution.toString(), + teamBonusContribution: contribution.teamBonusContribution.toString(), + totalContribution: contribution.totalContribution.toString(), + effectiveContribution: contribution.effectiveContribution.toString(), + hasAdopted: contribution.hasAdopted, + directReferralCount: contribution.directReferralCount, + unlockedLevelDepth: contribution.unlockedLevelDepth, + }, + // 详细流水需要从 contribution-service 获取 + records: [], + pagination: { page, pageSize, total: 0, totalPages: 0 }, + note: '详细算力流水请查看 contribution-service', + }; + } + + /** + * 获取用户挖矿记录(从同步表获取概要) + * 注:详细流水需要调用 mining-service + */ + async getUserMiningRecords( + accountSequence: string, + page: number, + pageSize: number, + ) { + const user = await this.prisma.syncedUser.findUnique({ + where: { accountSequence }, + include: { miningAccount: true }, + }); + + if (!user) { + throw new NotFoundException(`用户 ${accountSequence} 不存在`); + } + + const mining = user.miningAccount; + if (!mining) { + return { + summary: { + accountSequence, + totalMined: '0', + availableBalance: '0', + frozenBalance: '0', + totalContribution: '0', + }, + records: [], + pagination: { page, pageSize, total: 0, totalPages: 0 }, + }; + } + + return { + summary: { + accountSequence, + totalMined: mining.totalMined.toString(), + availableBalance: mining.availableBalance.toString(), + frozenBalance: mining.frozenBalance.toString(), + totalContribution: mining.totalContribution.toString(), + }, + // 详细流水需要从 mining-service 获取 + records: [], + pagination: { page, pageSize, total: 0, totalPages: 0 }, + note: '详细挖矿记录请查看 mining-service', + }; + } + + /** + * 获取用户交易订单(从同步表获取概要) + * 注:详细订单需要调用 trading-service + */ + async getUserOrders(accountSequence: string, query: GetOrdersQuery) { + const { page, pageSize } = query; + + const user = await this.prisma.syncedUser.findUnique({ + where: { accountSequence }, + include: { tradingAccount: true }, + }); + + if (!user) { + throw new NotFoundException(`用户 ${accountSequence} 不存在`); + } + + const trading = user.tradingAccount; + if (!trading) { + return { + summary: { + accountSequence, + shareBalance: '0', + cashBalance: '0', + frozenShares: '0', + frozenCash: '0', + totalBought: '0', + totalSold: '0', + }, + orders: [], + pagination: { page, pageSize, total: 0, totalPages: 0 }, + }; + } + + return { + summary: { + accountSequence, + shareBalance: trading.shareBalance.toString(), + cashBalance: trading.cashBalance.toString(), + frozenShares: trading.frozenShares.toString(), + frozenCash: trading.frozenCash.toString(), + totalBought: trading.totalBought.toString(), + totalSold: trading.totalSold.toString(), + }, + // 详细订单需要从 trading-service 获取 + orders: [], + pagination: { page, pageSize, total: 0, totalPages: 0 }, + note: '详细交易订单请查看 trading-service', + }; + } + + // =========================================================================== + // 辅助方法 + // =========================================================================== + + private formatUserListItem(user: any) { + return { + accountSequence: user.accountSequence, + phone: this.maskPhone(user.phone), + status: user.status, + kycStatus: user.kycStatus, + realName: user.realName, + isLegacyUser: user.isLegacyUser, + createdAt: user.createdAt, + contribution: user.contributionAccount + ? { + totalContribution: + user.contributionAccount.totalContribution.toString(), + effectiveContribution: + user.contributionAccount.effectiveContribution.toString(), + hasAdopted: user.contributionAccount.hasAdopted, + } + : null, + mining: user.miningAccount + ? { + totalMined: user.miningAccount.totalMined.toString(), + availableBalance: user.miningAccount.availableBalance.toString(), + } + : null, + trading: user.tradingAccount + ? { + shareBalance: user.tradingAccount.shareBalance.toString(), + cashBalance: user.tradingAccount.cashBalance.toString(), + } + : null, + }; + } + + private formatUserDetail(user: any) { + return { + accountSequence: user.accountSequence, + phone: user.phone, + status: user.status, + kycStatus: user.kycStatus, + realName: user.realName, + isLegacyUser: user.isLegacyUser, + createdAt: user.createdAt, + syncedAt: user.syncedAt, + contribution: user.contributionAccount + ? { + personalContribution: + user.contributionAccount.personalContribution.toString(), + teamLevelContribution: + user.contributionAccount.teamLevelContribution.toString(), + teamBonusContribution: + user.contributionAccount.teamBonusContribution.toString(), + totalContribution: + user.contributionAccount.totalContribution.toString(), + effectiveContribution: + user.contributionAccount.effectiveContribution.toString(), + hasAdopted: user.contributionAccount.hasAdopted, + directReferralCount: user.contributionAccount.directReferralCount, + unlockedLevelDepth: user.contributionAccount.unlockedLevelDepth, + } + : null, + mining: user.miningAccount + ? { + totalMined: user.miningAccount.totalMined.toString(), + availableBalance: user.miningAccount.availableBalance.toString(), + frozenBalance: user.miningAccount.frozenBalance.toString(), + totalContribution: user.miningAccount.totalContribution.toString(), + } + : null, + trading: user.tradingAccount + ? { + shareBalance: user.tradingAccount.shareBalance.toString(), + cashBalance: user.tradingAccount.cashBalance.toString(), + frozenShares: user.tradingAccount.frozenShares.toString(), + frozenCash: user.tradingAccount.frozenCash.toString(), + totalBought: user.tradingAccount.totalBought.toString(), + totalSold: user.tradingAccount.totalSold.toString(), + } + : null, + }; + } + + private maskPhone(phone: string): string { + if (!phone || phone.length < 7) return phone; + return phone.substring(0, 3) + '****' + phone.substring(phone.length - 4); + } +} diff --git a/backend/services/mining-admin-service/src/infrastructure/infrastructure.module.ts b/backend/services/mining-admin-service/src/infrastructure/infrastructure.module.ts index a43c0076..6bd4ee2e 100644 --- a/backend/services/mining-admin-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/mining-admin-service/src/infrastructure/infrastructure.module.ts @@ -1,11 +1,13 @@ import { Module, Global } from '@nestjs/common'; import { ConfigModule, ConfigService } from '@nestjs/config'; import { PrismaModule } from './persistence/prisma/prisma.module'; +import { PrismaService } from './persistence/prisma/prisma.service'; import { RedisService } from './redis/redis.service'; +import { KafkaModule } from './kafka/kafka.module'; @Global() @Module({ - imports: [PrismaModule], + imports: [PrismaModule, KafkaModule], providers: [ { provide: 'REDIS_OPTIONS', @@ -13,12 +15,12 @@ import { RedisService } from './redis/redis.service'; host: configService.get('REDIS_HOST', 'localhost'), port: configService.get('REDIS_PORT', 6379), password: configService.get('REDIS_PASSWORD'), - db: configService.get('REDIS_DB', 3), + db: configService.get('REDIS_DB', 13), }), inject: [ConfigService], }, RedisService, ], - exports: [RedisService], + exports: [PrismaService, RedisService, KafkaModule], }) export class InfrastructureModule {} diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts new file mode 100644 index 00000000..c338a956 --- /dev/null +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -0,0 +1,251 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; + +/** + * CDC 事件结构 (Debezium 格式) + */ +export interface CdcEvent { + schema: any; + payload: { + before: any | null; + after: any | null; + source: { + version: string; + connector: string; + name: string; + ts_ms: number; + snapshot: string; + db: string; + sequence: string; + schema: string; + table: string; + txId: number; + lsn: number; + xmin: number | null; + }; + op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot) + ts_ms: number; + transaction: any; + }; + sequenceNum: bigint; +} + +/** + * 2.0 服务间事件结构 (Outbox 格式) + */ +export interface ServiceEvent { + id: string; + aggregateType: string; + aggregateId: string; + eventType: string; + payload: any; + createdAt: string; + sequenceNum: bigint; +} + +export type CdcHandler = (event: CdcEvent) => Promise; +export type ServiceEventHandler = (event: ServiceEvent) => Promise; + +@Injectable() +export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(CdcConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private cdcHandlers: Map = new Map(); + private serviceHandlers: Map = new Map(); + private isRunning = false; + private topics: string[] = []; + + constructor(private readonly configService: ConfigService) { + const brokers = this.configService + .get('KAFKA_BROKERS', 'localhost:9092') + .split(','); + + this.kafka = new Kafka({ + clientId: 'mining-admin-service-cdc', + brokers, + }); + + this.consumer = this.kafka.consumer({ + groupId: this.configService.get( + 'CDC_CONSUMER_GROUP', + 'mining-admin-service-cdc-group', + ), + }); + } + + async onModuleInit() { + // 启动延迟到 CdcSyncService 注册完处理器后 + } + + async onModuleDestroy() { + await this.stop(); + } + + /** + * 注册 CDC 事件处理器 (1.0 → 2.0 同步) + */ + registerCdcHandler(tableName: string, handler: CdcHandler): void { + this.cdcHandlers.set(tableName, handler); + this.logger.log(`Registered CDC handler for table: ${tableName}`); + } + + /** + * 注册服务事件处理器 (2.0 服务间同步) + */ + registerServiceHandler(eventType: string, handler: ServiceEventHandler): void { + this.serviceHandlers.set(eventType, handler); + this.logger.log(`Registered service event handler for: ${eventType}`); + } + + /** + * 添加要订阅的 topic + */ + addTopic(topic: string): void { + if (!this.topics.includes(topic)) { + this.topics.push(topic); + } + } + + /** + * 启动消费者 + */ + async start(): Promise { + if (this.isRunning) { + this.logger.warn('CDC consumer is already running'); + return; + } + + if (this.topics.length === 0) { + this.logger.warn('No topics to subscribe, skipping CDC consumer start'); + return; + } + + try { + await this.consumer.connect(); + this.logger.log('CDC consumer connected'); + + await this.consumer.subscribe({ + topics: this.topics, + fromBeginning: false, + }); + this.logger.log(`Subscribed to topics: ${this.topics.join(', ')}`); + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + + this.isRunning = true; + this.logger.log('CDC consumer started'); + } catch (error) { + this.logger.error('Failed to start CDC consumer', error); + // 不抛出错误,允许服务继续运行(CDC 可能暂时不可用) + } + } + + /** + * 停止消费者 + */ + async stop(): Promise { + if (!this.isRunning) { + return; + } + + try { + await this.consumer.disconnect(); + this.isRunning = false; + this.logger.log('CDC consumer stopped'); + } catch (error) { + this.logger.error('Failed to stop CDC consumer', error); + } + } + + private async handleMessage(payload: EachMessagePayload): Promise { + const { topic, partition, message } = payload; + + try { + if (!message.value) { + return; + } + + const eventData = JSON.parse(message.value.toString()); + const sequenceNum = BigInt(message.offset); + + // 判断事件类型:Debezium CDC 或 服务 Outbox 事件 + if (this.isDebeziumEvent(eventData)) { + await this.handleCdcEvent(topic, eventData, sequenceNum); + } else if (this.isServiceEvent(eventData)) { + await this.handleServiceEvent(topic, eventData, sequenceNum); + } else { + this.logger.warn(`Unknown event format from topic: ${topic}`); + } + } catch (error) { + this.logger.error( + `Error processing message from topic ${topic}, partition ${partition}`, + error, + ); + } + } + + private isDebeziumEvent(data: any): boolean { + return data.payload && data.payload.source && data.payload.op; + } + + private isServiceEvent(data: any): boolean { + return data.eventType && data.aggregateType; + } + + private async handleCdcEvent( + topic: string, + eventData: any, + sequenceNum: bigint, + ): Promise { + const event: CdcEvent = { + ...eventData, + sequenceNum, + }; + + // 从 topic 名称提取表名 (格式: dbserver1.schema.tablename) + const parts = topic.split('.'); + const tableName = parts[parts.length - 1]; + + const handler = this.cdcHandlers.get(tableName); + if (handler) { + await handler(event); + this.logger.debug( + `Processed CDC event for table ${tableName}, op: ${event.payload.op}`, + ); + } + } + + private async handleServiceEvent( + topic: string, + eventData: any, + sequenceNum: bigint, + ): Promise { + const event: ServiceEvent = { + ...eventData, + sequenceNum, + }; + + const handler = this.serviceHandlers.get(event.eventType); + if (handler) { + await handler(event); + this.logger.debug(`Processed service event: ${event.eventType}`); + } else { + // 尝试通配符处理器 + const aggregateHandler = this.serviceHandlers.get( + `${event.aggregateType}.*`, + ); + if (aggregateHandler) { + await aggregateHandler(event); + this.logger.debug( + `Processed service event via wildcard: ${event.eventType}`, + ); + } + } + } +} diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts new file mode 100644 index 00000000..fb5f802a --- /dev/null +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -0,0 +1,486 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { PrismaService } from '../persistence/prisma/prisma.service'; +import { + CdcConsumerService, + CdcEvent, + ServiceEvent, +} from './cdc-consumer.service'; + +/** + * CDC 同步服务 + * 负责从各个 2.0 服务同步数据到 mining-admin-service + */ +@Injectable() +export class CdcSyncService implements OnModuleInit { + private readonly logger = new Logger(CdcSyncService.name); + + constructor( + private readonly configService: ConfigService, + private readonly prisma: PrismaService, + private readonly cdcConsumer: CdcConsumerService, + ) {} + + async onModuleInit() { + await this.registerHandlers(); + await this.cdcConsumer.start(); + } + + private async registerHandlers(): Promise { + // =========================================================================== + // 从 auth-service 同步用户数据 + // =========================================================================== + const usersTopic = this.configService.get( + 'CDC_TOPIC_USERS', + 'mining-admin.auth.users', + ); + this.cdcConsumer.addTopic(usersTopic); + this.cdcConsumer.registerServiceHandler( + 'UserCreated', + this.handleUserCreated.bind(this), + ); + this.cdcConsumer.registerServiceHandler( + 'UserUpdated', + this.handleUserUpdated.bind(this), + ); + this.cdcConsumer.registerServiceHandler( + 'KycStatusChanged', + this.handleKycStatusChanged.bind(this), + ); + + // =========================================================================== + // 从 contribution-service 同步算力数据 + // =========================================================================== + const contributionTopic = this.configService.get( + 'CDC_TOPIC_CONTRIBUTION', + 'mining-admin.contribution.accounts', + ); + this.cdcConsumer.addTopic(contributionTopic); + this.cdcConsumer.registerServiceHandler( + 'ContributionAccountUpdated', + this.handleContributionAccountUpdated.bind(this), + ); + this.cdcConsumer.registerServiceHandler( + 'SystemContributionUpdated', + this.handleSystemContributionUpdated.bind(this), + ); + + // =========================================================================== + // 从 mining-service 同步挖矿数据 + // =========================================================================== + const miningTopic = this.configService.get( + 'CDC_TOPIC_MINING', + 'mining-admin.mining.accounts', + ); + this.cdcConsumer.addTopic(miningTopic); + this.cdcConsumer.registerServiceHandler( + 'MiningAccountUpdated', + this.handleMiningAccountUpdated.bind(this), + ); + this.cdcConsumer.registerServiceHandler( + 'MiningConfigUpdated', + this.handleMiningConfigUpdated.bind(this), + ); + this.cdcConsumer.registerServiceHandler( + 'DailyMiningStatCreated', + this.handleDailyMiningStatCreated.bind(this), + ); + + // =========================================================================== + // 从 trading-service 同步交易数据 + // =========================================================================== + const tradingTopic = this.configService.get( + 'CDC_TOPIC_TRADING', + 'mining-admin.trading.accounts', + ); + this.cdcConsumer.addTopic(tradingTopic); + this.cdcConsumer.registerServiceHandler( + 'TradingAccountUpdated', + this.handleTradingAccountUpdated.bind(this), + ); + this.cdcConsumer.registerServiceHandler( + 'DayKLineCreated', + this.handleDayKLineCreated.bind(this), + ); + this.cdcConsumer.registerServiceHandler( + 'CirculationPoolUpdated', + this.handleCirculationPoolUpdated.bind(this), + ); + + this.logger.log('CDC sync handlers registered'); + } + + // =========================================================================== + // 用户事件处理 + // =========================================================================== + + private async handleUserCreated(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedUser.upsert({ + where: { originalUserId: payload.id }, + create: { + originalUserId: payload.id, + accountSequence: payload.accountSequence, + phone: payload.phone, + status: payload.status || 'ACTIVE', + kycStatus: payload.kycStatus || 'PENDING', + realName: payload.realName, + isLegacyUser: payload.isLegacyUser || false, + createdAt: new Date(payload.createdAt), + }, + update: { + phone: payload.phone, + status: payload.status || 'ACTIVE', + kycStatus: payload.kycStatus || 'PENDING', + realName: payload.realName, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug(`Synced user: ${payload.accountSequence}`); + } catch (error) { + this.logger.error(`Failed to sync user: ${payload.id}`, error); + } + } + + private async handleUserUpdated(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedUser.updateMany({ + where: { originalUserId: payload.id }, + data: { + phone: payload.phone, + status: payload.status, + kycStatus: payload.kycStatus, + realName: payload.realName, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug(`Updated user: ${payload.id}`); + } catch (error) { + this.logger.error(`Failed to update user: ${payload.id}`, error); + } + } + + private async handleKycStatusChanged(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedUser.updateMany({ + where: { accountSequence: payload.accountSequence }, + data: { + kycStatus: payload.kycStatus, + realName: payload.realName, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug(`Updated KYC status: ${payload.accountSequence}`); + } catch (error) { + this.logger.error( + `Failed to update KYC status: ${payload.accountSequence}`, + error, + ); + } + } + + // =========================================================================== + // 算力账户事件处理 + // =========================================================================== + + private async handleContributionAccountUpdated( + event: ServiceEvent, + ): Promise { + const { payload } = event; + + try { + await this.prisma.syncedContributionAccount.upsert({ + where: { accountSequence: payload.accountSequence }, + create: { + accountSequence: payload.accountSequence, + personalContribution: payload.personalContribution || 0, + teamLevelContribution: payload.teamLevelContribution || 0, + teamBonusContribution: payload.teamBonusContribution || 0, + totalContribution: payload.totalContribution || 0, + effectiveContribution: payload.effectiveContribution || 0, + hasAdopted: payload.hasAdopted || false, + directReferralCount: payload.directReferralAdoptedCount || 0, + unlockedLevelDepth: payload.unlockedLevelDepth || 0, + }, + update: { + personalContribution: payload.personalContribution, + teamLevelContribution: payload.teamLevelContribution, + teamBonusContribution: payload.teamBonusContribution, + totalContribution: payload.totalContribution, + effectiveContribution: payload.effectiveContribution, + hasAdopted: payload.hasAdopted, + directReferralCount: payload.directReferralAdoptedCount, + unlockedLevelDepth: payload.unlockedLevelDepth, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug( + `Synced contribution account: ${payload.accountSequence}`, + ); + } catch (error) { + this.logger.error( + `Failed to sync contribution account: ${payload.accountSequence}`, + error, + ); + } + } + + private async handleSystemContributionUpdated( + event: ServiceEvent, + ): Promise { + const { payload } = event; + + try { + await this.prisma.syncedSystemContribution.upsert({ + where: { accountType: payload.accountType }, + create: { + accountType: payload.accountType, + name: payload.name, + contributionBalance: payload.contributionBalance || 0, + contributionNeverExpires: payload.contributionNeverExpires || false, + }, + update: { + name: payload.name, + contributionBalance: payload.contributionBalance, + contributionNeverExpires: payload.contributionNeverExpires, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug( + `Synced system contribution: ${payload.accountType}`, + ); + } catch (error) { + this.logger.error( + `Failed to sync system contribution: ${payload.accountType}`, + error, + ); + } + } + + // =========================================================================== + // 挖矿账户事件处理 + // =========================================================================== + + private async handleMiningAccountUpdated(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedMiningAccount.upsert({ + where: { accountSequence: payload.accountSequence }, + create: { + accountSequence: payload.accountSequence, + totalMined: payload.totalMined || 0, + availableBalance: payload.availableBalance || 0, + frozenBalance: payload.frozenBalance || 0, + totalContribution: payload.totalContribution || 0, + }, + update: { + totalMined: payload.totalMined, + availableBalance: payload.availableBalance, + frozenBalance: payload.frozenBalance, + totalContribution: payload.totalContribution, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug(`Synced mining account: ${payload.accountSequence}`); + } catch (error) { + this.logger.error( + `Failed to sync mining account: ${payload.accountSequence}`, + error, + ); + } + } + + private async handleMiningConfigUpdated(event: ServiceEvent): Promise { + const { payload } = event; + + try { + // 只保留一条挖矿配置记录 + await this.prisma.syncedMiningConfig.deleteMany({}); + await this.prisma.syncedMiningConfig.create({ + data: { + totalShares: payload.totalShares, + distributionPool: payload.distributionPool, + remainingDistribution: payload.remainingDistribution, + halvingPeriodYears: payload.halvingPeriodYears, + currentEra: payload.currentEra || 1, + minuteDistribution: payload.minuteDistribution, + isActive: payload.isActive || false, + activatedAt: payload.activatedAt + ? new Date(payload.activatedAt) + : null, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug('Synced mining config'); + } catch (error) { + this.logger.error('Failed to sync mining config', error); + } + } + + private async handleDailyMiningStatCreated( + event: ServiceEvent, + ): Promise { + const { payload } = event; + + try { + await this.prisma.syncedDailyMiningStat.upsert({ + where: { statDate: new Date(payload.date) }, + create: { + statDate: new Date(payload.date), + totalContribution: payload.totalContribution || 0, + totalDistributed: payload.totalDistributed || 0, + totalBurned: payload.totalBurned || 0, + participantCount: payload.participantCount || 0, + avgContributionRate: payload.avgContributionRate || 0, + }, + update: { + totalContribution: payload.totalContribution, + totalDistributed: payload.totalDistributed, + totalBurned: payload.totalBurned, + participantCount: payload.participantCount, + avgContributionRate: payload.avgContributionRate, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug(`Synced daily mining stat: ${payload.date}`); + } catch (error) { + this.logger.error( + `Failed to sync daily mining stat: ${payload.date}`, + error, + ); + } + } + + // =========================================================================== + // 交易账户事件处理 + // =========================================================================== + + private async handleTradingAccountUpdated( + event: ServiceEvent, + ): Promise { + const { payload } = event; + + try { + await this.prisma.syncedTradingAccount.upsert({ + where: { accountSequence: payload.accountSequence }, + create: { + accountSequence: payload.accountSequence, + shareBalance: payload.shareBalance || 0, + cashBalance: payload.cashBalance || 0, + frozenShares: payload.frozenShares || 0, + frozenCash: payload.frozenCash || 0, + totalBought: payload.totalBought || 0, + totalSold: payload.totalSold || 0, + }, + update: { + shareBalance: payload.shareBalance, + cashBalance: payload.cashBalance, + frozenShares: payload.frozenShares, + frozenCash: payload.frozenCash, + totalBought: payload.totalBought, + totalSold: payload.totalSold, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug(`Synced trading account: ${payload.accountSequence}`); + } catch (error) { + this.logger.error( + `Failed to sync trading account: ${payload.accountSequence}`, + error, + ); + } + } + + private async handleDayKLineCreated(event: ServiceEvent): Promise { + const { payload } = event; + + try { + await this.prisma.syncedDayKLine.upsert({ + where: { klineDate: new Date(payload.date) }, + create: { + klineDate: new Date(payload.date), + open: payload.open, + high: payload.high, + low: payload.low, + close: payload.close, + volume: payload.volume || 0, + amount: payload.amount || 0, + tradeCount: payload.tradeCount || 0, + }, + update: { + open: payload.open, + high: payload.high, + low: payload.low, + close: payload.close, + volume: payload.volume, + amount: payload.amount, + tradeCount: payload.tradeCount, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug(`Synced day K-line: ${payload.date}`); + } catch (error) { + this.logger.error(`Failed to sync day K-line: ${payload.date}`, error); + } + } + + private async handleCirculationPoolUpdated( + event: ServiceEvent, + ): Promise { + const { payload } = event; + + try { + // 只保留一条流通池记录 + await this.prisma.syncedCirculationPool.deleteMany({}); + await this.prisma.syncedCirculationPool.create({ + data: { + totalShares: payload.totalShares || 0, + totalCash: payload.totalCash || 0, + }, + }); + + await this.recordProcessedEvent(event); + this.logger.debug('Synced circulation pool'); + } catch (error) { + this.logger.error('Failed to sync circulation pool', error); + } + } + + // =========================================================================== + // 辅助方法 + // =========================================================================== + + private async recordProcessedEvent(event: ServiceEvent): Promise { + try { + await this.prisma.processedEvent.upsert({ + where: { eventId: event.id }, + create: { + eventId: event.id, + eventType: event.eventType, + sourceService: event.aggregateType, + }, + update: {}, + }); + } catch (error) { + // 忽略幂等性记录失败 + this.logger.warn(`Failed to record processed event: ${event.id}`); + } + } +} diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/kafka.module.ts new file mode 100644 index 00000000..0e5f04b4 --- /dev/null +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/kafka.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; +import { ConfigModule, ConfigService } from '@nestjs/config'; +import { CdcConsumerService } from './cdc-consumer.service'; +import { CdcSyncService } from './cdc-sync.service'; + +@Module({ + imports: [ConfigModule], + providers: [CdcConsumerService, CdcSyncService], + exports: [CdcConsumerService, CdcSyncService], +}) +export class KafkaModule {}