fix: properly close WebSocket with subscription cancel + fire-and-forget

Root cause: IOWebSocketChannel.sink.close() can hang indefinitely
(dart-lang/web_socket_channel#185). Previous fix used unawaited close
but didn't cancel the stream subscription, so the old listener could
still push events to _messageController.

Fix: Extract _closeCurrentConnection() that:
1. Cancels StreamSubscription first (stops duplicate events immediately)
2. Fire-and-forget sink.close(goingAway) (frees underlying socket)

This follows the workaround recommended in the official issue tracker.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-25 03:45:43 -08:00
parent 45eb6bc453
commit 7b71a4f2fc
1 changed files with 31 additions and 13 deletions

View File

@ -3,11 +3,13 @@ import 'dart:convert';
import 'dart:math'; import 'dart:math';
import 'package:web_socket_channel/io.dart'; import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;
import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart';
import '../config/app_config.dart'; import '../config/app_config.dart';
class WebSocketClient { class WebSocketClient {
WebSocketChannel? _channel; WebSocketChannel? _channel;
StreamSubscription? _streamSubscription;
final String baseUrl; final String baseUrl;
Timer? _heartbeatTimer; Timer? _heartbeatTimer;
Timer? _reconnectTimer; Timer? _reconnectTimer;
@ -30,16 +32,13 @@ class WebSocketClient {
_lastPath = path; _lastPath = path;
_lastToken = token; _lastToken = token;
// Close previous connection to prevent duplicate event delivery. // Tear down previous connection to prevent duplicate event delivery.
// Fire-and-forget: don't await close() as it can hang waiting for // 1) Cancel the stream subscription first stops events from reaching
// the server to acknowledge the close handshake. // _messageController immediately (no more duplicates).
if (_channel != null) { // 2) Fire-and-forget sink.close() don't await because
_heartbeatTimer?.cancel(); // IOWebSocketChannel.sink.close() can hang indefinitely
final old = _channel!; // (known issue: https://github.com/dart-lang/web_socket_channel/issues/185).
_channel = null; await _closeCurrentConnection();
_isConnected = false;
unawaited(old.sink.close().catchError((_) {}));
}
final uri = Uri.parse('$baseUrl$path'); final uri = Uri.parse('$baseUrl$path');
try { try {
@ -50,7 +49,7 @@ class WebSocketClient {
_isConnected = true; _isConnected = true;
_reconnectAttempts = 0; _reconnectAttempts = 0;
_channel!.stream.listen( _streamSubscription = _channel!.stream.listen(
(data) { (data) {
final decoded = jsonDecode(data as String) as Map<String, dynamic>; final decoded = jsonDecode(data as String) as Map<String, dynamic>;
_messageController.add(decoded); _messageController.add(decoded);
@ -122,9 +121,28 @@ class WebSocketClient {
Future<void> disconnect() async { Future<void> disconnect() async {
_heartbeatTimer?.cancel(); _heartbeatTimer?.cancel();
_reconnectTimer?.cancel(); _reconnectTimer?.cancel();
_isConnected = false;
_reconnectAttempts = _maxReconnectAttempts; // Prevent auto-reconnect _reconnectAttempts = _maxReconnectAttempts; // Prevent auto-reconnect
await _channel?.sink.close(); await _closeCurrentConnection();
}
/// Cancel stream subscription, then fire-and-forget sink.close().
Future<void> _closeCurrentConnection() async {
_heartbeatTimer?.cancel();
_isConnected = false;
// Cancel subscription first this immediately stops events from
// the old channel reaching _messageController.
if (_streamSubscription != null) {
await _streamSubscription!.cancel();
_streamSubscription = null;
}
// Close the underlying WebSocket. Don't await — sink.close() can hang.
if (_channel != null) {
final old = _channel!;
_channel = null;
unawaited(old.sink.close(status.goingAway).catchError((_) {}));
}
} }
void dispose() { void dispose() {