From 263be15028f0b72dbe43d8186a48375cff8e42bf Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 27 Jan 2026 09:52:57 -0800 Subject: [PATCH] =?UTF-8?q?fix(android):=20=E4=BF=AE=E5=A4=8D=E9=87=8D?= =?UTF-8?q?=E8=BF=9E=E6=97=B6gRPC=E6=B5=81=E5=BC=82=E5=B8=B8=E5=AF=BC?= =?UTF-8?q?=E8=87=B4=E5=BE=85=E6=9C=BA=E9=97=AA=E9=80=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题:心跳失败触发重连时,shutdownNow关闭旧channel会导致 gRPC流抛出UNAVAILABLE异常,虽然检测到过时流但仍传播异常 到TssRepository的collect协程,导致应用崩溃。 修复: - GrpcClient: 过时流错误时使用close()而非close(t)避免传播异常 - GrpcClient: 添加shutdownNow错误检测避免不必要的重连 - TssRepository: 为subscribeSessionEvents和subscribeMessages流添加.catch Co-Authored-By: Claude Opus 4.5 --- .../durian/tssparty/data/remote/GrpcClient.kt | 18 ++++++++++-------- .../tssparty/data/repository/TssRepository.kt | 14 ++++++++++++-- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt index 86da13d5..52b374ba 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt @@ -790,15 +790,16 @@ class GrpcClient @Inject constructor() { override fun onError(t: Throwable) { Log.e(TAG, "Message stream error: ${t.message}") - // Ignore events from stale streams + // Ignore events from stale streams - close without exception to avoid crash if (messageStreamVersion.get() != streamVersion) { Log.d(TAG, "Ignoring error from stale message stream") - close(t) + close() return } - // Don't trigger reconnect for CANCELLED errors - if (!t.message.orEmpty().contains("CANCELLED")) { + // Don't trigger reconnect for CANCELLED or channel shutdown errors + val errorMessage = t.message.orEmpty() + if (!errorMessage.contains("CANCELLED") && !errorMessage.contains("shutdownNow")) { triggerReconnect("Message stream error: ${t.message}") } close(t) @@ -870,15 +871,16 @@ class GrpcClient @Inject constructor() { override fun onError(t: Throwable) { Log.e(TAG, "Session event stream error: ${t.message}") - // Ignore events from stale streams + // Ignore events from stale streams - close without exception to avoid crash if (eventStreamVersion.get() != streamVersion) { Log.d(TAG, "Ignoring error from stale event stream") - close(t) + close() return } - // Don't trigger reconnect for CANCELLED errors - if (!t.message.orEmpty().contains("CANCELLED")) { + // Don't trigger reconnect for CANCELLED or channel shutdown errors + val errorMessage = t.message.orEmpty() + if (!errorMessage.contains("CANCELLED") && !errorMessage.contains("shutdownNow")) { triggerReconnect("Event stream error: ${t.message}") } close(t) diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt index 4cedbdca..98e62d39 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt @@ -284,7 +284,12 @@ class TssRepository @Inject constructor( currentSessionEventPartyId = effectivePartyId android.util.Log.d("TssRepository", "Starting session event subscription for partyId: $effectivePartyId (device partyId: $partyId)") sessionEventJob = repositoryScope.launch { - grpcClient.subscribeSessionEvents(effectivePartyId).collect { event -> + grpcClient.subscribeSessionEvents(effectivePartyId) + .catch { e -> + // Log error but don't crash - connection will be restored by GrpcClient + android.util.Log.e("TssRepository", "Session event stream error: ${e.message}") + } + .collect { event -> android.util.Log.d("TssRepository", "=== Session event received ===") android.util.Log.d("TssRepository", " eventType: ${event.eventType}") android.util.Log.d("TssRepository", " sessionId: ${event.sessionId}") @@ -1859,7 +1864,12 @@ class TssRepository @Inject constructor( // Collect incoming messages from gRPC and send to TSS launch { - grpcClient.subscribeMessages(sessionId, effectivePartyId).collect { message -> + grpcClient.subscribeMessages(sessionId, effectivePartyId) + .catch { e -> + // Log error but don't crash - connection will be restored by GrpcClient + android.util.Log.e("TssRepository", "Message stream error: ${e.message}") + } + .collect { message -> // Find party index from party ID val session = _currentSession.value val fromPartyIndex = session?.participants?.find { it.partyId == message.fromParty }?.partyIndex