gcx/blockchain/genex-sdk-dart/lib/src/rpc/websocket_client.dart

137 lines
3.4 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'package:web_socket_channel/web_socket_channel.dart';
import '../models/chain_event.dart';
/// WebSocket JSON-RPC client used for event subscriptions.
///
/// Sends `eth_subscribe` / `eth_unsubscribe` requests over a single
/// [WebSocketChannel] and routes incoming `eth_subscription` notifications to
/// the stream returned by [subscribe].
class GenexWebSocketClient {
  /// URL of the WebSocket endpoint; parsed with [Uri.parse] in [connect].
  final String wsUrl;

  /// The channel currently in use, or `null` when disconnected.
  WebSocketChannel? _channel;

  /// Last JSON-RPC request id handed out; incremented before every request.
  int _requestId = 0;

  /// Requests that were sent but have not been answered yet, keyed by id.
  final Map<int, Completer<dynamic>> _pendingRequests = {};

  /// Event controllers keyed by the subscription id returned by the server.
  final Map<String, StreamController<ChainEvent>> _subscriptions = {};

  /// Whether a channel has been opened and not yet closed.
  bool _connected = false;

  GenexWebSocketClient(this.wsUrl);

  /// Opens the WebSocket channel and starts listening for frames.
  ///
  /// Does nothing when the client is already connected.
  Future<void> connect() async {
    if (_connected) return;
    final channel = WebSocketChannel.connect(Uri.parse(wsUrl));
    _channel = channel;
    _connected = true;

    // Events from a channel that has since been replaced or closed are
    // ignored, so a late "done" from an old channel cannot tear down the
    // state of a newer connection.
    bool isCurrent() => identical(_channel, channel);
    channel.stream.listen(
      (dynamic message) {
        if (isCurrent()) _onMessage(message);
      },
      onError: (Object error) {
        if (isCurrent()) _onError(error);
      },
      onDone: () {
        if (isCurrent()) _onDone();
      },
    );
  }

  /// Subscribes to [eventType] and returns a broadcast stream of its events.
  ///
  /// [params] is appended to the `eth_subscribe` parameter list only when it
  /// is not empty. The request is sent asynchronously; if connecting or
  /// subscribing fails, the failure is delivered as an error event on the
  /// returned stream, which is then closed.
  Stream<ChainEvent> subscribe(
      String eventType, Map<String, dynamic> params) {
    final controller = StreamController<ChainEvent>.broadcast();
    _startSubscription(controller, eventType, params);
    return controller.stream;
  }

  /// Cancels the subscription identified by [subscriptionId].
  ///
  /// Sends `eth_unsubscribe`, waits for the response, then closes and forgets
  /// the matching event stream. Completes with an error when the server
  /// answers with a JSON-RPC error or the connection closes before a response
  /// arrives; in that case the local subscription is left registered.
  Future<void> unsubscribe(String subscriptionId) async {
    await _ensureConnected();
    await _sendRequest('eth_unsubscribe', [subscriptionId]);
    _subscriptions.remove(subscriptionId)?.close();
  }

  /// Performs the `eth_subscribe` round trip for [controller].
  ///
  /// Never completes with an error: every failure is forwarded to
  /// [controller] so the consumer of the stream can observe it.
  Future<void> _startSubscription(
    StreamController<ChainEvent> controller,
    String eventType,
    Map<String, dynamic> params,
  ) async {
    try {
      await _ensureConnected();
      final result = await _sendRequest(
        'eth_subscribe',
        [eventType, if (params.isNotEmpty) params],
      );
      if (result is! String) {
        throw FormatException('Unexpected eth_subscribe result', result);
      }
      _subscriptions[result] = controller;
    } on Object catch (error, stackTrace) {
      controller.addError(error, stackTrace);
      controller.close();
    }
  }

  /// Sends a JSON-RPC request and returns a future for its `result`.
  ///
  /// The future completes with an error when the response carries an `error`
  /// member or when the connection goes away before a response is received.
  Future<dynamic> _sendRequest(String method, List<dynamic> params) {
    final channel = _channel;
    if (channel == null) {
      throw StateError('WebSocket is not connected');
    }
    final id = ++_requestId;
    // Encode before registering so an encoding failure leaves no stale entry.
    final payload = jsonEncode({
      'jsonrpc': '2.0',
      'id': id,
      'method': method,
      'params': params,
    });
    final completer = Completer<dynamic>();
    _pendingRequests[id] = completer;
    channel.sink.add(payload);
    return completer.future;
  }

  Future<void> _ensureConnected() async {
    if (!_connected) await connect();
  }

  /// Decodes [message] into a JSON object, or returns `null` when the frame
  /// is not a string containing a JSON object.
  Map<String, dynamic>? _decodeFrame(dynamic message) {
    if (message is! String) return null;
    try {
      final decoded = jsonDecode(message);
      return decoded is Map<String, dynamic> ? decoded : null;
    } on FormatException {
      return null;
    }
  }

  /// Routes an incoming frame to the pending request or subscription it
  /// belongs to.
  void _onMessage(dynamic message) {
    final json = _decodeFrame(message);
    if (json == null) {
      // Report the bad frame to subscribers instead of throwing inside the
      // stream listener.
      _onError(FormatException('Malformed JSON-RPC frame', message));
      return;
    }

    // RPC response: a frame with a non-null id answers an earlier request.
    final id = json['id'];
    if (id != null) {
      // Only int ids are ever sent, so anything else cannot match a request.
      final completer = id is int ? _pendingRequests.remove(id) : null;
      if (completer != null) {
        if (json.containsKey('error')) {
          completer.completeError(Exception(json['error'].toString()));
        } else {
          completer.complete(json['result']);
        }
      }
      return;
    }

    // Subscription push notification.
    if (json['method'] == 'eth_subscription') {
      final params = json['params'];
      if (params is! Map<String, dynamic>) return;
      final controller = _subscriptions[params['subscription']];
      if (controller != null) {
        controller.add(ChainEvent.fromSubscription(params));
      }
    }
  }

  /// Forwards a channel error to every active subscription stream.
  void _onError(Object error) {
    for (final controller in _subscriptions.values.toList()) {
      controller.addError(error);
    }
  }

  /// Handles the channel closing on its own.
  void _onDone() {
    _channel = null;
    _tearDown(Exception('WebSocket connection closed'));
  }

  /// Closes the channel, all subscription streams, and fails every request
  /// that is still waiting for a response.
  void close() {
    _channel?.sink.close();
    _channel = null;
    _tearDown(Exception('WebSocket client closed'));
  }

  /// Marks the client as disconnected and releases everything tied to the
  /// connection.
  ///
  /// Pending requests are completed with [reason] so that callers awaiting a
  /// response do not wait forever.
  void _tearDown(Object reason) {
    _connected = false;

    final controllers = _subscriptions.values.toList();
    _subscriptions.clear();
    for (final controller in controllers) {
      controller.close();
    }

    final pending = _pendingRequests.values.toList();
    _pendingRequests.clear();
    for (final completer in pending) {
      completer.completeError(reason);
    }
  }
}