fix(android): 修复重连时gRPC流异常导致待机闪退

问题:心跳失败触发重连时,shutdownNow关闭旧channel会导致
gRPC流抛出UNAVAILABLE异常,虽然检测到过时流但仍传播异常
到TssRepository的collect协程,导致应用崩溃。

修复:
- GrpcClient: 过时流错误时使用close()而非close(t)避免传播异常
- GrpcClient: 添加shutdownNow错误检测避免不必要的重连
- TssRepository: 为subscribeSessionEvents和subscribeMessages流添加.catch

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-27 09:52:57 -08:00
parent d83c859965
commit 263be15028
2 changed files with 22 additions and 10 deletions

View File

@ -790,15 +790,16 @@ class GrpcClient @Inject constructor() {
override fun onError(t: Throwable) {
Log.e(TAG, "Message stream error: ${t.message}")
// Ignore events from stale streams
// Ignore events from stale streams - close without exception to avoid crash
if (messageStreamVersion.get() != streamVersion) {
Log.d(TAG, "Ignoring error from stale message stream")
close(t)
close()
return
}
// Don't trigger reconnect for CANCELLED errors
if (!t.message.orEmpty().contains("CANCELLED")) {
// Don't trigger reconnect for CANCELLED or channel shutdown errors
val errorMessage = t.message.orEmpty()
if (!errorMessage.contains("CANCELLED") && !errorMessage.contains("shutdownNow")) {
triggerReconnect("Message stream error: ${t.message}")
}
close(t)
@ -870,15 +871,16 @@ class GrpcClient @Inject constructor() {
override fun onError(t: Throwable) {
Log.e(TAG, "Session event stream error: ${t.message}")
// Ignore events from stale streams
// Ignore events from stale streams - close without exception to avoid crash
if (eventStreamVersion.get() != streamVersion) {
Log.d(TAG, "Ignoring error from stale event stream")
close(t)
close()
return
}
// Don't trigger reconnect for CANCELLED errors
if (!t.message.orEmpty().contains("CANCELLED")) {
// Don't trigger reconnect for CANCELLED or channel shutdown errors
val errorMessage = t.message.orEmpty()
if (!errorMessage.contains("CANCELLED") && !errorMessage.contains("shutdownNow")) {
triggerReconnect("Event stream error: ${t.message}")
}
close(t)

View File

@ -284,7 +284,12 @@ class TssRepository @Inject constructor(
currentSessionEventPartyId = effectivePartyId
android.util.Log.d("TssRepository", "Starting session event subscription for partyId: $effectivePartyId (device partyId: $partyId)")
sessionEventJob = repositoryScope.launch {
grpcClient.subscribeSessionEvents(effectivePartyId).collect { event ->
grpcClient.subscribeSessionEvents(effectivePartyId)
.catch { e ->
// Log error but don't crash - connection will be restored by GrpcClient
android.util.Log.e("TssRepository", "Session event stream error: ${e.message}")
}
.collect { event ->
android.util.Log.d("TssRepository", "=== Session event received ===")
android.util.Log.d("TssRepository", " eventType: ${event.eventType}")
android.util.Log.d("TssRepository", " sessionId: ${event.sessionId}")
@ -1859,7 +1864,12 @@ class TssRepository @Inject constructor(
// Collect incoming messages from gRPC and send to TSS
launch {
grpcClient.subscribeMessages(sessionId, effectivePartyId).collect { message ->
grpcClient.subscribeMessages(sessionId, effectivePartyId)
.catch { e ->
// Log error but don't crash - connection will be restored by GrpcClient
android.util.Log.e("TssRepository", "Message stream error: ${e.message}")
}
.collect { message ->
// Find party index from party ID
val session = _currentSession.value
val fromPartyIndex = session?.participants?.find { it.partyId == message.fromParty }?.partyIndex