diff --git a/backend/api-gateway/kong.yml b/backend/api-gateway/kong.yml index 251f43b2..4b3246b7 100644 --- a/backend/api-gateway/kong.yml +++ b/backend/api-gateway/kong.yml @@ -323,6 +323,23 @@ services: - /api/v2/trading/health strip_path: true + # --------------------------------------------------------------------------- + # Trading Service WebSocket - 价格实时推送 + # WebSocket 连接: wss://api.xxx.com/ws/price -> ws://192.168.1.111:3022/price + # --------------------------------------------------------------------------- + - name: trading-ws-service + url: http://192.168.1.111:3022 + routes: + - name: trading-ws-price + paths: + - /ws/price + strip_path: true + protocols: + - http + - https + - ws + - wss + # --------------------------------------------------------------------------- # Mining Admin Service 2.0 - 挖矿管理后台服务 # 前端路径: /api/v2/mining-admin/... -> 后端路径: /api/v2/... diff --git a/backend/services/trading-service/package-lock.json b/backend/services/trading-service/package-lock.json index dd1e3a87..6acc7b0b 100644 --- a/backend/services/trading-service/package-lock.json +++ b/backend/services/trading-service/package-lock.json @@ -13,8 +13,10 @@ "@nestjs/core": "^10.3.0", "@nestjs/microservices": "^10.3.0", "@nestjs/platform-express": "^10.3.0", + "@nestjs/platform-socket.io": "^10.3.0", "@nestjs/schedule": "^4.0.0", "@nestjs/swagger": "^7.1.17", + "@nestjs/websockets": "^10.3.0", "@prisma/client": "^5.7.1", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", @@ -24,6 +26,7 @@ "kafkajs": "^2.2.4", "reflect-metadata": "^0.1.14", "rxjs": "^7.8.1", + "socket.io": "^4.7.4", "swagger-ui-express": "^5.0.0" }, "devDependencies": { @@ -1843,6 +1846,60 @@ "@nestjs/core": "^10.0.0" } }, + "node_modules/@nestjs/platform-socket.io": { + "version": "10.4.22", + "resolved": "https://registry.npmjs.org/@nestjs/platform-socket.io/-/platform-socket.io-10.4.22.tgz", + "integrity": "sha512-xxGw3R0Ihr51/Omq23z3//bKmCXyVKaikxbH0/pkwqMsQrxkUv9NabNUZ22b4Jnlwwi02X+zlwo8GRa9u8oV9g==", + "license": "MIT", + "dependencies": { + "socket.io": "4.8.1", + "tslib": "2.8.1" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/nest" + }, + "peerDependencies": { + "@nestjs/common": "^10.0.0", + "@nestjs/websockets": "^10.0.0", + "rxjs": "^7.1.0" + } + }, + "node_modules/@nestjs/platform-socket.io/node_modules/debug": { + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz", + "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/@nestjs/platform-socket.io/node_modules/socket.io": { + "version": "4.8.1", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.8.1.tgz", + "integrity": "sha512-oZ7iUCxph8WYRHHcjBEc9unw3adt5CmSNlppj/5Q4k2RIrhl8Z5yY2Xr4j9zj0+wzVZ0bxmYoGSzKJnRl6A4yg==", + "license": "MIT", + "dependencies": { + "accepts": "~1.3.4", + "base64id": "~2.0.0", + "cors": "~2.8.5", + "debug": "~4.3.2", + "engine.io": "~6.6.0", + "socket.io-adapter": "~2.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.2.0" + } + }, "node_modules/@nestjs/schedule": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/@nestjs/schedule/-/schedule-4.1.2.tgz", @@ -1942,6 +1999,30 @@ } } }, + "node_modules/@nestjs/websockets": { + "version": "10.4.22", + "resolved": "https://registry.npmjs.org/@nestjs/websockets/-/websockets-10.4.22.tgz", + "integrity": "sha512-OLd4i0Faq7vgdtB5vVUrJ54hWEtcXy9poJ6n7kbbh/5ms+KffUl+wwGsbe7uSXLrkoyI8xXU6fZPkFArI+XiRg==", + "license": "MIT", + "peer": true, + "dependencies": { + "iterare": "1.2.1", + "object-hash": "3.0.0", + "tslib": "2.8.1" + }, + "peerDependencies": { + "@nestjs/common": "^10.0.0", + "@nestjs/core": "^10.0.0", + "@nestjs/platform-socket.io": "^10.0.0", + "reflect-metadata": "^0.1.12 || ^0.2.0", + "rxjs": "^7.1.0" + }, + "peerDependenciesMeta": { + "@nestjs/platform-socket.io": { + "optional": true + } + } + }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", @@ -2117,6 +2198,12 @@ "@sinonjs/commons": "^3.0.0" } }, + "node_modules/@socket.io/component-emitter": { + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz", + "integrity": "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==", + "license": "MIT" + }, "node_modules/@tokenizer/inflate": { "version": "0.2.7", "resolved": "https://registry.npmjs.org/@tokenizer/inflate/-/inflate-0.2.7.tgz", @@ -2235,6 +2322,15 @@ "@types/node": "*" } }, + "node_modules/@types/cors": { + "version": "2.8.19", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.19.tgz", + "integrity": "sha512-mFNylyeyqN93lfe/9CSxOGREz8cpzAhH+E93xJ4xWQf62V8sQ/24reV2nyzUWM6H6Xji+GGHpkbLe7pVoUEskg==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/eslint": { "version": "9.6.1", "resolved": "https://registry.npmjs.org/@types/eslint/-/eslint-9.6.1.tgz", @@ -2388,7 +2484,6 @@ "version": "20.19.28", "resolved": "https://registry.npmjs.org/@types/node/-/node-20.19.28.tgz", "integrity": "sha512-VyKBr25BuFDzBFCK5sUM6ZXiWfqgCTwTAOK8qzGV/m9FCirXYDlmczJ+d5dXBAQALGCdRRdbteKYfJ84NGEusw==", - "dev": true, "license": "MIT", "peer": true, "dependencies": { @@ -3267,6 +3362,15 @@ ], "license": "MIT" }, + "node_modules/base64id": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "license": "MIT", + "engines": { + "node": "^4.5.0 || >= 5.9" + } + }, "node_modules/baseline-browser-mapping": { "version": "2.9.14", "resolved": "https://registry.npmjs.org/baseline-browser-mapping/-/baseline-browser-mapping-2.9.14.tgz", @@ -4280,6 +4384,35 @@ "node": ">= 0.8" } }, + "node_modules/engine.io": { + "version": "6.6.5", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.6.5.tgz", + "integrity": "sha512-2RZdgEbXmp5+dVbRm0P7HQUImZpICccJy7rN7Tv+SFa55pH+lxnuw6/K1ZxxBfHoYpSkHLAO92oa8O4SwFXA2A==", + "license": "MIT", + "dependencies": { + "@types/cors": "^2.8.12", + "@types/node": ">=10.0.0", + "accepts": "~1.3.4", + "base64id": "2.0.0", + "cookie": "~0.7.2", + "cors": "~2.8.5", + "debug": "~4.4.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.18.3" + }, + "engines": { + "node": ">=10.2.0" + } + }, + "node_modules/engine.io-parser": { + "version": "5.2.3", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.3.tgz", + "integrity": "sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/enhanced-resolve": { "version": "5.18.4", "resolved": "https://registry.npmjs.org/enhanced-resolve/-/enhanced-resolve-5.18.4.tgz", @@ -7293,6 +7426,15 @@ "node": ">=0.10.0" } }, + "node_modules/object-hash": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/object-hash/-/object-hash-3.0.0.tgz", + "integrity": "sha512-RSn9F68PjH9HqtltsSnqYC1XXoWe9Bju5+213R98cNGttag9q9yAOTzdbsqvIa7aNm5WffBZFpWYr2aWrklWAw==", + "license": "MIT", + "engines": { + "node": ">= 6" + } + }, "node_modules/object-inspect": { "version": "1.13.4", "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.13.4.tgz", @@ -8462,6 +8604,47 @@ "node": ">=8" } }, + "node_modules/socket.io": { + "version": "4.8.3", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.8.3.tgz", + "integrity": "sha512-2Dd78bqzzjE6KPkD5fHZmDAKRNe3J15q+YHDrIsy9WEkqttc7GY+kT9OBLSMaPbQaEd0x1BjcmtMtXkfpc+T5A==", + "license": "MIT", + "dependencies": { + "accepts": "~1.3.4", + "base64id": "~2.0.0", + "cors": "~2.8.5", + "debug": "~4.4.1", + "engine.io": "~6.6.0", + "socket.io-adapter": "~2.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.2.0" + } + }, + "node_modules/socket.io-adapter": { + "version": "2.5.6", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.6.tgz", + "integrity": "sha512-DkkO/dz7MGln0dHn5bmN3pPy+JmywNICWrJqVWiVOyvXjWQFIv9c2h24JrQLLFJ2aQVQf/Cvl1vblnd4r2apLQ==", + "license": "MIT", + "dependencies": { + "debug": "~4.4.1", + "ws": "~8.18.3" + } + }, + "node_modules/socket.io-parser": { + "version": "4.2.5", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.5.tgz", + "integrity": "sha512-bPMmpy/5WWKHea5Y/jYAP6k74A+hvmRCQaJuJB6I/ML5JZq/KfNieUVo/3Mh7SAqn7TyFdIo6wqYHInG1MU1bQ==", + "license": "MIT", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.4.1" + }, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/source-map": { "version": "0.7.4", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.7.4.tgz", @@ -9388,7 +9571,6 @@ "version": "6.21.0", "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz", "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==", - "dev": true, "license": "MIT" }, "node_modules/universalify": { @@ -9756,6 +9938,27 @@ "dev": true, "license": "ISC" }, + "node_modules/ws": { + "version": "8.18.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.3.tgz", + "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/xtend": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", diff --git a/backend/services/trading-service/package.json b/backend/services/trading-service/package.json index f09ff781..538495d4 100644 --- a/backend/services/trading-service/package.json +++ b/backend/services/trading-service/package.json @@ -27,8 +27,10 @@ "@nestjs/core": "^10.3.0", "@nestjs/microservices": "^10.3.0", "@nestjs/platform-express": "^10.3.0", + "@nestjs/platform-socket.io": "^10.3.0", "@nestjs/schedule": "^4.0.0", "@nestjs/swagger": "^7.1.17", + "@nestjs/websockets": "^10.3.0", "@prisma/client": "^5.7.1", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", @@ -38,6 +40,7 @@ "kafkajs": "^2.2.4", "reflect-metadata": "^0.1.14", "rxjs": "^7.8.1", + "socket.io": "^4.7.4", "swagger-ui-express": "^5.0.0" }, "devDependencies": { diff --git a/backend/services/trading-service/src/api/api.module.ts b/backend/services/trading-service/src/api/api.module.ts index cf493213..32cbbcd1 100644 --- a/backend/services/trading-service/src/api/api.module.ts +++ b/backend/services/trading-service/src/api/api.module.ts @@ -9,6 +9,7 @@ import { PriceController } from './controllers/price.controller'; import { BurnController } from './controllers/burn.controller'; import { AssetController } from './controllers/asset.controller'; import { MarketMakerController } from './controllers/market-maker.controller'; +import { PriceGateway } from './gateways/price.gateway'; @Module({ imports: [ApplicationModule, InfrastructureModule], @@ -22,5 +23,7 @@ import { MarketMakerController } from './controllers/market-maker.controller'; AssetController, MarketMakerController, ], + providers: [PriceGateway], + exports: [PriceGateway], }) export class ApiModule {} diff --git a/backend/services/trading-service/src/api/gateways/price.gateway.ts b/backend/services/trading-service/src/api/gateways/price.gateway.ts new file mode 100644 index 00000000..e047a0ac --- /dev/null +++ b/backend/services/trading-service/src/api/gateways/price.gateway.ts @@ -0,0 +1,74 @@ +import { + WebSocketGateway, + WebSocketServer, + OnGatewayInit, + OnGatewayConnection, + OnGatewayDisconnect, +} from '@nestjs/websockets'; +import { Server, Socket } from 'socket.io'; +import { Logger, Injectable } from '@nestjs/common'; + +export interface PriceUpdate { + price: string; + burnMultiplier: string; + timestamp: number; +} + +@Injectable() +@WebSocketGateway({ + namespace: '/price', + cors: { + origin: '*', + credentials: true, + }, + transports: ['websocket', 'polling'], +}) +export class PriceGateway + implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect +{ + @WebSocketServer() + server: Server; + + private readonly logger = new Logger(PriceGateway.name); + private connectedClients = 0; + + afterInit() { + this.logger.log('🚀 Price WebSocket Gateway initialized'); + this.logger.log(`📡 Namespace: /price`); + this.logger.log(`🔧 Transports: websocket, polling`); + } + + handleConnection(client: Socket) { + this.connectedClients++; + this.logger.log( + `✅ Client connected: ${client.id}, IP: ${client.handshake.address}, total: ${this.connectedClients}`, + ); + this.logger.debug(`📋 Handshake headers: ${JSON.stringify(client.handshake.headers)}`); + } + + handleDisconnect(client: Socket) { + this.connectedClients--; + this.logger.log( + `❌ Client disconnected: ${client.id}, total: ${this.connectedClients}`, + ); + } + + /** + * 广播价格更新到所有连接的客户端 + */ + broadcastPriceUpdate(priceUpdate: PriceUpdate): void { + if (this.server && this.connectedClients > 0) { + this.server.emit('priceUpdate', priceUpdate); + this.logger.log( + `📊 Price broadcast to ${this.connectedClients} clients: price=${priceUpdate.price}, burnMultiplier=${priceUpdate.burnMultiplier}`, + ); + } + } + + /** + * 获取当前连接数 + */ + getConnectedClientsCount(): number { + return this.connectedClients; + } +} diff --git a/backend/services/trading-service/src/application/application.module.ts b/backend/services/trading-service/src/application/application.module.ts index 0d89e29a..9294f601 100644 --- a/backend/services/trading-service/src/application/application.module.ts +++ b/backend/services/trading-service/src/application/application.module.ts @@ -1,6 +1,7 @@ -import { Module } from '@nestjs/common'; +import { Module, forwardRef } from '@nestjs/common'; import { ScheduleModule } from '@nestjs/schedule'; import { InfrastructureModule } from '../infrastructure/infrastructure.module'; +import { ApiModule } from '../api/api.module'; import { OrderService } from './services/order.service'; import { TransferService } from './services/transfer.service'; import { PriceService } from './services/price.service'; @@ -9,9 +10,14 @@ import { AssetService } from './services/asset.service'; import { MarketMakerService } from './services/market-maker.service'; import { OutboxScheduler } from './schedulers/outbox.scheduler'; import { BurnScheduler } from './schedulers/burn.scheduler'; +import { PriceBroadcastScheduler } from './schedulers/price-broadcast.scheduler'; @Module({ - imports: [ScheduleModule.forRoot(), InfrastructureModule], + imports: [ + ScheduleModule.forRoot(), + InfrastructureModule, + forwardRef(() => ApiModule), + ], providers: [ // Services PriceService, @@ -23,6 +29,7 @@ import { BurnScheduler } from './schedulers/burn.scheduler'; // Schedulers OutboxScheduler, BurnScheduler, + PriceBroadcastScheduler, ], exports: [OrderService, TransferService, PriceService, BurnService, AssetService, MarketMakerService], }) diff --git a/backend/services/trading-service/src/application/schedulers/price-broadcast.scheduler.ts b/backend/services/trading-service/src/application/schedulers/price-broadcast.scheduler.ts new file mode 100644 index 00000000..23956c8a --- /dev/null +++ b/backend/services/trading-service/src/application/schedulers/price-broadcast.scheduler.ts @@ -0,0 +1,96 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { PriceService } from '../services/price.service'; +import { PriceGateway } from '../../api/gateways/price.gateway'; +import { TradingConfigRepository } from '../../infrastructure/persistence/repositories/trading-config.repository'; + +/** + * 每秒广播价格更新的调度器 + * 使用 setInterval 而非 @Cron,因为 @Cron 最小精度是 1 秒, + * 且 setInterval 更适合高频率任务 + */ +@Injectable() +export class PriceBroadcastScheduler implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(PriceBroadcastScheduler.name); + private intervalId: NodeJS.Timeout | null = null; + private lastPrice: string | null = null; + + constructor( + private readonly priceService: PriceService, + private readonly priceGateway: PriceGateway, + private readonly tradingConfigRepository: TradingConfigRepository, + ) {} + + async onModuleInit() { + this.logger.log('🚀 Price broadcast scheduler initializing...'); + // 启动每秒广播 + this.startBroadcast(); + } + + onModuleDestroy() { + this.stopBroadcast(); + } + + private startBroadcast() { + if (this.intervalId) { + this.logger.warn('⚠️ Broadcast already running, skipping start'); + return; + } + + // 每秒执行一次 + this.intervalId = setInterval(async () => { + await this.broadcastPrice(); + }, 1000); + + this.logger.log('✅ Price broadcast started (1 second interval)'); + } + + private stopBroadcast() { + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = null; + this.logger.log('🛑 Price broadcast stopped'); + } + } + + private broadcastCount = 0; + + private async broadcastPrice() { + try { + // 检查是否有客户端连接,没有则跳过 + const clientsCount = this.priceGateway.getConnectedClientsCount(); + if (clientsCount === 0) { + // 每60秒打印一次等待日志 + this.broadcastCount++; + if (this.broadcastCount % 60 === 0) { + this.logger.debug('⏳ No clients connected, waiting...'); + } + return; + } + + // 检查交易系统是否激活 + const config = await this.tradingConfigRepository.getConfig(); + if (!config || !config.isActive) { + this.logger.debug('⏸️ Trading system not active, skipping broadcast'); + return; + } + + // 获取当前价格 + const priceInfo = await this.priceService.getCurrentPrice(); + + // 广播价格更新 + this.priceGateway.broadcastPriceUpdate({ + price: priceInfo.price, + burnMultiplier: priceInfo.burnMultiplier, + timestamp: Date.now(), + }); + + // 记录价格变化(用于调试) + if (this.lastPrice !== priceInfo.price) { + this.logger.log(`💰 Price changed: ${this.lastPrice} -> ${priceInfo.price}`); + this.lastPrice = priceInfo.price; + } + } catch (error) { + this.logger.error('❌ Failed to broadcast price', error); + } + } +} diff --git a/frontend/mining-app/lib/core/network/price_websocket_service.dart b/frontend/mining-app/lib/core/network/price_websocket_service.dart new file mode 100644 index 00000000..43573182 --- /dev/null +++ b/frontend/mining-app/lib/core/network/price_websocket_service.dart @@ -0,0 +1,230 @@ +import 'dart:async'; +import 'package:flutter/foundation.dart'; +import 'package:socket_io_client/socket_io_client.dart' as IO; + +/// 价格更新数据 +class PriceUpdate { + final String price; + final String burnMultiplier; + final int timestamp; + + PriceUpdate({ + required this.price, + required this.burnMultiplier, + required this.timestamp, + }); + + factory PriceUpdate.fromJson(Map json) { + return PriceUpdate( + price: json['price']?.toString() ?? '0', + burnMultiplier: json['burnMultiplier']?.toString() ?? '0', + timestamp: json['timestamp'] ?? DateTime.now().millisecondsSinceEpoch, + ); + } +} + +/// WebSocket 连接状态 +enum WebSocketStatus { + disconnected, + connecting, + connected, + reconnecting, +} + +/// 价格 WebSocket 服务 +/// +/// 特性: +/// - 自动断线重连 (指数退避) +/// - 页面切换时手动断开连接 +/// - 心跳检测 +/// - 连接状态监听 +class PriceWebSocketService { + static PriceWebSocketService? _instance; + static PriceWebSocketService get instance { + _instance ??= PriceWebSocketService._(); + return _instance!; + } + + PriceWebSocketService._(); + + IO.Socket? _socket; + + // 连接状态 + WebSocketStatus _status = WebSocketStatus.disconnected; + WebSocketStatus get status => _status; + + // 重连相关 + Timer? _reconnectTimer; + int _reconnectAttempts = 0; + static const int _maxReconnectAttempts = 10; + static const Duration _initialReconnectDelay = Duration(seconds: 1); + static const Duration _maxReconnectDelay = Duration(seconds: 30); + + // 价格更新流 + final _priceUpdateController = StreamController.broadcast(); + Stream get priceUpdates => _priceUpdateController.stream; + + // 连接状态流 + final _statusController = StreamController.broadcast(); + Stream get statusStream => _statusController.stream; + + // WebSocket URL + String _wsUrl = ''; + + /// 连接到 WebSocket 服务器 + /// + /// [baseUrl] API 基础地址,例如 https://api.example.com + void connect(String baseUrl) { + debugPrint('[PriceWS] 🔌 connect() called with baseUrl: $baseUrl'); + + // 构建 WebSocket URL + // https://api.example.com -> wss://api.example.com/ws/price + _wsUrl = baseUrl + .replaceFirst('https://', 'wss://') + .replaceFirst('http://', 'ws://'); + _wsUrl = '$_wsUrl/ws/price'; + + debugPrint('[PriceWS] 🔌 Constructed WebSocket URL: $_wsUrl'); + + _doConnect(); + } + + void _doConnect() { + if (_status == WebSocketStatus.connected || + _status == WebSocketStatus.connecting) { + return; + } + + _updateStatus(WebSocketStatus.connecting); + debugPrint('[PriceWS] Connecting to $_wsUrl'); + + _socket = IO.io( + _wsUrl, + IO.OptionBuilder() + .setTransports(['websocket']) + .disableAutoConnect() + .enableReconnection() + .setReconnectionAttempts(_maxReconnectAttempts) + .setReconnectionDelay(1000) + .setReconnectionDelayMax(30000) + .build(), + ); + + _socket!.onConnect((_) { + debugPrint('[PriceWS] ✅ Connected successfully to $_wsUrl'); + _updateStatus(WebSocketStatus.connected); + _reconnectAttempts = 0; + _cancelReconnectTimer(); + }); + + _socket!.onDisconnect((reason) { + debugPrint('[PriceWS] ❌ Disconnected, reason: $reason'); + _updateStatus(WebSocketStatus.disconnected); + // 如果不是主动断开,尝试重连 + if (_status != WebSocketStatus.disconnected) { + _scheduleReconnect(); + } + }); + + _socket!.onConnectError((error) { + debugPrint('[PriceWS] ❌ Connect error: $error'); + debugPrint('[PriceWS] URL was: $_wsUrl'); + _updateStatus(WebSocketStatus.disconnected); + _scheduleReconnect(); + }); + + _socket!.onError((error) { + debugPrint('[PriceWS] ⚠️ Socket error: $error'); + }); + + // 监听连接超时 + _socket!.onConnectTimeout((_) { + debugPrint('[PriceWS] ⏰ Connection timeout'); + }); + + // 监听价格更新事件 + _socket!.on('priceUpdate', (data) { + try { + debugPrint('[PriceWS] 📊 Received price update: $data'); + final update = PriceUpdate.fromJson(data as Map); + debugPrint('[PriceWS] 📊 Parsed: price=${update.price}, burnMultiplier=${update.burnMultiplier}'); + _priceUpdateController.add(update); + } catch (e) { + debugPrint('[PriceWS] ❌ Parse error: $e, data: $data'); + } + }); + + _socket!.connect(); + } + + /// 断开连接 + /// + /// 调用此方法后不会自动重连 + void disconnect() { + debugPrint('[PriceWS] 🔌 disconnect() called, current status: $_status'); + _cancelReconnectTimer(); + _reconnectAttempts = 0; + + if (_socket != null) { + debugPrint('[PriceWS] 🔌 Disposing socket...'); + _socket!.disconnect(); + _socket!.dispose(); + _socket = null; + debugPrint('[PriceWS] 🔌 Socket disposed'); + } + + _updateStatus(WebSocketStatus.disconnected); + debugPrint('[PriceWS] 🔌 Disconnect complete'); + } + + /// 检查是否已连接 + bool get isConnected => _status == WebSocketStatus.connected; + + /// 安排重连 + void _scheduleReconnect() { + if (_reconnectAttempts >= _maxReconnectAttempts) { + debugPrint('[PriceWS] Max reconnect attempts reached'); + return; + } + + _cancelReconnectTimer(); + _updateStatus(WebSocketStatus.reconnecting); + + // 指数退避: 1s, 2s, 4s, 8s, ... 最大30s + final delay = Duration( + milliseconds: (_initialReconnectDelay.inMilliseconds * + (1 << _reconnectAttempts)).clamp( + _initialReconnectDelay.inMilliseconds, + _maxReconnectDelay.inMilliseconds, + ), + ); + + _reconnectAttempts++; + debugPrint('[PriceWS] Reconnect attempt $_reconnectAttempts in ${delay.inSeconds}s'); + + _reconnectTimer = Timer(delay, () { + if (_status != WebSocketStatus.connected) { + _doConnect(); + } + }); + } + + void _cancelReconnectTimer() { + _reconnectTimer?.cancel(); + _reconnectTimer = null; + } + + void _updateStatus(WebSocketStatus newStatus) { + if (_status != newStatus) { + _status = newStatus; + _statusController.add(newStatus); + } + } + + /// 释放资源 + void dispose() { + disconnect(); + _priceUpdateController.close(); + _statusController.close(); + } +} diff --git a/frontend/mining-app/lib/presentation/pages/asset/asset_page.dart b/frontend/mining-app/lib/presentation/pages/asset/asset_page.dart index a5f38627..34395010 100644 --- a/frontend/mining-app/lib/presentation/pages/asset/asset_page.dart +++ b/frontend/mining-app/lib/presentation/pages/asset/asset_page.dart @@ -4,6 +4,8 @@ import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:go_router/go_router.dart'; import '../../../core/router/routes.dart'; import '../../../core/utils/format_utils.dart'; +import '../../../core/network/price_websocket_service.dart'; +import '../../../core/constants/app_constants.dart'; import '../../../domain/entities/asset_display.dart'; import '../../providers/user_providers.dart'; import '../../providers/asset_providers.dart'; @@ -33,18 +35,64 @@ class _AssetPageState extends ConsumerState { // 实时刷新相关状态 Timer? _refreshTimer; int _elapsedSeconds = 0; - double _initialDisplayValue = 0; double _initialShareBalance = 0; double _growthPerSecond = 0; String? _lastAccountSequence; bool _timerStarted = false; + // WebSocket 相关 + StreamSubscription? _priceSubscription; + String _currentPrice = '0'; + String _currentBurnMultiplier = '0'; + + @override + void initState() { + super.initState(); + debugPrint('[AssetPage] 📱 initState - connecting WebSocket'); + // 连接 WebSocket + _connectWebSocket(); + } + @override void dispose() { + debugPrint('[AssetPage] 📱 dispose - disconnecting WebSocket'); + // 断开 WebSocket + _disconnectWebSocket(); _refreshTimer?.cancel(); super.dispose(); } + /// 连接 WebSocket + void _connectWebSocket() { + debugPrint('[AssetPage] 🔌 _connectWebSocket called'); + final wsService = PriceWebSocketService.instance; + // 使用 AppConstants 的 baseUrl + debugPrint('[AssetPage] 🔌 Using baseUrl: ${AppConstants.baseUrl}'); + wsService.connect(AppConstants.baseUrl); + + // 监听价格更新 + _priceSubscription = wsService.priceUpdates.listen((update) { + debugPrint('[AssetPage] 📊 Received price update: price=${update.price}, multiplier=${update.burnMultiplier}'); + if (mounted) { + setState(() { + _currentPrice = update.price; + _currentBurnMultiplier = update.burnMultiplier; + }); + debugPrint('[AssetPage] 📊 State updated: _currentPrice=$_currentPrice, _currentBurnMultiplier=$_currentBurnMultiplier'); + } + }); + debugPrint('[AssetPage] 🔌 WebSocket listener attached'); + } + + /// 断开 WebSocket + void _disconnectWebSocket() { + debugPrint('[AssetPage] 🔌 _disconnectWebSocket called'); + _priceSubscription?.cancel(); + _priceSubscription = null; + PriceWebSocketService.instance.disconnect(); + debugPrint('[AssetPage] 🔌 WebSocket disconnected'); + } + /// 启动定时器(使用外部传入的每秒增长值) void _startTimerWithGrowth(AssetDisplay asset, String perSecondEarning) { // 防止重复启动 @@ -54,10 +102,14 @@ class _AssetPageState extends ConsumerState { _refreshTimer?.cancel(); _elapsedSeconds = 0; - _initialDisplayValue = double.tryParse(asset.displayAssetValue) ?? 0; _initialShareBalance = double.tryParse(asset.shareBalance) ?? 0; // 使用传入的每秒增长值(来自 mining-service) _growthPerSecond = double.tryParse(perSecondEarning) ?? 0; + // 初始化价格(如果 WebSocket 还没推送) + if (_currentPrice == '0') { + _currentPrice = asset.currentPrice; + _currentBurnMultiplier = asset.burnMultiplier; + } _timerStarted = true; _refreshTimer = Timer.periodic(const Duration(seconds: 1), (timer) { @@ -78,13 +130,14 @@ class _AssetPageState extends ConsumerState { } /// 计算当前实时资产显示值 - /// 资产显示值 = 积分股余额 × (1 + burnMultiplier) × price - /// 每秒资产增长 = 每秒积分股增长 × (1 + burnMultiplier) × price + /// 使用 WebSocket 推送的实时价格 + /// 资产显示值 = 当前积分股余额 × (1 + burnMultiplier) × price double get _currentDisplayValue { - final price = double.tryParse(_lastAsset?.currentPrice ?? '0') ?? 0; - final burnMultiplier = double.tryParse(_lastAsset?.burnMultiplier ?? '0') ?? 0; + // 优先使用 WebSocket 推送的价格,否则使用 API 返回的价格 + final price = double.tryParse(_currentPrice) ?? 0; + final burnMultiplier = double.tryParse(_currentBurnMultiplier) ?? 0; final multiplierFactor = 1 + burnMultiplier; - return _initialDisplayValue + (_elapsedSeconds * _growthPerSecond * multiplierFactor * price); + return _currentShareBalance * multiplierFactor * price; } /// 计算当前实时积分股余额 @@ -92,6 +145,12 @@ class _AssetPageState extends ConsumerState { return _initialShareBalance + (_elapsedSeconds * _growthPerSecond); } + /// 计算当前有效积分股(含倍数) + double get _currentEffectiveShares { + final burnMultiplier = double.tryParse(_currentBurnMultiplier) ?? 0; + return _currentShareBalance * (1 + burnMultiplier); + } + AssetDisplay? _lastAsset; @override diff --git a/frontend/mining-app/pubspec.yaml b/frontend/mining-app/pubspec.yaml index e3395888..ea20228d 100644 --- a/frontend/mining-app/pubspec.yaml +++ b/frontend/mining-app/pubspec.yaml @@ -20,6 +20,7 @@ dependencies: # 网络 dio: ^5.3.0 connectivity_plus: ^5.0.0 + socket_io_client: ^2.0.3+1 # 本地存储 hive: ^2.2.0