feat(mining-admin-service): 添加CDC同步和完整用户管理API

## Prisma Schema 更新
- 添加 CDC 同步表:SyncedUser, SyncedContributionAccount, SyncedMiningAccount, SyncedTradingAccount
- 添加系统数据同步表:SyncedMiningConfig, SyncedDailyMiningStat, SyncedDayKLine, SyncedCirculationPool
- 添加 CDC 进度跟踪:CdcSyncProgress, ProcessedEvent

## CDC 消费者模块
- CdcConsumerService: Kafka 消费者,支持 Debezium CDC 和服务间事件
- CdcSyncService: 同步处理器,从 auth/contribution/mining/trading 服务同步数据

## 新增 API 端点
### 用户管理 (/api/v1/users)
- GET /users - 用户列表(分页、搜索、过滤)
- GET /users/:accountSequence - 用户详情
- GET /users/:accountSequence/contributions - 算力记录
- GET /users/:accountSequence/mining-records - 挖矿记录
- GET /users/:accountSequence/orders - 交易订单

### 系统账户 (/api/v1/system-accounts)
- GET /system-accounts - 系统账户列表
- GET /system-accounts/summary - 系统账户汇总

### 仪表盘增强 (/api/v1/dashboard)
- GET /dashboard - 统计数据(新增用户/算力/挖矿/交易统计)
- GET /dashboard/realtime - 实时数据
- GET /dashboard/stats - 统计数据(别名)

## Docker Compose 更新
- 添加 Kafka 依赖和 CDC topic 配置
- 添加与 auth-service 的依赖关系

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-10 20:36:21 -08:00
parent 2a09fca728
commit 15a5fb6c14
16 changed files with 1933 additions and 65 deletions

View File

@ -151,10 +151,16 @@ services:
condition: service_healthy
redis:
condition: service_healthy
kafka:
condition: service_healthy
contribution-service:
condition: service_healthy
mining-service:
condition: service_healthy
trading-service:
condition: service_healthy
auth-service:
condition: service_healthy
environment:
NODE_ENV: production
TZ: Asia/Shanghai
@ -166,10 +172,19 @@ services:
REDIS_PORT: 6379
REDIS_PASSWORD: ${REDIS_PASSWORD:-}
REDIS_DB: 13
# 2.0 内部服务调用
# Kafka - 消费 2.0 服务间事件
KAFKA_BROKERS: kafka:29092
CDC_CONSUMER_GROUP: mining-admin-service-cdc-group
# CDC Topics - 从各 2.0 服务同步数据
CDC_TOPIC_USERS: ${CDC_TOPIC_ADMIN_USERS:-mining-admin.auth.users}
CDC_TOPIC_CONTRIBUTION: ${CDC_TOPIC_ADMIN_CONTRIBUTION:-mining-admin.contribution.accounts}
CDC_TOPIC_MINING: ${CDC_TOPIC_ADMIN_MINING:-mining-admin.mining.accounts}
CDC_TOPIC_TRADING: ${CDC_TOPIC_ADMIN_TRADING:-mining-admin.trading.accounts}
# 2.0 内部服务调用(备用)
CONTRIBUTION_SERVICE_URL: http://contribution-service:3020
MINING_SERVICE_URL: http://mining-service:3021
TRADING_SERVICE_URL: http://trading-service:3022
AUTH_SERVICE_URL: http://auth-service:3024
JWT_SECRET: ${ADMIN_JWT_SECRET:-your-admin-jwt-secret-change-in-production}
ports:
- "3023:3023"

View File

@ -19,6 +19,7 @@
"decimal.js": "^10.4.3",
"ioredis": "^5.3.2",
"jsonwebtoken": "^9.0.2",
"kafkajs": "^2.2.4",
"reflect-metadata": "^0.1.14",
"rxjs": "^7.8.1",
"swagger-ui-express": "^5.0.0"
@ -3802,6 +3803,15 @@
"safe-buffer": "^5.0.1"
}
},
"node_modules/kafkajs": {
"version": "2.2.4",
"resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz",
"integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==",
"license": "MIT",
"engines": {
"node": ">=14.0.0"
}
},
"node_modules/keyv": {
"version": "4.5.4",
"resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz",

View File

@ -26,6 +26,7 @@
"decimal.js": "^10.4.3",
"ioredis": "^5.3.2",
"jsonwebtoken": "^9.0.2",
"kafkajs": "^2.2.4",
"reflect-metadata": "^0.1.14",
"rxjs": "^7.8.1",
"swagger-ui-express": "^5.0.0"

View File

@ -7,7 +7,9 @@ datasource db {
url = env("DATABASE_URL")
}
// ==================== 管理员 ====================
// =============================================================================
// 管理员模块
// =============================================================================
model AdminUser {
id String @id @default(uuid())
@ -25,7 +27,9 @@ model AdminUser {
@@map("admin_users")
}
// ==================== 系统配置 ====================
// =============================================================================
// 系统配置
// =============================================================================
model SystemConfig {
id String @id @default(uuid())
@ -41,11 +45,13 @@ model SystemConfig {
@@map("system_configs")
}
// ==================== 系统账户(运营/省/市)====================
// =============================================================================
// 系统账户(运营/省/市)
// =============================================================================
model SystemAccount {
id String @id @default(uuid())
accountType String @unique // OPERATION, PROVINCE, CITY
accountType String @unique // OPERATION, PROVINCE, CITY, HEADQUARTERS
name String
description String?
totalContribution Decimal @db.Decimal(30, 8) @default(0)
@ -55,11 +61,13 @@ model SystemAccount {
@@map("system_accounts")
}
// ==================== 初始化记录 ====================
// =============================================================================
// 初始化记录
// =============================================================================
model InitializationRecord {
id String @id @default(uuid())
type String // MINING_CONFIG, BLACK_HOLE, SYSTEM_ACCOUNTS
type String // MINING_CONFIG, BLACK_HOLE, SYSTEM_ACCOUNTS, ACTIVATE_MINING
status String // PENDING, COMPLETED, FAILED
config Json
executedBy String
@ -70,7 +78,9 @@ model InitializationRecord {
@@map("initialization_records")
}
// ==================== 审计日志 ====================
// =============================================================================
// 审计日志
// =============================================================================
model AuditLog {
id String @id @default(uuid())
@ -93,7 +103,9 @@ model AuditLog {
@@map("audit_logs")
}
// ==================== 报表快照 ====================
// =============================================================================
// 报表快照
// =============================================================================
model DailyReport {
id String @id @default(uuid())
@ -132,3 +144,218 @@ model DailyReport {
@@map("daily_reports")
}
// =============================================================================
// CDC 同步表 - 用户数据 (from auth-service)
// =============================================================================
model SyncedUser {
id String @id @default(uuid())
originalUserId String @unique // auth-service 中的原始 ID
accountSequence String @unique // 账户序列号
phone String
status String // ACTIVE, DISABLED, DELETED
kycStatus String // PENDING, SUBMITTED, VERIFIED, REJECTED
realName String?
isLegacyUser Boolean @default(false) // 是否为 1.0 迁移用户
createdAt DateTime
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
// 关联同步表
contributionAccount SyncedContributionAccount?
miningAccount SyncedMiningAccount?
tradingAccount SyncedTradingAccount?
@@index([phone])
@@index([status])
@@index([kycStatus])
@@index([createdAt(sort: Desc)])
@@map("synced_users")
}
// =============================================================================
// CDC 同步表 - 算力账户 (from contribution-service)
// =============================================================================
model SyncedContributionAccount {
id String @id @default(uuid())
accountSequence String @unique
personalContribution Decimal @db.Decimal(30, 8) @default(0)
teamLevelContribution Decimal @db.Decimal(30, 8) @default(0)
teamBonusContribution Decimal @db.Decimal(30, 8) @default(0)
totalContribution Decimal @db.Decimal(30, 8) @default(0)
effectiveContribution Decimal @db.Decimal(30, 8) @default(0)
hasAdopted Boolean @default(false)
directReferralCount Int @default(0)
unlockedLevelDepth Int @default(0)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
user SyncedUser @relation(fields: [accountSequence], references: [accountSequence])
@@map("synced_contribution_accounts")
}
// =============================================================================
// CDC 同步表 - 挖矿账户 (from mining-service)
// =============================================================================
model SyncedMiningAccount {
id String @id @default(uuid())
accountSequence String @unique
totalMined Decimal @db.Decimal(30, 8) @default(0)
availableBalance Decimal @db.Decimal(30, 8) @default(0)
frozenBalance Decimal @db.Decimal(30, 8) @default(0)
totalContribution Decimal @db.Decimal(30, 8) @default(0)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
user SyncedUser @relation(fields: [accountSequence], references: [accountSequence])
@@map("synced_mining_accounts")
}
// =============================================================================
// CDC 同步表 - 交易账户 (from trading-service)
// =============================================================================
model SyncedTradingAccount {
id String @id @default(uuid())
accountSequence String @unique
shareBalance Decimal @db.Decimal(30, 8) @default(0)
cashBalance Decimal @db.Decimal(30, 8) @default(0)
frozenShares Decimal @db.Decimal(30, 8) @default(0)
frozenCash Decimal @db.Decimal(30, 8) @default(0)
totalBought Decimal @db.Decimal(30, 8) @default(0)
totalSold Decimal @db.Decimal(30, 8) @default(0)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
user SyncedUser @relation(fields: [accountSequence], references: [accountSequence])
@@map("synced_trading_accounts")
}
// =============================================================================
// CDC 同步表 - 挖矿配置 (from mining-service)
// =============================================================================
model SyncedMiningConfig {
id String @id @default(uuid())
totalShares Decimal @db.Decimal(30, 8)
distributionPool Decimal @db.Decimal(30, 8)
remainingDistribution Decimal @db.Decimal(30, 8)
halvingPeriodYears Int
currentEra Int @default(1)
minuteDistribution Decimal @db.Decimal(30, 8)
isActive Boolean @default(false)
activatedAt DateTime?
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@map("synced_mining_configs")
}
// =============================================================================
// CDC 同步表 - 每日挖矿统计 (from mining-service)
// =============================================================================
model SyncedDailyMiningStat {
id String @id @default(uuid())
statDate DateTime @unique @db.Date
totalContribution Decimal @db.Decimal(30, 8) @default(0)
totalDistributed Decimal @db.Decimal(30, 8) @default(0)
totalBurned Decimal @db.Decimal(30, 8) @default(0)
participantCount Int @default(0)
avgContributionRate Decimal @db.Decimal(30, 18) @default(0)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@map("synced_daily_mining_stats")
}
// =============================================================================
// CDC 同步表 - 日K线 (from trading-service)
// =============================================================================
model SyncedDayKLine {
id String @id @default(uuid())
klineDate DateTime @unique @db.Date
open Decimal @db.Decimal(30, 18)
high Decimal @db.Decimal(30, 18)
low Decimal @db.Decimal(30, 18)
close Decimal @db.Decimal(30, 18)
volume Decimal @db.Decimal(30, 8) @default(0)
amount Decimal @db.Decimal(30, 8) @default(0)
tradeCount Int @default(0)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@map("synced_day_klines")
}
// =============================================================================
// CDC 同步表 - 流通池 (from trading-service)
// =============================================================================
model SyncedCirculationPool {
id String @id @default(uuid())
totalShares Decimal @db.Decimal(30, 8) @default(0)
totalCash Decimal @db.Decimal(30, 8) @default(0)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@map("synced_circulation_pools")
}
// =============================================================================
// CDC 同步表 - 系统账户算力 (from contribution-service)
// =============================================================================
model SyncedSystemContribution {
id String @id @default(uuid())
accountType String @unique // OPERATION, PROVINCE, CITY, HEADQUARTERS
name String
contributionBalance Decimal @db.Decimal(30, 8) @default(0)
contributionNeverExpires Boolean @default(false)
syncedAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@map("synced_system_contributions")
}
// =============================================================================
// CDC 同步进度跟踪
// =============================================================================
model CdcSyncProgress {
id String @id @default(uuid())
sourceTopic String @unique
sourceService String // auth, contribution, mining, trading
lastOffset String? // Kafka offset
lastSequenceNum BigInt @default(0)
lastSyncedAt DateTime @default(now())
errorCount Int @default(0)
lastError String?
updatedAt DateTime @updatedAt
@@index([sourceService])
@@map("cdc_sync_progress")
}
// =============================================================================
// 已处理事件(幂等性)
// =============================================================================
model ProcessedEvent {
id String @id @default(uuid())
eventId String @unique
eventType String
sourceService String
processedAt DateTime @default(now())
@@index([sourceService])
@@index([processedAt])
@@map("processed_events")
}

View File

@ -6,9 +6,20 @@ import { ConfigController } from './controllers/config.controller';
import { InitializationController } from './controllers/initialization.controller';
import { AuditController } from './controllers/audit.controller';
import { HealthController } from './controllers/health.controller';
import { UsersController } from './controllers/users.controller';
import { SystemAccountsController } from './controllers/system-accounts.controller';
@Module({
imports: [ApplicationModule],
controllers: [AuthController, DashboardController, ConfigController, InitializationController, AuditController, HealthController],
controllers: [
AuthController,
DashboardController,
ConfigController,
InitializationController,
AuditController,
HealthController,
UsersController,
SystemAccountsController,
],
})
export class ApiModule {}

View File

@ -1,5 +1,10 @@
import { Controller, Get, Query } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiBearerAuth, ApiQuery } from '@nestjs/swagger';
import {
ApiTags,
ApiOperation,
ApiBearerAuth,
ApiQuery,
} from '@nestjs/swagger';
import { DashboardService } from '../../application/services/dashboard.service';
@ApiTags('Dashboard')
@ -8,17 +13,32 @@ import { DashboardService } from '../../application/services/dashboard.service';
export class DashboardController {
constructor(private readonly dashboardService: DashboardService) {}
@Get('stats')
@Get()
@ApiOperation({ summary: '获取仪表盘统计数据' })
async getStats() {
return this.dashboardService.getDashboardStats();
}
@Get('stats')
@ApiOperation({ summary: '获取仪表盘统计数据(别名)' })
async getStatsAlias() {
return this.dashboardService.getDashboardStats();
}
@Get('realtime')
@ApiOperation({ summary: '获取实时数据' })
async getRealtimeStats() {
return this.dashboardService.getRealtimeStats();
}
@Get('reports')
@ApiOperation({ summary: '获取每日报表' })
@ApiQuery({ name: 'page', required: false, type: Number })
@ApiQuery({ name: 'pageSize', required: false, type: Number })
async getReports(@Query('page') page?: number, @Query('pageSize') pageSize?: number) {
async getReports(
@Query('page') page?: number,
@Query('pageSize') pageSize?: number,
) {
return this.dashboardService.getReports(page ?? 1, pageSize ?? 30);
}
}

View File

@ -0,0 +1,22 @@
import { Controller, Get } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiBearerAuth } from '@nestjs/swagger';
import { SystemAccountsService } from '../../application/services/system-accounts.service';
@ApiTags('System Accounts')
@ApiBearerAuth()
@Controller('system-accounts')
export class SystemAccountsController {
constructor(private readonly systemAccountsService: SystemAccountsService) {}
@Get()
@ApiOperation({ summary: '获取系统账户列表' })
async getSystemAccounts() {
return this.systemAccountsService.getSystemAccounts();
}
@Get('summary')
@ApiOperation({ summary: '获取系统账户汇总' })
async getSystemAccountsSummary() {
return this.systemAccountsService.getSystemAccountsSummary();
}
}

View File

@ -0,0 +1,105 @@
import { Controller, Get, Param, Query } from '@nestjs/common';
import {
ApiTags,
ApiOperation,
ApiBearerAuth,
ApiQuery,
ApiParam,
} from '@nestjs/swagger';
import { UsersService } from '../../application/services/users.service';
@ApiTags('Users')
@ApiBearerAuth()
@Controller('users')
export class UsersController {
constructor(private readonly usersService: UsersService) {}
@Get()
@ApiOperation({ summary: '获取用户列表' })
@ApiQuery({ name: 'page', required: false, type: Number })
@ApiQuery({ name: 'pageSize', required: false, type: Number })
@ApiQuery({ name: 'search', required: false, type: String, description: '手机号或账户序列号' })
@ApiQuery({ name: 'status', required: false, type: String, description: 'ACTIVE, DISABLED, DELETED' })
@ApiQuery({ name: 'kycStatus', required: false, type: String, description: 'PENDING, SUBMITTED, VERIFIED, REJECTED' })
@ApiQuery({ name: 'hasAdopted', required: false, type: Boolean })
async getUsers(
@Query('page') page?: number,
@Query('pageSize') pageSize?: number,
@Query('search') search?: string,
@Query('status') status?: string,
@Query('kycStatus') kycStatus?: string,
@Query('hasAdopted') hasAdopted?: boolean,
) {
return this.usersService.getUsers({
page: page ?? 1,
pageSize: pageSize ?? 20,
search,
status,
kycStatus,
hasAdopted,
});
}
@Get(':accountSequence')
@ApiOperation({ summary: '获取用户详情' })
@ApiParam({ name: 'accountSequence', type: String })
async getUserDetail(@Param('accountSequence') accountSequence: string) {
return this.usersService.getUserDetail(accountSequence);
}
@Get(':accountSequence/contributions')
@ApiOperation({ summary: '获取用户算力记录' })
@ApiParam({ name: 'accountSequence', type: String })
@ApiQuery({ name: 'page', required: false, type: Number })
@ApiQuery({ name: 'pageSize', required: false, type: Number })
async getUserContributions(
@Param('accountSequence') accountSequence: string,
@Query('page') page?: number,
@Query('pageSize') pageSize?: number,
) {
return this.usersService.getUserContributions(
accountSequence,
page ?? 1,
pageSize ?? 20,
);
}
@Get(':accountSequence/mining-records')
@ApiOperation({ summary: '获取用户挖矿记录' })
@ApiParam({ name: 'accountSequence', type: String })
@ApiQuery({ name: 'page', required: false, type: Number })
@ApiQuery({ name: 'pageSize', required: false, type: Number })
async getUserMiningRecords(
@Param('accountSequence') accountSequence: string,
@Query('page') page?: number,
@Query('pageSize') pageSize?: number,
) {
return this.usersService.getUserMiningRecords(
accountSequence,
page ?? 1,
pageSize ?? 20,
);
}
@Get(':accountSequence/orders')
@ApiOperation({ summary: '获取用户交易订单' })
@ApiParam({ name: 'accountSequence', type: String })
@ApiQuery({ name: 'page', required: false, type: Number })
@ApiQuery({ name: 'pageSize', required: false, type: Number })
@ApiQuery({ name: 'type', required: false, type: String, description: 'BUY, SELL' })
@ApiQuery({ name: 'status', required: false, type: String, description: 'PENDING, PARTIAL, FILLED, CANCELLED' })
async getUserOrders(
@Param('accountSequence') accountSequence: string,
@Query('page') page?: number,
@Query('pageSize') pageSize?: number,
@Query('type') type?: string,
@Query('status') status?: string,
) {
return this.usersService.getUserOrders(accountSequence, {
page: page ?? 1,
pageSize: pageSize ?? 20,
type,
status,
});
}
}

View File

@ -4,11 +4,27 @@ import { AuthService } from './services/auth.service';
import { ConfigManagementService } from './services/config.service';
import { InitializationService } from './services/initialization.service';
import { DashboardService } from './services/dashboard.service';
import { UsersService } from './services/users.service';
import { SystemAccountsService } from './services/system-accounts.service';
@Module({
imports: [InfrastructureModule],
providers: [AuthService, ConfigManagementService, InitializationService, DashboardService],
exports: [AuthService, ConfigManagementService, InitializationService, DashboardService],
providers: [
AuthService,
ConfigManagementService,
InitializationService,
DashboardService,
UsersService,
SystemAccountsService,
],
exports: [
AuthService,
ConfigManagementService,
InitializationService,
DashboardService,
UsersService,
SystemAccountsService,
],
})
export class ApplicationModule implements OnModuleInit {
constructor(private readonly authService: AuthService) {}

View File

@ -11,66 +11,228 @@ export class DashboardService {
private readonly configService: ConfigService,
) {}
/**
*
*
*/
async getDashboardStats(): Promise<any> {
const [contributionStats, miningStats, tradingStats, latestReport] = await Promise.all([
this.fetchContributionStats(),
this.fetchMiningStats(),
this.fetchTradingStats(),
const [
userStats,
contributionStats,
miningStats,
tradingStats,
latestReport,
latestKLine,
] = await Promise.all([
this.getUserStats(),
this.getContributionStats(),
this.getMiningStats(),
this.getTradingStats(),
this.prisma.dailyReport.findFirst({ orderBy: { reportDate: 'desc' } }),
this.prisma.syncedDayKLine.findFirst({ orderBy: { klineDate: 'desc' } }),
]);
return {
users: userStats,
contribution: contributionStats,
mining: miningStats,
trading: tradingStats,
latestReport,
latestReport: latestReport
? this.formatDailyReport(latestReport)
: null,
latestPrice: latestKLine
? {
date: latestKLine.klineDate,
open: latestKLine.open.toString(),
high: latestKLine.high.toString(),
low: latestKLine.low.toString(),
close: latestKLine.close.toString(),
volume: latestKLine.volume.toString(),
amount: latestKLine.amount.toString(),
}
: null,
timestamp: new Date().toISOString(),
};
}
private async fetchContributionStats(): Promise<any> {
try {
const url = `${this.configService.get('CONTRIBUTION_SERVICE_URL')}/api/v1/contributions/stats`;
const response = await fetch(url);
if (response.ok) {
const result = await response.json();
return result.data;
}
} catch (error) {
this.logger.error('Failed to fetch contribution stats', error);
}
return null;
/**
*
*/
async getRealtimeStats(): Promise<any> {
const [miningConfig, circulationPool, latestKLine] = await Promise.all([
this.prisma.syncedMiningConfig.findFirst(),
this.prisma.syncedCirculationPool.findFirst(),
this.prisma.syncedDayKLine.findFirst({ orderBy: { klineDate: 'desc' } }),
]);
return {
miningActive: miningConfig?.isActive || false,
currentEra: miningConfig?.currentEra || 0,
minuteDistribution: miningConfig?.minuteDistribution?.toString() || '0',
remainingDistribution:
miningConfig?.remainingDistribution?.toString() || '0',
circulationShares: circulationPool?.totalShares?.toString() || '0',
circulationCash: circulationPool?.totalCash?.toString() || '0',
currentPrice: latestKLine?.close?.toString() || '1',
timestamp: new Date().toISOString(),
};
}
private async fetchMiningStats(): Promise<any> {
try {
const url = `${this.configService.get('MINING_SERVICE_URL')}/api/v1/mining/stats`;
const response = await fetch(url);
if (response.ok) {
const result = await response.json();
return result.data;
}
} catch (error) {
this.logger.error('Failed to fetch mining stats', error);
}
return null;
/**
*
*/
private async getUserStats() {
const [totalUsers, activeUsers, kycVerifiedUsers, adoptedUsers] =
await Promise.all([
this.prisma.syncedUser.count(),
this.prisma.syncedUser.count({ where: { status: 'ACTIVE' } }),
this.prisma.syncedUser.count({ where: { kycStatus: 'VERIFIED' } }),
this.prisma.syncedContributionAccount.count({
where: { hasAdopted: true },
}),
]);
// 今日新增用户
const today = new Date();
today.setHours(0, 0, 0, 0);
const newUsersToday = await this.prisma.syncedUser.count({
where: { createdAt: { gte: today } },
});
return {
total: totalUsers,
active: activeUsers,
kycVerified: kycVerifiedUsers,
adopted: adoptedUsers,
newToday: newUsersToday,
};
}
private async fetchTradingStats(): Promise<any> {
try {
const url = `${this.configService.get('TRADING_SERVICE_URL')}/api/v1/trading/stats`;
const response = await fetch(url);
if (response.ok) {
const result = await response.json();
return result.data;
}
} catch (error) {
this.logger.error('Failed to fetch trading stats', error);
}
return null;
/**
*
*/
private async getContributionStats() {
const accounts = await this.prisma.syncedContributionAccount.aggregate({
_sum: {
totalContribution: true,
effectiveContribution: true,
personalContribution: true,
teamLevelContribution: true,
teamBonusContribution: true,
},
_count: true,
});
const systemContributions =
await this.prisma.syncedSystemContribution.aggregate({
_sum: { contributionBalance: true },
_count: true,
});
return {
totalAccounts: accounts._count,
totalContribution: accounts._sum.totalContribution?.toString() || '0',
effectiveContribution:
accounts._sum.effectiveContribution?.toString() || '0',
personalContribution:
accounts._sum.personalContribution?.toString() || '0',
teamLevelContribution:
accounts._sum.teamLevelContribution?.toString() || '0',
teamBonusContribution:
accounts._sum.teamBonusContribution?.toString() || '0',
systemAccounts: systemContributions._count,
systemContribution:
systemContributions._sum.contributionBalance?.toString() || '0',
};
}
async getReports(page: number = 1, pageSize: number = 30): Promise<{ data: any[]; total: number }> {
/**
*
*/
private async getMiningStats() {
const [config, accounts, latestDailyStat] = await Promise.all([
this.prisma.syncedMiningConfig.findFirst(),
this.prisma.syncedMiningAccount.aggregate({
_sum: {
totalMined: true,
availableBalance: true,
frozenBalance: true,
},
_count: true,
}),
this.prisma.syncedDailyMiningStat.findFirst({
orderBy: { statDate: 'desc' },
}),
]);
return {
isActive: config?.isActive || false,
currentEra: config?.currentEra || 0,
totalShares: config?.totalShares?.toString() || '0',
distributionPool: config?.distributionPool?.toString() || '0',
remainingDistribution: config?.remainingDistribution?.toString() || '0',
minuteDistribution: config?.minuteDistribution?.toString() || '0',
activatedAt: config?.activatedAt,
totalAccounts: accounts._count,
totalMined: accounts._sum.totalMined?.toString() || '0',
totalAvailable: accounts._sum.availableBalance?.toString() || '0',
totalFrozen: accounts._sum.frozenBalance?.toString() || '0',
latestDailyStat: latestDailyStat
? {
date: latestDailyStat.statDate,
totalDistributed: latestDailyStat.totalDistributed.toString(),
totalBurned: latestDailyStat.totalBurned.toString(),
participantCount: latestDailyStat.participantCount,
}
: null,
};
}
/**
*
*/
private async getTradingStats() {
const [accounts, circulationPool, latestKLine] = await Promise.all([
this.prisma.syncedTradingAccount.aggregate({
_sum: {
shareBalance: true,
cashBalance: true,
frozenShares: true,
frozenCash: true,
totalBought: true,
totalSold: true,
},
_count: true,
}),
this.prisma.syncedCirculationPool.findFirst(),
this.prisma.syncedDayKLine.findFirst({ orderBy: { klineDate: 'desc' } }),
]);
return {
totalAccounts: accounts._count,
totalShareBalance: accounts._sum.shareBalance?.toString() || '0',
totalCashBalance: accounts._sum.cashBalance?.toString() || '0',
totalFrozenShares: accounts._sum.frozenShares?.toString() || '0',
totalFrozenCash: accounts._sum.frozenCash?.toString() || '0',
totalBought: accounts._sum.totalBought?.toString() || '0',
totalSold: accounts._sum.totalSold?.toString() || '0',
circulationPool: circulationPool
? {
totalShares: circulationPool.totalShares.toString(),
totalCash: circulationPool.totalCash.toString(),
}
: null,
latestPrice: latestKLine?.close?.toString() || '1',
};
}
/**
*
*/
async getReports(
page: number = 1,
pageSize: number = 30,
): Promise<{ data: any[]; total: number; pagination: any }> {
const [reports, total] = await Promise.all([
this.prisma.dailyReport.findMany({
orderBy: { reportDate: 'desc' },
@ -79,12 +241,29 @@ export class DashboardService {
}),
this.prisma.dailyReport.count(),
]);
return { data: reports, total };
return {
data: reports.map((r) => this.formatDailyReport(r)),
total,
pagination: {
page,
pageSize,
total,
totalPages: Math.ceil(total / pageSize),
},
};
}
async getAuditLogs(
options: { adminId?: string; action?: string; resource?: string; page?: number; pageSize?: number },
): Promise<{ data: any[]; total: number }> {
/**
*
*/
async getAuditLogs(options: {
adminId?: string;
action?: string;
resource?: string;
page?: number;
pageSize?: number;
}): Promise<{ data: any[]; total: number; pagination: any }> {
const where: any = {};
if (options.adminId) where.adminId = options.adminId;
if (options.action) where.action = options.action;
@ -104,6 +283,56 @@ export class DashboardService {
this.prisma.auditLog.count({ where }),
]);
return { data: logs, total };
return {
data: logs,
total,
pagination: {
page,
pageSize,
total,
totalPages: Math.ceil(total / pageSize),
},
};
}
// ===========================================================================
// 辅助方法
// ===========================================================================
private formatDailyReport(report: any) {
return {
id: report.id,
reportDate: report.reportDate,
users: {
total: report.totalUsers,
new: report.newUsers,
active: report.activeUsers,
},
adoptions: {
total: report.totalAdoptions,
new: report.newAdoptions,
totalTrees: report.totalTrees,
},
contribution: {
total: report.totalContribution?.toString() || '0',
growth: report.contributionGrowth?.toString() || '0',
},
mining: {
distributed: report.totalDistributed?.toString() || '0',
burned: report.totalBurned?.toString() || '0',
},
trading: {
volume: report.tradingVolume?.toString() || '0',
amount: report.tradingAmount?.toString() || '0',
count: report.tradeCount,
},
price: {
open: report.openPrice?.toString() || '1',
close: report.closePrice?.toString() || '1',
high: report.highPrice?.toString() || '1',
low: report.lowPrice?.toString() || '1',
},
createdAt: report.createdAt,
};
}
}

View File

@ -0,0 +1,118 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
@Injectable()
export class SystemAccountsService {
constructor(private readonly prisma: PrismaService) {}
/**
*
*/
async getSystemAccounts() {
// 先从本地 SystemAccount 表获取
const localAccounts = await this.prisma.systemAccount.findMany({
orderBy: { accountType: 'asc' },
});
// 再从 CDC 同步的 SyncedSystemContribution 获取算力数据
const syncedContributions =
await this.prisma.syncedSystemContribution.findMany();
// 合并数据
const accountsMap = new Map<string, any>();
// 添加本地账户
for (const account of localAccounts) {
accountsMap.set(account.accountType, {
accountType: account.accountType,
name: account.name,
description: account.description,
totalContribution: account.totalContribution.toString(),
createdAt: account.createdAt,
source: 'local',
});
}
// 更新或添加同步的算力数据
for (const contrib of syncedContributions) {
const existing = accountsMap.get(contrib.accountType);
if (existing) {
existing.contributionBalance = contrib.contributionBalance.toString();
existing.contributionNeverExpires = contrib.contributionNeverExpires;
existing.syncedAt = contrib.syncedAt;
existing.source = 'synced';
} else {
accountsMap.set(contrib.accountType, {
accountType: contrib.accountType,
name: contrib.name,
contributionBalance: contrib.contributionBalance.toString(),
contributionNeverExpires: contrib.contributionNeverExpires,
syncedAt: contrib.syncedAt,
source: 'synced',
});
}
}
return {
accounts: Array.from(accountsMap.values()),
total: accountsMap.size,
};
}
/**
*
*/
async getSystemAccountsSummary() {
const [localAccounts, syncedContributions, miningConfig, circulationPool] =
await Promise.all([
this.prisma.systemAccount.findMany(),
this.prisma.syncedSystemContribution.findMany(),
this.prisma.syncedMiningConfig.findFirst(),
this.prisma.syncedCirculationPool.findFirst(),
]);
// 计算总算力
let totalSystemContribution = 0n;
for (const account of localAccounts) {
totalSystemContribution += BigInt(
account.totalContribution.toString().replace('.', ''),
);
}
let totalSyncedContribution = 0n;
for (const contrib of syncedContributions) {
totalSyncedContribution += BigInt(
contrib.contributionBalance.toString().replace('.', ''),
);
}
return {
systemAccounts: {
count: localAccounts.length,
totalContribution: (
Number(totalSystemContribution) / 100000000
).toFixed(8),
},
syncedContributions: {
count: syncedContributions.length,
totalBalance: (Number(totalSyncedContribution) / 100000000).toFixed(8),
},
miningConfig: miningConfig
? {
totalShares: miningConfig.totalShares.toString(),
distributionPool: miningConfig.distributionPool.toString(),
remainingDistribution: miningConfig.remainingDistribution.toString(),
currentEra: miningConfig.currentEra,
isActive: miningConfig.isActive,
activatedAt: miningConfig.activatedAt,
}
: null,
circulationPool: circulationPool
? {
totalShares: circulationPool.totalShares.toString(),
totalCash: circulationPool.totalCash.toString(),
}
: null,
};
}
}

View File

@ -0,0 +1,344 @@
import { Injectable, NotFoundException } from '@nestjs/common';
import { PrismaService } from '../../infrastructure/persistence/prisma/prisma.service';
import { Prisma } from '@prisma/client';
export interface GetUsersQuery {
page: number;
pageSize: number;
search?: string;
status?: string;
kycStatus?: string;
hasAdopted?: boolean;
}
export interface GetOrdersQuery {
page: number;
pageSize: number;
type?: string;
status?: string;
}
@Injectable()
export class UsersService {
constructor(private readonly prisma: PrismaService) {}
/**
*
*/
async getUsers(query: GetUsersQuery) {
const { page, pageSize, search, status, kycStatus, hasAdopted } = query;
const skip = (page - 1) * pageSize;
const where: Prisma.SyncedUserWhereInput = {};
if (search) {
where.OR = [
{ phone: { contains: search } },
{ accountSequence: { contains: search } },
{ realName: { contains: search } },
];
}
if (status) {
where.status = status;
}
if (kycStatus) {
where.kycStatus = kycStatus;
}
if (hasAdopted !== undefined) {
where.contributionAccount = {
hasAdopted: hasAdopted,
};
}
const [users, total] = await Promise.all([
this.prisma.syncedUser.findMany({
where,
skip,
take: pageSize,
orderBy: { createdAt: 'desc' },
include: {
contributionAccount: true,
miningAccount: true,
tradingAccount: true,
},
}),
this.prisma.syncedUser.count({ where }),
]);
return {
data: users.map((user) => this.formatUserListItem(user)),
pagination: {
page,
pageSize,
total,
totalPages: Math.ceil(total / pageSize),
},
};
}
/**
*
*/
async getUserDetail(accountSequence: string) {
const user = await this.prisma.syncedUser.findUnique({
where: { accountSequence },
include: {
contributionAccount: true,
miningAccount: true,
tradingAccount: true,
},
});
if (!user) {
throw new NotFoundException(`用户 ${accountSequence} 不存在`);
}
return this.formatUserDetail(user);
}
/**
*
* contribution-service
*/
async getUserContributions(
accountSequence: string,
page: number,
pageSize: number,
) {
const user = await this.prisma.syncedUser.findUnique({
where: { accountSequence },
include: { contributionAccount: true },
});
if (!user) {
throw new NotFoundException(`用户 ${accountSequence} 不存在`);
}
// 返回算力账户概要
const contribution = user.contributionAccount;
if (!contribution) {
return {
summary: {
accountSequence,
personalContribution: '0',
teamLevelContribution: '0',
teamBonusContribution: '0',
totalContribution: '0',
effectiveContribution: '0',
hasAdopted: false,
},
records: [],
pagination: { page, pageSize, total: 0, totalPages: 0 },
};
}
return {
summary: {
accountSequence,
personalContribution: contribution.personalContribution.toString(),
teamLevelContribution: contribution.teamLevelContribution.toString(),
teamBonusContribution: contribution.teamBonusContribution.toString(),
totalContribution: contribution.totalContribution.toString(),
effectiveContribution: contribution.effectiveContribution.toString(),
hasAdopted: contribution.hasAdopted,
directReferralCount: contribution.directReferralCount,
unlockedLevelDepth: contribution.unlockedLevelDepth,
},
// 详细流水需要从 contribution-service 获取
records: [],
pagination: { page, pageSize, total: 0, totalPages: 0 },
note: '详细算力流水请查看 contribution-service',
};
}
/**
*
* mining-service
*/
async getUserMiningRecords(
accountSequence: string,
page: number,
pageSize: number,
) {
const user = await this.prisma.syncedUser.findUnique({
where: { accountSequence },
include: { miningAccount: true },
});
if (!user) {
throw new NotFoundException(`用户 ${accountSequence} 不存在`);
}
const mining = user.miningAccount;
if (!mining) {
return {
summary: {
accountSequence,
totalMined: '0',
availableBalance: '0',
frozenBalance: '0',
totalContribution: '0',
},
records: [],
pagination: { page, pageSize, total: 0, totalPages: 0 },
};
}
return {
summary: {
accountSequence,
totalMined: mining.totalMined.toString(),
availableBalance: mining.availableBalance.toString(),
frozenBalance: mining.frozenBalance.toString(),
totalContribution: mining.totalContribution.toString(),
},
// 详细流水需要从 mining-service 获取
records: [],
pagination: { page, pageSize, total: 0, totalPages: 0 },
note: '详细挖矿记录请查看 mining-service',
};
}
/**
*
* trading-service
*/
async getUserOrders(accountSequence: string, query: GetOrdersQuery) {
const { page, pageSize } = query;
const user = await this.prisma.syncedUser.findUnique({
where: { accountSequence },
include: { tradingAccount: true },
});
if (!user) {
throw new NotFoundException(`用户 ${accountSequence} 不存在`);
}
const trading = user.tradingAccount;
if (!trading) {
return {
summary: {
accountSequence,
shareBalance: '0',
cashBalance: '0',
frozenShares: '0',
frozenCash: '0',
totalBought: '0',
totalSold: '0',
},
orders: [],
pagination: { page, pageSize, total: 0, totalPages: 0 },
};
}
return {
summary: {
accountSequence,
shareBalance: trading.shareBalance.toString(),
cashBalance: trading.cashBalance.toString(),
frozenShares: trading.frozenShares.toString(),
frozenCash: trading.frozenCash.toString(),
totalBought: trading.totalBought.toString(),
totalSold: trading.totalSold.toString(),
},
// 详细订单需要从 trading-service 获取
orders: [],
pagination: { page, pageSize, total: 0, totalPages: 0 },
note: '详细交易订单请查看 trading-service',
};
}
// ===========================================================================
// 辅助方法
// ===========================================================================
private formatUserListItem(user: any) {
return {
accountSequence: user.accountSequence,
phone: this.maskPhone(user.phone),
status: user.status,
kycStatus: user.kycStatus,
realName: user.realName,
isLegacyUser: user.isLegacyUser,
createdAt: user.createdAt,
contribution: user.contributionAccount
? {
totalContribution:
user.contributionAccount.totalContribution.toString(),
effectiveContribution:
user.contributionAccount.effectiveContribution.toString(),
hasAdopted: user.contributionAccount.hasAdopted,
}
: null,
mining: user.miningAccount
? {
totalMined: user.miningAccount.totalMined.toString(),
availableBalance: user.miningAccount.availableBalance.toString(),
}
: null,
trading: user.tradingAccount
? {
shareBalance: user.tradingAccount.shareBalance.toString(),
cashBalance: user.tradingAccount.cashBalance.toString(),
}
: null,
};
}
private formatUserDetail(user: any) {
return {
accountSequence: user.accountSequence,
phone: user.phone,
status: user.status,
kycStatus: user.kycStatus,
realName: user.realName,
isLegacyUser: user.isLegacyUser,
createdAt: user.createdAt,
syncedAt: user.syncedAt,
contribution: user.contributionAccount
? {
personalContribution:
user.contributionAccount.personalContribution.toString(),
teamLevelContribution:
user.contributionAccount.teamLevelContribution.toString(),
teamBonusContribution:
user.contributionAccount.teamBonusContribution.toString(),
totalContribution:
user.contributionAccount.totalContribution.toString(),
effectiveContribution:
user.contributionAccount.effectiveContribution.toString(),
hasAdopted: user.contributionAccount.hasAdopted,
directReferralCount: user.contributionAccount.directReferralCount,
unlockedLevelDepth: user.contributionAccount.unlockedLevelDepth,
}
: null,
mining: user.miningAccount
? {
totalMined: user.miningAccount.totalMined.toString(),
availableBalance: user.miningAccount.availableBalance.toString(),
frozenBalance: user.miningAccount.frozenBalance.toString(),
totalContribution: user.miningAccount.totalContribution.toString(),
}
: null,
trading: user.tradingAccount
? {
shareBalance: user.tradingAccount.shareBalance.toString(),
cashBalance: user.tradingAccount.cashBalance.toString(),
frozenShares: user.tradingAccount.frozenShares.toString(),
frozenCash: user.tradingAccount.frozenCash.toString(),
totalBought: user.tradingAccount.totalBought.toString(),
totalSold: user.tradingAccount.totalSold.toString(),
}
: null,
};
}
private maskPhone(phone: string): string {
if (!phone || phone.length < 7) return phone;
return phone.substring(0, 3) + '****' + phone.substring(phone.length - 4);
}
}

View File

@ -1,11 +1,13 @@
import { Module, Global } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { PrismaModule } from './persistence/prisma/prisma.module';
import { PrismaService } from './persistence/prisma/prisma.service';
import { RedisService } from './redis/redis.service';
import { KafkaModule } from './kafka/kafka.module';
@Global()
@Module({
imports: [PrismaModule],
imports: [PrismaModule, KafkaModule],
providers: [
{
provide: 'REDIS_OPTIONS',
@ -13,12 +15,12 @@ import { RedisService } from './redis/redis.service';
host: configService.get<string>('REDIS_HOST', 'localhost'),
port: configService.get<number>('REDIS_PORT', 6379),
password: configService.get<string>('REDIS_PASSWORD'),
db: configService.get<number>('REDIS_DB', 3),
db: configService.get<number>('REDIS_DB', 13),
}),
inject: [ConfigService],
},
RedisService,
],
exports: [RedisService],
exports: [PrismaService, RedisService, KafkaModule],
})
export class InfrastructureModule {}

View File

@ -0,0 +1,251 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
/**
* CDC (Debezium )
*/
export interface CdcEvent {
schema: any;
payload: {
before: any | null;
after: any | null;
source: {
version: string;
connector: string;
name: string;
ts_ms: number;
snapshot: string;
db: string;
sequence: string;
schema: string;
table: string;
txId: number;
lsn: number;
xmin: number | null;
};
op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
ts_ms: number;
transaction: any;
};
sequenceNum: bigint;
}
/**
* 2.0 (Outbox )
*/
export interface ServiceEvent {
id: string;
aggregateType: string;
aggregateId: string;
eventType: string;
payload: any;
createdAt: string;
sequenceNum: bigint;
}
export type CdcHandler = (event: CdcEvent) => Promise<void>;
export type ServiceEventHandler = (event: ServiceEvent) => Promise<void>;
@Injectable()
export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(CdcConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private cdcHandlers: Map<string, CdcHandler> = new Map();
private serviceHandlers: Map<string, ServiceEventHandler> = new Map();
private isRunning = false;
private topics: string[] = [];
constructor(private readonly configService: ConfigService) {
const brokers = this.configService
.get<string>('KAFKA_BROKERS', 'localhost:9092')
.split(',');
this.kafka = new Kafka({
clientId: 'mining-admin-service-cdc',
brokers,
});
this.consumer = this.kafka.consumer({
groupId: this.configService.get<string>(
'CDC_CONSUMER_GROUP',
'mining-admin-service-cdc-group',
),
});
}
async onModuleInit() {
// 启动延迟到 CdcSyncService 注册完处理器后
}
async onModuleDestroy() {
await this.stop();
}
/**
* CDC (1.0 2.0 )
*/
registerCdcHandler(tableName: string, handler: CdcHandler): void {
this.cdcHandlers.set(tableName, handler);
this.logger.log(`Registered CDC handler for table: ${tableName}`);
}
/**
* (2.0 )
*/
registerServiceHandler(eventType: string, handler: ServiceEventHandler): void {
this.serviceHandlers.set(eventType, handler);
this.logger.log(`Registered service event handler for: ${eventType}`);
}
/**
* topic
*/
addTopic(topic: string): void {
if (!this.topics.includes(topic)) {
this.topics.push(topic);
}
}
/**
*
*/
async start(): Promise<void> {
if (this.isRunning) {
this.logger.warn('CDC consumer is already running');
return;
}
if (this.topics.length === 0) {
this.logger.warn('No topics to subscribe, skipping CDC consumer start');
return;
}
try {
await this.consumer.connect();
this.logger.log('CDC consumer connected');
await this.consumer.subscribe({
topics: this.topics,
fromBeginning: false,
});
this.logger.log(`Subscribed to topics: ${this.topics.join(', ')}`);
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
},
});
this.isRunning = true;
this.logger.log('CDC consumer started');
} catch (error) {
this.logger.error('Failed to start CDC consumer', error);
// 不抛出错误允许服务继续运行CDC 可能暂时不可用)
}
}
/**
*
*/
async stop(): Promise<void> {
if (!this.isRunning) {
return;
}
try {
await this.consumer.disconnect();
this.isRunning = false;
this.logger.log('CDC consumer stopped');
} catch (error) {
this.logger.error('Failed to stop CDC consumer', error);
}
}
private async handleMessage(payload: EachMessagePayload): Promise<void> {
const { topic, partition, message } = payload;
try {
if (!message.value) {
return;
}
const eventData = JSON.parse(message.value.toString());
const sequenceNum = BigInt(message.offset);
// 判断事件类型Debezium CDC 或 服务 Outbox 事件
if (this.isDebeziumEvent(eventData)) {
await this.handleCdcEvent(topic, eventData, sequenceNum);
} else if (this.isServiceEvent(eventData)) {
await this.handleServiceEvent(topic, eventData, sequenceNum);
} else {
this.logger.warn(`Unknown event format from topic: ${topic}`);
}
} catch (error) {
this.logger.error(
`Error processing message from topic ${topic}, partition ${partition}`,
error,
);
}
}
private isDebeziumEvent(data: any): boolean {
return data.payload && data.payload.source && data.payload.op;
}
private isServiceEvent(data: any): boolean {
return data.eventType && data.aggregateType;
}
private async handleCdcEvent(
topic: string,
eventData: any,
sequenceNum: bigint,
): Promise<void> {
const event: CdcEvent = {
...eventData,
sequenceNum,
};
// 从 topic 名称提取表名 (格式: dbserver1.schema.tablename)
const parts = topic.split('.');
const tableName = parts[parts.length - 1];
const handler = this.cdcHandlers.get(tableName);
if (handler) {
await handler(event);
this.logger.debug(
`Processed CDC event for table ${tableName}, op: ${event.payload.op}`,
);
}
}
private async handleServiceEvent(
topic: string,
eventData: any,
sequenceNum: bigint,
): Promise<void> {
const event: ServiceEvent = {
...eventData,
sequenceNum,
};
const handler = this.serviceHandlers.get(event.eventType);
if (handler) {
await handler(event);
this.logger.debug(`Processed service event: ${event.eventType}`);
} else {
// 尝试通配符处理器
const aggregateHandler = this.serviceHandlers.get(
`${event.aggregateType}.*`,
);
if (aggregateHandler) {
await aggregateHandler(event);
this.logger.debug(
`Processed service event via wildcard: ${event.eventType}`,
);
}
}
}
}

View File

@ -0,0 +1,486 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { PrismaService } from '../persistence/prisma/prisma.service';
import {
CdcConsumerService,
CdcEvent,
ServiceEvent,
} from './cdc-consumer.service';
/**
* CDC
* 2.0 mining-admin-service
*/
@Injectable()
export class CdcSyncService implements OnModuleInit {
private readonly logger = new Logger(CdcSyncService.name);
constructor(
private readonly configService: ConfigService,
private readonly prisma: PrismaService,
private readonly cdcConsumer: CdcConsumerService,
) {}
async onModuleInit() {
await this.registerHandlers();
await this.cdcConsumer.start();
}
private async registerHandlers(): Promise<void> {
// ===========================================================================
// 从 auth-service 同步用户数据
// ===========================================================================
const usersTopic = this.configService.get<string>(
'CDC_TOPIC_USERS',
'mining-admin.auth.users',
);
this.cdcConsumer.addTopic(usersTopic);
this.cdcConsumer.registerServiceHandler(
'UserCreated',
this.handleUserCreated.bind(this),
);
this.cdcConsumer.registerServiceHandler(
'UserUpdated',
this.handleUserUpdated.bind(this),
);
this.cdcConsumer.registerServiceHandler(
'KycStatusChanged',
this.handleKycStatusChanged.bind(this),
);
// ===========================================================================
// 从 contribution-service 同步算力数据
// ===========================================================================
const contributionTopic = this.configService.get<string>(
'CDC_TOPIC_CONTRIBUTION',
'mining-admin.contribution.accounts',
);
this.cdcConsumer.addTopic(contributionTopic);
this.cdcConsumer.registerServiceHandler(
'ContributionAccountUpdated',
this.handleContributionAccountUpdated.bind(this),
);
this.cdcConsumer.registerServiceHandler(
'SystemContributionUpdated',
this.handleSystemContributionUpdated.bind(this),
);
// ===========================================================================
// 从 mining-service 同步挖矿数据
// ===========================================================================
const miningTopic = this.configService.get<string>(
'CDC_TOPIC_MINING',
'mining-admin.mining.accounts',
);
this.cdcConsumer.addTopic(miningTopic);
this.cdcConsumer.registerServiceHandler(
'MiningAccountUpdated',
this.handleMiningAccountUpdated.bind(this),
);
this.cdcConsumer.registerServiceHandler(
'MiningConfigUpdated',
this.handleMiningConfigUpdated.bind(this),
);
this.cdcConsumer.registerServiceHandler(
'DailyMiningStatCreated',
this.handleDailyMiningStatCreated.bind(this),
);
// ===========================================================================
// 从 trading-service 同步交易数据
// ===========================================================================
const tradingTopic = this.configService.get<string>(
'CDC_TOPIC_TRADING',
'mining-admin.trading.accounts',
);
this.cdcConsumer.addTopic(tradingTopic);
this.cdcConsumer.registerServiceHandler(
'TradingAccountUpdated',
this.handleTradingAccountUpdated.bind(this),
);
this.cdcConsumer.registerServiceHandler(
'DayKLineCreated',
this.handleDayKLineCreated.bind(this),
);
this.cdcConsumer.registerServiceHandler(
'CirculationPoolUpdated',
this.handleCirculationPoolUpdated.bind(this),
);
this.logger.log('CDC sync handlers registered');
}
// ===========================================================================
// 用户事件处理
// ===========================================================================
private async handleUserCreated(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedUser.upsert({
where: { originalUserId: payload.id },
create: {
originalUserId: payload.id,
accountSequence: payload.accountSequence,
phone: payload.phone,
status: payload.status || 'ACTIVE',
kycStatus: payload.kycStatus || 'PENDING',
realName: payload.realName,
isLegacyUser: payload.isLegacyUser || false,
createdAt: new Date(payload.createdAt),
},
update: {
phone: payload.phone,
status: payload.status || 'ACTIVE',
kycStatus: payload.kycStatus || 'PENDING',
realName: payload.realName,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Synced user: ${payload.accountSequence}`);
} catch (error) {
this.logger.error(`Failed to sync user: ${payload.id}`, error);
}
}
private async handleUserUpdated(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedUser.updateMany({
where: { originalUserId: payload.id },
data: {
phone: payload.phone,
status: payload.status,
kycStatus: payload.kycStatus,
realName: payload.realName,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Updated user: ${payload.id}`);
} catch (error) {
this.logger.error(`Failed to update user: ${payload.id}`, error);
}
}
private async handleKycStatusChanged(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedUser.updateMany({
where: { accountSequence: payload.accountSequence },
data: {
kycStatus: payload.kycStatus,
realName: payload.realName,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Updated KYC status: ${payload.accountSequence}`);
} catch (error) {
this.logger.error(
`Failed to update KYC status: ${payload.accountSequence}`,
error,
);
}
}
// ===========================================================================
// 算力账户事件处理
// ===========================================================================
private async handleContributionAccountUpdated(
event: ServiceEvent,
): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedContributionAccount.upsert({
where: { accountSequence: payload.accountSequence },
create: {
accountSequence: payload.accountSequence,
personalContribution: payload.personalContribution || 0,
teamLevelContribution: payload.teamLevelContribution || 0,
teamBonusContribution: payload.teamBonusContribution || 0,
totalContribution: payload.totalContribution || 0,
effectiveContribution: payload.effectiveContribution || 0,
hasAdopted: payload.hasAdopted || false,
directReferralCount: payload.directReferralAdoptedCount || 0,
unlockedLevelDepth: payload.unlockedLevelDepth || 0,
},
update: {
personalContribution: payload.personalContribution,
teamLevelContribution: payload.teamLevelContribution,
teamBonusContribution: payload.teamBonusContribution,
totalContribution: payload.totalContribution,
effectiveContribution: payload.effectiveContribution,
hasAdopted: payload.hasAdopted,
directReferralCount: payload.directReferralAdoptedCount,
unlockedLevelDepth: payload.unlockedLevelDepth,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(
`Synced contribution account: ${payload.accountSequence}`,
);
} catch (error) {
this.logger.error(
`Failed to sync contribution account: ${payload.accountSequence}`,
error,
);
}
}
private async handleSystemContributionUpdated(
event: ServiceEvent,
): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedSystemContribution.upsert({
where: { accountType: payload.accountType },
create: {
accountType: payload.accountType,
name: payload.name,
contributionBalance: payload.contributionBalance || 0,
contributionNeverExpires: payload.contributionNeverExpires || false,
},
update: {
name: payload.name,
contributionBalance: payload.contributionBalance,
contributionNeverExpires: payload.contributionNeverExpires,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(
`Synced system contribution: ${payload.accountType}`,
);
} catch (error) {
this.logger.error(
`Failed to sync system contribution: ${payload.accountType}`,
error,
);
}
}
// ===========================================================================
// 挖矿账户事件处理
// ===========================================================================
private async handleMiningAccountUpdated(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedMiningAccount.upsert({
where: { accountSequence: payload.accountSequence },
create: {
accountSequence: payload.accountSequence,
totalMined: payload.totalMined || 0,
availableBalance: payload.availableBalance || 0,
frozenBalance: payload.frozenBalance || 0,
totalContribution: payload.totalContribution || 0,
},
update: {
totalMined: payload.totalMined,
availableBalance: payload.availableBalance,
frozenBalance: payload.frozenBalance,
totalContribution: payload.totalContribution,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Synced mining account: ${payload.accountSequence}`);
} catch (error) {
this.logger.error(
`Failed to sync mining account: ${payload.accountSequence}`,
error,
);
}
}
private async handleMiningConfigUpdated(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
// 只保留一条挖矿配置记录
await this.prisma.syncedMiningConfig.deleteMany({});
await this.prisma.syncedMiningConfig.create({
data: {
totalShares: payload.totalShares,
distributionPool: payload.distributionPool,
remainingDistribution: payload.remainingDistribution,
halvingPeriodYears: payload.halvingPeriodYears,
currentEra: payload.currentEra || 1,
minuteDistribution: payload.minuteDistribution,
isActive: payload.isActive || false,
activatedAt: payload.activatedAt
? new Date(payload.activatedAt)
: null,
},
});
await this.recordProcessedEvent(event);
this.logger.debug('Synced mining config');
} catch (error) {
this.logger.error('Failed to sync mining config', error);
}
}
private async handleDailyMiningStatCreated(
event: ServiceEvent,
): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedDailyMiningStat.upsert({
where: { statDate: new Date(payload.date) },
create: {
statDate: new Date(payload.date),
totalContribution: payload.totalContribution || 0,
totalDistributed: payload.totalDistributed || 0,
totalBurned: payload.totalBurned || 0,
participantCount: payload.participantCount || 0,
avgContributionRate: payload.avgContributionRate || 0,
},
update: {
totalContribution: payload.totalContribution,
totalDistributed: payload.totalDistributed,
totalBurned: payload.totalBurned,
participantCount: payload.participantCount,
avgContributionRate: payload.avgContributionRate,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Synced daily mining stat: ${payload.date}`);
} catch (error) {
this.logger.error(
`Failed to sync daily mining stat: ${payload.date}`,
error,
);
}
}
// ===========================================================================
// 交易账户事件处理
// ===========================================================================
private async handleTradingAccountUpdated(
event: ServiceEvent,
): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedTradingAccount.upsert({
where: { accountSequence: payload.accountSequence },
create: {
accountSequence: payload.accountSequence,
shareBalance: payload.shareBalance || 0,
cashBalance: payload.cashBalance || 0,
frozenShares: payload.frozenShares || 0,
frozenCash: payload.frozenCash || 0,
totalBought: payload.totalBought || 0,
totalSold: payload.totalSold || 0,
},
update: {
shareBalance: payload.shareBalance,
cashBalance: payload.cashBalance,
frozenShares: payload.frozenShares,
frozenCash: payload.frozenCash,
totalBought: payload.totalBought,
totalSold: payload.totalSold,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Synced trading account: ${payload.accountSequence}`);
} catch (error) {
this.logger.error(
`Failed to sync trading account: ${payload.accountSequence}`,
error,
);
}
}
private async handleDayKLineCreated(event: ServiceEvent): Promise<void> {
const { payload } = event;
try {
await this.prisma.syncedDayKLine.upsert({
where: { klineDate: new Date(payload.date) },
create: {
klineDate: new Date(payload.date),
open: payload.open,
high: payload.high,
low: payload.low,
close: payload.close,
volume: payload.volume || 0,
amount: payload.amount || 0,
tradeCount: payload.tradeCount || 0,
},
update: {
open: payload.open,
high: payload.high,
low: payload.low,
close: payload.close,
volume: payload.volume,
amount: payload.amount,
tradeCount: payload.tradeCount,
},
});
await this.recordProcessedEvent(event);
this.logger.debug(`Synced day K-line: ${payload.date}`);
} catch (error) {
this.logger.error(`Failed to sync day K-line: ${payload.date}`, error);
}
}
private async handleCirculationPoolUpdated(
event: ServiceEvent,
): Promise<void> {
const { payload } = event;
try {
// 只保留一条流通池记录
await this.prisma.syncedCirculationPool.deleteMany({});
await this.prisma.syncedCirculationPool.create({
data: {
totalShares: payload.totalShares || 0,
totalCash: payload.totalCash || 0,
},
});
await this.recordProcessedEvent(event);
this.logger.debug('Synced circulation pool');
} catch (error) {
this.logger.error('Failed to sync circulation pool', error);
}
}
// ===========================================================================
// 辅助方法
// ===========================================================================
private async recordProcessedEvent(event: ServiceEvent): Promise<void> {
try {
await this.prisma.processedEvent.upsert({
where: { eventId: event.id },
create: {
eventId: event.id,
eventType: event.eventType,
sourceService: event.aggregateType,
},
update: {},
});
} catch (error) {
// 忽略幂等性记录失败
this.logger.warn(`Failed to record processed event: ${event.id}`);
}
}
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { CdcConsumerService } from './cdc-consumer.service';
import { CdcSyncService } from './cdc-sync.service';
@Module({
imports: [ConfigModule],
providers: [CdcConsumerService, CdcSyncService],
exports: [CdcConsumerService, CdcSyncService],
})
export class KafkaModule {}